001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.mongodb; 020 021import static java.lang.Boolean.TRUE; 022import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult; 023import static org.nuxeo.ecm.core.storage.State.NOP; 024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_STATUS; 025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_USER; 026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACL; 027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACP; 028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS; 029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA; 030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY; 031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_JOBID; 032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_SIMPLE; 033import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 034import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY; 035import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LIFECYCLE_STATE; 036import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED; 037import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER; 
038import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 039import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 040import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PRIMARY_TYPE; 041import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS; 042import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID; 043import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_VERSION_SERIES_ID; 044import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_READ_ACL; 045import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_VERSION_SERIES_ID; 046 047import java.io.Serializable; 048import java.lang.reflect.Array; 049import java.net.UnknownHostException; 050import java.util.ArrayList; 051import java.util.Arrays; 052import java.util.Calendar; 053import java.util.Date; 054import java.util.HashSet; 055import java.util.List; 056import java.util.Map; 057import java.util.Map.Entry; 058import java.util.Set; 059import java.util.UUID; 060import java.util.concurrent.ConcurrentHashMap; 061import java.util.stream.Collectors; 062 063import javax.resource.spi.ConnectionManager; 064 065import com.mongodb.MongoClientOptions; 066import org.apache.commons.lang.StringUtils; 067import org.apache.commons.logging.Log; 068import org.apache.commons.logging.LogFactory; 069import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 070import org.nuxeo.ecm.core.api.DocumentNotFoundException; 071import org.nuxeo.ecm.core.api.Lock; 072import org.nuxeo.ecm.core.api.NuxeoException; 073import org.nuxeo.ecm.core.api.PartialList; 074import org.nuxeo.ecm.core.api.ScrollResult; 075import org.nuxeo.ecm.core.api.ScrollResultImpl; 076import org.nuxeo.ecm.core.api.model.Delta; 077import org.nuxeo.ecm.core.blob.BlobManager; 078import org.nuxeo.ecm.core.model.LockManager; 079import org.nuxeo.ecm.core.model.Repository; 080import org.nuxeo.ecm.core.query.QueryParseException; 081import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 
082import org.nuxeo.ecm.core.storage.State; 083import org.nuxeo.ecm.core.storage.State.ListDiff; 084import org.nuxeo.ecm.core.storage.State.StateDiff; 085import org.nuxeo.ecm.core.storage.dbs.DBSDocument; 086import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator; 087import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase; 088import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener; 089import org.nuxeo.runtime.api.Framework; 090 091import com.mongodb.BasicDBObject; 092import com.mongodb.DB; 093import com.mongodb.DBCollection; 094import com.mongodb.DBCursor; 095import com.mongodb.DBObject; 096import com.mongodb.MongoClient; 097import com.mongodb.MongoClientURI; 098import com.mongodb.QueryOperators; 099import com.mongodb.ServerAddress; 100import com.mongodb.WriteResult; 101 102/** 103 * MongoDB implementation of a {@link Repository}. 104 * 105 * @since 5.9.4 106 */ 107public class MongoDBRepository extends DBSRepositoryBase { 108 109 private static final Log log = LogFactory.getLog(MongoDBRepository.class); 110 111 private static final Long ZERO = Long.valueOf(0); 112 113 private static final Long ONE = Long.valueOf(1); 114 115 private static final Long MINUS_ONE = Long.valueOf(-11); 116 117 public static final String DB_DEFAULT = "nuxeo"; 118 119 public static final String MONGODB_ID = "_id"; 120 121 public static final String MONGODB_INC = "$inc"; 122 123 public static final String MONGODB_SET = "$set"; 124 125 public static final String MONGODB_UNSET = "$unset"; 126 127 public static final String MONGODB_PUSH = "$push"; 128 129 public static final String MONGODB_EACH = "$each"; 130 131 public static final String MONGODB_META = "$meta"; 132 133 public static final String MONGODB_TEXT_SCORE = "textScore"; 134 135 private static final String MONGODB_INDEX_TEXT = "text"; 136 137 private static final String MONGODB_INDEX_NAME = "name"; 138 139 private static final String MONGODB_LANGUAGE_OVERRIDE = "language_override"; 140 141 private static final String 
FULLTEXT_INDEX_NAME = "fulltext"; 142 143 private static final String LANGUAGE_FIELD = "__language"; 144 145 protected static final String COUNTER_NAME_UUID = "ecm:id"; 146 147 protected static final String COUNTER_FIELD = "seq"; 148 149 protected static final int MONGODB_OPTION_CONNECTION_TIMEOUT_MS = 30000; 150 151 protected static final int MONGODB_OPTION_SOCKET_TIMEOUT_MS = 60000; 152 153 protected MongoClient mongoClient; 154 155 protected DBCollection coll; 156 157 protected DBCollection countersColl; 158 159 /** The key to use to store the id in the database. */ 160 protected String idKey; 161 162 /** True if we don't use MongoDB's native "_id" key to store the id. */ 163 protected boolean useCustomId; 164 165 /** Number of values still available in the in-memory sequence. */ 166 protected long sequenceLeft; 167 168 /** Last value used from the in-memory sequence. */ 169 protected long sequenceLastValue; 170 171 /** Sequence allocation block size. */ 172 protected long sequenceBlockSize; 173 174 protected static Map<String, CursorResult> cursorResults = new ConcurrentHashMap<>(); 175 176 public MongoDBRepository(ConnectionManager cm, MongoDBRepositoryDescriptor descriptor) { 177 super(cm, descriptor.name, descriptor); 178 try { 179 mongoClient = newMongoClient(descriptor); 180 coll = getCollection(descriptor, mongoClient); 181 countersColl = getCountersCollection(descriptor, mongoClient); 182 } catch (UnknownHostException e) { 183 throw new RuntimeException(e); 184 } 185 if (Boolean.TRUE.equals(descriptor.nativeId)) { 186 idKey = MONGODB_ID; 187 } else { 188 idKey = KEY_ID; 189 } 190 useCustomId = KEY_ID.equals(idKey); 191 if (idType == IdType.sequence || DEBUG_UUIDS) { 192 Integer sbs = descriptor.sequenceBlockSize; 193 sequenceBlockSize = sbs == null ? 
1 : sbs.longValue(); 194 sequenceLeft = 0; 195 } 196 initRepository(); 197 } 198 199 @Override 200 public List<IdType> getAllowedIdTypes() { 201 return Arrays.asList(IdType.varchar, IdType.sequence); 202 } 203 204 @Override 205 public void shutdown() { 206 super.shutdown(); 207 mongoClient.close(); 208 } 209 210 // used also by unit tests 211 public static MongoClient newMongoClient(MongoDBRepositoryDescriptor descriptor) throws UnknownHostException { 212 MongoClient ret = null; 213 String server = descriptor.server; 214 if (StringUtils.isBlank(server)) { 215 throw new NuxeoException("Missing <server> in MongoDB repository descriptor"); 216 } 217 MongoClientOptions.Builder optionsBuilder = MongoClientOptions.builder() 218 // Can help to prevent firewall disconnects inactive connection, option not available from URI 219 .socketKeepAlive(true) 220 // don't wait for ever by default, can be overridden using URI options 221 .connectTimeout(MONGODB_OPTION_CONNECTION_TIMEOUT_MS) 222 .socketTimeout(MONGODB_OPTION_SOCKET_TIMEOUT_MS) 223 .description("Nuxeo"); 224 if (server.startsWith("mongodb://")) { 225 // allow mongodb:// URI syntax for the server, to pass everything in one string 226 ret = new MongoClient(new MongoClientURI(server, optionsBuilder)); 227 } else { 228 ret = new MongoClient(new ServerAddress(server), optionsBuilder.build()); 229 } 230 if (log.isDebugEnabled()) { 231 log.debug("MongoClient initialized with options: " + ret.getMongoClientOptions().toString()); 232 } 233 return ret; 234 } 235 236 protected static DBCollection getCollection(MongoClient mongoClient, String dbname, String collection) { 237 if (StringUtils.isBlank(dbname)) { 238 dbname = DB_DEFAULT; 239 } 240 DB db = mongoClient.getDB(dbname); 241 return db.getCollection(collection); 242 } 243 244 // used also by unit tests 245 public static DBCollection getCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) { 246 return getCollection(mongoClient, descriptor.dbname, 
descriptor.name); 247 } 248 249 // used also by unit tests 250 public static DBCollection getCountersCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) { 251 return getCollection(mongoClient, descriptor.dbname, descriptor.name + ".counters"); 252 } 253 254 protected String keyToBson(String key) { 255 if (useCustomId) { 256 return key; 257 } else { 258 return KEY_ID.equals(key) ? idKey : key; 259 } 260 } 261 262 protected Object valueToBson(Object value) { 263 if (value instanceof State) { 264 return stateToBson((State) value); 265 } else if (value instanceof List) { 266 @SuppressWarnings("unchecked") 267 List<Object> values = (List<Object>) value; 268 return listToBson(values); 269 } else if (value instanceof Object[]) { 270 return listToBson(Arrays.asList((Object[]) value)); 271 } else { 272 return serializableToBson(value); 273 } 274 } 275 276 protected DBObject stateToBson(State state) { 277 DBObject ob = new BasicDBObject(); 278 for (Entry<String, Serializable> en : state.entrySet()) { 279 Object val = valueToBson(en.getValue()); 280 if (val != null) { 281 ob.put(keyToBson(en.getKey()), val); 282 } 283 } 284 return ob; 285 } 286 287 protected List<Object> listToBson(List<Object> values) { 288 ArrayList<Object> objects = new ArrayList<Object>(values.size()); 289 for (Object value : values) { 290 objects.add(valueToBson(value)); 291 } 292 return objects; 293 } 294 295 protected String bsonToKey(String key) { 296 if (useCustomId) { 297 return key; 298 } else { 299 return idKey.equals(key) ? 
KEY_ID : key; 300 } 301 } 302 303 protected State bsonToState(DBObject ob) { 304 if (ob == null) { 305 return null; 306 } 307 State state = new State(ob.keySet().size()); 308 for (String key : ob.keySet()) { 309 if (useCustomId && MONGODB_ID.equals(key)) { 310 // skip native id 311 continue; 312 } 313 state.put(bsonToKey(key), bsonToValue(ob.get(key))); 314 } 315 return state; 316 } 317 318 protected Serializable bsonToValue(Object value) { 319 if (value instanceof List) { 320 @SuppressWarnings("unchecked") 321 List<Object> list = (List<Object>) value; 322 if (list.isEmpty()) { 323 return null; 324 } else { 325 if (list.get(0) instanceof DBObject) { 326 List<Serializable> l = new ArrayList<>(list.size()); 327 for (Object el : list) { 328 l.add(bsonToState((DBObject) el)); 329 } 330 return (Serializable) l; 331 } else { 332 // turn the list into a properly-typed array 333 Class<?> klass = Object.class; 334 for (Object o : list) { 335 if (o != null) { 336 klass = scalarToSerializableClass(o.getClass()); 337 break; 338 } 339 } 340 Object[] ar = (Object[]) Array.newInstance(klass, list.size()); 341 int i = 0; 342 for (Object el : list) { 343 ar[i++] = scalarToSerializable(el); 344 } 345 return ar; 346 } 347 } 348 } else if (value instanceof DBObject) { 349 return bsonToState((DBObject) value); 350 } else { 351 return scalarToSerializable(value); 352 } 353 } 354 355 public static class Updates { 356 public BasicDBObject set = new BasicDBObject(); 357 358 public BasicDBObject unset = new BasicDBObject(); 359 360 public BasicDBObject push = new BasicDBObject(); 361 362 public BasicDBObject inc = new BasicDBObject(); 363 } 364 365 /** 366 * Constructs a list of MongoDB updates from the given {@link StateDiff}. 367 * <p> 368 * We need a list because some cases need two operations to avoid conflicts. 
     */
    protected List<DBObject> diffToBson(StateDiff diff) {
        Updates updates = new Updates();
        diffToUpdates(diff, null, updates);
        // distribute the collected operators into as few non-conflicting update documents as possible
        UpdateListBuilder builder = new UpdateListBuilder();
        for (Entry<String, Object> en : updates.set.entrySet()) {
            builder.update(MONGODB_SET, en.getKey(), en.getValue());
        }
        for (Entry<String, Object> en : updates.unset.entrySet()) {
            builder.update(MONGODB_UNSET, en.getKey(), en.getValue());
        }
        for (Entry<String, Object> en : updates.push.entrySet()) {
            builder.update(MONGODB_PUSH, en.getKey(), en.getValue());
        }
        for (Entry<String, Object> en : updates.inc.entrySet()) {
            builder.update(MONGODB_INC, en.getKey(), en.getValue());
        }
        return builder.updateList;
    }

    /**
     * Update list builder to prevent several updates of the same field.
     * <p>
     * This happens if two operations act on two fields where one is a prefix of the other.
     * <p>
     * Example: Cannot update 'mylist.0.string' and 'mylist' at the same time (error 16837)
     *
     * @since 5.9.5
     */
    protected static class UpdateListBuilder {

        protected List<DBObject> updateList = new ArrayList<>(10);

        // the update document currently being filled
        protected DBObject update;

        // all prefixes (including the full keys) used by the current update document
        protected Set<String> prefixKeys;

        // the full keys used by the current update document
        protected Set<String> keys;

        protected UpdateListBuilder() {
            newUpdate();
        }

        /** Adds one operator/key/value to the current update document, starting a new document on conflict. */
        protected void update(String op, String key, Object value) {
            checkForConflict(key);
            DBObject map = (DBObject) update.get(op);
            if (map == null) {
                update.put(op, map = new BasicDBObject());
            }
            map.put(key, value);
        }

        /**
         * Checks if the key conflicts with one of the previous keys.
         * <p>
         * A conflict occurs if one key is equals to or is a prefix of the other.
         */
        protected void checkForConflict(String key) {
            List<String> pKeys = getPrefixKeys(key);
            if (conflictKeys(key, pKeys)) {
                // conflict: start a fresh update document
                newUpdate();
            }
            prefixKeys.addAll(pKeys);
            keys.add(key);
        }

        protected void newUpdate() {
            updateList.add(update = new BasicDBObject());
            prefixKeys = new HashSet<>();
            keys = new HashSet<>();
        }

        protected boolean conflictKeys(String key, List<String> subkeys) {
            if (prefixKeys.contains(key)) {
                return true;
            }
            for (String sk : subkeys) {
                if (keys.contains(sk)) {
                    return true;
                }
            }
            return false;
        }

        /**
         * Returns the list of parent keys of a key, including the key itself:
         * foo.0.bar -> [foo, foo.0, foo.0.bar]
         */
        protected List<String> getPrefixKeys(String key) {
            List<String> ret = new ArrayList<>(10);
            int i = 0;
            while ((i = key.indexOf('.', i)) > 0) {
                ret.add(key.substring(0, i++));
            }
            ret.add(key);
            return ret;
        }

    }

    /** Recursively turns a {@link StateDiff} into $set/$unset/$push/$inc entries under the given prefix. */
    protected void diffToUpdates(StateDiff diff, String prefix, Updates updates) {
        String elemPrefix = prefix == null ? "" : prefix + '.';
        for (Entry<String, Serializable> en : diff.entrySet()) {
            String name = elemPrefix + en.getKey();
            Serializable value = en.getValue();
            if (value instanceof StateDiff) {
                diffToUpdates((StateDiff) value, name, updates);
            } else if (value instanceof ListDiff) {
                diffToUpdates((ListDiff) value, name, updates);
            } else if (value instanceof Delta) {
                diffToUpdates((Delta) value, name, updates);
            } else {
                // not a diff
                if (value == null) {
                    // for null values, beyond the space saving,
                    // it's important to unset the field instead of setting the value to null
                    // because $inc does not work on nulls but works on non-existent fields
                    updates.unset.put(name, ONE);
                } else {
                    updates.set.put(name, valueToBson(value));
                }
            }
        }
    }

    /** Turns a {@link ListDiff} into positional $set entries and/or a $push for appended elements. */
    protected void diffToUpdates(ListDiff listDiff, String prefix, Updates updates) {
        if (listDiff.diff != null) {
            String elemPrefix = prefix == null ? "" : prefix + '.';
            int i = 0;
            for (Object value : listDiff.diff) {
                String name = elemPrefix + i;
                if (value instanceof StateDiff) {
                    diffToUpdates((StateDiff) value, name, updates);
                } else if (value != NOP) {
                    // set value
                    updates.set.put(name, valueToBson(value));
                }
                i++;
            }
        }
        if (listDiff.rpush != null) {
            Object pushed;
            if (listDiff.rpush.size() == 1) {
                // no need to use $each for one element
                pushed = valueToBson(listDiff.rpush.get(0));
            } else {
                pushed = new BasicDBObject(MONGODB_EACH, listToBson(listDiff.rpush));
            }
            updates.push.put(prefix, pushed);
        }
    }

    /** Turns a {@link Delta} into a $inc entry. */
    protected void diffToUpdates(Delta delta, String prefix, Updates updates) {
        // MongoDB can $inc a field that doesn't exist, it's treated as 0 BUT it doesn't work on null
        // so we ensure (in diffToUpdates) that we never store a null but remove the field instead
        Object inc = valueToBson(delta.getDeltaValue());
        updates.inc.put(prefix, inc);
    }

    /** Converts a scalar to the form stored in the database (Calendar becomes Date). */
    protected Object serializableToBson(Object value) {
        if (value instanceof Calendar) {
            return ((Calendar) value).getTime();
        }
        return value;
    }

    /** Converts a scalar read from the database to its Serializable form (Date becomes Calendar). */
    protected Serializable scalarToSerializable(Object val) {
        if (val instanceof Date) {
            Calendar cal = Calendar.getInstance();
            cal.setTime((Date) val);
            return cal;
        }
        return (Serializable) val;
    }

    /** Maps a database scalar class to the corresponding Serializable class. */
    protected Class<?> scalarToSerializableClass(Class<?> klass) {
        if (Date.class.isAssignableFrom(klass)) {
            return Calendar.class;
        }
        return klass;
    }

    /** Creates the required indexes, then the root document and id counter if the repository is empty. */
    protected void initRepository() {
        // create required indexes
        // code does explicit queries on those
        if (useCustomId) {
            coll.createIndex(new BasicDBObject(idKey, ONE));
        }
        coll.createIndex(new BasicDBObject(KEY_PARENT_ID, ONE));
        coll.createIndex(new BasicDBObject(KEY_ANCESTOR_IDS, ONE));
        coll.createIndex(new BasicDBObject(KEY_VERSION_SERIES_ID, ONE));
        coll.createIndex(new BasicDBObject(KEY_PROXY_TARGET_ID, ONE));
        coll.createIndex(new BasicDBObject(KEY_PROXY_VERSION_SERIES_ID, ONE));
        coll.createIndex(new BasicDBObject(KEY_READ_ACL, ONE));
        DBObject parentChild = new BasicDBObject();
        parentChild.put(KEY_PARENT_ID, ONE);
        parentChild.put(KEY_NAME, ONE);
        coll.createIndex(parentChild);
        // often used in user-generated queries
        coll.createIndex(new BasicDBObject(KEY_PRIMARY_TYPE, ONE));
        coll.createIndex(new BasicDBObject(KEY_LIFECYCLE_STATE, ONE));
        coll.createIndex(new BasicDBObject(KEY_FULLTEXT_JOBID, ONE));
        coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_USER, ONE));
        coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_STATUS, ONE));
        // TODO configure these from somewhere else
        coll.createIndex(new BasicDBObject("dc:modified", MINUS_ONE));
        coll.createIndex(new BasicDBObject("rend:renditionName", ONE));
        coll.createIndex(new BasicDBObject("drv:subscriptions.enabled", ONE));
        coll.createIndex(new BasicDBObject("collectionMember:collectionIds", ONE));
        if (!isFulltextDisabled()) {
            DBObject indexKeys = new BasicDBObject();
            indexKeys.put(KEY_FULLTEXT_SIMPLE, MONGODB_INDEX_TEXT);
            indexKeys.put(KEY_FULLTEXT_BINARY, MONGODB_INDEX_TEXT);
            DBObject indexOptions = new BasicDBObject();
            indexOptions.put(MONGODB_INDEX_NAME, FULLTEXT_INDEX_NAME);
            indexOptions.put(MONGODB_LANGUAGE_OVERRIDE, LANGUAGE_FIELD);
            coll.createIndex(indexKeys, indexOptions);
        }
        // check root presence
        DBObject query = new BasicDBObject(idKey, getRootId());
        if (coll.findOne(query, justPresenceField()) != null) {
            return;
        }
        // create basic repository structure needed
        if (idType == IdType.sequence || DEBUG_UUIDS) {
            // create the id counter
            DBObject idCounter = new BasicDBObject();
            idCounter.put(MONGODB_ID, COUNTER_NAME_UUID);
            idCounter.put(COUNTER_FIELD, ZERO);
            countersColl.insert(idCounter);
        }
        initRoot();
    }

    /** Returns the next id from the in-memory sequence, atomically fetching a new block when exhausted. */
    protected synchronized Long getNextSequenceId() {
        if (sequenceLeft == 0) {
            // allocate a new sequence block
            // the database contains the last value from the last block
            DBObject query = new BasicDBObject(MONGODB_ID, COUNTER_NAME_UUID);
            DBObject update = new BasicDBObject(MONGODB_INC,
                    new BasicDBObject(COUNTER_FIELD, Long.valueOf(sequenceBlockSize)));
            // returnNew=true: we get the counter value after the increment
            DBObject idCounter = countersColl.findAndModify(query, null, null, false, update, true, false);
            if (idCounter == null) {
                throw new NuxeoException("Repository id counter not initialized");
            }
            sequenceLeft = sequenceBlockSize;
            sequenceLastValue = ((Long) idCounter.get(COUNTER_FIELD)).longValue() -
sequenceBlockSize;
        }
        sequenceLeft--;
        sequenceLastValue++;
        return Long.valueOf(sequenceLastValue);
    }

    @Override
    public String generateNewId() {
        if (idType == IdType.sequence || DEBUG_UUIDS) {
            Long id = getNextSequenceId();
            if (DEBUG_UUIDS) {
                return "UUID_" + id;
            }
            return id.toString();
        } else {
            return UUID.randomUUID().toString();
        }
    }

    @Override
    public void createState(State state) {
        DBObject ob = stateToBson(state);
        if (log.isTraceEnabled()) {
            log.trace("MongoDB: CREATE " + ob.get(idKey) + ": " + ob);
        }
        coll.insert(ob);
        // TODO dupe exception
        // throw new DocumentException("Already exists: " + id);
    }

    @Override
    public void createStates(List<State> states) {
        List<DBObject> obs = states.stream().map(this::stateToBson).collect(Collectors.toList());
        if (log.isTraceEnabled()) {
            log.trace("MongoDB: CREATE ["
                    + obs.stream().map(ob -> ob.get(idKey).toString()).collect(Collectors.joining(", "))
                    + "]: " + obs);
        }
        coll.insert(obs);
    }

    @Override
    public State readState(String id) {
        DBObject query = new BasicDBObject(idKey, id);
        return findOne(query);
    }

    @Override
    public List<State> readStates(List<String> ids) {
        DBObject query = new BasicDBObject(idKey, new BasicDBObject(QueryOperators.IN, ids));
        return findAll(query, ids.size());
    }

    @Override
    public void updateState(String id, StateDiff diff) {
        DBObject query = new BasicDBObject(idKey, id);
        // the diff may need several update documents to avoid MongoDB conflicting-field errors
        for (DBObject update : diffToBson(diff)) {
            if (log.isTraceEnabled()) {
                log.trace("MongoDB: UPDATE " + id + ": " + update);
            }
            coll.update(query, update);
            // TODO dupe exception
            // throw new DocumentException("Missing: " + id);
        }
    }

    @Override
    public void deleteStates(Set<String> ids) {
        DBObject query = new BasicDBObject(idKey, new BasicDBObject(QueryOperators.IN, ids));
        if (log.isTraceEnabled()) {
            log.trace("MongoDB: REMOVE " + ids);
        }
        WriteResult w = coll.remove(query);
        if (w.getN() != ids.size()) {
            log.error("Removed " + w.getN() + " docs for " + ids.size() + " ids: " + ids);
        }
    }

    @Override
    public State readChildState(String parentId, String name, Set<String> ignored) {
        DBObject query = getChildQuery(parentId, name, ignored);
        return findOne(query);
    }

    protected void logQuery(String id, DBObject fields) {
        logQuery(new BasicDBObject(idKey, id), fields);
    }

    protected void logQuery(DBObject query, DBObject fields) {
        if (fields == null) {
            log.trace("MongoDB: QUERY " + query);
        } else {
            log.trace("MongoDB: QUERY " + query + " KEYS " + fields);
        }
    }

    protected void logQuery(DBObject query, DBObject fields, DBObject orderBy, int limit, int offset) {
        log.trace("MongoDB: QUERY " + query + " KEYS " + fields + (orderBy == null ? "" : " ORDER BY " + orderBy)
                + " OFFSET " + offset + " LIMIT " + limit);
    }

    @Override
    public boolean hasChild(String parentId, String name, Set<String> ignored) {
        DBObject query = getChildQuery(parentId, name, ignored);
        if (log.isTraceEnabled()) {
            logQuery(query, justPresenceField());
        }
        return coll.findOne(query, justPresenceField()) != null;
    }

    /** Builds the query selecting a child by parent id and name, excluding the ignored ids. */
    protected DBObject getChildQuery(String parentId, String name, Set<String> ignored) {
        DBObject query = new BasicDBObject();
        query.put(KEY_PARENT_ID, parentId);
        query.put(KEY_NAME, name);
        addIgnoredIds(query, ignored);
        return query;
    }

    /** Adds a $nin clause on the id key to exclude the given ids from a query. */
    protected void addIgnoredIds(DBObject query, Set<String> ignored) {
        if (!ignored.isEmpty()) {
            DBObject notInIds = new BasicDBObject(QueryOperators.NIN, new ArrayList<String>(ignored));
            query.put(idKey, notInIds);
        }
    }

    @Override
    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
        DBObject query = new BasicDBObject(keyToBson(key), value);
        addIgnoredIds(query, ignored);
        return findAll(query, 0);
    }

    @Override
    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
        DBObject query = new BasicDBObject(keyToBson(key1), value1);
        query.put(keyToBson(key2), value2);
        addIgnoredIds(query, ignored);
        return findAll(query, 0);
    }

    @Override
    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
            Map<String, Object[]> targetProxies) {
        DBObject query = new BasicDBObject(key, value);
        // only fetch the fields needed to fill the output maps
        DBObject fields = new BasicDBObject();
        if (useCustomId) {
            fields.put(MONGODB_ID, ZERO);
        }
        fields.put(idKey, ONE);
        fields.put(KEY_IS_PROXY, ONE);
        fields.put(KEY_PROXY_TARGET_ID, ONE);
        fields.put(KEY_PROXY_IDS, ONE);
        if (log.isTraceEnabled()) {
            logQuery(query, fields);
        }
        DBCursor cursor = coll.find(query, fields);
        try {
            for (DBObject ob : cursor) {
                String id = (String) ob.get(idKey);
                ids.add(id);
                if (proxyTargets != null && TRUE.equals(ob.get(KEY_IS_PROXY))) {
                    String targetId = (String) ob.get(KEY_PROXY_TARGET_ID);
                    proxyTargets.put(id, targetId);
                }
                if (targetProxies != null) {
                    Object[] proxyIds = (Object[]) bsonToValue(ob.get(KEY_PROXY_IDS));
                    if (proxyIds != null) {
                        targetProxies.put(id, proxyIds);
                    }
                }
            }
        } finally {
            cursor.close();
        }
    }

    @Override
    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
        DBObject query = new BasicDBObject(key, value);
        addIgnoredIds(query, ignored);
        if (log.isTraceEnabled()) {
            logQuery(query, justPresenceField());
        }
        return coll.findOne(query, justPresenceField()) != null;
    }

    protected State findOne(DBObject query) {
        if (log.isTraceEnabled()) {
            logQuery(query, null);
        }
        return
bsonToState(coll.findOne(query));
    }

    protected List<State> findAll(DBObject query, int sizeHint) {
        if (log.isTraceEnabled()) {
            logQuery(query, null);
        }
        DBCursor cursor = coll.find(query);
        Set<String> seen = new HashSet<>();
        try {
            List<State> list = new ArrayList<>(sizeHint);
            for (DBObject ob : cursor) {
                if (!seen.add((String) ob.get(idKey))) {
                    // MongoDB cursors may return the same
                    // object several times
                    continue;
                }
                list.add(bsonToState(ob));
            }
            return list;
        } finally {
            cursor.close();
        }
    }

    /** Projection keeping only the native id, used when only document presence matters. */
    protected DBObject justPresenceField() {
        return new BasicDBObject(MONGODB_ID, ONE);
    }

    @Override
    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
        // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter
        MongoDBQueryBuilder builder = new MongoDBQueryBuilder(this, evaluator.getExpression(),
                evaluator.getSelectClause(), orderByClause, evaluator.pathResolver, evaluator.fulltextSearchDisabled);
        builder.walk();
        if (builder.hasFulltext && isFulltextDisabled()) {
            throw new QueryParseException("Fulltext search disabled by configuration");
        }
        DBObject query = builder.getQuery();
        addPrincipals(query, evaluator.principals);
        DBObject orderBy = builder.getOrderBy();
        DBObject keys = builder.getProjection();
        // Don't do manual projection if there are no projection wildcards, as this brings no new
        // information and is costly. The only difference is several identical rows instead of one.
        boolean manualProjection = !distinctDocuments && builder.hasProjectionWildcard();
        if (manualProjection) {
            // we'll do post-treatment to re-evaluate the query to get proper wildcard projections
            // so we need the full state from the database
            keys = new BasicDBObject();
            evaluator.parse();
        }

        if (log.isTraceEnabled()) {
            logQuery(query, keys, orderBy, limit, offset);
        }

        List<Map<String, Serializable>> projections;
        long totalSize;
        DBCursor cursor = coll.find(query, keys).skip(offset).limit(limit);
        try {
            if (orderBy != null) {
                cursor.sort(orderBy);
            }
            projections = new ArrayList<>();
            for (DBObject ob : cursor) {
                State state = bsonToState(ob);
                if (manualProjection) {
                    projections.addAll(evaluator.matches(state));
                } else {
                    projections.add(DBSStateFlattener.flatten(state));
                }
            }
            if (countUpTo == -1) {
                // count full size
                if (limit == 0) {
                    totalSize = projections.size();
                } else {
                    totalSize = cursor.count();
                }
            } else if (countUpTo == 0) {
                // no count
                totalSize = -1; // not counted
            } else {
                // count only if less than countUpTo
                if (limit == 0) {
                    totalSize = projections.size();
                } else {
                    totalSize = cursor.copy().limit(countUpTo + 1).count();
                }
                if (totalSize > countUpTo) {
                    totalSize = -2; // truncated
                }
            }
        } finally {
            cursor.close();
        }
        if (log.isTraceEnabled() && projections.size() != 0) {
            log.trace("MongoDB: -> " + projections.size());
        }
        return new PartialList<>(projections, totalSize);
    }

    @Override
    public ScrollResult scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) {
        // reap expired cursors before opening a new one
        checkForTimedoutScroll();
        MongoDBQueryBuilder builder = new MongoDBQueryBuilder(this, evaluator.getExpression(),
                evaluator.getSelectClause(), null, evaluator.pathResolver, evaluator.fulltextSearchDisabled);
        builder.walk();
        if (builder.hasFulltext && isFulltextDisabled()) {
            throw new QueryParseException("Fulltext search disabled by configuration");
        }
        DBObject query = builder.getQuery();
        DBObject keys = builder.getProjection();
        if (log.isTraceEnabled()) {
            logQuery(query, keys, null, 0, 0);
        }

        DBCursor cursor = coll.find(query, keys);
        String scrollId = UUID.randomUUID().toString();
        registerCursor(scrollId, cursor, batchSize, keepAliveSeconds);
        return scroll(scrollId);
    }

    protected void checkForTimedoutScroll() {
        cursorResults.forEach((id, cursor) -> cursor.timedOut(id));
    }

    protected void registerCursor(String scrollId, DBCursor cursor, int batchSize, int keepAliveSeconds) {
        cursorResults.put(scrollId, new CursorResult(cursor, batchSize, keepAliveSeconds));
    }

    @Override
    public ScrollResult scroll(String scrollId) {
        CursorResult cursorResult = cursorResults.get(scrollId);
        if (cursorResult == null) {
            throw new NuxeoException("Unknown or timed out scrollId");
        } else if (cursorResult.timedOut(scrollId)) {
            throw new NuxeoException("Timed out scrollId");
        }
        cursorResult.touch();
        List<String> ids = new ArrayList<>(cursorResult.batchSize);
        synchronized (cursorResult) {
            if (cursorResult.cursor == null || !cursorResult.cursor.hasNext()) {
                // cursor exhausted or already closed: terminate the scroll
                unregisterCursor(scrollId);
                return emptyResult();
            }
            while (ids.size() < cursorResult.batchSize) {
                if (cursorResult.cursor == null || !cursorResult.cursor.hasNext()) {
                    cursorResult.close();
                    break;
                } else {
                    DBObject ob = cursorResult.cursor.next();
                    String id = (String) ob.get(keyToBson(KEY_ID));
                    if (id != null) {
                        ids.add(id);
                    } else {
                        log.error("Got a document without id: " + ob);
                    }
                }
            }
        }
        return new ScrollResultImpl(scrollId, ids);
    }

    /** Removes and closes the cursor for the given scroll id; returns true if it was registered. */
    protected boolean unregisterCursor(String scrollId) {
        CursorResult cursor = cursorResults.remove(scrollId);
        if (cursor != null) {
            cursor.close();
            return true;
        }
        return false;
    }

    /** Restricts the query to documents whose read ACL matches one of the given principals. */
    protected void addPrincipals(DBObject query, Set<String> principals) {
        if (principals != null) {
            DBObject inPrincipals = new BasicDBObject(QueryOperators.IN, new ArrayList<String>(principals));
            query.put(DBSDocument.KEY_READ_ACL, inPrincipals);
        }
    }

    /** Keys used for document projection when marking all binaries for GC. */
    protected DBObject binaryKeys;

    @Override
    protected void initBlobsPaths() {
        MongoDBBlobFinder finder = new MongoDBBlobFinder();
        finder.visit();
        binaryKeys = finder.binaryKeys;
    }

    /** Collects the dotted paths of all blob data fields into a projection object. */
    protected static class MongoDBBlobFinder extends BlobFinder {
        protected DBObject binaryKeys = new BasicDBObject(MONGODB_ID, ZERO);

        @Override
        protected void recordBlobPath() {
            path.addLast(KEY_BLOB_DATA);
            binaryKeys.put(StringUtils.join(path, "."), ONE);
            path.removeLast();
        }
    }

    @Override
    public void markReferencedBinaries() {
        BlobManager blobManager = Framework.getService(BlobManager.class);
        // TODO add a query to not scan all documents
        if (log.isTraceEnabled()) {
            logQuery(new BasicDBObject(), binaryKeys);
        }
        DBCursor cursor = coll.find(new BasicDBObject(), binaryKeys);
        try {
            for (DBObject ob : cursor) {
                markReferencedBinaries(ob, blobManager);
            }
        } finally {
            cursor.close();
        }
    }

    /** Recursively walks a projected document and marks every string value found as a referenced binary key. */
    protected void markReferencedBinaries(DBObject ob, BlobManager blobManager) {
        for (String key : ob.keySet()) {
            Object value = ob.get(key);
            if (value instanceof List) {
                @SuppressWarnings("unchecked")
                List<Object> list = (List<Object>) value;
                for (Object v : list) {
                    if (v instanceof DBObject) {
                        markReferencedBinaries((DBObject) v, blobManager);
                    } else {
                        markReferencedBinary(v, blobManager);
                    }
                }
            } else if (value instanceof Object[]) {
                for
(Object v : (Object[]) value) { 1040 markReferencedBinary(v, blobManager); 1041 } 1042 } else if (value instanceof DBObject) { 1043 markReferencedBinaries((DBObject) value, blobManager); 1044 } else { 1045 markReferencedBinary(value, blobManager); 1046 } 1047 } 1048 } 1049 1050 protected void markReferencedBinary(Object value, BlobManager blobManager) { 1051 if (!(value instanceof String)) { 1052 return; 1053 } 1054 String key = (String) value; 1055 blobManager.markReferencedBinary(key, repositoryName); 1056 } 1057 1058 protected static final DBObject LOCK_FIELDS; 1059 1060 static { 1061 LOCK_FIELDS = new BasicDBObject(); 1062 LOCK_FIELDS.put(KEY_LOCK_OWNER, ONE); 1063 LOCK_FIELDS.put(KEY_LOCK_CREATED, ONE); 1064 } 1065 1066 protected static final DBObject UNSET_LOCK_UPDATE = new BasicDBObject(MONGODB_UNSET, LOCK_FIELDS); 1067 1068 @Override 1069 public Lock getLock(String id) { 1070 if (log.isTraceEnabled()) { 1071 logQuery(id, LOCK_FIELDS); 1072 } 1073 DBObject res = coll.findOne(new BasicDBObject(idKey, id), LOCK_FIELDS); 1074 if (res == null) { 1075 // document not found 1076 throw new DocumentNotFoundException(id); 1077 } 1078 String owner = (String) res.get(KEY_LOCK_OWNER); 1079 if (owner == null) { 1080 // not locked 1081 return null; 1082 } 1083 Calendar created = (Calendar) scalarToSerializable(res.get(KEY_LOCK_CREATED)); 1084 return new Lock(owner, created); 1085 } 1086 1087 @Override 1088 public Lock setLock(String id, Lock lock) { 1089 DBObject query = new BasicDBObject(idKey, id); 1090 query.put(KEY_LOCK_OWNER, null); // select doc if no lock is set 1091 DBObject setLock = new BasicDBObject(); 1092 setLock.put(KEY_LOCK_OWNER, lock.getOwner()); 1093 setLock.put(KEY_LOCK_CREATED, serializableToBson(lock.getCreated())); 1094 DBObject setLockUpdate = new BasicDBObject(MONGODB_SET, setLock); 1095 if (log.isTraceEnabled()) { 1096 log.trace("MongoDB: FINDANDMODIFY " + query + " UPDATE " + setLockUpdate); 1097 } 1098 DBObject res = coll.findAndModify(query, 
null, null, false, setLockUpdate, false, false); 1099 if (res != null) { 1100 // found a doc to lock 1101 return null; 1102 } else { 1103 // doc not found, or lock owner already set 1104 // get the old lock 1105 if (log.isTraceEnabled()) { 1106 logQuery(id, LOCK_FIELDS); 1107 } 1108 DBObject old = coll.findOne(new BasicDBObject(idKey, id), LOCK_FIELDS); 1109 if (old == null) { 1110 // document not found 1111 throw new DocumentNotFoundException(id); 1112 } 1113 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 1114 Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED)); 1115 if (oldOwner != null) { 1116 return new Lock(oldOwner, oldCreated); 1117 } 1118 // no lock -- there was a race condition 1119 // TODO do better 1120 throw new ConcurrentUpdateException("Lock " + id); 1121 } 1122 } 1123 1124 @Override 1125 public Lock removeLock(String id, String owner) { 1126 DBObject query = new BasicDBObject(idKey, id); 1127 if (owner != null) { 1128 // remove if owner matches or null 1129 // implements LockManager.canLockBeRemoved inside MongoDB 1130 Object ownerOrNull = Arrays.asList(owner, null); 1131 query.put(KEY_LOCK_OWNER, new BasicDBObject(QueryOperators.IN, ownerOrNull)); 1132 } // else unconditional remove 1133 // remove the lock 1134 DBObject old = coll.findAndModify(query, null, null, false, UNSET_LOCK_UPDATE, false, false); 1135 if (old != null) { 1136 // found a doc and removed the lock, return previous lock 1137 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 1138 if (oldOwner == null) { 1139 // was not locked 1140 return null; 1141 } else { 1142 // return previous lock 1143 Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED)); 1144 return new Lock(oldOwner, oldCreated); 1145 } 1146 } else { 1147 // doc not found, or lock owner didn't match 1148 // get the old lock 1149 if (log.isTraceEnabled()) { 1150 logQuery(id, LOCK_FIELDS); 1151 } 1152 old = coll.findOne(new BasicDBObject(idKey, id), LOCK_FIELDS); 
1153 if (old == null) { 1154 // document not found 1155 throw new DocumentNotFoundException(id); 1156 } 1157 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 1158 Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED)); 1159 if (oldOwner != null) { 1160 if (!LockManager.canLockBeRemoved(oldOwner, owner)) { 1161 // existing mismatched lock, flag failure 1162 return new Lock(oldOwner, oldCreated, true); 1163 } 1164 // old owner should have matched -- there was a race condition 1165 // TODO do better 1166 throw new ConcurrentUpdateException("Unlock " + id); 1167 } 1168 // old owner null, should have matched -- there was a race condition 1169 // TODO do better 1170 throw new ConcurrentUpdateException("Unlock " + id); 1171 } 1172 } 1173 1174 @Override 1175 public void closeLockManager() { 1176 1177 } 1178 1179 @Override 1180 public void clearLockManagerCaches() { 1181 } 1182 1183 protected class CursorResult { 1184 // Note that MongoDB cursor automatically timeout after 10 minutes of inactivity by default. 1185 protected DBCursor cursor; 1186 protected final int batchSize; 1187 protected long lastCallTimestamp; 1188 protected final int keepAliveSeconds; 1189 1190 public CursorResult(DBCursor cursor, int batchSize, int keepAliveSeconds) { 1191 this.cursor = cursor; 1192 this.batchSize = batchSize; 1193 this.keepAliveSeconds = keepAliveSeconds; 1194 lastCallTimestamp = System.currentTimeMillis(); 1195 } 1196 1197 void touch() { 1198 lastCallTimestamp = System.currentTimeMillis(); 1199 } 1200 1201 boolean timedOut(String scrollId) { 1202 long now = System.currentTimeMillis(); 1203 if (now - lastCallTimestamp > (keepAliveSeconds * 1000)) { 1204 if (unregisterCursor(scrollId)) { 1205 log.warn("Scroll " + scrollId + " timed out"); 1206 } 1207 return true; 1208 } 1209 return false; 1210 } 1211 1212 public void close() { 1213 if (cursor != null) { 1214 cursor.close(); 1215 } 1216 cursor = null; 1217 } 1218 } 1219}