001/*
002 * (C) Copyright 2014-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.mongodb;
020
021import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_STATUS;
022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_USER;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACL;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACP;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY;
028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_JOBID;
029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_SIMPLE;
030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LIFECYCLE_STATE;
032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
033import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
034import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
035import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
036import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PRIMARY_TYPE;
037import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;
038import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_VERSION_SERIES_ID;
039import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_READ_ACL;
040import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_VERSION_SERIES_ID;
041
042import java.io.Serializable;
043import java.util.ArrayList;
044import java.util.Arrays;
045import java.util.Calendar;
046import java.util.Collection;
047import java.util.Collections;
048import java.util.HashSet;
049import java.util.List;
050import java.util.Map;
051import java.util.Set;
052import java.util.Spliterators;
053import java.util.UUID;
054import java.util.stream.Collectors;
055import java.util.stream.Stream;
056import java.util.stream.StreamSupport;
057
058import javax.resource.spi.ConnectionManager;
059
060import org.apache.commons.lang.StringUtils;
061import org.apache.commons.logging.Log;
062import org.apache.commons.logging.LogFactory;
063import org.bson.Document;
064import org.bson.conversions.Bson;
065import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
066import org.nuxeo.ecm.core.api.CursorService;
067import org.nuxeo.ecm.core.api.DocumentNotFoundException;
068import org.nuxeo.ecm.core.api.Lock;
069import org.nuxeo.ecm.core.api.NuxeoException;
070import org.nuxeo.ecm.core.api.PartialList;
071import org.nuxeo.ecm.core.api.ScrollResult;
072import org.nuxeo.ecm.core.blob.DocumentBlobManager;
073import org.nuxeo.ecm.core.model.LockManager;
074import org.nuxeo.ecm.core.model.Repository;
075import org.nuxeo.ecm.core.query.QueryParseException;
076import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
077import org.nuxeo.ecm.core.storage.State;
078import org.nuxeo.ecm.core.storage.State.StateDiff;
079import org.nuxeo.ecm.core.storage.dbs.DBSDocument;
080import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
081import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
082import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener;
083import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater;
084import org.nuxeo.runtime.api.Framework;
085import org.nuxeo.runtime.mongodb.MongoDBConnectionService;
086
087import com.mongodb.Block;
088import com.mongodb.QueryOperators;
089import com.mongodb.client.FindIterable;
090import com.mongodb.client.MongoCollection;
091import com.mongodb.client.MongoCursor;
092import com.mongodb.client.MongoDatabase;
093import com.mongodb.client.MongoIterable;
094import com.mongodb.client.model.CountOptions;
095import com.mongodb.client.model.Filters;
096import com.mongodb.client.model.FindOneAndUpdateOptions;
097import com.mongodb.client.model.IndexOptions;
098import com.mongodb.client.model.Indexes;
099import com.mongodb.client.model.Projections;
100import com.mongodb.client.model.ReturnDocument;
101import com.mongodb.client.model.Updates;
102import com.mongodb.client.result.DeleteResult;
103import com.mongodb.client.result.UpdateResult;
104
105/**
106 * MongoDB implementation of a {@link Repository}.
107 *
108 * @since 5.9.4
109 */
110public class MongoDBRepository extends DBSRepositoryBase {
111
    private static final Log log = LogFactory.getLog(MongoDBRepository.class);

    /**
     * Prefix used to retrieve a MongoDB connection from {@link MongoDBConnectionService}.
     * <p />
     * The connection id will be {@code repository/[REPOSITORY_NAME]}.
     */
    public static final String REPOSITORY_CONNECTION_PREFIX = "repository/";

    /** Boxed 0L, initial value of the id sequence counter. */
    public static final Long LONG_ZERO = Long.valueOf(0);

    /** Boxed 0.0, used in projection documents to exclude a field. */
    public static final Double ZERO = Double.valueOf(0);

    /** Boxed 1.0, used in projection documents to include a field. */
    public static final Double ONE = Double.valueOf(1);

    /** MongoDB native document id field. */
    public static final String MONGODB_ID = "_id";

    public static final String MONGODB_INC = "$inc";

    public static final String MONGODB_SET = "$set";

    public static final String MONGODB_UNSET = "$unset";

    public static final String MONGODB_PUSH = "$push";

    public static final String MONGODB_EACH = "$each";

    public static final String MONGODB_META = "$meta";

    public static final String MONGODB_TEXT_SCORE = "textScore";

    /** Name of the compound text index created for fulltext search. */
    private static final String FULLTEXT_INDEX_NAME = "fulltext";

    /** Field used as the per-document language override for the text index. */
    private static final String LANGUAGE_FIELD = "__language";

    /** _id of the id counter document in the counters collection. */
    protected static final String COUNTER_NAME_UUID = "ecm:id";

    /** Field of the counter document holding the last allocated sequence value. */
    protected static final String COUNTER_FIELD = "seq";

    /** Main collection holding the repository documents. */
    protected final MongoCollection<Document> coll;

    /** Side collection holding the id sequence counter. */
    protected final MongoCollection<Document> countersColl;

    /** The key to use to store the id in the database. */
    protected String idKey;

    /** True if we don't use MongoDB's native "_id" key to store the id. */
    protected boolean useCustomId;

    /** Number of values still available in the in-memory sequence. */
    protected long sequenceLeft;

    /** Last value used from the in-memory sequence. */
    protected long sequenceLastValue;

    /** Sequence allocation block size. */
    protected long sequenceBlockSize;

    protected final MongoDBConverter converter;

    /** Service managing scroll cursors, keyed by scroll id. */
    protected final CursorService<MongoCursor<Document>, Document, String> cursorService;
173
174    public MongoDBRepository(ConnectionManager cm, MongoDBRepositoryDescriptor descriptor) {
175        super(cm, descriptor.name, descriptor);
176        MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class);
177        // prefix with repository/ to group repository connection
178        MongoDatabase database = mongoService.getDatabase(REPOSITORY_CONNECTION_PREFIX + descriptor.name);
179        coll = database.getCollection(descriptor.name);
180        countersColl = database.getCollection(descriptor.name + ".counters");
181        if (Boolean.TRUE.equals(descriptor.nativeId)) {
182            idKey = MONGODB_ID;
183        } else {
184            idKey = KEY_ID;
185        }
186        useCustomId = KEY_ID.equals(idKey);
187        if (idType == IdType.sequence || DEBUG_UUIDS) {
188            Integer sbs = descriptor.sequenceBlockSize;
189            sequenceBlockSize = sbs == null ? 1 : sbs.longValue();
190            sequenceLeft = 0;
191        }
192        converter = new MongoDBConverter(idKey);
193        cursorService = new CursorService<>(ob -> (String) ob.get(converter.keyToBson(KEY_ID)));
194        initRepository();
195    }
196
197    @Override
198    public List<IdType> getAllowedIdTypes() {
199        return Arrays.asList(IdType.varchar, IdType.sequence);
200    }
201
    @Override
    public void shutdown() {
        super.shutdown();
        // release any scroll cursors still registered, to avoid MongoDB cursor leaks
        cursorService.clear();
    }
