001/* 002 * (C) Copyright 2014-2020 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.mongodb; 020 021import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA; 022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY; 023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 024 025import java.time.Duration; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.List; 029import java.util.Set; 030 031import org.apache.commons.lang3.StringUtils; 032import org.apache.logging.log4j.LogManager; 033import org.apache.logging.log4j.Logger; 034import org.bson.Document; 035import org.bson.conversions.Bson; 036import org.nuxeo.ecm.core.blob.DocumentBlobManager; 037import org.nuxeo.ecm.core.model.Repository; 038import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase; 039import org.nuxeo.ecm.core.storage.dbs.DBSSession; 040import org.nuxeo.runtime.api.Framework; 041import org.nuxeo.runtime.mongodb.MongoDBConnectionService; 042 043import com.mongodb.MongoClientException; 044import com.mongodb.client.ClientSession; 045import com.mongodb.client.MongoClient; 046import com.mongodb.client.MongoCollection; 047import com.mongodb.client.MongoDatabase; 048import com.mongodb.client.model.Projections; 049 050/** 051 * MongoDB implementation of a {@link Repository}. 052 * 053 * @since 5.9.4 054 */ 055public class MongoDBRepository extends DBSRepositoryBase { 056 057 private static final Logger log = LogManager.getLogger(MongoDBRepository.class); 058 059 /** 060 * Prefix used to retrieve a MongoDB connection from {@link MongoDBConnectionService}. 061 * <p> 062 * The connection id will be {@code repository/[REPOSITORY_NAME]}. 063 */ 064 public static final String REPOSITORY_CONNECTION_PREFIX = "repository/"; 065 066 public static final Long LONG_ZERO = Long.valueOf(0); 067 068 public static final Double ZERO = Double.valueOf(0); 069 070 public static final Double ONE = Double.valueOf(1); 071 072 public static final String MONGODB_ID = "_id"; 073 074 public static final String MONGODB_INC = "$inc"; 075 076 public static final String MONGODB_SET = "$set"; 077 078 public static final String MONGODB_UNSET = "$unset"; 079 080 public static final String MONGODB_PUSH = "$push"; 081 082 /** @since 11.5 */ 083 public static final String MONGODB_PULLALL = "$pullAll"; 084 085 public static final String MONGODB_EACH = "$each"; 086 087 public static final String MONGODB_META = "$meta"; 088 089 public static final String MONGODB_TEXT_SCORE = "textScore"; 090 091 public static final String FULLTEXT_INDEX_NAME = "fulltext"; 092 093 public static final String LANGUAGE_FIELD = "__language"; 094 095 protected static final int SEQUENCE_RANDOMIZED_BLOCKSIZE_DEFAULT = 1000; 096 097 public static final String COUNTER_NAME_UUID = "ecm:id"; 098 099 public static final String COUNTER_FIELD = "seq"; 100 101 /** 102 * Default maximum execution time for a query. 103 * 104 * @since 11.1 105 */ 106 protected static final Duration MAX_TIME_DEFAULT = Duration.ofHours(1); 107 108 /** The key to use to store the id in the database. */ 109 protected String idKey; 110 111 /** Sequence allocation block size. */ 112 protected long sequenceBlockSize; 113 114 protected final MongoDBRepositoryDescriptor descriptor; 115 116 protected final MongoClient mongoClient; 117 118 protected final MongoDBConverter converter; 119 120 protected final MongoDBCursorService cursorService; 121 122 protected final MongoCollection<Document> coll; 123 124 protected final MongoCollection<Document> countersColl; 125 126 protected final boolean supportsSessions; 127 128 protected final boolean supportsTransactions; 129 130 /** 131 * Maximum execution time for a query. 132 * 133 * @since 11.1 134 */ 135 protected final long maxTimeMS; 136 137 public MongoDBRepository(MongoDBRepositoryDescriptor descriptor) { 138 super(descriptor.name, descriptor); 139 this.descriptor = descriptor; 140 141 MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class); 142 String connectionId = REPOSITORY_CONNECTION_PREFIX + descriptor.name; 143 mongoClient = mongoService.getClient(connectionId); 144 String dbname = mongoService.getDatabaseName(connectionId); 145 MongoDatabase database = mongoClient.getDatabase(dbname); 146 coll = database.getCollection(descriptor.name); 147 countersColl = database.getCollection(descriptor.name + ".counters"); 148 Duration maxTime = mongoService.getConfig(connectionId).maxTime; 149 if (maxTime == null) { 150 maxTime = MAX_TIME_DEFAULT; 151 } 152 maxTimeMS = maxTime.toMillis(); 153 154 if (Boolean.TRUE.equals(descriptor.nativeId)) { 155 idKey = MONGODB_ID; 156 } else { 157 idKey = KEY_ID; 158 } 159 boolean useCustomId = KEY_ID.equals(idKey); 160 if (idType == IdType.sequence || idType == IdType.sequenceHexRandomized || DEBUG_UUIDS) { 161 Integer sbs = descriptor.sequenceBlockSize; 162 if (sbs == null) { 163 sequenceBlockSize = idType == IdType.sequenceHexRandomized ? SEQUENCE_RANDOMIZED_BLOCKSIZE_DEFAULT : 1; 164 } else { 165 sequenceBlockSize = sbs.longValue(); 166 } 167 } 168 Set<String> idValuesKeys; 169 if (idType == IdType.sequenceHexRandomized) { 170 // store these ids as longs 171 idValuesKeys = DBSSession.ID_VALUES_KEYS; 172 } else { 173 idValuesKeys = Set.of(); 174 } 175 converter = new MongoDBConverter(useCustomId ? null : KEY_ID, DBSSession.TRUE_OR_NULL_BOOLEAN_KEYS, 176 idValuesKeys); 177 cursorService = new MongoDBCursorService(converter); 178 179 // check session and transaction support 180 boolean hasSessions; 181 boolean hasTransactions; 182 try (ClientSession session = mongoClient.startSession()) { 183 hasSessions = true; 184 try { 185 session.startTransaction(); 186 session.abortTransaction(); 187 hasTransactions = true; 188 } catch (MongoClientException e) { 189 hasTransactions = false; 190 } 191 } catch (MongoClientException ee) { 192 // startSession may throw 193 // "Sessions are not supported by the MongoDB cluster to which this client is connected" 194 // startTransaction may throw 195 hasSessions = false; 196 hasTransactions = false; 197 } 198 // TODO: reactivate sessions/transactions when they can be better tested 199 hasSessions = false; 200 hasTransactions = false; 201 supportsSessions = hasSessions; 202 supportsTransactions = hasTransactions; 203 initRepository(); 204 } 205 206 @Override 207 public void shutdown() { 208 super.shutdown(); 209 cursorService.clear(); 210 } 211 212 protected void initRepository() { 213 // the first connection will init the repository 214 getConnection().close(); 215 } 216 217 @Override 218 public MongoDBConnection getConnection() { 219 return new MongoDBConnection(this); 220 } 221 222 @Override 223 public List<IdType> getAllowedIdTypes() { 224 return Arrays.asList(IdType.varchar, IdType.sequence, IdType.sequenceHexRandomized); 225 } 226 227 protected boolean supportsSessions() { 228 return supportsSessions; 229 } 230 231 @Override 232 public boolean supportsTransactions() { 233 return supportsTransactions; 234 } 235 236 protected MongoClient getClient() { 237 return mongoClient; 238 } 239 240 protected MongoDBCursorService getCursorService() { 241 return cursorService; 242 } 243 244 protected MongoCollection<Document> getCollection() { 245 return coll; 246 } 247 248 protected MongoCollection<Document> getCountersCollection() { 249 return countersColl; 250 } 251 252 protected String getIdKey() { 253 return idKey; 254 } 255 256 protected MongoDBConverter getConverter() { 257 return converter; 258 } 259 260 /** Keys used for document projection when marking all binaries for GC. */ 261 protected Bson binaryKeys; 262 263 @Override 264 protected void initBlobsPaths() { 265 MongoDBBlobFinder finder = new MongoDBBlobFinder(); 266 finder.visit(); 267 if (isFulltextStoredInBlob()) { 268 finder.recordBlobKey(KEY_FULLTEXT_BINARY); 269 } 270 binaryKeys = Projections.fields(finder.binaryKeys); 271 } 272 273 protected static class MongoDBBlobFinder extends BlobFinder { 274 protected List<Bson> binaryKeys = new ArrayList<>(Set.of(Projections.excludeId())); 275 276 @Override 277 protected void recordBlobPath() { 278 path.addLast(KEY_BLOB_DATA); 279 recordBlobKey(StringUtils.join(path, ".")); 280 path.removeLast(); 281 } 282 283 protected void recordBlobKey(String key) { 284 binaryKeys.add(Projections.include(key)); 285 } 286 } 287 288 @Override 289 public void markReferencedBinaries() { 290 DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class); 291 // TODO add a query to not scan all documents 292 log.trace("MongoDB: QUERY {} KEYS {}", Document::new, () -> binaryKeys); 293 coll.find().projection(binaryKeys).forEach(doc -> markReferencedBinaries(doc, blobManager)); 294 } 295 296 protected void markReferencedBinaries(Document ob, DocumentBlobManager blobManager) { 297 for (var value : ob.values()) { 298 if (value instanceof List) { 299 @SuppressWarnings("unchecked") 300 List<Object> list = (List<Object>) value; 301 for (Object v : list) { 302 if (v instanceof Document) { 303 markReferencedBinaries((Document) v, blobManager); 304 } else { 305 markReferencedBinary(v, blobManager); 306 } 307 } 308 } else if (value instanceof Object[]) { 309 for (Object v : (Object[]) value) { 310 markReferencedBinary(v, blobManager); 311 } 312 } else if (value instanceof Document) { 313 markReferencedBinaries((Document) value, blobManager); 314 } else { 315 markReferencedBinary(value, blobManager); 316 } 317 } 318 } 319 320 protected void markReferencedBinary(Object value, DocumentBlobManager blobManager) { 321 if (!(value instanceof String)) { 322 return; 323 } 324 String key = (String) value; 325 blobManager.markReferencedBinary(key, repositoryName); 326 } 327 328}