/*
 * (C) Copyright 2014-2017 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.storage.mongodb;

import static java.lang.Boolean.TRUE;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_STATUS;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_USER;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACL;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACP;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_JOBID;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_SIMPLE;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LIFECYCLE_STATE;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PRIMARY_TYPE;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_VERSION_SERIES_ID;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_READ_ACL;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_VERSION_SERIES_ID;

import java.io.Serializable;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import javax.resource.spi.ConnectionManager;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
import org.nuxeo.ecm.core.api.CursorService;
import org.nuxeo.ecm.core.api.DocumentNotFoundException;
import org.nuxeo.ecm.core.api.Lock;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.PartialList;
import org.nuxeo.ecm.core.api.ScrollResult;
import org.nuxeo.ecm.core.blob.DocumentBlobManager;
import org.nuxeo.ecm.core.model.LockManager;
import org.nuxeo.ecm.core.model.Repository;
import org.nuxeo.ecm.core.query.QueryParseException;
import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
import org.nuxeo.ecm.core.storage.State;
import org.nuxeo.ecm.core.storage.State.StateDiff;
import org.nuxeo.ecm.core.storage.dbs.DBSDocument;
import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener;
import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater;
import org.nuxeo.runtime.api.Framework;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoClientURI;
import com.mongodb.QueryOperators;
import com.mongodb.ServerAddress;
import com.mongodb.WriteResult;

/**
 * MongoDB implementation of a {@link Repository}.
 * <p>
 * Documents are stored in one collection named after the repository; id sequence counters are stored in a sibling
 * collection whose name is the repository name suffixed with {@code .counters}.
 *
 * @since 5.9.4
 */
public class MongoDBRepository extends DBSRepositoryBase {

    private static final Log log = LogFactory.getLog(MongoDBRepository.class);

    public static final Long ZERO = Long.valueOf(0);

    public static final Long ONE = Long.valueOf(1);

    public static final Long MINUS_ONE = Long.valueOf(-1);

    public static final String DB_DEFAULT = "nuxeo";

    public static final String MONGODB_ID = "_id";

    public static final String MONGODB_INC = "$inc";

    public static final String MONGODB_SET = "$set";

    public static final String MONGODB_UNSET = "$unset";

    public static final String MONGODB_PUSH = "$push";

    public static final String MONGODB_EACH = "$each";

    public static final String MONGODB_META = "$meta";

    public static final String MONGODB_TEXT_SCORE = "textScore";

    private static final String MONGODB_INDEX_TEXT = "text";

    private static final String MONGODB_INDEX_NAME = "name";

    private static final String MONGODB_LANGUAGE_OVERRIDE = "language_override";

    private static final String FULLTEXT_INDEX_NAME = "fulltext";

    private static final String LANGUAGE_FIELD = "__language";

    protected static final String COUNTER_NAME_UUID = "ecm:id";

    protected static final String COUNTER_FIELD = "seq";

    protected static final int MONGODB_OPTION_CONNECTION_TIMEOUT_MS = 30000;

    protected static final int MONGODB_OPTION_SOCKET_TIMEOUT_MS = 60000;

    protected MongoClient mongoClient;

    protected DBCollection coll;

    protected DBCollection countersColl;

    /** The key to use to store the id in the database. */
    protected String idKey;

    /** True if we don't use MongoDB's native "_id" key to store the id. */
    protected boolean useCustomId;

    /** Number of values still available in the in-memory sequence. */
    protected long sequenceLeft;

    /** Last value used from the in-memory sequence. */
    protected long sequenceLastValue;

    /** Sequence allocation block size. */
    protected long sequenceBlockSize;

    protected final MongoDBConverter converter;

    protected final CursorService<DBCursor, DBObject> cursorService = new CursorService<>();

