001/*
002 * (C) Copyright 2014-2020 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.mongodb;
020
021import static com.mongodb.ErrorCategory.DUPLICATE_KEY;
022import static com.mongodb.ErrorCategory.fromErrorCode;
023import static java.util.concurrent.TimeUnit.MILLISECONDS;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_STATUS;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_USER;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACL;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACP;
028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS;
029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY;
030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_JOBID;
031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_SIMPLE;
032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
033import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_TRASHED;
034import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LIFECYCLE_STATE;
035import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
036import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
037import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
038import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
039import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PRIMARY_TYPE;
040import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;
041import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_VERSION_SERIES_ID;
042import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_READ_ACL;
043import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_RETAIN_UNTIL;
044import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_VERSION_SERIES_ID;
045import static org.nuxeo.ecm.core.storage.mongodb.MongoDBRepository.COUNTER_FIELD;
046import static org.nuxeo.ecm.core.storage.mongodb.MongoDBRepository.COUNTER_NAME_UUID;
047import static org.nuxeo.ecm.core.storage.mongodb.MongoDBRepository.FULLTEXT_INDEX_NAME;
048import static org.nuxeo.ecm.core.storage.mongodb.MongoDBRepository.LANGUAGE_FIELD;
049import static org.nuxeo.ecm.core.storage.mongodb.MongoDBRepository.MONGODB_ID;
050import static org.nuxeo.ecm.core.storage.mongodb.MongoDBRepository.MONGODB_SET;
051import static org.nuxeo.ecm.core.storage.mongodb.MongoDBRepository.ONE;
052import static org.nuxeo.ecm.core.storage.mongodb.MongoDBRepository.ZERO;
053
054import java.io.Serializable;
055import java.security.SecureRandom;
056import java.util.ArrayList;
057import java.util.Arrays;
058import java.util.Calendar;
059import java.util.Collection;
060import java.util.HashSet;
061import java.util.List;
062import java.util.Map;
063import java.util.Random;
064import java.util.Set;
065import java.util.Spliterators;
066import java.util.UUID;
067import java.util.stream.Collectors;
068import java.util.stream.Stream;
069import java.util.stream.StreamSupport;
070
071import org.apache.logging.log4j.LogManager;
072import org.apache.logging.log4j.Logger;
073import org.bson.Document;
074import org.bson.conversions.Bson;
075import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
076import org.nuxeo.ecm.core.api.DocumentNotFoundException;
077import org.nuxeo.ecm.core.api.Lock;
078import org.nuxeo.ecm.core.api.NuxeoException;
079import org.nuxeo.ecm.core.api.PartialList;
080import org.nuxeo.ecm.core.api.ScrollResult;
081import org.nuxeo.ecm.core.api.lock.LockManager;
082import org.nuxeo.ecm.core.query.QueryParseException;
083import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
084import org.nuxeo.ecm.core.storage.State;
085import org.nuxeo.ecm.core.storage.State.StateDiff;
086import org.nuxeo.ecm.core.storage.dbs.DBSConnection;
087import org.nuxeo.ecm.core.storage.dbs.DBSConnectionBase;
088import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
089import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
090import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase.IdType;
091import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener;
092import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater;
093import org.nuxeo.runtime.mongodb.MongoDBOperators;
094
095import com.mongodb.DuplicateKeyException;
096import com.mongodb.MongoBulkWriteException;
097import com.mongodb.MongoExecutionTimeoutException;
098import com.mongodb.MongoWriteException;
099import com.mongodb.bulk.BulkWriteError;
100import com.mongodb.client.ClientSession;
101import com.mongodb.client.FindIterable;
102import com.mongodb.client.MongoCollection;
103import com.mongodb.client.MongoCursor;
104import com.mongodb.client.MongoIterable;
105import com.mongodb.client.model.CountOptions;
106import com.mongodb.client.model.Filters;
107import com.mongodb.client.model.FindOneAndUpdateOptions;
108import com.mongodb.client.model.IndexOptions;
109import com.mongodb.client.model.Indexes;
110import com.mongodb.client.model.Projections;
111import com.mongodb.client.model.ReturnDocument;
112import com.mongodb.client.model.Updates;
113import com.mongodb.client.result.DeleteResult;
114import com.mongodb.client.result.UpdateResult;
115
116/**
117 * MongoDB implementation of a {@link DBSConnection}.
118 *
119 * @since 11.1 (introduce in 5.9.4 as MongoDBRepository)
120 */
121public class MongoDBConnection extends DBSConnectionBase {
122
    private static final Logger log = LogManager.getLogger(MongoDBConnection.class);

    // Used only to seed the randomized id sequence; SecureRandom so seeds are unpredictable.
    protected static final Random RANDOM = new SecureRandom();

    // Same object as DBSConnectionBase.repository, kept with its concrete type for MongoDB-specific accessors.
    protected final MongoDBRepository mongoDBRepository;

    // The main MongoDB collection holding the repository's documents.
    protected final MongoCollection<Document> coll;

    /** The key to use to store the id in the database. */
    protected final String idKey;

    /** True if we don't use MongoDB's native "_id" key to store the id. */
    protected final boolean useCustomId;

    /** Number of values still available in the in-memory sequence. */
    protected long sequenceLeft;

    /**
     * Last value or randomized value used from the in-memory sequence.
     * <p>
     * When used as a randomized sequence, this value (and the rest of the next block) may only be used after a
     * successful update of the in-database version for the next task needing a randomized value.
     */
    protected long sequenceLastValue;

    // Converts between DBS State objects and BSON documents.
    protected final MongoDBConverter converter;

    // Non-null only when the server supports sessions (see constructor); nulled on close().
    protected ClientSession clientSession;

    // Set by begin(), cleared by commit()/rollback().
    protected boolean transactionStarted;
153
154    public MongoDBConnection(MongoDBRepository repository) {
155        super(repository);
156        mongoDBRepository = repository;
157        coll = repository.getCollection();
158        idKey = repository.getIdKey();
159        useCustomId = KEY_ID.equals(idKey);
160        converter = repository.getConverter();
161        if (repository.supportsSessions()) {
162            clientSession = repository.getClient().startSession();
163        } else {
164            clientSession = null;
165        }
166        initRepository(repository.descriptor);
167    }
168
169    @Override
170    public void close() {
171        if (clientSession != null) {
172            clientSession.close();
173            clientSession = null;
174        }
175    }
176
177    @Override
178    public void begin() {
179        if (clientSession != null) {
180            clientSession.startTransaction();
181            transactionStarted = true;
182        }
183    }
184
185    @Override
186    public void commit() {
187        if (clientSession != null) {
188            try {
189                clientSession.commitTransaction();
190            } finally {
191                transactionStarted = false;
192            }
193        }
194    }
195
196    @Override
197    public void rollback() {
198        if (clientSession != null) {
199            try {
200                clientSession.abortTransaction();
201            } finally {
202                transactionStarted = false;
203            }
204        }
205    }
206
207    /**
208     * Initializes the MongoDB repository
209     *
210     * @param descriptor the MongoDB repository descriptor
211     * @since 11.1
212     */
213    protected void initRepository(MongoDBRepositoryDescriptor descriptor) {
214        // check root presence
215        if (coll.countDocuments(converter.filterEq(KEY_ID, getRootId())) > 0) {
216            return;
217        }
218        // create required indexes
219        // code does explicit queries on those
220        if (useCustomId) {
221            coll.createIndex(Indexes.ascending(KEY_ID), new IndexOptions().unique(true));
222        }
223        coll.createIndex(Indexes.ascending(KEY_PARENT_ID));
224        coll.createIndex(Indexes.ascending(KEY_ANCESTOR_IDS));
225        coll.createIndex(Indexes.ascending(KEY_VERSION_SERIES_ID));
226        coll.createIndex(Indexes.ascending(KEY_PROXY_TARGET_ID));
227        coll.createIndex(Indexes.ascending(KEY_PROXY_VERSION_SERIES_ID));
228        coll.createIndex(Indexes.ascending(KEY_READ_ACL));
229        IndexOptions parentNameIndexOptions = new IndexOptions();
230        if (descriptor != null) {
231            parentNameIndexOptions.unique(Boolean.TRUE.equals(descriptor.getChildNameUniqueConstraintEnabled()));
232        }
233        coll.createIndex(Indexes.ascending(KEY_PARENT_ID, KEY_NAME), parentNameIndexOptions);
234        // often used in user-generated queries
235        coll.createIndex(Indexes.ascending(KEY_PRIMARY_TYPE));
236        coll.createIndex(Indexes.ascending(KEY_LIFECYCLE_STATE));
237        coll.createIndex(Indexes.ascending(KEY_IS_TRASHED));
238        coll.createIndex(Indexes.ascending(KEY_RETAIN_UNTIL));
239        if (!repository.isFulltextDisabled()) {
240            coll.createIndex(Indexes.ascending(KEY_FULLTEXT_JOBID));
241        }
242        coll.createIndex(Indexes.ascending(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_USER));
243        coll.createIndex(Indexes.ascending(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_STATUS));
244        // TODO configure these from somewhere else
245        coll.createIndex(Indexes.descending("dc:modified"));
246        coll.createIndex(Indexes.ascending("rend:renditionName"));
247        coll.createIndex(Indexes.ascending("rend:sourceId"));
248        coll.createIndex(Indexes.ascending("rend:sourceVersionableId"));
249        coll.createIndex(Indexes.ascending("drv:subscriptions.enabled"));
250        coll.createIndex(Indexes.ascending("collectionMember:collectionIds"));
251        coll.createIndex(Indexes.ascending("nxtag:tags"));
252        coll.createIndex(Indexes.ascending("coldstorage:beingRetrieved"));
253        if (!repository.isFulltextSearchDisabled()) {
254            Bson indexKeys = Indexes.compoundIndex( //
255                    Indexes.text(KEY_FULLTEXT_SIMPLE), //
256                    Indexes.text(KEY_FULLTEXT_BINARY) //
257            );
258            IndexOptions indexOptions = new IndexOptions().name(FULLTEXT_INDEX_NAME).languageOverride(LANGUAGE_FIELD);
259            coll.createIndex(indexKeys, indexOptions);
260        }
261        // create basic repository structure needed
262        IdType idType = repository.getIdType();
263        if (idType == IdType.sequence || idType == IdType.sequenceHexRandomized || DBSRepositoryBase.DEBUG_UUIDS) {
264            // create the id counter
265            long counter;
266            if (idType == IdType.sequenceHexRandomized) {
267                counter = randomInitialSeed();
268            } else {
269                counter = 0;
270            }
271            MongoCollection<Document> countersColl = mongoDBRepository.getCountersCollection();
272            Document idCounter = new Document();
273            idCounter.put(MONGODB_ID, COUNTER_NAME_UUID);
274            idCounter.put(COUNTER_FIELD, Long.valueOf(counter));
275            countersColl.insertOne(idCounter);
276        }
277        initRoot();
278    }
279
280    protected synchronized long getNextSequenceId() {
281        long sequenceBlockSize = mongoDBRepository.sequenceBlockSize;
282        if (repository.getIdType() == IdType.sequence) {
283            if (sequenceLeft == 0) {
284                sequenceLeft = sequenceBlockSize;
285                sequenceLastValue = updateSequence();
286            }
287            sequenceLastValue++;
288        } else { // idType == IdType.sequenceHexRandomized
289            if (sequenceLeft == 0) {
290                sequenceLeft = sequenceBlockSize;
291                sequenceLastValue = updateRandomizedSequence();
292            }
293            sequenceLastValue = xorshift(sequenceLastValue);
294        }
295        sequenceLeft--;
296        return sequenceLastValue;
297    }
298
299    /**
300     * Allocates a new sequence block. The database contains the last value from the last block.
301     */
302    protected long updateSequence() {
303        long sequenceBlockSize = mongoDBRepository.sequenceBlockSize;
304        MongoCollection<Document> countersColl = mongoDBRepository.getCountersCollection();
305        Bson filter = Filters.eq(MONGODB_ID, COUNTER_NAME_UUID);
306        Bson update = Updates.inc(COUNTER_FIELD, Long.valueOf(sequenceBlockSize));
307        Document idCounter = countersColl.findOneAndUpdate(filter, update,
308                new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER));
309        if (idCounter == null) {
310            throw new NuxeoException("Repository id counter not initialized");
311        }
312        return idCounter.getLong(COUNTER_FIELD).longValue() - sequenceBlockSize;
313    }
314
315    /**
316     * Updates the randomized sequence, using xorshift.
317     */
318    protected Long tryUpdateRandomizedSequence() {
319        long sequenceBlockSize = mongoDBRepository.sequenceBlockSize;
320        MongoCollection<Document> countersColl = mongoDBRepository.getCountersCollection();
321        // find the current value
322        Bson filter = Filters.eq(MONGODB_ID, COUNTER_NAME_UUID);
323        Document res = countersColl.find(filter).first();
324        if (res == null) {
325            throw new NuxeoException("Failed to read " + filter + " in collection " + countersColl.getNamespace());
326        }
327        Long lastValue = res.getLong(COUNTER_FIELD);
328        // find the next value after this block is done
329        long newValue = xorshift(lastValue, sequenceBlockSize);
330        // store the next value for whoever needs it next
331        Bson updateFilter = Filters.and( //
332                filter, //
333                Filters.eq(COUNTER_FIELD, lastValue) //
334        );
335        Bson update = Updates.set(COUNTER_FIELD, newValue);
336        log.trace("MongoDB: FINDANDMODIFY {} UPDATE {}", updateFilter, update);
337        boolean updated = countersColl.findOneAndUpdate(updateFilter, update) != null;
338        if (updated) {
339            return lastValue;
340        } else {
341            log.trace("MongoDB:    -> FAILED (will retry)");
342            return null;
343        }
344    }
345
346    protected static final int NB_TRY = 15;
347
348    protected long updateRandomizedSequence() {
349        long sleepDuration = 1; // start with 1ms
350        for (int i = 0; i < NB_TRY; i++) {
351            Long value = tryUpdateRandomizedSequence();
352            if (value != null) {
353                return value.longValue();
354            }
355            try {
356                Thread.sleep(sleepDuration);
357            } catch (InterruptedException e) {
358                Thread.currentThread().interrupt();
359                throw new NuxeoException();
360            }
361            sleepDuration *= 2; // exponential backoff
362            sleepDuration += System.nanoTime() % 4; // random jitter
363        }
364        throw new ConcurrentUpdateException("Failed to update randomized sequence");
365    }
366
367    /** Initial seed generation. */
368    protected long randomInitialSeed() {
369        long seed;
370        do {
371            seed = RANDOM.nextLong();
372        } while (seed == 0);
373        return seed;
374    }
375
376    /** Iterated version of xorshift. */
377    protected long xorshift(long n, long times) {
378        for (long i = 0; i < times; i++) {
379            n = xorshift(n);
380        }
381        return n;
382    }
383
384    /**
385     * xorshift algorithm from George Marsaglia, with period 2^64 - 1.
386     *
387     * @see <a href="https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf">xorshift algorithm from George
388     *      Marsaglia</a>
389     */
390    protected long xorshift(long n) {
391        n ^= (n << 13);
392        n ^= (n >>> 7);
393        n ^= (n << 17);
394        return n;
395    }
396
397    @Override
398    public String generateNewId() {
399        IdType idType = repository.getIdType();
400        if (idType == IdType.sequence || idType == IdType.sequenceHexRandomized || DBSRepositoryBase.DEBUG_UUIDS) {
401            long id = getNextSequenceId();
402            if (DBSRepositoryBase.DEBUG_UUIDS) {
403                return "UUID_" + id;
404            } else if (idType == IdType.sequence) {
405                return String.valueOf(id);
406            } else { // idType == IdType.sequenceHexRandomized
407                // hex version filled to 16 chars
408                String hex = Long.toHexString(id);
409                int nz = 16 - hex.length();
410                if (nz > 0) {
411                    hex = "0".repeat(nz) + hex;
412                }
413                return hex;
414            }
415        } else {
416            return UUID.randomUUID().toString();
417        }
418    }
419
420    @Override
421    public void createState(State state) {
422        Document doc = converter.stateToBson(state);
423        log.trace("MongoDB: CREATE {}: {}", doc.get(idKey), doc);
424        try {
425            insertOne(doc);
426        } catch (DuplicateKeyException dke) {
427            log.trace("MongoDB:    -> DUPLICATE KEY: {}", doc.get(idKey));
428            throw new ConcurrentUpdateException(dke);
429        }
430    }
431
    /**
     * Creates several documents in one bulk write.
     * <p>
     * If the bulk write fails and all the errors are duplicate-key errors, they are surfaced as a single
     * {@link ConcurrentUpdateException}; any other bulk error is rethrown as-is.
     */
    @Override
    public void createStates(List<State> states) {
        List<Document> docs = states.stream().map(converter::stateToBson).collect(Collectors.toList());
        // suppliers so the joins are only computed when trace is enabled
        log.trace("MongoDB: CREATE [{}]: {}",
                () -> docs.stream().map(doc -> doc.get(idKey).toString()).collect(Collectors.joining(", ")),
                () -> docs);
        try {
            insertMany(docs);
        } catch (MongoBulkWriteException mbwe) {
            List<String> duplicates = mbwe.getWriteErrors()
                                          .stream()
                                          .filter(wr -> DUPLICATE_KEY.equals(fromErrorCode(wr.getCode())))
                                          .map(BulkWriteError::getMessage)
                                          .collect(Collectors.toList());
            // Avoid hiding any others bulk errors
            if (duplicates.size() == mbwe.getWriteErrors().size()) {
                log.trace("MongoDB:    -> DUPLICATE KEY: {}", duplicates);
                var concurrentUpdateException = new ConcurrentUpdateException("Concurrent update");
                duplicates.forEach(concurrentUpdateException::addInfo);
                throw concurrentUpdateException;
            }

            throw mbwe;
        }
    }
