001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.mongodb;
020
021import static java.lang.Boolean.TRUE;
022import static org.nuxeo.ecm.core.storage.State.NOP;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_STATUS;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_USER;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACL;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACP;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS;
028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY;
030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_JOBID;
031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_SIMPLE;
032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
033import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY;
034import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LIFECYCLE_STATE;
035import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
036import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
037import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
038import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
039import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PRIMARY_TYPE;
040import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS;
041import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;
042import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_VERSION_SERIES_ID;
043import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_READ_ACL;
044import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_VERSION_SERIES_ID;
045
046import java.io.Serializable;
047import java.lang.reflect.Array;
048import java.net.UnknownHostException;
049import java.util.ArrayList;
050import java.util.Arrays;
051import java.util.Calendar;
052import java.util.Date;
053import java.util.HashSet;
054import java.util.List;
055import java.util.Map;
056import java.util.Map.Entry;
057import java.util.Set;
058import java.util.UUID;
059import java.util.stream.Collectors;
060
061import javax.resource.spi.ConnectionManager;
062
063import org.apache.commons.lang.StringUtils;
064import org.apache.commons.logging.Log;
065import org.apache.commons.logging.LogFactory;
066import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
067import org.nuxeo.ecm.core.api.DocumentNotFoundException;
068import org.nuxeo.ecm.core.api.Lock;
069import org.nuxeo.ecm.core.api.NuxeoException;
070import org.nuxeo.ecm.core.api.PartialList;
071import org.nuxeo.ecm.core.api.model.Delta;
072import org.nuxeo.ecm.core.blob.BlobManager;
073import org.nuxeo.ecm.core.model.LockManager;
074import org.nuxeo.ecm.core.model.Repository;
075import org.nuxeo.ecm.core.query.QueryParseException;
076import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
077import org.nuxeo.ecm.core.storage.State;
078import org.nuxeo.ecm.core.storage.State.ListDiff;
079import org.nuxeo.ecm.core.storage.State.StateDiff;
080import org.nuxeo.ecm.core.storage.dbs.DBSDocument;
081import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
082import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
083import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener;
084import org.nuxeo.runtime.api.Framework;
085
086import com.mongodb.BasicDBObject;
087import com.mongodb.DB;
088import com.mongodb.DBCollection;
089import com.mongodb.DBCursor;
090import com.mongodb.DBObject;
091import com.mongodb.MongoClient;
092import com.mongodb.MongoClientURI;
093import com.mongodb.QueryOperators;
094import com.mongodb.ServerAddress;
095import com.mongodb.WriteResult;
096
097/**
098 * MongoDB implementation of a {@link Repository}.
099 *
100 * @since 5.9.4
101 */
102public class MongoDBRepository extends DBSRepositoryBase {
103
104    private static final Log log = LogFactory.getLog(MongoDBRepository.class);
105
106    private static final Long ZERO = Long.valueOf(0);
107
108    private static final Long ONE = Long.valueOf(1);
109
110    private static final Long MINUS_ONE = Long.valueOf(-11);
111
112    public static final String DB_DEFAULT = "nuxeo";
113
114    public static final String MONGODB_ID = "_id";
115
116    public static final String MONGODB_INC = "$inc";
117
118    public static final String MONGODB_SET = "$set";
119
120    public static final String MONGODB_UNSET = "$unset";
121
122    public static final String MONGODB_PUSH = "$push";
123
124    public static final String MONGODB_EACH = "$each";
125
126    public static final String MONGODB_META = "$meta";
127
128    public static final String MONGODB_TEXT_SCORE = "textScore";
129
130    private static final String MONGODB_INDEX_TEXT = "text";
131
132    private static final String MONGODB_INDEX_NAME = "name";
133
134    private static final String MONGODB_LANGUAGE_OVERRIDE = "language_override";
135
136    private static final String FULLTEXT_INDEX_NAME = "fulltext";
137
138    private static final String LANGUAGE_FIELD = "__language";
139
140    protected static final String COUNTER_NAME_UUID = "ecm:id";
141
142    protected static final String COUNTER_FIELD = "seq";
143
144    protected MongoClient mongoClient;
145
146    protected DBCollection coll;
147
148    protected DBCollection countersColl;
149
150    /** The key to use to store the id in the database. */
151    protected String idKey;
152
153    /** True if we don't use MongoDB's native "_id" key to store the id. */
154    protected boolean useCustomId;
155
156    /** Number of values still available in the in-memory sequence. */
157    protected long sequenceLeft;
158
159    /** Last value used from the in-memory sequence. */
160    protected long sequenceLastValue;
161
162    /** Sequence allocation block size. */
163    protected long sequenceBlockSize;
164
165    public MongoDBRepository(ConnectionManager cm, MongoDBRepositoryDescriptor descriptor) {
166        super(cm, descriptor.name, descriptor);
167        try {
168            mongoClient = newMongoClient(descriptor);
169            coll = getCollection(descriptor, mongoClient);
170            countersColl = getCountersCollection(descriptor, mongoClient);
171        } catch (UnknownHostException e) {
172            throw new RuntimeException(e);
173        }
174        if (Boolean.TRUE.equals(descriptor.nativeId)) {
175            idKey = MONGODB_ID;
176        } else {
177            idKey = KEY_ID;
178        }
179        useCustomId = KEY_ID.equals(idKey);
180        if (idType == IdType.sequence || DEBUG_UUIDS) {
181            Integer sbs = descriptor.sequenceBlockSize;
182            sequenceBlockSize = sbs == null ? 1 : sbs.longValue();
183            sequenceLeft = 0;
184        }
185        initRepository();
186    }
187
188    @Override
189    public List<IdType> getAllowedIdTypes() {
190        return Arrays.asList(IdType.varchar, IdType.sequence);
191    }
192
193    @Override
194    public void shutdown() {
195        super.shutdown();
196        mongoClient.close();
197    }
198
199    // used also by unit tests
200    public static MongoClient newMongoClient(MongoDBRepositoryDescriptor descriptor) throws UnknownHostException {
201        String server = descriptor.server;
202        if (StringUtils.isBlank(server)) {
203            throw new NuxeoException("Missing <server> in MongoDB repository descriptor");
204        }
205        if (server.startsWith("mongodb://")) {
206            // allow mongodb:// URI syntax for the server, to pass everything in one string
207            return new MongoClient(new MongoClientURI(server));
208        } else {
209            return new MongoClient(new ServerAddress(server));
210        }
211    }
212
213    protected static DBCollection getCollection(MongoClient mongoClient, String dbname, String collection) {
214        if (StringUtils.isBlank(dbname)) {
215            dbname = DB_DEFAULT;
216        }
217        DB db = mongoClient.getDB(dbname);
218        return db.getCollection(collection);
219    }
220
221    // used also by unit tests
222    public static DBCollection getCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) {
223        return getCollection(mongoClient, descriptor.dbname, descriptor.name);
224    }
225
226    // used also by unit tests
227    public static DBCollection getCountersCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) {
228        return getCollection(mongoClient, descriptor.dbname, descriptor.name + ".counters");
229    }
230
231    protected String keyToBson(String key) {
232        if (useCustomId) {
233            return key;
234        } else {
235            return KEY_ID.equals(key) ? idKey : key;
236        }
237    }
238
239    protected Object valueToBson(Object value) {
240        if (value instanceof State) {
241            return stateToBson((State) value);
242        } else if (value instanceof List) {
243            @SuppressWarnings("unchecked")
244            List<Object> values = (List<Object>) value;
245            return listToBson(values);
246        } else if (value instanceof Object[]) {
247            return listToBson(Arrays.asList((Object[]) value));
248        } else {
249            return serializableToBson(value);
250        }
251    }
252
253    protected DBObject stateToBson(State state) {
254        DBObject ob = new BasicDBObject();
255        for (Entry<String, Serializable> en : state.entrySet()) {
256            Object val = valueToBson(en.getValue());
257            if (val != null) {
258                ob.put(keyToBson(en.getKey()), val);
259            }
260        }
261        return ob;
262    }
263
264    protected List<Object> listToBson(List<Object> values) {
265        ArrayList<Object> objects = new ArrayList<Object>(values.size());
266        for (Object value : values) {
267            objects.add(valueToBson(value));
268        }
269        return objects;
270    }
271
272    protected String bsonToKey(String key) {
273        if (useCustomId) {
274            return key;
275        } else {
276            return idKey.equals(key) ? KEY_ID : key;
277        }
278    }
279
280    protected State bsonToState(DBObject ob) {
281        if (ob == null) {
282            return null;
283        }
284        State state = new State(ob.keySet().size());
285        for (String key : ob.keySet()) {
286            if (useCustomId && MONGODB_ID.equals(key)) {
287                // skip native id
288                continue;
289            }
290            state.put(bsonToKey(key), bsonToValue(ob.get(key)));
291        }
292        return state;
293    }
294
295    protected Serializable bsonToValue(Object value) {
296        if (value instanceof List) {
297            @SuppressWarnings("unchecked")
298            List<Object> list = (List<Object>) value;
299            if (list.isEmpty()) {
300                return null;
301            } else {
302                if (list.get(0) instanceof DBObject) {
303                    List<Serializable> l = new ArrayList<>(list.size());
304                    for (Object el : list) {
305                        l.add(bsonToState((DBObject) el));
306                    }
307                    return (Serializable) l;
308                } else {
309                    // turn the list into a properly-typed array
310                    Class<?> klass = Object.class;
311                    for (Object o : list) {
312                        if (o != null) {
313                            klass = scalarToSerializableClass(o.getClass());
314                            break;
315                        }
316                    }
317                    Object[] ar = (Object[]) Array.newInstance(klass, list.size());
318                    int i = 0;
319                    for (Object el : list) {
320                        ar[i++] = scalarToSerializable(el);
321                    }
322                    return ar;
323                }
324            }
325        } else if (value instanceof DBObject) {
326            return bsonToState((DBObject) value);
327        } else {
328            return scalarToSerializable(value);
329        }
330    }
331
332    public static class Updates {
333        public BasicDBObject set = new BasicDBObject();
334
335        public BasicDBObject unset = new BasicDBObject();
336
337        public BasicDBObject push = new BasicDBObject();
338
339        public BasicDBObject inc = new BasicDBObject();
340    }
341
342    /**
343     * Constructs a list of MongoDB updates from the given {@link StateDiff}.
344     * <p>
345     * We need a list because some cases need two operations to avoid conflicts.
346     */
347    protected List<DBObject> diffToBson(StateDiff diff) {
348        Updates updates = new Updates();
349        diffToUpdates(diff, null, updates);
350        UpdateListBuilder builder = new UpdateListBuilder();
351        for (Entry<String, Object> en : updates.set.entrySet()) {
352            builder.update(MONGODB_SET, en.getKey(), en.getValue());
353        }
354        for (Entry<String, Object> en : updates.unset.entrySet()) {
355            builder.update(MONGODB_UNSET, en.getKey(), en.getValue());
356        }
357        for (Entry<String, Object> en : updates.push.entrySet()) {
358            builder.update(MONGODB_PUSH, en.getKey(), en.getValue());
359        }
360        for (Entry<String, Object> en : updates.inc.entrySet()) {
361            builder.update(MONGODB_INC, en.getKey(), en.getValue());
362        }
363        return builder.updateList;
364    }
365
366    /**
367     * Update list builder to prevent several updates of the same field.
368     * <p>
369     * This happens if two operations act on two fields where one is a prefix of the other.
370     * <p>
371     * Example: Cannot update 'mylist.0.string' and 'mylist' at the same time (error 16837)
372     *
373     * @since 5.9.5
374     */
375    protected static class UpdateListBuilder {
376
377        protected List<DBObject> updateList = new ArrayList<>(1);
378
379        protected DBObject update;
380
381        protected List<String> keys;
382
383        protected UpdateListBuilder() {
384            newUpdate();
385        }
386
387        protected void newUpdate() {
388            updateList.add(update = new BasicDBObject());
389            keys = new ArrayList<>();
390        }
391
392        protected void update(String op, String key, Object value) {
393            if (conflicts(key, keys)) {
394                newUpdate();
395            }
396            keys.add(key);
397            DBObject map = (DBObject) update.get(op);
398            if (map == null) {
399                update.put(op, map = new BasicDBObject());
400            }
401            map.put(key, value);
402        }
403
404        /**
405         * Checks if the key conflicts with one of the previous keys.
406         * <p>
407         * A conflict occurs if one key is equals to or is a prefix of the other.
408         */
409        protected boolean conflicts(String key, List<String> previousKeys) {
410            String keydot = key + '.';
411            for (String prev : previousKeys) {
412                if (prev.equals(key) || prev.startsWith(keydot) || key.startsWith(prev + '.')) {
413                    return true;
414                }
415            }
416            return false;
417        }
418    }
419
420    protected void diffToUpdates(StateDiff diff, String prefix, Updates updates) {
421        String elemPrefix = prefix == null ? "" : prefix + '.';
422        for (Entry<String, Serializable> en : diff.entrySet()) {
423            String name = elemPrefix + en.getKey();
424            Serializable value = en.getValue();
425            if (value instanceof StateDiff) {
426                diffToUpdates((StateDiff) value, name, updates);
427            } else if (value instanceof ListDiff) {
428                diffToUpdates((ListDiff) value, name, updates);
429            } else if (value instanceof Delta) {
430                diffToUpdates((Delta) value, name, updates);
431            } else {
432                // not a diff
433                if (value == null) {
434                    // for null values, beyond the space saving,
435                    // it's important to unset the field instead of setting the value to null
436                    // because $inc does not work on nulls but works on non-existent fields
437                    updates.unset.put(name, ONE);
438                } else {
439                    updates.set.put(name, valueToBson(value));
440                }
441            }
442        }
443    }
444
445    protected void diffToUpdates(ListDiff listDiff, String prefix, Updates updates) {
446        if (listDiff.diff != null) {
447            String elemPrefix = prefix == null ? "" : prefix + '.';
448            int i = 0;
449            for (Object value : listDiff.diff) {
450                String name = elemPrefix + i;
451                if (value instanceof StateDiff) {
452                    diffToUpdates((StateDiff) value, name, updates);
453                } else if (value != NOP) {
454                    // set value
455                    updates.set.put(name, valueToBson(value));
456                }
457                i++;
458            }
459        }
460        if (listDiff.rpush != null) {
461            Object pushed;
462            if (listDiff.rpush.size() == 1) {
463                // no need to use $each for one element
464                pushed = valueToBson(listDiff.rpush.get(0));
465            } else {
466                pushed = new BasicDBObject(MONGODB_EACH, listToBson(listDiff.rpush));
467            }
468            updates.push.put(prefix, pushed);
469        }
470    }
471
472    protected void diffToUpdates(Delta delta, String prefix, Updates updates) {
473        // MongoDB can $inc a field that doesn't exist, it's treated as 0 BUT it doesn't work on null
474        // so we ensure (in diffToUpdates) that we never store a null but remove the field instead
475        Object inc = valueToBson(delta.getDeltaValue());
476        updates.inc.put(prefix, inc);
477    }
478
479    protected Object serializableToBson(Object value) {
480        if (value instanceof Calendar) {
481            return ((Calendar) value).getTime();
482        }
483        return value;
484    }
485
486    protected Serializable scalarToSerializable(Object val) {
487        if (val instanceof Date) {
488            Calendar cal = Calendar.getInstance();
489            cal.setTime((Date) val);
490            return cal;
491        }
492        return (Serializable) val;
493    }
494
495    protected Class<?> scalarToSerializableClass(Class<?> klass) {
496        if (Date.class.isAssignableFrom(klass)) {
497            return Calendar.class;
498        }
499        return klass;
500    }
501
502    protected void initRepository() {
503        // create required indexes
504        // code does explicit queries on those
505        if (useCustomId) {
506            coll.createIndex(new BasicDBObject(idKey, ONE));
507        }
508        coll.createIndex(new BasicDBObject(KEY_PARENT_ID, ONE));
509        coll.createIndex(new BasicDBObject(KEY_ANCESTOR_IDS, ONE));
510        coll.createIndex(new BasicDBObject(KEY_VERSION_SERIES_ID, ONE));
511        coll.createIndex(new BasicDBObject(KEY_PROXY_TARGET_ID, ONE));
512        coll.createIndex(new BasicDBObject(KEY_PROXY_VERSION_SERIES_ID, ONE));
513        coll.createIndex(new BasicDBObject(KEY_READ_ACL, ONE));
514        DBObject parentChild = new BasicDBObject();
515        parentChild.put(KEY_PARENT_ID, ONE);
516        parentChild.put(KEY_NAME, ONE);
517        coll.createIndex(parentChild);
518        // often used in user-generated queries
519        coll.createIndex(new BasicDBObject(KEY_PRIMARY_TYPE, ONE));
520        coll.createIndex(new BasicDBObject(KEY_LIFECYCLE_STATE, ONE));
521        coll.createIndex(new BasicDBObject(KEY_FULLTEXT_JOBID, ONE));
522        coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_USER, ONE));
523        coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_STATUS, ONE));
524        // TODO configure these from somewhere else
525        coll.createIndex(new BasicDBObject("dc:modified", MINUS_ONE));
526        coll.createIndex(new BasicDBObject("rend:renditionName", ONE));
527        coll.createIndex(new BasicDBObject("drv:subscriptions.enabled", ONE));
528        coll.createIndex(new BasicDBObject("collectionMember:collectionIds", ONE));
529        if (!isFulltextDisabled()) {
530            DBObject indexKeys = new BasicDBObject();
531            indexKeys.put(KEY_FULLTEXT_SIMPLE, MONGODB_INDEX_TEXT);
532            indexKeys.put(KEY_FULLTEXT_BINARY, MONGODB_INDEX_TEXT);
533            DBObject indexOptions = new BasicDBObject();
534            indexOptions.put(MONGODB_INDEX_NAME, FULLTEXT_INDEX_NAME);
535            indexOptions.put(MONGODB_LANGUAGE_OVERRIDE, LANGUAGE_FIELD);
536            coll.createIndex(indexKeys, indexOptions);
537        }
538        // check root presence
539        DBObject query = new BasicDBObject(idKey, getRootId());
540        if (coll.findOne(query, justPresenceField()) != null) {
541            return;
542        }
543        // create basic repository structure needed
544        if (idType == IdType.sequence || DEBUG_UUIDS) {
545            // create the id counter
546            DBObject idCounter = new BasicDBObject();
547            idCounter.put(MONGODB_ID, COUNTER_NAME_UUID);
548            idCounter.put(COUNTER_FIELD, ZERO);
549            countersColl.insert(idCounter);
550        }
551        initRoot();
552    }
553
554    protected synchronized Long getNextSequenceId() {
555        if (sequenceLeft == 0) {
556            // allocate a new sequence block
557            // the database contains the last value from the last block
558            DBObject query = new BasicDBObject(MONGODB_ID, COUNTER_NAME_UUID);
559            DBObject update = new BasicDBObject(MONGODB_INC,
560                    new BasicDBObject(COUNTER_FIELD, Long.valueOf(sequenceBlockSize)));
561            DBObject idCounter = countersColl.findAndModify(query, null, null, false, update, true, false);
562            if (idCounter == null) {
563                throw new NuxeoException("Repository id counter not initialized");
564            }
565            sequenceLeft = sequenceBlockSize;
566            sequenceLastValue = ((Long) idCounter.get(COUNTER_FIELD)).longValue() - sequenceBlockSize;
567        }
568        sequenceLeft--;
569        sequenceLastValue++;
570        return Long.valueOf(sequenceLastValue);
571    }
572
573    @Override
574    public String generateNewId() {
575        if (idType == IdType.sequence || DEBUG_UUIDS) {
576            Long id = getNextSequenceId();
577            if (DEBUG_UUIDS) {
578                return "UUID_" + id;
579            }
580            return id.toString();
581        } else {
582            return UUID.randomUUID().toString();
583        }
584    }
585
586    @Override
587    public void createState(State state) {
588        DBObject ob = stateToBson(state);
589        if (log.isTraceEnabled()) {
590            log.trace("MongoDB: CREATE " + ob.get(idKey) + ": " + ob);
591        }
592        coll.insert(ob);
593        // TODO dupe exception
594        // throw new DocumentException("Already exists: " + id);
595    }
596
597    @Override
598    public void createStates(List<State> states) {
599        List<DBObject> obs = states.stream().map(this::stateToBson).collect(Collectors.toList());
600        if (log.isTraceEnabled()) {
601            log.trace("MongoDB: CREATE ["
602                    + obs.stream().map(ob -> ob.get(idKey).toString()).collect(Collectors.joining(", "))
603                    + "]: " + obs);
604        }
605        coll.insert(obs);
606    }
607
608    @Override
609    public State readState(String id) {
610        DBObject query = new BasicDBObject(idKey, id);
611        return findOne(query);
612    }
613
614    @Override
615    public List<State> readStates(List<String> ids) {
616        DBObject query = new BasicDBObject(idKey, new BasicDBObject(QueryOperators.IN, ids));
617        return findAll(query, ids.size());
618    }
619
620    @Override
621    public void updateState(String id, StateDiff diff) {
622        DBObject query = new BasicDBObject(idKey, id);
623        for (DBObject update : diffToBson(diff)) {
624            if (log.isTraceEnabled()) {
625                log.trace("MongoDB: UPDATE " + id + ": " + update);
626            }
627            coll.update(query, update);
628            // TODO dupe exception
629            // throw new DocumentException("Missing: " + id);
630        }
631    }
632
633    @Override
634    public void deleteStates(Set<String> ids) {
635        DBObject query = new BasicDBObject(idKey, new BasicDBObject(QueryOperators.IN, ids));
636        if (log.isTraceEnabled()) {
637            log.trace("MongoDB: REMOVE " + ids);
638        }
639        WriteResult w = coll.remove(query);
640        if (w.getN() != ids.size()) {
641            log.error("Removed " + w.getN() + " docs for " + ids.size() + " ids: " + ids);
642        }
643    }
644
645    @Override
646    public State readChildState(String parentId, String name, Set<String> ignored) {
647        DBObject query = getChildQuery(parentId, name, ignored);
648        return findOne(query);
649    }
650
651    protected void logQuery(String id, DBObject fields) {
652        logQuery(new BasicDBObject(idKey, id), fields);
653    }
654
655    protected void logQuery(DBObject query, DBObject fields) {
656        if (fields == null) {
657            log.trace("MongoDB: QUERY " + query);
658        } else {
659            log.trace("MongoDB: QUERY " + query + " KEYS " + fields);
660        }
661    }
662
663    protected void logQuery(DBObject query, DBObject fields, DBObject orderBy, int limit, int offset) {
664        log.trace("MongoDB: QUERY " + query + " KEYS " + fields + (orderBy == null ? "" : " ORDER BY " + orderBy)
665                + " OFFSET " + offset + " LIMIT " + limit);
666    }
667
668    @Override
669    public boolean hasChild(String parentId, String name, Set<String> ignored) {
670        DBObject query = getChildQuery(parentId, name, ignored);
671        if (log.isTraceEnabled()) {
672            logQuery(query, justPresenceField());
673        }
674        return coll.findOne(query, justPresenceField()) != null;
675    }
676
677    protected DBObject getChildQuery(String parentId, String name, Set<String> ignored) {
678        DBObject query = new BasicDBObject();
679        query.put(KEY_PARENT_ID, parentId);
680        query.put(KEY_NAME, name);
681        addIgnoredIds(query, ignored);
682        return query;
683    }
684
685    protected void addIgnoredIds(DBObject query, Set<String> ignored) {
686        if (!ignored.isEmpty()) {
687            DBObject notInIds = new BasicDBObject(QueryOperators.NIN, new ArrayList<String>(ignored));
688            query.put(idKey, notInIds);
689        }
690    }
691
692    @Override
693    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
694        DBObject query = new BasicDBObject(keyToBson(key), value);
695        addIgnoredIds(query, ignored);
696        return findAll(query, 0);
697    }
698
699    @Override
700    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
701        DBObject query = new BasicDBObject(keyToBson(key1), value1);
702        query.put(keyToBson(key2), value2);
703        addIgnoredIds(query, ignored);
704        return findAll(query, 0);
705    }
706
707    @Override
708    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
709            Map<String, Object[]> targetProxies) {
710        DBObject query = new BasicDBObject(key, value);
711        DBObject fields = new BasicDBObject();
712        if (useCustomId) {
713            fields.put(MONGODB_ID, ZERO);
714        }
715        fields.put(idKey, ONE);
716        fields.put(KEY_IS_PROXY, ONE);
717        fields.put(KEY_PROXY_TARGET_ID, ONE);
718        fields.put(KEY_PROXY_IDS, ONE);
719        if (log.isTraceEnabled()) {
720            logQuery(query, fields);
721        }
722        DBCursor cursor = coll.find(query, fields);
723        try {
724            for (DBObject ob : cursor) {
725                String id = (String) ob.get(idKey);
726                ids.add(id);
727                if (proxyTargets != null && TRUE.equals(ob.get(KEY_IS_PROXY))) {
728                    String targetId = (String) ob.get(KEY_PROXY_TARGET_ID);
729                    proxyTargets.put(id, targetId);
730                }
731                if (targetProxies != null) {
732                    Object[] proxyIds = (Object[]) bsonToValue(ob.get(KEY_PROXY_IDS));
733                    if (proxyIds != null) {
734                        targetProxies.put(id, proxyIds);
735                    }
736                }
737            }
738        } finally {
739            cursor.close();
740        }
741    }
742
743    @Override
744    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
745        DBObject query = new BasicDBObject(key, value);
746        addIgnoredIds(query, ignored);
747        if (log.isTraceEnabled()) {
748            logQuery(query, justPresenceField());
749        }
750        return coll.findOne(query, justPresenceField()) != null;
751    }
752
753    protected State findOne(DBObject query) {
754        if (log.isTraceEnabled()) {
755            logQuery(query, null);
756        }
757        return bsonToState(coll.findOne(query));
758    }
759
760    protected List<State> findAll(DBObject query, int sizeHint) {
761        if (log.isTraceEnabled()) {
762            logQuery(query, null);
763        }
764        DBCursor cursor = coll.find(query);
765        Set<String> seen = new HashSet<>();
766        try {
767            List<State> list = new ArrayList<>(sizeHint);
768            for (DBObject ob : cursor) {
769                if (!seen.add((String) ob.get(idKey))) {
770                    // MongoDB cursors may return the same
771                    // object several times
772                    continue;
773                }
774                list.add(bsonToState(ob));
775            }
776            return list;
777        } finally {
778            cursor.close();
779        }
780    }
781
782    protected DBObject justPresenceField() {
783        return new BasicDBObject(MONGODB_ID, ONE);
784    }
785
786    @Override
787    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
788            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
789        // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter
790        MongoDBQueryBuilder builder = new MongoDBQueryBuilder(this, evaluator.getExpression(),
791                evaluator.getSelectClause(), orderByClause, evaluator.pathResolver, evaluator.fulltextSearchDisabled);
792        builder.walk();
793        if (builder.hasFulltext && isFulltextDisabled()) {
794            throw new QueryParseException("Fulltext search disabled by configuration");
795        }
796        DBObject query = builder.getQuery();
797        addPrincipals(query, evaluator.principals);
798        DBObject orderBy = builder.getOrderBy();
799        DBObject keys = builder.getProjection();
800        // Don't do manual projection if there are no projection wildcards, as this brings no new
801        // information and is costly. The only difference is several identical rows instead of one.
802        boolean manualProjection = !distinctDocuments && builder.hasProjectionWildcard();
803        if (manualProjection) {
804            // we'll do post-treatment to re-evaluate the query to get proper wildcard projections
805            // so we need the full state from the database
806            keys = new BasicDBObject();
807            evaluator.parse();
808        }
809
810        if (log.isTraceEnabled()) {
811            logQuery(query, keys, orderBy, limit, offset);
812        }
813
814        List<Map<String, Serializable>> projections;
815        long totalSize;
816        DBCursor cursor = coll.find(query, keys).skip(offset).limit(limit);
817        try {
818            if (orderBy != null) {
819                cursor.sort(orderBy);
820            }
821            projections = new ArrayList<>();
822            for (DBObject ob : cursor) {
823                State state = bsonToState(ob);
824                if (manualProjection) {
825                    projections.addAll(evaluator.matches(state));
826                } else {
827                    projections.add(DBSStateFlattener.flatten(state));
828                }
829            }
830            if (countUpTo == -1) {
831                // count full size
832                if (limit == 0) {
833                    totalSize = projections.size();
834                } else {
835                    totalSize = cursor.count();
836                }
837            } else if (countUpTo == 0) {
838                // no count
839                totalSize = -1; // not counted
840            } else {
841                // count only if less than countUpTo
842                if (limit == 0) {
843                    totalSize = projections.size();
844                } else {
845                    totalSize = cursor.copy().limit(countUpTo + 1).count();
846                }
847                if (totalSize > countUpTo) {
848                    totalSize = -2; // truncated
849                }
850            }
851        } finally {
852            cursor.close();
853        }
854        if (log.isTraceEnabled() && projections.size() != 0) {
855            log.trace("MongoDB:    -> " + projections.size());
856        }
857        return new PartialList<>(projections, totalSize);
858    }
859
860    protected void addPrincipals(DBObject query, Set<String> principals) {
861        if (principals != null) {
862            DBObject inPrincipals = new BasicDBObject(QueryOperators.IN, new ArrayList<String>(principals));
863            query.put(DBSDocument.KEY_READ_ACL, inPrincipals);
864        }
865    }
866
    /**
     * Keys used for document projection when marking all binaries for GC.
     * <p>
     * Assigned by {@link #initBlobsPaths()} from the keys collected by {@link MongoDBBlobFinder}, and passed as the
     * projection of the find done in {@link #markReferencedBinaries()}.
     */
    protected DBObject binaryKeys;
