001/*
002 * (C) Copyright 2014-2020 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.mongodb;
020
021import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
024
025import java.time.Duration;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.List;
029import java.util.Set;
030
031import org.apache.commons.lang3.StringUtils;
032import org.apache.logging.log4j.LogManager;
033import org.apache.logging.log4j.Logger;
034import org.bson.Document;
035import org.bson.conversions.Bson;
036import org.nuxeo.ecm.core.blob.DocumentBlobManager;
037import org.nuxeo.ecm.core.model.Repository;
038import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
039import org.nuxeo.ecm.core.storage.dbs.DBSSession;
040import org.nuxeo.runtime.api.Framework;
041import org.nuxeo.runtime.mongodb.MongoDBConnectionService;
042
043import com.mongodb.MongoClientException;
044import com.mongodb.client.ClientSession;
045import com.mongodb.client.MongoClient;
046import com.mongodb.client.MongoCollection;
047import com.mongodb.client.MongoDatabase;
048import com.mongodb.client.model.Projections;
049
050/**
051 * MongoDB implementation of a {@link Repository}.
052 *
053 * @since 5.9.4
054 */
055public class MongoDBRepository extends DBSRepositoryBase {
056
057    private static final Logger log = LogManager.getLogger(MongoDBRepository.class);
058
059    /**
060     * Prefix used to retrieve a MongoDB connection from {@link MongoDBConnectionService}.
061     * <p>
062     * The connection id will be {@code repository/[REPOSITORY_NAME]}.
063     */
064    public static final String REPOSITORY_CONNECTION_PREFIX = "repository/";
065
066    public static final Long LONG_ZERO = Long.valueOf(0);
067
068    public static final Double ZERO = Double.valueOf(0);
069
070    public static final Double ONE = Double.valueOf(1);
071
072    public static final String MONGODB_ID = "_id";
073
074    public static final String MONGODB_INC = "$inc";
075
076    public static final String MONGODB_SET = "$set";
077
078    public static final String MONGODB_UNSET = "$unset";
079
080    public static final String MONGODB_PUSH = "$push";
081
082    /** @since 11.5 */
083    public static final String MONGODB_PULLALL = "$pullAll";
084
085    public static final String MONGODB_EACH = "$each";
086
087    public static final String MONGODB_META = "$meta";
088
089    public static final String MONGODB_TEXT_SCORE = "textScore";
090
091    public static final String FULLTEXT_INDEX_NAME = "fulltext";
092
093    public static final String LANGUAGE_FIELD = "__language";
094
095    protected static final int SEQUENCE_RANDOMIZED_BLOCKSIZE_DEFAULT = 1000;
096
097    public static final String COUNTER_NAME_UUID = "ecm:id";
098
099    public static final String COUNTER_FIELD = "seq";
100
101    /**
102     * Default maximum execution time for a query.
103     *
104     * @since 11.1
105     */
106    protected static final Duration MAX_TIME_DEFAULT = Duration.ofHours(1);
107
108    /** The key to use to store the id in the database. */
109    protected String idKey;
110
111    /** Sequence allocation block size. */
112    protected long sequenceBlockSize;
113
114    protected final MongoDBRepositoryDescriptor descriptor;
115
116    protected final MongoClient mongoClient;
117
118    protected final MongoDBConverter converter;
119
120    protected final MongoDBCursorService cursorService;
121
122    protected final MongoCollection<Document> coll;
123
124    protected final MongoCollection<Document> countersColl;
125
126    protected final boolean supportsSessions;
127
128    protected final boolean supportsTransactions;
129
130    /**
131     * Maximum execution time for a query.
132     *
133     * @since 11.1
134     */
135    protected final long maxTimeMS;
136
137    public MongoDBRepository(MongoDBRepositoryDescriptor descriptor) {
138        super(descriptor.name, descriptor);
139        this.descriptor = descriptor;
140
141        MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class);
142        String connectionId = REPOSITORY_CONNECTION_PREFIX + descriptor.name;
143        mongoClient = mongoService.getClient(connectionId);
144        String dbname = mongoService.getDatabaseName(connectionId);
145        MongoDatabase database = mongoClient.getDatabase(dbname);
146        coll = database.getCollection(descriptor.name);
147        countersColl = database.getCollection(descriptor.name + ".counters");
148        Duration maxTime = mongoService.getConfig(connectionId).maxTime;
149        if (maxTime == null) {
150            maxTime = MAX_TIME_DEFAULT;
151        }
152        maxTimeMS = maxTime.toMillis();
153
154        if (Boolean.TRUE.equals(descriptor.nativeId)) {
155            idKey = MONGODB_ID;
156        } else {
157            idKey = KEY_ID;
158        }
159        boolean useCustomId = KEY_ID.equals(idKey);
160        if (idType == IdType.sequence || idType == IdType.sequenceHexRandomized || DEBUG_UUIDS) {
161            Integer sbs = descriptor.sequenceBlockSize;
162            if (sbs == null) {
163                sequenceBlockSize = idType == IdType.sequenceHexRandomized ? SEQUENCE_RANDOMIZED_BLOCKSIZE_DEFAULT : 1;
164            } else {
165                sequenceBlockSize = sbs.longValue();
166            }
167        }
168        Set<String> idValuesKeys;
169        if (idType == IdType.sequenceHexRandomized) {
170            // store these ids as longs
171            idValuesKeys = DBSSession.ID_VALUES_KEYS;
172        } else {
173            idValuesKeys = Set.of();
174        }
175        converter = new MongoDBConverter(useCustomId ? null : KEY_ID, DBSSession.TRUE_OR_NULL_BOOLEAN_KEYS,
176                idValuesKeys);
177        cursorService = new MongoDBCursorService(converter);
178
179        // check session and transaction support
180        boolean hasSessions;
181        boolean hasTransactions;
182        try (ClientSession session = mongoClient.startSession()) {
183            hasSessions = true;
184            try {
185                session.startTransaction();
186                session.abortTransaction();
187                hasTransactions = true;
188            } catch (MongoClientException e) {
189                hasTransactions = false;
190            }
191        } catch (MongoClientException ee) {
192            // startSession may throw
193            // "Sessions are not supported by the MongoDB cluster to which this client is connected"
194            // startTransaction may throw
195            hasSessions = false;
196            hasTransactions = false;
197        }
198        // TODO: reactivate sessions/transactions when they can be better tested
199        hasSessions = false;
200        hasTransactions = false;
201        supportsSessions = hasSessions;
202        supportsTransactions = hasTransactions;
203        initRepository();
204    }
205
206    @Override
207    public void shutdown() {
208        super.shutdown();
209        cursorService.clear();
210    }
211
212    protected void initRepository() {
213        // the first connection will init the repository
214        getConnection().close();
215    }
216
217    @Override
218    public MongoDBConnection getConnection() {
219        return new MongoDBConnection(this);
220    }
221
222    @Override
223    public List<IdType> getAllowedIdTypes() {
224        return Arrays.asList(IdType.varchar, IdType.sequence, IdType.sequenceHexRandomized);
225    }
226
227    protected boolean supportsSessions() {
228        return supportsSessions;
229    }
230
231    @Override
232    public boolean supportsTransactions() {
233        return supportsTransactions;
234    }
235
236    protected MongoClient getClient() {
237        return mongoClient;
238    }
239
240    protected MongoDBCursorService getCursorService() {
241        return cursorService;
242    }
243
244    protected MongoCollection<Document> getCollection() {
245        return coll;
246    }
247
248    protected MongoCollection<Document> getCountersCollection() {
249        return countersColl;
250    }
251
252    protected String getIdKey() {
253        return idKey;
254    }
255
256    protected MongoDBConverter getConverter() {
257        return converter;
258    }
259
260    /** Keys used for document projection when marking all binaries for GC. */
261    protected Bson binaryKeys;
262
263    @Override
264    protected void initBlobsPaths() {
265        MongoDBBlobFinder finder = new MongoDBBlobFinder();
266        finder.visit();
267        if (isFulltextStoredInBlob()) {
268            finder.recordBlobKey(KEY_FULLTEXT_BINARY);
269        }
270        binaryKeys = Projections.fields(finder.binaryKeys);
271    }
272
273    protected static class MongoDBBlobFinder extends BlobFinder {
274        protected List<Bson> binaryKeys = new ArrayList<>(Set.of(Projections.excludeId()));
275
276        @Override
277        protected void recordBlobPath() {
278            path.addLast(KEY_BLOB_DATA);
279            recordBlobKey(StringUtils.join(path, "."));
280            path.removeLast();
281        }
282
283        protected void recordBlobKey(String key) {
284            binaryKeys.add(Projections.include(key));
285        }
286    }
287
288    @Override
289    public void markReferencedBinaries() {
290        DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class);
291        // TODO add a query to not scan all documents
292        log.trace("MongoDB: QUERY {} KEYS {}", Document::new, () -> binaryKeys);
293        coll.find().projection(binaryKeys).forEach(doc -> markReferencedBinaries(doc, blobManager));
294    }
295
296    protected void markReferencedBinaries(Document ob, DocumentBlobManager blobManager) {
297        for (var value : ob.values()) {
298            if (value instanceof List) {
299                @SuppressWarnings("unchecked")
300                List<Object> list = (List<Object>) value;
301                for (Object v : list) {
302                    if (v instanceof Document) {
303                        markReferencedBinaries((Document) v, blobManager);
304                    } else {
305                        markReferencedBinary(v, blobManager);
306                    }
307                }
308            } else if (value instanceof Object[]) {
309                for (Object v : (Object[]) value) {
310                    markReferencedBinary(v, blobManager);
311                }
312            } else if (value instanceof Document) {
313                markReferencedBinaries((Document) value, blobManager);
314            } else {
315                markReferencedBinary(value, blobManager);
316            }
317        }
318    }
319
320    protected void markReferencedBinary(Object value, DocumentBlobManager blobManager) {
321        if (!(value instanceof String)) {
322            return;
323        }
324        String key = (String) value;
325        blobManager.markReferencedBinary(key, repositoryName);
326    }
327
328}