457
458    @Override
459    public State readState(String id) {
460        return findOne(converter.filterEq(KEY_ID, id));
461    }
462
463    @Override
464    public State readPartialState(String id, Collection<String> keys) {
465        Document fields = new Document();
466        keys.forEach(key -> fields.put(converter.keyToBson(key), ONE));
467        return findOne(converter.filterEq(KEY_ID, id), fields);
468    }
469
470    @Override
471    public List<State> readStates(List<String> ids) {
472        return findAll(converter.filterIn(KEY_ID, ids));
473    }
474
    /**
     * Applies the given diff to the document with the given id.
     * <p>
     * When a change token updater is provided, its conditions are added to the update filter (optimistic
     * concurrency) and its updates are merged into the {@code $set} part of each update.
     *
     * @throws ConcurrentUpdateException if the conditional update matched nothing, or on a duplicate key
     */
    @Override
    public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) {
        // a diff may translate to several sequential update documents
        List<Document> updates = converter.diffToBson(diff);
        for (Document update : updates) {
            Document filter = new Document();
            converter.putToBson(filter, KEY_ID, id);
            if (changeTokenUpdater == null) {
                log.trace("MongoDB: UPDATE {}: {}", id, update);
            } else {
                // assume bson is identical to dbs internals
                // condition works even if value is null
                Map<String, Serializable> conditions = changeTokenUpdater.getConditions();
                Map<String, Serializable> tokenUpdates = changeTokenUpdater.getUpdates();
                // merge the change-token updates into the existing $set, or create one
                if (update.containsKey(MONGODB_SET)) {
                    ((Document) update.get(MONGODB_SET)).putAll(tokenUpdates);
                } else {
                    Document set = new Document();
                    set.putAll(tokenUpdates);
                    update.put(MONGODB_SET, set);
                }
                log.trace("MongoDB: UPDATE {}: IF {} THEN {}", id, conditions, update);
                filter.putAll(conditions);
            }
            try {
                UpdateResult w = updateMany(filter, update);
                // exactly one document must have been modified, otherwise the condition failed
                if (w.getModifiedCount() != 1) {
                    log.trace("MongoDB:    -> CONCURRENT UPDATE: {}", id);
                    throw new ConcurrentUpdateException(id);
                }
            } catch (MongoWriteException mwe) {
                if (DUPLICATE_KEY.equals(fromErrorCode(mwe.getCode()))) {
                    log.trace("MongoDB:    -> DUPLICATE KEY: {}", id);
                    throw new ConcurrentUpdateException(mwe.getError().getMessage(), mwe);
                }
                throw mwe;
            }
        }
    }
