001/*
002 * (C) Copyright 2014-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.mongodb;
020
021import static java.lang.Boolean.TRUE;
022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_STATUS;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_USER;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACL;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACP;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY;
029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_JOBID;
030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_SIMPLE;
031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY;
033import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LIFECYCLE_STATE;
034import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
035import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
036import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
037import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
038import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PRIMARY_TYPE;
039import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS;
040import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;
041import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_VERSION_SERIES_ID;
042import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_READ_ACL;
043import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_VERSION_SERIES_ID;
044
045import java.io.Serializable;
046import java.net.UnknownHostException;
047import java.util.ArrayList;
048import java.util.Arrays;
049import java.util.Calendar;
050import java.util.HashSet;
051import java.util.List;
052import java.util.Map;
053import java.util.Set;
054import java.util.UUID;
055import java.util.stream.Collectors;
056
057import javax.resource.spi.ConnectionManager;
058
059import org.apache.commons.lang.StringUtils;
060import org.apache.commons.logging.Log;
061import org.apache.commons.logging.LogFactory;
062import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
063import org.nuxeo.ecm.core.api.CursorService;
064import org.nuxeo.ecm.core.api.DocumentNotFoundException;
065import org.nuxeo.ecm.core.api.Lock;
066import org.nuxeo.ecm.core.api.NuxeoException;
067import org.nuxeo.ecm.core.api.PartialList;
068import org.nuxeo.ecm.core.api.ScrollResult;
069import org.nuxeo.ecm.core.blob.DocumentBlobManager;
070import org.nuxeo.ecm.core.model.LockManager;
071import org.nuxeo.ecm.core.model.Repository;
072import org.nuxeo.ecm.core.query.QueryParseException;
073import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
074import org.nuxeo.ecm.core.storage.State;
075import org.nuxeo.ecm.core.storage.State.StateDiff;
076import org.nuxeo.ecm.core.storage.dbs.DBSDocument;
077import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
078import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
079import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener;
080import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater;
081import org.nuxeo.runtime.api.Framework;
082
083import com.mongodb.BasicDBObject;
084import com.mongodb.DB;
085import com.mongodb.DBCollection;
086import com.mongodb.DBCursor;
087import com.mongodb.DBObject;
088import com.mongodb.MongoClient;
089import com.mongodb.MongoClientOptions;
090import com.mongodb.MongoClientURI;
091import com.mongodb.QueryOperators;
092import com.mongodb.ServerAddress;
093import com.mongodb.WriteResult;
094
095/**
096 * MongoDB implementation of a {@link Repository}.
097 *
098 * @since 5.9.4
099 */
100public class MongoDBRepository extends DBSRepositoryBase {
101
102    private static final Log log = LogFactory.getLog(MongoDBRepository.class);
103
104    public static final Long ZERO = Long.valueOf(0);
105
106    public static final Long ONE = Long.valueOf(1);
107
108    public static final Long MINUS_ONE = Long.valueOf(-1);
109
110    public static final String DB_DEFAULT = "nuxeo";
111
112    public static final String MONGODB_ID = "_id";
113
114    public static final String MONGODB_INC = "$inc";
115
116    public static final String MONGODB_SET = "$set";
117
118    public static final String MONGODB_UNSET = "$unset";
119
120    public static final String MONGODB_PUSH = "$push";
121
122    public static final String MONGODB_EACH = "$each";
123
124    public static final String MONGODB_META = "$meta";
125
126    public static final String MONGODB_TEXT_SCORE = "textScore";
127
128    private static final String MONGODB_INDEX_TEXT = "text";
129
130    private static final String MONGODB_INDEX_NAME = "name";
131
132    private static final String MONGODB_LANGUAGE_OVERRIDE = "language_override";
133
134    private static final String FULLTEXT_INDEX_NAME = "fulltext";
135
136    private static final String LANGUAGE_FIELD = "__language";
137
138    protected static final String COUNTER_NAME_UUID = "ecm:id";
139
140    protected static final String COUNTER_FIELD = "seq";
141
142    protected static final int MONGODB_OPTION_CONNECTION_TIMEOUT_MS = 30000;
143
144    protected static final int MONGODB_OPTION_SOCKET_TIMEOUT_MS = 60000;
145
146    protected MongoClient mongoClient;
147
148    protected DBCollection coll;
149
150    protected DBCollection countersColl;
151
152    /** The key to use to store the id in the database. */
153    protected String idKey;
154
155    /** True if we don't use MongoDB's native "_id" key to store the id. */
156    protected boolean useCustomId;
157
158    /** Number of values still available in the in-memory sequence. */
159    protected long sequenceLeft;
160
161    /** Last value used from the in-memory sequence. */
162    protected long sequenceLastValue;
163
164    /** Sequence allocation block size. */
165    protected long sequenceBlockSize;
166
167    protected final MongoDBConverter converter;
168
169    protected final CursorService<DBCursor, DBObject> cursorService = new CursorService<>();
170
171    public MongoDBRepository(ConnectionManager cm, MongoDBRepositoryDescriptor descriptor) {
172        super(cm, descriptor.name, descriptor);
173        try {
174            mongoClient = newMongoClient(descriptor);
175            coll = getCollection(descriptor, mongoClient);
176            countersColl = getCountersCollection(descriptor, mongoClient);
177        } catch (UnknownHostException e) {
178            throw new RuntimeException(e);
179        }
180        if (Boolean.TRUE.equals(descriptor.nativeId)) {
181            idKey = MONGODB_ID;
182        } else {
183            idKey = KEY_ID;
184        }
185        useCustomId = KEY_ID.equals(idKey);
186        if (idType == IdType.sequence || DEBUG_UUIDS) {
187            Integer sbs = descriptor.sequenceBlockSize;
188            sequenceBlockSize = sbs == null ? 1 : sbs.longValue();
189            sequenceLeft = 0;
190        }
191        converter = new MongoDBConverter(idKey);
192        initRepository();
193    }
194
195    @Override
196    public List<IdType> getAllowedIdTypes() {
197        return Arrays.asList(IdType.varchar, IdType.sequence);
198    }
199
200    @Override
201    public void shutdown() {
202        super.shutdown();
203        cursorService.clear();
204        mongoClient.close();
205    }
206
207    // used also by unit tests
208    public static MongoClient newMongoClient(MongoDBRepositoryDescriptor descriptor) throws UnknownHostException {
209        MongoClient ret;
210        String server = descriptor.server;
211        if (StringUtils.isBlank(server)) {
212            throw new NuxeoException("Missing <server> in MongoDB repository descriptor");
213        }
214        MongoClientOptions.Builder optionsBuilder = MongoClientOptions.builder()
215                // Can help to prevent firewall disconnects inactive connection, option not available from URI
216                .socketKeepAlive(true)
217                // don't wait for ever by default, can be overridden using URI options
218                .connectTimeout(MONGODB_OPTION_CONNECTION_TIMEOUT_MS)
219                .socketTimeout(MONGODB_OPTION_SOCKET_TIMEOUT_MS)
220                .description("Nuxeo");
221        if (server.startsWith("mongodb://")) {
222            // allow mongodb:// URI syntax for the server, to pass everything in one string
223            ret = new MongoClient(new MongoClientURI(server, optionsBuilder));
224        } else {
225            ret = new MongoClient(new ServerAddress(server), optionsBuilder.build());
226        }
227        if (log.isDebugEnabled()) {
228            log.debug("MongoClient initialized with options: " + ret.getMongoClientOptions().toString());
229        }
230        return ret;
231    }
232
233    protected static DBCollection getCollection(MongoClient mongoClient, String dbname, String collection) {
234        if (StringUtils.isBlank(dbname)) {
235            dbname = DB_DEFAULT;
236        }
237        DB db = mongoClient.getDB(dbname);
238        return db.getCollection(collection);
239    }
240
241    // used also by unit tests
242    public static DBCollection getCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) {
243        return getCollection(mongoClient, descriptor.dbname, descriptor.name);
244    }
245
246    // used also by unit tests
247    public static DBCollection getCountersCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) {
248        return getCollection(mongoClient, descriptor.dbname, descriptor.name + ".counters");
249    }
250
251    protected void initRepository() {
252        // create required indexes
253        // code does explicit queries on those
254        if (useCustomId) {
255            coll.createIndex(new BasicDBObject(idKey, ONE));
256        }
257        coll.createIndex(new BasicDBObject(KEY_PARENT_ID, ONE));
258        coll.createIndex(new BasicDBObject(KEY_ANCESTOR_IDS, ONE));
259        coll.createIndex(new BasicDBObject(KEY_VERSION_SERIES_ID, ONE));
260        coll.createIndex(new BasicDBObject(KEY_PROXY_TARGET_ID, ONE));
261        coll.createIndex(new BasicDBObject(KEY_PROXY_VERSION_SERIES_ID, ONE));
262        coll.createIndex(new BasicDBObject(KEY_READ_ACL, ONE));
263        DBObject parentChild = new BasicDBObject();
264        parentChild.put(KEY_PARENT_ID, ONE);
265        parentChild.put(KEY_NAME, ONE);
266        coll.createIndex(parentChild);
267        // often used in user-generated queries
268        coll.createIndex(new BasicDBObject(KEY_PRIMARY_TYPE, ONE));
269        coll.createIndex(new BasicDBObject(KEY_LIFECYCLE_STATE, ONE));
270        coll.createIndex(new BasicDBObject(KEY_FULLTEXT_JOBID, ONE));
271        coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_USER, ONE));
272        coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_STATUS, ONE));
273        // TODO configure these from somewhere else
274        coll.createIndex(new BasicDBObject("dc:modified", MINUS_ONE));
275        coll.createIndex(new BasicDBObject("rend:renditionName", ONE));
276        coll.createIndex(new BasicDBObject("drv:subscriptions.enabled", ONE));
277        coll.createIndex(new BasicDBObject("collectionMember:collectionIds", ONE));
278        if (!isFulltextDisabled()) {
279            DBObject indexKeys = new BasicDBObject();
280            indexKeys.put(KEY_FULLTEXT_SIMPLE, MONGODB_INDEX_TEXT);
281            indexKeys.put(KEY_FULLTEXT_BINARY, MONGODB_INDEX_TEXT);
282            DBObject indexOptions = new BasicDBObject();
283            indexOptions.put(MONGODB_INDEX_NAME, FULLTEXT_INDEX_NAME);
284            indexOptions.put(MONGODB_LANGUAGE_OVERRIDE, LANGUAGE_FIELD);
285            coll.createIndex(indexKeys, indexOptions);
286        }
287        // check root presence
288        DBObject query = new BasicDBObject(idKey, getRootId());
289        if (coll.findOne(query, justPresenceField()) != null) {
290            return;
291        }
292        // create basic repository structure needed
293        if (idType == IdType.sequence || DEBUG_UUIDS) {
294            // create the id counter
295            DBObject idCounter = new BasicDBObject();
296            idCounter.put(MONGODB_ID, COUNTER_NAME_UUID);
297            idCounter.put(COUNTER_FIELD, ZERO);
298            countersColl.insert(idCounter);
299        }
300        initRoot();
301    }
302
303    protected synchronized Long getNextSequenceId() {
304        if (sequenceLeft == 0) {
305            // allocate a new sequence block
306            // the database contains the last value from the last block
307            DBObject query = new BasicDBObject(MONGODB_ID, COUNTER_NAME_UUID);
308            DBObject update = new BasicDBObject(MONGODB_INC,
309                    new BasicDBObject(COUNTER_FIELD, Long.valueOf(sequenceBlockSize)));
310            DBObject idCounter = countersColl.findAndModify(query, null, null, false, update, true, false);
311            if (idCounter == null) {
312                throw new NuxeoException("Repository id counter not initialized");
313            }
314            sequenceLeft = sequenceBlockSize;
315            sequenceLastValue = ((Long) idCounter.get(COUNTER_FIELD)).longValue() - sequenceBlockSize;
316        }
317        sequenceLeft--;
318        sequenceLastValue++;
319        return Long.valueOf(sequenceLastValue);
320    }
321
322    @Override
323    public String generateNewId() {
324        if (idType == IdType.sequence || DEBUG_UUIDS) {
325            Long id = getNextSequenceId();
326            if (DEBUG_UUIDS) {
327                return "UUID_" + id;
328            }
329            return id.toString();
330        } else {
331            return UUID.randomUUID().toString();
332        }
333    }
334
335    @Override
336    public void createState(State state) {
337        DBObject ob = converter.stateToBson(state);
338        if (log.isTraceEnabled()) {
339            log.trace("MongoDB: CREATE " + ob.get(idKey) + ": " + ob);
340        }
341        coll.insert(ob);
342        // TODO dupe exception
343        // throw new DocumentException("Already exists: " + id);
344    }
345
346    @Override
347    public void createStates(List<State> states) {
348        List<DBObject> obs = states.stream().map(converter::stateToBson).collect(Collectors.toList());
349        if (log.isTraceEnabled()) {
350            log.trace("MongoDB: CREATE ["
351                    + obs.stream().map(ob -> ob.get(idKey).toString()).collect(Collectors.joining(", "))
352                    + "]: " + obs);
353        }
354        coll.insert(obs);
355    }
356
357    @Override
358    public State readState(String id) {
359        DBObject query = new BasicDBObject(idKey, id);
360        return findOne(query);
361    }
362
363    @Override
364    public List<State> readStates(List<String> ids) {
365        DBObject query = new BasicDBObject(idKey, new BasicDBObject(QueryOperators.IN, ids));
366        return findAll(query, ids.size());
367    }
368
369    @Override
370    public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) {
371        List<DBObject> updates = converter.diffToBson(diff);
372        for (DBObject update : updates) {
373            DBObject query = new BasicDBObject(idKey, id);
374            if (changeTokenUpdater == null) {
375                if (log.isTraceEnabled()) {
376                    log.trace("MongoDB: UPDATE " + id + ": " + update);
377                }
378            } else {
379                // assume bson is identical to dbs internals
380                // condition works even if value is null
381                Map<String, Serializable> conditions = changeTokenUpdater.getConditions();
382                Map<String, Serializable> tokenUpdates = changeTokenUpdater.getUpdates();
383                if (update.containsField(MONGODB_SET)) {
384                    ((DBObject) update.get(MONGODB_SET)).putAll(tokenUpdates);
385                } else {
386                    DBObject set = new BasicDBObject();
387                    set.putAll(tokenUpdates);
388                    update.put(MONGODB_SET, set);
389                }
390                if (log.isTraceEnabled()) {
391                    log.trace("MongoDB: UPDATE " + id + ": IF " + conditions + " THEN " + update);
392                }
393                query.putAll(conditions);
394            }
395            WriteResult w = coll.update(query, update);
396            if (w.getN() != 1) {
397                log.trace("MongoDB:    -> CONCURRENT UPDATE: " + id);
398                throw new ConcurrentUpdateException(id);
399            }
400            // TODO dupe exception
401            // throw new DocumentException("Missing: " + id);
402        }
403    }
404
405    @Override
406    public void deleteStates(Set<String> ids) {
407        DBObject query = new BasicDBObject(idKey, new BasicDBObject(QueryOperators.IN, ids));
408        if (log.isTraceEnabled()) {
409            log.trace("MongoDB: REMOVE " + ids);
410        }
411        WriteResult w = coll.remove(query);
412        if (w.getN() != ids.size()) {
413            log.error("Removed " + w.getN() + " docs for " + ids.size() + " ids: " + ids);
414        }
415    }
416
417    @Override
418    public State readChildState(String parentId, String name, Set<String> ignored) {
419        DBObject query = getChildQuery(parentId, name, ignored);
420        return findOne(query);
421    }
422
423    protected void logQuery(String id, DBObject fields) {
424        logQuery(new BasicDBObject(idKey, id), fields);
425    }
426
427    protected void logQuery(DBObject query, DBObject fields) {
428        if (fields == null) {
429            log.trace("MongoDB: QUERY " + query);
430        } else {
431            log.trace("MongoDB: QUERY " + query + " KEYS " + fields);
432        }
433    }
434
435    protected void logQuery(DBObject query, DBObject fields, DBObject orderBy, int limit, int offset) {
436        log.trace("MongoDB: QUERY " + query + " KEYS " + fields + (orderBy == null ? "" : " ORDER BY " + orderBy)
437                + " OFFSET " + offset + " LIMIT " + limit);
438    }
439
440    @Override
441    public boolean hasChild(String parentId, String name, Set<String> ignored) {
442        DBObject query = getChildQuery(parentId, name, ignored);
443        if (log.isTraceEnabled()) {
444            logQuery(query, justPresenceField());
445        }
446        return coll.findOne(query, justPresenceField()) != null;
447    }
448
449    protected DBObject getChildQuery(String parentId, String name, Set<String> ignored) {
450        DBObject query = new BasicDBObject();
451        query.put(KEY_PARENT_ID, parentId);
452        query.put(KEY_NAME, name);
453        addIgnoredIds(query, ignored);
454        return query;
455    }
456
457    protected void addIgnoredIds(DBObject query, Set<String> ignored) {
458        if (!ignored.isEmpty()) {
459            DBObject notInIds = new BasicDBObject(QueryOperators.NIN, new ArrayList<>(ignored));
460            query.put(idKey, notInIds);
461        }
462    }
463
464    @Override
465    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
466        DBObject query = new BasicDBObject(converter.keyToBson(key), value);
467        addIgnoredIds(query, ignored);
468        return findAll(query, 0);
469    }
470
471    @Override
472    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
473        DBObject query = new BasicDBObject(converter.keyToBson(key1), value1);
474        query.put(converter.keyToBson(key2), value2);
475        addIgnoredIds(query, ignored);
476        return findAll(query, 0);
477    }
478
479    @Override
480    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
481            Map<String, Object[]> targetProxies) {
482        DBObject query = new BasicDBObject(key, value);
483        DBObject fields = new BasicDBObject();
484        if (useCustomId) {
485            fields.put(MONGODB_ID, ZERO);
486        }
487        fields.put(idKey, ONE);
488        fields.put(KEY_IS_PROXY, ONE);
489        fields.put(KEY_PROXY_TARGET_ID, ONE);
490        fields.put(KEY_PROXY_IDS, ONE);
491        if (log.isTraceEnabled()) {
492            logQuery(query, fields);
493        }
494        try (DBCursor cursor = coll.find(query, fields)) {
495            for (DBObject ob : cursor) {
496                String id = (String) ob.get(idKey);
497                ids.add(id);
498                if (proxyTargets != null && TRUE.equals(ob.get(KEY_IS_PROXY))) {
499                    String targetId = (String) ob.get(KEY_PROXY_TARGET_ID);
500                    proxyTargets.put(id, targetId);
501                }
502                if (targetProxies != null) {
503                    Object[] proxyIds = (Object[]) converter.bsonToValue(ob.get(KEY_PROXY_IDS));
504                    if (proxyIds != null) {
505                        targetProxies.put(id, proxyIds);
506                    }
507                }
508            }
509        }
510    }
511
512    @Override
513    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
514        DBObject query = new BasicDBObject(key, value);
515        addIgnoredIds(query, ignored);
516        if (log.isTraceEnabled()) {
517            logQuery(query, justPresenceField());
518        }
519        return coll.findOne(query, justPresenceField()) != null;
520    }
521
522    protected State findOne(DBObject query) {
523        if (log.isTraceEnabled()) {
524            logQuery(query, null);
525        }
526        return converter.bsonToState(coll.findOne(query));
527    }
528
529    protected List<State> findAll(DBObject query, int sizeHint) {
530        if (log.isTraceEnabled()) {
531            logQuery(query, null);
532        }
533        Set<String> seen = new HashSet<>();
534        try (DBCursor cursor = coll.find(query)) {
535            List<State> list = new ArrayList<>(sizeHint);
536            for (DBObject ob : cursor) {
537                if (!seen.add((String) ob.get(idKey))) {
538                    // MongoDB cursors may return the same
539                    // object several times
540                    continue;
541                }
542                list.add(converter.bsonToState(ob));
543            }
544            return list;
545        }
546    }
547
548    protected DBObject justPresenceField() {
549        return new BasicDBObject(MONGODB_ID, ONE);
550    }
551
552    @Override
553    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
554            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
555        // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter
556        MongoDBQueryBuilder builder = new MongoDBQueryBuilder(this, evaluator.getExpression(),
557                evaluator.getSelectClause(), orderByClause, evaluator.pathResolver, evaluator.fulltextSearchDisabled);
558        builder.walk();
559        if (builder.hasFulltext && isFulltextDisabled()) {
560            throw new QueryParseException("Fulltext search disabled by configuration");
561        }
562        DBObject query = builder.getQuery();
563        addPrincipals(query, evaluator.principals);
564        DBObject orderBy = builder.getOrderBy();
565        DBObject keys = builder.getProjection();
566        // Don't do manual projection if there are no projection wildcards, as this brings no new
567        // information and is costly. The only difference is several identical rows instead of one.
568        boolean manualProjection = !distinctDocuments && builder.hasProjectionWildcard();
569        if (manualProjection) {
570            // we'll do post-treatment to re-evaluate the query to get proper wildcard projections
571            // so we need the full state from the database
572            keys = new BasicDBObject();
573            evaluator.parse();
574        }
575
576        if (log.isTraceEnabled()) {
577            logQuery(query, keys, orderBy, limit, offset);
578        }
579
580        List<Map<String, Serializable>> projections;
581        long totalSize;
582        try (DBCursor cursor = coll.find(query, keys).skip(offset).limit(limit)) {
583            if (orderBy != null) {
584                cursor.sort(orderBy);
585            }
586            projections = new ArrayList<>();
587            for (DBObject ob : cursor) {
588                State state = converter.bsonToState(ob);
589                if (manualProjection) {
590                    projections.addAll(evaluator.matches(state));
591                } else {
592                    projections.add(DBSStateFlattener.flatten(state));
593                }
594            }
595            if (countUpTo == -1) {
596                // count full size
597                if (limit == 0) {
598                    totalSize = projections.size();
599                } else {
600                    totalSize = cursor.count();
601                }
602            } else if (countUpTo == 0) {
603                // no count
604                totalSize = -1; // not counted
605            } else {
606                // count only if less than countUpTo
607                if (limit == 0) {
608                    totalSize = projections.size();
609                } else {
610                    totalSize = cursor.copy().limit(countUpTo + 1).count();
611                }
612                if (totalSize > countUpTo) {
613                    totalSize = -2; // truncated
614                }
615            }
616        }
617        if (log.isTraceEnabled() && projections.size() != 0) {
618            log.trace("MongoDB:    -> " + projections.size());
619        }
620        return new PartialList<>(projections, totalSize);
621    }
622
623    @Override
624    public ScrollResult scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) {
625        cursorService.checkForTimedOutScroll();
626        MongoDBQueryBuilder builder = new MongoDBQueryBuilder(this, evaluator.getExpression(),
627                evaluator.getSelectClause(), null, evaluator.pathResolver, evaluator.fulltextSearchDisabled);
628        builder.walk();
629        if (builder.hasFulltext && isFulltextDisabled()) {
630            throw new QueryParseException("Fulltext search disabled by configuration");
631        }
632        DBObject query = builder.getQuery();
633        DBObject keys = builder.getProjection();
634        if (log.isTraceEnabled()) {
635            logQuery(query, keys, null, 0, 0);
636        }
637
638        DBCursor cursor = coll.find(query, keys);
639        String scrollId = cursorService.registerCursor(cursor, batchSize, keepAliveSeconds);
640        return scroll(scrollId);
641    }
642
643    @Override
644    public ScrollResult scroll(String scrollId) {
645        return cursorService.scroll(scrollId, ob -> (String) ob.get(converter.keyToBson(KEY_ID)));
646    }
647
648    protected void addPrincipals(DBObject query, Set<String> principals) {
649        if (principals != null) {
650            DBObject inPrincipals = new BasicDBObject(QueryOperators.IN, new ArrayList<>(principals));
651            query.put(DBSDocument.KEY_READ_ACL, inPrincipals);
652        }
653    }
654
655    /** Keys used for document projection when marking all binaries for GC. */
656    protected DBObject binaryKeys;
657
658    @Override
659    protected void initBlobsPaths() {
660        MongoDBBlobFinder finder = new MongoDBBlobFinder();
661        finder.visit();
662        binaryKeys = finder.binaryKeys;
663    }
664
665    protected static class MongoDBBlobFinder extends BlobFinder {
666        protected DBObject binaryKeys = new BasicDBObject(MONGODB_ID, ZERO);
667
668        @Override
669        protected void recordBlobPath() {
670            path.addLast(KEY_BLOB_DATA);
671            binaryKeys.put(StringUtils.join(path, "."), ONE);
672            path.removeLast();
673        }
674    }
675
676    @Override
677    public void markReferencedBinaries() {
678        DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class);
679        // TODO add a query to not scan all documents
680        if (log.isTraceEnabled()) {
681            logQuery(new BasicDBObject(), binaryKeys);
682        }
683        try (DBCursor cursor = coll.find(new BasicDBObject(), binaryKeys)) {
684            for (DBObject ob : cursor) {
685                markReferencedBinaries(ob, blobManager);
686            }
687        }
688    }
689
690    protected void markReferencedBinaries(DBObject ob, DocumentBlobManager blobManager) {
691        for (String key : ob.keySet()) {
692            Object value = ob.get(key);
693            if (value instanceof List) {
694                @SuppressWarnings("unchecked")
695                List<Object> list = (List<Object>) value;
696                for (Object v : list) {
697                    if (v instanceof DBObject) {
698                        markReferencedBinaries((DBObject) v, blobManager);
699                    } else {
700                        markReferencedBinary(v, blobManager);
701                    }
702                }
703            } else if (value instanceof Object[]) {
704                for (Object v : (Object[]) value) {
705                    markReferencedBinary(v, blobManager);
706                }
707            } else if (value instanceof DBObject) {
708                markReferencedBinaries((DBObject) value, blobManager);
709            } else {
710                markReferencedBinary(value, blobManager);
711            }
712        }
713    }
714
715    protected void markReferencedBinary(Object value, DocumentBlobManager blobManager) {
716        if (!(value instanceof String)) {
717            return;
718        }
719        String key = (String) value;
720        blobManager.markReferencedBinary(key, repositoryName);
721    }
722
723    protected static final DBObject LOCK_FIELDS;
724
725    static {
726        LOCK_FIELDS = new BasicDBObject();
727        LOCK_FIELDS.put(KEY_LOCK_OWNER, ONE);
728        LOCK_FIELDS.put(KEY_LOCK_CREATED, ONE);
729    }
730
731    protected static final DBObject UNSET_LOCK_UPDATE = new BasicDBObject(MONGODB_UNSET, LOCK_FIELDS);
732
733    @Override
734    public Lock getLock(String id) {
735        if (log.isTraceEnabled()) {
736            logQuery(id, LOCK_FIELDS);
737        }
738        DBObject res = coll.findOne(new BasicDBObject(idKey, id), LOCK_FIELDS);
739        if (res == null) {
740            // document not found
741            throw new DocumentNotFoundException(id);
742        }
743        String owner = (String) res.get(KEY_LOCK_OWNER);
744        if (owner == null) {
745            // not locked
746            return null;
747        }
748        Calendar created = (Calendar) converter.scalarToSerializable(res.get(KEY_LOCK_CREATED));
749        return new Lock(owner, created);
750    }
751
752    @Override
753    public Lock setLock(String id, Lock lock) {
754        DBObject query = new BasicDBObject(idKey, id);
755        query.put(KEY_LOCK_OWNER, null); // select doc if no lock is set
756        DBObject setLock = new BasicDBObject();
757        setLock.put(KEY_LOCK_OWNER, lock.getOwner());
758        setLock.put(KEY_LOCK_CREATED, converter.serializableToBson(lock.getCreated()));
759        DBObject setLockUpdate = new BasicDBObject(MONGODB_SET, setLock);
760        if (log.isTraceEnabled()) {
761            log.trace("MongoDB: FINDANDMODIFY " + query + " UPDATE " + setLockUpdate);
762        }
763        DBObject res = coll.findAndModify(query, null, null, false, setLockUpdate, false, false);
764        if (res != null) {
765            // found a doc to lock
766            return null;
767        } else {
768            // doc not found, or lock owner already set
769            // get the old lock
770            if (log.isTraceEnabled()) {
771                logQuery(id, LOCK_FIELDS);
772            }
773            DBObject old = coll.findOne(new BasicDBObject(idKey, id), LOCK_FIELDS);
774            if (old == null) {
775                // document not found
776                throw new DocumentNotFoundException(id);
777            }
778            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
779            Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED));
780            if (oldOwner != null) {
781                return new Lock(oldOwner, oldCreated);
782            }
783            // no lock -- there was a race condition
784            // TODO do better
785            throw new ConcurrentUpdateException("Lock " + id);
786        }
787    }
788
789    @Override
790    public Lock removeLock(String id, String owner) {
791        DBObject query = new BasicDBObject(idKey, id);
792        if (owner != null) {
793            // remove if owner matches or null
794            // implements LockManager.canLockBeRemoved inside MongoDB
795            Object ownerOrNull = Arrays.asList(owner, null);
796            query.put(KEY_LOCK_OWNER, new BasicDBObject(QueryOperators.IN, ownerOrNull));
797        } // else unconditional remove
798        // remove the lock
799        DBObject old = coll.findAndModify(query, null, null, false, UNSET_LOCK_UPDATE, false, false);
800        if (old != null) {
801            // found a doc and removed the lock, return previous lock
802            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
803            if (oldOwner == null) {
804                // was not locked
805                return null;
806            } else {
807                // return previous lock
808                Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED));
809                return new Lock(oldOwner, oldCreated);
810            }
811        } else {
812            // doc not found, or lock owner didn't match
813            // get the old lock
814            if (log.isTraceEnabled()) {
815                logQuery(id, LOCK_FIELDS);
816            }
817            old = coll.findOne(new BasicDBObject(idKey, id), LOCK_FIELDS);
818            if (old == null) {
819                // document not found
820                throw new DocumentNotFoundException(id);
821            }
822            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
823            Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED));
824            if (oldOwner != null) {
825                if (!LockManager.canLockBeRemoved(oldOwner, owner)) {
826                    // existing mismatched lock, flag failure
827                    return new Lock(oldOwner, oldCreated, true);
828                }
829                // old owner should have matched -- there was a race condition
830                // TODO do better
831                throw new ConcurrentUpdateException("Unlock " + id);
832            }
833            // old owner null, should have matched -- there was a race condition
834            // TODO do better
835            throw new ConcurrentUpdateException("Unlock " + id);
836        }
837    }
838
    /**
     * Closes the lock manager.
     * <p>
     * Does nothing in this implementation: the body is empty.
     */
    @Override
    public void closeLockManager() {
        // nothing to do
    }
843
    /**
     * Clears the lock manager caches.
     * <p>
     * Does nothing in this implementation: the body is empty. The lock methods visible in this class query the
     * collection directly on each call.
     */
    @Override
    public void clearLockManagerCaches() {
        // nothing to do
    }
847
848}