001/*
002 * Copyright (c) 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the Eclipse Public License v1.0
006 * which accompanies this distribution, and is available at
007 * http://www.eclipse.org/legal/epl-v10.html
008 *
009 * Contributors:
010 *     Florent Guillaume
011 */
012package org.nuxeo.ecm.core.storage.mongodb;
013
014import static java.lang.Boolean.TRUE;
015import static org.nuxeo.ecm.core.storage.State.NOP;
016import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_STATUS;
017import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_USER;
018import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACL;
019import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACP;
020import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS;
021import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_JOBID;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_SIMPLE;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LIFECYCLE_STATE;
028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PRIMARY_TYPE;
033import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS;
034import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;
035import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_VERSION_SERIES_ID;
036import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_READ_ACL;
037import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_VERSION_SERIES_ID;
038
039import java.io.Serializable;
040import java.lang.reflect.Array;
041import java.net.UnknownHostException;
042import java.util.ArrayList;
043import java.util.Arrays;
044import java.util.Calendar;
045import java.util.Date;
046import java.util.HashSet;
047import java.util.List;
048import java.util.Map;
049import java.util.Map.Entry;
050import java.util.Set;
051import java.util.UUID;
052
053import org.apache.commons.lang.StringUtils;
054import org.apache.commons.logging.Log;
055import org.apache.commons.logging.LogFactory;
056import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
057import org.nuxeo.ecm.core.api.DocumentNotFoundException;
058import org.nuxeo.ecm.core.api.Lock;
059import org.nuxeo.ecm.core.api.NuxeoException;
060import org.nuxeo.ecm.core.api.PartialList;
061import org.nuxeo.ecm.core.api.model.Delta;
062import org.nuxeo.ecm.core.blob.BlobManager;
063import org.nuxeo.ecm.core.model.LockManager;
064import org.nuxeo.ecm.core.model.Repository;
065import org.nuxeo.ecm.core.query.sql.model.Expression;
066import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
067import org.nuxeo.ecm.core.query.sql.model.SelectClause;
068import org.nuxeo.ecm.core.storage.State;
069import org.nuxeo.ecm.core.storage.State.ListDiff;
070import org.nuxeo.ecm.core.storage.State.StateDiff;
071import org.nuxeo.ecm.core.storage.dbs.DBSDocument;
072import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
073import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
074import org.nuxeo.runtime.api.Framework;
075
076import com.mongodb.BasicDBObject;
077import com.mongodb.DB;
078import com.mongodb.DBCollection;
079import com.mongodb.DBCursor;
080import com.mongodb.DBObject;
081import com.mongodb.MongoClient;
082import com.mongodb.MongoClientURI;
083import com.mongodb.QueryOperators;
084import com.mongodb.ServerAddress;
085import com.mongodb.WriteResult;
086
087/**
088 * MongoDB implementation of a {@link Repository}.
089 *
090 * @since 5.9.4
091 */
092public class MongoDBRepository extends DBSRepositoryBase {
093
094    private static final Log log = LogFactory.getLog(MongoDBRepository.class);
095
096    private static final Long ZERO = Long.valueOf(0);
097
098    private static final Long ONE = Long.valueOf(1);
099
100    private static final Long MINUS_ONE = Long.valueOf(-11);
101
102    public static final String DB_DEFAULT = "nuxeo";
103
104    public static final String MONGODB_ID = "_id";
105
106    public static final String MONGODB_INC = "$inc";
107
108    public static final String MONGODB_SET = "$set";
109
110    public static final String MONGODB_UNSET = "$unset";
111
112    public static final String MONGODB_PUSH = "$push";
113
114    public static final String MONGODB_EACH = "$each";
115
116    public static final String MONGODB_META = "$meta";
117
118    public static final String MONGODB_TEXT_SCORE = "textScore";
119
120    private static final String MONGODB_INDEX_TEXT = "text";
121
122    private static final String MONGODB_INDEX_NAME = "name";
123
124    private static final String MONGODB_LANGUAGE_OVERRIDE = "language_override";
125
126    private static final String FULLTEXT_INDEX_NAME = "fulltext";
127
128    private static final String LANGUAGE_FIELD = "__language";
129
130    protected static final String COUNTER_NAME_UUID = "ecm:id";
131
132    protected static final String COUNTER_FIELD = "seq";
133
134    protected MongoClient mongoClient;
135
136    protected DBCollection coll;
137
138    protected DBCollection countersColl;
139
140    public MongoDBRepository(MongoDBRepositoryDescriptor descriptor) {
141        super(descriptor.name, descriptor.getFulltextDisabled());
142        try {
143            mongoClient = newMongoClient(descriptor);
144            coll = getCollection(descriptor, mongoClient);
145            countersColl = getCountersCollection(descriptor, mongoClient);
146        } catch (UnknownHostException e) {
147            throw new RuntimeException(e);
148        }
149        initRepository();
150    }
151
152    @Override
153    public void shutdown() {
154        super.shutdown();
155        mongoClient.close();
156    }
157
158    // used also by unit tests
159    public static MongoClient newMongoClient(MongoDBRepositoryDescriptor descriptor) throws UnknownHostException {
160        String server = descriptor.server;
161        if (StringUtils.isBlank(server)) {
162            throw new NuxeoException("Missing <server> in MongoDB repository descriptor");
163        }
164        if (server.startsWith("mongodb://")) {
165            // allow mongodb:// URI syntax for the server, to pass everything in one string
166            return new MongoClient(new MongoClientURI(server));
167        } else {
168            return new MongoClient(new ServerAddress(server));
169        }
170    }
171
172    protected static DBCollection getCollection(MongoClient mongoClient, String dbname, String collection) {
173        if (StringUtils.isBlank(dbname)) {
174            dbname = DB_DEFAULT;
175        }
176        DB db = mongoClient.getDB(dbname);
177        return db.getCollection(collection);
178    }
179
180    // used also by unit tests
181    public static DBCollection getCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) {
182        return getCollection(mongoClient, descriptor.dbname, descriptor.name);
183    }
184
185    // used also by unit tests
186    public static DBCollection getCountersCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) {
187        return getCollection(mongoClient, descriptor.dbname, descriptor.name + ".counters");
188    }
189
190    protected Object valueToBson(Object value) {
191        if (value instanceof State) {
192            return stateToBson((State) value);
193        } else if (value instanceof List) {
194            @SuppressWarnings("unchecked")
195            List<Object> values = (List<Object>) value;
196            return listToBson(values);
197        } else if (value instanceof Object[]) {
198            return listToBson(Arrays.asList((Object[]) value));
199        } else {
200            return serializableToBson(value);
201        }
202    }
203
204    protected DBObject stateToBson(State state) {
205        DBObject ob = new BasicDBObject();
206        for (Entry<String, Serializable> en : state.entrySet()) {
207            Object val = valueToBson(en.getValue());
208            if (val != null) {
209                ob.put(en.getKey(), val);
210            }
211        }
212        return ob;
213    }
214
215    protected List<Object> listToBson(List<Object> values) {
216        ArrayList<Object> objects = new ArrayList<Object>(values.size());
217        for (Object value : values) {
218            objects.add(valueToBson(value));
219        }
220        return objects;
221    }
222
223    protected State bsonToState(DBObject ob) {
224        if (ob == null) {
225            return null;
226        }
227        State state = new State(ob.keySet().size());
228        for (String key : ob.keySet()) {
229            Object val = ob.get(key);
230            Serializable value;
231            if (val instanceof List) {
232                @SuppressWarnings("unchecked")
233                List<Object> list = (List<Object>) val;
234                if (list.isEmpty()) {
235                    value = null;
236                } else {
237                    if (list.get(0) instanceof DBObject) {
238                        List<Serializable> l = new ArrayList<>(list.size());
239                        for (Object el : list) {
240                            l.add(bsonToState((DBObject) el));
241                        }
242                        value = (Serializable) l;
243                    } else {
244                        // turn the list into a properly-typed array
245                        Class<?> klass = Object.class;
246                        for (Object o : list) {
247                            if (o != null) {
248                                klass = o.getClass();
249                                break;
250                            }
251                        }
252                        Object[] ar = (Object[]) Array.newInstance(klass, list.size());
253                        int i = 0;
254                        for (Object el : list) {
255                            ar[i++] = scalarToSerializable(el);
256                        }
257                        value = ar;
258                    }
259                }
260            } else if (val instanceof DBObject) {
261                value = bsonToState((DBObject) val);
262            } else {
263                if (MONGODB_ID.equals(key)) {
264                    // skip ObjectId
265                    continue;
266                }
267                value = scalarToSerializable(val);
268            }
269            state.put(key, value);
270        }
271        return state;
272    }
273
274    public static class Updates {
275        public BasicDBObject set = new BasicDBObject();
276
277        public BasicDBObject unset = new BasicDBObject();
278
279        public BasicDBObject push = new BasicDBObject();
280
281        public BasicDBObject inc = new BasicDBObject();
282    }
283
284    /**
285     * Constructs a list of MongoDB updates from the given {@link StateDiff}.
286     * <p>
287     * We need a list because some cases need two operations to avoid conflicts.
288     */
289    protected List<DBObject> diffToBson(StateDiff diff) {
290        Updates updates = new Updates();
291        diffToUpdates(diff, null, updates);
292        UpdateListBuilder builder = new UpdateListBuilder();
293        for (Entry<String, Object> en : updates.set.entrySet()) {
294            builder.update(MONGODB_SET, en.getKey(), en.getValue());
295        }
296        for (Entry<String, Object> en : updates.unset.entrySet()) {
297            builder.update(MONGODB_UNSET, en.getKey(), en.getValue());
298        }
299        for (Entry<String, Object> en : updates.push.entrySet()) {
300            builder.update(MONGODB_PUSH, en.getKey(), en.getValue());
301        }
302        for (Entry<String, Object> en : updates.inc.entrySet()) {
303            builder.update(MONGODB_INC, en.getKey(), en.getValue());
304        }
305        return builder.updateList;
306    }
307
308    /**
309     * Update list builder to prevent several updates of the same field.
310     * <p>
311     * This happens if two operations act on two fields where one is a prefix of the other.
312     * <p>
313     * Example: Cannot update 'mylist.0.string' and 'mylist' at the same time (error 16837)
314     *
315     * @since 5.9.5
316     */
317    protected static class UpdateListBuilder {
318
319        protected List<DBObject> updateList = new ArrayList<>(1);
320
321        protected DBObject update;
322
323        protected List<String> keys;
324
325        protected UpdateListBuilder() {
326            newUpdate();
327        }
328
329        protected void newUpdate() {
330            updateList.add(update = new BasicDBObject());
331            keys = new ArrayList<>();
332        }
333
334        protected void update(String op, String key, Object value) {
335            if (conflicts(key, keys)) {
336                newUpdate();
337            }
338            keys.add(key);
339            DBObject map = (DBObject) update.get(op);
340            if (map == null) {
341                update.put(op, map = new BasicDBObject());
342            }
343            map.put(key, value);
344        }
345
346        /**
347         * Checks if the key conflicts with one of the previous keys.
348         * <p>
349         * A conflict occurs if one key is equals to or is a prefix of the other.
350         */
351        protected boolean conflicts(String key, List<String> previousKeys) {
352            String keydot = key + '.';
353            for (String prev : previousKeys) {
354                if (prev.equals(key) || prev.startsWith(keydot) || key.startsWith(prev + '.')) {
355                    return true;
356                }
357            }
358            return false;
359        }
360    }
361
362    protected void diffToUpdates(StateDiff diff, String prefix, Updates updates) {
363        String elemPrefix = prefix == null ? "" : prefix + '.';
364        for (Entry<String, Serializable> en : diff.entrySet()) {
365            String name = elemPrefix + en.getKey();
366            Serializable value = en.getValue();
367            if (value instanceof StateDiff) {
368                diffToUpdates((StateDiff) value, name, updates);
369            } else if (value instanceof ListDiff) {
370                diffToUpdates((ListDiff) value, name, updates);
371            } else if (value instanceof Delta) {
372                diffToUpdates((Delta) value, name, updates);
373            } else {
374                // not a diff
375                updates.set.put(name, valueToBson(value));
376            }
377        }
378    }
379
380    protected void diffToUpdates(ListDiff listDiff, String prefix, Updates updates) {
381        if (listDiff.diff != null) {
382            String elemPrefix = prefix == null ? "" : prefix + '.';
383            int i = 0;
384            for (Object value : listDiff.diff) {
385                String name = elemPrefix + i;
386                if (value instanceof StateDiff) {
387                    diffToUpdates((StateDiff) value, name, updates);
388                } else if (value != NOP) {
389                    // set value
390                    updates.set.put(name, valueToBson(value));
391                }
392                i++;
393            }
394        }
395        if (listDiff.rpush != null) {
396            Object pushed;
397            if (listDiff.rpush.size() == 1) {
398                // no need to use $each for one element
399                pushed = valueToBson(listDiff.rpush.get(0));
400            } else {
401                pushed = new BasicDBObject(MONGODB_EACH, listToBson(listDiff.rpush));
402            }
403            updates.push.put(prefix, pushed);
404        }
405    }
406
407    protected void diffToUpdates(Delta delta, String prefix, Updates updates) {
408        Object inc = valueToBson(delta.getDeltaValue());
409        updates.inc.put(prefix, inc);
410    }
411
412    protected Object serializableToBson(Object value) {
413        if (value instanceof Calendar) {
414            return ((Calendar) value).getTime();
415        }
416        return value;
417    }
418
419    protected Serializable scalarToSerializable(Object val) {
420        if (val instanceof Date) {
421            Calendar cal = Calendar.getInstance();
422            cal.setTime((Date) val);
423            return cal;
424        }
425        return (Serializable) val;
426    }
427
428    protected void initRepository() {
429        // create required indexes
430        // code does explicit queries on those
431        coll.createIndex(new BasicDBObject(KEY_ID, ONE));
432        coll.createIndex(new BasicDBObject(KEY_PARENT_ID, ONE));
433        coll.createIndex(new BasicDBObject(KEY_ANCESTOR_IDS, ONE));
434        coll.createIndex(new BasicDBObject(KEY_VERSION_SERIES_ID, ONE));
435        coll.createIndex(new BasicDBObject(KEY_PROXY_TARGET_ID, ONE));
436        coll.createIndex(new BasicDBObject(KEY_PROXY_VERSION_SERIES_ID, ONE));
437        coll.createIndex(new BasicDBObject(KEY_READ_ACL, ONE));
438        DBObject parentChild = new BasicDBObject();
439        parentChild.put(KEY_PARENT_ID, ONE);
440        parentChild.put(KEY_NAME, ONE);
441        coll.createIndex(parentChild);
442        // often used in user-generated queries
443        coll.createIndex(new BasicDBObject(KEY_PRIMARY_TYPE, ONE));
444        coll.createIndex(new BasicDBObject(KEY_LIFECYCLE_STATE, ONE));
445        coll.createIndex(new BasicDBObject(KEY_FULLTEXT_JOBID, ONE));
446        coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_USER, ONE));
447        coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_STATUS, ONE));
448        // TODO configure these from somewhere else
449        coll.createIndex(new BasicDBObject("dc:modified", MINUS_ONE));
450        coll.createIndex(new BasicDBObject("rend:renditionName", ONE));
451        coll.createIndex(new BasicDBObject("drv:subscriptions.enabled", ONE));
452        coll.createIndex(new BasicDBObject("collectionMember:collectionIds", ONE));
453        if (!fulltextDisabled) {
454            DBObject indexKeys = new BasicDBObject();
455            indexKeys.put(KEY_FULLTEXT_SIMPLE, MONGODB_INDEX_TEXT);
456            indexKeys.put(KEY_FULLTEXT_BINARY, MONGODB_INDEX_TEXT);
457            DBObject indexOptions = new BasicDBObject();
458            indexOptions.put(MONGODB_INDEX_NAME, FULLTEXT_INDEX_NAME);
459            indexOptions.put(MONGODB_LANGUAGE_OVERRIDE, LANGUAGE_FIELD);
460            coll.createIndex(indexKeys, indexOptions);
461        }
462        // check root presence
463        DBObject query = new BasicDBObject(KEY_ID, getRootId());
464        if (coll.findOne(query, justPresenceField()) != null) {
465            return;
466        }
467        // create basic repository structure needed
468        if (DEBUG_UUIDS) {
469            // create the id counter
470            DBObject idCounter = new BasicDBObject();
471            idCounter.put(MONGODB_ID, COUNTER_NAME_UUID);
472            idCounter.put(COUNTER_FIELD, ZERO);
473            countersColl.insert(idCounter);
474        }
475        initRoot();
476    }
477
478    protected Long getNextUuidSeq() {
479        DBObject query = new BasicDBObject(MONGODB_ID, COUNTER_NAME_UUID);
480        DBObject update = new BasicDBObject(MONGODB_INC, new BasicDBObject(COUNTER_FIELD, ONE));
481        boolean returnNew = true;
482        DBObject idCounter = countersColl.findAndModify(query, null, null, false, update, returnNew, false);
483        if (idCounter == null) {
484            throw new RuntimeException("Repository id counter not initialized");
485        }
486        return (Long) idCounter.get(COUNTER_FIELD);
487    }
488
489    @Override
490    public String generateNewId() {
491        if (DEBUG_UUIDS) {
492            Long id = getNextUuidSeq();
493            return "UUID_" + id;
494        } else {
495            return UUID.randomUUID().toString();
496        }
497    }
498
499    @Override
500    public void createState(State state) {
501        DBObject ob = stateToBson(state);
502        if (log.isTraceEnabled()) {
503            log.trace("MongoDB: CREATE " + ob);
504        }
505        coll.insert(ob);
506        // TODO dupe exception
507        // throw new DocumentException("Already exists: " + id);
508    }
509
510    @Override
511    public State readState(String id) {
512        DBObject query = new BasicDBObject(KEY_ID, id);
513        return findOne(query);
514    }
515
516    @Override
517    public List<State> readStates(List<String> ids) {
518        DBObject query = new BasicDBObject(KEY_ID, new BasicDBObject(QueryOperators.IN, ids));
519        return findAll(query, ids.size());
520    }
521
522    @Override
523    public void updateState(String id, StateDiff diff) {
524        DBObject query = new BasicDBObject(KEY_ID, id);
525        for (DBObject update : diffToBson(diff)) {
526            if (log.isTraceEnabled()) {
527                log.trace("MongoDB: UPDATE " + id + ": " + update);
528            }
529            coll.update(query, update);
530            // TODO dupe exception
531            // throw new DocumentException("Missing: " + id);
532        }
533    }
534
535    @Override
536    public void deleteStates(Set<String> ids) {
537        DBObject query = new BasicDBObject(KEY_ID, new BasicDBObject(QueryOperators.IN, ids));
538        if (log.isTraceEnabled()) {
539            log.trace("MongoDB: REMOVE " + ids);
540        }
541        WriteResult w = coll.remove(query);
542        if (w.getN() != ids.size()) {
543            log.error("Removed " + w.getN() + " docs for " + ids.size() + " ids: " + ids);
544        }
545    }
546
547    @Override
548    public State readChildState(String parentId, String name, Set<String> ignored) {
549        DBObject query = getChildQuery(parentId, name, ignored);
550        return findOne(query);
551    }
552
553    protected void logQuery(String id, DBObject fields) {
554        logQuery(new BasicDBObject(KEY_ID, id), fields);
555    }
556
557    protected void logQuery(DBObject query, DBObject fields) {
558        if (fields == null) {
559            log.trace("MongoDB: QUERY " + query);
560        } else {
561            log.trace("MongoDB: QUERY " + query + " KEYS " + fields);
562        }
563    }
564
565    protected void logQuery(DBObject query, DBObject fields, DBObject orderBy, int limit, int offset) {
566        log.trace("MongoDB: QUERY " + query + " KEYS " + fields + (orderBy == null ? "" : " ORDER BY " + orderBy)
567                + " OFFSET " + offset + " LIMIT " + limit);
568    }
569
570    @Override
571    public boolean hasChild(String parentId, String name, Set<String> ignored) {
572        DBObject query = getChildQuery(parentId, name, ignored);
573        if (log.isTraceEnabled()) {
574            logQuery(query, justPresenceField());
575        }
576        return coll.findOne(query, justPresenceField()) != null;
577    }
578
579    protected DBObject getChildQuery(String parentId, String name, Set<String> ignored) {
580        DBObject query = new BasicDBObject();
581        query.put(KEY_PARENT_ID, parentId);
582        query.put(KEY_NAME, name);
583        addIgnoredIds(query, ignored);
584        return query;
585    }
586
587    protected void addIgnoredIds(DBObject query, Set<String> ignored) {
588        if (!ignored.isEmpty()) {
589            DBObject notInIds = new BasicDBObject(QueryOperators.NIN, new ArrayList<String>(ignored));
590            query.put(KEY_ID, notInIds);
591        }
592    }
593
594    @Override
595    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
596        DBObject query = new BasicDBObject(key, value);
597        addIgnoredIds(query, ignored);
598        return findAll(query, 0);
599    }
600
601    @Override
602    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
603        DBObject query = new BasicDBObject(key1, value1);
604        query.put(key2, value2);
605        addIgnoredIds(query, ignored);
606        return findAll(query, 0);
607    }
608
609    @Override
610    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
611            Map<String, Object[]> targetProxies) {
612        DBObject query = new BasicDBObject(key, value);
613        DBObject fields = new BasicDBObject();
614        fields.put(MONGODB_ID, ZERO);
615        fields.put(KEY_ID, ONE);
616        fields.put(KEY_IS_PROXY, ONE);
617        fields.put(KEY_PROXY_TARGET_ID, ONE);
618        fields.put(KEY_PROXY_IDS, ONE);
619        if (log.isTraceEnabled()) {
620            logQuery(query, fields);
621        }
622        DBCursor cursor = coll.find(query, fields);
623        try {
624            for (DBObject ob : cursor) {
625                String id = (String) ob.get(KEY_ID);
626                ids.add(id);
627                if (proxyTargets != null && TRUE.equals(ob.get(KEY_IS_PROXY))) {
628                    String targetId = (String) ob.get(KEY_PROXY_TARGET_ID);
629                    proxyTargets.put(id, targetId);
630                }
631                if (targetProxies != null) {
632                    Object[] proxyIds = (Object[]) ob.get(KEY_PROXY_IDS);
633                    if (proxyIds != null) {
634                        targetProxies.put(id, proxyIds);
635                    }
636                }
637            }
638        } finally {
639            cursor.close();
640        }
641    }
642
643    @Override
644    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
645        DBObject query = new BasicDBObject(key, value);
646        addIgnoredIds(query, ignored);
647        if (log.isTraceEnabled()) {
648            logQuery(query, justPresenceField());
649        }
650        return coll.findOne(query, justPresenceField()) != null;
651    }
652
653    protected State findOne(DBObject query) {
654        if (log.isTraceEnabled()) {
655            logQuery(query, null);
656        }
657        return bsonToState(coll.findOne(query));
658    }
659
660    protected List<State> findAll(DBObject query, int sizeHint) {
661        if (log.isTraceEnabled()) {
662            logQuery(query, null);
663        }
664        DBCursor cursor = coll.find(query);
665        Set<String> seen = new HashSet<>();
666        try {
667            List<State> list = new ArrayList<>(sizeHint);
668            for (DBObject ob : cursor) {
669                if (!seen.add((String) ob.get(KEY_ID))) {
670                    // MongoDB cursors may return the same
671                    // object several times
672                    continue;
673                }
674                list.add(bsonToState(ob));
675            }
676            return list;
677        } finally {
678            cursor.close();
679        }
680    }
681
682    protected DBObject justPresenceField() {
683        return new BasicDBObject(MONGODB_ID, ONE);
684    }
685
686    @Override
687    public PartialList<State> queryAndFetch(Expression expression, SelectClause selectClause, OrderByClause orderByClause,
688            int limit, int offset, int countUpTo, DBSExpressionEvaluator evaluator, boolean deepCopy) {
689        MongoDBQueryBuilder builder = new MongoDBQueryBuilder(expression, selectClause, orderByClause,
690                evaluator.pathResolver);
691        builder.walk();
692        if (builder.hasFulltext && fulltextDisabled) {
693            throw new RuntimeException("Fulltext disabled by configuration");
694        }
695        DBObject query = builder.getQuery();
696        addPrincipals(query, evaluator.principals);
697        DBObject orderBy = builder.getOrderBy();
698        DBObject keys = builder.getProjection();
699
700        if (log.isTraceEnabled()) {
701            logQuery(query, keys, orderBy, limit, offset);
702        }
703
704        List<State> list;
705        long totalSize;
706        DBCursor cursor = coll.find(query, keys).skip(offset).limit(limit);
707        try {
708            if (orderBy != null) {
709                cursor = cursor.sort(orderBy);
710            }
711            list = new ArrayList<>();
712            for (DBObject ob : cursor) {
713                list.add(bsonToState(ob));
714            }
715            if (countUpTo == -1) {
716                // count full size
717                if (limit == 0) {
718                    totalSize = list.size();
719                } else {
720                    totalSize = cursor.count();
721                }
722            } else if (countUpTo == 0) {
723                // no count
724                totalSize = -1; // not counted
725            } else {
726                // count only if less than countUpTo
727                if (limit == 0) {
728                    totalSize = list.size();
729                } else {
730                    totalSize = cursor.copy().limit(countUpTo + 1).count();
731                }
732                if (totalSize > countUpTo) {
733                    totalSize = -2; // truncated
734                }
735            }
736        } finally {
737            cursor.close();
738        }
739        if (log.isTraceEnabled() && list.size() != 0) {
740            log.trace("MongoDB:    -> " + list.size());
741        }
742        return new PartialList<>(list, totalSize);
743    }
744
745    protected void addPrincipals(DBObject query, Set<String> principals) {
746        if (principals != null) {
747            DBObject inPrincipals = new BasicDBObject(QueryOperators.IN, new ArrayList<String>(principals));
748            query.put(DBSDocument.KEY_READ_ACL, inPrincipals);
749        }
750    }
751
752    /** Keys used for document projection when marking all binaries for GC. */
753    protected DBObject binaryKeys;
754
755    @Override
756    protected void initBlobsPaths() {
757        MongoDBBlobFinder finder = new MongoDBBlobFinder();
758        finder.visit();
759        binaryKeys = finder.binaryKeys;
760    }
761
762    protected static class MongoDBBlobFinder extends BlobFinder {
763        protected DBObject binaryKeys = new BasicDBObject(MONGODB_ID, ZERO);
764
765        @Override
766        protected void recordBlobPath() {
767            path.addLast(KEY_BLOB_DATA);
768            binaryKeys.put(StringUtils.join(path, "."), ONE);
769            path.removeLast();
770        }
771    }
772
773    @Override
774    public void markReferencedBinaries() {
775        BlobManager blobManager = Framework.getService(BlobManager.class);
776        // TODO add a query to not scan all documents
777        if (log.isTraceEnabled()) {
778            logQuery(new BasicDBObject(), binaryKeys);
779        }
780        DBCursor cursor = coll.find(new BasicDBObject(), binaryKeys);
781        try {
782            for (DBObject ob : cursor) {
783                markReferencedBinaries(ob, blobManager);
784            }
785        } finally {
786            cursor.close();
787        }
788    }
789
790    protected void markReferencedBinaries(DBObject ob, BlobManager blobManager) {
791        for (String key : ob.keySet()) {
792            Object value = ob.get(key);
793            if (value instanceof List) {
794                @SuppressWarnings("unchecked")
795                List<Object> list = (List<Object>) value;
796                for (Object v : list) {
797                    if (v instanceof DBObject) {
798                        markReferencedBinaries((DBObject) v, blobManager);
799                    } else {
800                        markReferencedBinary(v, blobManager);
801                    }
802                }
803            } else if (value instanceof Object[]) {
804                for (Object v : (Object[]) value) {
805                    markReferencedBinary(v, blobManager);
806                }
807            } else if (value instanceof DBObject) {
808                markReferencedBinaries((DBObject) value, blobManager);
809            } else {
810                markReferencedBinary(value, blobManager);
811            }
812        }
813    }
814
815    protected void markReferencedBinary(Object value, BlobManager blobManager) {
816        if (!(value instanceof String)) {
817            return;
818        }
819        String key = (String) value;
820        blobManager.markReferencedBinary(key, repositoryName);
821    }
822
823    protected static final DBObject LOCK_FIELDS;
824
825    static {
826        LOCK_FIELDS = new BasicDBObject();
827        LOCK_FIELDS.put(KEY_LOCK_OWNER, ONE);
828        LOCK_FIELDS.put(KEY_LOCK_CREATED, ONE);
829    }
830
831    protected static final DBObject UNSET_LOCK_UPDATE = new BasicDBObject(MONGODB_UNSET, LOCK_FIELDS);
832
833    @Override
834    public Lock getLock(String id) {
835        if (log.isTraceEnabled()) {
836            logQuery(id, LOCK_FIELDS);
837        }
838        DBObject res = coll.findOne(new BasicDBObject(KEY_ID, id), LOCK_FIELDS);
839        if (res == null) {
840            // document not found
841            throw new DocumentNotFoundException(id);
842        }
843        String owner = (String) res.get(KEY_LOCK_OWNER);
844        if (owner == null) {
845            // not locked
846            return null;
847        }
848        Calendar created = (Calendar) scalarToSerializable(res.get(KEY_LOCK_CREATED));
849        return new Lock(owner, created);
850    }
851
852    @Override
853    public Lock setLock(String id, Lock lock) {
854        DBObject query = new BasicDBObject(KEY_ID, id);
855        query.put(KEY_LOCK_OWNER, null); // select doc if no lock is set
856        DBObject setLock = new BasicDBObject();
857        setLock.put(KEY_LOCK_OWNER, lock.getOwner());
858        setLock.put(KEY_LOCK_CREATED, serializableToBson(lock.getCreated()));
859        DBObject setLockUpdate = new BasicDBObject(MONGODB_SET, setLock);
860        if (log.isTraceEnabled()) {
861            log.trace("MongoDB: FINDANDMODIFY " + query + " UPDATE " + setLockUpdate);
862        }
863        DBObject res = coll.findAndModify(query, null, null, false, setLockUpdate, false, false);
864        if (res != null) {
865            // found a doc to lock
866            return null;
867        } else {
868            // doc not found, or lock owner already set
869            // get the old lock
870            if (log.isTraceEnabled()) {
871                logQuery(id, LOCK_FIELDS);
872            }
873            DBObject old = coll.findOne(new BasicDBObject(KEY_ID, id), LOCK_FIELDS);
874            if (old == null) {
875                // document not found
876                throw new DocumentNotFoundException(id);
877            }
878            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
879            Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED));
880            if (oldOwner != null) {
881                return new Lock(oldOwner, oldCreated);
882            }
883            // no lock -- there was a race condition
884            // TODO do better
885            throw new ConcurrentUpdateException("Lock " + id);
886        }
887    }
888
889    @Override
890    public Lock removeLock(String id, String owner) {
891        DBObject query = new BasicDBObject(KEY_ID, id);
892        if (owner != null) {
893            // remove if owner matches or null
894            // implements LockManager.canLockBeRemoved inside MongoDB
895            Object ownerOrNull = Arrays.asList(owner, null);
896            query.put(KEY_LOCK_OWNER, new BasicDBObject(QueryOperators.IN, ownerOrNull));
897        } // else unconditional remove
898        // remove the lock
899        DBObject old = coll.findAndModify(query, null, null, false, UNSET_LOCK_UPDATE, false, false);
900        if (old != null) {
901            // found a doc and removed the lock, return previous lock
902            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
903            if (oldOwner == null) {
904                // was not locked
905                return null;
906            } else {
907                // return previous lock
908                Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED));
909                return new Lock(oldOwner, oldCreated);
910            }
911        } else {
912            // doc not found, or lock owner didn't match
913            // get the old lock
914            if (log.isTraceEnabled()) {
915                logQuery(id, LOCK_FIELDS);
916            }
917            old = coll.findOne(new BasicDBObject(KEY_ID, id), LOCK_FIELDS);
918            if (old == null) {
919                // document not found
920                throw new DocumentNotFoundException(id);
921            }
922            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
923            Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED));
924            if (oldOwner != null) {
925                if (!LockManager.canLockBeRemoved(oldOwner, owner)) {
926                    // existing mismatched lock, flag failure
927                    return new Lock(oldOwner, oldCreated, true);
928                }
929                // old owner should have matched -- there was a race condition
930                // TODO do better
931                throw new ConcurrentUpdateException("Unlock " + id);
932            }
933            // old owner null, should have matched -- there was a race condition
934            // TODO do better
935            throw new ConcurrentUpdateException("Unlock " + id);
936        }
937    }
938
939    @Override
940    public void closeLockManager() {
941
942    }
943
944    @Override
945    public void clearLockManagerCaches() {
946    }
947
948}