513
514    @Override
515    public void deleteStates(Set<String> ids) {
516        Bson filter = converter.filterIn(KEY_ID, ids);
517        log.trace("MongoDB: REMOVE {}", ids);
518        DeleteResult w = deleteMany(filter);
519        if (w.getDeletedCount() != ids.size()) {
520            log.debug("Removed {} docs for {} ids: {}", w::getDeletedCount, ids::size, () -> ids);
521        }
522    }
523
524    @Override
525    public State readChildState(String parentId, String name, Set<String> ignored) {
526        Bson filter = getChildQuery(parentId, name, ignored);
527        return findOne(filter);
528    }
529
530    protected NuxeoException newQueryTimeout(MongoExecutionTimeoutException cause, Bson filter) {
531        NuxeoException exc = new NuxeoException("Query timed out after " + mongoDBRepository.maxTimeMS + " ms", cause);
532        if (filter != null) {
533            String msg;
534            if (filter instanceof Document) {
535                msg = ((Document) filter).toJson();
536            } else {
537                msg = filter.toString();
538            }
539            exc.addInfo("Filter: " + msg);
540        }
541        return exc;
542    }
543
544    protected void logQuery(String id, Bson fields) {
545        logQuery(converter.filterEq(KEY_ID, id), fields);
546    }
547
548    protected void logQuery(Bson filter, Bson fields) {
549        if (fields == null) {
550            log.trace("MongoDB: QUERY {}", filter);
551        } else {
552            log.trace("MongoDB: QUERY {} KEYS {}", filter, fields);
553        }
554    }
555
556    protected void logQuery(Bson query, Bson fields, Bson orderBy, int limit, int offset) {
557        if (orderBy == null) {
558            log.trace("MongoDB: QUERY {} KEYS {} OFFSET {} LIMIT {}", query, fields, offset, limit);
559        } else {
560            log.trace("MongoDB: QUERY {} KEYS {} ORDER BY {} OFFSET {} LIMIT {}", query, fields, orderBy, offset,
561                    limit);
562        }
563    }
564
565    @Override
566    public boolean hasChild(String parentId, String name, Set<String> ignored) {
567        Document filter = getChildQuery(parentId, name, ignored);
568        return exists(filter);
569    }
570
571    protected Document getChildQuery(String parentId, String name, Set<String> ignored) {
572        Document filter = new Document();
573        converter.putToBson(filter, KEY_PARENT_ID, parentId);
574        converter.putToBson(filter, KEY_NAME, name);
575        addIgnoredIds(filter, ignored);
576        return filter;
577    }
578
579    protected void addIgnoredIds(Document filter, Set<String> ignored) {
580        if (!ignored.isEmpty()) {
581            Document notInIds = new Document(MongoDBOperators.NIN, converter.listToBson(KEY_ID, ignored));
582            filter.put(idKey, notInIds);
583        }
584    }
585
586    @Override
587    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
588        Document filter = new Document();
589        converter.putToBson(filter, key, value);
590        addIgnoredIds(filter, ignored);
591        return findAll(filter);
592    }
593
594    @Override
595    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
596        Document filter = new Document();
597        converter.putToBson(filter, key1, value1);
598        converter.putToBson(filter, key2, value2);
599        addIgnoredIds(filter, ignored);
600        return findAll(filter);
601    }
602
603    @Override
604    public List<State> queryKeyValueWithOperator(String key1, Object value1, String key2, DBSQueryOperator operator,
605            Object value2, Set<String> ignored) {
606        Map<String, Object> comparatorAndValue;
607        switch (operator) {
608        case IN:
609            comparatorAndValue = Map.of(MongoDBOperators.IN, value2);
610            break;
611        case NOT_IN:
612            comparatorAndValue = Map.of(MongoDBOperators.NIN, value2);
613            break;
614        default:
615            throw new IllegalArgumentException(String.format("Unknown operator: %s", operator));
616        }
617        Document filter = new Document();
618        converter.putToBson(filter, key1, value1);
619        converter.putToBson(filter, key2, comparatorAndValue);
620        addIgnoredIds(filter, ignored);
621        return findAll(filter);
622    }
623
    /** Streams the descendants of {@code rootId} with the given keys, without any limit. */
    @Override
    public Stream<State> getDescendants(String rootId, Set<String> keys) {
        return getDescendants(rootId, keys, 0);
    }