207
    /**
     * Creates the indexes required by internal and common user queries, and, if the repository is empty, initializes
     * the id counter (for sequence ids) and the root document.
     */
    protected void initRepository() {
        // create required indexes
        // code does explicit queries on those
        if (useCustomId) {
            coll.createIndex(Indexes.ascending(idKey));
        }
        coll.createIndex(Indexes.ascending(KEY_PARENT_ID));
        coll.createIndex(Indexes.ascending(KEY_ANCESTOR_IDS));
        coll.createIndex(Indexes.ascending(KEY_VERSION_SERIES_ID));
        coll.createIndex(Indexes.ascending(KEY_PROXY_TARGET_ID));
        coll.createIndex(Indexes.ascending(KEY_PROXY_VERSION_SERIES_ID));
        coll.createIndex(Indexes.ascending(KEY_READ_ACL));
        coll.createIndex(Indexes.ascending(KEY_PARENT_ID, KEY_NAME));
        // often used in user-generated queries
        coll.createIndex(Indexes.ascending(KEY_PRIMARY_TYPE));
        coll.createIndex(Indexes.ascending(KEY_LIFECYCLE_STATE));
        coll.createIndex(Indexes.ascending(KEY_FULLTEXT_JOBID));
        coll.createIndex(Indexes.ascending(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_USER));
        coll.createIndex(Indexes.ascending(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_STATUS));
        // TODO configure these from somewhere else
        coll.createIndex(Indexes.descending("dc:modified"));
        coll.createIndex(Indexes.ascending("rend:renditionName"));
        coll.createIndex(Indexes.ascending("drv:subscriptions.enabled"));
        coll.createIndex(Indexes.ascending("collectionMember:collectionIds"));
        coll.createIndex(Indexes.ascending("nxtag:tags"));
        if (!isFulltextDisabled()) {
            // single compound text index over the simple and binary fulltext fields
            Bson indexKeys = Indexes.compoundIndex( //
                    Indexes.text(KEY_FULLTEXT_SIMPLE), //
                    Indexes.text(KEY_FULLTEXT_BINARY) //
            );
            IndexOptions indexOptions = new IndexOptions().name(FULLTEXT_INDEX_NAME).languageOverride(LANGUAGE_FIELD);
            coll.createIndex(indexKeys, indexOptions);
        }
        // check root presence; if the root exists the structure is already initialized
        if (coll.count(Filters.eq(idKey, getRootId())) > 0) {
            return;
        }
        // create basic repository structure needed
        if (idType == IdType.sequence || DEBUG_UUIDS) {
            // create the id counter
            Document idCounter = new Document();
            idCounter.put(MONGODB_ID, COUNTER_NAME_UUID);
            idCounter.put(COUNTER_FIELD, LONG_ZERO);
            countersColl.insertOne(idCounter);
        }
        initRoot();
    }
