001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.mongodb; 020 021import static java.lang.Boolean.TRUE; 022import static org.nuxeo.ecm.core.storage.State.NOP; 023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_STATUS; 024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_USER; 025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACL; 026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACP; 027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS; 028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA; 029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY; 030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_JOBID; 031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_SIMPLE; 032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 033import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY; 034import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LIFECYCLE_STATE; 035import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED; 036import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER; 037import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 038import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 039import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PRIMARY_TYPE; 040import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS; 041import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID; 042import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_VERSION_SERIES_ID; 043import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_READ_ACL; 044import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_VERSION_SERIES_ID; 045 046import java.io.Serializable; 047import java.lang.reflect.Array; 048import java.net.UnknownHostException; 049import java.util.ArrayList; 050import java.util.Arrays; 051import java.util.Calendar; 052import java.util.Date; 053import java.util.HashSet; 054import java.util.List; 055import java.util.Map; 056import java.util.Map.Entry; 057import java.util.Set; 058import java.util.UUID; 059import java.util.stream.Collectors; 060 061import javax.resource.spi.ConnectionManager; 062 063import org.apache.commons.lang.StringUtils; 064import org.apache.commons.logging.Log; 065import org.apache.commons.logging.LogFactory; 066import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 067import org.nuxeo.ecm.core.api.DocumentNotFoundException; 068import org.nuxeo.ecm.core.api.Lock; 069import org.nuxeo.ecm.core.api.NuxeoException; 070import org.nuxeo.ecm.core.api.PartialList; 071import org.nuxeo.ecm.core.api.model.Delta; 072import org.nuxeo.ecm.core.blob.BlobManager; 073import org.nuxeo.ecm.core.model.LockManager; 074import org.nuxeo.ecm.core.model.Repository; 075import org.nuxeo.ecm.core.query.QueryParseException; 076import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 077import org.nuxeo.ecm.core.storage.State; 078import org.nuxeo.ecm.core.storage.State.ListDiff; 079import org.nuxeo.ecm.core.storage.State.StateDiff; 080import org.nuxeo.ecm.core.storage.dbs.DBSDocument; 081import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator; 082import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase; 083import org.nuxeo.ecm.core.storage.dbs.DBSStateFlattener; 084import org.nuxeo.runtime.api.Framework; 085 086import com.mongodb.BasicDBObject; 087import com.mongodb.DB; 088import com.mongodb.DBCollection; 089import com.mongodb.DBCursor; 090import com.mongodb.DBObject; 091import com.mongodb.MongoClient; 092import com.mongodb.MongoClientURI; 093import com.mongodb.QueryOperators; 094import com.mongodb.ServerAddress; 095import com.mongodb.WriteResult; 096 097/** 098 * MongoDB implementation of a {@link Repository}. 099 * 100 * @since 5.9.4 101 */ 102public class MongoDBRepository extends DBSRepositoryBase { 103 104 private static final Log log = LogFactory.getLog(MongoDBRepository.class); 105 106 private static final Long ZERO = Long.valueOf(0); 107 108 private static final Long ONE = Long.valueOf(1); 109 110 private static final Long MINUS_ONE = Long.valueOf(-11); 111 112 public static final String DB_DEFAULT = "nuxeo"; 113 114 public static final String MONGODB_ID = "_id"; 115 116 public static final String MONGODB_INC = "$inc"; 117 118 public static final String MONGODB_SET = "$set"; 119 120 public static final String MONGODB_UNSET = "$unset"; 121 122 public static final String MONGODB_PUSH = "$push"; 123 124 public static final String MONGODB_EACH = "$each"; 125 126 public static final String MONGODB_META = "$meta"; 127 128 public static final String MONGODB_TEXT_SCORE = "textScore"; 129 130 private static final String MONGODB_INDEX_TEXT = "text"; 131 132 private static final String MONGODB_INDEX_NAME = "name"; 133 134 private static final String MONGODB_LANGUAGE_OVERRIDE = "language_override"; 135 136 private static final String FULLTEXT_INDEX_NAME = "fulltext"; 137 138 private static final String LANGUAGE_FIELD = "__language"; 139 140 protected static final String COUNTER_NAME_UUID = "ecm:id"; 141 142 protected static final String COUNTER_FIELD = "seq"; 143 144 protected MongoClient mongoClient; 145 146 protected DBCollection coll; 147 148 protected DBCollection countersColl; 149 150 /** The key to use to store the id in the database. */ 151 protected String idKey; 152 153 /** True if we don't use MongoDB's native "_id" key to store the id. */ 154 protected boolean useCustomId; 155 156 /** Number of values still available in the in-memory sequence. */ 157 protected long sequenceLeft; 158 159 /** Last value used from the in-memory sequence. */ 160 protected long sequenceLastValue; 161 162 /** Sequence allocation block size. */ 163 protected long sequenceBlockSize; 164 165 public MongoDBRepository(ConnectionManager cm, MongoDBRepositoryDescriptor descriptor) { 166 super(cm, descriptor.name, descriptor); 167 try { 168 mongoClient = newMongoClient(descriptor); 169 coll = getCollection(descriptor, mongoClient); 170 countersColl = getCountersCollection(descriptor, mongoClient); 171 } catch (UnknownHostException e) { 172 throw new RuntimeException(e); 173 } 174 if (Boolean.TRUE.equals(descriptor.nativeId)) { 175 idKey = MONGODB_ID; 176 } else { 177 idKey = KEY_ID; 178 } 179 useCustomId = KEY_ID.equals(idKey); 180 if (idType == IdType.sequence || DEBUG_UUIDS) { 181 Integer sbs = descriptor.sequenceBlockSize; 182 sequenceBlockSize = sbs == null ? 1 : sbs.longValue(); 183 sequenceLeft = 0; 184 } 185 initRepository(); 186 } 187 188 @Override 189 public List<IdType> getAllowedIdTypes() { 190 return Arrays.asList(IdType.varchar, IdType.sequence); 191 } 192 193 @Override 194 public void shutdown() { 195 super.shutdown(); 196 mongoClient.close(); 197 } 198 199 // used also by unit tests 200 public static MongoClient newMongoClient(MongoDBRepositoryDescriptor descriptor) throws UnknownHostException { 201 String server = descriptor.server; 202 if (StringUtils.isBlank(server)) { 203 throw new NuxeoException("Missing <server> in MongoDB repository descriptor"); 204 } 205 if (server.startsWith("mongodb://")) { 206 // allow mongodb:// URI syntax for the server, to pass everything in one string 207 return new MongoClient(new MongoClientURI(server)); 208 } else { 209 return new MongoClient(new ServerAddress(server)); 210 } 211 } 212 213 protected static DBCollection getCollection(MongoClient mongoClient, String dbname, String collection) { 214 if (StringUtils.isBlank(dbname)) { 215 dbname = DB_DEFAULT; 216 } 217 DB db = mongoClient.getDB(dbname); 218 return db.getCollection(collection); 219 } 220 221 // used also by unit tests 222 public static DBCollection getCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) { 223 return getCollection(mongoClient, descriptor.dbname, descriptor.name); 224 } 225 226 // used also by unit tests 227 public static DBCollection getCountersCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) { 228 return getCollection(mongoClient, descriptor.dbname, descriptor.name + ".counters"); 229 } 230 231 protected String keyToBson(String key) { 232 if (useCustomId) { 233 return key; 234 } else { 235 return KEY_ID.equals(key) ? idKey : key; 236 } 237 } 238 239 protected Object valueToBson(Object value) { 240 if (value instanceof State) { 241 return stateToBson((State) value); 242 } else if (value instanceof List) { 243 @SuppressWarnings("unchecked") 244 List<Object> values = (List<Object>) value; 245 return listToBson(values); 246 } else if (value instanceof Object[]) { 247 return listToBson(Arrays.asList((Object[]) value)); 248 } else { 249 return serializableToBson(value); 250 } 251 } 252 253 protected DBObject stateToBson(State state) { 254 DBObject ob = new BasicDBObject(); 255 for (Entry<String, Serializable> en : state.entrySet()) { 256 Object val = valueToBson(en.getValue()); 257 if (val != null) { 258 ob.put(keyToBson(en.getKey()), val); 259 } 260 } 261 return ob; 262 } 263 264 protected List<Object> listToBson(List<Object> values) { 265 ArrayList<Object> objects = new ArrayList<Object>(values.size()); 266 for (Object value : values) { 267 objects.add(valueToBson(value)); 268 } 269 return objects; 270 } 271 272 protected String bsonToKey(String key) { 273 if (useCustomId) { 274 return key; 275 } else { 276 return idKey.equals(key) ? KEY_ID : key; 277 } 278 } 279 280 protected State bsonToState(DBObject ob) { 281 if (ob == null) { 282 return null; 283 } 284 State state = new State(ob.keySet().size()); 285 for (String key : ob.keySet()) { 286 if (useCustomId && MONGODB_ID.equals(key)) { 287 // skip native id 288 continue; 289 } 290 state.put(bsonToKey(key), bsonToValue(ob.get(key))); 291 } 292 return state; 293 } 294 295 protected Serializable bsonToValue(Object value) { 296 if (value instanceof List) { 297 @SuppressWarnings("unchecked") 298 List<Object> list = (List<Object>) value; 299 if (list.isEmpty()) { 300 return null; 301 } else { 302 if (list.get(0) instanceof DBObject) { 303 List<Serializable> l = new ArrayList<>(list.size()); 304 for (Object el : list) { 305 l.add(bsonToState((DBObject) el)); 306 } 307 return (Serializable) l; 308 } else { 309 // turn the list into a properly-typed array 310 Class<?> klass = Object.class; 311 for (Object o : list) { 312 if (o != null) { 313 klass = scalarToSerializableClass(o.getClass()); 314 break; 315 } 316 } 317 Object[] ar = (Object[]) Array.newInstance(klass, list.size()); 318 int i = 0; 319 for (Object el : list) { 320 ar[i++] = scalarToSerializable(el); 321 } 322 return ar; 323 } 324 } 325 } else if (value instanceof DBObject) { 326 return bsonToState((DBObject) value); 327 } else { 328 return scalarToSerializable(value); 329 } 330 } 331 332 public static class Updates { 333 public BasicDBObject set = new BasicDBObject(); 334 335 public BasicDBObject unset = new BasicDBObject(); 336 337 public BasicDBObject push = new BasicDBObject(); 338 339 public BasicDBObject inc = new BasicDBObject(); 340 } 341 342 /** 343 * Constructs a list of MongoDB updates from the given {@link StateDiff}. 344 * <p> 345 * We need a list because some cases need two operations to avoid conflicts. 346 */ 347 protected List<DBObject> diffToBson(StateDiff diff) { 348 Updates updates = new Updates(); 349 diffToUpdates(diff, null, updates); 350 UpdateListBuilder builder = new UpdateListBuilder(); 351 for (Entry<String, Object> en : updates.set.entrySet()) { 352 builder.update(MONGODB_SET, en.getKey(), en.getValue()); 353 } 354 for (Entry<String, Object> en : updates.unset.entrySet()) { 355 builder.update(MONGODB_UNSET, en.getKey(), en.getValue()); 356 } 357 for (Entry<String, Object> en : updates.push.entrySet()) { 358 builder.update(MONGODB_PUSH, en.getKey(), en.getValue()); 359 } 360 for (Entry<String, Object> en : updates.inc.entrySet()) { 361 builder.update(MONGODB_INC, en.getKey(), en.getValue()); 362 } 363 return builder.updateList; 364 } 365 366 /** 367 * Update list builder to prevent several updates of the same field. 368 * <p> 369 * This happens if two operations act on two fields where one is a prefix of the other. 370 * <p> 371 * Example: Cannot update 'mylist.0.string' and 'mylist' at the same time (error 16837) 372 * 373 * @since 5.9.5 374 */ 375 protected static class UpdateListBuilder { 376 377 protected List<DBObject> updateList = new ArrayList<>(1); 378 379 protected DBObject update; 380 381 protected List<String> keys; 382 383 protected UpdateListBuilder() { 384 newUpdate(); 385 } 386 387 protected void newUpdate() { 388 updateList.add(update = new BasicDBObject()); 389 keys = new ArrayList<>(); 390 } 391 392 protected void update(String op, String key, Object value) { 393 if (conflicts(key, keys)) { 394 newUpdate(); 395 } 396 keys.add(key); 397 DBObject map = (DBObject) update.get(op); 398 if (map == null) { 399 update.put(op, map = new BasicDBObject()); 400 } 401 map.put(key, value); 402 } 403 404 /** 405 * Checks if the key conflicts with one of the previous keys. 406 * <p> 407 * A conflict occurs if one key is equals to or is a prefix of the other. 408 */ 409 protected boolean conflicts(String key, List<String> previousKeys) { 410 String keydot = key + '.'; 411 for (String prev : previousKeys) { 412 if (prev.equals(key) || prev.startsWith(keydot) || key.startsWith(prev + '.')) { 413 return true; 414 } 415 } 416 return false; 417 } 418 } 419 420 protected void diffToUpdates(StateDiff diff, String prefix, Updates updates) { 421 String elemPrefix = prefix == null ? "" : prefix + '.'; 422 for (Entry<String, Serializable> en : diff.entrySet()) { 423 String name = elemPrefix + en.getKey(); 424 Serializable value = en.getValue(); 425 if (value instanceof StateDiff) { 426 diffToUpdates((StateDiff) value, name, updates); 427 } else if (value instanceof ListDiff) { 428 diffToUpdates((ListDiff) value, name, updates); 429 } else if (value instanceof Delta) { 430 diffToUpdates((Delta) value, name, updates); 431 } else { 432 // not a diff 433 if (value == null) { 434 // for null values, beyond the space saving, 435 // it's important to unset the field instead of setting the value to null 436 // because $inc does not work on nulls but works on non-existent fields 437 updates.unset.put(name, ONE); 438 } else { 439 updates.set.put(name, valueToBson(value)); 440 } 441 } 442 } 443 } 444 445 protected void diffToUpdates(ListDiff listDiff, String prefix, Updates updates) { 446 if (listDiff.diff != null) { 447 String elemPrefix = prefix == null ? "" : prefix + '.'; 448 int i = 0; 449 for (Object value : listDiff.diff) { 450 String name = elemPrefix + i; 451 if (value instanceof StateDiff) { 452 diffToUpdates((StateDiff) value, name, updates); 453 } else if (value != NOP) { 454 // set value 455 updates.set.put(name, valueToBson(value)); 456 } 457 i++; 458 } 459 } 460 if (listDiff.rpush != null) { 461 Object pushed; 462 if (listDiff.rpush.size() == 1) { 463 // no need to use $each for one element 464 pushed = valueToBson(listDiff.rpush.get(0)); 465 } else { 466 pushed = new BasicDBObject(MONGODB_EACH, listToBson(listDiff.rpush)); 467 } 468 updates.push.put(prefix, pushed); 469 } 470 } 471 472 protected void diffToUpdates(Delta delta, String prefix, Updates updates) { 473 // MongoDB can $inc a field that doesn't exist, it's treated as 0 BUT it doesn't work on null 474 // so we ensure (in diffToUpdates) that we never store a null but remove the field instead 475 Object inc = valueToBson(delta.getDeltaValue()); 476 updates.inc.put(prefix, inc); 477 } 478 479 protected Object serializableToBson(Object value) { 480 if (value instanceof Calendar) { 481 return ((Calendar) value).getTime(); 482 } 483 return value; 484 } 485 486 protected Serializable scalarToSerializable(Object val) { 487 if (val instanceof Date) { 488 Calendar cal = Calendar.getInstance(); 489 cal.setTime((Date) val); 490 return cal; 491 } 492 return (Serializable) val; 493 } 494 495 protected Class<?> scalarToSerializableClass(Class<?> klass) { 496 if (Date.class.isAssignableFrom(klass)) { 497 return Calendar.class; 498 } 499 return klass; 500 } 501 502 protected void initRepository() { 503 // create required indexes 504 // code does explicit queries on those 505 if (useCustomId) { 506 coll.createIndex(new BasicDBObject(idKey, ONE)); 507 } 508 coll.createIndex(new BasicDBObject(KEY_PARENT_ID, ONE)); 509 coll.createIndex(new BasicDBObject(KEY_ANCESTOR_IDS, ONE)); 510 coll.createIndex(new BasicDBObject(KEY_VERSION_SERIES_ID, ONE)); 511 coll.createIndex(new BasicDBObject(KEY_PROXY_TARGET_ID, ONE)); 512 coll.createIndex(new BasicDBObject(KEY_PROXY_VERSION_SERIES_ID, ONE)); 513 coll.createIndex(new BasicDBObject(KEY_READ_ACL, ONE)); 514 DBObject parentChild = new BasicDBObject(); 515 parentChild.put(KEY_PARENT_ID, ONE); 516 parentChild.put(KEY_NAME, ONE); 517 coll.createIndex(parentChild); 518 // often used in user-generated queries 519 coll.createIndex(new BasicDBObject(KEY_PRIMARY_TYPE, ONE)); 520 coll.createIndex(new BasicDBObject(KEY_LIFECYCLE_STATE, ONE)); 521 coll.createIndex(new BasicDBObject(KEY_FULLTEXT_JOBID, ONE)); 522 coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_USER, ONE)); 523 coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_STATUS, ONE)); 524 // TODO configure these from somewhere else 525 coll.createIndex(new BasicDBObject("dc:modified", MINUS_ONE)); 526 coll.createIndex(new BasicDBObject("rend:renditionName", ONE)); 527 coll.createIndex(new BasicDBObject("drv:subscriptions.enabled", ONE)); 528 coll.createIndex(new BasicDBObject("collectionMember:collectionIds", ONE)); 529 if (!isFulltextDisabled()) { 530 DBObject indexKeys = new BasicDBObject(); 531 indexKeys.put(KEY_FULLTEXT_SIMPLE, MONGODB_INDEX_TEXT); 532 indexKeys.put(KEY_FULLTEXT_BINARY, MONGODB_INDEX_TEXT); 533 DBObject indexOptions = new BasicDBObject(); 534 indexOptions.put(MONGODB_INDEX_NAME, FULLTEXT_INDEX_NAME); 535 indexOptions.put(MONGODB_LANGUAGE_OVERRIDE, LANGUAGE_FIELD); 536 coll.createIndex(indexKeys, indexOptions); 537 } 538 // check root presence 539 DBObject query = new BasicDBObject(idKey, getRootId()); 540 if (coll.findOne(query, justPresenceField()) != null) { 541 return; 542 } 543 // create basic repository structure needed 544 if (idType == IdType.sequence || DEBUG_UUIDS) { 545 // create the id counter 546 DBObject idCounter = new BasicDBObject(); 547 idCounter.put(MONGODB_ID, COUNTER_NAME_UUID); 548 idCounter.put(COUNTER_FIELD, ZERO); 549 countersColl.insert(idCounter); 550 } 551 initRoot(); 552 } 553 554 protected synchronized Long getNextSequenceId() { 555 if (sequenceLeft == 0) { 556 // allocate a new sequence block 557 // the database contains the last value from the last block 558 DBObject query = new BasicDBObject(MONGODB_ID, COUNTER_NAME_UUID); 559 DBObject update = new BasicDBObject(MONGODB_INC, 560 new BasicDBObject(COUNTER_FIELD, Long.valueOf(sequenceBlockSize))); 561 DBObject idCounter = countersColl.findAndModify(query, null, null, false, update, true, false); 562 if (idCounter == null) { 563 throw new NuxeoException("Repository id counter not initialized"); 564 } 565 sequenceLeft = sequenceBlockSize; 566 sequenceLastValue = ((Long) idCounter.get(COUNTER_FIELD)).longValue() - sequenceBlockSize; 567 } 568 sequenceLeft--; 569 sequenceLastValue++; 570 return Long.valueOf(sequenceLastValue); 571 } 572 573 @Override 574 public String generateNewId() { 575 if (idType == IdType.sequence || DEBUG_UUIDS) { 576 Long id = getNextSequenceId(); 577 if (DEBUG_UUIDS) { 578 return "UUID_" + id; 579 } 580 return id.toString(); 581 } else { 582 return UUID.randomUUID().toString(); 583 } 584 } 585 586 @Override 587 public void createState(State state) { 588 DBObject ob = stateToBson(state); 589 if (log.isTraceEnabled()) { 590 log.trace("MongoDB: CREATE " + ob.get(idKey) + ": " + ob); 591 } 592 coll.insert(ob); 593 // TODO dupe exception 594 // throw new DocumentException("Already exists: " + id); 595 } 596 597 @Override 598 public void createStates(List<State> states) { 599 List<DBObject> obs = states.stream().map(this::stateToBson).collect(Collectors.toList()); 600 if (log.isTraceEnabled()) { 601 log.trace("MongoDB: CREATE [" 602 + obs.stream().map(ob -> ob.get(idKey).toString()).collect(Collectors.joining(", ")) 603 + "]: " + obs); 604 } 605 coll.insert(obs); 606 } 607 608 @Override 609 public State readState(String id) { 610 DBObject query = new BasicDBObject(idKey, id); 611 return findOne(query); 612 } 613 614 @Override 615 public List<State> readStates(List<String> ids) { 616 DBObject query = new BasicDBObject(idKey, new BasicDBObject(QueryOperators.IN, ids)); 617 return findAll(query, ids.size()); 618 } 619 620 @Override 621 public void updateState(String id, StateDiff diff) { 622 DBObject query = new BasicDBObject(idKey, id); 623 for (DBObject update : diffToBson(diff)) { 624 if (log.isTraceEnabled()) { 625 log.trace("MongoDB: UPDATE " + id + ": " + update); 626 } 627 coll.update(query, update); 628 // TODO dupe exception 629 // throw new DocumentException("Missing: " + id); 630 } 631 } 632 633 @Override 634 public void deleteStates(Set<String> ids) { 635 DBObject query = new BasicDBObject(idKey, new BasicDBObject(QueryOperators.IN, ids)); 636 if (log.isTraceEnabled()) { 637 log.trace("MongoDB: REMOVE " + ids); 638 } 639 WriteResult w = coll.remove(query); 640 if (w.getN() != ids.size()) { 641 log.error("Removed " + w.getN() + " docs for " + ids.size() + " ids: " + ids); 642 } 643 } 644 645 @Override 646 public State readChildState(String parentId, String name, Set<String> ignored) { 647 DBObject query = getChildQuery(parentId, name, ignored); 648 return findOne(query); 649 } 650 651 protected void logQuery(String id, DBObject fields) { 652 logQuery(new BasicDBObject(idKey, id), fields); 653 } 654 655 protected void logQuery(DBObject query, DBObject fields) { 656 if (fields == null) { 657 log.trace("MongoDB: QUERY " + query); 658 } else { 659 log.trace("MongoDB: QUERY " + query + " KEYS " + fields); 660 } 661 } 662 663 protected void logQuery(DBObject query, DBObject fields, DBObject orderBy, int limit, int offset) { 664 log.trace("MongoDB: QUERY " + query + " KEYS " + fields + (orderBy == null ? "" : " ORDER BY " + orderBy) 665 + " OFFSET " + offset + " LIMIT " + limit); 666 } 667 668 @Override 669 public boolean hasChild(String parentId, String name, Set<String> ignored) { 670 DBObject query = getChildQuery(parentId, name, ignored); 671 if (log.isTraceEnabled()) { 672 logQuery(query, justPresenceField()); 673 } 674 return coll.findOne(query, justPresenceField()) != null; 675 } 676 677 protected DBObject getChildQuery(String parentId, String name, Set<String> ignored) { 678 DBObject query = new BasicDBObject(); 679 query.put(KEY_PARENT_ID, parentId); 680 query.put(KEY_NAME, name); 681 addIgnoredIds(query, ignored); 682 return query; 683 } 684 685 protected void addIgnoredIds(DBObject query, Set<String> ignored) { 686 if (!ignored.isEmpty()) { 687 DBObject notInIds = new BasicDBObject(QueryOperators.NIN, new ArrayList<String>(ignored)); 688 query.put(idKey, notInIds); 689 } 690 } 691 692 @Override 693 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 694 DBObject query = new BasicDBObject(keyToBson(key), value); 695 addIgnoredIds(query, ignored); 696 return findAll(query, 0); 697 } 698 699 @Override 700 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 701 DBObject query = new BasicDBObject(keyToBson(key1), value1); 702 query.put(keyToBson(key2), value2); 703 addIgnoredIds(query, ignored); 704 return findAll(query, 0); 705 } 706 707 @Override 708 public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets, 709 Map<String, Object[]> targetProxies) { 710 DBObject query = new BasicDBObject(key, value); 711 DBObject fields = new BasicDBObject(); 712 if (useCustomId) { 713 fields.put(MONGODB_ID, ZERO); 714 } 715 fields.put(idKey, ONE); 716 fields.put(KEY_IS_PROXY, ONE); 717 fields.put(KEY_PROXY_TARGET_ID, ONE); 718 fields.put(KEY_PROXY_IDS, ONE); 719 if (log.isTraceEnabled()) { 720 logQuery(query, fields); 721 } 722 DBCursor cursor = coll.find(query, fields); 723 try { 724 for (DBObject ob : cursor) { 725 String id = (String) ob.get(idKey); 726 ids.add(id); 727 if (proxyTargets != null && TRUE.equals(ob.get(KEY_IS_PROXY))) { 728 String targetId = (String) ob.get(KEY_PROXY_TARGET_ID); 729 proxyTargets.put(id, targetId); 730 } 731 if (targetProxies != null) { 732 Object[] proxyIds = (Object[]) bsonToValue(ob.get(KEY_PROXY_IDS)); 733 if (proxyIds != null) { 734 targetProxies.put(id, proxyIds); 735 } 736 } 737 } 738 } finally { 739 cursor.close(); 740 } 741 } 742 743 @Override 744 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 745 DBObject query = new BasicDBObject(key, value); 746 addIgnoredIds(query, ignored); 747 if (log.isTraceEnabled()) { 748 logQuery(query, justPresenceField()); 749 } 750 return coll.findOne(query, justPresenceField()) != null; 751 } 752 753 protected State findOne(DBObject query) { 754 if (log.isTraceEnabled()) { 755 logQuery(query, null); 756 } 757 return bsonToState(coll.findOne(query)); 758 } 759 760 protected List<State> findAll(DBObject query, int sizeHint) { 761 if (log.isTraceEnabled()) { 762 logQuery(query, null); 763 } 764 DBCursor cursor = coll.find(query); 765 Set<String> seen = new HashSet<>(); 766 try { 767 List<State> list = new ArrayList<>(sizeHint); 768 for (DBObject ob : cursor) { 769 if (!seen.add((String) ob.get(idKey))) { 770 // MongoDB cursors may return the same 771 // object several times 772 continue; 773 } 774 list.add(bsonToState(ob)); 775 } 776 return list; 777 } finally { 778 cursor.close(); 779 } 780 } 781 782 protected DBObject justPresenceField() { 783 return new BasicDBObject(MONGODB_ID, ONE); 784 } 785 786 @Override 787 public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator, 788 OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) { 789 // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter 790 MongoDBQueryBuilder builder = new MongoDBQueryBuilder(this, evaluator.getExpression(), 791 evaluator.getSelectClause(), orderByClause, evaluator.pathResolver, evaluator.fulltextSearchDisabled); 792 builder.walk(); 793 if (builder.hasFulltext && isFulltextDisabled()) { 794 throw new QueryParseException("Fulltext search disabled by configuration"); 795 } 796 DBObject query = builder.getQuery(); 797 addPrincipals(query, evaluator.principals); 798 DBObject orderBy = builder.getOrderBy(); 799 DBObject keys = builder.getProjection(); 800 // Don't do manual projection if there are no projection wildcards, as this brings no new 801 // information and is costly. The only difference is several identical rows instead of one. 802 boolean manualProjection = !distinctDocuments && builder.hasProjectionWildcard(); 803 if (manualProjection) { 804 // we'll do post-treatment to re-evaluate the query to get proper wildcard projections 805 // so we need the full state from the database 806 keys = new BasicDBObject(); 807 evaluator.parse(); 808 } 809 810 if (log.isTraceEnabled()) { 811 logQuery(query, keys, orderBy, limit, offset); 812 } 813 814 List<Map<String, Serializable>> projections; 815 long totalSize; 816 DBCursor cursor = coll.find(query, keys).skip(offset).limit(limit); 817 try { 818 if (orderBy != null) { 819 cursor.sort(orderBy); 820 } 821 projections = new ArrayList<>(); 822 for (DBObject ob : cursor) { 823 State state = bsonToState(ob); 824 if (manualProjection) { 825 projections.addAll(evaluator.matches(state)); 826 } else { 827 projections.add(DBSStateFlattener.flatten(state)); 828 } 829 } 830 if (countUpTo == -1) { 831 // count full size 832 if (limit == 0) { 833 totalSize = projections.size(); 834 } else { 835 totalSize = cursor.count(); 836 } 837 } else if (countUpTo == 0) { 838 // no count 839 totalSize = -1; // not counted 840 } else { 841 // count only if less than countUpTo 842 if (limit == 0) { 843 totalSize = projections.size(); 844 } else { 845 totalSize = cursor.copy().limit(countUpTo + 1).count(); 846 } 847 if (totalSize > countUpTo) { 848 totalSize = -2; // truncated 849 } 850 } 851 } finally { 852 cursor.close(); 853 } 854 if (log.isTraceEnabled() && projections.size() != 0) { 855 log.trace("MongoDB: -> " + projections.size()); 856 } 857 return new PartialList<>(projections, totalSize); 858 } 859 860 protected void addPrincipals(DBObject query, Set<String> principals) { 861 if (principals != null) { 862 DBObject inPrincipals = new BasicDBObject(QueryOperators.IN, new ArrayList<String>(principals)); 863 query.put(DBSDocument.KEY_READ_ACL, inPrincipals); 864 } 865 } 866 867 /** Keys used for document projection when marking all binaries for GC. */ 868 protected DBObject binaryKeys; 869 870 @Override 871 protected void initBlobsPaths() { 872 MongoDBBlobFinder finder = new MongoDBBlobFinder(); 873 finder.visit(); 874 binaryKeys = finder.binaryKeys; 875 } 876 877 protected static class MongoDBBlobFinder extends BlobFinder { 878 protected DBObject binaryKeys = new BasicDBObject(MONGODB_ID, ZERO); 879 880 @Override 881 protected void recordBlobPath() { 882 path.addLast(KEY_BLOB_DATA); 883 binaryKeys.put(StringUtils.join(path, "."), ONE); 884 path.removeLast(); 885 } 886 } 887 888 @Override 889 public void markReferencedBinaries() { 890 BlobManager blobManager = Framework.getService(BlobManager.class); 891 // TODO add a query to not scan all documents 892 if (log.isTraceEnabled()) { 893 logQuery(new BasicDBObject(), binaryKeys); 894 } 895 DBCursor cursor = coll.find(new BasicDBObject(), binaryKeys); 896 try { 897 for (DBObject ob : cursor) { 898 markReferencedBinaries(ob, blobManager); 899 } 900 } finally { 901 cursor.close(); 902 } 903 } 904 905 protected void markReferencedBinaries(DBObject ob, BlobManager blobManager) { 906 for (String key : ob.keySet()) { 907 Object value = ob.get(key); 908 if (value instanceof List) { 909 @SuppressWarnings("unchecked") 910 List<Object> list = (List<Object>) value; 911 for (Object v : list) { 912 if (v instanceof DBObject) { 913 markReferencedBinaries((DBObject) v, blobManager); 914 } else { 915 markReferencedBinary(v, blobManager); 916 } 917 } 918 } else if (value instanceof Object[]) { 919 for (Object v : (Object[]) value) { 920 markReferencedBinary(v, blobManager); 921 } 922 } else if (value instanceof DBObject) { 923 markReferencedBinaries((DBObject) value, blobManager); 924 } else { 925 markReferencedBinary(value, blobManager); 926 } 927 } 928 } 929 930 protected void markReferencedBinary(Object value, BlobManager blobManager) { 931 if (!(value instanceof String)) { 932 return; 933 } 934 String key = (String) value; 935 blobManager.markReferencedBinary(key, repositoryName); 936 } 937 938 protected static final DBObject LOCK_FIELDS; 939 940 static { 941 LOCK_FIELDS = new BasicDBObject(); 942 LOCK_FIELDS.put(KEY_LOCK_OWNER, ONE); 943 LOCK_FIELDS.put(KEY_LOCK_CREATED, ONE); 944 } 945 946 protected static final DBObject UNSET_LOCK_UPDATE = new BasicDBObject(MONGODB_UNSET, LOCK_FIELDS); 947 948 @Override 949 public Lock getLock(String id) { 950 if (log.isTraceEnabled()) { 951 logQuery(id, LOCK_FIELDS); 952 } 953 DBObject res = coll.findOne(new BasicDBObject(idKey, id), LOCK_FIELDS); 954 if (res == null) { 955 // document not found 956 throw new DocumentNotFoundException(id); 957 } 958 String owner = (String) res.get(KEY_LOCK_OWNER); 959 if (owner == null) { 960 // not locked 961 return null; 962 } 963 Calendar created = (Calendar) scalarToSerializable(res.get(KEY_LOCK_CREATED)); 964 return new Lock(owner, created); 965 } 966 967 @Override 968 public Lock setLock(String id, Lock lock) { 969 DBObject query = new BasicDBObject(idKey, id); 970 query.put(KEY_LOCK_OWNER, null); // select doc if no lock is set 971 DBObject setLock = new BasicDBObject(); 972 setLock.put(KEY_LOCK_OWNER, lock.getOwner()); 973 setLock.put(KEY_LOCK_CREATED, serializableToBson(lock.getCreated())); 974 DBObject setLockUpdate = new BasicDBObject(MONGODB_SET, setLock); 975 if (log.isTraceEnabled()) { 976 log.trace("MongoDB: FINDANDMODIFY " + query + " UPDATE " + setLockUpdate); 977 } 978 DBObject res = coll.findAndModify(query, null, null, false, setLockUpdate, false, false); 979 if (res != null) { 980 // found a doc to lock 981 return null; 982 } else { 983 // doc not found, or lock owner already set 984 // get the old lock 985 if (log.isTraceEnabled()) { 986 logQuery(id, LOCK_FIELDS); 987 } 988 DBObject old = coll.findOne(new BasicDBObject(idKey, id), LOCK_FIELDS); 989 if (old == null) { 990 // document not found 991 throw new DocumentNotFoundException(id); 992 } 993 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 994 Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED)); 995 if (oldOwner != null) { 996 return new Lock(oldOwner, oldCreated); 997 } 998 // no lock -- there was a race condition 999 // TODO do better 1000 throw new ConcurrentUpdateException("Lock " + id); 1001 } 1002 } 1003 1004 @Override 1005 public Lock removeLock(String id, String owner) { 1006 DBObject query = new BasicDBObject(idKey, id); 1007 if (owner != null) { 1008 // remove if owner matches or null 1009 // implements LockManager.canLockBeRemoved inside MongoDB 1010 Object ownerOrNull = Arrays.asList(owner, null); 1011 query.put(KEY_LOCK_OWNER, new BasicDBObject(QueryOperators.IN, ownerOrNull)); 1012 } // else unconditional remove 1013 // remove the lock 1014 DBObject old = coll.findAndModify(query, null, null, false, UNSET_LOCK_UPDATE, false, false); 1015 if (old != null) { 1016 // found a doc and removed the lock, return previous lock 1017 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 1018 if (oldOwner == null) { 1019 // was not locked 1020 return null; 1021 } else { 1022 // return previous lock 1023 Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED)); 1024 return new Lock(oldOwner, oldCreated); 1025 } 1026 } else { 1027 // doc not found, or lock owner didn't match 1028 // get the old lock 1029 if (log.isTraceEnabled()) { 1030 logQuery(id, LOCK_FIELDS); 1031 } 1032 old = coll.findOne(new BasicDBObject(idKey, id), LOCK_FIELDS); 1033 if (old == null) { 1034 // document not found 1035 throw new DocumentNotFoundException(id); 1036 } 1037 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 1038 Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED)); 1039 if (oldOwner != null) { 1040 if (!LockManager.canLockBeRemoved(oldOwner, owner)) { 1041 // existing mismatched lock, flag failure 1042 return new Lock(oldOwner, oldCreated, true); 1043 } 1044 // old owner should have matched -- there was a race condition 1045 // TODO do better 1046 throw new ConcurrentUpdateException("Unlock " + id); 1047 } 1048 // old owner null, should have matched -- there was a race condition 1049 // TODO do better 1050 throw new ConcurrentUpdateException("Unlock " + id); 1051 } 1052 } 1053 1054 @Override 1055 public void closeLockManager() { 1056 1057 } 1058 1059 @Override 1060 public void clearLockManagerCaches() { 1061 } 1062 1063}