628
629    @Override
630    public Stream<State> getDescendants(String rootId, Set<String> keys, int limit) {
631        Bson filter = converter.filterEq(KEY_ANCESTOR_IDS, rootId);
632        Document fields = new Document();
633        if (useCustomId) {
634            fields.put(MONGODB_ID, ZERO);
635        }
636        fields.put(idKey, ONE);
637        keys.forEach(key -> fields.put(converter.keyToBson(key), ONE));
638        return stream(filter, fields, limit);
639    }
640
641    @Override
642    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
643        Document filter = new Document();
644        converter.putToBson(filter, key, value);
645        addIgnoredIds(filter, ignored);
646        return exists(filter);
647    }
648
649    protected boolean exists(Bson filter) {
650        return exists(filter, justPresenceField());
651    }
652
653    protected boolean exists(Bson filter, Bson projection) {
654        logQuery(filter, projection);
655        try {
656            return find(filter).projection(projection).first() != null;
657        } catch (MongoExecutionTimeoutException e) {
658            throw newQueryTimeout(e, filter);
659        }
660    }
661
    /** Returns some state matching the filter without any projection, or {@code null} if none. */
    protected State findOne(Bson filter) {
        return findOne(filter, null);
    }
665
666    protected State findOne(Bson filter, Bson projection) {
667        try (Stream<State> stream = stream(filter, projection)) {
668            return stream.findAny().orElse(null);
669        }
670    }
671
    /**
     * Returns all states matching the filter, as a fully-materialized list (the underlying cursor is closed by
     * the try-with-resources on the stream).
     *
     * @throws NuxeoException if the query times out on the server
     */
    protected List<State> findAll(Bson filter) {
        try (Stream<State> stream = stream(filter)) {
            return stream.collect(Collectors.toList());
        } catch (MongoExecutionTimeoutException e) {
            throw newQueryTimeout(e, filter);
        }
    }
