001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.mongodb;
020
021import static java.lang.Boolean.TRUE;
022import static org.nuxeo.ecm.core.storage.State.NOP;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_STATUS;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_USER;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACL;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACP;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS;
028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY;
030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_JOBID;
031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_SIMPLE;
032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
033import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY;
034import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LIFECYCLE_STATE;
035import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
036import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
037import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
038import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
039import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PREFIX;
040import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PRIMARY_TYPE;
041import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS;
042import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;
043import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_VERSION_SERIES_ID;
044import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_READ_ACL;
045import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_VERSION_SERIES_ID;
046
047import java.io.Serializable;
048import java.lang.reflect.Array;
049import java.net.UnknownHostException;
050import java.util.ArrayList;
051import java.util.Arrays;
052import java.util.Calendar;
053import java.util.Date;
054import java.util.HashMap;
055import java.util.HashSet;
056import java.util.List;
057import java.util.Map;
058import java.util.Map.Entry;
059import java.util.Set;
060import java.util.UUID;
061
062import org.apache.commons.lang.StringUtils;
063import org.apache.commons.logging.Log;
064import org.apache.commons.logging.LogFactory;
065import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
066import org.nuxeo.ecm.core.api.DocumentNotFoundException;
067import org.nuxeo.ecm.core.api.Lock;
068import org.nuxeo.ecm.core.api.NuxeoException;
069import org.nuxeo.ecm.core.api.PartialList;
070import org.nuxeo.ecm.core.api.model.Delta;
071import org.nuxeo.ecm.core.blob.BlobManager;
072import org.nuxeo.ecm.core.model.LockManager;
073import org.nuxeo.ecm.core.model.Repository;
074import org.nuxeo.ecm.core.query.QueryParseException;
075import org.nuxeo.ecm.core.query.sql.model.Expression;
076import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
077import org.nuxeo.ecm.core.query.sql.model.SelectClause;
078import org.nuxeo.ecm.core.storage.State;
079import org.nuxeo.ecm.core.storage.State.ListDiff;
080import org.nuxeo.ecm.core.storage.State.StateDiff;
081import org.nuxeo.ecm.core.storage.dbs.DBSDocument;
082import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
083import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
084import org.nuxeo.ecm.core.storage.dbs.DBSSession;
085import org.nuxeo.runtime.api.Framework;
086
087import com.mongodb.BasicDBObject;
088import com.mongodb.DB;
089import com.mongodb.DBCollection;
090import com.mongodb.DBCursor;
091import com.mongodb.DBObject;
092import com.mongodb.MongoClient;
093import com.mongodb.MongoClientURI;
094import com.mongodb.QueryOperators;
095import com.mongodb.ServerAddress;
096import com.mongodb.WriteResult;
097
098/**
099 * MongoDB implementation of a {@link Repository}.
100 *
101 * @since 5.9.4
102 */
103public class MongoDBRepository extends DBSRepositoryBase {
104
105    private static final Log log = LogFactory.getLog(MongoDBRepository.class);
106
107    private static final Long ZERO = Long.valueOf(0);
108
109    private static final Long ONE = Long.valueOf(1);
110
111    private static final Long MINUS_ONE = Long.valueOf(-11);
112
113    public static final String DB_DEFAULT = "nuxeo";
114
115    public static final String MONGODB_ID = "_id";
116
117    public static final String MONGODB_INC = "$inc";
118
119    public static final String MONGODB_SET = "$set";
120
121    public static final String MONGODB_UNSET = "$unset";
122
123    public static final String MONGODB_PUSH = "$push";
124
125    public static final String MONGODB_EACH = "$each";
126
127    public static final String MONGODB_META = "$meta";
128
129    public static final String MONGODB_TEXT_SCORE = "textScore";
130
131    private static final String MONGODB_INDEX_TEXT = "text";
132
133    private static final String MONGODB_INDEX_NAME = "name";
134
135    private static final String MONGODB_LANGUAGE_OVERRIDE = "language_override";
136
137    private static final String FULLTEXT_INDEX_NAME = "fulltext";
138
139    private static final String LANGUAGE_FIELD = "__language";
140
141    protected static final String COUNTER_NAME_UUID = "ecm:id";
142
143    protected static final String COUNTER_FIELD = "seq";
144
145    protected MongoClient mongoClient;
146
147    protected DBCollection coll;
148
149    protected DBCollection countersColl;
150
151    public MongoDBRepository(MongoDBRepositoryDescriptor descriptor) {
152        super(descriptor.name, descriptor.getFulltextDescriptor());
153        try {
154            mongoClient = newMongoClient(descriptor);
155            coll = getCollection(descriptor, mongoClient);
156            countersColl = getCountersCollection(descriptor, mongoClient);
157        } catch (UnknownHostException e) {
158            throw new RuntimeException(e);
159        }
160        initRepository();
161    }
162
163    @Override
164    public void shutdown() {
165        super.shutdown();
166        mongoClient.close();
167    }
168
169    // used also by unit tests
170    public static MongoClient newMongoClient(MongoDBRepositoryDescriptor descriptor) throws UnknownHostException {
171        String server = descriptor.server;
172        if (StringUtils.isBlank(server)) {
173            throw new NuxeoException("Missing <server> in MongoDB repository descriptor");
174        }
175        if (server.startsWith("mongodb://")) {
176            // allow mongodb:// URI syntax for the server, to pass everything in one string
177            return new MongoClient(new MongoClientURI(server));
178        } else {
179            return new MongoClient(new ServerAddress(server));
180        }
181    }
182
183    protected static DBCollection getCollection(MongoClient mongoClient, String dbname, String collection) {
184        if (StringUtils.isBlank(dbname)) {
185            dbname = DB_DEFAULT;
186        }
187        DB db = mongoClient.getDB(dbname);
188        return db.getCollection(collection);
189    }
190
191    // used also by unit tests
192    public static DBCollection getCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) {
193        return getCollection(mongoClient, descriptor.dbname, descriptor.name);
194    }
195
196    // used also by unit tests
197    public static DBCollection getCountersCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) {
198        return getCollection(mongoClient, descriptor.dbname, descriptor.name + ".counters");
199    }
200
201    protected Object valueToBson(Object value) {
202        if (value instanceof State) {
203            return stateToBson((State) value);
204        } else if (value instanceof List) {
205            @SuppressWarnings("unchecked")
206            List<Object> values = (List<Object>) value;
207            return listToBson(values);
208        } else if (value instanceof Object[]) {
209            return listToBson(Arrays.asList((Object[]) value));
210        } else {
211            return serializableToBson(value);
212        }
213    }
214
215    protected DBObject stateToBson(State state) {
216        DBObject ob = new BasicDBObject();
217        for (Entry<String, Serializable> en : state.entrySet()) {
218            Object val = valueToBson(en.getValue());
219            if (val != null) {
220                ob.put(en.getKey(), val);
221            }
222        }
223        return ob;
224    }
225
226    protected List<Object> listToBson(List<Object> values) {
227        ArrayList<Object> objects = new ArrayList<Object>(values.size());
228        for (Object value : values) {
229            objects.add(valueToBson(value));
230        }
231        return objects;
232    }
233
234    protected State bsonToState(DBObject ob) {
235        if (ob == null) {
236            return null;
237        }
238        State state = new State(ob.keySet().size());
239        for (String key : ob.keySet()) {
240            Object val = ob.get(key);
241            Serializable value;
242            if (val instanceof List) {
243                @SuppressWarnings("unchecked")
244                List<Object> list = (List<Object>) val;
245                if (list.isEmpty()) {
246                    value = null;
247                } else {
248                    if (list.get(0) instanceof DBObject) {
249                        List<Serializable> l = new ArrayList<>(list.size());
250                        for (Object el : list) {
251                            l.add(bsonToState((DBObject) el));
252                        }
253                        value = (Serializable) l;
254                    } else {
255                        // turn the list into a properly-typed array
256                        Class<?> klass = Object.class;
257                        for (Object o : list) {
258                            if (o != null) {
259                                klass = scalarToSerializableClass(o.getClass());
260                                break;
261                            }
262                        }
263                        Object[] ar = (Object[]) Array.newInstance(klass, list.size());
264                        int i = 0;
265                        for (Object el : list) {
266                            ar[i++] = scalarToSerializable(el);
267                        }
268                        value = ar;
269                    }
270                }
271            } else if (val instanceof DBObject) {
272                value = bsonToState((DBObject) val);
273            } else {
274                if (MONGODB_ID.equals(key)) {
275                    // skip ObjectId
276                    continue;
277                }
278                value = scalarToSerializable(val);
279            }
280            state.put(key, value);
281        }
282        return state;
283    }
284
285    public static class Updates {
286        public BasicDBObject set = new BasicDBObject();
287
288        public BasicDBObject unset = new BasicDBObject();
289
290        public BasicDBObject push = new BasicDBObject();
291
292        public BasicDBObject inc = new BasicDBObject();
293    }
294
295    /**
296     * Constructs a list of MongoDB updates from the given {@link StateDiff}.
297     * <p>
298     * We need a list because some cases need two operations to avoid conflicts.
299     */
300    protected List<DBObject> diffToBson(StateDiff diff) {
301        Updates updates = new Updates();
302        diffToUpdates(diff, null, updates);
303        UpdateListBuilder builder = new UpdateListBuilder();
304        for (Entry<String, Object> en : updates.set.entrySet()) {
305            builder.update(MONGODB_SET, en.getKey(), en.getValue());
306        }
307        for (Entry<String, Object> en : updates.unset.entrySet()) {
308            builder.update(MONGODB_UNSET, en.getKey(), en.getValue());
309        }
310        for (Entry<String, Object> en : updates.push.entrySet()) {
311            builder.update(MONGODB_PUSH, en.getKey(), en.getValue());
312        }
313        for (Entry<String, Object> en : updates.inc.entrySet()) {
314            builder.update(MONGODB_INC, en.getKey(), en.getValue());
315        }
316        return builder.updateList;
317    }
318
319    /**
320     * Update list builder to prevent several updates of the same field.
321     * <p>
322     * This happens if two operations act on two fields where one is a prefix of the other.
323     * <p>
324     * Example: Cannot update 'mylist.0.string' and 'mylist' at the same time (error 16837)
325     *
326     * @since 5.9.5
327     */
328    protected static class UpdateListBuilder {
329
330        protected List<DBObject> updateList = new ArrayList<>(1);
331
332        protected DBObject update;
333
334        protected List<String> keys;
335
336        protected UpdateListBuilder() {
337            newUpdate();
338        }
339
340        protected void newUpdate() {
341            updateList.add(update = new BasicDBObject());
342            keys = new ArrayList<>();
343        }
344
345        protected void update(String op, String key, Object value) {
346            if (conflicts(key, keys)) {
347                newUpdate();
348            }
349            keys.add(key);
350            DBObject map = (DBObject) update.get(op);
351            if (map == null) {
352                update.put(op, map = new BasicDBObject());
353            }
354            map.put(key, value);
355        }
356
357        /**
358         * Checks if the key conflicts with one of the previous keys.
359         * <p>
360         * A conflict occurs if one key is equals to or is a prefix of the other.
361         */
362        protected boolean conflicts(String key, List<String> previousKeys) {
363            String keydot = key + '.';
364            for (String prev : previousKeys) {
365                if (prev.equals(key) || prev.startsWith(keydot) || key.startsWith(prev + '.')) {
366                    return true;
367                }
368            }
369            return false;
370        }
371    }
372
373    protected void diffToUpdates(StateDiff diff, String prefix, Updates updates) {
374        String elemPrefix = prefix == null ? "" : prefix + '.';
375        for (Entry<String, Serializable> en : diff.entrySet()) {
376            String name = elemPrefix + en.getKey();
377            Serializable value = en.getValue();
378            if (value instanceof StateDiff) {
379                diffToUpdates((StateDiff) value, name, updates);
380            } else if (value instanceof ListDiff) {
381                diffToUpdates((ListDiff) value, name, updates);
382            } else if (value instanceof Delta) {
383                diffToUpdates((Delta) value, name, updates);
384            } else {
385                // not a diff
386                updates.set.put(name, valueToBson(value));
387            }
388        }
389    }
390
391    protected void diffToUpdates(ListDiff listDiff, String prefix, Updates updates) {
392        if (listDiff.diff != null) {
393            String elemPrefix = prefix == null ? "" : prefix + '.';
394            int i = 0;
395            for (Object value : listDiff.diff) {
396                String name = elemPrefix + i;
397                if (value instanceof StateDiff) {
398                    diffToUpdates((StateDiff) value, name, updates);
399                } else if (value != NOP) {
400                    // set value
401                    updates.set.put(name, valueToBson(value));
402                }
403                i++;
404            }
405        }
406        if (listDiff.rpush != null) {
407            Object pushed;
408            if (listDiff.rpush.size() == 1) {
409                // no need to use $each for one element
410                pushed = valueToBson(listDiff.rpush.get(0));
411            } else {
412                pushed = new BasicDBObject(MONGODB_EACH, listToBson(listDiff.rpush));
413            }
414            updates.push.put(prefix, pushed);
415        }
416    }
417
418    protected void diffToUpdates(Delta delta, String prefix, Updates updates) {
419        Object inc = valueToBson(delta.getDeltaValue());
420        updates.inc.put(prefix, inc);
421    }
422
423    protected Object serializableToBson(Object value) {
424        if (value instanceof Calendar) {
425            return ((Calendar) value).getTime();
426        }
427        return value;
428    }
429
430    protected Serializable scalarToSerializable(Object val) {
431        if (val instanceof Date) {
432            Calendar cal = Calendar.getInstance();
433            cal.setTime((Date) val);
434            return cal;
435        }
436        return (Serializable) val;
437    }
438
439    protected Class<?> scalarToSerializableClass(Class<?> klass) {
440        if (Date.class.isAssignableFrom(klass)) {
441            return Calendar.class;
442        }
443        return klass;
444    }
445
446    protected void initRepository() {
447        // create required indexes
448        // code does explicit queries on those
449        coll.createIndex(new BasicDBObject(KEY_ID, ONE));
450        coll.createIndex(new BasicDBObject(KEY_PARENT_ID, ONE));
451        coll.createIndex(new BasicDBObject(KEY_ANCESTOR_IDS, ONE));
452        coll.createIndex(new BasicDBObject(KEY_VERSION_SERIES_ID, ONE));
453        coll.createIndex(new BasicDBObject(KEY_PROXY_TARGET_ID, ONE));
454        coll.createIndex(new BasicDBObject(KEY_PROXY_VERSION_SERIES_ID, ONE));
455        coll.createIndex(new BasicDBObject(KEY_READ_ACL, ONE));
456        DBObject parentChild = new BasicDBObject();
457        parentChild.put(KEY_PARENT_ID, ONE);
458        parentChild.put(KEY_NAME, ONE);
459        coll.createIndex(parentChild);
460        // often used in user-generated queries
461        coll.createIndex(new BasicDBObject(KEY_PRIMARY_TYPE, ONE));
462        coll.createIndex(new BasicDBObject(KEY_LIFECYCLE_STATE, ONE));
463        coll.createIndex(new BasicDBObject(KEY_FULLTEXT_JOBID, ONE));
464        coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_USER, ONE));
465        coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_STATUS, ONE));
466        // TODO configure these from somewhere else
467        coll.createIndex(new BasicDBObject("dc:modified", MINUS_ONE));
468        coll.createIndex(new BasicDBObject("rend:renditionName", ONE));
469        coll.createIndex(new BasicDBObject("drv:subscriptions.enabled", ONE));
470        coll.createIndex(new BasicDBObject("collectionMember:collectionIds", ONE));
471        if (!isFulltextDisabled()) {
472            DBObject indexKeys = new BasicDBObject();
473            indexKeys.put(KEY_FULLTEXT_SIMPLE, MONGODB_INDEX_TEXT);
474            indexKeys.put(KEY_FULLTEXT_BINARY, MONGODB_INDEX_TEXT);
475            DBObject indexOptions = new BasicDBObject();
476            indexOptions.put(MONGODB_INDEX_NAME, FULLTEXT_INDEX_NAME);
477            indexOptions.put(MONGODB_LANGUAGE_OVERRIDE, LANGUAGE_FIELD);
478            coll.createIndex(indexKeys, indexOptions);
479        }
480        // check root presence
481        DBObject query = new BasicDBObject(KEY_ID, getRootId());
482        if (coll.findOne(query, justPresenceField()) != null) {
483            return;
484        }
485        // create basic repository structure needed
486        if (DEBUG_UUIDS) {
487            // create the id counter
488            DBObject idCounter = new BasicDBObject();
489            idCounter.put(MONGODB_ID, COUNTER_NAME_UUID);
490            idCounter.put(COUNTER_FIELD, ZERO);
491            countersColl.insert(idCounter);
492        }
493        initRoot();
494    }
495
496    protected Long getNextUuidSeq() {
497        DBObject query = new BasicDBObject(MONGODB_ID, COUNTER_NAME_UUID);
498        DBObject update = new BasicDBObject(MONGODB_INC, new BasicDBObject(COUNTER_FIELD, ONE));
499        boolean returnNew = true;
500        DBObject idCounter = countersColl.findAndModify(query, null, null, false, update, returnNew, false);
501        if (idCounter == null) {
502            throw new RuntimeException("Repository id counter not initialized");
503        }
504        return (Long) idCounter.get(COUNTER_FIELD);
505    }
506
507    @Override
508    public String generateNewId() {
509        if (DEBUG_UUIDS) {
510            Long id = getNextUuidSeq();
511            return "UUID_" + id;
512        } else {
513            return UUID.randomUUID().toString();
514        }
515    }
516
517    @Override
518    public void createState(State state) {
519        DBObject ob = stateToBson(state);
520        if (log.isTraceEnabled()) {
521            log.trace("MongoDB: CREATE " + ob.get(KEY_ID) + ": " + ob);
522        }
523        coll.insert(ob);
524        // TODO dupe exception
525        // throw new DocumentException("Already exists: " + id);
526    }
527
528    @Override
529    public State readState(String id) {
530        DBObject query = new BasicDBObject(KEY_ID, id);
531        return findOne(query);
532    }
533
534    @Override
535    public List<State> readStates(List<String> ids) {
536        DBObject query = new BasicDBObject(KEY_ID, new BasicDBObject(QueryOperators.IN, ids));
537        return findAll(query, ids.size());
538    }
539
540    @Override
541    public void updateState(String id, StateDiff diff) {
542        DBObject query = new BasicDBObject(KEY_ID, id);
543        for (DBObject update : diffToBson(diff)) {
544            if (log.isTraceEnabled()) {
545                log.trace("MongoDB: UPDATE " + id + ": " + update);
546            }
547            coll.update(query, update);
548            // TODO dupe exception
549            // throw new DocumentException("Missing: " + id);
550        }
551    }
552
553    @Override
554    public void deleteStates(Set<String> ids) {
555        DBObject query = new BasicDBObject(KEY_ID, new BasicDBObject(QueryOperators.IN, ids));
556        if (log.isTraceEnabled()) {
557            log.trace("MongoDB: REMOVE " + ids);
558        }
559        WriteResult w = coll.remove(query);
560        if (w.getN() != ids.size()) {
561            log.error("Removed " + w.getN() + " docs for " + ids.size() + " ids: " + ids);
562        }
563    }
564
565    @Override
566    public State readChildState(String parentId, String name, Set<String> ignored) {
567        DBObject query = getChildQuery(parentId, name, ignored);
568        return findOne(query);
569    }
570
571    protected void logQuery(String id, DBObject fields) {
572        logQuery(new BasicDBObject(KEY_ID, id), fields);
573    }
574
575    protected void logQuery(DBObject query, DBObject fields) {
576        if (fields == null) {
577            log.trace("MongoDB: QUERY " + query);
578        } else {
579            log.trace("MongoDB: QUERY " + query + " KEYS " + fields);
580        }
581    }
582
583    protected void logQuery(DBObject query, DBObject fields, DBObject orderBy, int limit, int offset) {
584        log.trace("MongoDB: QUERY " + query + " KEYS " + fields + (orderBy == null ? "" : " ORDER BY " + orderBy)
585                + " OFFSET " + offset + " LIMIT " + limit);
586    }
587
588    @Override
589    public boolean hasChild(String parentId, String name, Set<String> ignored) {
590        DBObject query = getChildQuery(parentId, name, ignored);
591        if (log.isTraceEnabled()) {
592            logQuery(query, justPresenceField());
593        }
594        return coll.findOne(query, justPresenceField()) != null;
595    }
596
597    protected DBObject getChildQuery(String parentId, String name, Set<String> ignored) {
598        DBObject query = new BasicDBObject();
599        query.put(KEY_PARENT_ID, parentId);
600        query.put(KEY_NAME, name);
601        addIgnoredIds(query, ignored);
602        return query;
603    }
604
605    protected void addIgnoredIds(DBObject query, Set<String> ignored) {
606        if (!ignored.isEmpty()) {
607            DBObject notInIds = new BasicDBObject(QueryOperators.NIN, new ArrayList<String>(ignored));
608            query.put(KEY_ID, notInIds);
609        }
610    }
611
612    @Override
613    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
614        DBObject query = new BasicDBObject(key, value);
615        addIgnoredIds(query, ignored);
616        return findAll(query, 0);
617    }
618
619    @Override
620    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
621        DBObject query = new BasicDBObject(key1, value1);
622        query.put(key2, value2);
623        addIgnoredIds(query, ignored);
624        return findAll(query, 0);
625    }
626
627    @Override
628    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
629            Map<String, Object[]> targetProxies) {
630        DBObject query = new BasicDBObject(key, value);
631        DBObject fields = new BasicDBObject();
632        fields.put(MONGODB_ID, ZERO);
633        fields.put(KEY_ID, ONE);
634        fields.put(KEY_IS_PROXY, ONE);
635        fields.put(KEY_PROXY_TARGET_ID, ONE);
636        fields.put(KEY_PROXY_IDS, ONE);
637        if (log.isTraceEnabled()) {
638            logQuery(query, fields);
639        }
640        DBCursor cursor = coll.find(query, fields);
641        try {
642            for (DBObject ob : cursor) {
643                String id = (String) ob.get(KEY_ID);
644                ids.add(id);
645                if (proxyTargets != null && TRUE.equals(ob.get(KEY_IS_PROXY))) {
646                    String targetId = (String) ob.get(KEY_PROXY_TARGET_ID);
647                    proxyTargets.put(id, targetId);
648                }
649                if (targetProxies != null) {
650                    Object[] proxyIds = (Object[]) ob.get(KEY_PROXY_IDS);
651                    if (proxyIds != null) {
652                        targetProxies.put(id, proxyIds);
653                    }
654                }
655            }
656        } finally {
657            cursor.close();
658        }
659    }
660
661    @Override
662    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
663        DBObject query = new BasicDBObject(key, value);
664        addIgnoredIds(query, ignored);
665        if (log.isTraceEnabled()) {
666            logQuery(query, justPresenceField());
667        }
668        return coll.findOne(query, justPresenceField()) != null;
669    }
670
671    protected State findOne(DBObject query) {
672        if (log.isTraceEnabled()) {
673            logQuery(query, null);
674        }
675        return bsonToState(coll.findOne(query));
676    }
677
678    protected List<State> findAll(DBObject query, int sizeHint) {
679        if (log.isTraceEnabled()) {
680            logQuery(query, null);
681        }
682        DBCursor cursor = coll.find(query);
683        Set<String> seen = new HashSet<>();
684        try {
685            List<State> list = new ArrayList<>(sizeHint);
686            for (DBObject ob : cursor) {
687                if (!seen.add((String) ob.get(KEY_ID))) {
688                    // MongoDB cursors may return the same
689                    // object several times
690                    continue;
691                }
692                list.add(bsonToState(ob));
693            }
694            return list;
695        } finally {
696            cursor.close();
697        }
698    }
699
700    protected DBObject justPresenceField() {
701        return new BasicDBObject(MONGODB_ID, ONE);
702    }
703
704    @Override
705    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
706            OrderByClause orderByClause, int limit, int offset, int countUpTo) {
707        // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter
708        MongoDBQueryBuilder builder = new MongoDBQueryBuilder(evaluator.getExpression(), evaluator.getSelectClause(),
709                orderByClause, evaluator.pathResolver);
710        builder.walk();
711        if (builder.hasFulltext && isFulltextDisabled()) {
712            throw new QueryParseException("Fulltext search disabled by configuration");
713        }
714        DBObject query = builder.getQuery();
715        addPrincipals(query, evaluator.principals);
716        DBObject orderBy = builder.getOrderBy();
717        DBObject keys = builder.getProjection();
718        boolean projectionHasWildcard = builder.hasProjectionWildcard();
719        if (projectionHasWildcard) {
720            // we'll do post-treatment to re-evaluate the query to get proper wildcard projections
721            // so we need the full state from the database
722            keys = new BasicDBObject();
723            evaluator.parse();
724        }
725
726        if (log.isTraceEnabled()) {
727            logQuery(query, keys, orderBy, limit, offset);
728        }
729
730        List<Map<String, Serializable>> projections;
731        long totalSize;
732        DBCursor cursor = coll.find(query, keys).skip(offset).limit(limit);
733        try {
734            if (orderBy != null) {
735                cursor.sort(orderBy);
736            }
737            projections = new ArrayList<>();
738            for (DBObject ob : cursor) {
739                State state = bsonToState(ob);
740                if (projectionHasWildcard) {
741                    projections.addAll(evaluator.matches(state));
742                } else {
743                    projections.add(flatten(state));
744                }
745            }
746            if (countUpTo == -1) {
747                // count full size
748                if (limit == 0) {
749                    totalSize = projections.size();
750                } else {
751                    totalSize = cursor.count();
752                }
753            } else if (countUpTo == 0) {
754                // no count
755                totalSize = -1; // not counted
756            } else {
757                // count only if less than countUpTo
758                if (limit == 0) {
759                    totalSize = projections.size();
760                } else {
761                    totalSize = cursor.copy().limit(countUpTo + 1).count();
762                }
763                if (totalSize > countUpTo) {
764                    totalSize = -2; // truncated
765                }
766            }
767        } finally {
768            cursor.close();
769        }
770        if (log.isTraceEnabled() && projections.size() != 0) {
771            log.trace("MongoDB:    -> " + projections.size());
772        }
773        return new PartialList<>(projections, totalSize);
774    }
775
776    protected void addPrincipals(DBObject query, Set<String> principals) {
777        if (principals != null) {
778            DBObject inPrincipals = new BasicDBObject(QueryOperators.IN, new ArrayList<String>(principals));
779            query.put(DBSDocument.KEY_READ_ACL, inPrincipals);
780        }
781    }
782
783    /**
784     * Flatten and convert from internal names to NXQL.
785     */
786    protected Map<String, Serializable> flatten(State state) {
787        Map<String, Serializable> map = new HashMap<>();
788        flatten(map, state, null);
789        return map;
790    }
791
792    protected void flatten(Map<String, Serializable> map, State state, String prefix) {
793        for (Entry<String, Serializable> en : state.entrySet()) {
794            String key = en.getKey();
795            Serializable value = en.getValue();
796            String name;
797            if (key.startsWith(KEY_PREFIX)) {
798                name = DBSSession.convToNXQL(key);
799                if (name == null) {
800                    // present in state but not returned to caller
801                    continue;
802                }
803            } else {
804                name = key;
805            }
806            name = prefix == null ? name : prefix + name;
807            if (value instanceof State) {
808                flatten(map, (State) value, name + '/');
809            } else if (value instanceof List) {
810                String nameSlash = name + '/';
811                int i = 0;
812                for (Object v : (List<?>) value) {
813                    if (v instanceof State) {
814                        flatten(map, (State) v, nameSlash + i + '/');
815                    } else {
816                        map.put(nameSlash + i, (Serializable) v);
817                    }
818                    i++;
819                }
820            } else if (value instanceof Object[]) {
821                String nameSlash = name + '/';
822                int i = 0;
823                for (Object v : (Object[]) value) {
824                    map.put(nameSlash + i, (Serializable) v);
825                    i++;
826                }
827            } else {
828                map.put(name, value);
829            }
830        }
831    }
832
833    /** Keys used for document projection when marking all binaries for GC. */
834    protected DBObject binaryKeys;
835
836    @Override
837    protected void initBlobsPaths() {
838        MongoDBBlobFinder finder = new MongoDBBlobFinder();
839        finder.visit();
840        binaryKeys = finder.binaryKeys;
841    }
842
843    protected static class MongoDBBlobFinder extends BlobFinder {
844        protected DBObject binaryKeys = new BasicDBObject(MONGODB_ID, ZERO);
845
846        @Override
847        protected void recordBlobPath() {
848            path.addLast(KEY_BLOB_DATA);
849            binaryKeys.put(StringUtils.join(path, "."), ONE);
850            path.removeLast();
851        }
852    }
853
854    @Override
855    public void markReferencedBinaries() {
856        BlobManager blobManager = Framework.getService(BlobManager.class);
857        // TODO add a query to not scan all documents
858        if (log.isTraceEnabled()) {
859            logQuery(new BasicDBObject(), binaryKeys);
860        }
861        DBCursor cursor = coll.find(new BasicDBObject(), binaryKeys);
862        try {
863            for (DBObject ob : cursor) {
864                markReferencedBinaries(ob, blobManager);
865            }
866        } finally {
867            cursor.close();
868        }
869    }
870
871    protected void markReferencedBinaries(DBObject ob, BlobManager blobManager) {
872        for (String key : ob.keySet()) {
873            Object value = ob.get(key);
874            if (value instanceof List) {
875                @SuppressWarnings("unchecked")
876                List<Object> list = (List<Object>) value;
877                for (Object v : list) {
878                    if (v instanceof DBObject) {
879                        markReferencedBinaries((DBObject) v, blobManager);
880                    } else {
881                        markReferencedBinary(v, blobManager);
882                    }
883                }
884            } else if (value instanceof Object[]) {
885                for (Object v : (Object[]) value) {
886                    markReferencedBinary(v, blobManager);
887                }
888            } else if (value instanceof DBObject) {
889                markReferencedBinaries((DBObject) value, blobManager);
890            } else {
891                markReferencedBinary(value, blobManager);
892            }
893        }
894    }
895
896    protected void markReferencedBinary(Object value, BlobManager blobManager) {
897        if (!(value instanceof String)) {
898            return;
899        }
900        String key = (String) value;
901        blobManager.markReferencedBinary(key, repositoryName);
902    }
903
904    protected static final DBObject LOCK_FIELDS;
905
906    static {
907        LOCK_FIELDS = new BasicDBObject();
908        LOCK_FIELDS.put(KEY_LOCK_OWNER, ONE);
909        LOCK_FIELDS.put(KEY_LOCK_CREATED, ONE);
910    }
911
912    protected static final DBObject UNSET_LOCK_UPDATE = new BasicDBObject(MONGODB_UNSET, LOCK_FIELDS);
913
914    @Override
915    public Lock getLock(String id) {
916        if (log.isTraceEnabled()) {
917            logQuery(id, LOCK_FIELDS);
918        }
919        DBObject res = coll.findOne(new BasicDBObject(KEY_ID, id), LOCK_FIELDS);
920        if (res == null) {
921            // document not found
922            throw new DocumentNotFoundException(id);
923        }
924        String owner = (String) res.get(KEY_LOCK_OWNER);
925        if (owner == null) {
926            // not locked
927            return null;
928        }
929        Calendar created = (Calendar) scalarToSerializable(res.get(KEY_LOCK_CREATED));
930        return new Lock(owner, created);
931    }
932
933    @Override
934    public Lock setLock(String id, Lock lock) {
935        DBObject query = new BasicDBObject(KEY_ID, id);
936        query.put(KEY_LOCK_OWNER, null); // select doc if no lock is set
937        DBObject setLock = new BasicDBObject();
938        setLock.put(KEY_LOCK_OWNER, lock.getOwner());
939        setLock.put(KEY_LOCK_CREATED, serializableToBson(lock.getCreated()));
940        DBObject setLockUpdate = new BasicDBObject(MONGODB_SET, setLock);
941        if (log.isTraceEnabled()) {
942            log.trace("MongoDB: FINDANDMODIFY " + query + " UPDATE " + setLockUpdate);
943        }
944        DBObject res = coll.findAndModify(query, null, null, false, setLockUpdate, false, false);
945        if (res != null) {
946            // found a doc to lock
947            return null;
948        } else {
949            // doc not found, or lock owner already set
950            // get the old lock
951            if (log.isTraceEnabled()) {
952                logQuery(id, LOCK_FIELDS);
953            }
954            DBObject old = coll.findOne(new BasicDBObject(KEY_ID, id), LOCK_FIELDS);
955            if (old == null) {
956                // document not found
957                throw new DocumentNotFoundException(id);
958            }
959            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
960            Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED));
961            if (oldOwner != null) {
962                return new Lock(oldOwner, oldCreated);
963            }
964            // no lock -- there was a race condition
965            // TODO do better
966            throw new ConcurrentUpdateException("Lock " + id);
967        }
968    }
969
970    @Override
971    public Lock removeLock(String id, String owner) {
972        DBObject query = new BasicDBObject(KEY_ID, id);
973        if (owner != null) {
974            // remove if owner matches or null
975            // implements LockManager.canLockBeRemoved inside MongoDB
976            Object ownerOrNull = Arrays.asList(owner, null);
977            query.put(KEY_LOCK_OWNER, new BasicDBObject(QueryOperators.IN, ownerOrNull));
978        } // else unconditional remove
979        // remove the lock
980        DBObject old = coll.findAndModify(query, null, null, false, UNSET_LOCK_UPDATE, false, false);
981        if (old != null) {
982            // found a doc and removed the lock, return previous lock
983            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
984            if (oldOwner == null) {
985                // was not locked
986                return null;
987            } else {
988                // return previous lock
989                Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED));
990                return new Lock(oldOwner, oldCreated);
991            }
992        } else {
993            // doc not found, or lock owner didn't match
994            // get the old lock
995            if (log.isTraceEnabled()) {
996                logQuery(id, LOCK_FIELDS);
997            }
998            old = coll.findOne(new BasicDBObject(KEY_ID, id), LOCK_FIELDS);
999            if (old == null) {
1000                // document not found
1001                throw new DocumentNotFoundException(id);
1002            }
1003            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
1004            Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED));
1005            if (oldOwner != null) {
1006                if (!LockManager.canLockBeRemoved(oldOwner, owner)) {
1007                    // existing mismatched lock, flag failure
1008                    return new Lock(oldOwner, oldCreated, true);
1009                }
1010                // old owner should have matched -- there was a race condition
1011                // TODO do better
1012                throw new ConcurrentUpdateException("Unlock " + id);
1013            }
1014            // old owner null, should have matched -- there was a race condition
1015            // TODO do better
1016            throw new ConcurrentUpdateException("Unlock " + id);
1017        }
1018    }
1019
1020    @Override
1021    public void closeLockManager() {
1022
1023    }
1024
1025    @Override
1026    public void clearLockManagerCaches() {
1027    }
1028
1029}