255
    /**
     * Returns the next value of the in-memory id sequence, atomically allocating a new block from the counters
     * collection when the current block is exhausted.
     * <p>
     * Synchronized: the in-memory sequence state is shared by all sessions of this repository.
     *
     * @return the next sequence value, never {@code null}
     * @throws NuxeoException if the counter document does not exist in the database
     */
    protected synchronized Long getNextSequenceId() {
        if (sequenceLeft == 0) {
            // allocate a new sequence block
            // the database contains the last value from the last block
            Bson filter = Filters.eq(MONGODB_ID, COUNTER_NAME_UUID);
            Bson update = Updates.inc(COUNTER_FIELD, Long.valueOf(sequenceBlockSize));
            Document idCounter = countersColl.findOneAndUpdate(filter, update,
                    new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER));
            if (idCounter == null) {
                throw new NuxeoException("Repository id counter not initialized");
            }
            sequenceLeft = sequenceBlockSize;
            // AFTER value is the end of the new block; back up to its start
            sequenceLastValue = ((Long) idCounter.get(COUNTER_FIELD)).longValue() - sequenceBlockSize;
        }
        sequenceLeft--;
        sequenceLastValue++;
        return Long.valueOf(sequenceLastValue);
    }
274
275    @Override
276    public String generateNewId() {
277        if (idType == IdType.sequence || DEBUG_UUIDS) {
278            Long id = getNextSequenceId();
279            if (DEBUG_UUIDS) {
280                return "UUID_" + id;
281            }
282            return id.toString();
283        } else {
284            return UUID.randomUUID().toString();
285        }
286    }
287
    @Override
    public void createState(State state) {
        // convert the DBS state into a BSON document
        Document doc = converter.stateToBson(state);
        if (log.isTraceEnabled()) {
            log.trace("MongoDB: CREATE " + doc.get(idKey) + ": " + doc);
        }
        coll.insertOne(doc);
        // TODO dupe exception
        // throw new DocumentException("Already exists: " + id);
    }