679
    /** Streams all states matching the filter, with no projection and no limit. */
    protected Stream<State> stream(Bson filter) {
        return stream(filter, null, 0);
    }
683
    /** Streams all states matching the filter with the given projection, with no limit. */
    protected Stream<State> stream(Bson filter, Bson projection) {
        return stream(filter, projection, 0);
    }
687
688    /**
689     * Logs, runs request and constructs a closeable {@link Stream} on top of {@link MongoCursor}.
690     * <p>
691     * We should rely on this method, because it correctly handles cursor closed state.
692     * <p>
693     * Note: Looping on {@link FindIterable} or {@link MongoIterable} could lead to cursor leaks. This is also the case
694     * on some call to {@link MongoIterable#first()}.
695     *
696     * @return a closeable {@link Stream} instance linked to {@link MongoCursor}
697     */
698    protected Stream<State> stream(Bson filter, Bson projection, int limit) {
699        if (filter == null) {
700            // empty filter
701            filter = new Document();
702        }
703        // it's ok if projection is null
704        logQuery(filter, projection);
705
706        boolean completedAbruptly = true;
707        MongoCursor<Document> cursor = find(filter).limit(limit).projection(projection).iterator();
708        try {
709            Set<Object> seen = new HashSet<>();
710            Stream<State> stream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(cursor, 0), false) //
711                                                .onClose(cursor::close)
712                                                .filter(doc -> seen.add(doc.get(idKey)))
713                                                // MongoDB cursors may return the same
714                                                // object several times
715                                                .map(converter::bsonToState);
716            // the stream takes responsibility for closing the session
717            completedAbruptly = false;
718            return stream;
719        } catch (MongoExecutionTimeoutException e) {
720            throw newQueryTimeout(e, filter); // NOSONAR (cursor is not leaked)
721        } finally {
722            if (completedAbruptly) {
723                cursor.close();
724            }
725        }
726    }
727
728    protected Document justPresenceField() {
729        return new Document(MONGODB_ID, ONE);
730    }
731
    /**
     * Runs a query and returns the resulting projections (maps of column name to value), along with a total size.
     * <p>
     * The total size depends on {@code countUpTo}: {@code -1} means do a full count, {@code 0} means no count
     * (total size is {@code -1}), and a positive value means count only up to that value (total size is {@code -2}
     * when truncated). With manual wildcard projections the count is unknown ({@code -1}).
     *
     * @param evaluator the query evaluator
     * @param orderByClause the order-by clause, may be {@code null}
     * @param distinctDocuments whether the query returns distinct documents
     * @param limit the maximum number of results, or 0 for no limit
     * @param offset the number of results to skip
     * @param countUpTo the count behavior, see above
     * @return the partial list of projections with its total size
     */
    @Override
    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
        // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter
        MongoDBRepositoryQueryBuilder builder = new MongoDBRepositoryQueryBuilder((MongoDBRepository) repository,
                evaluator.getExpression(), evaluator.getSelectClause(), orderByClause, evaluator.pathResolver,
                evaluator.fulltextSearchDisabled);
        builder.walk();
        if (builder.hasFulltext && repository.isFulltextSearchDisabled()) {
            throw new QueryParseException("Fulltext search disabled by configuration");
        }
        Document filter = builder.getQuery();
        addPrincipals(filter, evaluator.principals);
        Bson orderBy = builder.getOrderBy();
        Bson keys = builder.getProjection();
        // Don't do manual projection if there are no projection wildcards, as this brings no new
        // information and is costly. The only difference is several identical rows instead of one.
        boolean manualProjection = !distinctDocuments && builder.hasProjectionWildcard();
        if (manualProjection) {
            // we'll do post-treatment to re-evaluate the query to get proper wildcard projections
            // so we need the full state from the database
            keys = null;
            evaluator.parse();
        }

        logQuery(filter, keys, orderBy, limit, offset);

        List<Map<String, Serializable>> projections;
        long totalSize;
        // try-with-resources: the cursor is fully consumed here, unlike in stream()
        try (MongoCursor<Document> cursor = find(filter).projection(keys)
                                                        .skip(offset)
                                                        .limit(limit)
                                                        .sort(orderBy)
                                                        .iterator()) {
            projections = new ArrayList<>();
            DBSStateFlattener flattener = new DBSStateFlattener(builder.propertyKeys);
            Iterable<Document> docs = () -> cursor;
            for (Document doc : docs) {
                State state = converter.bsonToState(doc);
                if (manualProjection) {
                    projections.addAll(evaluator.matches(state));
                } else {
                    projections.add(flattener.flatten(state));
                }
            }
        } catch (MongoExecutionTimeoutException e) {
            throw newQueryTimeout(e, filter);
        }
        if (countUpTo == -1) {
            // count full size
            if (limit == 0) {
                // no limit was applied, so the fetched results are already the full result set
                totalSize = projections.size();
            } else if (manualProjection) {
                totalSize = -1; // unknown due to manual projection
            } else {
                totalSize = countDocuments(filter);
            }
        } else if (countUpTo == 0) {
            // no count
            totalSize = -1; // not counted
        } else {
            // count only if less than countUpTo
            if (limit == 0) {
                totalSize = projections.size();
            } else if (manualProjection) {
                totalSize = -1; // unknown due to manual projection
            } else {
                // count one more than countUpTo to detect truncation
                totalSize = countDocuments(filter, new CountOptions().limit(countUpTo + 1));
            }
            if (totalSize > countUpTo) {
                totalSize = -2; // truncated
            }
        }
        if (!projections.isEmpty()) {
            log.trace("MongoDB:    -> {}", projections::size);
        }
        return new PartialList<>(projections, totalSize);
    }