    /**
     * Creates the repository: opens the MongoDB client, resolves the document and counters collections, selects the
     * id key according to the descriptor's {@code nativeId} flag, and initializes the repository content.
     *
     * @param cm the connection manager, passed to the superclass
     * @param descriptor the repository descriptor
     */
    public MongoDBRepository(ConnectionManager cm, MongoDBRepositoryDescriptor descriptor) {
        super(cm, descriptor.name, descriptor);
        try {
            mongoClient = newMongoClient(descriptor);
            coll = getCollection(descriptor, mongoClient);
            countersColl = getCountersCollection(descriptor, mongoClient);
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
        if (Boolean.TRUE.equals(descriptor.nativeId)) {
            idKey = MONGODB_ID;
        } else {
            idKey = KEY_ID;
        }
        useCustomId = KEY_ID.equals(idKey);
        if (idType == IdType.sequence || DEBUG_UUIDS) {
            Integer sbs = descriptor.sequenceBlockSize;
            sequenceBlockSize = sbs == null ? 1 : sbs.longValue();
            sequenceLeft = 0;
        }
        converter = new MongoDBConverter(idKey);
        initRepository();
    }

    @Override
    public List<IdType> getAllowedIdTypes() {
        return Arrays.asList(IdType.varchar, IdType.sequence);
    }

    /** Shuts down the superclass, clears the registered scroll cursors and closes the MongoDB client. */
    @Override
    public void shutdown() {
        super.shutdown();
        cursorService.clear();
        mongoClient.close();
    }

    /**
     * Creates a MongoDB client for the server configured in the descriptor.
     * <p>
     * The server may be given either as a {@code mongodb://} URI or as a plain server address. Used also by unit
     * tests.
     *
     * @param descriptor the repository descriptor
     * @return a new client
     * @throws NuxeoException if the descriptor has no (or a blank) server
     * @throws UnknownHostException declared for the underlying driver calls
     */
    public static MongoClient newMongoClient(MongoDBRepositoryDescriptor descriptor) throws UnknownHostException {
        MongoClient ret;
        String server = descriptor.server;
        if (StringUtils.isBlank(server)) {
            throw new NuxeoException("Missing <server> in MongoDB repository descriptor");
        }
        MongoClientOptions.Builder optionsBuilder = MongoClientOptions.builder()
                // can help prevent firewalls from disconnecting inactive connections,
                // option not available from URI
                .socketKeepAlive(true)
                // don't wait forever by default, can be overridden using URI options
                .connectTimeout(MONGODB_OPTION_CONNECTION_TIMEOUT_MS)
                .socketTimeout(MONGODB_OPTION_SOCKET_TIMEOUT_MS)
                .description("Nuxeo");
        if (server.startsWith("mongodb://")) {
            // allow mongodb:// URI syntax for the server, to pass everything in one string
            ret = new MongoClient(new MongoClientURI(server, optionsBuilder));
        } else {
            ret = new MongoClient(new ServerAddress(server), optionsBuilder.build());
        }
        if (log.isDebugEnabled()) {
            log.debug("MongoClient initialized with options: " + ret.getMongoClientOptions().toString());
        }
        return ret;
    }

    /**
     * Gets a collection from the given database, falling back to {@link #DB_DEFAULT} when the database name is blank.
     */
    protected static DBCollection getCollection(MongoClient mongoClient, String dbname, String collection) {
        if (StringUtils.isBlank(dbname)) {
            dbname = DB_DEFAULT;
        }
        DB db = mongoClient.getDB(dbname);
        return db.getCollection(collection);
    }

    /** Gets the collection holding the documents of the repository. Used also by unit tests. */
    public static DBCollection getCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) {
        return getCollection(mongoClient, descriptor.dbname, descriptor.name);
    }

