001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.mongodb;
020
021import static java.lang.Boolean.TRUE;
022import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult;
023import static org.nuxeo.ecm.core.storage.State.NOP;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_STATUS;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_USER;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACL;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACP;
028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS;
029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY;
031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_JOBID;
032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_SIMPLE;
033import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
034import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY;
035import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LIFECYCLE_STATE;
036import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
037import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
038import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
039import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
040import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PRIMARY_TYPE;
041import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS;
042import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;
043import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_VERSION_SERIES_ID;
044import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_READ_ACL;
045import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_VERSION_SERIES_ID;
046
047import java.io.Serializable;
048import java.lang.reflect.Array;
049import java.net.UnknownHostException;
050import java.util.ArrayList;
051import java.util.Arrays;
052import java.util.Calendar;
053import java.util.Date;
054import java.util.HashSet;
055import java.util.List;
056import java.util.Map;
057import java.util.Map.Entry;
058import java.util.Set;
059import java.util.UUID;
060import java.util.concurrent.ConcurrentHashMap;
061import java.util.stream.Collectors;
062
063import javax.resource.spi.ConnectionManager;
064
065import com.mongodb.MongoClientOptions;
066import org.apache.commons.lang.StringUtils;
067import org.apache.commons.logging.Log;
068import org.apache.commons.logging.LogFactory;
069import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
070import org.nuxeo.ecm.core.api.DocumentNotFoundException;
071import org.nuxeo.ecm.core.api.Lock;
072import org.nuxeo.ecm.core.api.NuxeoException;
073import org.nuxeo.ecm.core.api.PartialList;
074import org.nuxeo.ecm.core.api.ScrollResult;
075import org.nuxeo.ecm.core.api.ScrollResultImpl;
076import org.nuxeo.ecm.core.api.model.Delta;
077import org.nuxeo.ecm.core.blob.BlobManager;
078import org.nuxeo.ecm.core.model.LockManager;
079import org.nuxeo.ecm.core.model.Repository;
080import org.nuxeo.ecm.core.query.QueryParseException;
081import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
082import org.nuxeo.ecm.core.storage.State;
083import org.nuxeo.ecm.core.storage.State.ListDiff;
084import org.nuxeo.ecm.core.storage.State.StateDiff;
085import org.nuxeo.ecm.core.storage.dbs.DBSDocument;
086import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
087import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
088import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener;
089import org.nuxeo.runtime.api.Framework;
090
091import com.mongodb.BasicDBObject;
092import com.mongodb.DB;
093import com.mongodb.DBCollection;
094import com.mongodb.DBCursor;
095import com.mongodb.DBObject;
096import com.mongodb.MongoClient;
097import com.mongodb.MongoClientURI;
098import com.mongodb.QueryOperators;
099import com.mongodb.ServerAddress;
100import com.mongodb.WriteResult;
101
102/**
103 * MongoDB implementation of a {@link Repository}.
104 *
105 * @since 5.9.4
106 */
107public class MongoDBRepository extends DBSRepositoryBase {
108
109    private static final Log log = LogFactory.getLog(MongoDBRepository.class);
110
111    private static final Long ZERO = Long.valueOf(0);
112
113    private static final Long ONE = Long.valueOf(1);
114
115    private static final Long MINUS_ONE = Long.valueOf(-11);
116
    /** Database name used when none is provided. */
    public static final String DB_DEFAULT = "nuxeo";

    /** MongoDB's native id key. */
    public static final String MONGODB_ID = "_id";

    /** MongoDB update operator used for deltas (increments). */
    public static final String MONGODB_INC = "$inc";

    /** MongoDB update operator used to set a field value. */
    public static final String MONGODB_SET = "$set";

    /** MongoDB update operator used to remove a field. */
    public static final String MONGODB_UNSET = "$unset";

    /** MongoDB update operator used to append to a list. */
    public static final String MONGODB_PUSH = "$push";

    /** MongoDB modifier used with $push when several values are appended at once. */
    public static final String MONGODB_EACH = "$each";

    // NOTE(review): not referenced in this part of the file, presumably used for fulltext score projection
    public static final String MONGODB_META = "$meta";

    // NOTE(review): not referenced in this part of the file, presumably the $meta value for fulltext score
    public static final String MONGODB_TEXT_SCORE = "textScore";

    /** Index type value put on the fulltext fields when creating the fulltext index. */
    private static final String MONGODB_INDEX_TEXT = "text";

    /** Index option key holding the name of the index. */
    private static final String MONGODB_INDEX_NAME = "name";

    /** Index option key holding the name of the language override field. */
    private static final String MONGODB_LANGUAGE_OVERRIDE = "language_override";

    /** Name given to the fulltext index. */
    private static final String FULLTEXT_INDEX_NAME = "fulltext";

    /** Field name passed as language override when creating the fulltext index. */
    private static final String LANGUAGE_FIELD = "__language";

    /** Native id of the document holding the id sequence counter in the counters collection. */
    protected static final String COUNTER_NAME_UUID = "ecm:id";

    /** Field of the counter document holding the sequence value. */
    protected static final String COUNTER_FIELD = "seq";

    /** Default connection timeout set on the client options, in milliseconds. */
    protected static final int MONGODB_OPTION_CONNECTION_TIMEOUT_MS = 30000;

    /** Default socket timeout set on the client options, in milliseconds. */
    protected static final int MONGODB_OPTION_SOCKET_TIMEOUT_MS = 60000;

    /** The MongoDB client, closed on shutdown. */
    protected MongoClient mongoClient;

    /** The collection holding the documents. */
    protected DBCollection coll;

    /** The collection holding the counters (id sequence). */
    protected DBCollection countersColl;

    /** The key to use to store the id in the database. */
    protected String idKey;

    /** True if we don't use MongoDB's native "_id" key to store the id. */
    protected boolean useCustomId;

    /** Number of values still available in the in-memory sequence. */
    protected long sequenceLeft;

    /** Last value used from the in-memory sequence. */
    protected long sequenceLastValue;

    /** Sequence allocation block size. */
    protected long sequenceBlockSize;

    // NOTE(review): not referenced in this part of the file, presumably holds open scroll cursors by scroll id
    protected static Map<String, CursorResult> cursorResults = new ConcurrentHashMap<>();
175
176    public MongoDBRepository(ConnectionManager cm, MongoDBRepositoryDescriptor descriptor) {
177        super(cm, descriptor.name, descriptor);
178        try {
179            mongoClient = newMongoClient(descriptor);
180            coll = getCollection(descriptor, mongoClient);
181            countersColl = getCountersCollection(descriptor, mongoClient);
182        } catch (UnknownHostException e) {
183            throw new RuntimeException(e);
184        }
185        if (Boolean.TRUE.equals(descriptor.nativeId)) {
186            idKey = MONGODB_ID;
187        } else {
188            idKey = KEY_ID;
189        }
190        useCustomId = KEY_ID.equals(idKey);
191        if (idType == IdType.sequence || DEBUG_UUIDS) {
192            Integer sbs = descriptor.sequenceBlockSize;
193            sequenceBlockSize = sbs == null ? 1 : sbs.longValue();
194            sequenceLeft = 0;
195        }
196        initRepository();
197    }
198
199    @Override
200    public List<IdType> getAllowedIdTypes() {
201        return Arrays.asList(IdType.varchar, IdType.sequence);
202    }
203
    /**
     * Shuts down the repository, then closes the MongoDB client.
     */
    @Override
    public void shutdown() {
        super.shutdown();
        // close the client only after the superclass shutdown has run
        mongoClient.close();
    }
209
210    // used also by unit tests
211    public static MongoClient newMongoClient(MongoDBRepositoryDescriptor descriptor) throws UnknownHostException {
212        MongoClient ret = null;
213        String server = descriptor.server;
214        if (StringUtils.isBlank(server)) {
215            throw new NuxeoException("Missing <server> in MongoDB repository descriptor");
216        }
217        MongoClientOptions.Builder optionsBuilder = MongoClientOptions.builder()
218                // Can help to prevent firewall disconnects inactive connection, option not available from URI
219                .socketKeepAlive(true)
220                // don't wait for ever by default, can be overridden using URI options
221                .connectTimeout(MONGODB_OPTION_CONNECTION_TIMEOUT_MS)
222                .socketTimeout(MONGODB_OPTION_SOCKET_TIMEOUT_MS)
223                .description("Nuxeo");
224        if (server.startsWith("mongodb://")) {
225            // allow mongodb:// URI syntax for the server, to pass everything in one string
226            ret = new MongoClient(new MongoClientURI(server, optionsBuilder));
227        } else {
228            ret = new MongoClient(new ServerAddress(server), optionsBuilder.build());
229        }
230        if (log.isDebugEnabled()) {
231            log.debug("MongoClient initialized with options: " + ret.getMongoClientOptions().toString());
232        }
233        return ret;
234    }
235
236    protected static DBCollection getCollection(MongoClient mongoClient, String dbname, String collection) {
237        if (StringUtils.isBlank(dbname)) {
238            dbname = DB_DEFAULT;
239        }
240        DB db = mongoClient.getDB(dbname);
241        return db.getCollection(collection);
242    }
243
244    // used also by unit tests
245    public static DBCollection getCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) {
246        return getCollection(mongoClient, descriptor.dbname, descriptor.name);
247    }
248
249    // used also by unit tests
250    public static DBCollection getCountersCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) {
251        return getCollection(mongoClient, descriptor.dbname, descriptor.name + ".counters");
252    }
253
254    protected String keyToBson(String key) {
255        if (useCustomId) {
256            return key;
257        } else {
258            return KEY_ID.equals(key) ? idKey : key;
259        }
260    }
261
262    protected Object valueToBson(Object value) {
263        if (value instanceof State) {
264            return stateToBson((State) value);
265        } else if (value instanceof List) {
266            @SuppressWarnings("unchecked")
267            List<Object> values = (List<Object>) value;
268            return listToBson(values);
269        } else if (value instanceof Object[]) {
270            return listToBson(Arrays.asList((Object[]) value));
271        } else {
272            return serializableToBson(value);
273        }
274    }
275
276    protected DBObject stateToBson(State state) {
277        DBObject ob = new BasicDBObject();
278        for (Entry<String, Serializable> en : state.entrySet()) {
279            Object val = valueToBson(en.getValue());
280            if (val != null) {
281                ob.put(keyToBson(en.getKey()), val);
282            }
283        }
284        return ob;
285    }
286
287    protected List<Object> listToBson(List<Object> values) {
288        ArrayList<Object> objects = new ArrayList<Object>(values.size());
289        for (Object value : values) {
290            objects.add(valueToBson(value));
291        }
292        return objects;
293    }
294
295    protected String bsonToKey(String key) {
296        if (useCustomId) {
297            return key;
298        } else {
299            return idKey.equals(key) ? KEY_ID : key;
300        }
301    }
302
303    protected State bsonToState(DBObject ob) {
304        if (ob == null) {
305            return null;
306        }
307        State state = new State(ob.keySet().size());
308        for (String key : ob.keySet()) {
309            if (useCustomId && MONGODB_ID.equals(key)) {
310                // skip native id
311                continue;
312            }
313            state.put(bsonToKey(key), bsonToValue(ob.get(key)));
314        }
315        return state;
316    }
317
318    protected Serializable bsonToValue(Object value) {
319        if (value instanceof List) {
320            @SuppressWarnings("unchecked")
321            List<Object> list = (List<Object>) value;
322            if (list.isEmpty()) {
323                return null;
324            } else {
325                if (list.get(0) instanceof DBObject) {
326                    List<Serializable> l = new ArrayList<>(list.size());
327                    for (Object el : list) {
328                        l.add(bsonToState((DBObject) el));
329                    }
330                    return (Serializable) l;
331                } else {
332                    // turn the list into a properly-typed array
333                    Class<?> klass = Object.class;
334                    for (Object o : list) {
335                        if (o != null) {
336                            klass = scalarToSerializableClass(o.getClass());
337                            break;
338                        }
339                    }
340                    Object[] ar = (Object[]) Array.newInstance(klass, list.size());
341                    int i = 0;
342                    for (Object el : list) {
343                        ar[i++] = scalarToSerializable(el);
344                    }
345                    return ar;
346                }
347            }
348        } else if (value instanceof DBObject) {
349            return bsonToState((DBObject) value);
350        } else {
351            return scalarToSerializable(value);
352        }
353    }
354
    /**
     * Holder for the update operations computed from a diff, one map of field name to operand per MongoDB operator.
     */
    public static class Updates {
        /** Fields to set, with their new value. */
        public BasicDBObject set = new BasicDBObject();

        /** Fields to remove. */
        public BasicDBObject unset = new BasicDBObject();

        /** List fields to append to, with the pushed value(s). */
        public BasicDBObject push = new BasicDBObject();

        /** Fields to increment, with the increment. */
        public BasicDBObject inc = new BasicDBObject();
    }