298
299    @Override
300    public void createStates(List<State> states) {
301        List<Document> docs = states.stream().map(converter::stateToBson).collect(Collectors.toList());
302        if (log.isTraceEnabled()) {
303            log.trace("MongoDB: CREATE ["
304                    + docs.stream().map(doc -> doc.get(idKey).toString()).collect(Collectors.joining(", "))
305                    + "]: " + docs);
306        }
307        coll.insertMany(docs);
308    }
309
310    @Override
311    public State readState(String id) {
312        return findOne(Filters.eq(idKey, id));
313    }
314
315    @Override
316    public State readPartialState(String id, Collection<String> keys) {
317        Document fields = new Document();
318        keys.forEach(key -> fields.put(converter.keyToBson(key), ONE));
319        return findOne(Filters.eq(idKey, id), fields);
320    }
321
322    @Override
323    public List<State> readStates(List<String> ids) {
324        return findAll(Filters.in(idKey, ids));
325    }
326
    /**
     * Applies the diff to the document, one BSON update at a time. When a change token updater is provided, its
     * conditions are added to the filter (optimistic concurrency check) and its updates are merged into the $set
     * clause of each update.
     *
     * @throws ConcurrentUpdateException if an update did not modify exactly one document
     */
    @Override
    public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) {
        List<Document> updates = converter.diffToBson(diff);
        for (Document update : updates) {
            Document filter = new Document(idKey, id);
            if (changeTokenUpdater == null) {
                if (log.isTraceEnabled()) {
                    log.trace("MongoDB: UPDATE " + id + ": " + update);
                }
            } else {
                // assume bson is identical to dbs internals
                // condition works even if value is null
                Map<String, Serializable> conditions = changeTokenUpdater.getConditions();
                Map<String, Serializable> tokenUpdates = changeTokenUpdater.getUpdates();
                if (update.containsKey(MONGODB_SET)) {
                    // merge the token updates into the existing $set clause
                    ((Document) update.get(MONGODB_SET)).putAll(tokenUpdates);
                } else {
                    Document set = new Document();
                    set.putAll(tokenUpdates);
                    update.put(MONGODB_SET, set);
                }
                if (log.isTraceEnabled()) {
                    log.trace("MongoDB: UPDATE " + id + ": IF " + conditions + " THEN " + update);
                }
                filter.putAll(conditions);
            }
            UpdateResult w = coll.updateMany(filter, update);
            // NOTE(review): getModifiedCount() is 0 for a no-op update (values unchanged); presumably diffs always
            // change something, but if not this would raise a spurious ConcurrentUpdateException -- confirm
            if (w.getModifiedCount() != 1) {
                log.trace("MongoDB:    -> CONCURRENT UPDATE: " + id);
                throw new ConcurrentUpdateException(id);
            }
            // TODO dupe exception
            // throw new DocumentException("Missing: " + id);
        }
    }
362
363    @Override
364    public void deleteStates(Set<String> ids) {
365        Bson filter = Filters.in(idKey, ids);
366        if (log.isTraceEnabled()) {
367            log.trace("MongoDB: REMOVE " + ids);
368        }
369        DeleteResult w = coll.deleteMany(filter);
370        if (w.getDeletedCount() != ids.size()) {
371            if (log.isDebugEnabled()) {
372                log.debug("Removed " + w.getDeletedCount() + " docs for " + ids.size() + " ids: " + ids);
373            }
374        }
375    }
376
377    @Override
378    public State readChildState(String parentId, String name, Set<String> ignored) {
379        Bson filter = getChildQuery(parentId, name, ignored);
380        return findOne(filter);
381    }
382
383    protected void logQuery(String id, Bson fields) {
384        logQuery(Filters.eq(idKey, id), fields);
385    }
386
387    protected void logQuery(Bson filter, Bson fields) {
388        if (fields == null) {
389            log.trace("MongoDB: QUERY " + filter);
390        } else {
391            log.trace("MongoDB: QUERY " + filter + " KEYS " + fields);
392        }
393    }
394
395    protected void logQuery(Bson query, Bson fields, Bson orderBy, int limit, int offset) {
396        log.trace("MongoDB: QUERY " + query + " KEYS " + fields + (orderBy == null ? "" : " ORDER BY " + orderBy)
397                + " OFFSET " + offset + " LIMIT " + limit);
398    }
399
400    @Override
401    public boolean hasChild(String parentId, String name, Set<String> ignored) {
402        Document filter = getChildQuery(parentId, name, ignored);
403        return exists(filter);
404    }
405
406    protected Document getChildQuery(String parentId, String name, Set<String> ignored) {
407        Document filter = new Document();
408        filter.put(KEY_PARENT_ID, parentId);
409        filter.put(KEY_NAME, name);
410        addIgnoredIds(filter, ignored);
411        return filter;
412    }
413
414    protected void addIgnoredIds(Document filter, Set<String> ignored) {
415        if (!ignored.isEmpty()) {
416            Document notInIds = new Document(QueryOperators.NIN, new ArrayList<>(ignored));
417            filter.put(idKey, notInIds);
418        }
419    }
420
421    @Override
422    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
423        Document filter = new Document(converter.keyToBson(key), value);
424        addIgnoredIds(filter, ignored);
425        return findAll(filter);
426    }
427
428    @Override
429    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
430        Document filter = new Document(converter.keyToBson(key1), value1);
431        filter.put(converter.keyToBson(key2), value2);
432        addIgnoredIds(filter, ignored);
433        return findAll(filter);
434    }
435
    @Override
    public Stream<State> getDescendants(String rootId, Set<String> keys) {
        // limit 0 means no limit
        return getDescendants(rootId, keys, 0);
    }
