001/*
002 * (C) Copyright 2014-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.mongodb;
020
021import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_STATUS;
022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_USER;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACL;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACP;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY;
028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_JOBID;
029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_SIMPLE;
030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_TRASHED;
032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LIFECYCLE_STATE;
033import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
034import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
035import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
036import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
037import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PRIMARY_TYPE;
038import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;
039import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_VERSION_SERIES_ID;
040import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_READ_ACL;
041import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_VERSION_SERIES_ID;
042
043import java.io.Serializable;
044import java.util.ArrayList;
045import java.util.Arrays;
046import java.util.Calendar;
047import java.util.Collection;
048import java.util.Collections;
049import java.util.HashSet;
050import java.util.List;
051import java.util.Map;
052import java.util.Set;
053import java.util.Spliterators;
054import java.util.UUID;
055import java.util.stream.Collectors;
056import java.util.stream.Stream;
057import java.util.stream.StreamSupport;
058
059import javax.resource.spi.ConnectionManager;
060
061import org.apache.commons.lang3.StringUtils;
062import org.apache.commons.logging.Log;
063import org.apache.commons.logging.LogFactory;
064import org.bson.Document;
065import org.bson.conversions.Bson;
066import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
067import org.nuxeo.ecm.core.api.CursorService;
068import org.nuxeo.ecm.core.api.DocumentNotFoundException;
069import org.nuxeo.ecm.core.api.Lock;
070import org.nuxeo.ecm.core.api.NuxeoException;
071import org.nuxeo.ecm.core.api.PartialList;
072import org.nuxeo.ecm.core.api.ScrollResult;
073import org.nuxeo.ecm.core.blob.DocumentBlobManager;
074import org.nuxeo.ecm.core.model.LockManager;
075import org.nuxeo.ecm.core.model.Repository;
076import org.nuxeo.ecm.core.query.QueryParseException;
077import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
078import org.nuxeo.ecm.core.storage.State;
079import org.nuxeo.ecm.core.storage.State.StateDiff;
080import org.nuxeo.ecm.core.storage.dbs.DBSDocument;
081import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
082import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
083import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener;
084import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater;
085import org.nuxeo.runtime.api.Framework;
086import org.nuxeo.runtime.mongodb.MongoDBConnectionService;
087
088import com.mongodb.Block;
089import com.mongodb.QueryOperators;
090import com.mongodb.client.FindIterable;
091import com.mongodb.client.MongoCollection;
092import com.mongodb.client.MongoCursor;
093import com.mongodb.client.MongoDatabase;
094import com.mongodb.client.MongoIterable;
095import com.mongodb.client.model.CountOptions;
096import com.mongodb.client.model.Filters;
097import com.mongodb.client.model.FindOneAndUpdateOptions;
098import com.mongodb.client.model.IndexOptions;
099import com.mongodb.client.model.Indexes;
100import com.mongodb.client.model.Projections;
101import com.mongodb.client.model.ReturnDocument;
102import com.mongodb.client.model.Updates;
103import com.mongodb.client.result.DeleteResult;
104import com.mongodb.client.result.UpdateResult;
105
106/**
107 * MongoDB implementation of a {@link Repository}.
108 *
109 * @since 5.9.4
110 */
111public class MongoDBRepository extends DBSRepositoryBase {
112
113    private static final Log log = LogFactory.getLog(MongoDBRepository.class);
114
115    /**
116     * Prefix used to retrieve a MongoDB connection from {@link MongoDBConnectionService}.
117     * <p />
118     * The connection id will be {@code repository/[REPOSITORY_NAME]}.
119     */
120    public static final String REPOSITORY_CONNECTION_PREFIX = "repository/";
121
122    public static final Long LONG_ZERO = Long.valueOf(0);
123
124    public static final Double ZERO = Double.valueOf(0);
125
126    public static final Double ONE = Double.valueOf(1);
127
128    public static final String MONGODB_ID = "_id";
129
130    public static final String MONGODB_INC = "$inc";
131
132    public static final String MONGODB_SET = "$set";
133
134    public static final String MONGODB_UNSET = "$unset";
135
136    public static final String MONGODB_PUSH = "$push";
137
138    public static final String MONGODB_EACH = "$each";
139
140    public static final String MONGODB_META = "$meta";
141
142    public static final String MONGODB_TEXT_SCORE = "textScore";
143
144    private static final String FULLTEXT_INDEX_NAME = "fulltext";
145
146    private static final String LANGUAGE_FIELD = "__language";
147
148    protected static final String COUNTER_NAME_UUID = "ecm:id";
149
150    protected static final String COUNTER_FIELD = "seq";
151
152    protected final MongoCollection<Document> coll;
153
154    protected final MongoCollection<Document> countersColl;
155
156    /** The key to use to store the id in the database. */
157    protected String idKey;
158
159    /** True if we don't use MongoDB's native "_id" key to store the id. */
160    protected boolean useCustomId;
161
162    /** Number of values still available in the in-memory sequence. */
163    protected long sequenceLeft;
164
165    /** Last value used from the in-memory sequence. */
166    protected long sequenceLastValue;
167
168    /** Sequence allocation block size. */
169    protected long sequenceBlockSize;
170
171    protected final MongoDBConverter converter;
172
173    protected final CursorService<MongoCursor<Document>, Document, String> cursorService;
174
175    public MongoDBRepository(ConnectionManager cm, MongoDBRepositoryDescriptor descriptor) {
176        super(cm, descriptor.name, descriptor);
177        MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class);
178        // prefix with repository/ to group repository connection
179        MongoDatabase database = mongoService.getDatabase(REPOSITORY_CONNECTION_PREFIX + descriptor.name);
180        coll = database.getCollection(descriptor.name);
181        countersColl = database.getCollection(descriptor.name + ".counters");
182        if (Boolean.TRUE.equals(descriptor.nativeId)) {
183            idKey = MONGODB_ID;
184        } else {
185            idKey = KEY_ID;
186        }
187        useCustomId = KEY_ID.equals(idKey);
188        if (idType == IdType.sequence || DEBUG_UUIDS) {
189            Integer sbs = descriptor.sequenceBlockSize;
190            sequenceBlockSize = sbs == null ? 1 : sbs.longValue();
191            sequenceLeft = 0;
192        }
193        converter = new MongoDBConverter(idKey);
194        cursorService = new CursorService<>(ob -> (String) ob.get(converter.keyToBson(KEY_ID)));
195        initRepository();
196    }
197
198    @Override
199    public List<IdType> getAllowedIdTypes() {
200        return Arrays.asList(IdType.varchar, IdType.sequence);
201    }
202
203    @Override
204    public void shutdown() {
205        super.shutdown();
206        cursorService.clear();
207    }
208
209    protected void initRepository() {
210        // create required indexes
211        // code does explicit queries on those
212        if (useCustomId) {
213            coll.createIndex(Indexes.ascending(idKey));
214        }
215        coll.createIndex(Indexes.ascending(KEY_PARENT_ID));
216        coll.createIndex(Indexes.ascending(KEY_ANCESTOR_IDS));
217        coll.createIndex(Indexes.ascending(KEY_VERSION_SERIES_ID));
218        coll.createIndex(Indexes.ascending(KEY_PROXY_TARGET_ID));
219        coll.createIndex(Indexes.ascending(KEY_PROXY_VERSION_SERIES_ID));
220        coll.createIndex(Indexes.ascending(KEY_READ_ACL));
221        coll.createIndex(Indexes.ascending(KEY_PARENT_ID, KEY_NAME));
222        // often used in user-generated queries
223        coll.createIndex(Indexes.ascending(KEY_PRIMARY_TYPE));
224        coll.createIndex(Indexes.ascending(KEY_LIFECYCLE_STATE));
225        coll.createIndex(Indexes.ascending(KEY_IS_TRASHED));
226        coll.createIndex(Indexes.ascending(KEY_FULLTEXT_JOBID));
227        coll.createIndex(Indexes.ascending(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_USER));
228        coll.createIndex(Indexes.ascending(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_STATUS));
229        // TODO configure these from somewhere else
230        coll.createIndex(Indexes.descending("dc:modified"));
231        coll.createIndex(Indexes.ascending("rend:renditionName"));
232        coll.createIndex(Indexes.ascending("drv:subscriptions.enabled"));
233        coll.createIndex(Indexes.ascending("collectionMember:collectionIds"));
234        coll.createIndex(Indexes.ascending("nxtag:tags"));
235        if (!isFulltextDisabled()) {
236            Bson indexKeys = Indexes.compoundIndex( //
237                    Indexes.text(KEY_FULLTEXT_SIMPLE), //
238                    Indexes.text(KEY_FULLTEXT_BINARY) //
239            );
240            IndexOptions indexOptions = new IndexOptions().name(FULLTEXT_INDEX_NAME).languageOverride(LANGUAGE_FIELD);
241            coll.createIndex(indexKeys, indexOptions);
242        }
243        // check root presence
244        if (coll.count(Filters.eq(idKey, getRootId())) > 0) {
245            return;
246        }
247        // create basic repository structure needed
248        if (idType == IdType.sequence || DEBUG_UUIDS) {
249            // create the id counter
250            Document idCounter = new Document();
251            idCounter.put(MONGODB_ID, COUNTER_NAME_UUID);
252            idCounter.put(COUNTER_FIELD, LONG_ZERO);
253            countersColl.insertOne(idCounter);
254        }
255        initRoot();
256    }
257
258    protected synchronized Long getNextSequenceId() {
259        if (sequenceLeft == 0) {
260            // allocate a new sequence block
261            // the database contains the last value from the last block
262            Bson filter = Filters.eq(MONGODB_ID, COUNTER_NAME_UUID);
263            Bson update = Updates.inc(COUNTER_FIELD, Long.valueOf(sequenceBlockSize));
264            Document idCounter = countersColl.findOneAndUpdate(filter, update,
265                    new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER));
266            if (idCounter == null) {
267                throw new NuxeoException("Repository id counter not initialized");
268            }
269            sequenceLeft = sequenceBlockSize;
270            sequenceLastValue = ((Long) idCounter.get(COUNTER_FIELD)).longValue() - sequenceBlockSize;
271        }
272        sequenceLeft--;
273        sequenceLastValue++;
274        return Long.valueOf(sequenceLastValue);
275    }
276
277    @Override
278    public String generateNewId() {
279        if (idType == IdType.sequence || DEBUG_UUIDS) {
280            Long id = getNextSequenceId();
281            if (DEBUG_UUIDS) {
282                return "UUID_" + id;
283            }
284            return id.toString();
285        } else {
286            return UUID.randomUUID().toString();
287        }
288    }
289
290    @Override
291    public void createState(State state) {
292        Document doc = converter.stateToBson(state);
293        if (log.isTraceEnabled()) {
294            log.trace("MongoDB: CREATE " + doc.get(idKey) + ": " + doc);
295        }
296        coll.insertOne(doc);
297        // TODO dupe exception
298        // throw new DocumentException("Already exists: " + id);
299    }
300
301    @Override
302    public void createStates(List<State> states) {
303        List<Document> docs = states.stream().map(converter::stateToBson).collect(Collectors.toList());
304        if (log.isTraceEnabled()) {
305            log.trace("MongoDB: CREATE ["
306                    + docs.stream().map(doc -> doc.get(idKey).toString()).collect(Collectors.joining(", "))
307                    + "]: " + docs);
308        }
309        coll.insertMany(docs);
310    }
311
312    @Override
313    public State readState(String id) {
314        return findOne(Filters.eq(idKey, id));
315    }
316
317    @Override
318    public State readPartialState(String id, Collection<String> keys) {
319        Document fields = new Document();
320        keys.forEach(key -> fields.put(converter.keyToBson(key), ONE));
321        return findOne(Filters.eq(idKey, id), fields);
322    }
323
324    @Override
325    public List<State> readStates(List<String> ids) {
326        return findAll(Filters.in(idKey, ids));
327    }
328
329    @Override
330    public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) {
331        List<Document> updates = converter.diffToBson(diff);
332        for (Document update : updates) {
333            Document filter = new Document(idKey, id);
334            if (changeTokenUpdater == null) {
335                if (log.isTraceEnabled()) {
336                    log.trace("MongoDB: UPDATE " + id + ": " + update);
337                }
338            } else {
339                // assume bson is identical to dbs internals
340                // condition works even if value is null
341                Map<String, Serializable> conditions = changeTokenUpdater.getConditions();
342                Map<String, Serializable> tokenUpdates = changeTokenUpdater.getUpdates();
343                if (update.containsKey(MONGODB_SET)) {
344                    ((Document) update.get(MONGODB_SET)).putAll(tokenUpdates);
345                } else {
346                    Document set = new Document();
347                    set.putAll(tokenUpdates);
348                    update.put(MONGODB_SET, set);
349                }
350                if (log.isTraceEnabled()) {
351                    log.trace("MongoDB: UPDATE " + id + ": IF " + conditions + " THEN " + update);
352                }
353                filter.putAll(conditions);
354            }
355            UpdateResult w = coll.updateMany(filter, update);
356            if (w.getModifiedCount() != 1) {
357                log.trace("MongoDB:    -> CONCURRENT UPDATE: " + id);
358                throw new ConcurrentUpdateException(id);
359            }
360            // TODO dupe exception
361            // throw new DocumentException("Missing: " + id);
362        }
363    }
364
365    @Override
366    public void deleteStates(Set<String> ids) {
367        Bson filter = Filters.in(idKey, ids);
368        if (log.isTraceEnabled()) {
369            log.trace("MongoDB: REMOVE " + ids);
370        }
371        DeleteResult w = coll.deleteMany(filter);
372        if (w.getDeletedCount() != ids.size()) {
373            if (log.isDebugEnabled()) {
374                log.debug("Removed " + w.getDeletedCount() + " docs for " + ids.size() + " ids: " + ids);
375            }
376        }
377    }
378
379    @Override
380    public State readChildState(String parentId, String name, Set<String> ignored) {
381        Bson filter = getChildQuery(parentId, name, ignored);
382        return findOne(filter);
383    }
384
385    protected void logQuery(String id, Bson fields) {
386        logQuery(Filters.eq(idKey, id), fields);
387    }
388
389    protected void logQuery(Bson filter, Bson fields) {
390        if (fields == null) {
391            log.trace("MongoDB: QUERY " + filter);
392        } else {
393            log.trace("MongoDB: QUERY " + filter + " KEYS " + fields);
394        }
395    }
396
397    protected void logQuery(Bson query, Bson fields, Bson orderBy, int limit, int offset) {
398        log.trace("MongoDB: QUERY " + query + " KEYS " + fields + (orderBy == null ? "" : " ORDER BY " + orderBy)
399                + " OFFSET " + offset + " LIMIT " + limit);
400    }
401
402    @Override
403    public boolean hasChild(String parentId, String name, Set<String> ignored) {
404        Document filter = getChildQuery(parentId, name, ignored);
405        return exists(filter);
406    }
407
408    protected Document getChildQuery(String parentId, String name, Set<String> ignored) {
409        Document filter = new Document();
410        filter.put(KEY_PARENT_ID, parentId);
411        filter.put(KEY_NAME, name);
412        addIgnoredIds(filter, ignored);
413        return filter;
414    }
415
416    protected void addIgnoredIds(Document filter, Set<String> ignored) {
417        if (!ignored.isEmpty()) {
418            Document notInIds = new Document(QueryOperators.NIN, new ArrayList<>(ignored));
419            filter.put(idKey, notInIds);
420        }
421    }
422
423    @Override
424    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
425        Document filter = new Document(converter.keyToBson(key), value);
426        addIgnoredIds(filter, ignored);
427        return findAll(filter);
428    }
429
430    @Override
431    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
432        Document filter = new Document(converter.keyToBson(key1), value1);
433        filter.put(converter.keyToBson(key2), value2);
434        addIgnoredIds(filter, ignored);
435        return findAll(filter);
436    }
437
438    @Override
439    public Stream<State> getDescendants(String rootId, Set<String> keys) {
440        return getDescendants(rootId, keys, 0);
441    }
442
443    @Override
444    public Stream<State> getDescendants(String rootId, Set<String> keys, int limit) {
445        Bson filter = Filters.eq(KEY_ANCESTOR_IDS, rootId);
446        Document fields = new Document();
447        if (useCustomId) {
448            fields.put(MONGODB_ID, ZERO);
449        }
450        fields.put(idKey, ONE);
451        keys.forEach(key -> fields.put(converter.keyToBson(key), ONE));
452        return stream(filter, fields, limit);
453    }
454
455    @Override
456    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
457        Document filter = new Document(converter.keyToBson(key), value);
458        addIgnoredIds(filter, ignored);
459        return exists(filter);
460    }
461
462    protected boolean exists(Bson filter) {
463        return exists(filter, justPresenceField());
464    }
465
466    protected boolean exists(Bson filter, Bson projection) {
467        if (log.isTraceEnabled()) {
468            logQuery(filter, projection);
469        }
470        return coll.find(filter).projection(projection).first() != null;
471    }
472
473    protected State findOne(Bson filter) {
474        return findOne(filter, null);
475    }
476
477    protected State findOne(Bson filter, Bson projection) {
478        try (Stream<State> stream = stream(filter, projection)) {
479            return stream.findAny().orElse(null);
480        }
481    }
482
483    protected List<State> findAll(Bson filter) {
484        try (Stream<State> stream = stream(filter)) {
485            return stream.collect(Collectors.toList());
486        }
487    }
488
489    protected Stream<State> stream(Bson filter) {
490        return stream(filter, null, 0);
491    }
492
493    protected Stream<State> stream(Bson filter, Bson projection) {
494        return stream(filter, projection, 0);
495    }
496
497    /**
498     * Logs, runs request and constructs a closeable {@link Stream} on top of {@link MongoCursor}.
499     * <p />
500     * We should rely on this method, because it correctly handles cursor closed state.
501     * <p />
502     * Note: Looping on {@link FindIterable} or {@link MongoIterable} could lead to cursor leaks. This is also the case
503     * on some call to {@link MongoIterable#first()}.
504     *
505     * @return a closeable {@link Stream} instance linked to {@link MongoCursor}
506     */
507    protected Stream<State> stream(Bson filter, Bson projection, int limit) {
508        if (filter == null) {
509            // empty filter
510            filter = new Document();
511        }
512        // it's ok if projection is null
513        if (log.isTraceEnabled()) {
514            logQuery(filter, projection);
515        }
516
517        boolean completedAbruptly = true;
518        MongoCursor<Document> cursor = coll.find(filter).limit(limit).projection(projection).iterator();
519        try {
520            Set<String> seen = new HashSet<>();
521            Stream<State> stream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(cursor, 0), false) //
522                                                .onClose(cursor::close)
523                                                .filter(doc -> seen.add(doc.getString(idKey)))
524                                                // MongoDB cursors may return the same
525                                                // object several times
526                                                .map(converter::bsonToState);
527            // the stream takes responsibility for closing the session
528            completedAbruptly = false;
529            return stream;
530        } finally {
531            if (completedAbruptly) {
532                cursor.close();
533            }
534        }
535    }
536
537    protected Document justPresenceField() {
538        return new Document(MONGODB_ID, ONE);
539    }
540
541    @Override
542    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
543            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
544        // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter
545        MongoDBQueryBuilder builder = new MongoDBQueryBuilder(this, evaluator.getExpression(),
546                evaluator.getSelectClause(), orderByClause, evaluator.pathResolver, evaluator.fulltextSearchDisabled);
547        builder.walk();
548        if (builder.hasFulltext && isFulltextDisabled()) {
549            throw new QueryParseException("Fulltext search disabled by configuration");
550        }
551        Document filter = builder.getQuery();
552        addPrincipals(filter, evaluator.principals);
553        Bson orderBy = builder.getOrderBy();
554        Bson keys = builder.getProjection();
555        // Don't do manual projection if there are no projection wildcards, as this brings no new
556        // information and is costly. The only difference is several identical rows instead of one.
557        boolean manualProjection = !distinctDocuments && builder.hasProjectionWildcard();
558        if (manualProjection) {
559            // we'll do post-treatment to re-evaluate the query to get proper wildcard projections
560            // so we need the full state from the database
561            keys = null;
562            evaluator.parse();
563        }
564
565        if (log.isTraceEnabled()) {
566            logQuery(filter, keys, orderBy, limit, offset);
567        }
568
569        List<Map<String, Serializable>> projections;
570        long totalSize;
571        try (MongoCursor<Document> cursor = coll.find(filter)
572                                                .projection(keys)
573                                                .skip(offset)
574                                                .limit(limit)
575                                                .sort(orderBy)
576                                                .iterator()) {
577            projections = new ArrayList<>();
578            DBSStateFlattener flattener = new DBSStateFlattener(builder.propertyKeys);
579            Iterable<Document> docs = () -> cursor;
580            for (Document doc : docs) {
581                State state = converter.bsonToState(doc);
582                if (manualProjection) {
583                    projections.addAll(evaluator.matches(state));
584                } else {
585                    projections.add(flattener.flatten(state));
586                }
587            }
588        }
589        if (countUpTo == -1) {
590            // count full size
591            if (limit == 0) {
592                totalSize = projections.size();
593            } else if (manualProjection) {
594                totalSize = -1; // unknown due to manual projection
595            } else {
596                totalSize = coll.count(filter);
597            }
598        } else if (countUpTo == 0) {
599            // no count
600            totalSize = -1; // not counted
601        } else {
602            // count only if less than countUpTo
603            if (limit == 0) {
604                totalSize = projections.size();
605            } else if (manualProjection) {
606                totalSize = -1; // unknown due to manual projection
607            } else {
608                totalSize = coll.count(filter, new CountOptions().limit(countUpTo + 1));
609            }
610            if (totalSize > countUpTo) {
611                totalSize = -2; // truncated
612            }
613        }
614        if (log.isTraceEnabled() && projections.size() != 0) {
615            log.trace("MongoDB:    -> " + projections.size());
616        }
617        return new PartialList<>(projections, totalSize);
618    }
619
620    @Override
621    public ScrollResult<String> scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) {
622        cursorService.checkForTimedOutScroll();
623        MongoDBQueryBuilder builder = new MongoDBQueryBuilder(this, evaluator.getExpression(),
624                evaluator.getSelectClause(), null, evaluator.pathResolver, evaluator.fulltextSearchDisabled);
625        builder.walk();
626        if (builder.hasFulltext && isFulltextDisabled()) {
627            throw new QueryParseException("Fulltext search disabled by configuration");
628        }
629        Bson filter = builder.getQuery();
630        Bson keys = builder.getProjection();
631        if (log.isTraceEnabled()) {
632            logQuery(filter, keys, null, 0, 0);
633        }
634
635        MongoCursor<Document> cursor = coll.find(filter).projection(keys).batchSize(batchSize).iterator();
636        String scrollId = cursorService.registerCursor(cursor, batchSize, keepAliveSeconds);
637        return scroll(scrollId);
638    }
639
640    @Override
641    public ScrollResult<String> scroll(String scrollId) {
642        return cursorService.scroll(scrollId);
643    }
644
645    protected void addPrincipals(Document query, Set<String> principals) {
646        if (principals != null) {
647            Document inPrincipals = new Document(QueryOperators.IN, new ArrayList<>(principals));
648            query.put(DBSDocument.KEY_READ_ACL, inPrincipals);
649        }
650    }
651
652    /** Keys used for document projection when marking all binaries for GC. */
653    protected Bson binaryKeys;
654
655    @Override
656    protected void initBlobsPaths() {
657        MongoDBBlobFinder finder = new MongoDBBlobFinder();
658        finder.visit();
659        binaryKeys = Projections.fields(finder.binaryKeys);
660    }
661
662    protected static class MongoDBBlobFinder extends BlobFinder {
663        protected List<Bson> binaryKeys = new ArrayList<>(Collections.singleton(Projections.excludeId()));
664
665        @Override
666        protected void recordBlobPath() {
667            path.addLast(KEY_BLOB_DATA);
668            binaryKeys.add(Projections.include(StringUtils.join(path, ".")));
669            path.removeLast();
670        }
671    }
672
673    @Override
674    public void markReferencedBinaries() {
675        DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class);
676        // TODO add a query to not scan all documents
677        if (log.isTraceEnabled()) {
678            logQuery(new Document(), binaryKeys);
679        }
680        Block<Document> block = doc -> markReferencedBinaries(doc, blobManager);
681        coll.find().projection(binaryKeys).forEach(block);
682    }
683
684    protected void markReferencedBinaries(Document ob, DocumentBlobManager blobManager) {
685        for (String key : ob.keySet()) {
686            Object value = ob.get(key);
687            if (value instanceof List) {
688                @SuppressWarnings("unchecked")
689                List<Object> list = (List<Object>) value;
690                for (Object v : list) {
691                    if (v instanceof Document) {
692                        markReferencedBinaries((Document) v, blobManager);
693                    } else {
694                        markReferencedBinary(v, blobManager);
695                    }
696                }
697            } else if (value instanceof Object[]) {
698                for (Object v : (Object[]) value) {
699                    markReferencedBinary(v, blobManager);
700                }
701            } else if (value instanceof Document) {
702                markReferencedBinaries((Document) value, blobManager);
703            } else {
704                markReferencedBinary(value, blobManager);
705            }
706        }
707    }
708
709    protected void markReferencedBinary(Object value, DocumentBlobManager blobManager) {
710        if (!(value instanceof String)) {
711            return;
712        }
713        String key = (String) value;
714        blobManager.markReferencedBinary(key, repositoryName);
715    }
716
717    protected static final Bson LOCK_FIELDS = Projections.include(KEY_LOCK_OWNER, KEY_LOCK_CREATED);
718
719    protected static final Bson UNSET_LOCK_UPDATE = Updates.combine(Updates.unset(KEY_LOCK_OWNER),
720            Updates.unset(KEY_LOCK_CREATED));
721
722    @Override
723    public Lock getLock(String id) {
724        if (log.isTraceEnabled()) {
725            logQuery(id, LOCK_FIELDS);
726        }
727        Document res = coll.find(Filters.eq(idKey, id)).projection(LOCK_FIELDS).first();
728        if (res == null) {
729            // document not found
730            throw new DocumentNotFoundException(id);
731        }
732        String owner = res.getString(KEY_LOCK_OWNER);
733        if (owner == null) {
734            // not locked
735            return null;
736        }
737        Calendar created = (Calendar) converter.scalarToSerializable(res.get(KEY_LOCK_CREATED));
738        return new Lock(owner, created);
739    }
740
741    @Override
742    public Lock setLock(String id, Lock lock) {
743        Bson filter = Filters.and( //
744                Filters.eq(idKey, id), //
745                Filters.exists(KEY_LOCK_OWNER, false) // select doc if no lock is set
746        );
747        Bson setLock = Updates.combine( //
748                Updates.set(KEY_LOCK_OWNER, lock.getOwner()), //
749                Updates.set(KEY_LOCK_CREATED, converter.serializableToBson(lock.getCreated())) //
750        );
751        if (log.isTraceEnabled()) {
752            log.trace("MongoDB: FINDANDMODIFY " + filter + " UPDATE " + setLock);
753        }
754        Document res = coll.findOneAndUpdate(filter, setLock);
755        if (res != null) {
756            // found a doc to lock
757            return null;
758        } else {
759            // doc not found, or lock owner already set
760            // get the old lock
761            if (log.isTraceEnabled()) {
762                logQuery(id, LOCK_FIELDS);
763            }
764            Document old = coll.find(Filters.eq(idKey, id)).projection(LOCK_FIELDS).first();
765            if (old == null) {
766                // document not found
767                throw new DocumentNotFoundException(id);
768            }
769            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
770            Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED));
771            if (oldOwner != null) {
772                return new Lock(oldOwner, oldCreated);
773            }
774            // no lock -- there was a race condition
775            // TODO do better
776            throw new ConcurrentUpdateException("Lock " + id);
777        }
778    }
779
780    @Override
781    public Lock removeLock(String id, String owner) {
782        Document filter = new Document(idKey, id);
783        if (owner != null) {
784            // remove if owner matches or null
785            // implements LockManager.canLockBeRemoved inside MongoDB
786            Object ownerOrNull = Arrays.asList(owner, null);
787            filter.put(KEY_LOCK_OWNER, new Document(QueryOperators.IN, ownerOrNull));
788        } // else unconditional remove
789        // remove the lock
790        if (log.isTraceEnabled()) {
791            log.trace("MongoDB: FINDANDMODIFY " + filter + " UPDATE " + UNSET_LOCK_UPDATE);
792        }
793        Document old = coll.findOneAndUpdate(filter, UNSET_LOCK_UPDATE);
794        if (old != null) {
795            // found a doc and removed the lock, return previous lock
796            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
797            if (oldOwner == null) {
798                // was not locked
799                return null;
800            } else {
801                // return previous lock
802                Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED));
803                return new Lock(oldOwner, oldCreated);
804            }
805        } else {
806            // doc not found, or lock owner didn't match
807            // get the old lock
808            if (log.isTraceEnabled()) {
809                logQuery(id, LOCK_FIELDS);
810            }
811            old = coll.find(Filters.eq(idKey, id)).projection(LOCK_FIELDS).first();
812            if (old == null) {
813                // document not found
814                throw new DocumentNotFoundException(id);
815            }
816            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
817            Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED));
818            if (oldOwner != null) {
819                if (!LockManager.canLockBeRemoved(oldOwner, owner)) {
820                    // existing mismatched lock, flag failure
821                    return new Lock(oldOwner, oldCreated, true);
822                }
823                // old owner should have matched -- there was a race condition
824                // TODO do better
825                throw new ConcurrentUpdateException("Unlock " + id);
826            }
827            // old owner null, should have matched -- there was a race condition
828            // TODO do better
829            throw new ConcurrentUpdateException("Unlock " + id);
830        }
831    }
832
    /** No-op in this implementation: the method body is empty. */
    @Override
    public void closeLockManager() {

    }
837
    /** No-op in this implementation: the method body is empty. */
    @Override
    public void clearLockManagerCaches() {
    }
841
842}