810
    /**
     * Starts a scroll over the documents matching the evaluator's query.
     * <p>
     * The underlying MongoDB cursor is registered with the cursor service (which becomes responsible for closing it)
     * and a scroll id referencing it is returned.
     *
     * @param evaluator the query evaluator
     * @param batchSize the number of results per scroll batch
     * @param keepAliveSeconds how long the scroll cursor is kept alive between calls
     * @return the first scroll result
     */
    @SuppressWarnings("resource") // cursor is being registered, must not be closed
    @Override
    public ScrollResult<String> scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) {
        MongoDBCursorService cursorService = mongoDBRepository.getCursorService();
        cursorService.checkForTimedOutScroll();
        // no order-by clause for scrolls
        MongoDBRepositoryQueryBuilder builder = new MongoDBRepositoryQueryBuilder((MongoDBRepository) repository,
                evaluator.getExpression(), evaluator.getSelectClause(), null, evaluator.pathResolver,
                evaluator.fulltextSearchDisabled);
        builder.walk();
        if (builder.hasFulltext && repository.isFulltextSearchDisabled()) {
            throw new QueryParseException("Fulltext search disabled by configuration");
        }
        Document filter = builder.getQuery();
        addPrincipals(filter, evaluator.principals);
        Bson keys = builder.getProjection();
        logQuery(filter, keys, null, 0, 0);

        MongoCursor<Document> cursor;
        try {
            cursor = find(filter).projection(keys).batchSize(batchSize).iterator();
            cursor.hasNext(); // check timeout asap - NOSONAR
        } catch (MongoExecutionTimeoutException e) {
            throw newQueryTimeout(e, filter);
        }
        String scrollId = cursorService.registerCursor(cursor, batchSize, keepAliveSeconds);
        return scroll(scrollId);
    }
