001/*
002 * Copyright (c) 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the Eclipse Public License v1.0
006 * which accompanies this distribution, and is available at
007 * http://www.eclipse.org/legal/epl-v10.html
008 *
009 * Contributors:
010 *     Florent Guillaume
011 */
012package org.nuxeo.ecm.core.storage.mongodb;
013
014import static java.lang.Boolean.TRUE;
015import static org.nuxeo.ecm.core.storage.State.NOP;
016import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_STATUS;
017import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_USER;
018import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACL;
019import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACP;
020import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS;
021import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_JOBID;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_SIMPLE;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LIFECYCLE_STATE;
028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PRIMARY_TYPE;
033import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS;
034import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;
035import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_VERSION_SERIES_ID;
036import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_READ_ACL;
037import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_VERSION_SERIES_ID;
038
039import java.io.Serializable;
040import java.lang.reflect.Array;
041import java.net.UnknownHostException;
042import java.util.ArrayList;
043import java.util.Arrays;
044import java.util.Calendar;
045import java.util.Date;
046import java.util.HashSet;
047import java.util.List;
048import java.util.Map;
049import java.util.Map.Entry;
050import java.util.Set;
051import java.util.UUID;
052
053import org.apache.commons.lang.StringUtils;
054import org.apache.commons.logging.Log;
055import org.apache.commons.logging.LogFactory;
056import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
057import org.nuxeo.ecm.core.api.DocumentNotFoundException;
058import org.nuxeo.ecm.core.api.Lock;
059import org.nuxeo.ecm.core.api.NuxeoException;
060import org.nuxeo.ecm.core.api.PartialList;
061import org.nuxeo.ecm.core.api.model.Delta;
062import org.nuxeo.ecm.core.blob.BlobManager;
063import org.nuxeo.ecm.core.model.LockManager;
064import org.nuxeo.ecm.core.model.Repository;
065import org.nuxeo.ecm.core.query.sql.model.Expression;
066import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
067import org.nuxeo.ecm.core.query.sql.model.SelectClause;
068import org.nuxeo.ecm.core.storage.State;
069import org.nuxeo.ecm.core.storage.State.ListDiff;
070import org.nuxeo.ecm.core.storage.State.StateDiff;
071import org.nuxeo.ecm.core.storage.dbs.DBSDocument;
072import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
073import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
074import org.nuxeo.runtime.api.Framework;
075
076import com.mongodb.BasicDBObject;
077import com.mongodb.DB;
078import com.mongodb.DBCollection;
079import com.mongodb.DBCursor;
080import com.mongodb.DBObject;
081import com.mongodb.MongoClient;
082import com.mongodb.MongoClientURI;
083import com.mongodb.QueryOperators;
084import com.mongodb.ServerAddress;
085import com.mongodb.WriteResult;
086
087/**
088 * MongoDB implementation of a {@link Repository}.
089 *
090 * @since 5.9.4
091 */
092public class MongoDBRepository extends DBSRepositoryBase {
093
094    private static final Log log = LogFactory.getLog(MongoDBRepository.class);
095
096    private static final Long ZERO = Long.valueOf(0);
097
098    private static final Long ONE = Long.valueOf(1);
099
100    private static final Long MINUS_ONE = Long.valueOf(-11);
101
102    public static final String DB_DEFAULT = "nuxeo";
103
104    public static final String MONGODB_ID = "_id";
105
106    public static final String MONGODB_INC = "$inc";
107
108    public static final String MONGODB_SET = "$set";
109
110    public static final String MONGODB_UNSET = "$unset";
111
112    public static final String MONGODB_PUSH = "$push";
113
114    public static final String MONGODB_EACH = "$each";
115
116    public static final String MONGODB_META = "$meta";
117
118    public static final String MONGODB_TEXT_SCORE = "textScore";
119
120    private static final String MONGODB_INDEX_TEXT = "text";
121
122    private static final String MONGODB_INDEX_NAME = "name";
123
124    private static final String MONGODB_LANGUAGE_OVERRIDE = "language_override";
125
126    private static final String FULLTEXT_INDEX_NAME = "fulltext";
127
128    private static final String LANGUAGE_FIELD = "__language";
129
130    protected static final String COUNTER_NAME_UUID = "ecm:id";
131
132    protected static final String COUNTER_FIELD = "seq";
133
134    protected MongoClient mongoClient;
135
136    protected DBCollection coll;
137
138    protected DBCollection countersColl;
139
140    public MongoDBRepository(MongoDBRepositoryDescriptor descriptor) {
141        super(descriptor.name, descriptor.getFulltextDisabled());
142        try {
143            mongoClient = newMongoClient(descriptor);
144            coll = getCollection(descriptor, mongoClient);
145            countersColl = getCountersCollection(descriptor, mongoClient);
146        } catch (UnknownHostException e) {
147            throw new RuntimeException(e);
148        }
149        initRepository();
150    }
151
152    @Override
153    public void shutdown() {
154        super.shutdown();
155        mongoClient.close();
156    }
157
158    // used also by unit tests
159    public static MongoClient newMongoClient(MongoDBRepositoryDescriptor descriptor) throws UnknownHostException {
160        String server = descriptor.server;
161        if (StringUtils.isBlank(server)) {
162            throw new NuxeoException("Missing <server> in MongoDB repository descriptor");
163        }
164        if (server.startsWith("mongodb://")) {
165            // allow mongodb:// URI syntax for the server, to pass everything in one string
166            return new MongoClient(new MongoClientURI(server));
167        } else {
168            return new MongoClient(new ServerAddress(server));
169        }
170    }
171
172    protected static DBCollection getCollection(MongoClient mongoClient, String dbname, String collection) {
173        if (StringUtils.isBlank(dbname)) {
174            dbname = DB_DEFAULT;
175        }
176        DB db = mongoClient.getDB(dbname);
177        return db.getCollection(collection);
178    }
179
180    // used also by unit tests
181    public static DBCollection getCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) {
182        return getCollection(mongoClient, descriptor.dbname, descriptor.name);
183    }
184
185    // used also by unit tests
186    public static DBCollection getCountersCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) {
187        return getCollection(mongoClient, descriptor.dbname, descriptor.name + ".counters");
188    }
189
190    protected Object valueToBson(Object value) {
191        if (value instanceof State) {
192            return stateToBson((State) value);
193        } else if (value instanceof List) {
194            @SuppressWarnings("unchecked")
195            List<Object> values = (List<Object>) value;
196            return listToBson(values);
197        } else if (value instanceof Object[]) {
198            return listToBson(Arrays.asList((Object[]) value));
199        } else {
200            return serializableToBson(value);
201        }
202    }
203
204    protected DBObject stateToBson(State state) {
205        DBObject ob = new BasicDBObject();
206        for (Entry<String, Serializable> en : state.entrySet()) {
207            Object val = valueToBson(en.getValue());
208            if (val != null) {
209                ob.put(en.getKey(), val);
210            }
211        }
212        return ob;
213    }
214
215    protected List<Object> listToBson(List<Object> values) {
216        ArrayList<Object> objects = new ArrayList<Object>(values.size());
217        for (Object value : values) {
218            objects.add(valueToBson(value));
219        }
220        return objects;
221    }
222
223    protected State bsonToState(DBObject ob) {
224        if (ob == null) {
225            return null;
226        }
227        State state = new State(ob.keySet().size());
228        for (String key : ob.keySet()) {
229            Object val = ob.get(key);
230            Serializable value;
231            if (val instanceof List) {
232                @SuppressWarnings("unchecked")
233                List<Object> list = (List<Object>) val;
234                if (list.isEmpty()) {
235                    value = null;
236                } else {
237                    if (list.get(0) instanceof DBObject) {
238                        List<Serializable> l = new ArrayList<>(list.size());
239                        for (Object el : list) {
240                            l.add(bsonToState((DBObject) el));
241                        }
242                        value = (Serializable) l;
243                    } else {
244                        // turn the list into a properly-typed array
245                        Class<?> klass = list.get(0).getClass();
246                        Object[] ar = (Object[]) Array.newInstance(klass, list.size());
247                        int i = 0;
248                        for (Object el : list) {
249                            ar[i++] = scalarToSerializable(el);
250                        }
251                        value = ar;
252                    }
253                }
254            } else if (val instanceof DBObject) {
255                value = bsonToState((DBObject) val);
256            } else {
257                if (MONGODB_ID.equals(key)) {
258                    // skip ObjectId
259                    continue;
260                }
261                value = scalarToSerializable(val);
262            }
263            state.put(key, value);
264        }
265        return state;
266    }
267
268    public static class Updates {
269        public BasicDBObject set = new BasicDBObject();
270
271        public BasicDBObject unset = new BasicDBObject();
272
273        public BasicDBObject push = new BasicDBObject();
274
275        public BasicDBObject inc = new BasicDBObject();
276    }
277
278    /**
279     * Constructs a list of MongoDB updates from the given {@link StateDiff}.
280     * <p>
281     * We need a list because some cases need two operations to avoid conflicts.
282     */
283    protected List<DBObject> diffToBson(StateDiff diff) {
284        Updates updates = new Updates();
285        diffToUpdates(diff, null, updates);
286        UpdateListBuilder builder = new UpdateListBuilder();
287        for (Entry<String, Object> en : updates.set.entrySet()) {
288            builder.update(MONGODB_SET, en.getKey(), en.getValue());
289        }
290        for (Entry<String, Object> en : updates.unset.entrySet()) {
291            builder.update(MONGODB_UNSET, en.getKey(), en.getValue());
292        }
293        for (Entry<String, Object> en : updates.push.entrySet()) {
294            builder.update(MONGODB_PUSH, en.getKey(), en.getValue());
295        }
296        for (Entry<String, Object> en : updates.inc.entrySet()) {
297            builder.update(MONGODB_INC, en.getKey(), en.getValue());
298        }
299        return builder.updateList;
300    }
301
302    /**
303     * Update list builder to prevent several updates of the same field.
304     * <p>
305     * This happens if two operations act on two fields where one is a prefix of the other.
306     * <p>
307     * Example: Cannot update 'mylist.0.string' and 'mylist' at the same time (error 16837)
308     *
309     * @since 5.9.5
310     */
311    protected static class UpdateListBuilder {
312
313        protected List<DBObject> updateList = new ArrayList<>(1);
314
315        protected DBObject update;
316
317        protected List<String> keys;
318
319        protected UpdateListBuilder() {
320            newUpdate();
321        }
322
323        protected void newUpdate() {
324            updateList.add(update = new BasicDBObject());
325            keys = new ArrayList<>();
326        }
327
328        protected void update(String op, String key, Object value) {
329            if (conflicts(key, keys)) {
330                newUpdate();
331            }
332            keys.add(key);
333            DBObject map = (DBObject) update.get(op);
334            if (map == null) {
335                update.put(op, map = new BasicDBObject());
336            }
337            map.put(key, value);
338        }
339
340        /**
341         * Checks if the key conflicts with one of the previous keys.
342         * <p>
343         * A conflict occurs if one key is equals to or is a prefix of the other.
344         */
345        protected boolean conflicts(String key, List<String> previousKeys) {
346            String keydot = key + '.';
347            for (String prev : previousKeys) {
348                if (prev.equals(key) || prev.startsWith(keydot) || key.startsWith(prev + '.')) {
349                    return true;
350                }
351            }
352            return false;
353        }
354    }
355
356    protected void diffToUpdates(StateDiff diff, String prefix, Updates updates) {
357        String elemPrefix = prefix == null ? "" : prefix + '.';
358        for (Entry<String, Serializable> en : diff.entrySet()) {
359            String name = elemPrefix + en.getKey();
360            Serializable value = en.getValue();
361            if (value instanceof StateDiff) {
362                diffToUpdates((StateDiff) value, name, updates);
363            } else if (value instanceof ListDiff) {
364                diffToUpdates((ListDiff) value, name, updates);
365            } else if (value instanceof Delta) {
366                diffToUpdates((Delta) value, name, updates);
367            } else {
368                // not a diff
369                updates.set.put(name, valueToBson(value));
370            }
371        }
372    }
373
374    protected void diffToUpdates(ListDiff listDiff, String prefix, Updates updates) {
375        if (listDiff.diff != null) {
376            String elemPrefix = prefix == null ? "" : prefix + '.';
377            int i = 0;
378            for (Object value : listDiff.diff) {
379                String name = elemPrefix + i;
380                if (value instanceof StateDiff) {
381                    diffToUpdates((StateDiff) value, name, updates);
382                } else if (value != NOP) {
383                    // set value
384                    updates.set.put(name, valueToBson(value));
385                }
386                i++;
387            }
388        }
389        if (listDiff.rpush != null) {
390            Object pushed;
391            if (listDiff.rpush.size() == 1) {
392                // no need to use $each for one element
393                pushed = valueToBson(listDiff.rpush.get(0));
394            } else {
395                pushed = new BasicDBObject(MONGODB_EACH, listToBson(listDiff.rpush));
396            }
397            updates.push.put(prefix, pushed);
398        }
399    }
400
401    protected void diffToUpdates(Delta delta, String prefix, Updates updates) {
402        Object inc = valueToBson(delta.getDeltaValue());
403        updates.inc.put(prefix, inc);
404    }
405
406    protected Object serializableToBson(Object value) {
407        if (value instanceof Calendar) {
408            return ((Calendar) value).getTime();
409        }
410        return value;
411    }
412
413    protected Serializable scalarToSerializable(Object val) {
414        if (val instanceof Date) {
415            Calendar cal = Calendar.getInstance();
416            cal.setTime((Date) val);
417            return cal;
418        }
419        return (Serializable) val;
420    }
421
422    protected void initRepository() {
423        // create required indexes
424        // code does explicit queries on those
425        coll.createIndex(new BasicDBObject(KEY_ID, ONE));
426        coll.createIndex(new BasicDBObject(KEY_PARENT_ID, ONE));
427        coll.createIndex(new BasicDBObject(KEY_ANCESTOR_IDS, ONE));
428        coll.createIndex(new BasicDBObject(KEY_VERSION_SERIES_ID, ONE));
429        coll.createIndex(new BasicDBObject(KEY_PROXY_TARGET_ID, ONE));
430        coll.createIndex(new BasicDBObject(KEY_PROXY_VERSION_SERIES_ID, ONE));
431        coll.createIndex(new BasicDBObject(KEY_READ_ACL, ONE));
432        DBObject parentChild = new BasicDBObject();
433        parentChild.put(KEY_PARENT_ID, ONE);
434        parentChild.put(KEY_NAME, ONE);
435        coll.createIndex(parentChild);
436        // often used in user-generated queries
437        coll.createIndex(new BasicDBObject(KEY_PRIMARY_TYPE, ONE));
438        coll.createIndex(new BasicDBObject(KEY_LIFECYCLE_STATE, ONE));
439        coll.createIndex(new BasicDBObject(KEY_FULLTEXT_JOBID, ONE));
440        coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_USER, ONE));
441        coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_STATUS, ONE));
442        // TODO configure these from somewhere else
443        coll.createIndex(new BasicDBObject("dc:modified", MINUS_ONE));
444        coll.createIndex(new BasicDBObject("rend:renditionName", ONE));
445        coll.createIndex(new BasicDBObject("drv:subscriptions.enabled", ONE));
446        coll.createIndex(new BasicDBObject("collectionMember:collectionIds", ONE));
447        if (!fulltextDisabled) {
448            DBObject indexKeys = new BasicDBObject();
449            indexKeys.put(KEY_FULLTEXT_SIMPLE, MONGODB_INDEX_TEXT);
450            indexKeys.put(KEY_FULLTEXT_BINARY, MONGODB_INDEX_TEXT);
451            DBObject indexOptions = new BasicDBObject();
452            indexOptions.put(MONGODB_INDEX_NAME, FULLTEXT_INDEX_NAME);
453            indexOptions.put(MONGODB_LANGUAGE_OVERRIDE, LANGUAGE_FIELD);
454            coll.createIndex(indexKeys, indexOptions);
455        }
456        // check root presence
457        DBObject query = new BasicDBObject(KEY_ID, getRootId());
458        if (coll.findOne(query, justPresenceField()) != null) {
459            return;
460        }
461        // create basic repository structure needed
462        if (DEBUG_UUIDS) {
463            // create the id counter
464            DBObject idCounter = new BasicDBObject();
465            idCounter.put(MONGODB_ID, COUNTER_NAME_UUID);
466            idCounter.put(COUNTER_FIELD, ZERO);
467            countersColl.insert(idCounter);
468        }
469        initRoot();
470    }
471
472    protected Long getNextUuidSeq() {
473        DBObject query = new BasicDBObject(MONGODB_ID, COUNTER_NAME_UUID);
474        DBObject update = new BasicDBObject(MONGODB_INC, new BasicDBObject(COUNTER_FIELD, ONE));
475        boolean returnNew = true;
476        DBObject idCounter = countersColl.findAndModify(query, null, null, false, update, returnNew, false);
477        if (idCounter == null) {
478            throw new RuntimeException("Repository id counter not initialized");
479        }
480        return (Long) idCounter.get(COUNTER_FIELD);
481    }
482
483    @Override
484    public String generateNewId() {
485        if (DEBUG_UUIDS) {
486            Long id = getNextUuidSeq();
487            return "UUID_" + id;
488        } else {
489            return UUID.randomUUID().toString();
490        }
491    }
492
493    @Override
494    public void createState(State state) {
495        DBObject ob = stateToBson(state);
496        if (log.isTraceEnabled()) {
497            log.trace("MongoDB: CREATE " + ob);
498        }
499        coll.insert(ob);
500        // TODO dupe exception
501        // throw new DocumentException("Already exists: " + id);
502    }
503
504    @Override
505    public State readState(String id) {
506        DBObject query = new BasicDBObject(KEY_ID, id);
507        return findOne(query);
508    }
509
510    @Override
511    public List<State> readStates(List<String> ids) {
512        DBObject query = new BasicDBObject(KEY_ID, new BasicDBObject(QueryOperators.IN, ids));
513        return findAll(query, ids.size());
514    }
515
516    @Override
517    public void updateState(String id, StateDiff diff) {
518        DBObject query = new BasicDBObject(KEY_ID, id);
519        for (DBObject update : diffToBson(diff)) {
520            if (log.isTraceEnabled()) {
521                log.trace("MongoDB: UPDATE " + id + ": " + update);
522            }
523            coll.update(query, update);
524            // TODO dupe exception
525            // throw new DocumentException("Missing: " + id);
526        }
527    }
528
529    @Override
530    public void deleteStates(Set<String> ids) {
531        DBObject query = new BasicDBObject(KEY_ID, new BasicDBObject(QueryOperators.IN, ids));
532        if (log.isTraceEnabled()) {
533            log.trace("MongoDB: REMOVE " + ids);
534        }
535        WriteResult w = coll.remove(query);
536        if (w.getN() != ids.size()) {
537            log.error("Removed " + w.getN() + " docs for " + ids.size() + " ids: " + ids);
538        }
539    }
540
541    @Override
542    public State readChildState(String parentId, String name, Set<String> ignored) {
543        DBObject query = getChildQuery(parentId, name, ignored);
544        return findOne(query);
545    }
546
547    protected void logQuery(String id, DBObject fields) {
548        logQuery(new BasicDBObject(KEY_ID, id), fields);
549    }
550
551    protected void logQuery(DBObject query, DBObject fields) {
552        if (fields == null) {
553            log.trace("MongoDB: QUERY " + query);
554        } else {
555            log.trace("MongoDB: QUERY " + query + " KEYS " + fields);
556        }
557    }
558
559    protected void logQuery(DBObject query, DBObject fields, DBObject orderBy, int limit, int offset) {
560        log.trace("MongoDB: QUERY " + query + " KEYS " + fields + (orderBy == null ? "" : " ORDER BY " + orderBy)
561                + " OFFSET " + offset + " LIMIT " + limit);
562    }
563
564    @Override
565    public boolean hasChild(String parentId, String name, Set<String> ignored) {
566        DBObject query = getChildQuery(parentId, name, ignored);
567        if (log.isTraceEnabled()) {
568            logQuery(query, justPresenceField());
569        }
570        return coll.findOne(query, justPresenceField()) != null;
571    }
572
573    protected DBObject getChildQuery(String parentId, String name, Set<String> ignored) {
574        DBObject query = new BasicDBObject();
575        query.put(KEY_PARENT_ID, parentId);
576        query.put(KEY_NAME, name);
577        addIgnoredIds(query, ignored);
578        return query;
579    }
580
581    protected void addIgnoredIds(DBObject query, Set<String> ignored) {
582        if (!ignored.isEmpty()) {
583            DBObject notInIds = new BasicDBObject(QueryOperators.NIN, new ArrayList<String>(ignored));
584            query.put(KEY_ID, notInIds);
585        }
586    }
587
588    @Override
589    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
590        DBObject query = new BasicDBObject(key, value);
591        addIgnoredIds(query, ignored);
592        return findAll(query, 0);
593    }
594
595    @Override
596    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
597        DBObject query = new BasicDBObject(key1, value1);
598        query.put(key2, value2);
599        addIgnoredIds(query, ignored);
600        return findAll(query, 0);
601    }
602
603    @Override
604    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
605            Map<String, Object[]> targetProxies) {
606        DBObject query = new BasicDBObject(key, value);
607        DBObject fields = new BasicDBObject();
608        fields.put(MONGODB_ID, ZERO);
609        fields.put(KEY_ID, ONE);
610        fields.put(KEY_IS_PROXY, ONE);
611        fields.put(KEY_PROXY_TARGET_ID, ONE);
612        fields.put(KEY_PROXY_IDS, ONE);
613        if (log.isTraceEnabled()) {
614            logQuery(query, fields);
615        }
616        DBCursor cursor = coll.find(query, fields);
617        try {
618            for (DBObject ob : cursor) {
619                String id = (String) ob.get(KEY_ID);
620                ids.add(id);
621                if (proxyTargets != null && TRUE.equals(ob.get(KEY_IS_PROXY))) {
622                    String targetId = (String) ob.get(KEY_PROXY_TARGET_ID);
623                    proxyTargets.put(id, targetId);
624                }
625                if (targetProxies != null) {
626                    Object[] proxyIds = (Object[]) ob.get(KEY_PROXY_IDS);
627                    if (proxyIds != null) {
628                        targetProxies.put(id, proxyIds);
629                    }
630                }
631            }
632        } finally {
633            cursor.close();
634        }
635    }
636
637    @Override
638    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
639        DBObject query = new BasicDBObject(key, value);
640        addIgnoredIds(query, ignored);
641        if (log.isTraceEnabled()) {
642            logQuery(query, justPresenceField());
643        }
644        return coll.findOne(query, justPresenceField()) != null;
645    }
646
647    protected State findOne(DBObject query) {
648        if (log.isTraceEnabled()) {
649            logQuery(query, null);
650        }
651        return bsonToState(coll.findOne(query));
652    }
653
654    protected List<State> findAll(DBObject query, int sizeHint) {
655        if (log.isTraceEnabled()) {
656            logQuery(query, null);
657        }
658        DBCursor cursor = coll.find(query);
659        Set<String> seen = new HashSet<>();
660        try {
661            List<State> list = new ArrayList<>(sizeHint);
662            for (DBObject ob : cursor) {
663                if (!seen.add((String) ob.get(KEY_ID))) {
664                    // MongoDB cursors may return the same
665                    // object several times
666                    continue;
667                }
668                list.add(bsonToState(ob));
669            }
670            return list;
671        } finally {
672            cursor.close();
673        }
674    }
675
676    protected DBObject justPresenceField() {
677        return new BasicDBObject(MONGODB_ID, ONE);
678    }
679
680    @Override
681    public PartialList<State> queryAndFetch(Expression expression, SelectClause selectClause, OrderByClause orderByClause,
682            int limit, int offset, int countUpTo, DBSExpressionEvaluator evaluator, boolean deepCopy) {
683        MongoDBQueryBuilder builder = new MongoDBQueryBuilder(expression, selectClause, orderByClause,
684                evaluator.pathResolver);
685        builder.walk();
686        if (builder.hasFulltext && fulltextDisabled) {
687            throw new RuntimeException("Fulltext disabled by configuration");
688        }
689        DBObject query = builder.getQuery();
690        addPrincipals(query, evaluator.principals);
691        DBObject orderBy = builder.getOrderBy();
692        DBObject keys = builder.getProjection();
693
694        if (log.isTraceEnabled()) {
695            logQuery(query, keys, orderBy, limit, offset);
696        }
697
698        List<State> list;
699        long totalSize;
700        DBCursor cursor = coll.find(query, keys).skip(offset).limit(limit);
701        try {
702            if (orderBy != null) {
703                cursor = cursor.sort(orderBy);
704            }
705            list = new ArrayList<>();
706            for (DBObject ob : cursor) {
707                list.add(bsonToState(ob));
708            }
709            if (countUpTo == -1) {
710                // count full size
711                if (limit == 0) {
712                    totalSize = list.size();
713                } else {
714                    totalSize = cursor.count();
715                }
716            } else if (countUpTo == 0) {
717                // no count
718                totalSize = -1; // not counted
719            } else {
720                // count only if less than countUpTo
721                if (limit == 0) {
722                    totalSize = list.size();
723                } else {
724                    totalSize = cursor.copy().limit(countUpTo + 1).count();
725                }
726                if (totalSize > countUpTo) {
727                    totalSize = -2; // truncated
728                }
729            }
730        } finally {
731            cursor.close();
732        }
733        if (log.isTraceEnabled() && list.size() != 0) {
734            log.trace("MongoDB:    -> " + list.size());
735        }
736        return new PartialList<>(list, totalSize);
737    }
738
739    protected void addPrincipals(DBObject query, Set<String> principals) {
740        if (principals != null) {
741            DBObject inPrincipals = new BasicDBObject(QueryOperators.IN, new ArrayList<String>(principals));
742            query.put(DBSDocument.KEY_READ_ACL, inPrincipals);
743        }
744    }
745
746    /** Keys used for document projection when marking all binaries for GC. */
747    protected DBObject binaryKeys;
748
749    @Override
750    protected void initBlobsPaths() {
751        MongoDBBlobFinder finder = new MongoDBBlobFinder();
752        finder.visit();
753        binaryKeys = finder.binaryKeys;
754    }
755
756    protected static class MongoDBBlobFinder extends BlobFinder {
757        protected DBObject binaryKeys = new BasicDBObject(MONGODB_ID, ZERO);
758
759        @Override
760        protected void recordBlobPath() {
761            path.addLast(KEY_BLOB_DATA);
762            binaryKeys.put(StringUtils.join(path, "."), ONE);
763            path.removeLast();
764        }
765    }
766
767    @Override
768    public void markReferencedBinaries() {
769        BlobManager blobManager = Framework.getService(BlobManager.class);
770        // TODO add a query to not scan all documents
771        if (log.isTraceEnabled()) {
772            logQuery(new BasicDBObject(), binaryKeys);
773        }
774        DBCursor cursor = coll.find(new BasicDBObject(), binaryKeys);
775        try {
776            for (DBObject ob : cursor) {
777                markReferencedBinaries(ob, blobManager);
778            }
779        } finally {
780            cursor.close();
781        }
782    }
783
784    protected void markReferencedBinaries(DBObject ob, BlobManager blobManager) {
785        for (String key : ob.keySet()) {
786            Object value = ob.get(key);
787            if (value instanceof List) {
788                @SuppressWarnings("unchecked")
789                List<Object> list = (List<Object>) value;
790                for (Object v : list) {
791                    if (v instanceof DBObject) {
792                        markReferencedBinaries((DBObject) v, blobManager);
793                    } else {
794                        markReferencedBinary(v, blobManager);
795                    }
796                }
797            } else if (value instanceof Object[]) {
798                for (Object v : (Object[]) value) {
799                    markReferencedBinary(v, blobManager);
800                }
801            } else if (value instanceof DBObject) {
802                markReferencedBinaries((DBObject) value, blobManager);
803            } else {
804                markReferencedBinary(value, blobManager);
805            }
806        }
807    }
808
809    protected void markReferencedBinary(Object value, BlobManager blobManager) {
810        if (!(value instanceof String)) {
811            return;
812        }
813        String key = (String) value;
814        blobManager.markReferencedBinary(key, repositoryName);
815    }
816
817    protected static final DBObject LOCK_FIELDS;
818
819    static {
820        LOCK_FIELDS = new BasicDBObject();
821        LOCK_FIELDS.put(KEY_LOCK_OWNER, ONE);
822        LOCK_FIELDS.put(KEY_LOCK_CREATED, ONE);
823    }
824
825    protected static final DBObject UNSET_LOCK_UPDATE = new BasicDBObject(MONGODB_UNSET, LOCK_FIELDS);
826
827    @Override
828    public Lock getLock(String id) {
829        if (log.isTraceEnabled()) {
830            logQuery(id, LOCK_FIELDS);
831        }
832        DBObject res = coll.findOne(new BasicDBObject(KEY_ID, id), LOCK_FIELDS);
833        if (res == null) {
834            // document not found
835            throw new DocumentNotFoundException(id);
836        }
837        String owner = (String) res.get(KEY_LOCK_OWNER);
838        if (owner == null) {
839            // not locked
840            return null;
841        }
842        Calendar created = (Calendar) scalarToSerializable(res.get(KEY_LOCK_CREATED));
843        return new Lock(owner, created);
844    }
845
846    @Override
847    public Lock setLock(String id, Lock lock) {
848        DBObject query = new BasicDBObject(KEY_ID, id);
849        query.put(KEY_LOCK_OWNER, null); // select doc if no lock is set
850        DBObject setLock = new BasicDBObject();
851        setLock.put(KEY_LOCK_OWNER, lock.getOwner());
852        setLock.put(KEY_LOCK_CREATED, serializableToBson(lock.getCreated()));
853        DBObject setLockUpdate = new BasicDBObject(MONGODB_SET, setLock);
854        if (log.isTraceEnabled()) {
855            log.trace("MongoDB: FINDANDMODIFY " + query + " UPDATE " + setLockUpdate);
856        }
857        DBObject res = coll.findAndModify(query, null, null, false, setLockUpdate, false, false);
858        if (res != null) {
859            // found a doc to lock
860            return null;
861        } else {
862            // doc not found, or lock owner already set
863            // get the old lock
864            if (log.isTraceEnabled()) {
865                logQuery(id, LOCK_FIELDS);
866            }
867            DBObject old = coll.findOne(new BasicDBObject(KEY_ID, id), LOCK_FIELDS);
868            if (old == null) {
869                // document not found
870                throw new DocumentNotFoundException(id);
871            }
872            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
873            Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED));
874            if (oldOwner != null) {
875                return new Lock(oldOwner, oldCreated);
876            }
877            // no lock -- there was a race condition
878            // TODO do better
879            throw new ConcurrentUpdateException("Lock " + id);
880        }
881    }
882
883    @Override
884    public Lock removeLock(String id, String owner) {
885        DBObject query = new BasicDBObject(KEY_ID, id);
886        if (owner != null) {
887            // remove if owner matches or null
888            // implements LockManager.canLockBeRemoved inside MongoDB
889            Object ownerOrNull = Arrays.asList(owner, null);
890            query.put(KEY_LOCK_OWNER, new BasicDBObject(QueryOperators.IN, ownerOrNull));
891        } // else unconditional remove
892        // remove the lock
893        DBObject old = coll.findAndModify(query, null, null, false, UNSET_LOCK_UPDATE, false, false);
894        if (old != null) {
895            // found a doc and removed the lock, return previous lock
896            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
897            if (oldOwner == null) {
898                // was not locked
899                return null;
900            } else {
901                // return previous lock
902                Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED));
903                return new Lock(oldOwner, oldCreated);
904            }
905        } else {
906            // doc not found, or lock owner didn't match
907            // get the old lock
908            if (log.isTraceEnabled()) {
909                logQuery(id, LOCK_FIELDS);
910            }
911            old = coll.findOne(new BasicDBObject(KEY_ID, id), LOCK_FIELDS);
912            if (old == null) {
913                // document not found
914                throw new DocumentNotFoundException(id);
915            }
916            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
917            Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED));
918            if (oldOwner != null) {
919                if (!LockManager.canLockBeRemoved(oldOwner, owner)) {
920                    // existing mismatched lock, flag failure
921                    return new Lock(oldOwner, oldCreated, true);
922                }
923                // old owner should have matched -- there was a race condition
924                // TODO do better
925                throw new ConcurrentUpdateException("Unlock " + id);
926            }
927            // old owner null, should have matched -- there was a race condition
928            // TODO do better
929            throw new ConcurrentUpdateException("Unlock " + id);
930        }
931    }
932
    /** Closes the lock manager. This implementation is intentionally empty and performs no work. */
    @Override
    public void closeLockManager() {

    }
937
    /** Clears lock manager caches. This implementation is intentionally empty and performs no work. */
    @Override
    public void clearLockManagerCaches() {
    }
941
942}