364
365    /**
366     * Constructs a list of MongoDB updates from the given {@link StateDiff}.
367     * <p>
368     * We need a list because some cases need two operations to avoid conflicts.
369     */
370    protected List<DBObject> diffToBson(StateDiff diff) {
371        Updates updates = new Updates();
372        diffToUpdates(diff, null, updates);
373        UpdateListBuilder builder = new UpdateListBuilder();
374        for (Entry<String, Object> en : updates.set.entrySet()) {
375            builder.update(MONGODB_SET, en.getKey(), en.getValue());
376        }
377        for (Entry<String, Object> en : updates.unset.entrySet()) {
378            builder.update(MONGODB_UNSET, en.getKey(), en.getValue());
379        }
380        for (Entry<String, Object> en : updates.push.entrySet()) {
381            builder.update(MONGODB_PUSH, en.getKey(), en.getValue());
382        }
383        for (Entry<String, Object> en : updates.inc.entrySet()) {
384            builder.update(MONGODB_INC, en.getKey(), en.getValue());
385        }
386        return builder.updateList;
387    }
388
389    /**
390     * Update list builder to prevent several updates of the same field.
391     * <p>
392     * This happens if two operations act on two fields where one is a prefix of the other.
393     * <p>
394     * Example: Cannot update 'mylist.0.string' and 'mylist' at the same time (error 16837)
395     *
396     * @since 5.9.5
397     */
398    protected static class UpdateListBuilder {
399
400        protected List<DBObject> updateList = new ArrayList<>(10);
401
402        protected DBObject update;
403
404        protected Set<String> prefixKeys;
405
406        protected Set<String> keys;
407
408        protected UpdateListBuilder() {
409            newUpdate();
410        }
411
412        protected void update(String op, String key, Object value) {
413            checkForConflict(key);
414            DBObject map = (DBObject) update.get(op);
415            if (map == null) {
416                update.put(op, map = new BasicDBObject());
417            }
418            map.put(key, value);
419        }
420
421        /**
422         * Checks if the key conflicts with one of the previous keys.
423         * <p>
424         * A conflict occurs if one key is equals to or is a prefix of the other.
425         */
426        protected void checkForConflict(String key) {
427            List<String> pKeys = getPrefixKeys(key);
428            if (conflictKeys(key, pKeys)) {
429                newUpdate();
430            }
431            prefixKeys.addAll(pKeys);
432            keys.add(key);
433        }
434
435        protected void newUpdate() {
436            updateList.add(update = new BasicDBObject());
437            prefixKeys = new HashSet<>();
438            keys = new HashSet<>();
439        }
440
441        protected boolean conflictKeys(String key, List<String> subkeys) {
442            if (prefixKeys.contains(key)) {
443                return true;
444            }
445            for (String sk: subkeys) {
446                if (keys.contains(sk)) {
447                    return true;
448                }
449            }
450            return false;
451        }
452
453        /**
454         * return a list of parents key
455         * foo.0.bar -> [foo, foo.0, foo.0.bar]
456         */
457        protected List<String> getPrefixKeys(String key) {
458            List<String> ret = new ArrayList<>(10);
459            int i=0;
460            while ((i = key.indexOf('.', i)) > 0) {
461               ret.add(key.substring(0, i++));
462            }
463            ret.add(key);
464            return ret;
465        }
466
467    }
468
469    protected void diffToUpdates(StateDiff diff, String prefix, Updates updates) {
470        String elemPrefix = prefix == null ? "" : prefix + '.';
471        for (Entry<String, Serializable> en : diff.entrySet()) {
472            String name = elemPrefix + en.getKey();
473            Serializable value = en.getValue();
474            if (value instanceof StateDiff) {
475                diffToUpdates((StateDiff) value, name, updates);
476            } else if (value instanceof ListDiff) {
477                diffToUpdates((ListDiff) value, name, updates);
478            } else if (value instanceof Delta) {
479                diffToUpdates((Delta) value, name, updates);
480            } else {
481                // not a diff
482                if (value == null) {
483                    // for null values, beyond the space saving,
484                    // it's important to unset the field instead of setting the value to null
485                    // because $inc does not work on nulls but works on non-existent fields
486                    updates.unset.put(name, ONE);
487                } else {
488                    updates.set.put(name, valueToBson(value));
489                }
490            }
491        }
492    }
493
494    protected void diffToUpdates(ListDiff listDiff, String prefix, Updates updates) {
495        if (listDiff.diff != null) {
496            String elemPrefix = prefix == null ? "" : prefix + '.';
497            int i = 0;
498            for (Object value : listDiff.diff) {
499                String name = elemPrefix + i;
500                if (value instanceof StateDiff) {
501                    diffToUpdates((StateDiff) value, name, updates);
502                } else if (value != NOP) {
503                    // set value
504                    updates.set.put(name, valueToBson(value));
505                }
506                i++;
507            }
508        }
509        if (listDiff.rpush != null) {
510            Object pushed;
511            if (listDiff.rpush.size() == 1) {
512                // no need to use $each for one element
513                pushed = valueToBson(listDiff.rpush.get(0));
514            } else {
515                pushed = new BasicDBObject(MONGODB_EACH, listToBson(listDiff.rpush));
516            }
517            updates.push.put(prefix, pushed);
518        }
519    }
520
521    protected void diffToUpdates(Delta delta, String prefix, Updates updates) {
522        // MongoDB can $inc a field that doesn't exist, it's treated as 0 BUT it doesn't work on null
523        // so we ensure (in diffToUpdates) that we never store a null but remove the field instead
524        Object inc = valueToBson(delta.getDeltaValue());
525        updates.inc.put(prefix, inc);
526    }
527
528    protected Object serializableToBson(Object value) {
529        if (value instanceof Calendar) {
530            return ((Calendar) value).getTime();
531        }
532        return value;
533    }
534
535    protected Serializable scalarToSerializable(Object val) {
536        if (val instanceof Date) {
537            Calendar cal = Calendar.getInstance();
538            cal.setTime((Date) val);
539            return cal;
540        }
541        return (Serializable) val;
542    }
543
544    protected Class<?> scalarToSerializableClass(Class<?> klass) {
545        if (Date.class.isAssignableFrom(klass)) {
546            return Calendar.class;
547        }
548        return klass;
549    }
550
551    protected void initRepository() {
552        // create required indexes
553        // code does explicit queries on those
554        if (useCustomId) {
555            coll.createIndex(new BasicDBObject(idKey, ONE));
556        }
557        coll.createIndex(new BasicDBObject(KEY_PARENT_ID, ONE));
558        coll.createIndex(new BasicDBObject(KEY_ANCESTOR_IDS, ONE));
559        coll.createIndex(new BasicDBObject(KEY_VERSION_SERIES_ID, ONE));
560        coll.createIndex(new BasicDBObject(KEY_PROXY_TARGET_ID, ONE));
561        coll.createIndex(new BasicDBObject(KEY_PROXY_VERSION_SERIES_ID, ONE));
562        coll.createIndex(new BasicDBObject(KEY_READ_ACL, ONE));
563        DBObject parentChild = new BasicDBObject();
564        parentChild.put(KEY_PARENT_ID, ONE);
565        parentChild.put(KEY_NAME, ONE);
566        coll.createIndex(parentChild);
567        // often used in user-generated queries
568        coll.createIndex(new BasicDBObject(KEY_PRIMARY_TYPE, ONE));
569        coll.createIndex(new BasicDBObject(KEY_LIFECYCLE_STATE, ONE));
570        coll.createIndex(new BasicDBObject(KEY_FULLTEXT_JOBID, ONE));
571        coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_USER, ONE));
572        coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_STATUS, ONE));
573        // TODO configure these from somewhere else
574        coll.createIndex(new BasicDBObject("dc:modified", MINUS_ONE));
575        coll.createIndex(new BasicDBObject("rend:renditionName", ONE));
576        coll.createIndex(new BasicDBObject("drv:subscriptions.enabled", ONE));
577        coll.createIndex(new BasicDBObject("collectionMember:collectionIds", ONE));
578        if (!isFulltextDisabled()) {
579            DBObject indexKeys = new BasicDBObject();
580            indexKeys.put(KEY_FULLTEXT_SIMPLE, MONGODB_INDEX_TEXT);
581            indexKeys.put(KEY_FULLTEXT_BINARY, MONGODB_INDEX_TEXT);
582            DBObject indexOptions = new BasicDBObject();
583            indexOptions.put(MONGODB_INDEX_NAME, FULLTEXT_INDEX_NAME);
584            indexOptions.put(MONGODB_LANGUAGE_OVERRIDE, LANGUAGE_FIELD);
585            coll.createIndex(indexKeys, indexOptions);
586        }
587        // check root presence
588        DBObject query = new BasicDBObject(idKey, getRootId());
589        if (coll.findOne(query, justPresenceField()) != null) {
590            return;
591        }
592        // create basic repository structure needed
593        if (idType == IdType.sequence || DEBUG_UUIDS) {
594            // create the id counter
595            DBObject idCounter = new BasicDBObject();
596            idCounter.put(MONGODB_ID, COUNTER_NAME_UUID);
597            idCounter.put(COUNTER_FIELD, ZERO);
598            countersColl.insert(idCounter);
599        }
600        initRoot();
601    }
602
603    protected synchronized Long getNextSequenceId() {
604        if (sequenceLeft == 0) {
605            // allocate a new sequence block
606            // the database contains the last value from the last block
607            DBObject query = new BasicDBObject(MONGODB_ID, COUNTER_NAME_UUID);
608            DBObject update = new BasicDBObject(MONGODB_INC,
609                    new BasicDBObject(COUNTER_FIELD, Long.valueOf(sequenceBlockSize)));
610            DBObject idCounter = countersColl.findAndModify(query, null, null, false, update, true, false);
611            if (idCounter == null) {
612                throw new NuxeoException("Repository id counter not initialized");
613            }
614            sequenceLeft = sequenceBlockSize;
615            sequenceLastValue = ((Long) idCounter.get(COUNTER_FIELD)).longValue() - sequenceBlockSize;
616        }
617        sequenceLeft--;
618        sequenceLastValue++;
619        return Long.valueOf(sequenceLastValue);
620    }
621
622    @Override
623    public String generateNewId() {
624        if (idType == IdType.sequence || DEBUG_UUIDS) {
625            Long id = getNextSequenceId();
626            if (DEBUG_UUIDS) {
627                return "UUID_" + id;
628            }
629            return id.toString();
630        } else {
631            return UUID.randomUUID().toString();
632        }
633    }
634
635    @Override
636    public void createState(State state) {
637        DBObject ob = stateToBson(state);
638        if (log.isTraceEnabled()) {
639            log.trace("MongoDB: CREATE " + ob.get(idKey) + ": " + ob);
640        }
641        coll.insert(ob);
642        // TODO dupe exception
643        // throw new DocumentException("Already exists: " + id);
644    }
645
646    @Override
647    public void createStates(List<State> states) {
648        List<DBObject> obs = states.stream().map(this::stateToBson).collect(Collectors.toList());
649        if (log.isTraceEnabled()) {
650            log.trace("MongoDB: CREATE ["
651                    + obs.stream().map(ob -> ob.get(idKey).toString()).collect(Collectors.joining(", "))
652                    + "]: " + obs);
653        }
654        coll.insert(obs);
655    }
656
657    @Override
658    public State readState(String id) {
659        DBObject query = new BasicDBObject(idKey, id);
660        return findOne(query);
661    }
662
663    @Override
664    public List<State> readStates(List<String> ids) {
665        DBObject query = new BasicDBObject(idKey, new BasicDBObject(QueryOperators.IN, ids));
666        return findAll(query, ids.size());
667    }
668
669    @Override
670    public void updateState(String id, StateDiff diff) {
671        DBObject query = new BasicDBObject(idKey, id);
672        for (DBObject update : diffToBson(diff)) {
673            if (log.isTraceEnabled()) {
674                log.trace("MongoDB: UPDATE " + id + ": " + update);
675            }
676            coll.update(query, update);
677            // TODO dupe exception
678            // throw new DocumentException("Missing: " + id);
679        }
680    }
681
682    @Override
683    public void deleteStates(Set<String> ids) {
684        DBObject query = new BasicDBObject(idKey, new BasicDBObject(QueryOperators.IN, ids));
685        if (log.isTraceEnabled()) {
686            log.trace("MongoDB: REMOVE " + ids);
687        }
688        WriteResult w = coll.remove(query);
689        if (w.getN() != ids.size()) {
690            log.error("Removed " + w.getN() + " docs for " + ids.size() + " ids: " + ids);
691        }
692    }
693
694    @Override
695    public State readChildState(String parentId, String name, Set<String> ignored) {
696        DBObject query = getChildQuery(parentId, name, ignored);
697        return findOne(query);
698    }
699
700    protected void logQuery(String id, DBObject fields) {
701        logQuery(new BasicDBObject(idKey, id), fields);
702    }
703
704    protected void logQuery(DBObject query, DBObject fields) {
705        if (fields == null) {
706            log.trace("MongoDB: QUERY " + query);
707        } else {
708            log.trace("MongoDB: QUERY " + query + " KEYS " + fields);
709        }
710    }
711
712    protected void logQuery(DBObject query, DBObject fields, DBObject orderBy, int limit, int offset) {
713        log.trace("MongoDB: QUERY " + query + " KEYS " + fields + (orderBy == null ? "" : " ORDER BY " + orderBy)
714                + " OFFSET " + offset + " LIMIT " + limit);
715    }
716
717    @Override
718    public boolean hasChild(String parentId, String name, Set<String> ignored) {
719        DBObject query = getChildQuery(parentId, name, ignored);
720        if (log.isTraceEnabled()) {
721            logQuery(query, justPresenceField());
722        }
723        return coll.findOne(query, justPresenceField()) != null;
724    }
725
726    protected DBObject getChildQuery(String parentId, String name, Set<String> ignored) {
727        DBObject query = new BasicDBObject();
728        query.put(KEY_PARENT_ID, parentId);
729        query.put(KEY_NAME, name);
730        addIgnoredIds(query, ignored);
731        return query;
732    }
733
734    protected void addIgnoredIds(DBObject query, Set<String> ignored) {
735        if (!ignored.isEmpty()) {
736            DBObject notInIds = new BasicDBObject(QueryOperators.NIN, new ArrayList<String>(ignored));
737            query.put(idKey, notInIds);
738        }
739    }
740
741    @Override
742    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
743        DBObject query = new BasicDBObject(keyToBson(key), value);
744        addIgnoredIds(query, ignored);
745        return findAll(query, 0);
746    }
747
748    @Override
749    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
750        DBObject query = new BasicDBObject(keyToBson(key1), value1);
751        query.put(keyToBson(key2), value2);
752        addIgnoredIds(query, ignored);
753        return findAll(query, 0);
754    }
755
756    @Override
757    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
758            Map<String, Object[]> targetProxies) {
759        DBObject query = new BasicDBObject(key, value);
760        DBObject fields = new BasicDBObject();
761        if (useCustomId) {
762            fields.put(MONGODB_ID, ZERO);
763        }
764        fields.put(idKey, ONE);
765        fields.put(KEY_IS_PROXY, ONE);
766        fields.put(KEY_PROXY_TARGET_ID, ONE);
767        fields.put(KEY_PROXY_IDS, ONE);
768        if (log.isTraceEnabled()) {
769            logQuery(query, fields);
770        }
771        DBCursor cursor = coll.find(query, fields);
772        try {
773            for (DBObject ob : cursor) {
774                String id = (String) ob.get(idKey);
775                ids.add(id);
776                if (proxyTargets != null && TRUE.equals(ob.get(KEY_IS_PROXY))) {
777                    String targetId = (String) ob.get(KEY_PROXY_TARGET_ID);
778                    proxyTargets.put(id, targetId);
779                }
780                if (targetProxies != null) {
781                    Object[] proxyIds = (Object[]) bsonToValue(ob.get(KEY_PROXY_IDS));
782                    if (proxyIds != null) {
783                        targetProxies.put(id, proxyIds);
784                    }
785                }
786            }
787        } finally {
788            cursor.close();
789        }
790    }
791
792    @Override
793    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
794        DBObject query = new BasicDBObject(key, value);
795        addIgnoredIds(query, ignored);
796        if (log.isTraceEnabled()) {
797            logQuery(query, justPresenceField());
798        }
799        return coll.findOne(query, justPresenceField()) != null;
800    }
801
802    protected State findOne(DBObject query) {
803        if (log.isTraceEnabled()) {
804            logQuery(query, null);
805        }
806        return bsonToState(coll.findOne(query));
807    }
808
809    protected List<State> findAll(DBObject query, int sizeHint) {
810        if (log.isTraceEnabled()) {
811            logQuery(query, null);
812        }
813        DBCursor cursor = coll.find(query);
814        Set<String> seen = new HashSet<>();
815        try {
816            List<State> list = new ArrayList<>(sizeHint);
817            for (DBObject ob : cursor) {
818                if (!seen.add((String) ob.get(idKey))) {
819                    // MongoDB cursors may return the same
820                    // object several times
821                    continue;
822                }
823                list.add(bsonToState(ob));
824            }
825            return list;
826        } finally {
827            cursor.close();
828        }
829    }
830
831    protected DBObject justPresenceField() {
832        return new BasicDBObject(MONGODB_ID, ONE);
833    }
834
835    @Override
836    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
837            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
838        // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter
839        MongoDBQueryBuilder builder = new MongoDBQueryBuilder(this, evaluator.getExpression(),
840                evaluator.getSelectClause(), orderByClause, evaluator.pathResolver, evaluator.fulltextSearchDisabled);
841        builder.walk();
842        if (builder.hasFulltext && isFulltextDisabled()) {
843            throw new QueryParseException("Fulltext search disabled by configuration");
844        }
845        DBObject query = builder.getQuery();
846        addPrincipals(query, evaluator.principals);
847        DBObject orderBy = builder.getOrderBy();
848        DBObject keys = builder.getProjection();
849        // Don't do manual projection if there are no projection wildcards, as this brings no new
850        // information and is costly. The only difference is several identical rows instead of one.
851        boolean manualProjection = !distinctDocuments && builder.hasProjectionWildcard();
852        if (manualProjection) {
853            // we'll do post-treatment to re-evaluate the query to get proper wildcard projections
854            // so we need the full state from the database
855            keys = new BasicDBObject();
856            evaluator.parse();
857        }
858
859        if (log.isTraceEnabled()) {
860            logQuery(query, keys, orderBy, limit, offset);
861        }
862
863        List<Map<String, Serializable>> projections;
864        long totalSize;
865        DBCursor cursor = coll.find(query, keys).skip(offset).limit(limit);
866        try {
867            if (orderBy != null) {
868                cursor.sort(orderBy);
869            }
870            projections = new ArrayList<>();
871            for (DBObject ob : cursor) {
872                State state = bsonToState(ob);
873                if (manualProjection) {
874                    projections.addAll(evaluator.matches(state));
875                } else {
876                    projections.add(DBSStateFlattener.flatten(state));
877                }
878            }
879            if (countUpTo == -1) {
880                // count full size
881                if (limit == 0) {
882                    totalSize = projections.size();
883                } else {
884                    totalSize = cursor.count();
885                }
886            } else if (countUpTo == 0) {
887                // no count
888                totalSize = -1; // not counted
889            } else {
890                // count only if less than countUpTo
891                if (limit == 0) {
892                    totalSize = projections.size();
893                } else {
894                    totalSize = cursor.copy().limit(countUpTo + 1).count();
895                }
896                if (totalSize > countUpTo) {
897                    totalSize = -2; // truncated
898                }
899            }
900        } finally {
901            cursor.close();
902        }
903        if (log.isTraceEnabled() && projections.size() != 0) {
904            log.trace("MongoDB:    -> " + projections.size());
905        }
906        return new PartialList<>(projections, totalSize);
907    }
908
909    @Override
910    public ScrollResult scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) {
911        checkForTimedoutScroll();
912        MongoDBQueryBuilder builder = new MongoDBQueryBuilder(this, evaluator.getExpression(),
913                evaluator.getSelectClause(), null, evaluator.pathResolver, evaluator.fulltextSearchDisabled);
914        builder.walk();
915        if (builder.hasFulltext && isFulltextDisabled()) {
916            throw new QueryParseException("Fulltext search disabled by configuration");
917        }
918        DBObject query = builder.getQuery();
919        DBObject keys = builder.getProjection();
920        if (log.isTraceEnabled()) {
921            logQuery(query, keys, null, 0, 0);
922        }
923
924        DBCursor cursor = coll.find(query, keys);
925        String scrollId = UUID.randomUUID().toString();
926        registerCursor(scrollId, cursor, batchSize, keepAliveSeconds);
927        return scroll(scrollId);
928    }
929
930    protected void checkForTimedoutScroll() {
931        cursorResults.forEach((id, cursor) -> cursor.timedOut(id));
932    }
933
934    protected void registerCursor(String scrollId, DBCursor cursor, int batchSize, int keepAliveSeconds) {
935        cursorResults.put(scrollId, new CursorResult(cursor, batchSize, keepAliveSeconds));
936    }
937
938    @Override
939    public ScrollResult scroll(String scrollId) {
940        CursorResult cursorResult = cursorResults.get(scrollId);
941        if (cursorResult == null) {
942            throw new NuxeoException("Unknown or timed out scrollId");
943        } else if (cursorResult.timedOut(scrollId)) {
944            throw new NuxeoException("Timed out scrollId");
945        }
946        cursorResult.touch();
947        List<String> ids = new ArrayList<>(cursorResult.batchSize);
948        synchronized (cursorResult) {
949            if (cursorResult.cursor == null || !cursorResult.cursor.hasNext()) {
950                unregisterCursor(scrollId);
951                return emptyResult();
952            }
953            while (ids.size() < cursorResult.batchSize) {
954                if (cursorResult.cursor == null || !cursorResult.cursor.hasNext()) {
955                    cursorResult.close();
956                    break;
957                } else {
958                    DBObject ob = cursorResult.cursor.next();
959                    String id = (String) ob.get(keyToBson(KEY_ID));
960                    if (id != null) {
961                        ids.add(id);
962                    } else {
963                        log.error("Got a document without id: " + ob);
964                    }
965                }
966            }
967        }
968        return new ScrollResultImpl(scrollId, ids);
969    }
970
971    protected boolean unregisterCursor(String scrollId) {
972        CursorResult cursor = cursorResults.remove(scrollId);
973        if (cursor != null) {
974            cursor.close();
975            return true;
976        }
977        return false;
978    }
979
980    protected void addPrincipals(DBObject query, Set<String> principals) {
981        if (principals != null) {
982            DBObject inPrincipals = new BasicDBObject(QueryOperators.IN, new ArrayList<String>(principals));
983            query.put(DBSDocument.KEY_READ_ACL, inPrincipals);
984        }
985    }
986
987    /** Keys used for document projection when marking all binaries for GC. */
988    protected DBObject binaryKeys;
989
990    @Override
991    protected void initBlobsPaths() {
992        MongoDBBlobFinder finder = new MongoDBBlobFinder();
993        finder.visit();
994        binaryKeys = finder.binaryKeys;
995    }
996
997    protected static class MongoDBBlobFinder extends BlobFinder {
998        protected DBObject binaryKeys = new BasicDBObject(MONGODB_ID, ZERO);
999
1000        @Override
1001        protected void recordBlobPath() {
1002            path.addLast(KEY_BLOB_DATA);
1003            binaryKeys.put(StringUtils.join(path, "."), ONE);
1004            path.removeLast();
1005        }
1006    }
1007
1008    @Override
1009    public void markReferencedBinaries() {
1010        BlobManager blobManager = Framework.getService(BlobManager.class);
1011        // TODO add a query to not scan all documents
1012        if (log.isTraceEnabled()) {
1013            logQuery(new BasicDBObject(), binaryKeys);
1014        }
1015        DBCursor cursor = coll.find(new BasicDBObject(), binaryKeys);
1016        try {
1017            for (DBObject ob : cursor) {
1018                markReferencedBinaries(ob, blobManager);
1019            }
1020        } finally {
1021            cursor.close();
1022        }
1023    }
1024
1025    protected void markReferencedBinaries(DBObject ob, BlobManager blobManager) {
1026        for (String key : ob.keySet()) {
1027            Object value = ob.get(key);
1028            if (value instanceof List) {
1029                @SuppressWarnings("unchecked")
1030                List<Object> list = (List<Object>) value;
1031                for (Object v : list) {
1032                    if (v instanceof DBObject) {
1033                        markReferencedBinaries((DBObject) v, blobManager);
1034                    } else {
1035                        markReferencedBinary(v, blobManager);
1036                    }
1037                }
1038            } else if (value instanceof Object[]) {
1039                for (Object v : (Object[]) value) {
1040                    markReferencedBinary(v, blobManager);
1041                }
1042            } else if (value instanceof DBObject) {
1043                markReferencedBinaries((DBObject) value, blobManager);
1044            } else {
1045                markReferencedBinary(value, blobManager);
1046            }
1047        }
1048    }
1049
1050    protected void markReferencedBinary(Object value, BlobManager blobManager) {
1051        if (!(value instanceof String)) {
1052            return;
1053        }
1054        String key = (String) value;
1055        blobManager.markReferencedBinary(key, repositoryName);
1056    }
1057
1058    protected static final DBObject LOCK_FIELDS;
1059
1060    static {
1061        LOCK_FIELDS = new BasicDBObject();
1062        LOCK_FIELDS.put(KEY_LOCK_OWNER, ONE);
1063        LOCK_FIELDS.put(KEY_LOCK_CREATED, ONE);
1064    }
1065
1066    protected static final DBObject UNSET_LOCK_UPDATE = new BasicDBObject(MONGODB_UNSET, LOCK_FIELDS);
1067
1068    @Override
1069    public Lock getLock(String id) {
1070        if (log.isTraceEnabled()) {
1071            logQuery(id, LOCK_FIELDS);
1072        }
1073        DBObject res = coll.findOne(new BasicDBObject(idKey, id), LOCK_FIELDS);
1074        if (res == null) {
1075            // document not found
1076            throw new DocumentNotFoundException(id);
1077        }
1078        String owner = (String) res.get(KEY_LOCK_OWNER);
1079        if (owner == null) {
1080            // not locked
1081            return null;
1082        }
1083        Calendar created = (Calendar) scalarToSerializable(res.get(KEY_LOCK_CREATED));
1084        return new Lock(owner, created);
1085    }
1086
1087    @Override
1088    public Lock setLock(String id, Lock lock) {
1089        DBObject query = new BasicDBObject(idKey, id);
1090        query.put(KEY_LOCK_OWNER, null); // select doc if no lock is set
1091        DBObject setLock = new BasicDBObject();
1092        setLock.put(KEY_LOCK_OWNER, lock.getOwner());
1093        setLock.put(KEY_LOCK_CREATED, serializableToBson(lock.getCreated()));
1094        DBObject setLockUpdate = new BasicDBObject(MONGODB_SET, setLock);
1095        if (log.isTraceEnabled()) {
1096            log.trace("MongoDB: FINDANDMODIFY " + query + " UPDATE " + setLockUpdate);
1097        }
1098        DBObject res = coll.findAndModify(query, null, null, false, setLockUpdate, false, false);
1099        if (res != null) {
1100            // found a doc to lock
1101            return null;
1102        } else {
1103            // doc not found, or lock owner already set
1104            // get the old lock
1105            if (log.isTraceEnabled()) {
1106                logQuery(id, LOCK_FIELDS);
1107            }
1108            DBObject old = coll.findOne(new BasicDBObject(idKey, id), LOCK_FIELDS);
1109            if (old == null) {
1110                // document not found
1111                throw new DocumentNotFoundException(id);
1112            }
1113            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
1114            Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED));
1115            if (oldOwner != null) {
1116                return new Lock(oldOwner, oldCreated);
1117            }
1118            // no lock -- there was a race condition
1119            // TODO do better
1120            throw new ConcurrentUpdateException("Lock " + id);
1121        }
1122    }
1123
1124    @Override
1125    public Lock removeLock(String id, String owner) {
1126        DBObject query = new BasicDBObject(idKey, id);
1127        if (owner != null) {
1128            // remove if owner matches or null
1129            // implements LockManager.canLockBeRemoved inside MongoDB
1130            Object ownerOrNull = Arrays.asList(owner, null);
1131            query.put(KEY_LOCK_OWNER, new BasicDBObject(QueryOperators.IN, ownerOrNull));
1132        } // else unconditional remove
1133        // remove the lock
1134        DBObject old = coll.findAndModify(query, null, null, false, UNSET_LOCK_UPDATE, false, false);
1135        if (old != null) {
1136            // found a doc and removed the lock, return previous lock
1137            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
1138            if (oldOwner == null) {
1139                // was not locked
1140                return null;
1141            } else {
1142                // return previous lock
1143                Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED));
1144                return new Lock(oldOwner, oldCreated);
1145            }
1146        } else {
1147            // doc not found, or lock owner didn't match
1148            // get the old lock
1149            if (log.isTraceEnabled()) {
1150                logQuery(id, LOCK_FIELDS);
1151            }
1152            old = coll.findOne(new BasicDBObject(idKey, id), LOCK_FIELDS);
1153            if (old == null) {
1154                // document not found
1155                throw new DocumentNotFoundException(id);
1156            }
1157            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
1158            Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED));
1159            if (oldOwner != null) {
1160                if (!LockManager.canLockBeRemoved(oldOwner, owner)) {
1161                    // existing mismatched lock, flag failure
1162                    return new Lock(oldOwner, oldCreated, true);
1163                }
1164                // old owner should have matched -- there was a race condition
1165                // TODO do better
1166                throw new ConcurrentUpdateException("Unlock " + id);
1167            }
1168            // old owner null, should have matched -- there was a race condition
1169            // TODO do better
1170            throw new ConcurrentUpdateException("Unlock " + id);
1171        }
1172    }
1173
    /**
     * No-op: this implementation performs no work when the lock manager is closed.
     */
    @Override
    public void closeLockManager() {

    }