838
839    @Override
840    public ScrollResult<String> scroll(String scrollId) {
841        MongoDBCursorService cursorService = mongoDBRepository.getCursorService();
842        try {
843            return cursorService.scroll(scrollId);
844        } catch (MongoExecutionTimeoutException e) {
845            throw newQueryTimeout(e, null);
846        }
847    }
848
849    protected void addPrincipals(Document query, Set<String> principals) {
850        if (principals != null) {
851            Document inPrincipals = new Document(MongoDBOperators.IN, new ArrayList<>(principals));
852            query.put(KEY_READ_ACL, inPrincipals);
853        }
854    }
855
    /** Projection retrieving only the lock owner and lock creation fields. */
    protected static final Bson LOCK_FIELDS = Projections.include(KEY_LOCK_OWNER, KEY_LOCK_CREATED);

    /** Update removing the lock owner and lock creation fields from a document. */
    protected static final Bson UNSET_LOCK_UPDATE = Updates.combine(Updates.unset(KEY_LOCK_OWNER),
            Updates.unset(KEY_LOCK_CREATED));
860
861    @Override
862    public Lock getLock(String id) {
863        logQuery(id, LOCK_FIELDS);
864        // we do NOT want to use clientSession here because locks must be non-transactional
865        Document res = coll.find(converter.filterEq(KEY_ID, id)).projection(LOCK_FIELDS).first();
866        if (res == null) {
867            // document not found
868            throw new DocumentNotFoundException(id);
869        }
870        String owner = res.getString(KEY_LOCK_OWNER);
871        if (owner == null) {
872            // not locked
873            return null;
874        }
875        Calendar created = (Calendar) converter.bsonToSerializable(KEY_LOCK_CREATED, res.get(KEY_LOCK_CREATED));
876        return new Lock(owner, created);
877    }
878
    /**
     * Attempts to atomically set a lock on a document.
     * <p>
     * Uses a find-and-modify that only matches the document when no lock owner is present, making the lock
     * acquisition atomic on the MongoDB side.
     *
     * @param id the document id
     * @param lock the lock to set
     * @return {@code null} if the lock was acquired, or the pre-existing lock if the document was already locked
     * @throws DocumentNotFoundException if there is no document with the given id
     * @throws ConcurrentUpdateException if a concurrent lock change prevented a consistent read
     */
    @Override
    public Lock setLock(String id, Lock lock) {
        Bson filter = Filters.and( //
                converter.filterEq(KEY_ID, id), //
                Filters.exists(KEY_LOCK_OWNER, false) // select doc if no lock is set
        );
        Bson setLock = Updates.combine( //
                Updates.set(KEY_LOCK_OWNER, lock.getOwner()), //
                Updates.set(KEY_LOCK_CREATED, converter.serializableToBson(KEY_LOCK_CREATED, lock.getCreated())) //
        );
        log.trace("MongoDB: FINDANDMODIFY {} UPDATE {}", filter, setLock);
        // we do NOT want to use clientSession here because locks must be non-transactional
        Document res = coll.findOneAndUpdate(filter, setLock);
        if (res != null) {
            // found a doc to lock
            return null;
        } else {
            // doc not found, or lock owner already set
            // get the old lock
            logQuery(id, LOCK_FIELDS);
            // we do NOT want to use clientSession here because locks must be non-transactional
            Document old = coll.find(converter.filterEq(KEY_ID, id)).projection(LOCK_FIELDS).first();
            if (old == null) {
                // document not found
                throw new DocumentNotFoundException(id);
            }
            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
            Calendar oldCreated = (Calendar) converter.bsonToSerializable(KEY_LOCK_CREATED, old.get(KEY_LOCK_CREATED));
            if (oldOwner != null) {
                return new Lock(oldOwner, oldCreated);
            }
            // no lock -- there was a race condition
            // TODO do better
            throw new ConcurrentUpdateException("Lock " + id);
        }
    }