    /** Gets the collection holding the counters of the repository. Used also by unit tests. */
    public static DBCollection getCountersCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) {
        return getCollection(mongoClient, descriptor.dbname, descriptor.name + ".counters");
    }

    /**
     * Creates the indexes and, if the root document is not present yet, the id counter (when sequence ids are used)
     * and the root document.
     */
    protected void initRepository() {
        // create required indexes
        // code does explicit queries on those
        if (useCustomId) {
            coll.createIndex(new BasicDBObject(idKey, ONE));
        }
        coll.createIndex(new BasicDBObject(KEY_PARENT_ID, ONE));
        coll.createIndex(new BasicDBObject(KEY_ANCESTOR_IDS, ONE));
        coll.createIndex(new BasicDBObject(KEY_VERSION_SERIES_ID, ONE));
        coll.createIndex(new BasicDBObject(KEY_PROXY_TARGET_ID, ONE));
        coll.createIndex(new BasicDBObject(KEY_PROXY_VERSION_SERIES_ID, ONE));
        coll.createIndex(new BasicDBObject(KEY_READ_ACL, ONE));
        DBObject parentChild = new BasicDBObject();
        parentChild.put(KEY_PARENT_ID, ONE);
        parentChild.put(KEY_NAME, ONE);
        coll.createIndex(parentChild);
        // often used in user-generated queries
        coll.createIndex(new BasicDBObject(KEY_PRIMARY_TYPE, ONE));
        coll.createIndex(new BasicDBObject(KEY_LIFECYCLE_STATE, ONE));
        coll.createIndex(new BasicDBObject(KEY_FULLTEXT_JOBID, ONE));
        coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_USER, ONE));
        coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_STATUS, ONE));
        // TODO configure these from somewhere else
        coll.createIndex(new BasicDBObject("dc:modified", MINUS_ONE));
        coll.createIndex(new BasicDBObject("rend:renditionName", ONE));
        coll.createIndex(new BasicDBObject("drv:subscriptions.enabled", ONE));
        coll.createIndex(new BasicDBObject("collectionMember:collectionIds", ONE));
        if (!isFulltextDisabled()) {
            DBObject indexKeys = new BasicDBObject();
            indexKeys.put(KEY_FULLTEXT_SIMPLE, MONGODB_INDEX_TEXT);
            indexKeys.put(KEY_FULLTEXT_BINARY, MONGODB_INDEX_TEXT);
            DBObject indexOptions = new BasicDBObject();
            indexOptions.put(MONGODB_INDEX_NAME, FULLTEXT_INDEX_NAME);
            indexOptions.put(MONGODB_LANGUAGE_OVERRIDE, LANGUAGE_FIELD);
            coll.createIndex(indexKeys, indexOptions);
        }
        // check root presence
        DBObject query = new BasicDBObject(idKey, getRootId());
        if (coll.findOne(query, justPresenceField()) != null) {
            return;
        }
        // create basic repository structure needed
        if (idType == IdType.sequence || DEBUG_UUIDS) {
            // create the id counter
            DBObject idCounter = new BasicDBObject();
            idCounter.put(MONGODB_ID, COUNTER_NAME_UUID);
            idCounter.put(COUNTER_FIELD, ZERO);
            countersColl.insert(idCounter);
        }
        initRoot();
    }

    /**
     * Returns the next value of the id sequence.
     * <p>
     * Values are handed out from an in-memory block; when the block is exhausted a new block of
     * {@link #sequenceBlockSize} values is reserved by incrementing the counter stored in the counters collection.
     *
     * @return the next sequence value
     * @throws NuxeoException if the counter document does not exist
     */
    protected synchronized Long getNextSequenceId() {
        if (sequenceLeft == 0) {
            // allocate a new sequence block
            // the database contains the last value from the last block
            DBObject query = new BasicDBObject(MONGODB_ID, COUNTER_NAME_UUID);
            DBObject update = new BasicDBObject(MONGODB_INC,
                    new BasicDBObject(COUNTER_FIELD, Long.valueOf(sequenceBlockSize)));
            // returnNew = true: we get the counter value after the increment
            DBObject idCounter = countersColl.findAndModify(query, null, null, false, update, true, false);
            if (idCounter == null) {
                throw new NuxeoException("Repository id counter not initialized");
            }
            sequenceLeft = sequenceBlockSize;
            // read through Number so that a counter stored with another numeric type does not cause a
            // ClassCastException
            sequenceLastValue = ((Number) idCounter.get(COUNTER_FIELD)).longValue() - sequenceBlockSize;
        }
        sequenceLeft--;
        sequenceLastValue++;
        return Long.valueOf(sequenceLastValue);
    }

    /**
     * Generates a new document id: a sequence value when sequence ids (or debug UUIDs) are in use, otherwise a random
     * UUID.
     */
    @Override
    public String generateNewId() {
        if (idType == IdType.sequence || DEBUG_UUIDS) {
            Long id = getNextSequenceId();
            if (DEBUG_UUIDS) {
                return "UUID_" + id;
            }
            return id.toString();
        } else {
            return UUID.randomUUID().toString();
        }
    }

    /** Inserts the given state as a new document. */
    @Override
    public void createState(State state) {
        DBObject ob = converter.stateToBson(state);
        if (log.isTraceEnabled()) {
            log.trace("MongoDB: CREATE " + ob.get(idKey) + ": " + ob);
        }
        coll.insert(ob);
        // TODO dupe exception
        // throw new DocumentException("Already exists: " + id);
    }

    /** Inserts the given states as new documents, in a single insert call. */
    @Override
    public void createStates(List<State> states) {
        List<DBObject> obs = states.stream().map(converter::stateToBson).collect(Collectors.toList());
        if (log.isTraceEnabled()) {
            // String.valueOf: logging must not fail on a missing id
            log.trace("MongoDB: CREATE ["
                    + obs.stream().map(ob -> String.valueOf(ob.get(idKey))).collect(Collectors.joining(", "))
                    + "]: " + obs);
        }
        coll.insert(obs);
    }

    /** Reads the state of the document with the given id. */
    @Override
    public State readState(String id) {
        DBObject query = new BasicDBObject(idKey, id);
        return findOne(query);
    }

    /** Reads the states of the documents with the given ids, using a single {@code $in} query. */
    @Override
    public List<State> readStates(List<String> ids) {
        DBObject query = new BasicDBObject(idKey, new BasicDBObject(QueryOperators.IN, ids));
        return findAll(query, ids.size());
    }

    /**
     * Applies a diff to the document with the given id.
     * <p>
     * The diff may be converted to several update operations, applied in order. When a change token updater is
     * provided, its conditions are added to the query and its updates to the {@code $set} part of each update.
     *
     * @throws ConcurrentUpdateException if an update did not modify exactly one document
     */
    @Override
    public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) {
        List<DBObject> updates = converter.diffToBson(diff);
        for (DBObject update : updates) {
            DBObject query = new BasicDBObject(idKey, id);
            if (changeTokenUpdater == null) {
                if (log.isTraceEnabled()) {
                    log.trace("MongoDB: UPDATE " + id + ": " + update);
                }
            } else {
                // assume bson is identical to dbs internals
                // condition works even if value is null
                Map<String, Serializable> conditions = changeTokenUpdater.getConditions();
                Map<String, Serializable> tokenUpdates = changeTokenUpdater.getUpdates();
                if (update.containsField(MONGODB_SET)) {
                    ((DBObject) update.get(MONGODB_SET)).putAll(tokenUpdates);
                } else {
                    DBObject set = new BasicDBObject();
                    set.putAll(tokenUpdates);
                    update.put(MONGODB_SET, set);
                }
                if (log.isTraceEnabled()) {
                    log.trace("MongoDB: UPDATE " + id + ": IF " + conditions + " THEN " + update);
                }
                query.putAll(conditions);
            }
            WriteResult w = coll.update(query, update);
            if (w.getN() != 1) {
                if (log.isTraceEnabled()) {
                    log.trace("MongoDB: -> CONCURRENT UPDATE: " + id);
                }
                throw new ConcurrentUpdateException(id);
            }
            // TODO dupe exception
            // throw new DocumentException("Missing: " + id);
        }
    }

    /**
     * Removes the documents with the given ids. Logs an error if the number of removed documents differs from the
     * number of ids.
     */
    @Override
    public void deleteStates(Set<String> ids) {
        DBObject query = new BasicDBObject(idKey, new BasicDBObject(QueryOperators.IN, ids));
        if (log.isTraceEnabled()) {
            log.trace("MongoDB: REMOVE " + ids);
        }
        WriteResult w = coll.remove(query);
        if (w.getN() != ids.size()) {
            log.error("Removed " + w.getN() + " docs for " + ids.size() + " ids: " + ids);
        }
    }

    /** Reads the state of the child with the given parent id and name, skipping the ignored ids. */
    @Override
    public State readChildState(String parentId, String name, Set<String> ignored) {
        DBObject query = getChildQuery(parentId, name, ignored);
        return findOne(query);
    }

    /** Trace-logs a query by id. Callers are expected to check that trace logging is enabled. */
    protected void logQuery(String id, DBObject fields) {
        logQuery(new BasicDBObject(idKey, id), fields);
    }

    /** Trace-logs a query and its optional projection. Callers are expected to check that trace is enabled. */
    protected void logQuery(DBObject query, DBObject fields) {
        if (fields == null) {
            log.trace("MongoDB: QUERY " + query);
        } else {
            log.trace("MongoDB: QUERY " + query + " KEYS " + fields);
        }
    }

    /** Trace-logs a query with its projection, optional ordering, offset and limit. */
    protected void logQuery(DBObject query, DBObject fields, DBObject orderBy, int limit, int offset) {
        log.trace("MongoDB: QUERY " + query + " KEYS " + fields + (orderBy == null ? "" : " ORDER BY " + orderBy)
                + " OFFSET " + offset + " LIMIT " + limit);
    }

    /** Checks whether a child with the given parent id and name exists, skipping the ignored ids. */
    @Override
    public boolean hasChild(String parentId, String name, Set<String> ignored) {
        DBObject query = getChildQuery(parentId, name, ignored);
        if (log.isTraceEnabled()) {
            logQuery(query, justPresenceField());
        }
        return coll.findOne(query, justPresenceField()) != null;
    }

    /** Builds the query matching a child by parent id and name, excluding the ignored ids. */
    protected DBObject getChildQuery(String parentId, String name, Set<String> ignored) {
        DBObject query = new BasicDBObject();
        query.put(KEY_PARENT_ID, parentId);
        query.put(KEY_NAME, name);
        addIgnoredIds(query, ignored);
        return query;
    }

    /**
     * Adds to the query a {@code $nin} condition on the id key excluding the ignored ids.
     * <p>
     * Does nothing when {@code ignored} is null or empty.
     */
    protected void addIgnoredIds(DBObject query, Set<String> ignored) {
        if (ignored != null && !ignored.isEmpty()) {
            DBObject notInIds = new BasicDBObject(QueryOperators.NIN, new ArrayList<>(ignored));
            query.put(idKey, notInIds);
        }
    }

    /** Finds the states whose given key has the given value, skipping the ignored ids. */
    @Override
    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
        DBObject query = new BasicDBObject(converter.keyToBson(key), value);
        addIgnoredIds(query, ignored);
        return findAll(query, 0);
    }

    /** Finds the states matching both key/value pairs, skipping the ignored ids. */
    @Override
    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
        DBObject query = new BasicDBObject(converter.keyToBson(key1), value1);
        query.put(converter.keyToBson(key2), value2);
        addIgnoredIds(query, ignored);
        return findAll(query, 0);
    }

    /**
     * Finds the documents whose given key has the given value and collects their ids and proxy information.
     *
     * @param key the key to match
     * @param value the value to match
     * @param ids receives the ids of the matching documents
     * @param proxyTargets if not null, receives proxy id -> target id for matching proxies
     * @param targetProxies if not null, receives document id -> proxy ids for matching documents that have proxy ids
     */
    @Override
    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
            Map<String, Object[]> targetProxies) {
        // convert the key like the other queryKeyValue methods do
        DBObject query = new BasicDBObject(converter.keyToBson(key), value);
        DBObject fields = new BasicDBObject();
        if (useCustomId) {
            // the native _id is not needed when ids are stored under a custom key
            fields.put(MONGODB_ID, ZERO);
        }
        fields.put(idKey, ONE);
        fields.put(KEY_IS_PROXY, ONE);
        fields.put(KEY_PROXY_TARGET_ID, ONE);
        fields.put(KEY_PROXY_IDS, ONE);
        if (log.isTraceEnabled()) {
            logQuery(query, fields);
        }
        try (DBCursor cursor = coll.find(query, fields)) {
            for (DBObject ob : cursor) {
                String id = (String) ob.get(idKey);
                ids.add(id);
                if (proxyTargets != null && TRUE.equals(ob.get(KEY_IS_PROXY))) {
                    String targetId = (String) ob.get(KEY_PROXY_TARGET_ID);
                    proxyTargets.put(id, targetId);
                }
                if (targetProxies != null) {
                    Object[] proxyIds = (Object[]) converter.bsonToValue(ob.get(KEY_PROXY_IDS));
                    if (proxyIds != null) {
                        targetProxies.put(id, proxyIds);
                    }
                }
            }
        }
    }

    /** Checks whether a document whose given key has the given value exists, skipping the ignored ids. */
    @Override
    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
        // convert the key like the other queryKeyValue methods do
        DBObject query = new BasicDBObject(converter.keyToBson(key), value);
        addIgnoredIds(query, ignored);
        if (log.isTraceEnabled()) {
            logQuery(query, justPresenceField());
        }
        return coll.findOne(query, justPresenceField()) != null;
    }

    /** Runs the query and converts the first matching document to a state. */
    protected State findOne(DBObject query) {
        if (log.isTraceEnabled()) {
            logQuery(query, null);
        }
        return converter.bsonToState(coll.findOne(query));
    }

    /**
     * Runs the query and converts all the matching documents to states, keeping only the first occurrence of each id.
     *
     * @param query the query
     * @param sizeHint the initial capacity of the returned list
     */
    protected List<State> findAll(DBObject query, int sizeHint) {
        if (log.isTraceEnabled()) {
            logQuery(query, null);
        }
        Set<String> seen = new HashSet<>();
        try (DBCursor cursor = coll.find(query)) {
            List<State> list = new ArrayList<>(sizeHint);
            for (DBObject ob : cursor) {
                if (!seen.add((String) ob.get(idKey))) {
                    // MongoDB cursors may return the same
                    // object several times
                    continue;
                }
                list.add(converter.bsonToState(ob));
            }
            return list;
        }
    }

    /** Returns a projection on the native id only, for queries that just check that a document exists. */
    protected DBObject justPresenceField() {
        return new BasicDBObject(MONGODB_ID, ONE);
    }

    /**
     * Runs a query and returns the projections of the matching documents along with a total size.
     * <p>
     * The total size is {@code -1} when not counted ({@code countUpTo == 0}) and {@code -2} when it exceeds a positive
     * {@code countUpTo}.
     *
     * @throws QueryParseException if the query uses fulltext and fulltext is disabled
     */
    @Override
    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
        // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter
        MongoDBQueryBuilder builder = new MongoDBQueryBuilder(this, evaluator.getExpression(),
                evaluator.getSelectClause(), orderByClause, evaluator.pathResolver, evaluator.fulltextSearchDisabled);
        builder.walk();
        if (builder.hasFulltext && isFulltextDisabled()) {
            throw new QueryParseException("Fulltext search disabled by configuration");
        }
        DBObject query = builder.getQuery();
        addPrincipals(query, evaluator.principals);
        DBObject orderBy = builder.getOrderBy();
        DBObject keys = builder.getProjection();
        // Don't do manual projection if there are no projection wildcards, as this brings no new
        // information and is costly. The only difference is several identical rows instead of one.
        boolean manualProjection = !distinctDocuments && builder.hasProjectionWildcard();
        if (manualProjection) {
            // we'll do post-treatment to re-evaluate the query to get proper wildcard projections
            // so we need the full state from the database
            keys = new BasicDBObject();
            evaluator.parse();
        }

        if (log.isTraceEnabled()) {
            logQuery(query, keys, orderBy, limit, offset);
        }

        List<Map<String, Serializable>> projections;
        long totalSize;
        try (DBCursor cursor = coll.find(query, keys).skip(offset).limit(limit)) {
            if (orderBy != null) {
                cursor.sort(orderBy);
            }
            projections = new ArrayList<>();
            for (DBObject ob : cursor) {
                State state = converter.bsonToState(ob);
                if (manualProjection) {
                    projections.addAll(evaluator.matches(state));
                } else {
                    projections.add(DBSStateFlattener.flatten(state));
                }
            }
            if (countUpTo == -1) {
                // count full size
                if (limit == 0) {
                    totalSize = projections.size();
                } else {
                    totalSize = cursor.count();
                }
            } else if (countUpTo == 0) {
                // no count
                totalSize = -1; // not counted
            } else {
                // count only if less than countUpTo
                if (limit == 0) {
                    totalSize = projections.size();
                } else {
                    totalSize = cursor.copy().limit(countUpTo + 1).count();
                }
                if (totalSize > countUpTo) {
                    totalSize = -2; // truncated
                }
            }
        }
        if (log.isTraceEnabled() && !projections.isEmpty()) {
            log.trace("MongoDB: -> " + projections.size());
        }
        return new PartialList<>(projections, totalSize);
    }

    /**
     * Starts a scroll on the documents matching the evaluator's query and returns its first batch.
     *
     * @throws QueryParseException if the query uses fulltext and fulltext is disabled
     */
    @Override
    public ScrollResult scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) {
        cursorService.checkForTimedOutScroll();
        MongoDBQueryBuilder builder = new MongoDBQueryBuilder(this, evaluator.getExpression(),
                evaluator.getSelectClause(), null, evaluator.pathResolver, evaluator.fulltextSearchDisabled);
        builder.walk();
        if (builder.hasFulltext && isFulltextDisabled()) {
            throw new QueryParseException("Fulltext search disabled by configuration");
        }
        DBObject query = builder.getQuery();
        // restrict on read ACLs exactly like queryAndFetch does; no-op when there are no principals
        addPrincipals(query, evaluator.principals);
        DBObject keys = builder.getProjection();
        if (log.isTraceEnabled()) {
            logQuery(query, keys, null, 0, 0);
        }

        DBCursor cursor = coll.find(query, keys);
        String scrollId = cursorService.registerCursor(cursor, batchSize, keepAliveSeconds);
        return scroll(scrollId);
    }

    /** Returns the next batch of document ids for a previously registered scroll. */
    @Override
    public ScrollResult scroll(String scrollId) {
        return cursorService.scroll(scrollId, ob -> (String) ob.get(converter.keyToBson(KEY_ID)));
    }

    /**
     * Restricts the query to documents whose read ACL contains one of the principals. Does nothing when
     * {@code principals} is null.
     */
    protected void addPrincipals(DBObject query, Set<String> principals) {
        if (principals != null) {
            DBObject inPrincipals = new BasicDBObject(QueryOperators.IN, new ArrayList<>(principals));
            query.put(DBSDocument.KEY_READ_ACL, inPrincipals);
        }
    }

    /** Keys used for document projection when marking all binaries for GC. */
    protected DBObject binaryKeys;

    /** Computes {@link #binaryKeys} by visiting the blob paths with a {@link MongoDBBlobFinder}. */
    @Override
    protected void initBlobsPaths() {
        MongoDBBlobFinder finder = new MongoDBBlobFinder();
        finder.visit();
        binaryKeys = finder.binaryKeys;
    }

    /** Blob finder recording, as a MongoDB projection, the dotted path of the blob data key of each blob found. */
    protected static class MongoDBBlobFinder extends BlobFinder {
        protected DBObject binaryKeys = new BasicDBObject(MONGODB_ID, ZERO);

        @Override
        protected void recordBlobPath() {
            path.addLast(KEY_BLOB_DATA);
            binaryKeys.put(StringUtils.join(path, "."), ONE);
            path.removeLast();
        }
    }

    /** Scans all the documents, projected on {@link #binaryKeys}, and marks the binaries they reference. */
    @Override
    public void markReferencedBinaries() {
        DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class);
        // TODO add a query to not scan all documents
        if (log.isTraceEnabled()) {
            logQuery(new BasicDBObject(), binaryKeys);
        }
        try (DBCursor cursor = coll.find(new BasicDBObject(), binaryKeys)) {
            for (DBObject ob : cursor) {
                markReferencedBinaries(ob, blobManager);
            }
        }
    }

    /** Recursively walks the values of the object (lists, arrays, sub-objects) and marks every string found. */
    protected void markReferencedBinaries(DBObject ob, DocumentBlobManager blobManager) {
        for (String key : ob.keySet()) {
            Object value = ob.get(key);
            if (value instanceof List) {
                @SuppressWarnings("unchecked")
                List<Object> list = (List<Object>) value;
                for (Object v : list) {
                    if (v instanceof DBObject) {
                        markReferencedBinaries((DBObject) v, blobManager);
                    } else {
                        markReferencedBinary(v, blobManager);
                    }
                }
            } else if (value instanceof Object[]) {
                for (Object v : (Object[]) value) {
                    markReferencedBinary(v, blobManager);
                }
            } else if (value instanceof DBObject) {
                markReferencedBinaries((DBObject) value, blobManager);
            } else {
                markReferencedBinary(value, blobManager);
            }
        }
    }

    /** Marks the value as a referenced binary key for this repository; values that are not strings are skipped. */
    protected void markReferencedBinary(Object value, DocumentBlobManager blobManager) {
        if (!(value instanceof String)) {
            return;
        }
        String key = (String) value;
        blobManager.markReferencedBinary(key, repositoryName);
    }

    /** Projection on the lock fields; also used as the argument of the lock {@code $unset}. */
    protected static final DBObject LOCK_FIELDS;

    static {
        LOCK_FIELDS = new BasicDBObject();
        LOCK_FIELDS.put(KEY_LOCK_OWNER, ONE);
        LOCK_FIELDS.put(KEY_LOCK_CREATED, ONE);
    }

    protected static final DBObject UNSET_LOCK_UPDATE = new BasicDBObject(MONGODB_UNSET, LOCK_FIELDS);

    /**
     * Gets the lock of the document with the given id.
     *
     * @return the lock, or {@code null} if the document is not locked
     * @throws DocumentNotFoundException if the document does not exist
     */
    @Override
    public Lock getLock(String id) {
        if (log.isTraceEnabled()) {
            logQuery(id, LOCK_FIELDS);
        }
        DBObject res = coll.findOne(new BasicDBObject(idKey, id), LOCK_FIELDS);
        if (res == null) {
            // document not found
            throw new DocumentNotFoundException(id);
        }
        String owner = (String) res.get(KEY_LOCK_OWNER);
        if (owner == null) {
            // not locked
            return null;
        }
        Calendar created = (Calendar) converter.scalarToSerializable(res.get(KEY_LOCK_CREATED));
        return new Lock(owner, created);
    }

    /**
     * Sets a lock on the document with the given id, if it has no lock owner yet.
     *
     * @return {@code null} if the lock was set, otherwise the lock already present on the document
     * @throws DocumentNotFoundException if the document does not exist
     * @throws ConcurrentUpdateException if the lock could not be set but no lock is found afterwards
     */
    @Override
    public Lock setLock(String id, Lock lock) {
        DBObject query = new BasicDBObject(idKey, id);
        query.put(KEY_LOCK_OWNER, null); // select doc if no lock is set
        DBObject setLock = new BasicDBObject();
        setLock.put(KEY_LOCK_OWNER, lock.getOwner());
        setLock.put(KEY_LOCK_CREATED, converter.serializableToBson(lock.getCreated()));
        DBObject setLockUpdate = new BasicDBObject(MONGODB_SET, setLock);
        if (log.isTraceEnabled()) {
            log.trace("MongoDB: FINDANDMODIFY " + query + " UPDATE " + setLockUpdate);
        }
        DBObject res = coll.findAndModify(query, null, null, false, setLockUpdate, false, false);
        if (res != null) {
            // found a doc to lock
            return null;
        } else {
            // doc not found, or lock owner already set
            // get the old lock
            if (log.isTraceEnabled()) {
                logQuery(id, LOCK_FIELDS);
            }
            DBObject old = coll.findOne(new BasicDBObject(idKey, id), LOCK_FIELDS);
            if (old == null) {
                // document not found
                throw new DocumentNotFoundException(id);
            }
            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
            Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED));
            if (oldOwner != null) {
                return new Lock(oldOwner, oldCreated);
            }
            // no lock -- there was a race condition
            // TODO do better
            throw new ConcurrentUpdateException("Lock " + id);
        }
    }

    /**
     * Removes the lock of the document with the given id.
     *
     * @param id the document id
     * @param owner the owner that must match the current lock owner, or {@code null} for an unconditional removal
     * @return the previous lock, {@code null} if the document was not locked, or the existing lock flagged as failed
     *         if it could not be removed for the given owner
     * @throws DocumentNotFoundException if the document does not exist
     * @throws ConcurrentUpdateException if the removal did not match although the lock found afterwards should have
     */
    @Override
    public Lock removeLock(String id, String owner) {
        DBObject query = new BasicDBObject(idKey, id);
        if (owner != null) {
            // remove if owner matches or null
            // implements LockManager.canLockBeRemoved inside MongoDB
            Object ownerOrNull = Arrays.asList(owner, null);
            query.put(KEY_LOCK_OWNER, new BasicDBObject(QueryOperators.IN, ownerOrNull));
        } // else unconditional remove
        // remove the lock
        if (log.isTraceEnabled()) {
            log.trace("MongoDB: FINDANDMODIFY " + query + " UPDATE " + UNSET_LOCK_UPDATE);
        }
        DBObject old = coll.findAndModify(query, null, null, false, UNSET_LOCK_UPDATE, false, false);
        if (old != null) {
            // found a doc and removed the lock, return previous lock
            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
            if (oldOwner == null) {
                // was not locked
                return null;
            } else {
                // return previous lock
                Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED));
                return new Lock(oldOwner, oldCreated);
            }
        } else {
            // doc not found, or lock owner didn't match
            // get the old lock
            if (log.isTraceEnabled()) {
                logQuery(id, LOCK_FIELDS);
            }
            old = coll.findOne(new BasicDBObject(idKey, id), LOCK_FIELDS);
            if (old == null) {
                // document not found
                throw new DocumentNotFoundException(id);
            }
            String oldOwner = (String) old.get(KEY_LOCK_OWNER);
            Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED));
            if (oldOwner != null) {
                if (!LockManager.canLockBeRemoved(oldOwner, owner)) {
                    // existing mismatched lock, flag failure
                    return new Lock(oldOwner, oldCreated, true);
                }
                // old owner should have matched -- there was a race condition
                // TODO do better
                throw new ConcurrentUpdateException("Unlock " + id);
            }
            // old owner null, should have matched -- there was a race condition
            // TODO do better
            throw new ConcurrentUpdateException("Unlock " + id);
        }
    }

    /** Nothing to close: the locks are stored in the documents themselves. */
    @Override
    public void closeLockManager() {
    }

    /** Nothing to clear: no lock caches are kept by this class. */
    @Override
    public void clearLockManagerCaches() {
    }

}