440
441    @Override
442    public Stream<State> getDescendants(String rootId, Set<String> keys, int limit) {
443        Bson filter = Filters.eq(KEY_ANCESTOR_IDS, rootId);
444        Document fields = new Document();
445        if (useCustomId) {
446            fields.put(MONGODB_ID, ZERO);
447        }
448        fields.put(idKey, ONE);
449        keys.forEach(key -> fields.put(converter.keyToBson(key), ONE));
450        return stream(filter, fields, limit);
451    }
452
453    @Override
454    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
455        Document filter = new Document(converter.keyToBson(key), value);
456        addIgnoredIds(filter, ignored);
457        return exists(filter);
458    }
459
    // existence check projecting only the presence field, to avoid fetching full documents
    protected boolean exists(Bson filter) {
        return exists(filter, justPresenceField());
    }
463
464    protected boolean exists(Bson filter, Bson projection) {
465        if (log.isTraceEnabled()) {
466            logQuery(filter, projection);
467        }
468        return coll.find(filter).projection(projection).first() != null;
469    }
470
    // single-result find with no projection (full document)
    protected State findOne(Bson filter) {
        return findOne(filter, null);
    }
474
475    protected State findOne(Bson filter, Bson projection) {
476        try (Stream<State> stream = stream(filter, projection)) {
477            return stream.findAny().orElse(null);
478        }
479    }
480
481    protected List<State> findAll(Bson filter) {
482        try (Stream<State> stream = stream(filter)) {
483            return stream.collect(Collectors.toList());
484        }
485    }
486
    // stream with no projection and no limit
    protected Stream<State> stream(Bson filter) {
        return stream(filter, null, 0);
    }
490
    // stream with a projection but no limit
    protected Stream<State> stream(Bson filter, Bson projection) {
        return stream(filter, projection, 0);
    }