915
    /**
     * Attempts to atomically remove the lock on a document.
     * <p>
     * When an {@code owner} is given, the lock is only removed if its owner matches (per
     * {@code LockManager.canLockBeRemoved}, implemented here as an {@code $in} on the owner or {@code null});
     * otherwise the removal is unconditional.
     *
     * @param id the document id
     * @param owner the expected lock owner, or {@code null} for unconditional removal
     * @return the previous lock (flagged as a failure if the owner didn't match), or {@code null} if the document
     *         was not locked
     * @throws DocumentNotFoundException if there is no document with the given id
     * @throws ConcurrentUpdateException if a concurrent lock change prevented a consistent read
     */
    @Override
    public Lock removeLock(String id, String owner) {
        Document filter = new Document();
        converter.putToBson(filter, KEY_ID, id);
        if (owner != null) {
            // remove if owner matches or null
            // implements LockManager.canLockBeRemoved inside MongoDB
            Object ownerOrNull = Arrays.asList(owner, null);
            filter.put(KEY_LOCK_OWNER, new Document(MongoDBOperators.IN, ownerOrNull));
        }
        // else unconditional remove
        // remove the lock
        log.trace("MongoDB: FINDANDMODIFY {} UPDATE {}", filter, UNSET_LOCK_UPDATE);
        // we do NOT want to use clientSession here because locks must be non-transactional
        Document old = coll.findOneAndUpdate(filter, UNSET_LOCK_UPDATE);
        if (old != null) {
            // found a doc and removed the lock, return previous lock
            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
            if (oldOwner == null) {
                // was not locked
                return null;
            } else {
                // return previous lock
                var oldCreated = (Calendar) converter.bsonToSerializable(KEY_LOCK_CREATED, old.get(KEY_LOCK_CREATED));
                return new Lock(oldOwner, oldCreated);
            }
        } else {
            // doc not found, or lock owner didn't match
            // get the old lock
            logQuery(id, LOCK_FIELDS);
            // we do NOT want to use clientSession here because locks must be non-transactional
            old = coll.find(converter.filterEq(KEY_ID, id)).projection(LOCK_FIELDS).first();
            if (old == null) {
                // document not found
                throw new DocumentNotFoundException(id);
            }
            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
            Calendar oldCreated = (Calendar) converter.bsonToSerializable(KEY_LOCK_CREATED, old.get(KEY_LOCK_CREATED));
            if (oldOwner != null) {
                if (!LockManager.canLockBeRemoved(oldOwner, owner)) {
                    // existing mismatched lock, flag failure
                    return new Lock(oldOwner, oldCreated, true);
                }
                // old owner should have matched -- there was a race condition
                // TODO do better
                throw new ConcurrentUpdateException("Unlock " + id);
            }
            // old owner null, should have matched -- there was a race condition
            // TODO do better
            throw new ConcurrentUpdateException("Unlock " + id);
        }
    }
968
969    protected void insertOne(Document document) {
970        if (transactionStarted) {
971            coll.insertOne(clientSession, document);
972        } else {
973            coll.insertOne(document);
974        }
975    }
976
977    protected void insertMany(List<Document> documents) {
978        if (transactionStarted) {
979            coll.insertMany(clientSession, documents);
980        } else {
981            coll.insertMany(documents);
982        }
983    }
984
985    protected UpdateResult updateMany(Bson filter, Bson update) {
986        if (transactionStarted) {
987            return coll.updateMany(clientSession, filter, update);
988        } else {
989            return coll.updateMany(filter, update);
990        }
991    }
992
993    protected DeleteResult deleteMany(Bson filter) {
994        if (transactionStarted) {
995            return coll.deleteMany(clientSession, filter);
996        } else {
997            return coll.deleteMany(filter);
998        }
999    }
1000
1001    protected FindIterable<Document> find(Bson filter) {
1002        FindIterable<Document> it;
1003        if (transactionStarted) {
1004            it = coll.find(clientSession, filter);
1005        } else {
1006            it = coll.find(filter);
1007        }
1008        it.maxTime(mongoDBRepository.maxTimeMS, MILLISECONDS);
1009        return it;
1010    }
1011
    /** Counts the documents matching the filter, with default count options. */
    protected long countDocuments(Bson filter) {
        return countDocuments(filter, new CountOptions());
    }
1015
1016    protected long countDocuments(Bson filter, CountOptions options) {
1017        options.maxTime(mongoDBRepository.maxTimeMS, MILLISECONDS);
1018        if (transactionStarted) {
1019            return coll.countDocuments(clientSession, filter, options);
1020        } else {
1021            return coll.countDocuments(filter, options);
1022        }
1023    }
1024
1025}