001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.mongodb;
020
021import static java.lang.Boolean.TRUE;
022import static org.nuxeo.ecm.core.storage.State.NOP;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_STATUS;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_USER;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACL;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACP;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS;
028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY;
030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_JOBID;
031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_SIMPLE;
032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
033import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY;
034import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LIFECYCLE_STATE;
035import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
036import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
037import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
038import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
039import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PREFIX;
040import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PRIMARY_TYPE;
041import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS;
042import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;
043import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_VERSION_SERIES_ID;
044import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_READ_ACL;
045import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_VERSION_SERIES_ID;
046
047import java.io.Serializable;
048import java.lang.reflect.Array;
049import java.net.UnknownHostException;
050import java.util.ArrayList;
051import java.util.Arrays;
052import java.util.Calendar;
053import java.util.Date;
054import java.util.HashMap;
055import java.util.HashSet;
056import java.util.List;
057import java.util.Map;
058import java.util.Map.Entry;
059import java.util.Set;
060import java.util.UUID;
061
062import javax.resource.spi.ConnectionManager;
063
064import org.apache.commons.lang.StringUtils;
065import org.apache.commons.logging.Log;
066import org.apache.commons.logging.LogFactory;
067import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
068import org.nuxeo.ecm.core.api.DocumentNotFoundException;
069import org.nuxeo.ecm.core.api.Lock;
070import org.nuxeo.ecm.core.api.NuxeoException;
071import org.nuxeo.ecm.core.api.PartialList;
072import org.nuxeo.ecm.core.api.model.Delta;
073import org.nuxeo.ecm.core.blob.BlobManager;
074import org.nuxeo.ecm.core.model.LockManager;
075import org.nuxeo.ecm.core.model.Repository;
076import org.nuxeo.ecm.core.query.QueryParseException;
077import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
078import org.nuxeo.ecm.core.storage.State;
079import org.nuxeo.ecm.core.storage.State.ListDiff;
080import org.nuxeo.ecm.core.storage.State.StateDiff;
081import org.nuxeo.ecm.core.storage.dbs.DBSDocument;
082import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
083import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
084import org.nuxeo.ecm.core.storage.dbs.DBSSession;
085import org.nuxeo.runtime.api.Framework;
086
087import com.mongodb.BasicDBObject;
088import com.mongodb.DB;
089import com.mongodb.DBCollection;
090import com.mongodb.DBCursor;
091import com.mongodb.DBObject;
092import com.mongodb.MongoClient;
093import com.mongodb.MongoClientURI;
094import com.mongodb.QueryOperators;
095import com.mongodb.ServerAddress;
096import com.mongodb.WriteResult;
097
098/**
099 * MongoDB implementation of a {@link Repository}.
100 *
101 * @since 5.9.4
102 */
103public class MongoDBRepository extends DBSRepositoryBase {
104
105    private static final Log log = LogFactory.getLog(MongoDBRepository.class);
106
107    private static final Long ZERO = Long.valueOf(0);
108
109    private static final Long ONE = Long.valueOf(1);
110
111    private static final Long MINUS_ONE = Long.valueOf(-11);
112
113    public static final String DB_DEFAULT = "nuxeo";
114
115    public static final String MONGODB_ID = "_id";
116
117    public static final String MONGODB_INC = "$inc";
118
119    public static final String MONGODB_SET = "$set";
120
121    public static final String MONGODB_UNSET = "$unset";
122
123    public static final String MONGODB_PUSH = "$push";
124
125    public static final String MONGODB_EACH = "$each";
126
127    public static final String MONGODB_META = "$meta";
128
129    public static final String MONGODB_TEXT_SCORE = "textScore";
130
131    private static final String MONGODB_INDEX_TEXT = "text";
132
133    private static final String MONGODB_INDEX_NAME = "name";
134
135    private static final String MONGODB_LANGUAGE_OVERRIDE = "language_override";
136
137    private static final String FULLTEXT_INDEX_NAME = "fulltext";
138
139    private static final String LANGUAGE_FIELD = "__language";
140
141    protected static final String COUNTER_NAME_UUID = "ecm:id";
142
143    protected static final String COUNTER_FIELD = "seq";
144
145    protected MongoClient mongoClient;
146
147    protected DBCollection coll;
148
149    protected DBCollection countersColl;
150
151    public MongoDBRepository(ConnectionManager cm, MongoDBRepositoryDescriptor descriptor) {
152        super(cm, descriptor.name, descriptor.getFulltextDescriptor());
153        try {
154            mongoClient = newMongoClient(descriptor);
155            coll = getCollection(descriptor, mongoClient);
156            countersColl = getCountersCollection(descriptor, mongoClient);
157        } catch (UnknownHostException e) {
158            throw new RuntimeException(e);
159        }
160        initRepository();
161    }
162
163    @Override
164    public void shutdown() {
165        super.shutdown();
166        mongoClient.close();
167    }
168
169    // used also by unit tests
170    public static MongoClient newMongoClient(MongoDBRepositoryDescriptor descriptor) throws UnknownHostException {
171        String server = descriptor.server;
172        if (StringUtils.isBlank(server)) {
173            throw new NuxeoException("Missing <server> in MongoDB repository descriptor");
174        }
175        if (server.startsWith("mongodb://")) {
176            // allow mongodb:// URI syntax for the server, to pass everything in one string
177            return new MongoClient(new MongoClientURI(server));
178        } else {
179            return new MongoClient(new ServerAddress(server));
180        }
181    }
182
183    protected static DBCollection getCollection(MongoClient mongoClient, String dbname, String collection) {
184        if (StringUtils.isBlank(dbname)) {
185            dbname = DB_DEFAULT;
186        }
187        DB db = mongoClient.getDB(dbname);
188        return db.getCollection(collection);
189    }
190
191    // used also by unit tests
192    public static DBCollection getCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) {
193        return getCollection(mongoClient, descriptor.dbname, descriptor.name);
194    }
195
196    // used also by unit tests
197    public static DBCollection getCountersCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) {
198        return getCollection(mongoClient, descriptor.dbname, descriptor.name + ".counters");
199    }
200
201    protected Object valueToBson(Object value) {
202        if (value instanceof State) {
203            return stateToBson((State) value);
204        } else if (value instanceof List) {
205            @SuppressWarnings("unchecked")
206            List<Object> values = (List<Object>) value;
207            return listToBson(values);
208        } else if (value instanceof Object[]) {
209            return listToBson(Arrays.asList((Object[]) value));
210        } else {
211            return serializableToBson(value);
212        }
213    }
214
215    protected DBObject stateToBson(State state) {
216        DBObject ob = new BasicDBObject();
217        for (Entry<String, Serializable> en : state.entrySet()) {
218            Object val = valueToBson(en.getValue());
219            if (val != null) {
220                ob.put(en.getKey(), val);
221            }
222        }
223        return ob;
224    }
225
226    protected List<Object> listToBson(List<Object> values) {
227        ArrayList<Object> objects = new ArrayList<Object>(values.size());
228        for (Object value : values) {
229            objects.add(valueToBson(value));
230        }
231        return objects;
232    }
233
234    protected State bsonToState(DBObject ob) {
235        if (ob == null) {
236            return null;
237        }
238        State state = new State(ob.keySet().size());
239        for (String key : ob.keySet()) {
240            if (MONGODB_ID.equals(key)) {
241                // skip ObjectId
242                continue;
243            }
244            state.put(key, bsonToValue(ob.get(key)));
245        }
246        return state;
247    }
248
249    protected Serializable bsonToValue(Object value) {
250        if (value instanceof List) {
251            @SuppressWarnings("unchecked")
252            List<Object> list = (List<Object>) value;
253            if (list.isEmpty()) {
254                return null;
255            } else {
256                if (list.get(0) instanceof DBObject) {
257                    List<Serializable> l = new ArrayList<>(list.size());
258                    for (Object el : list) {
259                        l.add(bsonToState((DBObject) el));
260                    }
261                    return (Serializable) l;
262                } else {
263                    // turn the list into a properly-typed array
264                    Class<?> klass = Object.class;
265                    for (Object o : list) {
266                        if (o != null) {
267                            klass = scalarToSerializableClass(o.getClass());
268                            break;
269                        }
270                    }
271                    Object[] ar = (Object[]) Array.newInstance(klass, list.size());
272                    int i = 0;
273                    for (Object el : list) {
274                        ar[i++] = scalarToSerializable(el);
275                    }
276                    return ar;
277                }
278            }
279        } else if (value instanceof DBObject) {
280            return bsonToState((DBObject) value);
281        } else {
282            return scalarToSerializable(value);
283        }
284    }
285
286    public static class Updates {
287        public BasicDBObject set = new BasicDBObject();
288
289        public BasicDBObject unset = new BasicDBObject();
290
291        public BasicDBObject push = new BasicDBObject();
292
293        public BasicDBObject inc = new BasicDBObject();
294    }
295
296    /**
297     * Constructs a list of MongoDB updates from the given {@link StateDiff}.
298     * <p>
299     * We need a list because some cases need two operations to avoid conflicts.
300     */
301    protected List<DBObject> diffToBson(StateDiff diff) {
302        Updates updates = new Updates();
303        diffToUpdates(diff, null, updates);
304        UpdateListBuilder builder = new UpdateListBuilder();
305        for (Entry<String, Object> en : updates.set.entrySet()) {
306            builder.update(MONGODB_SET, en.getKey(), en.getValue());
307        }
308        for (Entry<String, Object> en : updates.unset.entrySet()) {
309            builder.update(MONGODB_UNSET, en.getKey(), en.getValue());
310        }
311        for (Entry<String, Object> en : updates.push.entrySet()) {
312            builder.update(MONGODB_PUSH, en.getKey(), en.getValue());
313        }
314        for (Entry<String, Object> en : updates.inc.entrySet()) {
315            builder.update(MONGODB_INC, en.getKey(), en.getValue());
316        }
317        return builder.updateList;
318    }
319
320    /**
321     * Update list builder to prevent several updates of the same field.
322     * <p>
323     * This happens if two operations act on two fields where one is a prefix of the other.
324     * <p>
325     * Example: Cannot update 'mylist.0.string' and 'mylist' at the same time (error 16837)
326     *
327     * @since 5.9.5
328     */
329    protected static class UpdateListBuilder {
330
331        protected List<DBObject> updateList = new ArrayList<>(1);
332
333        protected DBObject update;
334
335        protected List<String> keys;
336
337        protected UpdateListBuilder() {
338            newUpdate();
339        }
340
341        protected void newUpdate() {
342            updateList.add(update = new BasicDBObject());
343            keys = new ArrayList<>();
344        }
345
346        protected void update(String op, String key, Object value) {
347            if (conflicts(key, keys)) {
348                newUpdate();
349            }
350            keys.add(key);
351            DBObject map = (DBObject) update.get(op);
352            if (map == null) {
353                update.put(op, map = new BasicDBObject());
354            }
355            map.put(key, value);
356        }
357
358        /**
359         * Checks if the key conflicts with one of the previous keys.
360         * <p>
361         * A conflict occurs if one key is equals to or is a prefix of the other.
362         */
363        protected boolean conflicts(String key, List<String> previousKeys) {
364            String keydot = key + '.';
365            for (String prev : previousKeys) {
366                if (prev.equals(key) || prev.startsWith(keydot) || key.startsWith(prev + '.')) {
367                    return true;
368                }
369            }
370            return false;
371        }
372    }
373
374    protected void diffToUpdates(StateDiff diff, String prefix, Updates updates) {
375        String elemPrefix = prefix == null ? "" : prefix + '.';
376        for (Entry<String, Serializable> en : diff.entrySet()) {
377            String name = elemPrefix + en.getKey();
378            Serializable value = en.getValue();
379            if (value instanceof StateDiff) {
380                diffToUpdates((StateDiff) value, name, updates);
381            } else if (value instanceof ListDiff) {
382                diffToUpdates((ListDiff) value, name, updates);
383            } else if (value instanceof Delta) {
384                diffToUpdates((Delta) value, name, updates);
385            } else {
386                // not a diff
387                updates.set.put(name, valueToBson(value));
388            }
389        }
390    }
391
392    protected void diffToUpdates(ListDiff listDiff, String prefix, Updates updates) {
393        if (listDiff.diff != null) {
394            String elemPrefix = prefix == null ? "" : prefix + '.';
395            int i = 0;
396            for (Object value : listDiff.diff) {
397                String name = elemPrefix + i;
398                if (value instanceof StateDiff) {
399                    diffToUpdates((StateDiff) value, name, updates);
400                } else if (value != NOP) {
401                    // set value
402                    updates.set.put(name, valueToBson(value));
403                }
404                i++;
405            }
406        }
407        if (listDiff.rpush != null) {
408            Object pushed;
409            if (listDiff.rpush.size() == 1) {
410                // no need to use $each for one element
411                pushed = valueToBson(listDiff.rpush.get(0));
412            } else {
413                pushed = new BasicDBObject(MONGODB_EACH, listToBson(listDiff.rpush));
414            }
415            updates.push.put(prefix, pushed);
416        }
417    }
418
419    protected void diffToUpdates(Delta delta, String prefix, Updates updates) {
420        Object inc = valueToBson(delta.getDeltaValue());
421        updates.inc.put(prefix, inc);
422    }
423
424    protected Object serializableToBson(Object value) {
425        if (value instanceof Calendar) {
426            return ((Calendar) value).getTime();
427        }
428        return value;
429    }
430
431    protected Serializable scalarToSerializable(Object val) {
432        if (val instanceof Date) {
433            Calendar cal = Calendar.getInstance();
434            cal.setTime((Date) val);
435            return cal;
436        }
437        return (Serializable) val;
438    }
439
440    protected Class<?> scalarToSerializableClass(Class<?> klass) {
441        if (Date.class.isAssignableFrom(klass)) {
442            return Calendar.class;
443        }
444        return klass;
445    }
446
447    protected void initRepository() {
448        // create required indexes
449        // code does explicit queries on those
450        coll.createIndex(new BasicDBObject(KEY_ID, ONE));
451        coll.createIndex(new BasicDBObject(KEY_PARENT_ID, ONE));
452        coll.createIndex(new BasicDBObject(KEY_ANCESTOR_IDS, ONE));
453        coll.createIndex(new BasicDBObject(KEY_VERSION_SERIES_ID, ONE));
454        coll.createIndex(new BasicDBObject(KEY_PROXY_TARGET_ID, ONE));
455        coll.createIndex(new BasicDBObject(KEY_PROXY_VERSION_SERIES_ID, ONE));
456        coll.createIndex(new BasicDBObject(KEY_READ_ACL, ONE));
457        DBObject parentChild = new BasicDBObject();
458        parentChild.put(KEY_PARENT_ID, ONE);
459        parentChild.put(KEY_NAME, ONE);
460        coll.createIndex(parentChild);
461        // often used in user-generated queries
462        coll.createIndex(new BasicDBObject(KEY_PRIMARY_TYPE, ONE));
463        coll.createIndex(new BasicDBObject(KEY_LIFECYCLE_STATE, ONE));
464        coll.createIndex(new BasicDBObject(KEY_FULLTEXT_JOBID, ONE));
465        coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_USER, ONE));
466        coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_STATUS, ONE));
467        // TODO configure these from somewhere else
468        coll.createIndex(new BasicDBObject("dc:modified", MINUS_ONE));
469        coll.createIndex(new BasicDBObject("rend:renditionName", ONE));
470        coll.createIndex(new BasicDBObject("drv:subscriptions.enabled", ONE));
471        coll.createIndex(new BasicDBObject("collectionMember:collectionIds", ONE));
472        if (!isFulltextDisabled()) {
473            DBObject indexKeys = new BasicDBObject();
474            indexKeys.put(KEY_FULLTEXT_SIMPLE, MONGODB_INDEX_TEXT);
475            indexKeys.put(KEY_FULLTEXT_BINARY, MONGODB_INDEX_TEXT);
476            DBObject indexOptions = new BasicDBObject();
477            indexOptions.put(MONGODB_INDEX_NAME, FULLTEXT_INDEX_NAME);
478            indexOptions.put(MONGODB_LANGUAGE_OVERRIDE, LANGUAGE_FIELD);
479            coll.createIndex(indexKeys, indexOptions);
480        }
481        // check root presence
482        DBObject query = new BasicDBObject(KEY_ID, getRootId());
483        if (coll.findOne(query, justPresenceField()) != null) {
484            return;
485        }
486        // create basic repository structure needed
487        if (DEBUG_UUIDS) {
488            // create the id counter
489            DBObject idCounter = new BasicDBObject();
490            idCounter.put(MONGODB_ID, COUNTER_NAME_UUID);
491            idCounter.put(COUNTER_FIELD, ZERO);
492            countersColl.insert(idCounter);
493        }
494        initRoot();
495    }
496
497    protected Long getNextUuidSeq() {
498        DBObject query = new BasicDBObject(MONGODB_ID, COUNTER_NAME_UUID);
499        DBObject update = new BasicDBObject(MONGODB_INC, new BasicDBObject(COUNTER_FIELD, ONE));
500        boolean returnNew = true;
501        DBObject idCounter = countersColl.findAndModify(query, null, null, false, update, returnNew, false);
502        if (idCounter == null) {
503            throw new RuntimeException("Repository id counter not initialized");
504        }
505        return (Long) idCounter.get(COUNTER_FIELD);
506    }
507
508    @Override
509    public String generateNewId() {
510        if (DEBUG_UUIDS) {
511            Long id = getNextUuidSeq();
512            return "UUID_" + id;
513        } else {
514            return UUID.randomUUID().toString();
515        }
516    }
517
518    @Override
519    public void createState(State state) {
520        DBObject ob = stateToBson(state);
521        if (log.isTraceEnabled()) {
522            log.trace("MongoDB: CREATE " + ob.get(KEY_ID) + ": " + ob);
523        }
524        coll.insert(ob);
525        // TODO dupe exception
526        // throw new DocumentException("Already exists: " + id);
527    }
528
529    @Override
530    public State readState(String id) {
531        DBObject query = new BasicDBObject(KEY_ID, id);
532        return findOne(query);
533    }
534
535    @Override
536    public List<State> readStates(List<String> ids) {
537        DBObject query = new BasicDBObject(KEY_ID, new BasicDBObject(QueryOperators.IN, ids));
538        return findAll(query, ids.size());
539    }
540
541    @Override
542    public void updateState(String id, StateDiff diff) {
543        DBObject query = new BasicDBObject(KEY_ID, id);
544        for (DBObject update : diffToBson(diff)) {
545            if (log.isTraceEnabled()) {
546                log.trace("MongoDB: UPDATE " + id + ": " + update);
547            }
548            coll.update(query, update);
549            // TODO dupe exception
550            // throw new DocumentException("Missing: " + id);
551        }
552    }
553
554    @Override
555    public void deleteStates(Set<String> ids) {
556        DBObject query = new BasicDBObject(KEY_ID, new BasicDBObject(QueryOperators.IN, ids));
557        if (log.isTraceEnabled()) {
558            log.trace("MongoDB: REMOVE " + ids);
559        }
560        WriteResult w = coll.remove(query);
561        if (w.getN() != ids.size()) {
562            log.error("Removed " + w.getN() + " docs for " + ids.size() + " ids: " + ids);
563        }
564    }
565
566    @Override
567    public State readChildState(String parentId, String name, Set<String> ignored) {
568        DBObject query = getChildQuery(parentId, name, ignored);
569        return findOne(query);
570    }
571
572    protected void logQuery(String id, DBObject fields) {
573        logQuery(new BasicDBObject(KEY_ID, id), fields);
574    }
575
576    protected void logQuery(DBObject query, DBObject fields) {
577        if (fields == null) {
578            log.trace("MongoDB: QUERY " + query);
579        } else {
580            log.trace("MongoDB: QUERY " + query + " KEYS " + fields);
581        }
582    }
583
584    protected void logQuery(DBObject query, DBObject fields, DBObject orderBy, int limit, int offset) {
585        log.trace("MongoDB: QUERY " + query + " KEYS " + fields + (orderBy == null ? "" : " ORDER BY " + orderBy)
586                + " OFFSET " + offset + " LIMIT " + limit);
587    }
588
589    @Override
590    public boolean hasChild(String parentId, String name, Set<String> ignored) {
591        DBObject query = getChildQuery(parentId, name, ignored);
592        if (log.isTraceEnabled()) {
593            logQuery(query, justPresenceField());
594        }
595        return coll.findOne(query, justPresenceField()) != null;
596    }
597
598    protected DBObject getChildQuery(String parentId, String name, Set<String> ignored) {
599        DBObject query = new BasicDBObject();
600        query.put(KEY_PARENT_ID, parentId);
601        query.put(KEY_NAME, name);
602        addIgnoredIds(query, ignored);
603        return query;
604    }
605
606    protected void addIgnoredIds(DBObject query, Set<String> ignored) {
607        if (!ignored.isEmpty()) {
608            DBObject notInIds = new BasicDBObject(QueryOperators.NIN, new ArrayList<String>(ignored));
609            query.put(KEY_ID, notInIds);
610        }
611    }
612
613    @Override
614    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
615        DBObject query = new BasicDBObject(key, value);
616        addIgnoredIds(query, ignored);
617        return findAll(query, 0);
618    }
619
620    @Override
621    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
622        DBObject query = new BasicDBObject(key1, value1);
623        query.put(key2, value2);
624        addIgnoredIds(query, ignored);
625        return findAll(query, 0);
626    }
627
628    @Override
629    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
630            Map<String, Object[]> targetProxies) {
631        DBObject query = new BasicDBObject(key, value);
632        DBObject fields = new BasicDBObject();
633        fields.put(MONGODB_ID, ZERO);
634        fields.put(KEY_ID, ONE);
635        fields.put(KEY_IS_PROXY, ONE);
636        fields.put(KEY_PROXY_TARGET_ID, ONE);
637        fields.put(KEY_PROXY_IDS, ONE);
638        if (log.isTraceEnabled()) {
639            logQuery(query, fields);
640        }
641        DBCursor cursor = coll.find(query, fields);
642        try {
643            for (DBObject ob : cursor) {
644                String id = (String) ob.get(KEY_ID);
645                ids.add(id);
646                if (proxyTargets != null && TRUE.equals(ob.get(KEY_IS_PROXY))) {
647                    String targetId = (String) ob.get(KEY_PROXY_TARGET_ID);
648                    proxyTargets.put(id, targetId);
649                }
650                if (targetProxies != null) {
651                    Object[] proxyIds = (Object[]) bsonToValue(ob.get(KEY_PROXY_IDS));
652                    if (proxyIds != null) {
653                        targetProxies.put(id, proxyIds);
654                    }
655                }
656            }
657        } finally {
658            cursor.close();
659        }
660    }
661
662    @Override
663    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
664        DBObject query = new BasicDBObject(key, value);
665        addIgnoredIds(query, ignored);
666        if (log.isTraceEnabled()) {
667            logQuery(query, justPresenceField());
668        }
669        return coll.findOne(query, justPresenceField()) != null;
670    }
671
672    protected State findOne(DBObject query) {
673        if (log.isTraceEnabled()) {
674            logQuery(query, null);
675        }
676        return bsonToState(coll.findOne(query));
677    }
678
679    protected List<State> findAll(DBObject query, int sizeHint) {
680        if (log.isTraceEnabled()) {
681            logQuery(query, null);
682        }
683        DBCursor cursor = coll.find(query);
684        Set<String> seen = new HashSet<>();
685        try {
686            List<State> list = new ArrayList<>(sizeHint);
687            for (DBObject ob : cursor) {
688                if (!seen.add((String) ob.get(KEY_ID))) {
689                    // MongoDB cursors may return the same
690                    // object several times
691                    continue;
692                }
693                list.add(bsonToState(ob));
694            }
695            return list;
696        } finally {
697            cursor.close();
698        }
699    }
700
701    protected DBObject justPresenceField() {
702        return new BasicDBObject(MONGODB_ID, ONE);
703    }
704
705    @Override
706    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
707            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
708        // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter
709        MongoDBQueryBuilder builder = new MongoDBQueryBuilder(evaluator.getExpression(), evaluator.getSelectClause(),
710                orderByClause, evaluator.pathResolver, evaluator.fulltextSearchDisabled);
711        builder.walk();
712        if (builder.hasFulltext && isFulltextDisabled()) {
713            throw new QueryParseException("Fulltext search disabled by configuration");
714        }
715        DBObject query = builder.getQuery();
716        addPrincipals(query, evaluator.principals);
717        DBObject orderBy = builder.getOrderBy();
718        DBObject keys = builder.getProjection();
719        // Don't do manual projection if there are no projection wildcards, as this brings no new
720        // information and is costly. The only difference is several identical rows instead of one.
721        boolean manualProjection = !distinctDocuments && builder.hasProjectionWildcard();
722        if (manualProjection) {
723            // we'll do post-treatment to re-evaluate the query to get proper wildcard projections
724            // so we need the full state from the database
725            keys = new BasicDBObject();
726            evaluator.parse();
727        }
728
729        if (log.isTraceEnabled()) {
730            logQuery(query, keys, orderBy, limit, offset);
731        }
732
733        List<Map<String, Serializable>> projections;
734        long totalSize;
735        DBCursor cursor = coll.find(query, keys).skip(offset).limit(limit);
736        try {
737            if (orderBy != null) {
738                cursor.sort(orderBy);
739            }
740            projections = new ArrayList<>();
741            for (DBObject ob : cursor) {
742                State state = bsonToState(ob);
743                if (manualProjection) {
744                    projections.addAll(evaluator.matches(state));
745                } else {
746                    projections.add(flatten(state));
747                }
748            }
749            if (countUpTo == -1) {
750                // count full size
751                if (limit == 0) {
752                    totalSize = projections.size();
753                } else {
754                    totalSize = cursor.count();
755                }
756            } else if (countUpTo == 0) {
757                // no count
758                totalSize = -1; // not counted
759            } else {
760                // count only if less than countUpTo
761                if (limit == 0) {
762                    totalSize = projections.size();
763                } else {
764                    totalSize = cursor.copy().limit(countUpTo + 1).count();
765                }
766                if (totalSize > countUpTo) {
767                    totalSize = -2; // truncated
768                }
769            }
770        } finally {
771            cursor.close();
772        }
773        if (log.isTraceEnabled() && projections.size() != 0) {
774            log.trace("MongoDB:    -> " + projections.size());
775        }
776        return new PartialList<>(projections, totalSize);
777    }
778
779    protected void addPrincipals(DBObject query, Set<String> principals) {
780        if (principals != null) {
781            DBObject inPrincipals = new BasicDBObject(QueryOperators.IN, new ArrayList<String>(principals));
782            query.put(DBSDocument.KEY_READ_ACL, inPrincipals);
783        }
784    }
785
786    /**
787     * Flatten and convert from internal names to NXQL.
788     */
789    protected Map<String, Serializable> flatten(State state) {
790        Map<String, Serializable> map = new HashMap<>();
791        flatten(map, state, null);
792        return map;
793    }
794
795    protected void flatten(Map<String, Serializable> map, State state, String prefix) {
796        for (Entry<String, Serializable> en : state.entrySet()) {
797            String key = en.getKey();
798            Serializable value = en.getValue();
799            String name;
800            if (key.startsWith(KEY_PREFIX)) {
801                name = DBSSession.convToNXQL(key);
802                if (name == null) {
803                    // present in state but not returned to caller
804                    continue;
805                }
806            } else {
807                name = key;
808            }
809            name = prefix == null ? name : prefix + name;
810            if (value instanceof State) {
811                flatten(map, (State) value, name + '/');
812            } else if (value instanceof List) {
813                String nameSlash = name + '/';
814                int i = 0;
815                for (Object v : (List<?>) value) {
816                    if (v instanceof State) {
817                        flatten(map, (State) v, nameSlash + i + '/');
818                    } else {
819                        map.put(nameSlash + i, (Serializable) v);
820                    }
821                    i++;
822                }
823            } else if (value instanceof Object[]) {
824                String nameSlash = name + '/';
825                int i = 0;
826                for (Object v : (Object[]) value) {
827                    map.put(nameSlash + i, (Serializable) v);
828                    i++;
829                }
830            } else {
831                map.put(name, value);
832            }
833        }
834    }
835
836    /** Keys used for document projection when marking all binaries for GC. */
837    protected DBObject binaryKeys;
838
839    @Override
840    protected void initBlobsPaths() {
841        MongoDBBlobFinder finder = new MongoDBBlobFinder();
842        finder.visit();
843        binaryKeys = finder.binaryKeys;
844    }
845
846    protected static class MongoDBBlobFinder extends BlobFinder {
847        protected DBObject binaryKeys = new BasicDBObject(MONGODB_ID, ZERO);
848
849        @Override
850        protected void recordBlobPath() {
851            path.addLast(KEY_BLOB_DATA);
852            binaryKeys.put(StringUtils.join(path, "."), ONE);
853            path.removeLast();
854        }
855    }
856
857    @Override
858    public void markReferencedBinaries() {
859        BlobManager blobManager = Framework.getService(BlobManager.class);
860        // TODO add a query to not scan all documents
861        if (log.isTraceEnabled()) {
862            logQuery(new BasicDBObject(), binaryKeys);
863        }
864        DBCursor cursor = coll.find(new BasicDBObject(), binaryKeys);
865        try {
866            for (DBObject ob : cursor) {
867                markReferencedBinaries(ob, blobManager);
868            }
869        } finally {
870            cursor.close();
871        }
872    }
873
874    protected void markReferencedBinaries(DBObject ob, BlobManager blobManager) {
875        for (String key : ob.keySet()) {
876            Object value = ob.get(key);
877            if (value instanceof List) {
878                @SuppressWarnings("unchecked")
879                List<Object> list = (List<Object>) value;
880                for (Object v : list) {
881                    if (v instanceof DBObject) {
882                        markReferencedBinaries((DBObject) v, blobManager);
883                    } else {
884                        markReferencedBinary(v, blobManager);
885                    }
886                }
887            } else if (value instanceof Object[]) {
888                for (Object v : (Object[]) value) {
889                    markReferencedBinary(v, blobManager);
890                }
891            } else if (value instanceof DBObject) {
892                markReferencedBinaries((DBObject) value, blobManager);
893            } else {
894                markReferencedBinary(value, blobManager);
895            }
896        }
897    }
898
899    protected void markReferencedBinary(Object value, BlobManager blobManager) {
900        if (!(value instanceof String)) {
901            return;
902        }
903        String key = (String) value;
904        blobManager.markReferencedBinary(key, repositoryName);
905    }
906
907    protected static final DBObject LOCK_FIELDS;
908
909    static {
910        LOCK_FIELDS = new BasicDBObject();
911        LOCK_FIELDS.put(KEY_LOCK_OWNER, ONE);
912        LOCK_FIELDS.put(KEY_LOCK_CREATED, ONE);
913    }
914
915    protected static final DBObject UNSET_LOCK_UPDATE = new BasicDBObject(MONGODB_UNSET, LOCK_FIELDS);
916
917    @Override
918    public Lock getLock(String id) {
919        if (log.isTraceEnabled()) {
920            logQuery(id, LOCK_FIELDS);
921        }
922        DBObject res = coll.findOne(new BasicDBObject(KEY_ID, id), LOCK_FIELDS);
923        if (res == null) {
924            // document not found
925            throw new DocumentNotFoundException(id);
926        }
927        String owner = (String) res.get(KEY_LOCK_OWNER);
928        if (owner == null) {
929            // not locked
930            return null;
931        }
932        Calendar created = (Calendar) scalarToSerializable(res.get(KEY_LOCK_CREATED));
933        return new Lock(owner, created);
934    }
935
936    @Override
937    public Lock setLock(String id, Lock lock) {
938        DBObject query = new BasicDBObject(KEY_ID, id);
939        query.put(KEY_LOCK_OWNER, null); // select doc if no lock is set
940        DBObject setLock = new BasicDBObject();
941        setLock.put(KEY_LOCK_OWNER, lock.getOwner());
942        setLock.put(KEY_LOCK_CREATED, serializableToBson(lock.getCreated()));
943        DBObject setLockUpdate = new BasicDBObject(MONGODB_SET, setLock);
944        if (log.isTraceEnabled()) {
945            log.trace("MongoDB: FINDANDMODIFY " + query + " UPDATE " + setLockUpdate);
946        }
947        DBObject res = coll.findAndModify(query, null, null, false, setLockUpdate, false, false);
948        if (res != null) {
949            // found a doc to lock
950            return null;
951        } else {
952            // doc not found, or lock owner already set
953            // get the old lock
954            if (log.isTraceEnabled()) {
955                logQuery(id, LOCK_FIELDS);
956            }
957            DBObject old = coll.findOne(new BasicDBObject(KEY_ID, id), LOCK_FIELDS);
958            if (old == null) {
959                // document not found
960                throw new DocumentNotFoundException(id);
961            }
962            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
963            Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED));
964            if (oldOwner != null) {
965                return new Lock(oldOwner, oldCreated);
966            }
967            // no lock -- there was a race condition
968            // TODO do better
969            throw new ConcurrentUpdateException("Lock " + id);
970        }
971    }
972
973    @Override
974    public Lock removeLock(String id, String owner) {
975        DBObject query = new BasicDBObject(KEY_ID, id);
976        if (owner != null) {
977            // remove if owner matches or null
978            // implements LockManager.canLockBeRemoved inside MongoDB
979            Object ownerOrNull = Arrays.asList(owner, null);
980            query.put(KEY_LOCK_OWNER, new BasicDBObject(QueryOperators.IN, ownerOrNull));
981        } // else unconditional remove
982        // remove the lock
983        DBObject old = coll.findAndModify(query, null, null, false, UNSET_LOCK_UPDATE, false, false);
984        if (old != null) {
985            // found a doc and removed the lock, return previous lock
986            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
987            if (oldOwner == null) {
988                // was not locked
989                return null;
990            } else {
991                // return previous lock
992                Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED));
993                return new Lock(oldOwner, oldCreated);
994            }
995        } else {
996            // doc not found, or lock owner didn't match
997            // get the old lock
998            if (log.isTraceEnabled()) {
999                logQuery(id, LOCK_FIELDS);
1000            }
1001            old = coll.findOne(new BasicDBObject(KEY_ID, id), LOCK_FIELDS);
1002            if (old == null) {
1003                // document not found
1004                throw new DocumentNotFoundException(id);
1005            }
1006            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
1007            Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED));
1008            if (oldOwner != null) {
1009                if (!LockManager.canLockBeRemoved(oldOwner, owner)) {
1010                    // existing mismatched lock, flag failure
1011                    return new Lock(oldOwner, oldCreated, true);
1012                }
1013                // old owner should have matched -- there was a race condition
1014                // TODO do better
1015                throw new ConcurrentUpdateException("Unlock " + id);
1016            }
1017            // old owner null, should have matched -- there was a race condition
1018            // TODO do better
1019            throw new ConcurrentUpdateException("Unlock " + id);
1020        }
1021    }
1022
1023    @Override
1024    public void closeLockManager() {
1025
1026    }
1027
1028    @Override
1029    public void clearLockManagerCaches() {
1030    }
1031
1032}