001/* 002 * (C) Copyright 2014-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.mongodb; 020 021import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_STATUS; 022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_USER; 023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACL; 024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACP; 025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS; 026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA; 027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY; 028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_JOBID; 029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_SIMPLE; 030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_TRASHED; 032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LIFECYCLE_STATE; 033import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED; 034import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER; 035import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 036import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 037import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PRIMARY_TYPE; 038import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID; 039import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_VERSION_SERIES_ID; 040import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_READ_ACL; 041import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_VERSION_SERIES_ID; 042 043import java.io.Serializable; 044import java.util.ArrayList; 045import java.util.Arrays; 046import java.util.Calendar; 047import java.util.Collection; 048import java.util.Collections; 049import java.util.HashSet; 050import java.util.List; 051import java.util.Map; 052import java.util.Set; 053import java.util.Spliterators; 054import java.util.UUID; 055import java.util.stream.Collectors; 056import java.util.stream.Stream; 057import java.util.stream.StreamSupport; 058 059import javax.resource.spi.ConnectionManager; 060 061import org.apache.commons.lang3.StringUtils; 062import org.apache.commons.logging.Log; 063import org.apache.commons.logging.LogFactory; 064import org.bson.Document; 065import org.bson.conversions.Bson; 066import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 067import org.nuxeo.ecm.core.api.CursorService; 068import org.nuxeo.ecm.core.api.DocumentNotFoundException; 069import org.nuxeo.ecm.core.api.Lock; 070import org.nuxeo.ecm.core.api.NuxeoException; 071import org.nuxeo.ecm.core.api.PartialList; 072import org.nuxeo.ecm.core.api.ScrollResult; 073import org.nuxeo.ecm.core.blob.DocumentBlobManager; 074import org.nuxeo.ecm.core.model.LockManager; 075import org.nuxeo.ecm.core.model.Repository; 076import org.nuxeo.ecm.core.query.QueryParseException; 077import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 078import org.nuxeo.ecm.core.storage.State; 079import org.nuxeo.ecm.core.storage.State.StateDiff; 080import org.nuxeo.ecm.core.storage.dbs.DBSDocument; 081import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator; 082import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase; 083import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener; 084import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater; 085import org.nuxeo.runtime.api.Framework; 086import org.nuxeo.runtime.mongodb.MongoDBConnectionService; 087 088import com.mongodb.Block; 089import com.mongodb.QueryOperators; 090import com.mongodb.client.FindIterable; 091import com.mongodb.client.MongoCollection; 092import com.mongodb.client.MongoCursor; 093import com.mongodb.client.MongoDatabase; 094import com.mongodb.client.MongoIterable; 095import com.mongodb.client.model.CountOptions; 096import com.mongodb.client.model.Filters; 097import com.mongodb.client.model.FindOneAndUpdateOptions; 098import com.mongodb.client.model.IndexOptions; 099import com.mongodb.client.model.Indexes; 100import com.mongodb.client.model.Projections; 101import com.mongodb.client.model.ReturnDocument; 102import com.mongodb.client.model.Updates; 103import com.mongodb.client.result.DeleteResult; 104import com.mongodb.client.result.UpdateResult; 105 106/** 107 * MongoDB implementation of a {@link Repository}. 108 * 109 * @since 5.9.4 110 */ 111public class MongoDBRepository extends DBSRepositoryBase { 112 113 private static final Log log = LogFactory.getLog(MongoDBRepository.class); 114 115 /** 116 * Prefix used to retrieve a MongoDB connection from {@link MongoDBConnectionService}. 117 * <p /> 118 * The connection id will be {@code repository/[REPOSITORY_NAME]}. 119 */ 120 public static final String REPOSITORY_CONNECTION_PREFIX = "repository/"; 121 122 public static final Long LONG_ZERO = Long.valueOf(0); 123 124 public static final Double ZERO = Double.valueOf(0); 125 126 public static final Double ONE = Double.valueOf(1); 127 128 public static final String MONGODB_ID = "_id"; 129 130 public static final String MONGODB_INC = "$inc"; 131 132 public static final String MONGODB_SET = "$set"; 133 134 public static final String MONGODB_UNSET = "$unset"; 135 136 public static final String MONGODB_PUSH = "$push"; 137 138 public static final String MONGODB_EACH = "$each"; 139 140 public static final String MONGODB_META = "$meta"; 141 142 public static final String MONGODB_TEXT_SCORE = "textScore"; 143 144 private static final String FULLTEXT_INDEX_NAME = "fulltext"; 145 146 private static final String LANGUAGE_FIELD = "__language"; 147 148 protected static final String COUNTER_NAME_UUID = "ecm:id"; 149 150 protected static final String COUNTER_FIELD = "seq"; 151 152 protected final MongoCollection<Document> coll; 153 154 protected final MongoCollection<Document> countersColl; 155 156 /** The key to use to store the id in the database. */ 157 protected String idKey; 158 159 /** True if we don't use MongoDB's native "_id" key to store the id. */ 160 protected boolean useCustomId; 161 162 /** Number of values still available in the in-memory sequence. */ 163 protected long sequenceLeft; 164 165 /** Last value used from the in-memory sequence. */ 166 protected long sequenceLastValue; 167 168 /** Sequence allocation block size. */ 169 protected long sequenceBlockSize; 170 171 protected final MongoDBConverter converter; 172 173 protected final CursorService<MongoCursor<Document>, Document, String> cursorService; 174 175 public MongoDBRepository(ConnectionManager cm, MongoDBRepositoryDescriptor descriptor) { 176 super(cm, descriptor.name, descriptor); 177 MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class); 178 // prefix with repository/ to group repository connection 179 MongoDatabase database = mongoService.getDatabase(REPOSITORY_CONNECTION_PREFIX + descriptor.name); 180 coll = database.getCollection(descriptor.name); 181 countersColl = database.getCollection(descriptor.name + ".counters"); 182 if (Boolean.TRUE.equals(descriptor.nativeId)) { 183 idKey = MONGODB_ID; 184 } else { 185 idKey = KEY_ID; 186 } 187 useCustomId = KEY_ID.equals(idKey); 188 if (idType == IdType.sequence || DEBUG_UUIDS) { 189 Integer sbs = descriptor.sequenceBlockSize; 190 sequenceBlockSize = sbs == null ? 1 : sbs.longValue(); 191 sequenceLeft = 0; 192 } 193 converter = new MongoDBConverter(idKey); 194 cursorService = new CursorService<>(ob -> (String) ob.get(converter.keyToBson(KEY_ID))); 195 initRepository(); 196 } 197 198 @Override 199 public List<IdType> getAllowedIdTypes() { 200 return Arrays.asList(IdType.varchar, IdType.sequence); 201 } 202 203 @Override 204 public void shutdown() { 205 super.shutdown(); 206 cursorService.clear(); 207 } 208 209 protected void initRepository() { 210 // create required indexes 211 // code does explicit queries on those 212 if (useCustomId) { 213 coll.createIndex(Indexes.ascending(idKey)); 214 } 215 coll.createIndex(Indexes.ascending(KEY_PARENT_ID)); 216 coll.createIndex(Indexes.ascending(KEY_ANCESTOR_IDS)); 217 coll.createIndex(Indexes.ascending(KEY_VERSION_SERIES_ID)); 218 coll.createIndex(Indexes.ascending(KEY_PROXY_TARGET_ID)); 219 coll.createIndex(Indexes.ascending(KEY_PROXY_VERSION_SERIES_ID)); 220 coll.createIndex(Indexes.ascending(KEY_READ_ACL)); 221 coll.createIndex(Indexes.ascending(KEY_PARENT_ID, KEY_NAME)); 222 // often used in user-generated queries 223 coll.createIndex(Indexes.ascending(KEY_PRIMARY_TYPE)); 224 coll.createIndex(Indexes.ascending(KEY_LIFECYCLE_STATE)); 225 coll.createIndex(Indexes.ascending(KEY_IS_TRASHED)); 226 coll.createIndex(Indexes.ascending(KEY_FULLTEXT_JOBID)); 227 coll.createIndex(Indexes.ascending(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_USER)); 228 coll.createIndex(Indexes.ascending(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_STATUS)); 229 // TODO configure these from somewhere else 230 coll.createIndex(Indexes.descending("dc:modified")); 231 coll.createIndex(Indexes.ascending("rend:renditionName")); 232 coll.createIndex(Indexes.ascending("drv:subscriptions.enabled")); 233 coll.createIndex(Indexes.ascending("collectionMember:collectionIds")); 234 coll.createIndex(Indexes.ascending("nxtag:tags")); 235 if (!isFulltextDisabled()) { 236 Bson indexKeys = Indexes.compoundIndex( // 237 Indexes.text(KEY_FULLTEXT_SIMPLE), // 238 Indexes.text(KEY_FULLTEXT_BINARY) // 239 ); 240 IndexOptions indexOptions = new IndexOptions().name(FULLTEXT_INDEX_NAME).languageOverride(LANGUAGE_FIELD); 241 coll.createIndex(indexKeys, indexOptions); 242 } 243 // check root presence 244 if (coll.count(Filters.eq(idKey, getRootId())) > 0) { 245 return; 246 } 247 // create basic repository structure needed 248 if (idType == IdType.sequence || DEBUG_UUIDS) { 249 // create the id counter 250 Document idCounter = new Document(); 251 idCounter.put(MONGODB_ID, COUNTER_NAME_UUID); 252 idCounter.put(COUNTER_FIELD, LONG_ZERO); 253 countersColl.insertOne(idCounter); 254 } 255 initRoot(); 256 } 257 258 protected synchronized Long getNextSequenceId() { 259 if (sequenceLeft == 0) { 260 // allocate a new sequence block 261 // the database contains the last value from the last block 262 Bson filter = Filters.eq(MONGODB_ID, COUNTER_NAME_UUID); 263 Bson update = Updates.inc(COUNTER_FIELD, Long.valueOf(sequenceBlockSize)); 264 Document idCounter = countersColl.findOneAndUpdate(filter, update, 265 new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER)); 266 if (idCounter == null) { 267 throw new NuxeoException("Repository id counter not initialized"); 268 } 269 sequenceLeft = sequenceBlockSize; 270 sequenceLastValue = ((Long) idCounter.get(COUNTER_FIELD)).longValue() - sequenceBlockSize; 271 } 272 sequenceLeft--; 273 sequenceLastValue++; 274 return Long.valueOf(sequenceLastValue); 275 } 276 277 @Override 278 public String generateNewId() { 279 if (idType == IdType.sequence || DEBUG_UUIDS) { 280 Long id = getNextSequenceId(); 281 if (DEBUG_UUIDS) { 282 return "UUID_" + id; 283 } 284 return id.toString(); 285 } else { 286 return UUID.randomUUID().toString(); 287 } 288 } 289 290 @Override 291 public void createState(State state) { 292 Document doc = converter.stateToBson(state); 293 if (log.isTraceEnabled()) { 294 log.trace("MongoDB: CREATE " + doc.get(idKey) + ": " + doc); 295 } 296 coll.insertOne(doc); 297 // TODO dupe exception 298 // throw new DocumentException("Already exists: " + id); 299 } 300 301 @Override 302 public void createStates(List<State> states) { 303 List<Document> docs = states.stream().map(converter::stateToBson).collect(Collectors.toList()); 304 if (log.isTraceEnabled()) { 305 log.trace("MongoDB: CREATE [" 306 + docs.stream().map(doc -> doc.get(idKey).toString()).collect(Collectors.joining(", ")) 307 + "]: " + docs); 308 } 309 coll.insertMany(docs); 310 } 311 312 @Override 313 public State readState(String id) { 314 return findOne(Filters.eq(idKey, id)); 315 } 316 317 @Override 318 public State readPartialState(String id, Collection<String> keys) { 319 Document fields = new Document(); 320 keys.forEach(key -> fields.put(converter.keyToBson(key), ONE)); 321 return findOne(Filters.eq(idKey, id), fields); 322 } 323 324 @Override 325 public List<State> readStates(List<String> ids) { 326 return findAll(Filters.in(idKey, ids)); 327 } 328 329 @Override 330 public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) { 331 List<Document> updates = converter.diffToBson(diff); 332 for (Document update : updates) { 333 Document filter = new Document(idKey, id); 334 if (changeTokenUpdater == null) { 335 if (log.isTraceEnabled()) { 336 log.trace("MongoDB: UPDATE " + id + ": " + update); 337 } 338 } else { 339 // assume bson is identical to dbs internals 340 // condition works even if value is null 341 Map<String, Serializable> conditions = changeTokenUpdater.getConditions(); 342 Map<String, Serializable> tokenUpdates = changeTokenUpdater.getUpdates(); 343 if (update.containsKey(MONGODB_SET)) { 344 ((Document) update.get(MONGODB_SET)).putAll(tokenUpdates); 345 } else { 346 Document set = new Document(); 347 set.putAll(tokenUpdates); 348 update.put(MONGODB_SET, set); 349 } 350 if (log.isTraceEnabled()) { 351 log.trace("MongoDB: UPDATE " + id + ": IF " + conditions + " THEN " + update); 352 } 353 filter.putAll(conditions); 354 } 355 UpdateResult w = coll.updateMany(filter, update); 356 if (w.getModifiedCount() != 1) { 357 log.trace("MongoDB: -> CONCURRENT UPDATE: " + id); 358 throw new ConcurrentUpdateException(id); 359 } 360 // TODO dupe exception 361 // throw new DocumentException("Missing: " + id); 362 } 363 } 364 365 @Override 366 public void deleteStates(Set<String> ids) { 367 Bson filter = Filters.in(idKey, ids); 368 if (log.isTraceEnabled()) { 369 log.trace("MongoDB: REMOVE " + ids); 370 } 371 DeleteResult w = coll.deleteMany(filter); 372 if (w.getDeletedCount() != ids.size()) { 373 if (log.isDebugEnabled()) { 374 log.debug("Removed " + w.getDeletedCount() + " docs for " + ids.size() + " ids: " + ids); 375 } 376 } 377 } 378 379 @Override 380 public State readChildState(String parentId, String name, Set<String> ignored) { 381 Bson filter = getChildQuery(parentId, name, ignored); 382 return findOne(filter); 383 } 384 385 protected void logQuery(String id, Bson fields) { 386 logQuery(Filters.eq(idKey, id), fields); 387 } 388 389 protected void logQuery(Bson filter, Bson fields) { 390 if (fields == null) { 391 log.trace("MongoDB: QUERY " + filter); 392 } else { 393 log.trace("MongoDB: QUERY " + filter + " KEYS " + fields); 394 } 395 } 396 397 protected void logQuery(Bson query, Bson fields, Bson orderBy, int limit, int offset) { 398 log.trace("MongoDB: QUERY " + query + " KEYS " + fields + (orderBy == null ? "" : " ORDER BY " + orderBy) 399 + " OFFSET " + offset + " LIMIT " + limit); 400 } 401 402 @Override 403 public boolean hasChild(String parentId, String name, Set<String> ignored) { 404 Document filter = getChildQuery(parentId, name, ignored); 405 return exists(filter); 406 } 407 408 protected Document getChildQuery(String parentId, String name, Set<String> ignored) { 409 Document filter = new Document(); 410 filter.put(KEY_PARENT_ID, parentId); 411 filter.put(KEY_NAME, name); 412 addIgnoredIds(filter, ignored); 413 return filter; 414 } 415 416 protected void addIgnoredIds(Document filter, Set<String> ignored) { 417 if (!ignored.isEmpty()) { 418 Document notInIds = new Document(QueryOperators.NIN, new ArrayList<>(ignored)); 419 filter.put(idKey, notInIds); 420 } 421 } 422 423 @Override 424 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 425 Document filter = new Document(converter.keyToBson(key), value); 426 addIgnoredIds(filter, ignored); 427 return findAll(filter); 428 } 429 430 @Override 431 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 432 Document filter = new Document(converter.keyToBson(key1), value1); 433 filter.put(converter.keyToBson(key2), value2); 434 addIgnoredIds(filter, ignored); 435 return findAll(filter); 436 } 437 438 @Override 439 public Stream<State> getDescendants(String rootId, Set<String> keys) { 440 return getDescendants(rootId, keys, 0); 441 } 442 443 @Override 444 public Stream<State> getDescendants(String rootId, Set<String> keys, int limit) { 445 Bson filter = Filters.eq(KEY_ANCESTOR_IDS, rootId); 446 Document fields = new Document(); 447 if (useCustomId) { 448 fields.put(MONGODB_ID, ZERO); 449 } 450 fields.put(idKey, ONE); 451 keys.forEach(key -> fields.put(converter.keyToBson(key), ONE)); 452 return stream(filter, fields, limit); 453 } 454 455 @Override 456 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 457 Document filter = new Document(converter.keyToBson(key), value); 458 addIgnoredIds(filter, ignored); 459 return exists(filter); 460 } 461 462 protected boolean exists(Bson filter) { 463 return exists(filter, justPresenceField()); 464 } 465 466 protected boolean exists(Bson filter, Bson projection) { 467 if (log.isTraceEnabled()) { 468 logQuery(filter, projection); 469 } 470 return coll.find(filter).projection(projection).first() != null; 471 } 472 473 protected State findOne(Bson filter) { 474 return findOne(filter, null); 475 } 476 477 protected State findOne(Bson filter, Bson projection) { 478 try (Stream<State> stream = stream(filter, projection)) { 479 return stream.findAny().orElse(null); 480 } 481 } 482 483 protected List<State> findAll(Bson filter) { 484 try (Stream<State> stream = stream(filter)) { 485 return stream.collect(Collectors.toList()); 486 } 487 } 488 489 protected Stream<State> stream(Bson filter) { 490 return stream(filter, null, 0); 491 } 492 493 protected Stream<State> stream(Bson filter, Bson projection) { 494 return stream(filter, projection, 0); 495 } 496 497 /** 498 * Logs, runs request and constructs a closeable {@link Stream} on top of {@link MongoCursor}. 499 * <p /> 500 * We should rely on this method, because it correctly handles cursor closed state. 501 * <p /> 502 * Note: Looping on {@link FindIterable} or {@link MongoIterable} could lead to cursor leaks. This is also the case 503 * on some call to {@link MongoIterable#first()}. 504 * 505 * @return a closeable {@link Stream} instance linked to {@link MongoCursor} 506 */ 507 protected Stream<State> stream(Bson filter, Bson projection, int limit) { 508 if (filter == null) { 509 // empty filter 510 filter = new Document(); 511 } 512 // it's ok if projection is null 513 if (log.isTraceEnabled()) { 514 logQuery(filter, projection); 515 } 516 517 boolean completedAbruptly = true; 518 MongoCursor<Document> cursor = coll.find(filter).limit(limit).projection(projection).iterator(); 519 try { 520 Set<String> seen = new HashSet<>(); 521 Stream<State> stream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(cursor, 0), false) // 522 .onClose(cursor::close) 523 .filter(doc -> seen.add(doc.getString(idKey))) 524 // MongoDB cursors may return the same 525 // object several times 526 .map(converter::bsonToState); 527 // the stream takes responsibility for closing the session 528 completedAbruptly = false; 529 return stream; 530 } finally { 531 if (completedAbruptly) { 532 cursor.close(); 533 } 534 } 535 } 536 537 protected Document justPresenceField() { 538 return new Document(MONGODB_ID, ONE); 539 } 540 541 @Override 542 public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator, 543 OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) { 544 // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter 545 MongoDBQueryBuilder builder = new MongoDBQueryBuilder(this, evaluator.getExpression(), 546 evaluator.getSelectClause(), orderByClause, evaluator.pathResolver, evaluator.fulltextSearchDisabled); 547 builder.walk(); 548 if (builder.hasFulltext && isFulltextDisabled()) { 549 throw new QueryParseException("Fulltext search disabled by configuration"); 550 } 551 Document filter = builder.getQuery(); 552 addPrincipals(filter, evaluator.principals); 553 Bson orderBy = builder.getOrderBy(); 554 Bson keys = builder.getProjection(); 555 // Don't do manual projection if there are no projection wildcards, as this brings no new 556 // information and is costly. The only difference is several identical rows instead of one. 557 boolean manualProjection = !distinctDocuments && builder.hasProjectionWildcard(); 558 if (manualProjection) { 559 // we'll do post-treatment to re-evaluate the query to get proper wildcard projections 560 // so we need the full state from the database 561 keys = null; 562 evaluator.parse(); 563 } 564 565 if (log.isTraceEnabled()) { 566 logQuery(filter, keys, orderBy, limit, offset); 567 } 568 569 List<Map<String, Serializable>> projections; 570 long totalSize; 571 try (MongoCursor<Document> cursor = coll.find(filter) 572 .projection(keys) 573 .skip(offset) 574 .limit(limit) 575 .sort(orderBy) 576 .iterator()) { 577 projections = new ArrayList<>(); 578 DBSStateFlattener flattener = new DBSStateFlattener(builder.propertyKeys); 579 Iterable<Document> docs = () -> cursor; 580 for (Document doc : docs) { 581 State state = converter.bsonToState(doc); 582 if (manualProjection) { 583 projections.addAll(evaluator.matches(state)); 584 } else { 585 projections.add(flattener.flatten(state)); 586 } 587 } 588 } 589 if (countUpTo == -1) { 590 // count full size 591 if (limit == 0) { 592 totalSize = projections.size(); 593 } else if (manualProjection) { 594 totalSize = -1; // unknown due to manual projection 595 } else { 596 totalSize = coll.count(filter); 597 } 598 } else if (countUpTo == 0) { 599 // no count 600 totalSize = -1; // not counted 601 } else { 602 // count only if less than countUpTo 603 if (limit == 0) { 604 totalSize = projections.size(); 605 } else if (manualProjection) { 606 totalSize = -1; // unknown due to manual projection 607 } else { 608 totalSize = coll.count(filter, new CountOptions().limit(countUpTo + 1)); 609 } 610 if (totalSize > countUpTo) { 611 totalSize = -2; // truncated 612 } 613 } 614 if (log.isTraceEnabled() && projections.size() != 0) { 615 log.trace("MongoDB: -> " + projections.size()); 616 } 617 return new PartialList<>(projections, totalSize); 618 } 619 620 @Override 621 public ScrollResult<String> scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) { 622 cursorService.checkForTimedOutScroll(); 623 MongoDBQueryBuilder builder = new MongoDBQueryBuilder(this, evaluator.getExpression(), 624 evaluator.getSelectClause(), null, evaluator.pathResolver, evaluator.fulltextSearchDisabled); 625 builder.walk(); 626 if (builder.hasFulltext && isFulltextDisabled()) { 627 throw new QueryParseException("Fulltext search disabled by configuration"); 628 } 629 Bson filter = builder.getQuery(); 630 Bson keys = builder.getProjection(); 631 if (log.isTraceEnabled()) { 632 logQuery(filter, keys, null, 0, 0); 633 } 634 635 MongoCursor<Document> cursor = coll.find(filter).projection(keys).batchSize(batchSize).iterator(); 636 String scrollId = cursorService.registerCursor(cursor, batchSize, keepAliveSeconds); 637 return scroll(scrollId); 638 } 639 640 @Override 641 public ScrollResult<String> scroll(String scrollId) { 642 return cursorService.scroll(scrollId); 643 } 644 645 protected void addPrincipals(Document query, Set<String> principals) { 646 if (principals != null) { 647 Document inPrincipals = new Document(QueryOperators.IN, new ArrayList<>(principals)); 648 query.put(DBSDocument.KEY_READ_ACL, inPrincipals); 649 } 650 } 651 652 /** Keys used for document projection when marking all binaries for GC. */ 653 protected Bson binaryKeys; 654 655 @Override 656 protected void initBlobsPaths() { 657 MongoDBBlobFinder finder = new MongoDBBlobFinder(); 658 finder.visit(); 659 binaryKeys = Projections.fields(finder.binaryKeys); 660 } 661 662 protected static class MongoDBBlobFinder extends BlobFinder { 663 protected List<Bson> binaryKeys = new ArrayList<>(Collections.singleton(Projections.excludeId())); 664 665 @Override 666 protected void recordBlobPath() { 667 path.addLast(KEY_BLOB_DATA); 668 binaryKeys.add(Projections.include(StringUtils.join(path, "."))); 669 path.removeLast(); 670 } 671 } 672 673 @Override 674 public void markReferencedBinaries() { 675 DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class); 676 // TODO add a query to not scan all documents 677 if (log.isTraceEnabled()) { 678 logQuery(new Document(), binaryKeys); 679 } 680 Block<Document> block = doc -> markReferencedBinaries(doc, blobManager); 681 coll.find().projection(binaryKeys).forEach(block); 682 } 683 684 protected void markReferencedBinaries(Document ob, DocumentBlobManager blobManager) { 685 for (String key : ob.keySet()) { 686 Object value = ob.get(key); 687 if (value instanceof List) { 688 @SuppressWarnings("unchecked") 689 List<Object> list = (List<Object>) value; 690 for (Object v : list) { 691 if (v instanceof Document) { 692 markReferencedBinaries((Document) v, blobManager); 693 } else { 694 markReferencedBinary(v, blobManager); 695 } 696 } 697 } else if (value instanceof Object[]) { 698 for (Object v : (Object[]) value) { 699 markReferencedBinary(v, blobManager); 700 } 701 } else if (value instanceof Document) { 702 markReferencedBinaries((Document) value, blobManager); 703 } else { 704 markReferencedBinary(value, blobManager); 705 } 706 } 707 } 708 709 protected void markReferencedBinary(Object value, DocumentBlobManager blobManager) { 710 if (!(value instanceof String)) { 711 return; 712 } 713 String key = (String) value; 714 blobManager.markReferencedBinary(key, repositoryName); 715 } 716 717 protected static final Bson LOCK_FIELDS = Projections.include(KEY_LOCK_OWNER, KEY_LOCK_CREATED); 718 719 protected static final Bson UNSET_LOCK_UPDATE = Updates.combine(Updates.unset(KEY_LOCK_OWNER), 720 Updates.unset(KEY_LOCK_CREATED)); 721 722 @Override 723 public Lock getLock(String id) { 724 if (log.isTraceEnabled()) { 725 logQuery(id, LOCK_FIELDS); 726 } 727 Document res = coll.find(Filters.eq(idKey, id)).projection(LOCK_FIELDS).first(); 728 if (res == null) { 729 // document not found 730 throw new DocumentNotFoundException(id); 731 } 732 String owner = res.getString(KEY_LOCK_OWNER); 733 if (owner == null) { 734 // not locked 735 return null; 736 } 737 Calendar created = (Calendar) converter.scalarToSerializable(res.get(KEY_LOCK_CREATED)); 738 return new Lock(owner, created); 739 } 740 741 @Override 742 public Lock setLock(String id, Lock lock) { 743 Bson filter = Filters.and( // 744 Filters.eq(idKey, id), // 745 Filters.exists(KEY_LOCK_OWNER, false) // select doc if no lock is set 746 ); 747 Bson setLock = Updates.combine( // 748 Updates.set(KEY_LOCK_OWNER, lock.getOwner()), // 749 Updates.set(KEY_LOCK_CREATED, converter.serializableToBson(lock.getCreated())) // 750 ); 751 if (log.isTraceEnabled()) { 752 log.trace("MongoDB: FINDANDMODIFY " + filter + " UPDATE " + setLock); 753 } 754 Document res = coll.findOneAndUpdate(filter, setLock); 755 if (res != null) { 756 // found a doc to lock 757 return null; 758 } else { 759 // doc not found, or lock owner already set 760 // get the old lock 761 if (log.isTraceEnabled()) { 762 logQuery(id, LOCK_FIELDS); 763 } 764 Document old = coll.find(Filters.eq(idKey, id)).projection(LOCK_FIELDS).first(); 765 if (old == null) { 766 // document not found 767 throw new DocumentNotFoundException(id); 768 } 769 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 770 Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED)); 771 if (oldOwner != null) { 772 return new Lock(oldOwner, oldCreated); 773 } 774 // no lock -- there was a race condition 775 // TODO do better 776 throw new ConcurrentUpdateException("Lock " + id); 777 } 778 } 779 780 @Override 781 public Lock removeLock(String id, String owner) { 782 Document filter = new Document(idKey, id); 783 if (owner != null) { 784 // remove if owner matches or null 785 // implements LockManager.canLockBeRemoved inside MongoDB 786 Object ownerOrNull = Arrays.asList(owner, null); 787 filter.put(KEY_LOCK_OWNER, new Document(QueryOperators.IN, ownerOrNull)); 788 } // else unconditional remove 789 // remove the lock 790 if (log.isTraceEnabled()) { 791 log.trace("MongoDB: FINDANDMODIFY " + filter + " UPDATE " + UNSET_LOCK_UPDATE); 792 } 793 Document old = coll.findOneAndUpdate(filter, UNSET_LOCK_UPDATE); 794 if (old != null) { 795 // found a doc and removed the lock, return previous lock 796 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 797 if (oldOwner == null) { 798 // was not locked 799 return null; 800 } else { 801 // return previous lock 802 Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED)); 803 return new Lock(oldOwner, oldCreated); 804 } 805 } else { 806 // doc not found, or lock owner didn't match 807 // get the old lock 808 if (log.isTraceEnabled()) { 809 logQuery(id, LOCK_FIELDS); 810 } 811 old = coll.find(Filters.eq(idKey, id)).projection(LOCK_FIELDS).first(); 812 if (old == null) { 813 // document not found 814 throw new DocumentNotFoundException(id); 815 } 816 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 817 Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED)); 818 if (oldOwner != null) { 819 if (!LockManager.canLockBeRemoved(oldOwner, owner)) { 820 // existing mismatched lock, flag failure 821 return new Lock(oldOwner, oldCreated, true); 822 } 823 // old owner should have matched -- there was a race condition 824 // TODO do better 825 throw new ConcurrentUpdateException("Unlock " + id); 826 } 827 // old owner null, should have matched -- there was a race condition 828 // TODO do better 829 throw new ConcurrentUpdateException("Unlock " + id); 830 } 831 } 832 833 @Override 834 public void closeLockManager() { 835 836 } 837 838 @Override 839 public void clearLockManagerCaches() { 840 } 841 842}