001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.mongodb; 020 021import static java.lang.Boolean.TRUE; 022import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult; 023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_STATUS; 024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_USER; 025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACL; 026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACP; 027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS; 028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA; 029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY; 030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_JOBID; 031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_SIMPLE; 032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 033import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY; 034import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LIFECYCLE_STATE; 035import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED; 036import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER; 037import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 038import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 039import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PRIMARY_TYPE; 040import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS; 041import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID; 042import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_VERSION_SERIES_ID; 043import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_READ_ACL; 044import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_VERSION_SERIES_ID; 045 046import java.io.Serializable; 047import java.net.UnknownHostException; 048import java.util.ArrayList; 049import java.util.Arrays; 050import java.util.Calendar; 051import java.util.HashSet; 052import java.util.List; 053import java.util.Map; 054import java.util.Set; 055import java.util.UUID; 056import java.util.concurrent.ConcurrentHashMap; 057import java.util.stream.Collectors; 058 059import javax.resource.spi.ConnectionManager; 060 061import com.mongodb.MongoClientOptions; 062import org.apache.commons.lang.StringUtils; 063import org.apache.commons.logging.Log; 064import org.apache.commons.logging.LogFactory; 065import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 066import org.nuxeo.ecm.core.api.DocumentNotFoundException; 067import org.nuxeo.ecm.core.api.Lock; 068import org.nuxeo.ecm.core.api.NuxeoException; 069import org.nuxeo.ecm.core.api.PartialList; 070import org.nuxeo.ecm.core.api.ScrollResult; 071import org.nuxeo.ecm.core.api.ScrollResultImpl; 072import org.nuxeo.ecm.core.blob.BlobManager; 073import org.nuxeo.ecm.core.model.LockManager; 074import org.nuxeo.ecm.core.model.Repository; 075import org.nuxeo.ecm.core.query.QueryParseException; 076import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 077import org.nuxeo.ecm.core.storage.State; 078import org.nuxeo.ecm.core.storage.State.StateDiff; 079import org.nuxeo.ecm.core.storage.dbs.DBSDocument; 080import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator; 081import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase; 082import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener; 083import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater; 084import org.nuxeo.runtime.api.Framework; 085 086import com.mongodb.BasicDBObject; 087import com.mongodb.DB; 088import com.mongodb.DBCollection; 089import com.mongodb.DBCursor; 090import com.mongodb.DBObject; 091import com.mongodb.MongoClient; 092import com.mongodb.MongoClientURI; 093import com.mongodb.QueryOperators; 094import com.mongodb.ServerAddress; 095import com.mongodb.WriteResult; 096 097/** 098 * MongoDB implementation of a {@link Repository}. 099 * 100 * @since 5.9.4 101 */ 102public class MongoDBRepository extends DBSRepositoryBase { 103 104 private static final Log log = LogFactory.getLog(MongoDBRepository.class); 105 106 public static final Long ZERO = Long.valueOf(0); 107 108 public static final Long ONE = Long.valueOf(1); 109 110 public static final Long MINUS_ONE = Long.valueOf(-1); 111 112 public static final String DB_DEFAULT = "nuxeo"; 113 114 public static final String MONGODB_ID = "_id"; 115 116 public static final String MONGODB_INC = "$inc"; 117 118 public static final String MONGODB_SET = "$set"; 119 120 public static final String MONGODB_UNSET = "$unset"; 121 122 public static final String MONGODB_PUSH = "$push"; 123 124 public static final String MONGODB_EACH = "$each"; 125 126 public static final String MONGODB_META = "$meta"; 127 128 public static final String MONGODB_TEXT_SCORE = "textScore"; 129 130 private static final String MONGODB_INDEX_TEXT = "text"; 131 132 private static final String MONGODB_INDEX_NAME = "name"; 133 134 private static final String MONGODB_LANGUAGE_OVERRIDE = "language_override"; 135 136 private static final String FULLTEXT_INDEX_NAME = "fulltext"; 137 138 private static final String LANGUAGE_FIELD = "__language"; 139 140 protected static final String COUNTER_NAME_UUID = "ecm:id"; 141 142 protected static final String COUNTER_FIELD = "seq"; 143 144 protected static final int MONGODB_OPTION_CONNECTION_TIMEOUT_MS = 30000; 145 146 protected static final int MONGODB_OPTION_SOCKET_TIMEOUT_MS = 60000; 147 148 protected MongoClient mongoClient; 149 150 protected DBCollection coll; 151 152 protected DBCollection countersColl; 153 154 /** The key to use to store the id in the database. */ 155 protected String idKey; 156 157 /** True if we don't use MongoDB's native "_id" key to store the id. */ 158 protected boolean useCustomId; 159 160 /** Number of values still available in the in-memory sequence. */ 161 protected long sequenceLeft; 162 163 /** Last value used from the in-memory sequence. */ 164 protected long sequenceLastValue; 165 166 /** Sequence allocation block size. */ 167 protected long sequenceBlockSize; 168 169 protected final MongoDBConverter converter; 170 171 protected static Map<String, CursorResult> cursorResults = new ConcurrentHashMap<>(); 172 173 public MongoDBRepository(ConnectionManager cm, MongoDBRepositoryDescriptor descriptor) { 174 super(cm, descriptor.name, descriptor); 175 try { 176 mongoClient = newMongoClient(descriptor); 177 coll = getCollection(descriptor, mongoClient); 178 countersColl = getCountersCollection(descriptor, mongoClient); 179 } catch (UnknownHostException e) { 180 throw new RuntimeException(e); 181 } 182 if (Boolean.TRUE.equals(descriptor.nativeId)) { 183 idKey = MONGODB_ID; 184 } else { 185 idKey = KEY_ID; 186 } 187 useCustomId = KEY_ID.equals(idKey); 188 if (idType == IdType.sequence || DEBUG_UUIDS) { 189 Integer sbs = descriptor.sequenceBlockSize; 190 sequenceBlockSize = sbs == null ? 1 : sbs.longValue(); 191 sequenceLeft = 0; 192 } 193 converter = new MongoDBConverter(idKey); 194 initRepository(); 195 } 196 197 @Override 198 public List<IdType> getAllowedIdTypes() { 199 return Arrays.asList(IdType.varchar, IdType.sequence); 200 } 201 202 @Override 203 public void shutdown() { 204 super.shutdown(); 205 mongoClient.close(); 206 } 207 208 // used also by unit tests 209 public static MongoClient newMongoClient(MongoDBRepositoryDescriptor descriptor) throws UnknownHostException { 210 MongoClient ret = null; 211 String server = descriptor.server; 212 if (StringUtils.isBlank(server)) { 213 throw new NuxeoException("Missing <server> in MongoDB repository descriptor"); 214 } 215 MongoClientOptions.Builder optionsBuilder = MongoClientOptions.builder() 216 // Can help to prevent firewall disconnects inactive connection, option not available from URI 217 .socketKeepAlive(true) 218 // don't wait for ever by default, can be overridden using URI options 219 .connectTimeout(MONGODB_OPTION_CONNECTION_TIMEOUT_MS) 220 .socketTimeout(MONGODB_OPTION_SOCKET_TIMEOUT_MS) 221 .description("Nuxeo"); 222 if (server.startsWith("mongodb://")) { 223 // allow mongodb:// URI syntax for the server, to pass everything in one string 224 ret = new MongoClient(new MongoClientURI(server, optionsBuilder)); 225 } else { 226 ret = new MongoClient(new ServerAddress(server), optionsBuilder.build()); 227 } 228 if (log.isDebugEnabled()) { 229 log.debug("MongoClient initialized with options: " + ret.getMongoClientOptions().toString()); 230 } 231 return ret; 232 } 233 234 protected static DBCollection getCollection(MongoClient mongoClient, String dbname, String collection) { 235 if (StringUtils.isBlank(dbname)) { 236 dbname = DB_DEFAULT; 237 } 238 DB db = mongoClient.getDB(dbname); 239 return db.getCollection(collection); 240 } 241 242 // used also by unit tests 243 public static DBCollection getCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) { 244 return getCollection(mongoClient, descriptor.dbname, descriptor.name); 245 } 246 247 // used also by unit tests 248 public static DBCollection getCountersCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) { 249 return getCollection(mongoClient, descriptor.dbname, descriptor.name + ".counters"); 250 } 251 252 protected void initRepository() { 253 // create required indexes 254 // code does explicit queries on those 255 if (useCustomId) { 256 coll.createIndex(new BasicDBObject(idKey, ONE)); 257 } 258 coll.createIndex(new BasicDBObject(KEY_PARENT_ID, ONE)); 259 coll.createIndex(new BasicDBObject(KEY_ANCESTOR_IDS, ONE)); 260 coll.createIndex(new BasicDBObject(KEY_VERSION_SERIES_ID, ONE)); 261 coll.createIndex(new BasicDBObject(KEY_PROXY_TARGET_ID, ONE)); 262 coll.createIndex(new BasicDBObject(KEY_PROXY_VERSION_SERIES_ID, ONE)); 263 coll.createIndex(new BasicDBObject(KEY_READ_ACL, ONE)); 264 DBObject parentChild = new BasicDBObject(); 265 parentChild.put(KEY_PARENT_ID, ONE); 266 parentChild.put(KEY_NAME, ONE); 267 coll.createIndex(parentChild); 268 // often used in user-generated queries 269 coll.createIndex(new BasicDBObject(KEY_PRIMARY_TYPE, ONE)); 270 coll.createIndex(new BasicDBObject(KEY_LIFECYCLE_STATE, ONE)); 271 coll.createIndex(new BasicDBObject(KEY_FULLTEXT_JOBID, ONE)); 272 coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_USER, ONE)); 273 coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_STATUS, ONE)); 274 // TODO configure these from somewhere else 275 coll.createIndex(new BasicDBObject("dc:modified", MINUS_ONE)); 276 coll.createIndex(new BasicDBObject("rend:renditionName", ONE)); 277 coll.createIndex(new BasicDBObject("drv:subscriptions.enabled", ONE)); 278 coll.createIndex(new BasicDBObject("collectionMember:collectionIds", ONE)); 279 if (!isFulltextDisabled()) { 280 DBObject indexKeys = new BasicDBObject(); 281 indexKeys.put(KEY_FULLTEXT_SIMPLE, MONGODB_INDEX_TEXT); 282 indexKeys.put(KEY_FULLTEXT_BINARY, MONGODB_INDEX_TEXT); 283 DBObject indexOptions = new BasicDBObject(); 284 indexOptions.put(MONGODB_INDEX_NAME, FULLTEXT_INDEX_NAME); 285 indexOptions.put(MONGODB_LANGUAGE_OVERRIDE, LANGUAGE_FIELD); 286 coll.createIndex(indexKeys, indexOptions); 287 } 288 // check root presence 289 DBObject query = new BasicDBObject(idKey, getRootId()); 290 if (coll.findOne(query, justPresenceField()) != null) { 291 return; 292 } 293 // create basic repository structure needed 294 if (idType == IdType.sequence || DEBUG_UUIDS) { 295 // create the id counter 296 DBObject idCounter = new BasicDBObject(); 297 idCounter.put(MONGODB_ID, COUNTER_NAME_UUID); 298 idCounter.put(COUNTER_FIELD, ZERO); 299 countersColl.insert(idCounter); 300 } 301 initRoot(); 302 } 303 304 protected synchronized Long getNextSequenceId() { 305 if (sequenceLeft == 0) { 306 // allocate a new sequence block 307 // the database contains the last value from the last block 308 DBObject query = new BasicDBObject(MONGODB_ID, COUNTER_NAME_UUID); 309 DBObject update = new BasicDBObject(MONGODB_INC, 310 new BasicDBObject(COUNTER_FIELD, Long.valueOf(sequenceBlockSize))); 311 DBObject idCounter = countersColl.findAndModify(query, null, null, false, update, true, false); 312 if (idCounter == null) { 313 throw new NuxeoException("Repository id counter not initialized"); 314 } 315 sequenceLeft = sequenceBlockSize; 316 sequenceLastValue = ((Long) idCounter.get(COUNTER_FIELD)).longValue() - sequenceBlockSize; 317 } 318 sequenceLeft--; 319 sequenceLastValue++; 320 return Long.valueOf(sequenceLastValue); 321 } 322 323 @Override 324 public String generateNewId() { 325 if (idType == IdType.sequence || DEBUG_UUIDS) { 326 Long id = getNextSequenceId(); 327 if (DEBUG_UUIDS) { 328 return "UUID_" + id; 329 } 330 return id.toString(); 331 } else { 332 return UUID.randomUUID().toString(); 333 } 334 } 335 336 @Override 337 public void createState(State state) { 338 DBObject ob = converter.stateToBson(state); 339 if (log.isTraceEnabled()) { 340 log.trace("MongoDB: CREATE " + ob.get(idKey) + ": " + ob); 341 } 342 coll.insert(ob); 343 // TODO dupe exception 344 // throw new DocumentException("Already exists: " + id); 345 } 346 347 @Override 348 public void createStates(List<State> states) { 349 List<DBObject> obs = states.stream().map(converter::stateToBson).collect(Collectors.toList()); 350 if (log.isTraceEnabled()) { 351 log.trace("MongoDB: CREATE [" 352 + obs.stream().map(ob -> ob.get(idKey).toString()).collect(Collectors.joining(", ")) 353 + "]: " + obs); 354 } 355 coll.insert(obs); 356 } 357 358 @Override 359 public State readState(String id) { 360 DBObject query = new BasicDBObject(idKey, id); 361 return findOne(query); 362 } 363 364 @Override 365 public List<State> readStates(List<String> ids) { 366 DBObject query = new BasicDBObject(idKey, new BasicDBObject(QueryOperators.IN, ids)); 367 return findAll(query, ids.size()); 368 } 369 370 @Override 371 public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) { 372 List<DBObject> updates = converter.diffToBson(diff); 373 for (DBObject update : updates) { 374 DBObject query = new BasicDBObject(idKey, id); 375 if (changeTokenUpdater == null) { 376 if (log.isTraceEnabled()) { 377 log.trace("MongoDB: UPDATE " + id + ": " + update); 378 } 379 } else { 380 // assume bson is identical to dbs internals 381 // condition works even if value is null 382 Map<String, Serializable> conditions = changeTokenUpdater.getConditions(); 383 Map<String, Serializable> tokenUpdates = changeTokenUpdater.getUpdates(); 384 if (update.containsField(MONGODB_SET)) { 385 ((DBObject) update.get(MONGODB_SET)).putAll(tokenUpdates); 386 } else { 387 DBObject set = new BasicDBObject(); 388 set.putAll(tokenUpdates); 389 update.put(MONGODB_SET, set); 390 } 391 if (log.isTraceEnabled()) { 392 log.trace("MongoDB: UPDATE " + id + ": IF " + conditions + " THEN " + update); 393 } 394 query.putAll(conditions); 395 } 396 WriteResult w = coll.update(query, update); 397 if (w.getN() != 1) { 398 log.trace("MongoDB: -> CONCURRENT UPDATE: " + id); 399 throw new ConcurrentUpdateException(id); 400 } 401 // TODO dupe exception 402 // throw new DocumentException("Missing: " + id); 403 } 404 } 405 406 @Override 407 public void deleteStates(Set<String> ids) { 408 DBObject query = new BasicDBObject(idKey, new BasicDBObject(QueryOperators.IN, ids)); 409 if (log.isTraceEnabled()) { 410 log.trace("MongoDB: REMOVE " + ids); 411 } 412 WriteResult w = coll.remove(query); 413 if (w.getN() != ids.size()) { 414 log.error("Removed " + w.getN() + " docs for " + ids.size() + " ids: " + ids); 415 } 416 } 417 418 @Override 419 public State readChildState(String parentId, String name, Set<String> ignored) { 420 DBObject query = getChildQuery(parentId, name, ignored); 421 return findOne(query); 422 } 423 424 protected void logQuery(String id, DBObject fields) { 425 logQuery(new BasicDBObject(idKey, id), fields); 426 } 427 428 protected void logQuery(DBObject query, DBObject fields) { 429 if (fields == null) { 430 log.trace("MongoDB: QUERY " + query); 431 } else { 432 log.trace("MongoDB: QUERY " + query + " KEYS " + fields); 433 } 434 } 435 436 protected void logQuery(DBObject query, DBObject fields, DBObject orderBy, int limit, int offset) { 437 log.trace("MongoDB: QUERY " + query + " KEYS " + fields + (orderBy == null ? "" : " ORDER BY " + orderBy) 438 + " OFFSET " + offset + " LIMIT " + limit); 439 } 440 441 @Override 442 public boolean hasChild(String parentId, String name, Set<String> ignored) { 443 DBObject query = getChildQuery(parentId, name, ignored); 444 if (log.isTraceEnabled()) { 445 logQuery(query, justPresenceField()); 446 } 447 return coll.findOne(query, justPresenceField()) != null; 448 } 449 450 protected DBObject getChildQuery(String parentId, String name, Set<String> ignored) { 451 DBObject query = new BasicDBObject(); 452 query.put(KEY_PARENT_ID, parentId); 453 query.put(KEY_NAME, name); 454 addIgnoredIds(query, ignored); 455 return query; 456 } 457 458 protected void addIgnoredIds(DBObject query, Set<String> ignored) { 459 if (!ignored.isEmpty()) { 460 DBObject notInIds = new BasicDBObject(QueryOperators.NIN, new ArrayList<String>(ignored)); 461 query.put(idKey, notInIds); 462 } 463 } 464 465 @Override 466 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 467 DBObject query = new BasicDBObject(converter.keyToBson(key), value); 468 addIgnoredIds(query, ignored); 469 return findAll(query, 0); 470 } 471 472 @Override 473 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 474 DBObject query = new BasicDBObject(converter.keyToBson(key1), value1); 475 query.put(converter.keyToBson(key2), value2); 476 addIgnoredIds(query, ignored); 477 return findAll(query, 0); 478 } 479 480 @Override 481 public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets, 482 Map<String, Object[]> targetProxies) { 483 DBObject query = new BasicDBObject(key, value); 484 DBObject fields = new BasicDBObject(); 485 if (useCustomId) { 486 fields.put(MONGODB_ID, ZERO); 487 } 488 fields.put(idKey, ONE); 489 fields.put(KEY_IS_PROXY, ONE); 490 fields.put(KEY_PROXY_TARGET_ID, ONE); 491 fields.put(KEY_PROXY_IDS, ONE); 492 if (log.isTraceEnabled()) { 493 logQuery(query, fields); 494 } 495 DBCursor cursor = coll.find(query, fields); 496 try { 497 for (DBObject ob : cursor) { 498 String id = (String) ob.get(idKey); 499 ids.add(id); 500 if (proxyTargets != null && TRUE.equals(ob.get(KEY_IS_PROXY))) { 501 String targetId = (String) ob.get(KEY_PROXY_TARGET_ID); 502 proxyTargets.put(id, targetId); 503 } 504 if (targetProxies != null) { 505 Object[] proxyIds = (Object[]) converter.bsonToValue(ob.get(KEY_PROXY_IDS)); 506 if (proxyIds != null) { 507 targetProxies.put(id, proxyIds); 508 } 509 } 510 } 511 } finally { 512 cursor.close(); 513 } 514 } 515 516 @Override 517 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 518 DBObject query = new BasicDBObject(key, value); 519 addIgnoredIds(query, ignored); 520 if (log.isTraceEnabled()) { 521 logQuery(query, justPresenceField()); 522 } 523 return coll.findOne(query, justPresenceField()) != null; 524 } 525 526 protected State findOne(DBObject query) { 527 if (log.isTraceEnabled()) { 528 logQuery(query, null); 529 } 530 return converter.bsonToState(coll.findOne(query)); 531 } 532 533 protected List<State> findAll(DBObject query, int sizeHint) { 534 if (log.isTraceEnabled()) { 535 logQuery(query, null); 536 } 537 DBCursor cursor = coll.find(query); 538 Set<String> seen = new HashSet<>(); 539 try { 540 List<State> list = new ArrayList<>(sizeHint); 541 for (DBObject ob : cursor) { 542 if (!seen.add((String) ob.get(idKey))) { 543 // MongoDB cursors may return the same 544 // object several times 545 continue; 546 } 547 list.add(converter.bsonToState(ob)); 548 } 549 return list; 550 } finally { 551 cursor.close(); 552 } 553 } 554 555 protected DBObject justPresenceField() { 556 return new BasicDBObject(MONGODB_ID, ONE); 557 } 558 559 @Override 560 public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator, 561 OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) { 562 // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter 563 MongoDBQueryBuilder builder = new MongoDBQueryBuilder(this, evaluator.getExpression(), 564 evaluator.getSelectClause(), orderByClause, evaluator.pathResolver, evaluator.fulltextSearchDisabled); 565 builder.walk(); 566 if (builder.hasFulltext && isFulltextDisabled()) { 567 throw new QueryParseException("Fulltext search disabled by configuration"); 568 } 569 DBObject query = builder.getQuery(); 570 addPrincipals(query, evaluator.principals); 571 DBObject orderBy = builder.getOrderBy(); 572 DBObject keys = builder.getProjection(); 573 // Don't do manual projection if there are no projection wildcards, as this brings no new 574 // information and is costly. The only difference is several identical rows instead of one. 575 boolean manualProjection = !distinctDocuments && builder.hasProjectionWildcard(); 576 if (manualProjection) { 577 // we'll do post-treatment to re-evaluate the query to get proper wildcard projections 578 // so we need the full state from the database 579 keys = new BasicDBObject(); 580 evaluator.parse(); 581 } 582 583 if (log.isTraceEnabled()) { 584 logQuery(query, keys, orderBy, limit, offset); 585 } 586 587 List<Map<String, Serializable>> projections; 588 long totalSize; 589 DBCursor cursor = coll.find(query, keys).skip(offset).limit(limit); 590 try { 591 if (orderBy != null) { 592 cursor.sort(orderBy); 593 } 594 projections = new ArrayList<>(); 595 for (DBObject ob : cursor) { 596 State state = converter.bsonToState(ob); 597 if (manualProjection) { 598 projections.addAll(evaluator.matches(state)); 599 } else { 600 projections.add(DBSStateFlattener.flatten(state)); 601 } 602 } 603 if (countUpTo == -1) { 604 // count full size 605 if (limit == 0) { 606 totalSize = projections.size(); 607 } else { 608 totalSize = cursor.count(); 609 } 610 } else if (countUpTo == 0) { 611 // no count 612 totalSize = -1; // not counted 613 } else { 614 // count only if less than countUpTo 615 if (limit == 0) { 616 totalSize = projections.size(); 617 } else { 618 totalSize = cursor.copy().limit(countUpTo + 1).count(); 619 } 620 if (totalSize > countUpTo) { 621 totalSize = -2; // truncated 622 } 623 } 624 } finally { 625 cursor.close(); 626 } 627 if (log.isTraceEnabled() && projections.size() != 0) { 628 log.trace("MongoDB: -> " + projections.size()); 629 } 630 return new PartialList<>(projections, totalSize); 631 } 632 633 @Override 634 public ScrollResult scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) { 635 checkForTimedoutScroll(); 636 MongoDBQueryBuilder builder = new MongoDBQueryBuilder(this, evaluator.getExpression(), 637 evaluator.getSelectClause(), null, evaluator.pathResolver, evaluator.fulltextSearchDisabled); 638 builder.walk(); 639 if (builder.hasFulltext && isFulltextDisabled()) { 640 throw new QueryParseException("Fulltext search disabled by configuration"); 641 } 642 DBObject query = builder.getQuery(); 643 DBObject keys = builder.getProjection(); 644 if (log.isTraceEnabled()) { 645 logQuery(query, keys, null, 0, 0); 646 } 647 648 DBCursor cursor = coll.find(query, keys); 649 String scrollId = UUID.randomUUID().toString(); 650 registerCursor(scrollId, cursor, batchSize, keepAliveSeconds); 651 return scroll(scrollId); 652 } 653 654 protected void checkForTimedoutScroll() { 655 cursorResults.forEach((id, cursor) -> cursor.timedOut(id)); 656 } 657 658 protected void registerCursor(String scrollId, DBCursor cursor, int batchSize, int keepAliveSeconds) { 659 cursorResults.put(scrollId, new CursorResult(cursor, batchSize, keepAliveSeconds)); 660 } 661 662 @Override 663 public ScrollResult scroll(String scrollId) { 664 CursorResult cursorResult = cursorResults.get(scrollId); 665 if (cursorResult == null) { 666 throw new NuxeoException("Unknown or timed out scrollId"); 667 } else if (cursorResult.timedOut(scrollId)) { 668 throw new NuxeoException("Timed out scrollId"); 669 } 670 cursorResult.touch(); 671 List<String> ids = new ArrayList<>(cursorResult.batchSize); 672 synchronized (cursorResult) { 673 if (cursorResult.cursor == null || !cursorResult.cursor.hasNext()) { 674 unregisterCursor(scrollId); 675 return emptyResult(); 676 } 677 while (ids.size() < cursorResult.batchSize) { 678 if (cursorResult.cursor == null || !cursorResult.cursor.hasNext()) { 679 cursorResult.close(); 680 break; 681 } else { 682 DBObject ob = cursorResult.cursor.next(); 683 String id = (String) ob.get(converter.keyToBson(KEY_ID)); 684 if (id != null) { 685 ids.add(id); 686 } else { 687 log.error("Got a document without id: " + ob); 688 } 689 } 690 } 691 } 692 return new ScrollResultImpl(scrollId, ids); 693 } 694 695 protected boolean unregisterCursor(String scrollId) { 696 CursorResult cursor = cursorResults.remove(scrollId); 697 if (cursor != null) { 698 cursor.close(); 699 return true; 700 } 701 return false; 702 } 703 704 protected void addPrincipals(DBObject query, Set<String> principals) { 705 if (principals != null) { 706 DBObject inPrincipals = new BasicDBObject(QueryOperators.IN, new ArrayList<String>(principals)); 707 query.put(DBSDocument.KEY_READ_ACL, inPrincipals); 708 } 709 } 710 711 /** Keys used for document projection when marking all binaries for GC. */ 712 protected DBObject binaryKeys; 713 714 @Override 715 protected void initBlobsPaths() { 716 MongoDBBlobFinder finder = new MongoDBBlobFinder(); 717 finder.visit(); 718 binaryKeys = finder.binaryKeys; 719 } 720 721 protected static class MongoDBBlobFinder extends BlobFinder { 722 protected DBObject binaryKeys = new BasicDBObject(MONGODB_ID, ZERO); 723 724 @Override 725 protected void recordBlobPath() { 726 path.addLast(KEY_BLOB_DATA); 727 binaryKeys.put(StringUtils.join(path, "."), ONE); 728 path.removeLast(); 729 } 730 } 731 732 @Override 733 public void markReferencedBinaries() { 734 BlobManager blobManager = Framework.getService(BlobManager.class); 735 // TODO add a query to not scan all documents 736 if (log.isTraceEnabled()) { 737 logQuery(new BasicDBObject(), binaryKeys); 738 } 739 DBCursor cursor = coll.find(new BasicDBObject(), binaryKeys); 740 try { 741 for (DBObject ob : cursor) { 742 markReferencedBinaries(ob, blobManager); 743 } 744 } finally { 745 cursor.close(); 746 } 747 } 748 749 protected void markReferencedBinaries(DBObject ob, BlobManager blobManager) { 750 for (String key : ob.keySet()) { 751 Object value = ob.get(key); 752 if (value instanceof List) { 753 @SuppressWarnings("unchecked") 754 List<Object> list = (List<Object>) value; 755 for (Object v : list) { 756 if (v instanceof DBObject) { 757 markReferencedBinaries((DBObject) v, blobManager); 758 } else { 759 markReferencedBinary(v, blobManager); 760 } 761 } 762 } else if (value instanceof Object[]) { 763 for (Object v : (Object[]) value) { 764 markReferencedBinary(v, blobManager); 765 } 766 } else if (value instanceof DBObject) { 767 markReferencedBinaries((DBObject) value, blobManager); 768 } else { 769 markReferencedBinary(value, blobManager); 770 } 771 } 772 } 773 774 protected void markReferencedBinary(Object value, BlobManager blobManager) { 775 if (!(value instanceof String)) { 776 return; 777 } 778 String key = (String) value; 779 blobManager.markReferencedBinary(key, repositoryName); 780 } 781 782 protected static final DBObject LOCK_FIELDS; 783 784 static { 785 LOCK_FIELDS = new BasicDBObject(); 786 LOCK_FIELDS.put(KEY_LOCK_OWNER, ONE); 787 LOCK_FIELDS.put(KEY_LOCK_CREATED, ONE); 788 } 789 790 protected static final DBObject UNSET_LOCK_UPDATE = new BasicDBObject(MONGODB_UNSET, LOCK_FIELDS); 791 792 @Override 793 public Lock getLock(String id) { 794 if (log.isTraceEnabled()) { 795 logQuery(id, LOCK_FIELDS); 796 } 797 DBObject res = coll.findOne(new BasicDBObject(idKey, id), LOCK_FIELDS); 798 if (res == null) { 799 // document not found 800 throw new DocumentNotFoundException(id); 801 } 802 String owner = (String) res.get(KEY_LOCK_OWNER); 803 if (owner == null) { 804 // not locked 805 return null; 806 } 807 Calendar created = (Calendar) converter.scalarToSerializable(res.get(KEY_LOCK_CREATED)); 808 return new Lock(owner, created); 809 } 810 811 @Override 812 public Lock setLock(String id, Lock lock) { 813 DBObject query = new BasicDBObject(idKey, id); 814 query.put(KEY_LOCK_OWNER, null); // select doc if no lock is set 815 DBObject setLock = new BasicDBObject(); 816 setLock.put(KEY_LOCK_OWNER, lock.getOwner()); 817 setLock.put(KEY_LOCK_CREATED, converter.serializableToBson(lock.getCreated())); 818 DBObject setLockUpdate = new BasicDBObject(MONGODB_SET, setLock); 819 if (log.isTraceEnabled()) { 820 log.trace("MongoDB: FINDANDMODIFY " + query + " UPDATE " + setLockUpdate); 821 } 822 DBObject res = coll.findAndModify(query, null, null, false, setLockUpdate, false, false); 823 if (res != null) { 824 // found a doc to lock 825 return null; 826 } else { 827 // doc not found, or lock owner already set 828 // get the old lock 829 if (log.isTraceEnabled()) { 830 logQuery(id, LOCK_FIELDS); 831 } 832 DBObject old = coll.findOne(new BasicDBObject(idKey, id), LOCK_FIELDS); 833 if (old == null) { 834 // document not found 835 throw new DocumentNotFoundException(id); 836 } 837 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 838 Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED)); 839 if (oldOwner != null) { 840 return new Lock(oldOwner, oldCreated); 841 } 842 // no lock -- there was a race condition 843 // TODO do better 844 throw new ConcurrentUpdateException("Lock " + id); 845 } 846 } 847 848 @Override 849 public Lock removeLock(String id, String owner) { 850 DBObject query = new BasicDBObject(idKey, id); 851 if (owner != null) { 852 // remove if owner matches or null 853 // implements LockManager.canLockBeRemoved inside MongoDB 854 Object ownerOrNull = Arrays.asList(owner, null); 855 query.put(KEY_LOCK_OWNER, new BasicDBObject(QueryOperators.IN, ownerOrNull)); 856 } // else unconditional remove 857 // remove the lock 858 DBObject old = coll.findAndModify(query, null, null, false, UNSET_LOCK_UPDATE, false, false); 859 if (old != null) { 860 // found a doc and removed the lock, return previous lock 861 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 862 if (oldOwner == null) { 863 // was not locked 864 return null; 865 } else { 866 // return previous lock 867 Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED)); 868 return new Lock(oldOwner, oldCreated); 869 } 870 } else { 871 // doc not found, or lock owner didn't match 872 // get the old lock 873 if (log.isTraceEnabled()) { 874 logQuery(id, LOCK_FIELDS); 875 } 876 old = coll.findOne(new BasicDBObject(idKey, id), LOCK_FIELDS); 877 if (old == null) { 878 // document not found 879 throw new DocumentNotFoundException(id); 880 } 881 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 882 Calendar oldCreated = (Calendar) converter.scalarToSerializable(old.get(KEY_LOCK_CREATED)); 883 if (oldOwner != null) { 884 if (!LockManager.canLockBeRemoved(oldOwner, owner)) { 885 // existing mismatched lock, flag failure 886 return new Lock(oldOwner, oldCreated, true); 887 } 888 // old owner should have matched -- there was a race condition 889 // TODO do better 890 throw new ConcurrentUpdateException("Unlock " + id); 891 } 892 // old owner null, should have matched -- there was a race condition 893 // TODO do better 894 throw new ConcurrentUpdateException("Unlock " + id); 895 } 896 } 897 898 @Override 899 public void closeLockManager() { 900 901 } 902 903 @Override 904 public void clearLockManagerCaches() { 905 } 906 907 protected class CursorResult { 908 // Note that MongoDB cursor automatically timeout after 10 minutes of inactivity by default. 909 protected DBCursor cursor; 910 protected final int batchSize; 911 protected long lastCallTimestamp; 912 protected final int keepAliveSeconds; 913 914 public CursorResult(DBCursor cursor, int batchSize, int keepAliveSeconds) { 915 this.cursor = cursor; 916 this.batchSize = batchSize; 917 this.keepAliveSeconds = keepAliveSeconds; 918 lastCallTimestamp = System.currentTimeMillis(); 919 } 920 921 void touch() { 922 lastCallTimestamp = System.currentTimeMillis(); 923 } 924 925 boolean timedOut(String scrollId) { 926 long now = System.currentTimeMillis(); 927 if (now - lastCallTimestamp > (keepAliveSeconds * 1000)) { 928 if (unregisterCursor(scrollId)) { 929 log.warn("Scroll " + scrollId + " timed out"); 930 } 931 return true; 932 } 933 return false; 934 } 935 936 public void close() { 937 if (cursor != null) { 938 cursor.close(); 939 } 940 cursor = null; 941 } 942 } 943}