001/* 002 * (C) Copyright 2014-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.mongodb; 020 021import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_STATUS; 022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_USER; 023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACL; 024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACP; 025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS; 026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA; 027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY; 028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_JOBID; 029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_SIMPLE; 030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_TRASHED; 032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LIFECYCLE_STATE; 033import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED; 034import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER; 035import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 036import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 037import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PRIMARY_TYPE; 038import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID; 039import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_VERSION_SERIES_ID; 040import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_READ_ACL; 041import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_VERSION_SERIES_ID; 042 043import java.io.Serializable; 044import java.util.ArrayList; 045import java.util.Arrays; 046import java.util.Calendar; 047import java.util.Collection; 048import java.util.Collections; 049import java.util.HashSet; 050import java.util.List; 051import java.util.Map; 052import java.util.Set; 053import java.util.Spliterators; 054import java.util.UUID; 055import java.util.stream.Collectors; 056import java.util.stream.Stream; 057import java.util.stream.StreamSupport; 058 059import javax.resource.spi.ConnectionManager; 060 061import org.apache.commons.lang3.StringUtils; 062import org.apache.commons.logging.Log; 063import org.apache.commons.logging.LogFactory; 064import org.bson.Document; 065import org.bson.conversions.Bson; 066import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 067import org.nuxeo.ecm.core.api.CursorService; 068import org.nuxeo.ecm.core.api.DocumentNotFoundException; 069import org.nuxeo.ecm.core.api.Lock; 070import org.nuxeo.ecm.core.api.NuxeoException; 071import org.nuxeo.ecm.core.api.PartialList; 072import org.nuxeo.ecm.core.api.ScrollResult; 073import org.nuxeo.ecm.core.blob.DocumentBlobManager; 074import org.nuxeo.ecm.core.model.LockManager; 075import org.nuxeo.ecm.core.model.Repository; 076import org.nuxeo.ecm.core.query.QueryParseException; 077import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 078import org.nuxeo.ecm.core.storage.State; 079import org.nuxeo.ecm.core.storage.State.StateDiff; 080import org.nuxeo.ecm.core.storage.dbs.DBSDocument; 081import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator; 082import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase; 083import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener; 084import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater; 085import org.nuxeo.runtime.api.Framework; 086import org.nuxeo.runtime.mongodb.MongoDBConnectionService; 087 088import com.mongodb.Block; 089import com.mongodb.QueryOperators; 090import com.mongodb.client.FindIterable; 091import com.mongodb.client.MongoCollection; 092import com.mongodb.client.MongoCursor; 093import com.mongodb.client.MongoDatabase; 094import com.mongodb.client.MongoIterable; 095import com.mongodb.client.model.CountOptions; 096import com.mongodb.client.model.Filters; 097import com.mongodb.client.model.FindOneAndUpdateOptions; 098import com.mongodb.client.model.IndexOptions; 099import com.mongodb.client.model.Indexes; 100import com.mongodb.client.model.Projections; 101import com.mongodb.client.model.ReturnDocument; 102import com.mongodb.client.model.Updates; 103import com.mongodb.client.result.DeleteResult; 104import com.mongodb.client.result.UpdateResult; 105 106/** 107 * MongoDB implementation of a {@link Repository}. 108 * 109 * @since 5.9.4 110 */ 111public class MongoDBRepository extends DBSRepositoryBase { 112 113 private static final Log log = LogFactory.getLog(MongoDBRepository.class); 114 115 /** 116 * Prefix used to retrieve a MongoDB connection from {@link MongoDBConnectionService}. 117 * <p /> 118 * The connection id will be {@code repository/[REPOSITORY_NAME]}. 119 */ 120 public static final String REPOSITORY_CONNECTION_PREFIX = "repository/"; 121 122 public static final Long LONG_ZERO = Long.valueOf(0); 123 124 public static final Double ZERO = Double.valueOf(0); 125 126 public static final Double ONE = Double.valueOf(1); 127 128 public static final String MONGODB_ID = "_id"; 129 130 public static final String MONGODB_INC = "$inc"; 131 132 public static final String MONGODB_SET = "$set"; 133 134 public static final String MONGODB_UNSET = "$unset"; 135 136 public static final String MONGODB_PUSH = "$push"; 137 138 public static final String MONGODB_EACH = "$each"; 139 140 public static final String MONGODB_META = "$meta"; 141 142 public static final String MONGODB_TEXT_SCORE = "textScore"; 143 144 private static final String FULLTEXT_INDEX_NAME = "fulltext"; 145 146 private static final String LANGUAGE_FIELD = "__language"; 147 148 protected static final String COUNTER_NAME_UUID = "ecm:id"; 149 150 protected static final String COUNTER_FIELD = "seq"; 151 152 protected final MongoCollection<Document> coll; 153 154 protected final MongoCollection<Document> countersColl; 155 156 /** The key to use to store the id in the database. */ 157 protected String idKey; 158 159 /** True if we don't use MongoDB's native "_id" key to store the id. */ 160 protected boolean useCustomId; 161 162 /** Number of values still available in the in-memory sequence. */ 163 protected long sequenceLeft; 164 165 /** Last value used from the in-memory sequence. */ 166 protected long sequenceLastValue; 167 168 /** Sequence allocation block size. */ 169 protected long sequenceBlockSize; 170 171 protected final MongoDBConverter converter; 172 173 protected final CursorService<MongoCursor<Document>, Document, String> cursorService; 174 175 public MongoDBRepository(ConnectionManager cm, MongoDBRepositoryDescriptor descriptor) { 176 super(cm, descriptor.name, descriptor); 177 MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class); 178 // prefix with repository/ to group repository connection 179 MongoDatabase database = mongoService.getDatabase(REPOSITORY_CONNECTION_PREFIX + descriptor.name); 180 coll = database.getCollection(descriptor.name); 181 countersColl = database.getCollection(descriptor.name + ".counters"); 182 if (Boolean.TRUE.equals(descriptor.nativeId)) { 183 idKey = MONGODB_ID; 184 } else { 185 idKey = KEY_ID; 186 } 187 useCustomId = KEY_ID.equals(idKey); 188 if (idType == IdType.sequence || DEBUG_UUIDS) { 189 Integer sbs = descriptor.sequenceBlockSize; 190 sequenceBlockSize = sbs == null ? 1 : sbs.longValue(); 191 sequenceLeft = 0; 192 } 193 converter = new MongoDBConverter(idKey); 194 cursorService = new CursorService<>(ob -> (String) ob.get(converter.keyToBson(KEY_ID))); 195 initRepository(); 196 } 197 198 @Override 199 public List<IdType> getAllowedIdTypes() { 200 return Arrays.asList(IdType.varchar, IdType.sequence); 201 } 202 203 @Override 204 public void shutdown() { 205 super.shutdown(); 206 cursorService.clear(); 207 } 208 209 protected void initRepository() { 210 // create required indexes 211 // code does explicit queries on those 212 if (useCustomId) { 213 coll.createIndex(Indexes.ascending(idKey)); 214 } 215 coll.createIndex(Indexes.ascending(KEY_PARENT_ID)); 216 coll.createIndex(Indexes.ascending(KEY_ANCESTOR_IDS)); 217 coll.createIndex(Indexes.ascending(KEY_VERSION_SERIES_ID)); 218 coll.createIndex(Indexes.ascending(KEY_PROXY_TARGET_ID)); 219 coll.createIndex(Indexes.ascending(KEY_PROXY_VERSION_SERIES_ID)); 220 coll.createIndex(Indexes.ascending(KEY_READ_ACL)); 221 coll.createIndex(Indexes.ascending(KEY_PARENT_ID, KEY_NAME)); 222 // often used in user-generated queries 223 coll.createIndex(Indexes.ascending(KEY_PRIMARY_TYPE)); 224 coll.createIndex(Indexes.ascending(KEY_LIFECYCLE_STATE)); 225 coll.createIndex(Indexes.ascending(KEY_IS_TRASHED)); 226 if(!isFulltextDisabled()) { 227 coll.createIndex(Indexes.ascending(KEY_FULLTEXT_JOBID)); 228 } 229 coll.createIndex(Indexes.ascending(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_USER)); 230 coll.createIndex(Indexes.ascending(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_STATUS)); 231 // TODO configure these from somewhere else 232 coll.createIndex(Indexes.descending("dc:modified")); 233 coll.createIndex(Indexes.ascending("rend:renditionName")); 234 coll.createIndex(Indexes.ascending("drv:subscriptions.enabled")); 235 coll.createIndex(Indexes.ascending("collectionMember:collectionIds")); 236 coll.createIndex(Indexes.ascending("nxtag:tags")); 237 if (!isFulltextSearchDisabled()) { 238 Bson indexKeys = Indexes.compoundIndex( // 239 Indexes.text(KEY_FULLTEXT_SIMPLE), // 240 Indexes.text(KEY_FULLTEXT_BINARY) // 241 ); 242 IndexOptions indexOptions = new IndexOptions().name(FULLTEXT_INDEX_NAME).languageOverride(LANGUAGE_FIELD); 243 coll.createIndex(indexKeys, indexOptions); 244 } 245 // check root presence 246 if (coll.count(Filters.eq(idKey, getRootId())) > 0) { 247 return; 248 } 249 // create basic repository structure needed 250 if (idType == IdType.sequence || DEBUG_UUIDS) { 251 // create the id counter 252 Document idCounter = new Document(); 253 idCounter.put(MONGODB_ID, COUNTER_NAME_UUID); 254 idCounter.put(COUNTER_FIELD, LONG_ZERO); 255 countersColl.insertOne(idCounter); 256 } 257 initRoot(); 258 } 259 260 protected synchronized Long getNextSequenceId() { 261 if (sequenceLeft == 0) { 262 // allocate a new sequence block 263 // the database contains the last value from the last block 264 Bson filter = Filters.eq(MONGODB_ID, COUNTER_NAME_UUID); 265 Bson update = Updates.inc(COUNTER_FIELD, Long.valueOf(sequenceBlockSize)); 266 Document idCounter = countersColl.findOneAndUpdate(filter, update, 267 new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER)); 268 if (idCounter == null) { 269 throw new NuxeoException("Repository id counter not initialized"); 270 } 271 sequenceLeft = sequenceBlockSize; 272 sequenceLastValue = ((Long) idCounter.get(COUNTER_FIELD)).longValue() - sequenceBlockSize; 273 } 274 sequenceLeft--; 275 sequenceLastValue++; 276 return Long.valueOf(sequenceLastValue); 277 } 278 279 @Override 280 public String generateNewId() { 281 if (idType == IdType.sequence || DEBUG_UUIDS) { 282 Long id = getNextSequenceId(); 283 if (DEBUG_UUIDS) { 284 return "UUID_" + id; 285 } 286 return id.toString(); 287 } else { 288 return UUID.randomUUID().toString(); 289 } 290 } 291 292 @Override 293 public void createState(State state) { 294 Document doc = converter.stateToBson(state); 295 if (log.isTraceEnabled()) { 296 log.trace("MongoDB: CREATE " + doc.get(idKey) + ": " + doc); 297 } 298 coll.insertOne(doc); 299 // TODO dupe exception 300 // throw new DocumentException("Already exists: " + id); 301 } 302 303 @Override 304 public void createStates(List<State> states) { 305 List<Document> docs = states.stream().map(converter::stateToBson).collect(Collectors.toList()); 306 if (log.isTraceEnabled()) { 307 log.trace("MongoDB: CREATE [" 308 + docs.stream().map(doc -> doc.get(idKey).toString()).collect(Collectors.joining(", ")) 309 + "]: " + docs); 310 } 311 coll.insertMany(docs); 312 } 313 314 @Override 315 public State readState(String id) { 316 return findOne(Filters.eq(idKey, id)); 317 } 318 319 @Override 320 public State readPartialState(String id, Collection<String> keys) { 321 Document fields = new Document(); 322 keys.forEach(key -> fields.put(converter.keyToBson(key), ONE)); 323 return findOne(Filters.eq(idKey, id), fields); 324 } 325 326 @Override 327 public List<State> readStates(List<String> ids) { 328 return findAll(Filters.in(idKey, ids)); 329 } 330 331 @Override 332 public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) { 333 List<Document> updates = converter.diffToBson(diff); 334 for (Document update : updates) { 335 Document filter = new Document(idKey, id); 336 if (changeTokenUpdater == null) { 337 if (log.isTraceEnabled()) { 338 log.trace("MongoDB: UPDATE " + id + ": " + update); 339 } 340 } else { 341 // assume bson is identical to dbs internals 342 // condition works even if value is null 343 Map<String, Serializable> conditions = changeTokenUpdater.getConditions(); 344 Map<String, Serializable> tokenUpdates = changeTokenUpdater.getUpdates(); 345 if (update.containsKey(MONGODB_SET)) { 346 ((Document) update.get(MONGODB_SET)).putAll(tokenUpdates); 347 } else { 348 Document set = new Document(); 349 set.putAll(tokenUpdates); 350 update.put(MONGODB_SET, set); 351 } 352 if (log.isTraceEnabled()) { 353 log.trace("MongoDB: UPDATE " + id + ": IF " + conditions + " THEN " + update); 354 } 355 filter.putAll(conditions); 356 } 357 UpdateResult w = coll.updateMany(filter, update); 358 if (w.getModifiedCount() != 1) { 359 log.trace("MongoDB: -> CONCURRENT UPDATE: " + id); 360 throw new ConcurrentUpdateException(id); 361 } 362 // TODO dupe exception 363 // throw new DocumentException("Missing: " + id); 364 } 365 } 366 367 @Override 368 public void deleteStates(Set<String> ids) { 369 Bson filter = Filters.in(idKey, ids); 370 if (log.isTraceEnabled()) { 371 log.trace("MongoDB: REMOVE " + ids); 372 } 373 DeleteResult w = coll.deleteMany(filter); 374 if (w.getDeletedCount() != ids.size()) { 375 if (log.isDebugEnabled()) { 376 log.debug("Removed " + w.getDeletedCount() + " docs for " + ids.size() + " ids: " + ids); 377 } 378 } 379 } 380 381 @Override 382 public State readChildState(String parentId, String name, Set<String> ignored) { 383 Bson filter = getChildQuery(parentId, name, ignored); 384 return findOne(filter); 385 } 386 387 protected void logQuery(String id, Bson fields) { 388 logQuery(Filters.eq(idKey, id), fields); 389 } 390 391 protected void logQuery(Bson filter, Bson fields) { 392 if (fields == null) { 393 log.trace("MongoDB: QUERY " + filter); 394 } else { 395 log.trace("MongoDB: QUERY " + filter + " KEYS " + fields); 396 } 397 } 398 399 protected void logQuery(Bson query, Bson fields, Bson orderBy, int limit, int offset) { 400 log.trace("MongoDB: QUERY " + query + " KEYS " + fields + (orderBy == null ? "" : " ORDER BY " + orderBy) 401 + " OFFSET " + offset + " LIMIT " + limit); 402 } 403 404 @Override 405 public boolean hasChild(String parentId, String name, Set<String> ignored) { 406 Document filter = getChildQuery(parentId, name, ignored); 407 return exists(filter); 408 } 409 410 protected Document getChildQuery(String parentId, String name, Set<String> ignored) { 411 Document filter = new Document(); 412 filter.put(KEY_PARENT_ID, parentId); 413 filter.put(KEY_NAME, name); 414 addIgnoredIds(filter, ignored); 415 return filter; 416 } 417 418 protected void addIgnoredIds(Document filter, Set<String> ignored) { 419 if (!ignored.isEmpty()) { 420 Document notInIds = new Document(QueryOperators.NIN, new ArrayList<>(ignored)); 421 filter.put(idKey, notInIds); 422 } 423 } 424 425 @Override 426 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 427 Document filter = new Document(converter.keyToBson(key), value); 428 addIgnoredIds(filter, ignored); 429 return findAll(filter); 430 } 431 432 @Override 433 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 434 Document filter = new Document(converter.keyToBson(key1), value1); 435 filter.put(converter.keyToBson(key2), value2); 436 addIgnoredIds(filter, ignored); 437 return findAll(filter); 438 } 439 440 @Override 441 public Stream<State> getDescendants(String rootId, Set<String> keys) { 442 return getDescendants(rootId, keys, 0); 443 } 444 445 @Override 446 public Stream<State> getDescendants(String rootId, Set<String> keys, int limit) { 447 Bson filter = Filters.eq(KEY_ANCESTOR_IDS, rootId); 448 Document fields = new Document(); 449 if (useCustomId) { 450 fields.put(MONGODB_ID, ZERO); 451 } 452 fields.put(idKey, ONE); 453 keys.forEach(key -> fields.put(converter.keyToBson(key), ONE)); 454 return stream(filter, fields, limit); 455 } 456 457 @Override 458 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 459 Document filter = new Document(converter.keyToBson(key), value); 460 addIgnoredIds(filter, ignored); 461 return exists(filter); 462 } 463 464 protected boolean exists(Bson filter) { 465 return exists(filter, justPresenceField()); 466 } 467 468 protected boolean exists(Bson filter, Bson projection) { 469 if (log.isTraceEnabled()) { 470 logQuery(filter, projection); 471 } 472 return coll.find(filter).projection(projection).first() != null; 473 } 474 475 protected State findOne(Bson filter) { 476 return findOne(filter, null); 477 } 478 479 protected State findOne(Bson filter, Bson projection) { 480 try (Stream<State> stream = stream(filter, projection)) { 481 return stream.findAny().orElse(null); 482 } 483 } 484 485 protected List<State> findAll(Bson filter) { 486 try (Stream<State> stream = stream(filter)) { 487 return stream.collect(Collectors.toList()); 488 } 489 } 490 491 protected Stream<State> stream(Bson filter) { 492 return stream(filter, null, 0); 493 } 494 495 protected Stream<State> stream(Bson filter, Bson projection) { 496 return stream(filter, projection, 0); 497 } 498 499 /** 500 * Logs, runs request and constructs a closeable {@link Stream} on top of {@link MongoCursor}. 501 * <p /> 502 * We should rely on this method, because it correctly handles cursor closed state. 503 * <p /> 504 * Note: Looping on {@link FindIterable} or {@link MongoIterable} could lead to cursor leaks. This is also the case 505 * on some call to {@link MongoIterable#first()}. 506 * 507 * @return a closeable {@link Stream} instance linked to {@link MongoCursor} 508 */ 509 protected Stream<State> stream(Bson filter, Bson projection, int limit) { 510 if (filter == null) { 511 // empty filter 512 filter = new Document(); 513 } 514 // it's ok if projection is null 515 if (log.isTraceEnabled()) { 516 logQuery(filter, projection); 517 } 518 519 boolean completedAbruptly = true; 520 MongoCursor<Document> cursor = coll.find(filter).limit(limit).projection(projection).iterator(); 521 try { 522 Set<String> seen = new HashSet<>(); 523 Stream<State> stream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(cursor, 0), false) // 524 .onClose(cursor::close) 525 .filter(doc -> seen.add(doc.getString(idKey))) 526 // MongoDB cursors may return the same 527 // object several times 528 .map(converter::bsonToState); 529 // the stream takes responsibility for closing the session 530 completedAbruptly = false; 531 return stream; 532 } finally { 533 if (completedAbruptly) { 534 cursor.close(); 535 } 536 } 537 } 538 539 protected Document justPresenceField() { 540 return new Document(MONGODB_ID, ONE); 541 } 542 543 @Override 544 public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator, 545 OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) { 546 // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter 547 MongoDBQueryBuilder builder = new MongoDBQueryBuilder(this, evaluator.getExpression(), 548 evaluator.getSelectClause(), orderByClause, evaluator.pathResolver, evaluator.fulltextSearchDisabled); 549 builder.walk(); 550 if (builder.hasFulltext && isFulltextSearchDisabled()) { 551 throw new QueryParseException("Fulltext search disabled by configuration"); 552 } 553 Document filter = builder.getQuery(); 554 addPrincipals(filter, evaluator.principals); 555 Bson orderBy = builder.getOrderBy(); 556 Bson keys = builder.getProjection(); 557 // Don't do manual projection if there are no projection wildcards, as this brings no new 558 // information and is costly. The only difference is several identical rows instead of one. 559 boolean manualProjection = !distinctDocuments && builder.hasProjectionWildcard(); 560 if (manualProjection) { 561 // we'll do post-treatment to re-evaluate the query to get proper wildcard projections 562 // so we need the full state from the database 563 keys = null; 564 evaluator.parse(); 565 } 566 567 if (log.isTraceEnabled()) { 568 logQuery(filter, keys, orderBy, limit, offset); 569 } 570 571 List<Map<String, Serializable>> projections; 572 long totalSize; 573 try (MongoCursor<Document> cursor = coll.find(filter) 574 .projection(keys) 575 .skip(offset) 576 .limit(limit) 577 .sort(orderBy) 578 .iterator()) { 579 projections = new ArrayList<>(); 580 DBSStateFlattener flattener = new DBSStateFlattener(builder.propertyKeys); 581 Iterable<Document> docs = () -> cursor; 582 for (Document doc : docs) { 583 State state = converter.bsonToState(doc); 584 if (manualProjection) { 585 projections.addAll(evaluator.matches(state)); 586 } else { 587 projections.add(flattener.flatten(state)); 588 } 589 } 590 } 591 if (countUpTo == -1) { 592 // count full size 593 if (limit == 0) { 594 totalSize = projections.size(); 595 } else if (manualProjection) { 596 totalSize = -1; // unknown due to manual projection 597 } else { 598 totalSize = coll.count(filter); 599 } 600 } else if (countUpTo == 0) { 601 // no count 602 totalSize = -1; // not counted 603 } else { 604 // count only if less than countUpTo 605 if (limit == 0) { 606 totalSize = projections.size(); 607 } else if (manualProjection) { 608 totalSize = -1; // unknown due to manual projection 609 } else { 610 totalSize = coll.count(filter, new CountOptions().limit(countUpTo + 1)); 611 } 612 if (totalSize > countUpTo) { 613 totalSize = -2; // truncated 614 } 615 } 616 if (log.isTraceEnabled() && projections.size() != 0) { 617 log.trace("MongoDB: -> " + projections.size()); 618 } 619 return new PartialList<>(projections, totalSize); 620 } 621 622 @Override 623 public ScrollResult<String> scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) { 624 cursorService.checkForTimedOutScroll(); 625 MongoDBQueryBuilder builder = new MongoDBQueryBuilder(this, evaluator.getExpression(), 626 evaluator.getSelectClause(), null, evaluator.pathResolver, evaluator.fulltextSearchDisabled); 627 builder.walk(); 628 if (builder.hasFulltext && isFulltextSearchDisabled()) { 629 throw new QueryParseException("Fulltext search disabled by configuration"); 630 } 631 Bson filter = builder.getQuery(); 632 Bson keys = builder.getProjection(); 633 if (log.isTraceEnabled()) { 634 logQuery(filter, keys, null, 0, 0); 635 } 636 637 MongoCursor<Document> cursor = coll.find(filter).projection(keys).batchSize(batchSize).iterator(); 638 String scrollId = cursorService.registerCursor(cursor, batchSize, keepAliveSeconds); 639 return scroll(scrollId); 640 } 641 642 @Override 643 public ScrollResult<String> scroll(String scrollId) { 644 return cursorService.scroll(scrollId); 645 } 646 647 protected void addPrincipals(Document query, Set<String> principals) { 648 if (principals != null) { 649 Document inPrincipals = new Document(QueryOperators.IN, new ArrayList<>(principals)); 650 query.put(DBSDocument.KEY_READ_ACL, inPrincipals); 651 } 652 } 653 654 /** Keys used for document projection when marking all binaries for GC. */ 655 protected Bson binaryKeys; 656 657 @Override 658 protected void initBlobsPaths() { 659 MongoDBBlobFinder finder = new MongoDBBlobFinder(); 660 finder.visit(); 661 binaryKeys = Projections.fields(finder.binaryKeys); 662 } 663 664 protected static class MongoDBBlobFinder extends BlobFinder { 665 protected List<Bson> binaryKeys = new ArrayList<>(Collections.singleton(Projections.excludeId())); 666 667 @Override 668 protected void recordBlobPath() { 669 path.addLast(KEY_BLOB_DATA); 670 binaryKeys.add(Projections.include(StringUtils.join(path, "."))); 671 path.removeLast(); 672 } 673 } 674 675 @Override 676 public void markReferencedBinaries() { 677 DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class); 678 // TODO add a query to not scan all documents 679 if (log.isTraceEnabled()) { 680 logQuery(new Document(), binaryKeys); 681 } 682 Block<Document> block = doc -> markReferencedBinaries(doc, blobManager); 683 coll.find().projection(binaryKeys).forEach(block); 684 } 685 686 protected void markReferencedBinaries(Document ob, DocumentBlobManager blobManager) { 687 for (String key : ob.keySet()) { 688 Object value = ob.get(key); 689 if (value instanceof List) { 690 @SuppressWarnings("unchecked") 691 List<Object> list = (List<Object>) value; 692 for (Object v : list) { 693 if (v instanceof Document) { 694 markReferencedBinaries((Document) v, blobManager); 695 } else { 696 markReferencedBinary(v, blobManager); 697 } 698 } 699 } else if (value instanceof Object[]) { 700 for (Object v : (Object[]) value) { 701 markReferencedBinary(v, blobManager); 702 } 703 } else if (value instanceof Document) { 704 markReferencedBinaries((Document) value, blobManager); 705 } else { 706 markReferencedBinary(value, blobManager); 707 } 708 } 709 } 710 711 protected void markReferencedBinary(Object value, DocumentBlobManager blobManager) { 712 if (!(value instanceof String)) { 713 return; 714 } 715 String key = (String) value; 716 blobManager.markReferencedBinary(key, repositoryName); 717 } 718 719 protected static final Bson LOCK_FIELDS = Projections.include(KEY_LOCK_OWNER, KEY_LOCK_CREATED); 720 721 protected static final Bson UNSET_LOCK_UPDATE = Updates.combine(Updates.unset(KEY_LOCK_OWNER), 722 Updates.unset(KEY_LOCK_CREATED)); 723 724 @Override 725 public Lock getLock(String id) { 726 if (log.isTraceEnabled()) { 727 logQuery(id, LOCK_FIELDS); 728 } 729 Document res = coll.find(Filters.eq(idKey, id)).projection(LOCK_FIELDS).first(); 730 if (res == null) { 731 // document not found 732 throw new DocumentNotFoundException(id); 733 } 734 String owner = res.getString(KEY_LOCK_OWNER); 735 if (owner == null) { 736 // not locked 737 return null; 738 } 739 Calendar created = (Calendar) converter.scalarToSerializable(res.get(KEY_LOCK_CREATED)); 740 return new Lock(owner, created); 741 } 742 743 @Override 744 public Lock setLock(String id, Lock lock) { 745 Bson filter = Filters.and( // 746 Filters.eq(idKey, id), // 747 Filters.exists(KEY_LOCK_OWNER, false) // select doc if no lock is set 748 ); 749 Bson setLock = Updates.combine( // 750 Updates.set(KEY_LOCK_OWNER, lock.getOwner()), // 751 Updates.set(KEY_LOCK_CREATED, converter.serializableToBson(lock.getCreated())) // 752 ); 753 if (log.isTraceEnabled()) { 754 log.trace("MongoDB: FINDANDMODIFY " + filter + " UPDATE " + setLock); 755 } 756 Document res = coll.findOneAndUpdate(filter, setLock); 757 if (res != null) { 758 // found a doc to lock 759 return null; 760 } else { 761 // doc not found, or lock owner already set 762 // get the old lock 763 if (log.isTraceEnabled()) { 764 logQuery(id, LOCK_FIELDS); 765 } 766 Document old = coll.find(Filters.eq(idKey, id)).projection(LOCK_FIELDS).first(); 767 if (old == null) { 768 // document not found 769 throw new DocumentNotFoundException(id); 770 } 771 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 772 Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED)); 773 if (oldOwner != null) { 774 return new Lock(oldOwner, oldCreated); 775 } 776 // no lock -- there was a race condition 777 // TODO do better 778 throw new ConcurrentUpdateException("Lock " + id); 779 } 780 } 781 782 @Override 783 public Lock removeLock(String id, String owner) { 784 Document filter = new Document(idKey, id); 785 if (owner != null) { 786 // remove if owner matches or null 787 // implements LockManager.canLockBeRemoved inside MongoDB 788 Object ownerOrNull = Arrays.asList(owner, null); 789 filter.put(KEY_LOCK_OWNER, new Document(QueryOperators.IN, ownerOrNull)); 790 } // else unconditional remove 791 // remove the lock 792 if (log.isTraceEnabled()) { 793 log.trace("MongoDB: FINDANDMODIFY " + filter + " UPDATE " + UNSET_LOCK_UPDATE); 794 } 795 Document old = coll.findOneAndUpdate(filter, UNSET_LOCK_UPDATE); 796 if (old != null) { 797 // found a doc and removed the lock, return previous lock 798 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 799 if (oldOwner == null) { 800 // was not locked 801 return null; 802 } else { 803 // return previous lock 804 Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED)); 805 return new Lock(oldOwner, oldCreated); 806 } 807 } else { 808 // doc not found, or lock owner didn't match 809 // get the old lock 810 if (log.isTraceEnabled()) { 811 logQuery(id, LOCK_FIELDS); 812 } 813 old = coll.find(Filters.eq(idKey, id)).projection(LOCK_FIELDS).first(); 814 if (old == null) { 815 // document not found 816 throw new DocumentNotFoundException(id); 817 } 818 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 819 Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED)); 820 if (oldOwner != null) { 821 if (!LockManager.canLockBeRemoved(oldOwner, owner)) { 822 // existing mismatched lock, flag failure 823 return new Lock(oldOwner, oldCreated, true); 824 } 825 // old owner should have matched -- there was a race condition 826 // TODO do better 827 throw new ConcurrentUpdateException("Unlock " + id); 828 } 829 // old owner null, should have matched -- there was a race condition 830 // TODO do better 831 throw new ConcurrentUpdateException("Unlock " + id); 832 } 833 } 834 835 @Override 836 public void closeLockManager() { 837 838 } 839 840 @Override 841 public void clearLockManagerCaches() { 842 } 843 844}