001/*
002 * (C) Copyright 2014-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.mongodb;
020
021import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_STATUS;
022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_USER;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACL;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACP;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY;
028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_JOBID;
029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_SIMPLE;
030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_TRASHED;
032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LIFECYCLE_STATE;
033import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
034import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
035import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
036import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
037import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PRIMARY_TYPE;
038import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;
039import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_VERSION_SERIES_ID;
040import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_READ_ACL;
041import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_VERSION_SERIES_ID;
042
043import java.io.Serializable;
044import java.util.ArrayList;
045import java.util.Arrays;
046import java.util.Calendar;
047import java.util.Collection;
048import java.util.Collections;
049import java.util.HashSet;
050import java.util.List;
051import java.util.Map;
052import java.util.Set;
053import java.util.Spliterators;
054import java.util.UUID;
055import java.util.stream.Collectors;
056import java.util.stream.Stream;
057import java.util.stream.StreamSupport;
058
059import javax.resource.spi.ConnectionManager;
060
061import org.apache.commons.lang3.StringUtils;
062import org.apache.commons.logging.Log;
063import org.apache.commons.logging.LogFactory;
064import org.bson.Document;
065import org.bson.conversions.Bson;
066import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
067import org.nuxeo.ecm.core.api.CursorService;
068import org.nuxeo.ecm.core.api.DocumentNotFoundException;
069import org.nuxeo.ecm.core.api.Lock;
070import org.nuxeo.ecm.core.api.NuxeoException;
071import org.nuxeo.ecm.core.api.PartialList;
072import org.nuxeo.ecm.core.api.ScrollResult;
073import org.nuxeo.ecm.core.blob.DocumentBlobManager;
074import org.nuxeo.ecm.core.model.LockManager;
075import org.nuxeo.ecm.core.model.Repository;
076import org.nuxeo.ecm.core.query.QueryParseException;
077import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
078import org.nuxeo.ecm.core.storage.State;
079import org.nuxeo.ecm.core.storage.State.StateDiff;
080import org.nuxeo.ecm.core.storage.dbs.DBSDocument;
081import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
082import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
083import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener;
084import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater;
085import org.nuxeo.runtime.api.Framework;
086import org.nuxeo.runtime.mongodb.MongoDBConnectionService;
087
088import com.mongodb.Block;
089import com.mongodb.QueryOperators;
090import com.mongodb.client.FindIterable;
091import com.mongodb.client.MongoCollection;
092import com.mongodb.client.MongoCursor;
093import com.mongodb.client.MongoDatabase;
094import com.mongodb.client.MongoIterable;
095import com.mongodb.client.model.CountOptions;
096import com.mongodb.client.model.Filters;
097import com.mongodb.client.model.FindOneAndUpdateOptions;
098import com.mongodb.client.model.IndexOptions;
099import com.mongodb.client.model.Indexes;
100import com.mongodb.client.model.Projections;
101import com.mongodb.client.model.ReturnDocument;
102import com.mongodb.client.model.Updates;
103import com.mongodb.client.result.DeleteResult;
104import com.mongodb.client.result.UpdateResult;
105
106/**
107 * MongoDB implementation of a {@link Repository}.
108 *
109 * @since 5.9.4
110 */
111public class MongoDBRepository extends DBSRepositoryBase {
112
    /** Class logger; the trace level is used to log the MongoDB requests issued by this class. */
    private static final Log log = LogFactory.getLog(MongoDBRepository.class);

    /**
     * Prefix used to retrieve a MongoDB connection from {@link MongoDBConnectionService}.
     * <p>
     * The connection id will be {@code repository/[REPOSITORY_NAME]}.
     */
    public static final String REPOSITORY_CONNECTION_PREFIX = "repository/";

    /** Boxed {@code 0L}; initial value stored in the id counter document. */
    public static final Long LONG_ZERO = Long.valueOf(0);

    /** Boxed {@code 0.0}; used as a projection value to exclude a field. */
    public static final Double ZERO = Double.valueOf(0);

    /** Boxed {@code 1.0}; used as a projection value to include a field. */
    public static final Double ONE = Double.valueOf(1);

    /** Name of MongoDB's native id field. */
    public static final String MONGODB_ID = "_id";

    /** Name of the MongoDB {@code $inc} update operator. */
    public static final String MONGODB_INC = "$inc";

    /** Name of the MongoDB {@code $set} update operator. */
    public static final String MONGODB_SET = "$set";

    /** Name of the MongoDB {@code $unset} update operator. */
    public static final String MONGODB_UNSET = "$unset";

    /** Name of the MongoDB {@code $push} update operator. */
    public static final String MONGODB_PUSH = "$push";

    /** Name of the MongoDB {@code $each} modifier. */
    public static final String MONGODB_EACH = "$each";

    /** Name of the MongoDB {@code $meta} operator. */
    public static final String MONGODB_META = "$meta";

    /** The {@code textScore} keyword, presumably used together with {@link #MONGODB_META} (not used in this class). */
    public static final String MONGODB_TEXT_SCORE = "textScore";

    /** Name given to the fulltext (text) index created on the collection. */
    private static final String FULLTEXT_INDEX_NAME = "fulltext";

    /** Field configured as the language override of the fulltext index. */
    private static final String LANGUAGE_FIELD = "__language";

    /** Value of the {@code _id} field of the id counter document in the counters collection. */
    protected static final String COUNTER_NAME_UUID = "ecm:id";

    /** Field of the id counter document holding the counter value. */
    protected static final String COUNTER_FIELD = "seq";

    /** Collection holding the repository documents; it is named after the repository. */
    protected final MongoCollection<Document> coll;

    /** Collection holding the counters; it is named after the repository with a {@code .counters} suffix. */
    protected final MongoCollection<Document> countersColl;

    /** The key to use to store the id in the database. */
    protected String idKey;

    /** True if we don't use MongoDB's native "_id" key to store the id. */
    protected boolean useCustomId;

    /** Number of values still available in the in-memory sequence. */
    protected long sequenceLeft;

    /** Last value used from the in-memory sequence. */
    protected long sequenceLastValue;

    /** Sequence allocation block size. */
    protected long sequenceBlockSize;

    /** Converter between {@link State} and BSON representations. */
    protected final MongoDBConverter converter;

    /** Service keeping track of the cursors opened by scroll requests. */
    protected final CursorService<MongoCursor<Document>, Document, String> cursorService;
174
175    public MongoDBRepository(ConnectionManager cm, MongoDBRepositoryDescriptor descriptor) {
176        super(cm, descriptor.name, descriptor);
177        MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class);
178        // prefix with repository/ to group repository connection
179        MongoDatabase database = mongoService.getDatabase(REPOSITORY_CONNECTION_PREFIX + descriptor.name);
180        coll = database.getCollection(descriptor.name);
181        countersColl = database.getCollection(descriptor.name + ".counters");
182        if (Boolean.TRUE.equals(descriptor.nativeId)) {
183            idKey = MONGODB_ID;
184        } else {
185            idKey = KEY_ID;
186        }
187        useCustomId = KEY_ID.equals(idKey);
188        if (idType == IdType.sequence || DEBUG_UUIDS) {
189            Integer sbs = descriptor.sequenceBlockSize;
190            sequenceBlockSize = sbs == null ? 1 : sbs.longValue();
191            sequenceLeft = 0;
192        }
193        converter = new MongoDBConverter(useCustomId ? null : KEY_ID);
194        cursorService = new CursorService<>(ob -> (String) ob.get(converter.keyToBson(KEY_ID)));
195        initRepository();
196    }
197
198    @Override
199    public List<IdType> getAllowedIdTypes() {
200        return Arrays.asList(IdType.varchar, IdType.sequence);
201    }
202
    /** Shuts down the repository through the superclass, then clears the cursors tracked by the cursor service. */
    @Override
    public void shutdown() {
        super.shutdown();
        cursorService.clear();
    }