494
495    /**
496     * Logs, runs request and constructs a closeable {@link Stream} on top of {@link MongoCursor}.
497     * <p />
498     * We should rely on this method, because it correctly handles cursor closed state.
499     * <p />
500     * Note: Looping on {@link FindIterable} or {@link MongoIterable} could lead to cursor leaks. This is also the case
501     * on some call to {@link MongoIterable#first()}.
502     *
503     * @return a closeable {@link Stream} instance linked to {@link MongoCursor}
504     */
505    protected Stream<State> stream(Bson filter, Bson projection, int limit) {
506        if (filter == null) {
507            // empty filter
508            filter = new Document();
509        }
510        // it's ok if projection is null
511        if (log.isTraceEnabled()) {
512            logQuery(filter, projection);
513        }
514
515        boolean completedAbruptly = true;
516        MongoCursor<Document> cursor = coll.find(filter).limit(limit).projection(projection).iterator();
517        try {
518            Set<String> seen = new HashSet<>();
519            Stream<State> stream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(cursor, 0), false) //
520                                                .onClose(cursor::close)
521                                                .filter(doc -> seen.add(doc.getString(idKey)))
522                                                // MongoDB cursors may return the same
523                                                // object several times
524                                                .map(converter::bsonToState);
525            // the stream takes responsibility for closing the session
526            completedAbruptly = false;
527            return stream;
528        } finally {
529            if (completedAbruptly) {
530                cursor.close();
531            }
532        }
533    }
534
535    protected Document justPresenceField() {
536        return new Document(MONGODB_ID, ONE);
537    }
538
    /**
     * Runs a NXQL-derived query and returns flattened projections together with a total size.
     * <p>
     * Total size conventions: -1 means not counted (or unknown due to manual projection), -2 means truncated because
     * it exceeded {@code countUpTo}.
     *
     * @param countUpTo -1 to count the full size, 0 to skip counting, otherwise count only up to this value
     */
    @Override
    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
        // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter
        MongoDBQueryBuilder builder = new MongoDBQueryBuilder(this, evaluator.getExpression(),
                evaluator.getSelectClause(), orderByClause, evaluator.pathResolver, evaluator.fulltextSearchDisabled);
        builder.walk();
        if (builder.hasFulltext && isFulltextDisabled()) {
            throw new QueryParseException("Fulltext search disabled by configuration");
        }
        Document filter = builder.getQuery();
        addPrincipals(filter, evaluator.principals);
        Bson orderBy = builder.getOrderBy();
        Bson keys = builder.getProjection();
        // Don't do manual projection if there are no projection wildcards, as this brings no new
        // information and is costly. The only difference is several identical rows instead of one.
        boolean manualProjection = !distinctDocuments && builder.hasProjectionWildcard();
        if (manualProjection) {
            // we'll do post-treatment to re-evaluate the query to get proper wildcard projections
            // so we need the full state from the database
            keys = null;
            evaluator.parse();
        }

        if (log.isTraceEnabled()) {
            logQuery(filter, keys, orderBy, limit, offset);
        }

        List<Map<String, Serializable>> projections;
        long totalSize;
        try (MongoCursor<Document> cursor = coll.find(filter)
                                                .projection(keys)
                                                .skip(offset)
                                                .limit(limit)
                                                .sort(orderBy)
                                                .iterator()) {
            projections = new ArrayList<>();
            DBSStateFlattener flattener = new DBSStateFlattener(builder.propertyKeys);
            Iterable<Document> docs = () -> cursor;
            for (Document doc : docs) {
                State state = converter.bsonToState(doc);
                if (manualProjection) {
                    // re-evaluate the query on the full state to expand wildcard projections
                    projections.addAll(evaluator.matches(state));
                } else {
                    projections.add(flattener.flatten(state));
                }
            }
        }
        if (countUpTo == -1) {
            // count full size
            if (limit == 0) {
                totalSize = projections.size();
            } else if (manualProjection) {
                totalSize = -1; // unknown due to manual projection
            } else {
                totalSize = coll.count(filter);
            }
        } else if (countUpTo == 0) {
            // no count
            totalSize = -1; // not counted
        } else {
            // count only if less than countUpTo
            if (limit == 0) {
                totalSize = projections.size();
            } else if (manualProjection) {
                totalSize = -1; // unknown due to manual projection
            } else {
                totalSize = coll.count(filter, new CountOptions().limit(countUpTo + 1));
            }
            if (totalSize > countUpTo) {
                totalSize = -2; // truncated
            }
        }
        if (log.isTraceEnabled() && projections.size() != 0) {
            log.trace("MongoDB:    -> " + projections.size());
        }
        return new PartialList<>(projections, totalSize);
    }
617
    /**
     * Starts a scroll over the results of the given query: builds the MongoDB filter, opens a batched cursor,
     * registers it with the cursor service and returns the first batch.
     *
     * @throws QueryParseException if the query uses fulltext while fulltext search is disabled
     */
    @Override
    public ScrollResult<String> scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) {
        // reap scrolls whose keep-alive has expired
        cursorService.checkForTimedOutScroll();
        MongoDBQueryBuilder builder = new MongoDBQueryBuilder(this, evaluator.getExpression(),
                evaluator.getSelectClause(), null, evaluator.pathResolver, evaluator.fulltextSearchDisabled);
        builder.walk();
        if (builder.hasFulltext && isFulltextDisabled()) {
            throw new QueryParseException("Fulltext search disabled by configuration");
        }
        Bson filter = builder.getQuery();
        Bson keys = builder.getProjection();
        if (log.isTraceEnabled()) {
            logQuery(filter, keys, null, 0, 0);
        }

        MongoCursor<Document> cursor = coll.find(filter).projection(keys).batchSize(batchSize).iterator();
        String scrollId = cursorService.registerCursor(cursor, batchSize, keepAliveSeconds);
        return scroll(scrollId);
    }
