001/* 002 * (C) Copyright 2014-2020 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.mongodb; 020 021import static com.mongodb.ErrorCategory.DUPLICATE_KEY; 022import static com.mongodb.ErrorCategory.fromErrorCode; 023import static java.util.concurrent.TimeUnit.MILLISECONDS; 024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_STATUS; 025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_USER; 026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACL; 027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACP; 028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS; 029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY; 030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_JOBID; 031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_SIMPLE; 032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 033import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_TRASHED; 034import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LIFECYCLE_STATE; 035import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED; 036import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER; 037import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 038import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 039import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PRIMARY_TYPE; 040import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID; 041import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_VERSION_SERIES_ID; 042import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_READ_ACL; 043import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_RETAIN_UNTIL; 044import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_VERSION_SERIES_ID; 045import static org.nuxeo.ecm.core.storage.mongodb.MongoDBRepository.COUNTER_FIELD; 046import static org.nuxeo.ecm.core.storage.mongodb.MongoDBRepository.COUNTER_NAME_UUID; 047import static org.nuxeo.ecm.core.storage.mongodb.MongoDBRepository.FULLTEXT_INDEX_NAME; 048import static org.nuxeo.ecm.core.storage.mongodb.MongoDBRepository.LANGUAGE_FIELD; 049import static org.nuxeo.ecm.core.storage.mongodb.MongoDBRepository.MONGODB_ID; 050import static org.nuxeo.ecm.core.storage.mongodb.MongoDBRepository.MONGODB_SET; 051import static org.nuxeo.ecm.core.storage.mongodb.MongoDBRepository.ONE; 052import static org.nuxeo.ecm.core.storage.mongodb.MongoDBRepository.ZERO; 053 054import java.io.Serializable; 055import java.security.SecureRandom; 056import java.util.ArrayList; 057import java.util.Arrays; 058import java.util.Calendar; 059import java.util.Collection; 060import java.util.HashSet; 061import java.util.List; 062import java.util.Map; 063import java.util.Random; 064import java.util.Set; 065import java.util.Spliterators; 066import java.util.UUID; 067import java.util.stream.Collectors; 068import java.util.stream.Stream; 069import java.util.stream.StreamSupport; 070 071import org.apache.logging.log4j.LogManager; 072import org.apache.logging.log4j.Logger; 073import org.bson.Document; 074import org.bson.conversions.Bson; 075import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 076import org.nuxeo.ecm.core.api.DocumentNotFoundException; 077import org.nuxeo.ecm.core.api.Lock; 078import org.nuxeo.ecm.core.api.NuxeoException; 079import org.nuxeo.ecm.core.api.PartialList; 080import org.nuxeo.ecm.core.api.ScrollResult; 081import org.nuxeo.ecm.core.api.lock.LockManager; 082import org.nuxeo.ecm.core.query.QueryParseException; 083import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 084import org.nuxeo.ecm.core.storage.State; 085import org.nuxeo.ecm.core.storage.State.StateDiff; 086import org.nuxeo.ecm.core.storage.dbs.DBSConnection; 087import org.nuxeo.ecm.core.storage.dbs.DBSConnectionBase; 088import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator; 089import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase; 090import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase.IdType; 091import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener; 092import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater; 093import org.nuxeo.runtime.mongodb.MongoDBOperators; 094 095import com.mongodb.DuplicateKeyException; 096import com.mongodb.MongoBulkWriteException; 097import com.mongodb.MongoExecutionTimeoutException; 098import com.mongodb.MongoWriteException; 099import com.mongodb.bulk.BulkWriteError; 100import com.mongodb.client.ClientSession; 101import com.mongodb.client.FindIterable; 102import com.mongodb.client.MongoCollection; 103import com.mongodb.client.MongoCursor; 104import com.mongodb.client.MongoIterable; 105import com.mongodb.client.model.CountOptions; 106import com.mongodb.client.model.Filters; 107import com.mongodb.client.model.FindOneAndUpdateOptions; 108import com.mongodb.client.model.IndexOptions; 109import com.mongodb.client.model.Indexes; 110import com.mongodb.client.model.Projections; 111import com.mongodb.client.model.ReturnDocument; 112import com.mongodb.client.model.Updates; 113import com.mongodb.client.result.DeleteResult; 114import com.mongodb.client.result.UpdateResult; 115 116/** 117 * MongoDB implementation of a {@link DBSConnection}. 118 * 119 * @since 11.1 (introduce in 5.9.4 as MongoDBRepository) 120 */ 121public class MongoDBConnection extends DBSConnectionBase { 122 123 private static final Logger log = LogManager.getLogger(MongoDBConnection.class); 124 125 protected static final Random RANDOM = new SecureRandom(); 126 127 protected final MongoDBRepository mongoDBRepository; 128 129 protected final MongoCollection<Document> coll; 130 131 /** The key to use to store the id in the database. */ 132 protected final String idKey; 133 134 /** True if we don't use MongoDB's native "_id" key to store the id. */ 135 protected final boolean useCustomId; 136 137 /** Number of values still available in the in-memory sequence. */ 138 protected long sequenceLeft; 139 140 /** 141 * Last value or randomized value used from the in-memory sequence. 142 * <p> 143 * When used as a randomized sequence, this value (and the rest of the next block) may only be used after a 144 * successful update of the in-database version for the next task needing a randomized value. 145 */ 146 protected long sequenceLastValue; 147 148 protected final MongoDBConverter converter; 149 150 protected ClientSession clientSession; 151 152 protected boolean transactionStarted; 153 154 public MongoDBConnection(MongoDBRepository repository) { 155 super(repository); 156 mongoDBRepository = repository; 157 coll = repository.getCollection(); 158 idKey = repository.getIdKey(); 159 useCustomId = KEY_ID.equals(idKey); 160 converter = repository.getConverter(); 161 if (repository.supportsSessions()) { 162 clientSession = repository.getClient().startSession(); 163 } else { 164 clientSession = null; 165 } 166 initRepository(repository.descriptor); 167 } 168 169 @Override 170 public void close() { 171 if (clientSession != null) { 172 clientSession.close(); 173 clientSession = null; 174 } 175 } 176 177 @Override 178 public void begin() { 179 if (clientSession != null) { 180 clientSession.startTransaction(); 181 transactionStarted = true; 182 } 183 } 184 185 @Override 186 public void commit() { 187 if (clientSession != null) { 188 try { 189 clientSession.commitTransaction(); 190 } finally { 191 transactionStarted = false; 192 } 193 } 194 } 195 196 @Override 197 public void rollback() { 198 if (clientSession != null) { 199 try { 200 clientSession.abortTransaction(); 201 } finally { 202 transactionStarted = false; 203 } 204 } 205 } 206 207 /** 208 * Initializes the MongoDB repository 209 * 210 * @param descriptor the MongoDB repository descriptor 211 * @since 11.1 212 */ 213 protected void initRepository(MongoDBRepositoryDescriptor descriptor) { 214 // check root presence 215 if (coll.countDocuments(converter.filterEq(KEY_ID, getRootId())) > 0) { 216 return; 217 } 218 // create required indexes 219 // code does explicit queries on those 220 if (useCustomId) { 221 coll.createIndex(Indexes.ascending(KEY_ID), new IndexOptions().unique(true)); 222 } 223 coll.createIndex(Indexes.ascending(KEY_PARENT_ID)); 224 coll.createIndex(Indexes.ascending(KEY_ANCESTOR_IDS)); 225 coll.createIndex(Indexes.ascending(KEY_VERSION_SERIES_ID)); 226 coll.createIndex(Indexes.ascending(KEY_PROXY_TARGET_ID)); 227 coll.createIndex(Indexes.ascending(KEY_PROXY_VERSION_SERIES_ID)); 228 coll.createIndex(Indexes.ascending(KEY_READ_ACL)); 229 IndexOptions parentNameIndexOptions = new IndexOptions(); 230 if (descriptor != null) { 231 parentNameIndexOptions.unique(Boolean.TRUE.equals(descriptor.getChildNameUniqueConstraintEnabled())); 232 } 233 coll.createIndex(Indexes.ascending(KEY_PARENT_ID, KEY_NAME), parentNameIndexOptions); 234 // often used in user-generated queries 235 coll.createIndex(Indexes.ascending(KEY_PRIMARY_TYPE)); 236 coll.createIndex(Indexes.ascending(KEY_LIFECYCLE_STATE)); 237 coll.createIndex(Indexes.ascending(KEY_IS_TRASHED)); 238 coll.createIndex(Indexes.ascending(KEY_RETAIN_UNTIL)); 239 if (!repository.isFulltextDisabled()) { 240 coll.createIndex(Indexes.ascending(KEY_FULLTEXT_JOBID)); 241 } 242 coll.createIndex(Indexes.ascending(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_USER)); 243 coll.createIndex(Indexes.ascending(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_STATUS)); 244 // TODO configure these from somewhere else 245 coll.createIndex(Indexes.descending("dc:modified")); 246 coll.createIndex(Indexes.ascending("rend:renditionName")); 247 coll.createIndex(Indexes.ascending("rend:sourceId")); 248 coll.createIndex(Indexes.ascending("rend:sourceVersionableId")); 249 coll.createIndex(Indexes.ascending("drv:subscriptions.enabled")); 250 coll.createIndex(Indexes.ascending("collectionMember:collectionIds")); 251 coll.createIndex(Indexes.ascending("nxtag:tags")); 252 coll.createIndex(Indexes.ascending("coldstorage:beingRetrieved")); 253 if (!repository.isFulltextSearchDisabled()) { 254 Bson indexKeys = Indexes.compoundIndex( // 255 Indexes.text(KEY_FULLTEXT_SIMPLE), // 256 Indexes.text(KEY_FULLTEXT_BINARY) // 257 ); 258 IndexOptions indexOptions = new IndexOptions().name(FULLTEXT_INDEX_NAME).languageOverride(LANGUAGE_FIELD); 259 coll.createIndex(indexKeys, indexOptions); 260 } 261 // create basic repository structure needed 262 IdType idType = repository.getIdType(); 263 if (idType == IdType.sequence || idType == IdType.sequenceHexRandomized || DBSRepositoryBase.DEBUG_UUIDS) { 264 // create the id counter 265 long counter; 266 if (idType == IdType.sequenceHexRandomized) { 267 counter = randomInitialSeed(); 268 } else { 269 counter = 0; 270 } 271 MongoCollection<Document> countersColl = mongoDBRepository.getCountersCollection(); 272 Document idCounter = new Document(); 273 idCounter.put(MONGODB_ID, COUNTER_NAME_UUID); 274 idCounter.put(COUNTER_FIELD, Long.valueOf(counter)); 275 countersColl.insertOne(idCounter); 276 } 277 initRoot(); 278 } 279 280 protected synchronized long getNextSequenceId() { 281 long sequenceBlockSize = mongoDBRepository.sequenceBlockSize; 282 if (repository.getIdType() == IdType.sequence) { 283 if (sequenceLeft == 0) { 284 sequenceLeft = sequenceBlockSize; 285 sequenceLastValue = updateSequence(); 286 } 287 sequenceLastValue++; 288 } else { // idType == IdType.sequenceHexRandomized 289 if (sequenceLeft == 0) { 290 sequenceLeft = sequenceBlockSize; 291 sequenceLastValue = updateRandomizedSequence(); 292 } 293 sequenceLastValue = xorshift(sequenceLastValue); 294 } 295 sequenceLeft--; 296 return sequenceLastValue; 297 } 298 299 /** 300 * Allocates a new sequence block. The database contains the last value from the last block. 301 */ 302 protected long updateSequence() { 303 long sequenceBlockSize = mongoDBRepository.sequenceBlockSize; 304 MongoCollection<Document> countersColl = mongoDBRepository.getCountersCollection(); 305 Bson filter = Filters.eq(MONGODB_ID, COUNTER_NAME_UUID); 306 Bson update = Updates.inc(COUNTER_FIELD, Long.valueOf(sequenceBlockSize)); 307 Document idCounter = countersColl.findOneAndUpdate(filter, update, 308 new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER)); 309 if (idCounter == null) { 310 throw new NuxeoException("Repository id counter not initialized"); 311 } 312 return idCounter.getLong(COUNTER_FIELD).longValue() - sequenceBlockSize; 313 } 314 315 /** 316 * Updates the randomized sequence, using xorshift. 317 */ 318 protected Long tryUpdateRandomizedSequence() { 319 long sequenceBlockSize = mongoDBRepository.sequenceBlockSize; 320 MongoCollection<Document> countersColl = mongoDBRepository.getCountersCollection(); 321 // find the current value 322 Bson filter = Filters.eq(MONGODB_ID, COUNTER_NAME_UUID); 323 Document res = countersColl.find(filter).first(); 324 if (res == null) { 325 throw new NuxeoException("Failed to read " + filter + " in collection " + countersColl.getNamespace()); 326 } 327 Long lastValue = res.getLong(COUNTER_FIELD); 328 // find the next value after this block is done 329 long newValue = xorshift(lastValue, sequenceBlockSize); 330 // store the next value for whoever needs it next 331 Bson updateFilter = Filters.and( // 332 filter, // 333 Filters.eq(COUNTER_FIELD, lastValue) // 334 ); 335 Bson update = Updates.set(COUNTER_FIELD, newValue); 336 log.trace("MongoDB: FINDANDMODIFY {} UPDATE {}", updateFilter, update); 337 boolean updated = countersColl.findOneAndUpdate(updateFilter, update) != null; 338 if (updated) { 339 return lastValue; 340 } else { 341 log.trace("MongoDB: -> FAILED (will retry)"); 342 return null; 343 } 344 } 345 346 protected static final int NB_TRY = 15; 347 348 protected long updateRandomizedSequence() { 349 long sleepDuration = 1; // start with 1ms 350 for (int i = 0; i < NB_TRY; i++) { 351 Long value = tryUpdateRandomizedSequence(); 352 if (value != null) { 353 return value.longValue(); 354 } 355 try { 356 Thread.sleep(sleepDuration); 357 } catch (InterruptedException e) { 358 Thread.currentThread().interrupt(); 359 throw new NuxeoException(); 360 } 361 sleepDuration *= 2; // exponential backoff 362 sleepDuration += System.nanoTime() % 4; // random jitter 363 } 364 throw new ConcurrentUpdateException("Failed to update randomized sequence"); 365 } 366 367 /** Initial seed generation. */ 368 protected long randomInitialSeed() { 369 long seed; 370 do { 371 seed = RANDOM.nextLong(); 372 } while (seed == 0); 373 return seed; 374 } 375 376 /** Iterated version of xorshift. */ 377 protected long xorshift(long n, long times) { 378 for (long i = 0; i < times; i++) { 379 n = xorshift(n); 380 } 381 return n; 382 } 383 384 /** 385 * xorshift algorithm from George Marsaglia, with period 2^64 - 1. 386 * 387 * @see <a href="https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf">xorshift algorithm from George 388 * Marsaglia</a> 389 */ 390 protected long xorshift(long n) { 391 n ^= (n << 13); 392 n ^= (n >>> 7); 393 n ^= (n << 17); 394 return n; 395 } 396 397 @Override 398 public String generateNewId() { 399 IdType idType = repository.getIdType(); 400 if (idType == IdType.sequence || idType == IdType.sequenceHexRandomized || DBSRepositoryBase.DEBUG_UUIDS) { 401 long id = getNextSequenceId(); 402 if (DBSRepositoryBase.DEBUG_UUIDS) { 403 return "UUID_" + id; 404 } else if (idType == IdType.sequence) { 405 return String.valueOf(id); 406 } else { // idType == IdType.sequenceHexRandomized 407 // hex version filled to 16 chars 408 String hex = Long.toHexString(id); 409 int nz = 16 - hex.length(); 410 if (nz > 0) { 411 hex = "0".repeat(nz) + hex; 412 } 413 return hex; 414 } 415 } else { 416 return UUID.randomUUID().toString(); 417 } 418 } 419 420 @Override 421 public void createState(State state) { 422 Document doc = converter.stateToBson(state); 423 log.trace("MongoDB: CREATE {}: {}", doc.get(idKey), doc); 424 try { 425 insertOne(doc); 426 } catch (DuplicateKeyException dke) { 427 log.trace("MongoDB: -> DUPLICATE KEY: {}", doc.get(idKey)); 428 throw new ConcurrentUpdateException(dke); 429 } 430 } 431 432 @Override 433 public void createStates(List<State> states) { 434 List<Document> docs = states.stream().map(converter::stateToBson).collect(Collectors.toList()); 435 log.trace("MongoDB: CREATE [{}]: {}", 436 () -> docs.stream().map(doc -> doc.get(idKey).toString()).collect(Collectors.joining(", ")), 437 () -> docs); 438 try { 439 insertMany(docs); 440 } catch (MongoBulkWriteException mbwe) { 441 List<String> duplicates = mbwe.getWriteErrors() 442 .stream() 443 .filter(wr -> DUPLICATE_KEY.equals(fromErrorCode(wr.getCode()))) 444 .map(BulkWriteError::getMessage) 445 .collect(Collectors.toList()); 446 // Avoid hiding any others bulk errors 447 if (duplicates.size() == mbwe.getWriteErrors().size()) { 448 log.trace("MongoDB: -> DUPLICATE KEY: {}", duplicates); 449 var concurrentUpdateException = new ConcurrentUpdateException("Concurrent update"); 450 duplicates.forEach(concurrentUpdateException::addInfo); 451 throw concurrentUpdateException; 452 } 453 454 throw mbwe; 455 } 456 } 457 458 @Override 459 public State readState(String id) { 460 return findOne(converter.filterEq(KEY_ID, id)); 461 } 462 463 @Override 464 public State readPartialState(String id, Collection<String> keys) { 465 Document fields = new Document(); 466 keys.forEach(key -> fields.put(converter.keyToBson(key), ONE)); 467 return findOne(converter.filterEq(KEY_ID, id), fields); 468 } 469 470 @Override 471 public List<State> readStates(List<String> ids) { 472 return findAll(converter.filterIn(KEY_ID, ids)); 473 } 474 475 @Override 476 public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) { 477 List<Document> updates = converter.diffToBson(diff); 478 for (Document update : updates) { 479 Document filter = new Document(); 480 converter.putToBson(filter, KEY_ID, id); 481 if (changeTokenUpdater == null) { 482 log.trace("MongoDB: UPDATE {}: {}", id, update); 483 } else { 484 // assume bson is identical to dbs internals 485 // condition works even if value is null 486 Map<String, Serializable> conditions = changeTokenUpdater.getConditions(); 487 Map<String, Serializable> tokenUpdates = changeTokenUpdater.getUpdates(); 488 if (update.containsKey(MONGODB_SET)) { 489 ((Document) update.get(MONGODB_SET)).putAll(tokenUpdates); 490 } else { 491 Document set = new Document(); 492 set.putAll(tokenUpdates); 493 update.put(MONGODB_SET, set); 494 } 495 log.trace("MongoDB: UPDATE {}: IF {} THEN {}", id, conditions, update); 496 filter.putAll(conditions); 497 } 498 try { 499 UpdateResult w = updateMany(filter, update); 500 if (w.getModifiedCount() != 1) { 501 log.trace("MongoDB: -> CONCURRENT UPDATE: {}", id); 502 throw new ConcurrentUpdateException(id); 503 } 504 } catch (MongoWriteException mwe) { 505 if (DUPLICATE_KEY.equals(fromErrorCode(mwe.getCode()))) { 506 log.trace("MongoDB: -> DUPLICATE KEY: {}", id); 507 throw new ConcurrentUpdateException(mwe.getError().getMessage(), mwe); 508 } 509 throw mwe; 510 } 511 } 512 } 513 514 @Override 515 public void deleteStates(Set<String> ids) { 516 Bson filter = converter.filterIn(KEY_ID, ids); 517 log.trace("MongoDB: REMOVE {}", ids); 518 DeleteResult w = deleteMany(filter); 519 if (w.getDeletedCount() != ids.size()) { 520 log.debug("Removed {} docs for {} ids: {}", w::getDeletedCount, ids::size, () -> ids); 521 } 522 } 523 524 @Override 525 public State readChildState(String parentId, String name, Set<String> ignored) { 526 Bson filter = getChildQuery(parentId, name, ignored); 527 return findOne(filter); 528 } 529 530 protected NuxeoException newQueryTimeout(MongoExecutionTimeoutException cause, Bson filter) { 531 NuxeoException exc = new NuxeoException("Query timed out after " + mongoDBRepository.maxTimeMS + " ms", cause); 532 if (filter != null) { 533 String msg; 534 if (filter instanceof Document) { 535 msg = ((Document) filter).toJson(); 536 } else { 537 msg = filter.toString(); 538 } 539 exc.addInfo("Filter: " + msg); 540 } 541 return exc; 542 } 543 544 protected void logQuery(String id, Bson fields) { 545 logQuery(converter.filterEq(KEY_ID, id), fields); 546 } 547 548 protected void logQuery(Bson filter, Bson fields) { 549 if (fields == null) { 550 log.trace("MongoDB: QUERY {}", filter); 551 } else { 552 log.trace("MongoDB: QUERY {} KEYS {}", filter, fields); 553 } 554 } 555 556 protected void logQuery(Bson query, Bson fields, Bson orderBy, int limit, int offset) { 557 if (orderBy == null) { 558 log.trace("MongoDB: QUERY {} KEYS {} OFFSET {} LIMIT {}", query, fields, offset, limit); 559 } else { 560 log.trace("MongoDB: QUERY {} KEYS {} ORDER BY {} OFFSET {} LIMIT {}", query, fields, orderBy, offset, 561 limit); 562 } 563 } 564 565 @Override 566 public boolean hasChild(String parentId, String name, Set<String> ignored) { 567 Document filter = getChildQuery(parentId, name, ignored); 568 return exists(filter); 569 } 570 571 protected Document getChildQuery(String parentId, String name, Set<String> ignored) { 572 Document filter = new Document(); 573 converter.putToBson(filter, KEY_PARENT_ID, parentId); 574 converter.putToBson(filter, KEY_NAME, name); 575 addIgnoredIds(filter, ignored); 576 return filter; 577 } 578 579 protected void addIgnoredIds(Document filter, Set<String> ignored) { 580 if (!ignored.isEmpty()) { 581 Document notInIds = new Document(MongoDBOperators.NIN, converter.listToBson(KEY_ID, ignored)); 582 filter.put(idKey, notInIds); 583 } 584 } 585 586 @Override 587 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 588 Document filter = new Document(); 589 converter.putToBson(filter, key, value); 590 addIgnoredIds(filter, ignored); 591 return findAll(filter); 592 } 593 594 @Override 595 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 596 Document filter = new Document(); 597 converter.putToBson(filter, key1, value1); 598 converter.putToBson(filter, key2, value2); 599 addIgnoredIds(filter, ignored); 600 return findAll(filter); 601 } 602 603 @Override 604 public List<State> queryKeyValueWithOperator(String key1, Object value1, String key2, DBSQueryOperator operator, 605 Object value2, Set<String> ignored) { 606 Map<String, Object> comparatorAndValue; 607 switch (operator) { 608 case IN: 609 comparatorAndValue = Map.of(MongoDBOperators.IN, value2); 610 break; 611 case NOT_IN: 612 comparatorAndValue = Map.of(MongoDBOperators.NIN, value2); 613 break; 614 default: 615 throw new IllegalArgumentException(String.format("Unknown operator: %s", operator)); 616 } 617 Document filter = new Document(); 618 converter.putToBson(filter, key1, value1); 619 converter.putToBson(filter, key2, comparatorAndValue); 620 addIgnoredIds(filter, ignored); 621 return findAll(filter); 622 } 623 624 @Override 625 public Stream<State> getDescendants(String rootId, Set<String> keys) { 626 return getDescendants(rootId, keys, 0); 627 } 628 629 @Override 630 public Stream<State> getDescendants(String rootId, Set<String> keys, int limit) { 631 Bson filter = converter.filterEq(KEY_ANCESTOR_IDS, rootId); 632 Document fields = new Document(); 633 if (useCustomId) { 634 fields.put(MONGODB_ID, ZERO); 635 } 636 fields.put(idKey, ONE); 637 keys.forEach(key -> fields.put(converter.keyToBson(key), ONE)); 638 return stream(filter, fields, limit); 639 } 640 641 @Override 642 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 643 Document filter = new Document(); 644 converter.putToBson(filter, key, value); 645 addIgnoredIds(filter, ignored); 646 return exists(filter); 647 } 648 649 protected boolean exists(Bson filter) { 650 return exists(filter, justPresenceField()); 651 } 652 653 protected boolean exists(Bson filter, Bson projection) { 654 logQuery(filter, projection); 655 try { 656 return find(filter).projection(projection).first() != null; 657 } catch (MongoExecutionTimeoutException e) { 658 throw newQueryTimeout(e, filter); 659 } 660 } 661 662 protected State findOne(Bson filter) { 663 return findOne(filter, null); 664 } 665 666 protected State findOne(Bson filter, Bson projection) { 667 try (Stream<State> stream = stream(filter, projection)) { 668 return stream.findAny().orElse(null); 669 } 670 } 671 672 protected List<State> findAll(Bson filter) { 673 try (Stream<State> stream = stream(filter)) { 674 return stream.collect(Collectors.toList()); 675 } catch (MongoExecutionTimeoutException e) { 676 throw newQueryTimeout(e, filter); 677 } 678 } 679 680 protected Stream<State> stream(Bson filter) { 681 return stream(filter, null, 0); 682 } 683 684 protected Stream<State> stream(Bson filter, Bson projection) { 685 return stream(filter, projection, 0); 686 } 687 688 /** 689 * Logs, runs request and constructs a closeable {@link Stream} on top of {@link MongoCursor}. 690 * <p> 691 * We should rely on this method, because it correctly handles cursor closed state. 692 * <p> 693 * Note: Looping on {@link FindIterable} or {@link MongoIterable} could lead to cursor leaks. This is also the case 694 * on some call to {@link MongoIterable#first()}. 695 * 696 * @return a closeable {@link Stream} instance linked to {@link MongoCursor} 697 */ 698 protected Stream<State> stream(Bson filter, Bson projection, int limit) { 699 if (filter == null) { 700 // empty filter 701 filter = new Document(); 702 } 703 // it's ok if projection is null 704 logQuery(filter, projection); 705 706 boolean completedAbruptly = true; 707 MongoCursor<Document> cursor = find(filter).limit(limit).projection(projection).iterator(); 708 try { 709 Set<Object> seen = new HashSet<>(); 710 Stream<State> stream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(cursor, 0), false) // 711 .onClose(cursor::close) 712 .filter(doc -> seen.add(doc.get(idKey))) 713 // MongoDB cursors may return the same 714 // object several times 715 .map(converter::bsonToState); 716 // the stream takes responsibility for closing the session 717 completedAbruptly = false; 718 return stream; 719 } catch (MongoExecutionTimeoutException e) { 720 throw newQueryTimeout(e, filter); // NOSONAR (cursor is not leaked) 721 } finally { 722 if (completedAbruptly) { 723 cursor.close(); 724 } 725 } 726 } 727 728 protected Document justPresenceField() { 729 return new Document(MONGODB_ID, ONE); 730 } 731 732 @Override 733 public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator, 734 OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) { 735 // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter 736 MongoDBRepositoryQueryBuilder builder = new MongoDBRepositoryQueryBuilder((MongoDBRepository) repository, 737 evaluator.getExpression(), evaluator.getSelectClause(), orderByClause, evaluator.pathResolver, 738 evaluator.fulltextSearchDisabled); 739 builder.walk(); 740 if (builder.hasFulltext && repository.isFulltextSearchDisabled()) { 741 throw new QueryParseException("Fulltext search disabled by configuration"); 742 } 743 Document filter = builder.getQuery(); 744 addPrincipals(filter, evaluator.principals); 745 Bson orderBy = builder.getOrderBy(); 746 Bson keys = builder.getProjection(); 747 // Don't do manual projection if there are no projection wildcards, as this brings no new 748 // information and is costly. The only difference is several identical rows instead of one. 749 boolean manualProjection = !distinctDocuments && builder.hasProjectionWildcard(); 750 if (manualProjection) { 751 // we'll do post-treatment to re-evaluate the query to get proper wildcard projections 752 // so we need the full state from the database 753 keys = null; 754 evaluator.parse(); 755 } 756 757 logQuery(filter, keys, orderBy, limit, offset); 758 759 List<Map<String, Serializable>> projections; 760 long totalSize; 761 try (MongoCursor<Document> cursor = find(filter).projection(keys) 762 .skip(offset) 763 .limit(limit) 764 .sort(orderBy) 765 .iterator()) { 766 projections = new ArrayList<>(); 767 DBSStateFlattener flattener = new DBSStateFlattener(builder.propertyKeys); 768 Iterable<Document> docs = () -> cursor; 769 for (Document doc : docs) { 770 State state = converter.bsonToState(doc); 771 if (manualProjection) { 772 projections.addAll(evaluator.matches(state)); 773 } else { 774 projections.add(flattener.flatten(state)); 775 } 776 } 777 } catch (MongoExecutionTimeoutException e) { 778 throw newQueryTimeout(e, filter); 779 } 780 if (countUpTo == -1) { 781 // count full size 782 if (limit == 0) { 783 totalSize = projections.size(); 784 } else if (manualProjection) { 785 totalSize = -1; // unknown due to manual projection 786 } else { 787 totalSize = countDocuments(filter); 788 } 789 } else if (countUpTo == 0) { 790 // no count 791 totalSize = -1; // not counted 792 } else { 793 // count only if less than countUpTo 794 if (limit == 0) { 795 totalSize = projections.size(); 796 } else if (manualProjection) { 797 totalSize = -1; // unknown due to manual projection 798 } else { 799 totalSize = countDocuments(filter, new CountOptions().limit(countUpTo + 1)); 800 } 801 if (totalSize > countUpTo) { 802 totalSize = -2; // truncated 803 } 804 } 805 if (!projections.isEmpty()) { 806 log.trace("MongoDB: -> {}", projections::size); 807 } 808 return new PartialList<>(projections, totalSize); 809 } 810 811 @SuppressWarnings("resource") // cursor is being registered, must not be closed 812 @Override 813 public ScrollResult<String> scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) { 814 MongoDBCursorService cursorService = mongoDBRepository.getCursorService(); 815 cursorService.checkForTimedOutScroll(); 816 MongoDBRepositoryQueryBuilder builder = new MongoDBRepositoryQueryBuilder((MongoDBRepository) repository, 817 evaluator.getExpression(), evaluator.getSelectClause(), null, evaluator.pathResolver, 818 evaluator.fulltextSearchDisabled); 819 builder.walk(); 820 if (builder.hasFulltext && repository.isFulltextSearchDisabled()) { 821 throw new QueryParseException("Fulltext search disabled by configuration"); 822 } 823 Document filter = builder.getQuery(); 824 addPrincipals(filter, evaluator.principals); 825 Bson keys = builder.getProjection(); 826 logQuery(filter, keys, null, 0, 0); 827 828 MongoCursor<Document> cursor; 829 try { 830 cursor = find(filter).projection(keys).batchSize(batchSize).iterator(); 831 cursor.hasNext(); // check timeout asap - NOSONAR 832 } catch (MongoExecutionTimeoutException e) { 833 throw newQueryTimeout(e, filter); 834 } 835 String scrollId = cursorService.registerCursor(cursor, batchSize, keepAliveSeconds); 836 return scroll(scrollId); 837 } 838 839 @Override 840 public ScrollResult<String> scroll(String scrollId) { 841 MongoDBCursorService cursorService = mongoDBRepository.getCursorService(); 842 try { 843 return cursorService.scroll(scrollId); 844 } catch (MongoExecutionTimeoutException e) { 845 throw newQueryTimeout(e, null); 846 } 847 } 848 849 protected void addPrincipals(Document query, Set<String> principals) { 850 if (principals != null) { 851 Document inPrincipals = new Document(MongoDBOperators.IN, new ArrayList<>(principals)); 852 query.put(KEY_READ_ACL, inPrincipals); 853 } 854 } 855 856 protected static final Bson LOCK_FIELDS = Projections.include(KEY_LOCK_OWNER, KEY_LOCK_CREATED); 857 858 protected static final Bson UNSET_LOCK_UPDATE = Updates.combine(Updates.unset(KEY_LOCK_OWNER), 859 Updates.unset(KEY_LOCK_CREATED)); 860 861 @Override 862 public Lock getLock(String id) { 863 logQuery(id, LOCK_FIELDS); 864 // we do NOT want to use clientSession here because locks must be non-transactional 865 Document res = coll.find(converter.filterEq(KEY_ID, id)).projection(LOCK_FIELDS).first(); 866 if (res == null) { 867 // document not found 868 throw new DocumentNotFoundException(id); 869 } 870 String owner = res.getString(KEY_LOCK_OWNER); 871 if (owner == null) { 872 // not locked 873 return null; 874 } 875 Calendar created = (Calendar) converter.bsonToSerializable(KEY_LOCK_CREATED, res.get(KEY_LOCK_CREATED)); 876 return new Lock(owner, created); 877 } 878 879 @Override 880 public Lock setLock(String id, Lock lock) { 881 Bson filter = Filters.and( // 882 converter.filterEq(KEY_ID, id), // 883 Filters.exists(KEY_LOCK_OWNER, false) // select doc if no lock is set 884 ); 885 Bson setLock = Updates.combine( // 886 Updates.set(KEY_LOCK_OWNER, lock.getOwner()), // 887 Updates.set(KEY_LOCK_CREATED, converter.serializableToBson(KEY_LOCK_CREATED, lock.getCreated())) // 888 ); 889 log.trace("MongoDB: FINDANDMODIFY {} UPDATE {}", filter, setLock); 890 // we do NOT want to use clientSession here because locks must be non-transactional 891 Document res = coll.findOneAndUpdate(filter, setLock); 892 if (res != null) { 893 // found a doc to lock 894 return null; 895 } else { 896 // doc not found, or lock owner already set 897 // get the old lock 898 logQuery(id, LOCK_FIELDS); 899 // we do NOT want to use clientSession here because locks must be non-transactional 900 Document old = coll.find(converter.filterEq(KEY_ID, id)).projection(LOCK_FIELDS).first(); 901 if (old == null) { 902 // document not found 903 throw new DocumentNotFoundException(id); 904 } 905 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 906 Calendar oldCreated = (Calendar) converter.bsonToSerializable(KEY_LOCK_CREATED, old.get(KEY_LOCK_CREATED)); 907 if (oldOwner != null) { 908 return new Lock(oldOwner, oldCreated); 909 } 910 // no lock -- there was a race condition 911 // TODO do better 912 throw new ConcurrentUpdateException("Lock " + id); 913 } 914 } 915 916 @Override 917 public Lock removeLock(String id, String owner) { 918 Document filter = new Document(); 919 converter.putToBson(filter, KEY_ID, id); 920 if (owner != null) { 921 // remove if owner matches or null 922 // implements LockManager.canLockBeRemoved inside MongoDB 923 Object ownerOrNull = Arrays.asList(owner, null); 924 filter.put(KEY_LOCK_OWNER, new Document(MongoDBOperators.IN, ownerOrNull)); 925 } 926 // else unconditional remove 927 // remove the lock 928 log.trace("MongoDB: FINDANDMODIFY {} UPDATE {}", filter, UNSET_LOCK_UPDATE); 929 // we do NOT want to use clientSession here because locks must be non-transactional 930 Document old = coll.findOneAndUpdate(filter, UNSET_LOCK_UPDATE); 931 if (old != null) { 932 // found a doc and removed the lock, return previous lock 933 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 934 if (oldOwner == null) { 935 // was not locked 936 return null; 937 } else { 938 // return previous lock 939 var oldCreated = (Calendar) converter.bsonToSerializable(KEY_LOCK_CREATED, old.get(KEY_LOCK_CREATED)); 940 return new Lock(oldOwner, oldCreated); 941 } 942 } else { 943 // doc not found, or lock owner didn't match 944 // get the old lock 945 logQuery(id, LOCK_FIELDS); 946 // we do NOT want to use clientSession here because locks must be non-transactional 947 old = coll.find(converter.filterEq(KEY_ID, id)).projection(LOCK_FIELDS).first(); 948 if (old == null) { 949 // document not found 950 throw new DocumentNotFoundException(id); 951 } 952 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 953 Calendar oldCreated = (Calendar) converter.bsonToSerializable(KEY_LOCK_CREATED, old.get(KEY_LOCK_CREATED)); 954 if (oldOwner != null) { 955 if (!LockManager.canLockBeRemoved(oldOwner, owner)) { 956 // existing mismatched lock, flag failure 957 return new Lock(oldOwner, oldCreated, true); 958 } 959 // old owner should have matched -- there was a race condition 960 // TODO do better 961 throw new ConcurrentUpdateException("Unlock " + id); 962 } 963 // old owner null, should have matched -- there was a race condition 964 // TODO do better 965 throw new ConcurrentUpdateException("Unlock " + id); 966 } 967 } 968 969 protected void insertOne(Document document) { 970 if (transactionStarted) { 971 coll.insertOne(clientSession, document); 972 } else { 973 coll.insertOne(document); 974 } 975 } 976 977 protected void insertMany(List<Document> documents) { 978 if (transactionStarted) { 979 coll.insertMany(clientSession, documents); 980 } else { 981 coll.insertMany(documents); 982 } 983 } 984 985 protected UpdateResult updateMany(Bson filter, Bson update) { 986 if (transactionStarted) { 987 return coll.updateMany(clientSession, filter, update); 988 } else { 989 return coll.updateMany(filter, update); 990 } 991 } 992 993 protected DeleteResult deleteMany(Bson filter) { 994 if (transactionStarted) { 995 return coll.deleteMany(clientSession, filter); 996 } else { 997 return coll.deleteMany(filter); 998 } 999 } 1000 1001 protected FindIterable<Document> find(Bson filter) { 1002 FindIterable<Document> it; 1003 if (transactionStarted) { 1004 it = coll.find(clientSession, filter); 1005 } else { 1006 it = coll.find(filter); 1007 } 1008 it.maxTime(mongoDBRepository.maxTimeMS, MILLISECONDS); 1009 return it; 1010 } 1011 1012 protected long countDocuments(Bson filter) { 1013 return countDocuments(filter, new CountOptions()); 1014 } 1015 1016 protected long countDocuments(Bson filter, CountOptions options) { 1017 options.maxTime(mongoDBRepository.maxTimeMS, MILLISECONDS); 1018 if (transactionStarted) { 1019 return coll.countDocuments(clientSession, filter, options); 1020 } else { 1021 return coll.countDocuments(filter, options); 1022 } 1023 } 1024 1025}