208
209    protected void initRepository() {
210        // create required indexes
211        // code does explicit queries on those
212        if (useCustomId) {
213            coll.createIndex(Indexes.ascending(idKey));
214        }
215        coll.createIndex(Indexes.ascending(KEY_PARENT_ID));
216        coll.createIndex(Indexes.ascending(KEY_ANCESTOR_IDS));
217        coll.createIndex(Indexes.ascending(KEY_VERSION_SERIES_ID));
218        coll.createIndex(Indexes.ascending(KEY_PROXY_TARGET_ID));
219        coll.createIndex(Indexes.ascending(KEY_PROXY_VERSION_SERIES_ID));
220        coll.createIndex(Indexes.ascending(KEY_READ_ACL));
221        coll.createIndex(Indexes.ascending(KEY_PARENT_ID, KEY_NAME));
222        // often used in user-generated queries
223        coll.createIndex(Indexes.ascending(KEY_PRIMARY_TYPE));
224        coll.createIndex(Indexes.ascending(KEY_LIFECYCLE_STATE));
225        coll.createIndex(Indexes.ascending(KEY_IS_TRASHED));
226        if(!isFulltextDisabled()) {
227            coll.createIndex(Indexes.ascending(KEY_FULLTEXT_JOBID));
228        }
229        coll.createIndex(Indexes.ascending(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_USER));
230        coll.createIndex(Indexes.ascending(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_STATUS));
231        // TODO configure these from somewhere else
232        coll.createIndex(Indexes.descending("dc:modified"));
233        coll.createIndex(Indexes.ascending("rend:renditionName"));
234        coll.createIndex(Indexes.ascending("drv:subscriptions.enabled"));
235        coll.createIndex(Indexes.ascending("collectionMember:collectionIds"));
236        coll.createIndex(Indexes.ascending("nxtag:tags"));
237        if (!isFulltextSearchDisabled()) {
238            Bson indexKeys = Indexes.compoundIndex( //
239                    Indexes.text(KEY_FULLTEXT_SIMPLE), //
240                    Indexes.text(KEY_FULLTEXT_BINARY) //
241            );
242            IndexOptions indexOptions = new IndexOptions().name(FULLTEXT_INDEX_NAME).languageOverride(LANGUAGE_FIELD);
243            coll.createIndex(indexKeys, indexOptions);
244        }
245        // check root presence
246        if (coll.count(Filters.eq(idKey, getRootId())) > 0) {
247            return;
248        }
249        // create basic repository structure needed
250        if (idType == IdType.sequence || DEBUG_UUIDS) {
251            // create the id counter
252            Document idCounter = new Document();
253            idCounter.put(MONGODB_ID, COUNTER_NAME_UUID);
254            idCounter.put(COUNTER_FIELD, LONG_ZERO);
255            countersColl.insertOne(idCounter);
256        }
257        initRoot();
258    }
259
260    protected synchronized Long getNextSequenceId() {
261        if (sequenceLeft == 0) {
262            // allocate a new sequence block
263            // the database contains the last value from the last block
264            Bson filter = Filters.eq(MONGODB_ID, COUNTER_NAME_UUID);
265            Bson update = Updates.inc(COUNTER_FIELD, Long.valueOf(sequenceBlockSize));
266            Document idCounter = countersColl.findOneAndUpdate(filter, update,
267                    new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER));
268            if (idCounter == null) {
269                throw new NuxeoException("Repository id counter not initialized");
270            }
271            sequenceLeft = sequenceBlockSize;
272            sequenceLastValue = ((Long) idCounter.get(COUNTER_FIELD)).longValue() - sequenceBlockSize;
273        }
274        sequenceLeft--;
275        sequenceLastValue++;
276        return Long.valueOf(sequenceLastValue);
277    }
278
279    @Override
280    public String generateNewId() {
281        if (idType == IdType.sequence || DEBUG_UUIDS) {
282            Long id = getNextSequenceId();
283            if (DEBUG_UUIDS) {
284                return "UUID_" + id;
285            }
286            return id.toString();
287        } else {
288            return UUID.randomUUID().toString();
289        }
290    }
291
292    @Override
293    public void createState(State state) {
294        Document doc = converter.stateToBson(state);
295        if (log.isTraceEnabled()) {
296            log.trace("MongoDB: CREATE " + doc.get(idKey) + ": " + doc);
297        }
298        coll.insertOne(doc);
299        // TODO dupe exception
300        // throw new DocumentException("Already exists: " + id);
301    }
302
303    @Override
304    public void createStates(List<State> states) {
305        List<Document> docs = states.stream().map(converter::stateToBson).collect(Collectors.toList());
306        if (log.isTraceEnabled()) {
307            log.trace("MongoDB: CREATE ["
308                    + docs.stream().map(doc -> doc.get(idKey).toString()).collect(Collectors.joining(", "))
309                    + "]: " + docs);
310        }
311        coll.insertMany(docs);
312    }
313
314    @Override
315    public State readState(String id) {
316        return findOne(Filters.eq(idKey, id));
317    }
318
319    @Override
320    public State readPartialState(String id, Collection<String> keys) {
321        Document fields = new Document();
322        keys.forEach(key -> fields.put(converter.keyToBson(key), ONE));
323        return findOne(Filters.eq(idKey, id), fields);
324    }
325
326    @Override
327    public List<State> readStates(List<String> ids) {
328        return findAll(Filters.in(idKey, ids));
329    }
330
331    @Override
332    public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) {
333        List<Document> updates = converter.diffToBson(diff);
334        for (Document update : updates) {
335            Document filter = new Document(idKey, id);
336            if (changeTokenUpdater == null) {
337                if (log.isTraceEnabled()) {
338                    log.trace("MongoDB: UPDATE " + id + ": " + update);
339                }
340            } else {
341                // assume bson is identical to dbs internals
342                // condition works even if value is null
343                Map<String, Serializable> conditions = changeTokenUpdater.getConditions();
344                Map<String, Serializable> tokenUpdates = changeTokenUpdater.getUpdates();
345                if (update.containsKey(MONGODB_SET)) {
346                    ((Document) update.get(MONGODB_SET)).putAll(tokenUpdates);
347                } else {
348                    Document set = new Document();
349                    set.putAll(tokenUpdates);
350                    update.put(MONGODB_SET, set);
351                }
352                if (log.isTraceEnabled()) {
353                    log.trace("MongoDB: UPDATE " + id + ": IF " + conditions + " THEN " + update);
354                }
355                filter.putAll(conditions);
356            }
357            UpdateResult w = coll.updateMany(filter, update);
358            if (w.getModifiedCount() != 1) {
359                log.trace("MongoDB:    -> CONCURRENT UPDATE: " + id);
360                throw new ConcurrentUpdateException(id);
361            }
362            // TODO dupe exception
363            // throw new DocumentException("Missing: " + id);
364        }
365    }
366
367    @Override
368    public void deleteStates(Set<String> ids) {
369        Bson filter = Filters.in(idKey, ids);
370        if (log.isTraceEnabled()) {
371            log.trace("MongoDB: REMOVE " + ids);
372        }
373        DeleteResult w = coll.deleteMany(filter);
374        if (w.getDeletedCount() != ids.size()) {
375            if (log.isDebugEnabled()) {
376                log.debug("Removed " + w.getDeletedCount() + " docs for " + ids.size() + " ids: " + ids);
377            }
378        }
379    }
380
381    @Override
382    public State readChildState(String parentId, String name, Set<String> ignored) {
383        Bson filter = getChildQuery(parentId, name, ignored);
384        return findOne(filter);
385    }
386
387    protected void logQuery(String id, Bson fields) {
388        logQuery(Filters.eq(idKey, id), fields);
389    }
390
391    protected void logQuery(Bson filter, Bson fields) {
392        if (fields == null) {
393            log.trace("MongoDB: QUERY " + filter);
394        } else {
395            log.trace("MongoDB: QUERY " + filter + " KEYS " + fields);
396        }
397    }
398
399    protected void logQuery(Bson query, Bson fields, Bson orderBy, int limit, int offset) {
400        log.trace("MongoDB: QUERY " + query + " KEYS " + fields + (orderBy == null ? "" : " ORDER BY " + orderBy)
401                + " OFFSET " + offset + " LIMIT " + limit);
402    }
403
404    @Override
405    public boolean hasChild(String parentId, String name, Set<String> ignored) {
406        Document filter = getChildQuery(parentId, name, ignored);
407        return exists(filter);
408    }
409
410    protected Document getChildQuery(String parentId, String name, Set<String> ignored) {
411        Document filter = new Document();
412        filter.put(KEY_PARENT_ID, parentId);
413        filter.put(KEY_NAME, name);
414        addIgnoredIds(filter, ignored);
415        return filter;
416    }
417
418    protected void addIgnoredIds(Document filter, Set<String> ignored) {
419        if (!ignored.isEmpty()) {
420            Document notInIds = new Document(QueryOperators.NIN, new ArrayList<>(ignored));
421            filter.put(idKey, notInIds);
422        }
423    }
424
425    @Override
426    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
427        Document filter = new Document(converter.keyToBson(key), value);
428        addIgnoredIds(filter, ignored);
429        return findAll(filter);
430    }
431
432    @Override
433    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
434        Document filter = new Document(converter.keyToBson(key1), value1);
435        filter.put(converter.keyToBson(key2), value2);
436        addIgnoredIds(filter, ignored);
437        return findAll(filter);
438    }
439
440    @Override
441    public Stream<State> getDescendants(String rootId, Set<String> keys) {
442        return getDescendants(rootId, keys, 0);
443    }
444
445    @Override
446    public Stream<State> getDescendants(String rootId, Set<String> keys, int limit) {
447        Bson filter = Filters.eq(KEY_ANCESTOR_IDS, rootId);
448        Document fields = new Document();
449        if (useCustomId) {
450            fields.put(MONGODB_ID, ZERO);
451        }
452        fields.put(idKey, ONE);
453        keys.forEach(key -> fields.put(converter.keyToBson(key), ONE));
454        return stream(filter, fields, limit);
455    }
456
457    @Override
458    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
459        Document filter = new Document(converter.keyToBson(key), value);
460        addIgnoredIds(filter, ignored);
461        return exists(filter);
462    }
463
464    protected boolean exists(Bson filter) {
465        return exists(filter, justPresenceField());
466    }
467
468    protected boolean exists(Bson filter, Bson projection) {
469        if (log.isTraceEnabled()) {
470            logQuery(filter, projection);
471        }
472        return coll.find(filter).projection(projection).first() != null;
473    }
474
475    protected State findOne(Bson filter) {
476        return findOne(filter, null);
477    }
478
479    protected State findOne(Bson filter, Bson projection) {
480        try (Stream<State> stream = stream(filter, projection)) {
481            return stream.findAny().orElse(null);
482        }
483    }
484
485    protected List<State> findAll(Bson filter) {
486        try (Stream<State> stream = stream(filter)) {
487            return stream.collect(Collectors.toList());
488        }
489    }
490
491    protected Stream<State> stream(Bson filter) {
492        return stream(filter, null, 0);
493    }
494
495    protected Stream<State> stream(Bson filter, Bson projection) {
496        return stream(filter, projection, 0);
497    }
498
499    /**
500     * Logs, runs request and constructs a closeable {@link Stream} on top of {@link MongoCursor}.
501     * <p />
502     * We should rely on this method, because it correctly handles cursor closed state.
503     * <p />
504     * Note: Looping on {@link FindIterable} or {@link MongoIterable} could lead to cursor leaks. This is also the case
505     * on some call to {@link MongoIterable#first()}.
506     *
507     * @return a closeable {@link Stream} instance linked to {@link MongoCursor}
508     */
509    protected Stream<State> stream(Bson filter, Bson projection, int limit) {
510        if (filter == null) {
511            // empty filter
512            filter = new Document();
513        }
514        // it's ok if projection is null
515        if (log.isTraceEnabled()) {
516            logQuery(filter, projection);
517        }
518
519        boolean completedAbruptly = true;
520        MongoCursor<Document> cursor = coll.find(filter).limit(limit).projection(projection).iterator();
521        try {
522            Set<String> seen = new HashSet<>();
523            Stream<State> stream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(cursor, 0), false) //
524                                                .onClose(cursor::close)
525                                                .filter(doc -> seen.add(doc.getString(idKey)))
526                                                // MongoDB cursors may return the same
527                                                // object several times
528                                                .map(converter::bsonToState);
529            // the stream takes responsibility for closing the session
530            completedAbruptly = false;
531            return stream;
532        } finally {
533            if (completedAbruptly) {
534                cursor.close();
535            }
536        }
537    }
538
539    protected Document justPresenceField() {
540        return new Document(MONGODB_ID, ONE);
541    }
542
543    @Override
544    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
545            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
546        // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter
547        MongoDBRepositoryQueryBuilder builder = new MongoDBRepositoryQueryBuilder(this, evaluator.getExpression(),
548                evaluator.getSelectClause(), orderByClause, evaluator.pathResolver, evaluator.fulltextSearchDisabled);
549        builder.walk();
550        if (builder.hasFulltext && isFulltextSearchDisabled()) {
551            throw new QueryParseException("Fulltext search disabled by configuration");
552        }
553        Document filter = builder.getQuery();
554        addPrincipals(filter, evaluator.principals);
555        Bson orderBy = builder.getOrderBy();
556        Bson keys = builder.getProjection();
557        // Don't do manual projection if there are no projection wildcards, as this brings no new
558        // information and is costly. The only difference is several identical rows instead of one.
559        boolean manualProjection = !distinctDocuments && builder.hasProjectionWildcard();
560        if (manualProjection) {
561            // we'll do post-treatment to re-evaluate the query to get proper wildcard projections
562            // so we need the full state from the database
563            keys = null;
564            evaluator.parse();
565        }
566
567        if (log.isTraceEnabled()) {
568            logQuery(filter, keys, orderBy, limit, offset);
569        }
570
571        List<Map<String, Serializable>> projections;
572        long totalSize;
573        try (MongoCursor<Document> cursor = coll.find(filter)
574                                                .projection(keys)
575                                                .skip(offset)
576                                                .limit(limit)
577                                                .sort(orderBy)
578                                                .iterator()) {
579            projections = new ArrayList<>();
580            DBSStateFlattener flattener = new DBSStateFlattener(builder.propertyKeys);
581            Iterable<Document> docs = () -> cursor;
582            for (Document doc : docs) {
583                State state = converter.bsonToState(doc);
584                if (manualProjection) {
585                    projections.addAll(evaluator.matches(state));
586                } else {
587                    projections.add(flattener.flatten(state));
588                }
589            }
590        }
591        if (countUpTo == -1) {
592            // count full size
593            if (limit == 0) {
594                totalSize = projections.size();
595            } else if (manualProjection) {
596                totalSize = -1; // unknown due to manual projection
597            } else {
598                totalSize = coll.count(filter);
599            }
600        } else if (countUpTo == 0) {
601            // no count
602            totalSize = -1; // not counted
603        } else {
604            // count only if less than countUpTo
605            if (limit == 0) {
606                totalSize = projections.size();
607            } else if (manualProjection) {
608                totalSize = -1; // unknown due to manual projection
609            } else {
610                totalSize = coll.count(filter, new CountOptions().limit(countUpTo + 1));
611            }
612            if (totalSize > countUpTo) {
613                totalSize = -2; // truncated
614            }
615        }
616        if (log.isTraceEnabled() && projections.size() != 0) {
617            log.trace("MongoDB:    -> " + projections.size());
618        }
619        return new PartialList<>(projections, totalSize);
620    }
621
622    @Override
623    public ScrollResult<String> scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) {
624        cursorService.checkForTimedOutScroll();
625        MongoDBRepositoryQueryBuilder builder = new MongoDBRepositoryQueryBuilder(this, evaluator.getExpression(),
626                evaluator.getSelectClause(), null, evaluator.pathResolver, evaluator.fulltextSearchDisabled);
627        builder.walk();
628        if (builder.hasFulltext && isFulltextSearchDisabled()) {
629            throw new QueryParseException("Fulltext search disabled by configuration");
630        }
631        Bson filter = builder.getQuery();
632        addPrincipals((Document) filter, evaluator.principals);
633        Bson keys = builder.getProjection();
634        if (log.isTraceEnabled()) {
635            logQuery(filter, keys, null, 0, 0);
636        }
637
638        MongoCursor<Document> cursor = coll.find(filter).projection(keys).batchSize(batchSize).iterator();
639        String scrollId = cursorService.registerCursor(cursor, batchSize, keepAliveSeconds);
640        return scroll(scrollId);
641    }
642
643    @Override
644    public ScrollResult<String> scroll(String scrollId) {
645        return cursorService.scroll(scrollId);
646    }
647
648    protected void addPrincipals(Document query, Set<String> principals) {
649        if (principals != null) {
650            Document inPrincipals = new Document(QueryOperators.IN, new ArrayList<>(principals));
651            query.put(DBSDocument.KEY_READ_ACL, inPrincipals);
652        }
653    }
654
655    /** Keys used for document projection when marking all binaries for GC. */
656    protected Bson binaryKeys;
657
658    @Override
659    protected void initBlobsPaths() {
660        MongoDBBlobFinder finder = new MongoDBBlobFinder();
661        finder.visit();
662        binaryKeys = Projections.fields(finder.binaryKeys);
663    }
664
665    protected static class MongoDBBlobFinder extends BlobFinder {
666        protected List<Bson> binaryKeys = new ArrayList<>(Collections.singleton(Projections.excludeId()));
667
668        @Override
669        protected void recordBlobPath() {
670            path.addLast(KEY_BLOB_DATA);
671            binaryKeys.add(Projections.include(StringUtils.join(path, ".")));
672            path.removeLast();
673        }
674    }
675
676    @Override
677    public void markReferencedBinaries() {
678        DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class);
679        // TODO add a query to not scan all documents
680        if (log.isTraceEnabled()) {
681            logQuery(new Document(), binaryKeys);
682        }
683        Block<Document> block = doc -> markReferencedBinaries(doc, blobManager);
684        coll.find().projection(binaryKeys).forEach(block);
685    }
686
687    protected void markReferencedBinaries(Document ob, DocumentBlobManager blobManager) {
688        for (String key : ob.keySet()) {
689            Object value = ob.get(key);
690            if (value instanceof List) {
691                @SuppressWarnings("unchecked")
692                List<Object> list = (List<Object>) value;
693                for (Object v : list) {
694                    if (v instanceof Document) {
695                        markReferencedBinaries((Document) v, blobManager);
696                    } else {
697                        markReferencedBinary(v, blobManager);
698                    }
699                }
700            } else if (value instanceof Object[]) {
701                for (Object v : (Object[]) value) {
702                    markReferencedBinary(v, blobManager);
703                }
704            } else if (value instanceof Document) {
705                markReferencedBinaries((Document) value, blobManager);
706            } else {
707                markReferencedBinary(value, blobManager);
708            }
709        }
710    }
711
712    protected void markReferencedBinary(Object value, DocumentBlobManager blobManager) {
713        if (!(value instanceof String)) {
714            return;
715        }
716        String key = (String) value;
717        blobManager.markReferencedBinary(key, repositoryName);
718    }
719
    /** Projection including only the lock owner and lock creation date. */
    protected static final Bson LOCK_FIELDS = Projections.include(KEY_LOCK_OWNER, KEY_LOCK_CREATED);

    /** Update unsetting both the lock owner and the lock creation date. */
    protected static final Bson UNSET_LOCK_UPDATE = Updates.combine(Updates.unset(KEY_LOCK_OWNER),
            Updates.unset(KEY_LOCK_CREATED));