637
    @Override
    public ScrollResult<String> scroll(String scrollId) {
        // fetch the next batch from the registered cursor
        return cursorService.scroll(scrollId);
    }
642
643    protected void addPrincipals(Document query, Set<String> principals) {
644        if (principals != null) {
645            Document inPrincipals = new Document(QueryOperators.IN, new ArrayList<>(principals));
646            query.put(DBSDocument.KEY_READ_ACL, inPrincipals);
647        }
648    }
649
    /** Keys used for document projection when marking all binaries for GC. Initialized by {@code initBlobsPaths}. */
    protected Bson binaryKeys;
652
    @Override
    protected void initBlobsPaths() {
        // walk the schemas to collect every blob data path
        MongoDBBlobFinder finder = new MongoDBBlobFinder();
        finder.visit();
        // cache the projection covering all blob key paths
        binaryKeys = Projections.fields(finder.binaryKeys);
    }
659
    /** Blob-path visitor collecting, as projections, the dotted paths of all blob data fields. */
    protected static class MongoDBBlobFinder extends BlobFinder {
        // starts with excludeId() so the projection returns only blob fields
        protected List<Bson> binaryKeys = new ArrayList<>(Collections.singleton(Projections.excludeId()));

        @Override
        protected void recordBlobPath() {
            // current path + the blob data sub-key, joined with dots
            path.addLast(KEY_BLOB_DATA);
            binaryKeys.add(Projections.include(StringUtils.join(path, ".")));
            path.removeLast();
        }
    }
670
671    @Override
672    public void markReferencedBinaries() {
673        DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class);
674        // TODO add a query to not scan all documents
675        if (log.isTraceEnabled()) {
676            logQuery(new Document(), binaryKeys);
677        }
678        Block<Document> block = doc -> markReferencedBinaries(doc, blobManager);
679        coll.find().projection(binaryKeys).forEach(block);
680    }
681
682    protected void markReferencedBinaries(Document ob, DocumentBlobManager blobManager) {
683        for (String key : ob.keySet()) {
684            Object value = ob.get(key);
685            if (value instanceof List) {
686                @SuppressWarnings("unchecked")
687                List<Object> list = (List<Object>) value;
688                for (Object v : list) {
689                    if (v instanceof Document) {
690                        markReferencedBinaries((Document) v, blobManager);
691                    } else {
692                        markReferencedBinary(v, blobManager);
693                    }
694                }
695            } else if (value instanceof Object[]) {
696                for (Object v : (Object[]) value) {
697                    markReferencedBinary(v, blobManager);
698                }
699            } else if (value instanceof Document) {
700                markReferencedBinaries((Document) value, blobManager);
701            } else {
702                markReferencedBinary(value, blobManager);
703            }
704        }
705    }
706
707    protected void markReferencedBinary(Object value, DocumentBlobManager blobManager) {
708        if (!(value instanceof String)) {
709            return;
710        }
711        String key = (String) value;
712        blobManager.markReferencedBinary(key, repositoryName);
713    }
714
    /** Projection fetching only the lock owner and creation date. */
    protected static final Bson LOCK_FIELDS = Projections.include(KEY_LOCK_OWNER, KEY_LOCK_CREATED);

    /** Update removing both lock fields from a document. */
    protected static final Bson UNSET_LOCK_UPDATE = Updates.combine(Updates.unset(KEY_LOCK_OWNER),
            Updates.unset(KEY_LOCK_CREATED));
