001/* 002 * (C) Copyright 2014-2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.mongodb; 020 021import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_STATUS; 022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_USER; 023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACL; 024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACP; 025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS; 026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA; 027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY; 028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_JOBID; 029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_SIMPLE; 030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LIFECYCLE_STATE; 032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED; 033import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER; 034import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 035import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 036import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PRIMARY_TYPE; 037import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID; 038import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_VERSION_SERIES_ID; 039import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_READ_ACL; 040import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_VERSION_SERIES_ID; 041 042import java.io.Serializable; 043import java.util.ArrayList; 044import java.util.Arrays; 045import java.util.Calendar; 046import java.util.Collection; 047import java.util.Collections; 048import java.util.HashSet; 049import java.util.List; 050import java.util.Map; 051import java.util.Set; 052import java.util.Spliterators; 053import java.util.UUID; 054import java.util.stream.Collectors; 055import java.util.stream.Stream; 056import java.util.stream.StreamSupport; 057 058import javax.resource.spi.ConnectionManager; 059 060import org.apache.commons.lang.StringUtils; 061import org.apache.commons.logging.Log; 062import org.apache.commons.logging.LogFactory; 063import org.bson.Document; 064import org.bson.conversions.Bson; 065import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 066import org.nuxeo.ecm.core.api.CursorService; 067import org.nuxeo.ecm.core.api.DocumentNotFoundException; 068import org.nuxeo.ecm.core.api.Lock; 069import org.nuxeo.ecm.core.api.NuxeoException; 070import org.nuxeo.ecm.core.api.PartialList; 071import org.nuxeo.ecm.core.api.ScrollResult; 072import org.nuxeo.ecm.core.blob.DocumentBlobManager; 073import org.nuxeo.ecm.core.model.LockManager; 074import org.nuxeo.ecm.core.model.Repository; 075import org.nuxeo.ecm.core.query.QueryParseException; 076import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 077import org.nuxeo.ecm.core.storage.State; 078import org.nuxeo.ecm.core.storage.State.StateDiff; 079import org.nuxeo.ecm.core.storage.dbs.DBSDocument; 080import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator; 081import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase; 082import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener; 083import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater; 084import org.nuxeo.runtime.api.Framework; 085import org.nuxeo.runtime.mongodb.MongoDBConnectionService; 086 087import com.mongodb.Block; 088import com.mongodb.QueryOperators; 089import com.mongodb.client.FindIterable; 090import com.mongodb.client.MongoCollection; 091import com.mongodb.client.MongoCursor; 092import com.mongodb.client.MongoDatabase; 093import com.mongodb.client.MongoIterable; 094import com.mongodb.client.model.CountOptions; 095import com.mongodb.client.model.Filters; 096import com.mongodb.client.model.FindOneAndUpdateOptions; 097import com.mongodb.client.model.IndexOptions; 098import com.mongodb.client.model.Indexes; 099import com.mongodb.client.model.Projections; 100import com.mongodb.client.model.ReturnDocument; 101import com.mongodb.client.model.Updates; 102import com.mongodb.client.result.DeleteResult; 103import com.mongodb.client.result.UpdateResult; 104 105/** 106 * MongoDB implementation of a {@link Repository}. 107 * 108 * @since 5.9.4 109 */ 110public class MongoDBRepository extends DBSRepositoryBase { 111 112 private static final Log log = LogFactory.getLog(MongoDBRepository.class); 113 114 /** 115 * Prefix used to retrieve a MongoDB connection from {@link MongoDBConnectionService}. 116 * <p /> 117 * The connection id will be {@code repository/[REPOSITORY_NAME]}. 118 */ 119 public static final String REPOSITORY_CONNECTION_PREFIX = "repository/"; 120 121 public static final Long LONG_ZERO = Long.valueOf(0); 122 123 public static final Double ZERO = Double.valueOf(0); 124 125 public static final Double ONE = Double.valueOf(1); 126 127 public static final String MONGODB_ID = "_id"; 128 129 public static final String MONGODB_INC = "$inc"; 130 131 public static final String MONGODB_SET = "$set"; 132 133 public static final String MONGODB_UNSET = "$unset"; 134 135 public static final String MONGODB_PUSH = "$push"; 136 137 public static final String MONGODB_EACH = "$each"; 138 139 public static final String MONGODB_META = "$meta"; 140 141 public static final String MONGODB_TEXT_SCORE = "textScore"; 142 143 private static final String FULLTEXT_INDEX_NAME = "fulltext"; 144 145 private static final String LANGUAGE_FIELD = "__language"; 146 147 protected static final String COUNTER_NAME_UUID = "ecm:id"; 148 149 protected static final String COUNTER_FIELD = "seq"; 150 151 protected final MongoCollection<Document> coll; 152 153 protected final MongoCollection<Document> countersColl; 154 155 /** The key to use to store the id in the database. */ 156 protected String idKey; 157 158 /** True if we don't use MongoDB's native "_id" key to store the id. */ 159 protected boolean useCustomId; 160 161 /** Number of values still available in the in-memory sequence. */ 162 protected long sequenceLeft; 163 164 /** Last value used from the in-memory sequence. */ 165 protected long sequenceLastValue; 166 167 /** Sequence allocation block size. */ 168 protected long sequenceBlockSize; 169 170 protected final MongoDBConverter converter; 171 172 protected final CursorService<MongoCursor<Document>, Document, String> cursorService; 173 174 public MongoDBRepository(ConnectionManager cm, MongoDBRepositoryDescriptor descriptor) { 175 super(cm, descriptor.name, descriptor); 176 MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class); 177 // prefix with repository/ to group repository connection 178 MongoDatabase database = mongoService.getDatabase(REPOSITORY_CONNECTION_PREFIX + descriptor.name); 179 coll = database.getCollection(descriptor.name); 180 countersColl = database.getCollection(descriptor.name + ".counters"); 181 if (Boolean.TRUE.equals(descriptor.nativeId)) { 182 idKey = MONGODB_ID; 183 } else { 184 idKey = KEY_ID; 185 } 186 useCustomId = KEY_ID.equals(idKey); 187 if (idType == IdType.sequence || DEBUG_UUIDS) { 188 Integer sbs = descriptor.sequenceBlockSize; 189 sequenceBlockSize = sbs == null ? 1 : sbs.longValue(); 190 sequenceLeft = 0; 191 } 192 converter = new MongoDBConverter(idKey); 193 cursorService = new CursorService<>(ob -> (String) ob.get(converter.keyToBson(KEY_ID))); 194 initRepository(); 195 } 196 197 @Override 198 public List<IdType> getAllowedIdTypes() { 199 return Arrays.asList(IdType.varchar, IdType.sequence); 200 } 201 202 @Override 203 public void shutdown() { 204 super.shutdown(); 205 cursorService.clear(); 206 } 207 208 protected void initRepository() { 209 // create required indexes 210 // code does explicit queries on those 211 if (useCustomId) { 212 coll.createIndex(Indexes.ascending(idKey)); 213 } 214 coll.createIndex(Indexes.ascending(KEY_PARENT_ID)); 215 coll.createIndex(Indexes.ascending(KEY_ANCESTOR_IDS)); 216 coll.createIndex(Indexes.ascending(KEY_VERSION_SERIES_ID)); 217 coll.createIndex(Indexes.ascending(KEY_PROXY_TARGET_ID)); 218 coll.createIndex(Indexes.ascending(KEY_PROXY_VERSION_SERIES_ID)); 219 coll.createIndex(Indexes.ascending(KEY_READ_ACL)); 220 coll.createIndex(Indexes.ascending(KEY_PARENT_ID, KEY_NAME)); 221 // often used in user-generated queries 222 coll.createIndex(Indexes.ascending(KEY_PRIMARY_TYPE)); 223 coll.createIndex(Indexes.ascending(KEY_LIFECYCLE_STATE)); 224 coll.createIndex(Indexes.ascending(KEY_FULLTEXT_JOBID)); 225 coll.createIndex(Indexes.ascending(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_USER)); 226 coll.createIndex(Indexes.ascending(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_STATUS)); 227 // TODO configure these from somewhere else 228 coll.createIndex(Indexes.descending("dc:modified")); 229 coll.createIndex(Indexes.ascending("rend:renditionName")); 230 coll.createIndex(Indexes.ascending("drv:subscriptions.enabled")); 231 coll.createIndex(Indexes.ascending("collectionMember:collectionIds")); 232 coll.createIndex(Indexes.ascending("nxtag:tags")); 233 if (!isFulltextDisabled()) { 234 Bson indexKeys = Indexes.compoundIndex( // 235 Indexes.text(KEY_FULLTEXT_SIMPLE), // 236 Indexes.text(KEY_FULLTEXT_BINARY) // 237 ); 238 IndexOptions indexOptions = new IndexOptions().name(FULLTEXT_INDEX_NAME).languageOverride(LANGUAGE_FIELD); 239 coll.createIndex(indexKeys, indexOptions); 240 } 241 // check root presence 242 if (coll.count(Filters.eq(idKey, getRootId())) > 0) { 243 return; 244 } 245 // create basic repository structure needed 246 if (idType == IdType.sequence || DEBUG_UUIDS) { 247 // create the id counter 248 Document idCounter = new Document(); 249 idCounter.put(MONGODB_ID, COUNTER_NAME_UUID); 250 idCounter.put(COUNTER_FIELD, LONG_ZERO); 251 countersColl.insertOne(idCounter); 252 } 253 initRoot(); 254 } 255 256 protected synchronized Long getNextSequenceId() { 257 if (sequenceLeft == 0) { 258 // allocate a new sequence block 259 // the database contains the last value from the last block 260 Bson filter = Filters.eq(MONGODB_ID, COUNTER_NAME_UUID); 261 Bson update = Updates.inc(COUNTER_FIELD, Long.valueOf(sequenceBlockSize)); 262 Document idCounter = countersColl.findOneAndUpdate(filter, update, 263 new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER)); 264 if (idCounter == null) { 265 throw new NuxeoException("Repository id counter not initialized"); 266 } 267 sequenceLeft = sequenceBlockSize; 268 sequenceLastValue = ((Long) idCounter.get(COUNTER_FIELD)).longValue() - sequenceBlockSize; 269 } 270 sequenceLeft--; 271 sequenceLastValue++; 272 return Long.valueOf(sequenceLastValue); 273 } 274 275 @Override 276 public String generateNewId() { 277 if (idType == IdType.sequence || DEBUG_UUIDS) { 278 Long id = getNextSequenceId(); 279 if (DEBUG_UUIDS) { 280 return "UUID_" + id; 281 } 282 return id.toString(); 283 } else { 284 return UUID.randomUUID().toString(); 285 } 286 } 287 288 @Override 289 public void createState(State state) { 290 Document doc = converter.stateToBson(state); 291 if (log.isTraceEnabled()) { 292 log.trace("MongoDB: CREATE " + doc.get(idKey) + ": " + doc); 293 } 294 coll.insertOne(doc); 295 // TODO dupe exception 296 // throw new DocumentException("Already exists: " + id); 297 } 298 299 @Override 300 public void createStates(List<State> states) { 301 List<Document> docs = states.stream().map(converter::stateToBson).collect(Collectors.toList()); 302 if (log.isTraceEnabled()) { 303 log.trace("MongoDB: CREATE [" 304 + docs.stream().map(doc -> doc.get(idKey).toString()).collect(Collectors.joining(", ")) 305 + "]: " + docs); 306 } 307 coll.insertMany(docs); 308 } 309 310 @Override 311 public State readState(String id) { 312 return findOne(Filters.eq(idKey, id)); 313 } 314 315 @Override 316 public State readPartialState(String id, Collection<String> keys) { 317 Document fields = new Document(); 318 keys.forEach(key -> fields.put(converter.keyToBson(key), ONE)); 319 return findOne(Filters.eq(idKey, id), fields); 320 } 321 322 @Override 323 public List<State> readStates(List<String> ids) { 324 return findAll(Filters.in(idKey, ids)); 325 } 326 327 @Override 328 public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) { 329 List<Document> updates = converter.diffToBson(diff); 330 for (Document update : updates) { 331 Document filter = new Document(idKey, id); 332 if (changeTokenUpdater == null) { 333 if (log.isTraceEnabled()) { 334 log.trace("MongoDB: UPDATE " + id + ": " + update); 335 } 336 } else { 337 // assume bson is identical to dbs internals 338 // condition works even if value is null 339 Map<String, Serializable> conditions = changeTokenUpdater.getConditions(); 340 Map<String, Serializable> tokenUpdates = changeTokenUpdater.getUpdates(); 341 if (update.containsKey(MONGODB_SET)) { 342 ((Document) update.get(MONGODB_SET)).putAll(tokenUpdates); 343 } else { 344 Document set = new Document(); 345 set.putAll(tokenUpdates); 346 update.put(MONGODB_SET, set); 347 } 348 if (log.isTraceEnabled()) { 349 log.trace("MongoDB: UPDATE " + id + ": IF " + conditions + " THEN " + update); 350 } 351 filter.putAll(conditions); 352 } 353 UpdateResult w = coll.updateMany(filter, update); 354 if (w.getModifiedCount() != 1) { 355 log.trace("MongoDB: -> CONCURRENT UPDATE: " + id); 356 throw new ConcurrentUpdateException(id); 357 } 358 // TODO dupe exception 359 // throw new DocumentException("Missing: " + id); 360 } 361 } 362 363 @Override 364 public void deleteStates(Set<String> ids) { 365 Bson filter = Filters.in(idKey, ids); 366 if (log.isTraceEnabled()) { 367 log.trace("MongoDB: REMOVE " + ids); 368 } 369 DeleteResult w = coll.deleteMany(filter); 370 if (w.getDeletedCount() != ids.size()) { 371 if (log.isDebugEnabled()) { 372 log.debug("Removed " + w.getDeletedCount() + " docs for " + ids.size() + " ids: " + ids); 373 } 374 } 375 } 376 377 @Override 378 public State readChildState(String parentId, String name, Set<String> ignored) { 379 Bson filter = getChildQuery(parentId, name, ignored); 380 return findOne(filter); 381 } 382 383 protected void logQuery(String id, Bson fields) { 384 logQuery(Filters.eq(idKey, id), fields); 385 } 386 387 protected void logQuery(Bson filter, Bson fields) { 388 if (fields == null) { 389 log.trace("MongoDB: QUERY " + filter); 390 } else { 391 log.trace("MongoDB: QUERY " + filter + " KEYS " + fields); 392 } 393 } 394 395 protected void logQuery(Bson query, Bson fields, Bson orderBy, int limit, int offset) { 396 log.trace("MongoDB: QUERY " + query + " KEYS " + fields + (orderBy == null ? "" : " ORDER BY " + orderBy) 397 + " OFFSET " + offset + " LIMIT " + limit); 398 } 399 400 @Override 401 public boolean hasChild(String parentId, String name, Set<String> ignored) { 402 Document filter = getChildQuery(parentId, name, ignored); 403 return exists(filter); 404 } 405 406 protected Document getChildQuery(String parentId, String name, Set<String> ignored) { 407 Document filter = new Document(); 408 filter.put(KEY_PARENT_ID, parentId); 409 filter.put(KEY_NAME, name); 410 addIgnoredIds(filter, ignored); 411 return filter; 412 } 413 414 protected void addIgnoredIds(Document filter, Set<String> ignored) { 415 if (!ignored.isEmpty()) { 416 Document notInIds = new Document(QueryOperators.NIN, new ArrayList<>(ignored)); 417 filter.put(idKey, notInIds); 418 } 419 } 420 421 @Override 422 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 423 Document filter = new Document(converter.keyToBson(key), value); 424 addIgnoredIds(filter, ignored); 425 return findAll(filter); 426 } 427 428 @Override 429 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 430 Document filter = new Document(converter.keyToBson(key1), value1); 431 filter.put(converter.keyToBson(key2), value2); 432 addIgnoredIds(filter, ignored); 433 return findAll(filter); 434 } 435 436 @Override 437 public Stream<State> getDescendants(String rootId, Set<String> keys) { 438 return getDescendants(rootId, keys, 0); 439 } 440 441 @Override 442 public Stream<State> getDescendants(String rootId, Set<String> keys, int limit) { 443 Bson filter = Filters.eq(KEY_ANCESTOR_IDS, rootId); 444 Document fields = new Document(); 445 if (useCustomId) { 446 fields.put(MONGODB_ID, ZERO); 447 } 448 fields.put(idKey, ONE); 449 keys.forEach(key -> fields.put(converter.keyToBson(key), ONE)); 450 return stream(filter, fields, limit); 451 } 452 453 @Override 454 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 455 Document filter = new Document(converter.keyToBson(key), value); 456 addIgnoredIds(filter, ignored); 457 return exists(filter); 458 } 459 460 protected boolean exists(Bson filter) { 461 return exists(filter, justPresenceField()); 462 } 463 464 protected boolean exists(Bson filter, Bson projection) { 465 if (log.isTraceEnabled()) { 466 logQuery(filter, projection); 467 } 468 return coll.find(filter).projection(projection).first() != null; 469 } 470 471 protected State findOne(Bson filter) { 472 return findOne(filter, null); 473 } 474 475 protected State findOne(Bson filter, Bson projection) { 476 try (Stream<State> stream = stream(filter, projection)) { 477 return stream.findAny().orElse(null); 478 } 479 } 480 481 protected List<State> findAll(Bson filter) { 482 try (Stream<State> stream = stream(filter)) { 483 return stream.collect(Collectors.toList()); 484 } 485 } 486 487 protected Stream<State> stream(Bson filter) { 488 return stream(filter, null, 0); 489 } 490 491 protected Stream<State> stream(Bson filter, Bson projection) { 492 return stream(filter, projection, 0); 493 } 494 495 /** 496 * Logs, runs request and constructs a closeable {@link Stream} on top of {@link MongoCursor}. 497 * <p /> 498 * We should rely on this method, because it correctly handles cursor closed state. 499 * <p /> 500 * Note: Looping on {@link FindIterable} or {@link MongoIterable} could lead to cursor leaks. This is also the case 501 * on some call to {@link MongoIterable#first()}. 502 * 503 * @return a closeable {@link Stream} instance linked to {@link MongoCursor} 504 */ 505 protected Stream<State> stream(Bson filter, Bson projection, int limit) { 506 if (filter == null) { 507 // empty filter 508 filter = new Document(); 509 } 510 // it's ok if projection is null 511 if (log.isTraceEnabled()) { 512 logQuery(filter, projection); 513 } 514 515 boolean completedAbruptly = true; 516 MongoCursor<Document> cursor = coll.find(filter).limit(limit).projection(projection).iterator(); 517 try { 518 Set<String> seen = new HashSet<>(); 519 Stream<State> stream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(cursor, 0), false) // 520 .onClose(cursor::close) 521 .filter(doc -> seen.add(doc.getString(idKey))) 522 // MongoDB cursors may return the same 523 // object several times 524 .map(converter::bsonToState); 525 // the stream takes responsibility for closing the session 526 completedAbruptly = false; 527 return stream; 528 } finally { 529 if (completedAbruptly) { 530 cursor.close(); 531 } 532 } 533 } 534 535 protected Document justPresenceField() { 536 return new Document(MONGODB_ID, ONE); 537 } 538 539 @Override 540 public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator, 541 OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) { 542 // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter 543 MongoDBQueryBuilder builder = new MongoDBQueryBuilder(this, evaluator.getExpression(), 544 evaluator.getSelectClause(), orderByClause, evaluator.pathResolver, evaluator.fulltextSearchDisabled); 545 builder.walk(); 546 if (builder.hasFulltext && isFulltextDisabled()) { 547 throw new QueryParseException("Fulltext search disabled by configuration"); 548 } 549 Document filter = builder.getQuery(); 550 addPrincipals(filter, evaluator.principals); 551 Bson orderBy = builder.getOrderBy(); 552 Bson keys = builder.getProjection(); 553 // Don't do manual projection if there are no projection wildcards, as this brings no new 554 // information and is costly. The only difference is several identical rows instead of one. 555 boolean manualProjection = !distinctDocuments && builder.hasProjectionWildcard(); 556 if (manualProjection) { 557 // we'll do post-treatment to re-evaluate the query to get proper wildcard projections 558 // so we need the full state from the database 559 keys = null; 560 evaluator.parse(); 561 } 562 563 if (log.isTraceEnabled()) { 564 logQuery(filter, keys, orderBy, limit, offset); 565 } 566 567 List<Map<String, Serializable>> projections; 568 long totalSize; 569 try (MongoCursor<Document> cursor = coll.find(filter) 570 .projection(keys) 571 .skip(offset) 572 .limit(limit) 573 .sort(orderBy) 574 .iterator()) { 575 projections = new ArrayList<>(); 576 DBSStateFlattener flattener = new DBSStateFlattener(builder.propertyKeys); 577 Iterable<Document> docs = () -> cursor; 578 for (Document doc : docs) { 579 State state = converter.bsonToState(doc); 580 if (manualProjection) { 581 projections.addAll(evaluator.matches(state)); 582 } else { 583 projections.add(flattener.flatten(state)); 584 } 585 } 586 } 587 if (countUpTo == -1) { 588 // count full size 589 if (limit == 0) { 590 totalSize = projections.size(); 591 } else if (manualProjection) { 592 totalSize = -1; // unknown due to manual projection 593 } else { 594 totalSize = coll.count(filter); 595 } 596 } else if (countUpTo == 0) { 597 // no count 598 totalSize = -1; // not counted 599 } else { 600 // count only if less than countUpTo 601 if (limit == 0) { 602 totalSize = projections.size(); 603 } else if (manualProjection) { 604 totalSize = -1; // unknown due to manual projection 605 } else { 606 totalSize = coll.count(filter, new CountOptions().limit(countUpTo + 1)); 607 } 608 if (totalSize > countUpTo) { 609 totalSize = -2; // truncated 610 } 611 } 612 if (log.isTraceEnabled() && projections.size() != 0) { 613 log.trace("MongoDB: -> " + projections.size()); 614 } 615 return new PartialList<>(projections, totalSize); 616 } 617 618 @Override 619 public ScrollResult<String> scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) { 620 cursorService.checkForTimedOutScroll(); 621 MongoDBQueryBuilder builder = new MongoDBQueryBuilder(this, evaluator.getExpression(), 622 evaluator.getSelectClause(), null, evaluator.pathResolver, evaluator.fulltextSearchDisabled); 623 builder.walk(); 624 if (builder.hasFulltext && isFulltextDisabled()) { 625 throw new QueryParseException("Fulltext search disabled by configuration"); 626 } 627 Bson filter = builder.getQuery(); 628 Bson keys = builder.getProjection(); 629 if (log.isTraceEnabled()) { 630 logQuery(filter, keys, null, 0, 0); 631 } 632 633 MongoCursor<Document> cursor = coll.find(filter).projection(keys).batchSize(batchSize).iterator(); 634 String scrollId = cursorService.registerCursor(cursor, batchSize, keepAliveSeconds); 635 return scroll(scrollId); 636 } 637 638 @Override 639 public ScrollResult<String> scroll(String scrollId) { 640 return cursorService.scroll(scrollId); 641 } 642 643 protected void addPrincipals(Document query, Set<String> principals) { 644 if (principals != null) { 645 Document inPrincipals = new Document(QueryOperators.IN, new ArrayList<>(principals)); 646 query.put(DBSDocument.KEY_READ_ACL, inPrincipals); 647 } 648 } 649 650 /** Keys used for document projection when marking all binaries for GC. */ 651 protected Bson binaryKeys; 652 653 @Override 654 protected void initBlobsPaths() { 655 MongoDBBlobFinder finder = new MongoDBBlobFinder(); 656 finder.visit(); 657 binaryKeys = Projections.fields(finder.binaryKeys); 658 } 659 660 protected static class MongoDBBlobFinder extends BlobFinder { 661 protected List<Bson> binaryKeys = new ArrayList<>(Collections.singleton(Projections.excludeId())); 662 663 @Override 664 protected void recordBlobPath() { 665 path.addLast(KEY_BLOB_DATA); 666 binaryKeys.add(Projections.include(StringUtils.join(path, "."))); 667 path.removeLast(); 668 } 669 } 670 671 @Override 672 public void markReferencedBinaries() { 673 DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class); 674 // TODO add a query to not scan all documents 675 if (log.isTraceEnabled()) { 676 logQuery(new Document(), binaryKeys); 677 } 678 Block<Document> block = doc -> markReferencedBinaries(doc, blobManager); 679 coll.find().projection(binaryKeys).forEach(block); 680 } 681 682 protected void markReferencedBinaries(Document ob, DocumentBlobManager blobManager) { 683 for (String key : ob.keySet()) { 684 Object value = ob.get(key); 685 if (value instanceof List) { 686 @SuppressWarnings("unchecked") 687 List<Object> list = (List<Object>) value; 688 for (Object v : list) { 689 if (v instanceof Document) { 690 markReferencedBinaries((Document) v, blobManager); 691 } else { 692 markReferencedBinary(v, blobManager); 693 } 694 } 695 } else if (value instanceof Object[]) { 696 for (Object v : (Object[]) value) { 697 markReferencedBinary(v, blobManager); 698 } 699 } else if (value instanceof Document) { 700 markReferencedBinaries((Document) value, blobManager); 701 } else { 702 markReferencedBinary(value, blobManager); 703 } 704 } 705 } 706 707 protected void markReferencedBinary(Object value, DocumentBlobManager blobManager) { 708 if (!(value instanceof String)) { 709 return; 710 } 711 String key = (String) value; 712 blobManager.markReferencedBinary(key, repositoryName); 713 } 714 715 protected static final Bson LOCK_FIELDS = Projections.include(KEY_LOCK_OWNER, KEY_LOCK_CREATED); 716 717 protected static final Bson UNSET_LOCK_UPDATE = Updates.combine(Updates.unset(KEY_LOCK_OWNER), 718 Updates.unset(KEY_LOCK_CREATED)); 719 720 @Override 721 public Lock getLock(String id) { 722 if (log.isTraceEnabled()) { 723 logQuery(id, LOCK_FIELDS); 724 } 725 Document res = coll.find(Filters.eq(idKey, id)).projection(LOCK_FIELDS).first(); 726 if (res == null) { 727 // document not found 728 throw new DocumentNotFoundException(id); 729 } 730 String owner = res.getString(KEY_LOCK_OWNER); 731 if (owner == null) { 732 // not locked 733 return null; 734 } 735 Calendar created = (Calendar) converter.scalarToSerializable(res.get(KEY_LOCK_CREATED)); 736 return new Lock(owner, created); 737 } 738 739 @Override 740 public Lock setLock(String id, Lock lock) { 741 Bson filter = Filters.and( // 742 Filters.eq(idKey, id), // 743 Filters.exists(KEY_LOCK_OWNER, false) // select doc if no lock is set 744 ); 745 Bson setLock = Updates.combine( // 746 Updates.set(KEY_LOCK_OWNER, lock.getOwner()), // 747 Updates.set(KEY_LOCK_CREATED, converter.serializableToBson(lock.getCreated())) // 748 ); 749 if (log.isTraceEnabled()) { 750 log.trace("MongoDB: FINDANDMODIFY " + filter + " UPDATE " + setLock); 751 } 752 Document res = coll.findOneAndUpdate(filter, setLock); 753 if (res != null) { 754 // found a doc to lock 755 return null; 756 } else { 757 // doc not found, or lock owner already set 758 // get the old lock 759 if (log.isTraceEnabled()) { 760 logQuery(id, LOCK_FIELDS); 761 } 762 Document old = coll.find(Filters.eq(idKey, id)).projection(LOCK_FIELDS).first(); 763 if (old == null) { 764 // document not found 765 throw new DocumentNotFoundException(id); 766 } 767 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 768 Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED)); 769 if (oldOwner != null) { 770 return new Lock(oldOwner, oldCreated); 771 } 772 // no lock -- there was a race condition 773 // TODO do better 774 throw new ConcurrentUpdateException("Lock " + id); 775 } 776 } 777 778 @Override 779 public Lock removeLock(String id, String owner) { 780 Document filter = new Document(idKey, id); 781 if (owner != null) { 782 // remove if owner matches or null 783 // implements LockManager.canLockBeRemoved inside MongoDB 784 Object ownerOrNull = Arrays.asList(owner, null); 785 filter.put(KEY_LOCK_OWNER, new Document(QueryOperators.IN, ownerOrNull)); 786 } // else unconditional remove 787 // remove the lock 788 if (log.isTraceEnabled()) { 789 log.trace("MongoDB: FINDANDMODIFY " + filter + " UPDATE " + UNSET_LOCK_UPDATE); 790 } 791 Document old = coll.findOneAndUpdate(filter, UNSET_LOCK_UPDATE); 792 if (old != null) { 793 // found a doc and removed the lock, return previous lock 794 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 795 if (oldOwner == null) { 796 // was not locked 797 return null; 798 } else { 799 // return previous lock 800 Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED)); 801 return new Lock(oldOwner, oldCreated); 802 } 803 } else { 804 // doc not found, or lock owner didn't match 805 // get the old lock 806 if (log.isTraceEnabled()) { 807 logQuery(id, LOCK_FIELDS); 808 } 809 old = coll.find(Filters.eq(idKey, id)).projection(LOCK_FIELDS).first(); 810 if (old == null) { 811 // document not found 812 throw new DocumentNotFoundException(id); 813 } 814 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 815 Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED)); 816 if (oldOwner != null) { 817 if (!LockManager.canLockBeRemoved(oldOwner, owner)) { 818 // existing mismatched lock, flag failure 819 return new Lock(oldOwner, oldCreated, true); 820 } 821 // old owner should have matched -- there was a race condition 822 // TODO do better 823 throw new ConcurrentUpdateException("Unlock " + id); 824 } 825 // old owner null, should have matched -- there was a race condition 826 // TODO do better 827 throw new ConcurrentUpdateException("Unlock " + id); 828 } 829 } 830 831 @Override 832 public void closeLockManager() { 833 834 } 835 836 @Override 837 public void clearLockManagerCaches() { 838 } 839 840}