869
870    @Override
871    protected void initBlobsPaths() {
872        MongoDBBlobFinder finder = new MongoDBBlobFinder();
873        finder.visit();
874        binaryKeys = finder.binaryKeys;
875    }
876
877    protected static class MongoDBBlobFinder extends BlobFinder {
878        protected DBObject binaryKeys = new BasicDBObject(MONGODB_ID, ZERO);
879
880        @Override
881        protected void recordBlobPath() {
882            path.addLast(KEY_BLOB_DATA);
883            binaryKeys.put(StringUtils.join(path, "."), ONE);
884            path.removeLast();
885        }
886    }
887
888    @Override
889    public void markReferencedBinaries() {
890        BlobManager blobManager = Framework.getService(BlobManager.class);
891        // TODO add a query to not scan all documents
892        if (log.isTraceEnabled()) {
893            logQuery(new BasicDBObject(), binaryKeys);
894        }
895        DBCursor cursor = coll.find(new BasicDBObject(), binaryKeys);
896        try {
897            for (DBObject ob : cursor) {
898                markReferencedBinaries(ob, blobManager);
899            }
900        } finally {
901            cursor.close();
902        }
903    }
904
905    protected void markReferencedBinaries(DBObject ob, BlobManager blobManager) {
906        for (String key : ob.keySet()) {
907            Object value = ob.get(key);
908            if (value instanceof List) {
909                @SuppressWarnings("unchecked")
910                List<Object> list = (List<Object>) value;
911                for (Object v : list) {
912                    if (v instanceof DBObject) {
913                        markReferencedBinaries((DBObject) v, blobManager);
914                    } else {
915                        markReferencedBinary(v, blobManager);
916                    }
917                }
918            } else if (value instanceof Object[]) {
919                for (Object v : (Object[]) value) {
920                    markReferencedBinary(v, blobManager);
921                }
922            } else if (value instanceof DBObject) {
923                markReferencedBinaries((DBObject) value, blobManager);
924            } else {
925                markReferencedBinary(value, blobManager);
926            }
927        }
928    }
929
930    protected void markReferencedBinary(Object value, BlobManager blobManager) {
931        if (!(value instanceof String)) {
932            return;
933        }
934        String key = (String) value;
935        blobManager.markReferencedBinary(key, repositoryName);
936    }
937
    /**
     * The lock-related document fields (lock owner and lock creation date), each mapped to {@code ONE}.
     * <p>
     * Used as the projection when reading a lock, and as the operand of {@link #UNSET_LOCK_UPDATE}.
     */
    protected static final DBObject LOCK_FIELDS;

    static {
        LOCK_FIELDS = new BasicDBObject();
        LOCK_FIELDS.put(KEY_LOCK_OWNER, ONE);
        LOCK_FIELDS.put(KEY_LOCK_CREATED, ONE);
    }

    /**
     * Update applying the {@code MONGODB_UNSET} operator to {@link #LOCK_FIELDS}; used when removing a lock.
     * Must be declared after the static block above so that {@link #LOCK_FIELDS} is already assigned.
     */
    protected static final DBObject UNSET_LOCK_UPDATE = new BasicDBObject(MONGODB_UNSET, LOCK_FIELDS);