719
720    @Override
721    public Lock getLock(String id) {
722        if (log.isTraceEnabled()) {
723            logQuery(id, LOCK_FIELDS);
724        }
725        Document res = coll.find(Filters.eq(idKey, id)).projection(LOCK_FIELDS).first();
726        if (res == null) {
727            // document not found
728            throw new DocumentNotFoundException(id);
729        }
730        String owner = res.getString(KEY_LOCK_OWNER);
731        if (owner == null) {
732            // not locked
733            return null;
734        }
735        Calendar created = (Calendar) converter.scalarToSerializable(res.get(KEY_LOCK_CREATED));
736        return new Lock(owner, created);
737    }
738
    /**
     * Attempts to set a lock atomically: the findOneAndUpdate only matches if no lock owner is present. Returns
     * {@code null} on success, or the existing lock if the document was already locked.
     *
     * @throws DocumentNotFoundException if the document does not exist
     * @throws ConcurrentUpdateException if the lock state changed between the two queries (race)
     */
    @Override
    public Lock setLock(String id, Lock lock) {
        Bson filter = Filters.and( //
                Filters.eq(idKey, id), //
                Filters.exists(KEY_LOCK_OWNER, false) // select doc if no lock is set
        );
        Bson setLock = Updates.combine( //
                Updates.set(KEY_LOCK_OWNER, lock.getOwner()), //
                Updates.set(KEY_LOCK_CREATED, converter.serializableToBson(lock.getCreated())) //
        );
        if (log.isTraceEnabled()) {
            log.trace("MongoDB: FINDANDMODIFY " + filter + " UPDATE " + setLock);
        }
        Document res = coll.findOneAndUpdate(filter, setLock);
        if (res != null) {
            // found a doc to lock
            return null;
        } else {
            // doc not found, or lock owner already set
            // get the old lock
            if (log.isTraceEnabled()) {
                logQuery(id, LOCK_FIELDS);
            }
            Document old = coll.find(Filters.eq(idKey, id)).projection(LOCK_FIELDS).first();
            if (old == null) {
                // document not found
                throw new DocumentNotFoundException(id);
            }
            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
            Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED));
            if (oldOwner != null) {
                return new Lock(oldOwner, oldCreated);
            }
            // no lock -- there was a race condition
            // TODO do better
            throw new ConcurrentUpdateException("Lock " + id);
        }
    }
777
778    @Override
779    public Lock removeLock(String id, String owner) {
780        Document filter = new Document(idKey, id);
781        if (owner != null) {
782            // remove if owner matches or null
783            // implements LockManager.canLockBeRemoved inside MongoDB
784            Object ownerOrNull = Arrays.asList(owner, null);
785            filter.put(KEY_LOCK_OWNER, new Document(QueryOperators.IN, ownerOrNull));
786        } // else unconditional remove
787        // remove the lock
788        if (log.isTraceEnabled()) {
789            log.trace("MongoDB: FINDANDMODIFY " + filter + " UPDATE " + UNSET_LOCK_UPDATE);
790        }
791        Document old = coll.findOneAndUpdate(filter, UNSET_LOCK_UPDATE);
792        if (old != null) {
793            // found a doc and removed the lock, return previous lock
794            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
795            if (oldOwner == null) {
796                // was not locked
797                return null;
798            } else {
799                // return previous lock
800                Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED));
801                return new Lock(oldOwner, oldCreated);
802            }
803        } else {
804            // doc not found, or lock owner didn't match
805            // get the old lock
806            if (log.isTraceEnabled()) {
807                logQuery(id, LOCK_FIELDS);
808            }
809            old = coll.find(Filters.eq(idKey, id)).projection(LOCK_FIELDS).first();
810            if (old == null) {
811                // document not found
812                throw new DocumentNotFoundException(id);
813            }
814            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
815            Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED));
816            if (oldOwner != null) {
817                if (!LockManager.canLockBeRemoved(oldOwner, owner)) {
818                    // existing mismatched lock, flag failure
819                    return new Lock(oldOwner, oldCreated, true);
820                }
821                // old owner should have matched -- there was a race condition
822                // TODO do better
823                throw new ConcurrentUpdateException("Unlock " + id);
824            }
825            // old owner null, should have matched -- there was a race condition
826            // TODO do better
827            throw new ConcurrentUpdateException("Unlock " + id);
828        }
829    }
830
    /**
     * No-op: locks are stored directly in the repository's documents (see {@code getLock} /
     * {@code setLock}), so there is no separate lock-manager resource to close.
     */
    @Override
    public void closeLockManager() {
        // nothing to do
    }
835
    /**
     * No-op: this implementation keeps no lock cache -- every lock read goes straight to MongoDB.
     */
    @Override
    public void clearLockManagerCaches() {
        // nothing to do
    }
839
840}