001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.mongodb;
020
021import static java.lang.Boolean.TRUE;
022import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_STATUS;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_USER;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACL;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACP;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS;
028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY;
030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_JOBID;
031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_SIMPLE;
032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
033import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY;
034import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LIFECYCLE_STATE;
035import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
036import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
037import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
038import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
039import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PRIMARY_TYPE;
040import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS;
041import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;
042import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_VERSION_SERIES_ID;
043import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_READ_ACL;
044import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_VERSION_SERIES_ID;
045
046import java.io.Serializable;
047import java.net.UnknownHostException;
048import java.util.ArrayList;
049import java.util.Arrays;
050import java.util.Calendar;
051import java.util.HashSet;
052import java.util.List;
053import java.util.Map;
054import java.util.Set;
055import java.util.UUID;
056import java.util.concurrent.ConcurrentHashMap;
057import java.util.stream.Collectors;
058
059import javax.resource.spi.ConnectionManager;
060
061import com.mongodb.MongoClientOptions;
062import org.apache.commons.lang.StringUtils;
063import org.apache.commons.logging.Log;
064import org.apache.commons.logging.LogFactory;
065import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
066import org.nuxeo.ecm.core.api.DocumentNotFoundException;
067import org.nuxeo.ecm.core.api.Lock;
068import org.nuxeo.ecm.core.api.NuxeoException;
069import org.nuxeo.ecm.core.api.PartialList;
070import org.nuxeo.ecm.core.api.ScrollResult;
071import org.nuxeo.ecm.core.api.ScrollResultImpl;
072import org.nuxeo.ecm.core.blob.BlobManager;
073import org.nuxeo.ecm.core.model.LockManager;
074import org.nuxeo.ecm.core.model.Repository;
075import org.nuxeo.ecm.core.query.QueryParseException;
076import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
077import org.nuxeo.ecm.core.storage.State;
078import org.nuxeo.ecm.core.storage.State.StateDiff;
079import org.nuxeo.ecm.core.storage.dbs.DBSDocument;
080import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
081import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
082import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener;
083import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater;
084import org.nuxeo.runtime.api.Framework;
085
086import com.mongodb.BasicDBObject;
087import com.mongodb.DB;
088import com.mongodb.DBCollection;
089import com.mongodb.DBCursor;
090import com.mongodb.DBObject;
091import com.mongodb.MongoClient;
092import com.mongodb.MongoClientURI;
093import com.mongodb.QueryOperators;
094import com.mongodb.ServerAddress;
095import com.mongodb.WriteResult;
096
097/**
098 * MongoDB implementation of a {@link Repository}.
099 *
100 * @since 5.9.4
101 */
102public class MongoDBRepository extends DBSRepositoryBase {
103
104    private static final Log log = LogFactory.getLog(MongoDBRepository.class);
105
106    public static final Long ZERO = Long.valueOf(0);
107
108    public static final Long ONE = Long.valueOf(1);
109
110    public static final Long MINUS_ONE = Long.valueOf(-1);
111
112    public static final String DB_DEFAULT = "nuxeo";
113
114    public static final String MONGODB_ID = "_id";
115
116    public static final String MONGODB_INC = "$inc";
117
118    public static final String MONGODB_SET = "$set";
119
120    public static final String MONGODB_UNSET = "$unset";
121
122    public static final String MONGODB_PUSH = "$push";
123
124    public static final String MONGODB_EACH = "$each";
125
126    public static final String MONGODB_META = "$meta";
127
128    public static final String MONGODB_TEXT_SCORE = "textScore";
129
130    private static final String MONGODB_INDEX_TEXT = "text";
131
132    private static final String MONGODB_INDEX_NAME = "name";
133
134    private static final String MONGODB_LANGUAGE_OVERRIDE = "language_override";
135
136    private static final String FULLTEXT_INDEX_NAME = "fulltext";
137
138    private static final String LANGUAGE_FIELD = "__language";
139
140    protected static final String COUNTER_NAME_UUID = "ecm:id";
141
142    protected static final String COUNTER_FIELD = "seq";
143
144    protected static final int MONGODB_OPTION_CONNECTION_TIMEOUT_MS = 30000;
145
146    protected static final int MONGODB_OPTION_SOCKET_TIMEOUT_MS = 60000;
147
148    protected MongoClient mongoClient;
149
150    protected DBCollection coll;
151
152    protected DBCollection countersColl;
153
154    /** The key to use to store the id in the database. */
155    protected String idKey;
156
157    /** True if we don't use MongoDB's native "_id" key to store the id. */
158    protected boolean useCustomId;
159
160    /** Number of values still available in the in-memory sequence. */
161    protected long sequenceLeft;
162
163    /** Last value used from the in-memory sequence. */
164    protected long sequenceLastValue;
165
166    /** Sequence allocation block size. */
167    protected long sequenceBlockSize;
168
169    protected final MongoDBConverter converter;
170
171    protected static Map<String, CursorResult> cursorResults = new ConcurrentHashMap<>();
172
173    public MongoDBRepository(ConnectionManager cm, MongoDBRepositoryDescriptor descriptor) {
174        super(cm, descriptor.name, descriptor);
175        try {
176            mongoClient = newMongoClient(descriptor);
177            coll = getCollection(descriptor, mongoClient);
178            countersColl = getCountersCollection(descriptor, mongoClient);
179        } catch (UnknownHostException e) {
180            throw new RuntimeException(e);
181        }
182        if (Boolean.TRUE.equals(descriptor.nativeId)) {
183            idKey = MONGODB_ID;
184        } else {
185            idKey = KEY_ID;
186        }
187        useCustomId = KEY_ID.equals(idKey);
188        if (idType == IdType.sequence || DEBUG_UUIDS) {
189            Integer sbs = descriptor.sequenceBlockSize;
190            sequenceBlockSize = sbs == null ? 1 : sbs.longValue();
191            sequenceLeft = 0;
192        }
193        converter = new MongoDBConverter(idKey);
194        initRepository();
195    }
196
197    @Override
198    public List<IdType> getAllowedIdTypes() {
199        return Arrays.asList(IdType.varchar, IdType.sequence);
200    }
201
202    @Override
203    public void shutdown() {
204        super.shutdown();
205        mongoClient.close();
206    }
207
208    // used also by unit tests
209    public static MongoClient newMongoClient(MongoDBRepositoryDescriptor descriptor) throws UnknownHostException {
210        MongoClient ret = null;
211        String server = descriptor.server;
212        if (StringUtils.isBlank(server)) {
213            throw new NuxeoException("Missing <server> in MongoDB repository descriptor");
214        }
215        MongoClientOptions.Builder optionsBuilder = MongoClientOptions.builder()
216                // Can help to prevent firewall disconnects inactive connection, option not available from URI
217                .socketKeepAlive(true)
218                // don't wait for ever by default, can be overridden using URI options
219                .connectTimeout(MONGODB_OPTION_CONNECTION_TIMEOUT_MS)
220                .socketTimeout(MONGODB_OPTION_SOCKET_TIMEOUT_MS)
221                .description("Nuxeo");
222        if (server.startsWith("mongodb://")) {
223            // allow mongodb:// URI syntax for the server, to pass everything in one string
224            ret = new MongoClient(new MongoClientURI(server, optionsBuilder));
225        } else {
226            ret = new MongoClient(new ServerAddress(server), optionsBuilder.build());
227        }
228        if (log.isDebugEnabled()) {
229            log.debug("MongoClient initialized with options: " + ret.getMongoClientOptions().toString());
230        }
231        return ret;
232    }
233
234    protected static DBCollection getCollection(MongoClient mongoClient, String dbname, String collection) {
235        if (StringUtils.isBlank(dbname)) {
236            dbname = DB_DEFAULT;
237        }
238        DB db = mongoClient.getDB(dbname);
239        return db.getCollection(collection);
240    }
241
242    // used also by unit tests
243    public static DBCollection getCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) {
244        return getCollection(mongoClient, descriptor.dbname, descriptor.name);
245    }
246
247    // used also by unit tests
248    public static DBCollection getCountersCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) {
249        return getCollection(mongoClient, descriptor.dbname, descriptor.name + ".counters");
250    }
251
252    protected void initRepository() {
253        // create required indexes
254        // code does explicit queries on those
255        if (useCustomId) {
256            coll.createIndex(new BasicDBObject(idKey, ONE));
257        }
258        coll.createIndex(new BasicDBObject(KEY_PARENT_ID, ONE));
259        coll.createIndex(new BasicDBObject(KEY_ANCESTOR_IDS, ONE));
260        coll.createIndex(new BasicDBObject(KEY_VERSION_SERIES_ID, ONE));
261        coll.createIndex(new BasicDBObject(KEY_PROXY_TARGET_ID, ONE));
262        coll.createIndex(new BasicDBObject(KEY_PROXY_VERSION_SERIES_ID, ONE));
263        coll.createIndex(new BasicDBObject(KEY_READ_ACL, ONE));
264        DBObject parentChild = new BasicDBObject();
265        parentChild.put(KEY_PARENT_ID, ONE);
266        parentChild.put(KEY_NAME, ONE);
267        coll.createIndex(parentChild);
268        // often used in user-generated queries
269        coll.createIndex(new BasicDBObject(KEY_PRIMARY_TYPE, ONE));
270        coll.createIndex(new BasicDBObject(KEY_LIFECYCLE_STATE, ONE));
271        coll.createIndex(new BasicDBObject(KEY_FULLTEXT_JOBID, ONE));
272        coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_USER, ONE));
273        coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_STATUS, ONE));
274        // TODO configure these from somewhere else
275        coll.createIndex(new BasicDBObject("dc:modified", MINUS_ONE));
276        coll.createIndex(new BasicDBObject("rend:renditionName", ONE));
277        coll.createIndex(new BasicDBObject("drv:subscriptions.enabled", ONE));
278        coll.createIndex(new BasicDBObject("collectionMember:collectionIds", ONE));
279        if (!isFulltextDisabled()) {
280            DBObject indexKeys = new BasicDBObject();
281            indexKeys.put(KEY_FULLTEXT_SIMPLE, MONGODB_INDEX_TEXT);
282            indexKeys.put(KEY_FULLTEXT_BINARY, MONGODB_INDEX_TEXT);
283            DBObject indexOptions = new BasicDBObject();
284            indexOptions.put(MONGODB_INDEX_NAME, FULLTEXT_INDEX_NAME);
285            indexOptions.put(MONGODB_LANGUAGE_OVERRIDE, LANGUAGE_FIELD);
286            coll.createIndex(indexKeys, indexOptions);
287        }
288        // check root presence
289        DBObject query = new BasicDBObject(idKey, getRootId());
290        if (coll.findOne(query, justPresenceField()) != null) {
291            return;
292        }
293        // create basic repository structure needed
294        if (idType == IdType.sequence || DEBUG_UUIDS) {
295            // create the id counter
296            DBObject idCounter = new BasicDBObject();
297            idCounter.put(MONGODB_ID, COUNTER_NAME_UUID);
298            idCounter.put(COUNTER_FIELD, ZERO);
299            countersColl.insert(idCounter);
300        }
301        initRoot();
302    }
303
304    protected synchronized Long getNextSequenceId() {
305        if (sequenceLeft == 0) {
306            // allocate a new sequence block
307            // the database contains the last value from the last block
308            DBObject query = new BasicDBObject(MONGODB_ID, COUNTER_NAME_UUID);
309            DBObject update = new BasicDBObject(MONGODB_INC,
310                    new BasicDBObject(COUNTER_FIELD, Long.valueOf(sequenceBlockSize)));
311            DBObject idCounter = countersColl.findAndModify(query, null, null, false, update, true, false);
312            if (idCounter == null) {
313                throw new NuxeoException("Repository id counter not initialized");
314            }
315            sequenceLeft = sequenceBlockSize;
316            sequenceLastValue = ((Long) idCounter.get(COUNTER_FIELD)).longValue() - sequenceBlockSize;
317        }
318        sequenceLeft--;
319        sequenceLastValue++;
320        return Long.valueOf(sequenceLastValue);
321    }
322
323    @Override
324    public String generateNewId() {
325        if (idType == IdType.sequence || DEBUG_UUIDS) {
326            Long id = getNextSequenceId();
327            if (DEBUG_UUIDS) {
328                return "UUID_" + id;
329            }
330            return id.toString();
331        } else {
332            return UUID.randomUUID().toString();
333        }
334    }
335
336    @Override
337    public void createState(State state) {
338        DBObject ob = converter.stateToBson(state);
339        if (log.isTraceEnabled()) {
340            log.trace("MongoDB: CREATE " + ob.get(idKey) + ": " + ob);
341        }
342        coll.insert(ob);
343        // TODO dupe exception
344        // throw new DocumentException("Already exists: " + id);
345    }
346
347    @Override
348    public void createStates(List<State> states) {
349        List<DBObject> obs = states.stream().map(converter::stateToBson).collect(Collectors.toList());
350        if (log.isTraceEnabled()) {
351            log.trace("MongoDB: CREATE ["
352                    + obs.stream().map(ob -> ob.get(idKey).toString()).collect(Collectors.joining(", "))
353                    + "]: " + obs);
354        }
355        coll.insert(obs);
356    }
357
358    @Override
359    public State readState(String id) {
360        DBObject query = new BasicDBObject(idKey, id);
361        return findOne(query);
362    }
363
364    @Override
365    public List<State> readStates(List<String> ids) {
366        DBObject query = new BasicDBObject(idKey, new BasicDBObject(QueryOperators.IN, ids));
367        return findAll(query, ids.size());
368    }
369
370    @Override
371    public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) {
372        List<DBObject> updates = converter.diffToBson(diff);
373        for (DBObject update : updates) {
374            DBObject query = new BasicDBObject(idKey, id);
375            if (changeTokenUpdater == null) {
376                if (log.isTraceEnabled()) {
377                    log.trace("MongoDB: UPDATE " + id + ": " + update);
378                }
379            } else {
380                // assume bson is identical to dbs internals
381                // condition works even if value is null
382                Map<String, Serializable> conditions = changeTokenUpdater.getConditions();
383                Map<String, Serializable> tokenUpdates = changeTokenUpdater.getUpdates();
384                if (update.containsField(MONGODB_SET)) {
385                    ((DBObject) update.get(MONGODB_SET)).putAll(tokenUpdates);
386                } else {
387                    DBObject set = new BasicDBObject();
388                    set.putAll(tokenUpdates);
389                    update.put(MONGODB_SET, set);
390                }
391                if (log.isTraceEnabled()) {
392                    log.trace("MongoDB: UPDATE " + id + ": IF " + conditions + " THEN " + update);
393                }
394                query.putAll(conditions);
395            }
396            WriteResult w = coll.update(query, update);
397            if (w.getN() != 1) {
398                log.trace("MongoDB:    -> CONCURRENT UPDATE: " + id);
399                throw new ConcurrentUpdateException(id);
400            }
401            // TODO dupe exception
402            // throw new DocumentException("Missing: " + id);
403        }
404    }
405
406    @Override
407    public void deleteStates(Set<String> ids) {
408        DBObject query = new BasicDBObject(idKey, new BasicDBObject(QueryOperators.IN, ids));
409        if (log.isTraceEnabled()) {
410            log.trace("MongoDB: REMOVE " + ids);
411        }
412        WriteResult w = coll.remove(query);
413        if (w.getN() != ids.size()) {
414            log.error("Removed " + w.getN() + " docs for " + ids.size() + " ids: " + ids);
415        }
416    }
417
418    @Override
419    public State readChildState(String parentId, String name, Set<String> ignored) {
420        DBObject query = getChildQuery(parentId, name, ignored);
421        return findOne(query);
422    }
423
424    protected void logQuery(String id, DBObject fields) {
425        logQuery(new BasicDBObject(idKey, id), fields);
426    }
427
428    protected void logQuery(DBObject query, DBObject fields) {
429        if (fields == null) {
430            log.trace("MongoDB: QUERY " + query);
431        } else {
432            log.trace("MongoDB: QUERY " + query + " KEYS " + fields);
433        }
434    }
435
436    protected void logQuery(DBObject query, DBObject fields, DBObject orderBy, int limit, int offset) {
437        log.trace("MongoDB: QUERY " + query + " KEYS " + fields + (orderBy == null ? "" : " ORDER BY " + orderBy)
438                + " OFFSET " + offset + " LIMIT " + limit);
439    }
440
441    @Override
442    public boolean hasChild(String parentId, String name, Set<String> ignored) {
443        DBObject query = getChildQuery(parentId, name, ignored);
444        if (log.isTraceEnabled()) {
445            logQuery(query, justPresenceField());
446        }
447        return coll.findOne(query, justPresenceField()) != null;
448    }
449
450    protected DBObject getChildQuery(String parentId, String name, Set<String> ignored) {
451        DBObject query = new BasicDBObject();
452        query.put(KEY_PARENT_ID, parentId);
453        query.put(KEY_NAME, name);
454        addIgnoredIds(query, ignored);
455        return query;
456    }
457
458    protected void addIgnoredIds(DBObject query, Set<String> ignored) {
459        if (!ignored.isEmpty()) {
460            DBObject notInIds = new BasicDBObject(QueryOperators.NIN, new ArrayList<String>(ignored));
461            query.put(idKey, notInIds);
462        }
463    }
464
465    @Override
466    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
467        DBObject query = new BasicDBObject(converter.keyToBson(key), value);
468        addIgnoredIds(query, ignored);
469        return findAll(query, 0);
470    }
471
472    @Override
473    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
474        DBObject query = new BasicDBObject(converter.keyToBson(key1), value1);
475        query.put(converter.keyToBson(key2), value2);
476        addIgnoredIds(query, ignored);
477        return findAll(query, 0);
478    }
479
480    @Override
481    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
482            Map<String, Object[]> targetProxies) {
483        DBObject query = new BasicDBObject(key, value);
484        DBObject fields = new BasicDBObject();
485        if (useCustomId) {
486            fields.put(MONGODB_ID, ZERO);
487        }
488        fields.put(idKey, ONE);
489        fields.put(KEY_IS_PROXY, ONE);
490        fields.put(KEY_PROXY_TARGET_ID, ONE);
491        fields.put(KEY_PROXY_IDS, ONE);
492        if (log.isTraceEnabled()) {
493            logQuery(query, fields);
494        }
495        DBCursor cursor = coll.find(query, fields);
496        try {
497            for (DBObject ob : cursor) {
498                String id = (String) ob.get(idKey);
499                ids.add(id);
500                if (proxyTargets != null && TRUE.equals(ob.get(KEY_IS_PROXY))) {
501                    String targetId = (String) ob.get(KEY_PROXY_TARGET_ID);
502                    proxyTargets.put(id, targetId);
503                }
504                if (targetProxies != null) {
505                    Object[] proxyIds = (Object[]) converter.bsonToValue(ob.get(KEY_PROXY_IDS));
506                    if (proxyIds != null) {
507                        targetProxies.put(id, proxyIds);
508                    }
509                }
510            }
511        } finally {
512            cursor.close();
513        }
514    }
515
516    @Override
517    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
518        DBObject query = new BasicDBObject(key, value);
519        addIgnoredIds(query, ignored);
520        if (log.isTraceEnabled()) {
521            logQuery(query, justPresenceField());
522        }
523        return coll.findOne(query, justPresenceField()) != null;
524    }
525
526    protected State findOne(DBObject query) {
527        if (log.isTraceEnabled()) {
528            logQuery(query, null);
529        }
530        return converter.bsonToState(coll.findOne(query));
531    }
532
533    protected List<State> findAll(DBObject query, int sizeHint) {
534        if (log.isTraceEnabled()) {
535            logQuery(query, null);
536        }
537        DBCursor cursor = coll.find(query);
538        Set<String> seen = new HashSet<>();
539        try {
540            List<State> list = new ArrayList<>(sizeHint);
541            for (DBObject ob : cursor) {
542                if (!seen.add((String) ob.get(idKey))) {
543                    // MongoDB cursors may return the same
544                    // object several times
545                    continue;
546                }
547                list.add(converter.bsonToState(ob));
548            }
549            return list;
550        } finally {
551            cursor.close();
552        }
553    }
554
555    protected DBObject justPresenceField() {
556        return new BasicDBObject(MONGODB_ID, ONE);
557    }
558
559    @Override
560    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
561            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
562        // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter
563        MongoDBQueryBuilder builder = new MongoDBQueryBuilder(this, evaluator.getExpression(),
564                evaluator.getSelectClause(), orderByClause, evaluator.pathResolver, evaluator.fulltextSearchDisabled);
565        builder.walk();
566        if (builder.hasFulltext && isFulltextDisabled()) {
567            throw new QueryParseException("Fulltext search disabled by configuration");
568        }
569        DBObject query = builder.getQuery();
570        addPrincipals(query, evaluator.principals);
571        DBObject orderBy = builder.getOrderBy();
572        DBObject keys = builder.getProjection();
573        // Don't do manual projection if there are no projection wildcards, as this brings no new
574        // information and is costly. The only difference is several identical rows instead of one.
575        boolean manualProjection = !distinctDocuments && builder.hasProjectionWildcard();
576        if (manualProjection) {
577            // we'll do post-treatment to re-evaluate the query to get proper wildcard projections
578            // so we need the full state from the database
579            keys = new BasicDBObject();
580            evaluator.parse();
581        }
582
583        if (log.isTraceEnabled()) {
584            logQuery(query, keys, orderBy, limit, offset);
585        }
586
587        List<Map<String, Serializable>> projections;
588        long totalSize;
589        DBCursor cursor = coll.find(query, keys).skip(offset).limit(limit);
590        try {
591            if (orderBy != null) {
592                cursor.sort(orderBy);
593            }
594            projections = new ArrayList<>();
595            for (DBObject ob : cursor) {
596                State state = converter.bsonToState(ob);
597                if (manualProjection) {
598                    projections.addAll(evaluator.matches(state));
599                } else {
600                    projections.add(DBSStateFlattener.flatten(state));
601                }
602            }
603            if (countUpTo == -1) {
604                // count full size
605                if (limit == 0) {
606                    totalSize = projections.size();
607                } else {
608                    totalSize = cursor.count();
609                }
610            } else if (countUpTo == 0) {
611                // no count
612                totalSize = -1; // not counted
613            } else {
614                // count only if less than countUpTo
615                if (limit == 0) {
616                    totalSize = projections.size();
617                } else {
618                    totalSize = cursor.copy().limit(countUpTo + 1).count();
619                }
620                if (totalSize > countUpTo) {
621                    totalSize = -2; // truncated
622                }
623            }
624        } finally {
625            cursor.close();
626        }
627        if (log.isTraceEnabled() && projections.size() != 0) {
628            log.trace("MongoDB:    -> " + projections.size());
629        }
630        return new PartialList<>(projections, totalSize);
631    }
632
633    @Override
634    public ScrollResult scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) {
635        checkForTimedoutScroll();
636        MongoDBQueryBuilder builder = new MongoDBQueryBuilder(this, evaluator.getExpression(),
637                evaluator.getSelectClause(), null, evaluator.pathResolver, evaluator.fulltextSearchDisabled);
638        builder.walk();
639        if (builder.hasFulltext && isFulltextDisabled()) {
640            throw new QueryParseException("Fulltext search disabled by configuration");
641        }
642        DBObject query = builder.getQuery();
643        DBObject keys = builder.getProjection();
644        if (log.isTraceEnabled()) {
645            logQuery(query, keys, null, 0, 0);
646        }
647
648        DBCursor cursor = coll.find(query, keys);
649        String scrollId = UUID.randomUUID().toString();
650        registerCursor(scrollId, cursor, batchSize, keepAliveSeconds);
651        return scroll(scrollId);
652    }
653
654    protected void checkForTimedoutScroll() {
655        cursorResults.forEach((id, cursor) -> cursor.timedOut(id));
656    }
657
658    protected void registerCursor(String scrollId, DBCursor cursor, int batchSize, int keepAliveSeconds) {
659        cursorResults.put(scrollId, new CursorResult(cursor, batchSize, keepAliveSeconds));
660    }
661
662    @Override
663    public ScrollResult scroll(String scrollId) {
664        CursorResult cursorResult = cursorResults.get(scrollId);
665        if (cursorResult == null) {
666            throw new NuxeoException("Unknown or timed out scrollId");
667        } else if (cursorResult.timedOut(scrollId)) {
668            throw new NuxeoException("Timed out scrollId");
669        }
670        cursorResult.touch();
671        List<String> ids = new ArrayList<>(cursorResult.batchSize);
672        synchronized (cursorResult) {
673            if (cursorResult.cursor == null || !cursorResult.cursor.hasNext()) {
674                unregisterCursor(scrollId);
675                return emptyResult();
676            }
677            while (ids.size() < cursorResult.batchSize) {
678                if (cursorResult.cursor == null || !cursorResult.cursor.hasNext()) {
679                    cursorResult.close();
680                    break;
681                } else {
682                    DBObject ob = cursorResult.cursor.next();
683                    String id = (String) ob.get(converter.keyToBson(KEY_ID));
684                    if (id != null) {
685                        ids.add(id);
686                    } else {
687                        log.error("Got a document without id: " + ob);
688                    }
689                }
690            }
691        }
692        return new ScrollResultImpl(scrollId, ids);
693    }
694
695    protected boolean unregisterCursor(String scrollId) {
696        CursorResult cursor = cursorResults.remove(scrollId);
697        if (cursor != null) {
698            cursor.close();
699            return true;
700        }
701        return false;
702    }
703
704    protected void addPrincipals(DBObject query, Set<String> principals) {
705        if (principals != null) {
706            DBObject inPrincipals = new BasicDBObject(QueryOperators.IN, new ArrayList<String>(principals));
707            query.put(DBSDocument.KEY_READ_ACL, inPrincipals);
708        }
709    }
710
711    /** Keys used for document projection when marking all binaries for GC. */
712    protected DBObject binaryKeys;
713
714    @Override
715    protected void initBlobsPaths() {
716        MongoDBBlobFinder finder = new MongoDBBlobFinder();
717        finder.visit();
718        binaryKeys = finder.binaryKeys;
719    }
720
721    protected static class MongoDBBlobFinder extends BlobFinder {
722        protected DBObject binaryKeys = new BasicDBObject(MONGODB_ID, ZERO);
723
724        @Override
725        protected void recordBlobPath() {
726            path.addLast(KEY_BLOB_DATA);
727            binaryKeys.put(StringUtils.join(path, "."), ONE);
728            path.removeLast();
729        }
730    }
731
732    @Override
733    public void markReferencedBinaries() {
734        BlobManager blobManager = Framework.getService(BlobManager.class);
735        // TODO add a query to not scan all documents
736        if (log.isTraceEnabled()) {
737            logQuery(new BasicDBObject(), binaryKeys);
738        }
739        DBCursor cursor = coll.find(new BasicDBObject(), binaryKeys);
740        try {
741            for (DBObject ob : cursor) {
742                markReferencedBinaries(ob, blobManager);
743            }
744        } finally {
745            cursor.close();
746        }
747    }
748
749    protected void markReferencedBinaries(DBObject ob, BlobManager blobManager) {
750        for (String key : ob.keySet()) {
751            Object value = ob.get(key);
752            if (value instanceof List) {
753                @SuppressWarnings("unchecked")
754                List<Object> list = (List<Object>) value;
755                for (Object v : list) {
756                    if (v instanceof DBObject) {
757                        markReferencedBinaries((DBObject) v, blobManager);
758                    } else {
759                        markReferencedBinary(v, blobManager);
760                    }
761                }
762            } else if (value instanceof Object[]) {
763                for (Object v : (Object[]) value) {
764                    markReferencedBinary(v, blobManager);
765                }
766            } else if (value instanceof DBObject) {
767                markReferencedBinaries((DBObject) value, blobManager);
768            } else {
769                markReferencedBinary(value, blobManager);
770            }
771        }
772    }
773
774    protected void markReferencedBinary(Object value, BlobManager blobManager) {
775        if (!(value instanceof String)) {
776            return;
777        }
778        String key = (String) value;
779        blobManager.markReferencedBinary(key, repositoryName);
780    }
781
782    protected static final DBObject LOCK_FIELDS;
783
784    static {
785        LOCK_FIELDS = new BasicDBObject();
786        LOCK_FIELDS.put(KEY_LOCK_OWNER, ONE);
787        LOCK_FIELDS.put(KEY_LOCK_CREATED, ONE);
788    }
789
790    protected static final DBObject UNSET_LOCK_UPDATE = new BasicDBObject(MONGODB_UNSET, LOCK_FIELDS);
791
792    @Override
793    public Lock getLock(String id) {
794        if (log.isTraceEnabled()) {
795            logQuery(id, LOCK_FIELDS);
796        }
797        DBObject res = coll.findOne(new BasicDBObject(idKey, id), LOCK_FIELDS);
798        if (res == null) {
799            // document not found
800            throw new DocumentNotFoundException(id);
801        }
802        String owner = (String) res.get(KEY_LOCK_OWNER);
803        if (owner == null) {
804            // not locked
805            return null;
806        }
807        Calendar created = (Calendar) converter.scalarToSerializable(res.get(KEY_LOCK_CREATED));
808        return new Lock(owner, created);
809    }
810
811    @Override
812    public Lock setLock(String id, Lock lock) {
813        DBObject query = new BasicDBObject(idKey, id);
814        query.put(KEY_LOCK_OWNER, null); // select doc if no lock is set
815        DBObject setLock = new BasicDBObject();
816        setLock.put(KEY_LOCK_OWNER, lock.getOwner());
817        setLock.put(KEY_LOCK_CREATED, converter.serializableToBson(lock.getCreated()));
818        DBObject setLockUpdate = new BasicDBObject(MONGODB_SET, setLock);
819        if (log.isTraceEnabled()) {
820            log.trace("MongoDB: FINDANDMODIFY " + query + " UPDATE " + setLockUpdate);
821        }
822        DBObject res = coll.findAndModify(query, null, null, false, setLockUpdate, false, false);
823        if (res != null) {
824            // found a doc to lock
825            return null;
826        } else {
827            // doc not found, or lock owner already set
828            // get the old lock
829            if (log.isTraceEnabled()) {
830                logQuery(id, LOCK_FIELDS);
831            }
832            DBObject old = coll.findOne(new BasicDBObject(idKey, id), LOCK_FIELDS);
833            if (old == null) {
834                // document not found
835                throw new DocumentNotFoundException(id);
836            }
837            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
838            Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED));
839            if (oldOwner != null) {
840                return new Lock(oldOwner, oldCreated);
841            }
842            // no lock -- there was a race condition
843            // TODO do better
844            throw new ConcurrentUpdateException("Lock " + id);
845        }
846    }
847
848    @Override
849    public Lock removeLock(String id, String owner) {
850        DBObject query = new BasicDBObject(idKey, id);
851        if (owner != null) {
852            // remove if owner matches or null
853            // implements LockManager.canLockBeRemoved inside MongoDB
854            Object ownerOrNull = Arrays.asList(owner, null);
855            query.put(KEY_LOCK_OWNER, new BasicDBObject(QueryOperators.IN, ownerOrNull));
856        } // else unconditional remove
857        // remove the lock
858        DBObject old = coll.findAndModify(query, null, null, false, UNSET_LOCK_UPDATE, false, false);
859        if (old != null) {
860            // found a doc and removed the lock, return previous lock
861            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
862            if (oldOwner == null) {
863                // was not locked
864                return null;
865            } else {
866                // return previous lock
867                Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED));
868                return new Lock(oldOwner, oldCreated);
869            }
870        } else {
871            // doc not found, or lock owner didn't match
872            // get the old lock
873            if (log.isTraceEnabled()) {
874                logQuery(id, LOCK_FIELDS);
875            }
876            old = coll.findOne(new BasicDBObject(idKey, id), LOCK_FIELDS);
877            if (old == null) {
878                // document not found
879                throw new DocumentNotFoundException(id);
880            }
881            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
882            Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED));
883            if (oldOwner != null) {
884                if (!LockManager.canLockBeRemoved(oldOwner, owner)) {
885                    // existing mismatched lock, flag failure
886                    return new Lock(oldOwner, oldCreated, true);
887                }
888                // old owner should have matched -- there was a race condition
889                // TODO do better
890                throw new ConcurrentUpdateException("Unlock " + id);
891            }
892            // old owner null, should have matched -- there was a race condition
893            // TODO do better
894            throw new ConcurrentUpdateException("Unlock " + id);
895        }
896    }
897
898    @Override
899    public void closeLockManager() {
900
901    }
902
903    @Override
904    public void clearLockManagerCaches() {
905    }
906
907    protected class CursorResult {
908        // Note that MongoDB cursor automatically timeout after 10 minutes of inactivity by default.
909        protected DBCursor cursor;
910        protected final int batchSize;
911        protected long lastCallTimestamp;
912        protected final int keepAliveSeconds;
913
914        public CursorResult(DBCursor cursor, int batchSize, int keepAliveSeconds) {
915            this.cursor = cursor;
916            this.batchSize = batchSize;
917            this.keepAliveSeconds = keepAliveSeconds;
918            lastCallTimestamp = System.currentTimeMillis();
919        }
920
921        void touch() {
922            lastCallTimestamp = System.currentTimeMillis();
923        }
924
925        boolean timedOut(String scrollId) {
926            long now = System.currentTimeMillis();
927            if (now - lastCallTimestamp > (keepAliveSeconds * 1000)) {
928                if (unregisterCursor(scrollId)) {
929                    log.warn("Scroll " + scrollId + " timed out");
930                }
931                return true;
932            }
933            return false;
934        }
935
936        public void close() {
937            if (cursor != null) {
938                cursor.close();
939            }
940            cursor = null;
941        }
942    }
943}