724
725    @Override
726    public Lock getLock(String id) {
727        if (log.isTraceEnabled()) {
728            logQuery(id, LOCK_FIELDS);
729        }
730        Document res = coll.find(Filters.eq(idKey, id)).projection(LOCK_FIELDS).first();
731        if (res == null) {
732            // document not found
733            throw new DocumentNotFoundException(id);
734        }
735        String owner = res.getString(KEY_LOCK_OWNER);
736        if (owner == null) {
737            // not locked
738            return null;
739        }
740        Calendar created = (Calendar) converter.scalarToSerializable(res.get(KEY_LOCK_CREATED));
741        return new Lock(owner, created);
742    }
743
744    @Override
745    public Lock setLock(String id, Lock lock) {
746        Bson filter = Filters.and( //
747                Filters.eq(idKey, id), //
748                Filters.exists(KEY_LOCK_OWNER, false) // select doc if no lock is set
749        );
750        Bson setLock = Updates.combine( //
751                Updates.set(KEY_LOCK_OWNER, lock.getOwner()), //
752                Updates.set(KEY_LOCK_CREATED, converter.serializableToBson(lock.getCreated())) //
753        );
754        if (log.isTraceEnabled()) {
755            log.trace("MongoDB: FINDANDMODIFY " + filter + " UPDATE " + setLock);
756        }
757        Document res = coll.findOneAndUpdate(filter, setLock);
758        if (res != null) {
759            // found a doc to lock
760            return null;
761        } else {
762            // doc not found, or lock owner already set
763            // get the old lock
764            if (log.isTraceEnabled()) {
765                logQuery(id, LOCK_FIELDS);
766            }
767            Document old = coll.find(Filters.eq(idKey, id)).projection(LOCK_FIELDS).first();
768            if (old == null) {
769                // document not found
770                throw new DocumentNotFoundException(id);
771            }
772            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
773            Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED));
774            if (oldOwner != null) {
775                return new Lock(oldOwner, oldCreated);
776            }
777            // no lock -- there was a race condition
778            // TODO do better
779            throw new ConcurrentUpdateException("Lock " + id);
780        }
781    }
782
783    @Override
784    public Lock removeLock(String id, String owner) {
785        Document filter = new Document(idKey, id);
786        if (owner != null) {
787            // remove if owner matches or null
788            // implements LockManager.canLockBeRemoved inside MongoDB
789            Object ownerOrNull = Arrays.asList(owner, null);
790            filter.put(KEY_LOCK_OWNER, new Document(QueryOperators.IN, ownerOrNull));
791        } // else unconditional remove
792        // remove the lock
793        if (log.isTraceEnabled()) {
794            log.trace("MongoDB: FINDANDMODIFY " + filter + " UPDATE " + UNSET_LOCK_UPDATE);
795        }
796        Document old = coll.findOneAndUpdate(filter, UNSET_LOCK_UPDATE);
797        if (old != null) {
798            // found a doc and removed the lock, return previous lock
799            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
800            if (oldOwner == null) {
801                // was not locked
802                return null;
803            } else {
804                // return previous lock
805                Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED));
806                return new Lock(oldOwner, oldCreated);
807            }
808        } else {
809            // doc not found, or lock owner didn't match
810            // get the old lock
811            if (log.isTraceEnabled()) {
812                logQuery(id, LOCK_FIELDS);
813            }
814            old = coll.find(Filters.eq(idKey, id)).projection(LOCK_FIELDS).first();
815            if (old == null) {
816                // document not found
817                throw new DocumentNotFoundException(id);
818            }
819            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
820            Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED));
821            if (oldOwner != null) {
822                if (!LockManager.canLockBeRemoved(oldOwner, owner)) {
823                    // existing mismatched lock, flag failure
824                    return new Lock(oldOwner, oldCreated, true);
825                }
826                // old owner should have matched -- there was a race condition
827                // TODO do better
828                throw new ConcurrentUpdateException("Unlock " + id);
829            }
830            // old owner null, should have matched -- there was a race condition
831            // TODO do better
832            throw new ConcurrentUpdateException("Unlock " + id);
833        }
834    }
835
    @Override
    public void closeLockManager() {
        // intentionally empty: this implementation does nothing on close

    }
840
    @Override
    public void clearLockManagerCaches() {
        // intentionally empty: this implementation does nothing here
    }
844
845}