1178
    /**
     * No-op: this implementation performs no work when asked to clear lock manager caches.
     */
    @Override
    public void clearLockManagerCaches() {
    }
1182
1183    protected class CursorResult {
1184        // Note that MongoDB cursor automatically timeout after 10 minutes of inactivity by default.
1185        protected DBCursor cursor;
1186        protected final int batchSize;
1187        protected long lastCallTimestamp;
1188        protected final int keepAliveSeconds;
1189
1190        public CursorResult(DBCursor cursor, int batchSize, int keepAliveSeconds) {
1191            this.cursor = cursor;
1192            this.batchSize = batchSize;
1193            this.keepAliveSeconds = keepAliveSeconds;
1194            lastCallTimestamp = System.currentTimeMillis();
1195        }
1196
1197        void touch() {
1198            lastCallTimestamp = System.currentTimeMillis();
1199        }
1200
1201        boolean timedOut(String scrollId) {
1202            long now = System.currentTimeMillis();
1203            if (now - lastCallTimestamp > (keepAliveSeconds * 1000)) {
1204                if (unregisterCursor(scrollId)) {
1205                    log.warn("Scroll " + scrollId + " timed out");
1206                }
1207                return true;
1208            }
1209            return false;
1210        }
1211
1212        public void close() {
1213            if (cursor != null) {
1214                cursor.close();
1215            }
1216            cursor = null;
1217        }
1218    }
1219}