947
948    @Override
949    public Lock getLock(String id) {
950        if (log.isTraceEnabled()) {
951            logQuery(id, LOCK_FIELDS);
952        }
953        DBObject res = coll.findOne(new BasicDBObject(idKey, id), LOCK_FIELDS);
954        if (res == null) {
955            // document not found
956            throw new DocumentNotFoundException(id);
957        }
958        String owner = (String) res.get(KEY_LOCK_OWNER);
959        if (owner == null) {
960            // not locked
961            return null;
962        }
963        Calendar created = (Calendar) scalarToSerializable(res.get(KEY_LOCK_CREATED));
964        return new Lock(owner, created);
965    }
966
967    @Override
968    public Lock setLock(String id, Lock lock) {
969        DBObject query = new BasicDBObject(idKey, id);
970        query.put(KEY_LOCK_OWNER, null); // select doc if no lock is set
971        DBObject setLock = new BasicDBObject();
972        setLock.put(KEY_LOCK_OWNER, lock.getOwner());
973        setLock.put(KEY_LOCK_CREATED, serializableToBson(lock.getCreated()));
974        DBObject setLockUpdate = new BasicDBObject(MONGODB_SET, setLock);
975        if (log.isTraceEnabled()) {
976            log.trace("MongoDB: FINDANDMODIFY " + query + " UPDATE " + setLockUpdate);
977        }
978        DBObject res = coll.findAndModify(query, null, null, false, setLockUpdate, false, false);
979        if (res != null) {
980            // found a doc to lock
981            return null;
982        } else {
983            // doc not found, or lock owner already set
984            // get the old lock
985            if (log.isTraceEnabled()) {
986                logQuery(id, LOCK_FIELDS);
987            }
988            DBObject old = coll.findOne(new BasicDBObject(idKey, id), LOCK_FIELDS);
989            if (old == null) {
990                // document not found
991                throw new DocumentNotFoundException(id);
992            }
993            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
994            Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED));
995            if (oldOwner != null) {
996                return new Lock(oldOwner, oldCreated);
997            }
998            // no lock -- there was a race condition
999            // TODO do better
1000            throw new ConcurrentUpdateException("Lock " + id);
1001        }
1002    }
1003
1004    @Override
1005    public Lock removeLock(String id, String owner) {
1006        DBObject query = new BasicDBObject(idKey, id);
1007        if (owner != null) {
1008            // remove if owner matches or null
1009            // implements LockManager.canLockBeRemoved inside MongoDB
1010            Object ownerOrNull = Arrays.asList(owner, null);
1011            query.put(KEY_LOCK_OWNER, new BasicDBObject(QueryOperators.IN, ownerOrNull));
1012        } // else unconditional remove
1013        // remove the lock
1014        DBObject old = coll.findAndModify(query, null, null, false, UNSET_LOCK_UPDATE, false, false);
1015        if (old != null) {
1016            // found a doc and removed the lock, return previous lock
1017            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
1018            if (oldOwner == null) {
1019                // was not locked
1020                return null;
1021            } else {
1022                // return previous lock
1023                Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED));
1024                return new Lock(oldOwner, oldCreated);
1025            }
1026        } else {
1027            // doc not found, or lock owner didn't match
1028            // get the old lock
1029            if (log.isTraceEnabled()) {
1030                logQuery(id, LOCK_FIELDS);
1031            }
1032            old = coll.findOne(new BasicDBObject(idKey, id), LOCK_FIELDS);
1033            if (old == null) {
1034                // document not found
1035                throw new DocumentNotFoundException(id);
1036            }
1037            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
1038            Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED));
1039            if (oldOwner != null) {
1040                if (!LockManager.canLockBeRemoved(oldOwner, owner)) {
1041                    // existing mismatched lock, flag failure
1042                    return new Lock(oldOwner, oldCreated, true);
1043                }
1044                // old owner should have matched -- there was a race condition
1045                // TODO do better
1046                throw new ConcurrentUpdateException("Unlock " + id);
1047            }
1048            // old owner null, should have matched -- there was a race condition
1049            // TODO do better
1050            throw new ConcurrentUpdateException("Unlock " + id);
1051        }
1052    }
1053
    /**
     * Does nothing: this method has an empty body.
     */
    @Override
    public void closeLockManager() {
        // NOTE(review): presumably nothing to release because locks are read and written through the collection
    }
1058
    /**
     * Does nothing: this method has an empty body.
     */
    @Override
    public void clearLockManagerCaches() {
        // NOTE(review): the lock methods in this class read locks from the collection each time; no cache seen here
    }
1062
1063}