001/* 002 * Copyright (c) 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the Eclipse Public License v1.0 006 * which accompanies this distribution, and is available at 007 * http://www.eclipse.org/legal/epl-v10.html 008 * 009 * Contributors: 010 * Florent Guillaume 011 */ 012package org.nuxeo.ecm.core.storage.mongodb; 013 014import static java.lang.Boolean.TRUE; 015import static org.nuxeo.ecm.core.storage.State.NOP; 016import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_STATUS; 017import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_USER; 018import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACL; 019import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACP; 020import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS; 021import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA; 022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY; 023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_JOBID; 024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_SIMPLE; 025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY; 027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LIFECYCLE_STATE; 028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED; 029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER; 030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PRIMARY_TYPE; 033import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS; 034import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID; 035import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_VERSION_SERIES_ID; 036import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_READ_ACL; 037import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_VERSION_SERIES_ID; 038 039import java.io.Serializable; 040import java.lang.reflect.Array; 041import java.net.UnknownHostException; 042import java.util.ArrayList; 043import java.util.Arrays; 044import java.util.Calendar; 045import java.util.Date; 046import java.util.HashSet; 047import java.util.List; 048import java.util.Map; 049import java.util.Map.Entry; 050import java.util.Set; 051import java.util.UUID; 052 053import org.apache.commons.lang.StringUtils; 054import org.apache.commons.logging.Log; 055import org.apache.commons.logging.LogFactory; 056import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 057import org.nuxeo.ecm.core.api.DocumentNotFoundException; 058import org.nuxeo.ecm.core.api.Lock; 059import org.nuxeo.ecm.core.api.NuxeoException; 060import org.nuxeo.ecm.core.api.PartialList; 061import org.nuxeo.ecm.core.api.model.Delta; 062import org.nuxeo.ecm.core.blob.BlobManager; 063import org.nuxeo.ecm.core.model.LockManager; 064import org.nuxeo.ecm.core.model.Repository; 065import org.nuxeo.ecm.core.query.sql.model.Expression; 066import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 067import org.nuxeo.ecm.core.query.sql.model.SelectClause; 068import org.nuxeo.ecm.core.storage.State; 069import org.nuxeo.ecm.core.storage.State.ListDiff; 070import org.nuxeo.ecm.core.storage.State.StateDiff; 071import org.nuxeo.ecm.core.storage.dbs.DBSDocument; 072import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator; 073import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase; 074import org.nuxeo.runtime.api.Framework; 075 076import com.mongodb.BasicDBObject; 077import com.mongodb.DB; 078import com.mongodb.DBCollection; 079import com.mongodb.DBCursor; 080import com.mongodb.DBObject; 081import com.mongodb.MongoClient; 082import com.mongodb.MongoClientURI; 083import com.mongodb.QueryOperators; 084import com.mongodb.ServerAddress; 085import com.mongodb.WriteResult; 086 087/** 088 * MongoDB implementation of a {@link Repository}. 089 * 090 * @since 5.9.4 091 */ 092public class MongoDBRepository extends DBSRepositoryBase { 093 094 private static final Log log = LogFactory.getLog(MongoDBRepository.class); 095 096 private static final Long ZERO = Long.valueOf(0); 097 098 private static final Long ONE = Long.valueOf(1); 099 100 private static final Long MINUS_ONE = Long.valueOf(-11); 101 102 public static final String DB_DEFAULT = "nuxeo"; 103 104 public static final String MONGODB_ID = "_id"; 105 106 public static final String MONGODB_INC = "$inc"; 107 108 public static final String MONGODB_SET = "$set"; 109 110 public static final String MONGODB_UNSET = "$unset"; 111 112 public static final String MONGODB_PUSH = "$push"; 113 114 public static final String MONGODB_EACH = "$each"; 115 116 public static final String MONGODB_META = "$meta"; 117 118 public static final String MONGODB_TEXT_SCORE = "textScore"; 119 120 private static final String MONGODB_INDEX_TEXT = "text"; 121 122 private static final String MONGODB_INDEX_NAME = "name"; 123 124 private static final String MONGODB_LANGUAGE_OVERRIDE = "language_override"; 125 126 private static final String FULLTEXT_INDEX_NAME = "fulltext"; 127 128 private static final String LANGUAGE_FIELD = "__language"; 129 130 protected static final String COUNTER_NAME_UUID = "ecm:id"; 131 132 protected static final String COUNTER_FIELD = "seq"; 133 134 protected MongoClient mongoClient; 135 136 protected DBCollection coll; 137 138 protected DBCollection countersColl; 139 140 public MongoDBRepository(MongoDBRepositoryDescriptor descriptor) { 141 super(descriptor.name, descriptor.getFulltextDisabled()); 142 try { 143 mongoClient = newMongoClient(descriptor); 144 coll = getCollection(descriptor, mongoClient); 145 countersColl = getCountersCollection(descriptor, mongoClient); 146 } catch (UnknownHostException e) { 147 throw new RuntimeException(e); 148 } 149 initRepository(); 150 } 151 152 @Override 153 public void shutdown() { 154 super.shutdown(); 155 mongoClient.close(); 156 } 157 158 // used also by unit tests 159 public static MongoClient newMongoClient(MongoDBRepositoryDescriptor descriptor) throws UnknownHostException { 160 String server = descriptor.server; 161 if (StringUtils.isBlank(server)) { 162 throw new NuxeoException("Missing <server> in MongoDB repository descriptor"); 163 } 164 if (server.startsWith("mongodb://")) { 165 // allow mongodb:// URI syntax for the server, to pass everything in one string 166 return new MongoClient(new MongoClientURI(server)); 167 } else { 168 return new MongoClient(new ServerAddress(server)); 169 } 170 } 171 172 protected static DBCollection getCollection(MongoClient mongoClient, String dbname, String collection) { 173 if (StringUtils.isBlank(dbname)) { 174 dbname = DB_DEFAULT; 175 } 176 DB db = mongoClient.getDB(dbname); 177 return db.getCollection(collection); 178 } 179 180 // used also by unit tests 181 public static DBCollection getCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) { 182 return getCollection(mongoClient, descriptor.dbname, descriptor.name); 183 } 184 185 // used also by unit tests 186 public static DBCollection getCountersCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) { 187 return getCollection(mongoClient, descriptor.dbname, descriptor.name + ".counters"); 188 } 189 190 protected Object valueToBson(Object value) { 191 if (value instanceof State) { 192 return stateToBson((State) value); 193 } else if (value instanceof List) { 194 @SuppressWarnings("unchecked") 195 List<Object> values = (List<Object>) value; 196 return listToBson(values); 197 } else if (value instanceof Object[]) { 198 return listToBson(Arrays.asList((Object[]) value)); 199 } else { 200 return serializableToBson(value); 201 } 202 } 203 204 protected DBObject stateToBson(State state) { 205 DBObject ob = new BasicDBObject(); 206 for (Entry<String, Serializable> en : state.entrySet()) { 207 Object val = valueToBson(en.getValue()); 208 if (val != null) { 209 ob.put(en.getKey(), val); 210 } 211 } 212 return ob; 213 } 214 215 protected List<Object> listToBson(List<Object> values) { 216 ArrayList<Object> objects = new ArrayList<Object>(values.size()); 217 for (Object value : values) { 218 objects.add(valueToBson(value)); 219 } 220 return objects; 221 } 222 223 protected State bsonToState(DBObject ob) { 224 if (ob == null) { 225 return null; 226 } 227 State state = new State(ob.keySet().size()); 228 for (String key : ob.keySet()) { 229 Object val = ob.get(key); 230 Serializable value; 231 if (val instanceof List) { 232 @SuppressWarnings("unchecked") 233 List<Object> list = (List<Object>) val; 234 if (list.isEmpty()) { 235 value = null; 236 } else { 237 if (list.get(0) instanceof DBObject) { 238 List<Serializable> l = new ArrayList<>(list.size()); 239 for (Object el : list) { 240 l.add(bsonToState((DBObject) el)); 241 } 242 value = (Serializable) l; 243 } else { 244 // turn the list into a properly-typed array 245 Class<?> klass = Object.class; 246 for (Object o : list) { 247 if (o != null) { 248 klass = o.getClass(); 249 break; 250 } 251 } 252 Object[] ar = (Object[]) Array.newInstance(klass, list.size()); 253 int i = 0; 254 for (Object el : list) { 255 ar[i++] = scalarToSerializable(el); 256 } 257 value = ar; 258 } 259 } 260 } else if (val instanceof DBObject) { 261 value = bsonToState((DBObject) val); 262 } else { 263 if (MONGODB_ID.equals(key)) { 264 // skip ObjectId 265 continue; 266 } 267 value = scalarToSerializable(val); 268 } 269 state.put(key, value); 270 } 271 return state; 272 } 273 274 public static class Updates { 275 public BasicDBObject set = new BasicDBObject(); 276 277 public BasicDBObject unset = new BasicDBObject(); 278 279 public BasicDBObject push = new BasicDBObject(); 280 281 public BasicDBObject inc = new BasicDBObject(); 282 } 283 284 /** 285 * Constructs a list of MongoDB updates from the given {@link StateDiff}. 286 * <p> 287 * We need a list because some cases need two operations to avoid conflicts. 288 */ 289 protected List<DBObject> diffToBson(StateDiff diff) { 290 Updates updates = new Updates(); 291 diffToUpdates(diff, null, updates); 292 UpdateListBuilder builder = new UpdateListBuilder(); 293 for (Entry<String, Object> en : updates.set.entrySet()) { 294 builder.update(MONGODB_SET, en.getKey(), en.getValue()); 295 } 296 for (Entry<String, Object> en : updates.unset.entrySet()) { 297 builder.update(MONGODB_UNSET, en.getKey(), en.getValue()); 298 } 299 for (Entry<String, Object> en : updates.push.entrySet()) { 300 builder.update(MONGODB_PUSH, en.getKey(), en.getValue()); 301 } 302 for (Entry<String, Object> en : updates.inc.entrySet()) { 303 builder.update(MONGODB_INC, en.getKey(), en.getValue()); 304 } 305 return builder.updateList; 306 } 307 308 /** 309 * Update list builder to prevent several updates of the same field. 310 * <p> 311 * This happens if two operations act on two fields where one is a prefix of the other. 312 * <p> 313 * Example: Cannot update 'mylist.0.string' and 'mylist' at the same time (error 16837) 314 * 315 * @since 5.9.5 316 */ 317 protected static class UpdateListBuilder { 318 319 protected List<DBObject> updateList = new ArrayList<>(1); 320 321 protected DBObject update; 322 323 protected List<String> keys; 324 325 protected UpdateListBuilder() { 326 newUpdate(); 327 } 328 329 protected void newUpdate() { 330 updateList.add(update = new BasicDBObject()); 331 keys = new ArrayList<>(); 332 } 333 334 protected void update(String op, String key, Object value) { 335 if (conflicts(key, keys)) { 336 newUpdate(); 337 } 338 keys.add(key); 339 DBObject map = (DBObject) update.get(op); 340 if (map == null) { 341 update.put(op, map = new BasicDBObject()); 342 } 343 map.put(key, value); 344 } 345 346 /** 347 * Checks if the key conflicts with one of the previous keys. 348 * <p> 349 * A conflict occurs if one key is equals to or is a prefix of the other. 350 */ 351 protected boolean conflicts(String key, List<String> previousKeys) { 352 String keydot = key + '.'; 353 for (String prev : previousKeys) { 354 if (prev.equals(key) || prev.startsWith(keydot) || key.startsWith(prev + '.')) { 355 return true; 356 } 357 } 358 return false; 359 } 360 } 361 362 protected void diffToUpdates(StateDiff diff, String prefix, Updates updates) { 363 String elemPrefix = prefix == null ? "" : prefix + '.'; 364 for (Entry<String, Serializable> en : diff.entrySet()) { 365 String name = elemPrefix + en.getKey(); 366 Serializable value = en.getValue(); 367 if (value instanceof StateDiff) { 368 diffToUpdates((StateDiff) value, name, updates); 369 } else if (value instanceof ListDiff) { 370 diffToUpdates((ListDiff) value, name, updates); 371 } else if (value instanceof Delta) { 372 diffToUpdates((Delta) value, name, updates); 373 } else { 374 // not a diff 375 updates.set.put(name, valueToBson(value)); 376 } 377 } 378 } 379 380 protected void diffToUpdates(ListDiff listDiff, String prefix, Updates updates) { 381 if (listDiff.diff != null) { 382 String elemPrefix = prefix == null ? "" : prefix + '.'; 383 int i = 0; 384 for (Object value : listDiff.diff) { 385 String name = elemPrefix + i; 386 if (value instanceof StateDiff) { 387 diffToUpdates((StateDiff) value, name, updates); 388 } else if (value != NOP) { 389 // set value 390 updates.set.put(name, valueToBson(value)); 391 } 392 i++; 393 } 394 } 395 if (listDiff.rpush != null) { 396 Object pushed; 397 if (listDiff.rpush.size() == 1) { 398 // no need to use $each for one element 399 pushed = valueToBson(listDiff.rpush.get(0)); 400 } else { 401 pushed = new BasicDBObject(MONGODB_EACH, listToBson(listDiff.rpush)); 402 } 403 updates.push.put(prefix, pushed); 404 } 405 } 406 407 protected void diffToUpdates(Delta delta, String prefix, Updates updates) { 408 Object inc = valueToBson(delta.getDeltaValue()); 409 updates.inc.put(prefix, inc); 410 } 411 412 protected Object serializableToBson(Object value) { 413 if (value instanceof Calendar) { 414 return ((Calendar) value).getTime(); 415 } 416 return value; 417 } 418 419 protected Serializable scalarToSerializable(Object val) { 420 if (val instanceof Date) { 421 Calendar cal = Calendar.getInstance(); 422 cal.setTime((Date) val); 423 return cal; 424 } 425 return (Serializable) val; 426 } 427 428 protected void initRepository() { 429 // create required indexes 430 // code does explicit queries on those 431 coll.createIndex(new BasicDBObject(KEY_ID, ONE)); 432 coll.createIndex(new BasicDBObject(KEY_PARENT_ID, ONE)); 433 coll.createIndex(new BasicDBObject(KEY_ANCESTOR_IDS, ONE)); 434 coll.createIndex(new BasicDBObject(KEY_VERSION_SERIES_ID, ONE)); 435 coll.createIndex(new BasicDBObject(KEY_PROXY_TARGET_ID, ONE)); 436 coll.createIndex(new BasicDBObject(KEY_PROXY_VERSION_SERIES_ID, ONE)); 437 coll.createIndex(new BasicDBObject(KEY_READ_ACL, ONE)); 438 DBObject parentChild = new BasicDBObject(); 439 parentChild.put(KEY_PARENT_ID, ONE); 440 parentChild.put(KEY_NAME, ONE); 441 coll.createIndex(parentChild); 442 // often used in user-generated queries 443 coll.createIndex(new BasicDBObject(KEY_PRIMARY_TYPE, ONE)); 444 coll.createIndex(new BasicDBObject(KEY_LIFECYCLE_STATE, ONE)); 445 coll.createIndex(new BasicDBObject(KEY_FULLTEXT_JOBID, ONE)); 446 coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_USER, ONE)); 447 coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_STATUS, ONE)); 448 // TODO configure these from somewhere else 449 coll.createIndex(new BasicDBObject("dc:modified", MINUS_ONE)); 450 coll.createIndex(new BasicDBObject("rend:renditionName", ONE)); 451 coll.createIndex(new BasicDBObject("drv:subscriptions.enabled", ONE)); 452 coll.createIndex(new BasicDBObject("collectionMember:collectionIds", ONE)); 453 if (!fulltextDisabled) { 454 DBObject indexKeys = new BasicDBObject(); 455 indexKeys.put(KEY_FULLTEXT_SIMPLE, MONGODB_INDEX_TEXT); 456 indexKeys.put(KEY_FULLTEXT_BINARY, MONGODB_INDEX_TEXT); 457 DBObject indexOptions = new BasicDBObject(); 458 indexOptions.put(MONGODB_INDEX_NAME, FULLTEXT_INDEX_NAME); 459 indexOptions.put(MONGODB_LANGUAGE_OVERRIDE, LANGUAGE_FIELD); 460 coll.createIndex(indexKeys, indexOptions); 461 } 462 // check root presence 463 DBObject query = new BasicDBObject(KEY_ID, getRootId()); 464 if (coll.findOne(query, justPresenceField()) != null) { 465 return; 466 } 467 // create basic repository structure needed 468 if (DEBUG_UUIDS) { 469 // create the id counter 470 DBObject idCounter = new BasicDBObject(); 471 idCounter.put(MONGODB_ID, COUNTER_NAME_UUID); 472 idCounter.put(COUNTER_FIELD, ZERO); 473 countersColl.insert(idCounter); 474 } 475 initRoot(); 476 } 477 478 protected Long getNextUuidSeq() { 479 DBObject query = new BasicDBObject(MONGODB_ID, COUNTER_NAME_UUID); 480 DBObject update = new BasicDBObject(MONGODB_INC, new BasicDBObject(COUNTER_FIELD, ONE)); 481 boolean returnNew = true; 482 DBObject idCounter = countersColl.findAndModify(query, null, null, false, update, returnNew, false); 483 if (idCounter == null) { 484 throw new RuntimeException("Repository id counter not initialized"); 485 } 486 return (Long) idCounter.get(COUNTER_FIELD); 487 } 488 489 @Override 490 public String generateNewId() { 491 if (DEBUG_UUIDS) { 492 Long id = getNextUuidSeq(); 493 return "UUID_" + id; 494 } else { 495 return UUID.randomUUID().toString(); 496 } 497 } 498 499 @Override 500 public void createState(State state) { 501 DBObject ob = stateToBson(state); 502 if (log.isTraceEnabled()) { 503 log.trace("MongoDB: CREATE " + ob); 504 } 505 coll.insert(ob); 506 // TODO dupe exception 507 // throw new DocumentException("Already exists: " + id); 508 } 509 510 @Override 511 public State readState(String id) { 512 DBObject query = new BasicDBObject(KEY_ID, id); 513 return findOne(query); 514 } 515 516 @Override 517 public List<State> readStates(List<String> ids) { 518 DBObject query = new BasicDBObject(KEY_ID, new BasicDBObject(QueryOperators.IN, ids)); 519 return findAll(query, ids.size()); 520 } 521 522 @Override 523 public void updateState(String id, StateDiff diff) { 524 DBObject query = new BasicDBObject(KEY_ID, id); 525 for (DBObject update : diffToBson(diff)) { 526 if (log.isTraceEnabled()) { 527 log.trace("MongoDB: UPDATE " + id + ": " + update); 528 } 529 coll.update(query, update); 530 // TODO dupe exception 531 // throw new DocumentException("Missing: " + id); 532 } 533 } 534 535 @Override 536 public void deleteStates(Set<String> ids) { 537 DBObject query = new BasicDBObject(KEY_ID, new BasicDBObject(QueryOperators.IN, ids)); 538 if (log.isTraceEnabled()) { 539 log.trace("MongoDB: REMOVE " + ids); 540 } 541 WriteResult w = coll.remove(query); 542 if (w.getN() != ids.size()) { 543 log.error("Removed " + w.getN() + " docs for " + ids.size() + " ids: " + ids); 544 } 545 } 546 547 @Override 548 public State readChildState(String parentId, String name, Set<String> ignored) { 549 DBObject query = getChildQuery(parentId, name, ignored); 550 return findOne(query); 551 } 552 553 protected void logQuery(String id, DBObject fields) { 554 logQuery(new BasicDBObject(KEY_ID, id), fields); 555 } 556 557 protected void logQuery(DBObject query, DBObject fields) { 558 if (fields == null) { 559 log.trace("MongoDB: QUERY " + query); 560 } else { 561 log.trace("MongoDB: QUERY " + query + " KEYS " + fields); 562 } 563 } 564 565 protected void logQuery(DBObject query, DBObject fields, DBObject orderBy, int limit, int offset) { 566 log.trace("MongoDB: QUERY " + query + " KEYS " + fields + (orderBy == null ? "" : " ORDER BY " + orderBy) 567 + " OFFSET " + offset + " LIMIT " + limit); 568 } 569 570 @Override 571 public boolean hasChild(String parentId, String name, Set<String> ignored) { 572 DBObject query = getChildQuery(parentId, name, ignored); 573 if (log.isTraceEnabled()) { 574 logQuery(query, justPresenceField()); 575 } 576 return coll.findOne(query, justPresenceField()) != null; 577 } 578 579 protected DBObject getChildQuery(String parentId, String name, Set<String> ignored) { 580 DBObject query = new BasicDBObject(); 581 query.put(KEY_PARENT_ID, parentId); 582 query.put(KEY_NAME, name); 583 addIgnoredIds(query, ignored); 584 return query; 585 } 586 587 protected void addIgnoredIds(DBObject query, Set<String> ignored) { 588 if (!ignored.isEmpty()) { 589 DBObject notInIds = new BasicDBObject(QueryOperators.NIN, new ArrayList<String>(ignored)); 590 query.put(KEY_ID, notInIds); 591 } 592 } 593 594 @Override 595 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 596 DBObject query = new BasicDBObject(key, value); 597 addIgnoredIds(query, ignored); 598 return findAll(query, 0); 599 } 600 601 @Override 602 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 603 DBObject query = new BasicDBObject(key1, value1); 604 query.put(key2, value2); 605 addIgnoredIds(query, ignored); 606 return findAll(query, 0); 607 } 608 609 @Override 610 public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets, 611 Map<String, Object[]> targetProxies) { 612 DBObject query = new BasicDBObject(key, value); 613 DBObject fields = new BasicDBObject(); 614 fields.put(MONGODB_ID, ZERO); 615 fields.put(KEY_ID, ONE); 616 fields.put(KEY_IS_PROXY, ONE); 617 fields.put(KEY_PROXY_TARGET_ID, ONE); 618 fields.put(KEY_PROXY_IDS, ONE); 619 if (log.isTraceEnabled()) { 620 logQuery(query, fields); 621 } 622 DBCursor cursor = coll.find(query, fields); 623 try { 624 for (DBObject ob : cursor) { 625 String id = (String) ob.get(KEY_ID); 626 ids.add(id); 627 if (proxyTargets != null && TRUE.equals(ob.get(KEY_IS_PROXY))) { 628 String targetId = (String) ob.get(KEY_PROXY_TARGET_ID); 629 proxyTargets.put(id, targetId); 630 } 631 if (targetProxies != null) { 632 Object[] proxyIds = (Object[]) ob.get(KEY_PROXY_IDS); 633 if (proxyIds != null) { 634 targetProxies.put(id, proxyIds); 635 } 636 } 637 } 638 } finally { 639 cursor.close(); 640 } 641 } 642 643 @Override 644 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 645 DBObject query = new BasicDBObject(key, value); 646 addIgnoredIds(query, ignored); 647 if (log.isTraceEnabled()) { 648 logQuery(query, justPresenceField()); 649 } 650 return coll.findOne(query, justPresenceField()) != null; 651 } 652 653 protected State findOne(DBObject query) { 654 if (log.isTraceEnabled()) { 655 logQuery(query, null); 656 } 657 return bsonToState(coll.findOne(query)); 658 } 659 660 protected List<State> findAll(DBObject query, int sizeHint) { 661 if (log.isTraceEnabled()) { 662 logQuery(query, null); 663 } 664 DBCursor cursor = coll.find(query); 665 Set<String> seen = new HashSet<>(); 666 try { 667 List<State> list = new ArrayList<>(sizeHint); 668 for (DBObject ob : cursor) { 669 if (!seen.add((String) ob.get(KEY_ID))) { 670 // MongoDB cursors may return the same 671 // object several times 672 continue; 673 } 674 list.add(bsonToState(ob)); 675 } 676 return list; 677 } finally { 678 cursor.close(); 679 } 680 } 681 682 protected DBObject justPresenceField() { 683 return new BasicDBObject(MONGODB_ID, ONE); 684 } 685 686 @Override 687 public PartialList<State> queryAndFetch(Expression expression, SelectClause selectClause, OrderByClause orderByClause, 688 int limit, int offset, int countUpTo, DBSExpressionEvaluator evaluator, boolean deepCopy) { 689 MongoDBQueryBuilder builder = new MongoDBQueryBuilder(expression, selectClause, orderByClause, 690 evaluator.pathResolver); 691 builder.walk(); 692 if (builder.hasFulltext && fulltextDisabled) { 693 throw new RuntimeException("Fulltext disabled by configuration"); 694 } 695 DBObject query = builder.getQuery(); 696 addPrincipals(query, evaluator.principals); 697 DBObject orderBy = builder.getOrderBy(); 698 DBObject keys = builder.getProjection(); 699 700 if (log.isTraceEnabled()) { 701 logQuery(query, keys, orderBy, limit, offset); 702 } 703 704 List<State> list; 705 long totalSize; 706 DBCursor cursor = coll.find(query, keys).skip(offset).limit(limit); 707 try { 708 if (orderBy != null) { 709 cursor = cursor.sort(orderBy); 710 } 711 list = new ArrayList<>(); 712 for (DBObject ob : cursor) { 713 list.add(bsonToState(ob)); 714 } 715 if (countUpTo == -1) { 716 // count full size 717 if (limit == 0) { 718 totalSize = list.size(); 719 } else { 720 totalSize = cursor.count(); 721 } 722 } else if (countUpTo == 0) { 723 // no count 724 totalSize = -1; // not counted 725 } else { 726 // count only if less than countUpTo 727 if (limit == 0) { 728 totalSize = list.size(); 729 } else { 730 totalSize = cursor.copy().limit(countUpTo + 1).count(); 731 } 732 if (totalSize > countUpTo) { 733 totalSize = -2; // truncated 734 } 735 } 736 } finally { 737 cursor.close(); 738 } 739 if (log.isTraceEnabled() && list.size() != 0) { 740 log.trace("MongoDB: -> " + list.size()); 741 } 742 return new PartialList<>(list, totalSize); 743 } 744 745 protected void addPrincipals(DBObject query, Set<String> principals) { 746 if (principals != null) { 747 DBObject inPrincipals = new BasicDBObject(QueryOperators.IN, new ArrayList<String>(principals)); 748 query.put(DBSDocument.KEY_READ_ACL, inPrincipals); 749 } 750 } 751 752 /** Keys used for document projection when marking all binaries for GC. */ 753 protected DBObject binaryKeys; 754 755 @Override 756 protected void initBlobsPaths() { 757 MongoDBBlobFinder finder = new MongoDBBlobFinder(); 758 finder.visit(); 759 binaryKeys = finder.binaryKeys; 760 } 761 762 protected static class MongoDBBlobFinder extends BlobFinder { 763 protected DBObject binaryKeys = new BasicDBObject(MONGODB_ID, ZERO); 764 765 @Override 766 protected void recordBlobPath() { 767 path.addLast(KEY_BLOB_DATA); 768 binaryKeys.put(StringUtils.join(path, "."), ONE); 769 path.removeLast(); 770 } 771 } 772 773 @Override 774 public void markReferencedBinaries() { 775 BlobManager blobManager = Framework.getService(BlobManager.class); 776 // TODO add a query to not scan all documents 777 if (log.isTraceEnabled()) { 778 logQuery(new BasicDBObject(), binaryKeys); 779 } 780 DBCursor cursor = coll.find(new BasicDBObject(), binaryKeys); 781 try { 782 for (DBObject ob : cursor) { 783 markReferencedBinaries(ob, blobManager); 784 } 785 } finally { 786 cursor.close(); 787 } 788 } 789 790 protected void markReferencedBinaries(DBObject ob, BlobManager blobManager) { 791 for (String key : ob.keySet()) { 792 Object value = ob.get(key); 793 if (value instanceof List) { 794 @SuppressWarnings("unchecked") 795 List<Object> list = (List<Object>) value; 796 for (Object v : list) { 797 if (v instanceof DBObject) { 798 markReferencedBinaries((DBObject) v, blobManager); 799 } else { 800 markReferencedBinary(v, blobManager); 801 } 802 } 803 } else if (value instanceof Object[]) { 804 for (Object v : (Object[]) value) { 805 markReferencedBinary(v, blobManager); 806 } 807 } else if (value instanceof DBObject) { 808 markReferencedBinaries((DBObject) value, blobManager); 809 } else { 810 markReferencedBinary(value, blobManager); 811 } 812 } 813 } 814 815 protected void markReferencedBinary(Object value, BlobManager blobManager) { 816 if (!(value instanceof String)) { 817 return; 818 } 819 String key = (String) value; 820 blobManager.markReferencedBinary(key, repositoryName); 821 } 822 823 protected static final DBObject LOCK_FIELDS; 824 825 static { 826 LOCK_FIELDS = new BasicDBObject(); 827 LOCK_FIELDS.put(KEY_LOCK_OWNER, ONE); 828 LOCK_FIELDS.put(KEY_LOCK_CREATED, ONE); 829 } 830 831 protected static final DBObject UNSET_LOCK_UPDATE = new BasicDBObject(MONGODB_UNSET, LOCK_FIELDS); 832 833 @Override 834 public Lock getLock(String id) { 835 if (log.isTraceEnabled()) { 836 logQuery(id, LOCK_FIELDS); 837 } 838 DBObject res = coll.findOne(new BasicDBObject(KEY_ID, id), LOCK_FIELDS); 839 if (res == null) { 840 // document not found 841 throw new DocumentNotFoundException(id); 842 } 843 String owner = (String) res.get(KEY_LOCK_OWNER); 844 if (owner == null) { 845 // not locked 846 return null; 847 } 848 Calendar created = (Calendar) scalarToSerializable(res.get(KEY_LOCK_CREATED)); 849 return new Lock(owner, created); 850 } 851 852 @Override 853 public Lock setLock(String id, Lock lock) { 854 DBObject query = new BasicDBObject(KEY_ID, id); 855 query.put(KEY_LOCK_OWNER, null); // select doc if no lock is set 856 DBObject setLock = new BasicDBObject(); 857 setLock.put(KEY_LOCK_OWNER, lock.getOwner()); 858 setLock.put(KEY_LOCK_CREATED, serializableToBson(lock.getCreated())); 859 DBObject setLockUpdate = new BasicDBObject(MONGODB_SET, setLock); 860 if (log.isTraceEnabled()) { 861 log.trace("MongoDB: FINDANDMODIFY " + query + " UPDATE " + setLockUpdate); 862 } 863 DBObject res = coll.findAndModify(query, null, null, false, setLockUpdate, false, false); 864 if (res != null) { 865 // found a doc to lock 866 return null; 867 } else { 868 // doc not found, or lock owner already set 869 // get the old lock 870 if (log.isTraceEnabled()) { 871 logQuery(id, LOCK_FIELDS); 872 } 873 DBObject old = coll.findOne(new BasicDBObject(KEY_ID, id), LOCK_FIELDS); 874 if (old == null) { 875 // document not found 876 throw new DocumentNotFoundException(id); 877 } 878 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 879 Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED)); 880 if (oldOwner != null) { 881 return new Lock(oldOwner, oldCreated); 882 } 883 // no lock -- there was a race condition 884 // TODO do better 885 throw new ConcurrentUpdateException("Lock " + id); 886 } 887 } 888 889 @Override 890 public Lock removeLock(String id, String owner) { 891 DBObject query = new BasicDBObject(KEY_ID, id); 892 if (owner != null) { 893 // remove if owner matches or null 894 // implements LockManager.canLockBeRemoved inside MongoDB 895 Object ownerOrNull = Arrays.asList(owner, null); 896 query.put(KEY_LOCK_OWNER, new BasicDBObject(QueryOperators.IN, ownerOrNull)); 897 } // else unconditional remove 898 // remove the lock 899 DBObject old = coll.findAndModify(query, null, null, false, UNSET_LOCK_UPDATE, false, false); 900 if (old != null) { 901 // found a doc and removed the lock, return previous lock 902 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 903 if (oldOwner == null) { 904 // was not locked 905 return null; 906 } else { 907 // return previous lock 908 Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED)); 909 return new Lock(oldOwner, oldCreated); 910 } 911 } else { 912 // doc not found, or lock owner didn't match 913 // get the old lock 914 if (log.isTraceEnabled()) { 915 logQuery(id, LOCK_FIELDS); 916 } 917 old = coll.findOne(new BasicDBObject(KEY_ID, id), LOCK_FIELDS); 918 if (old == null) { 919 // document not found 920 throw new DocumentNotFoundException(id); 921 } 922 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 923 Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED)); 924 if (oldOwner != null) { 925 if (!LockManager.canLockBeRemoved(oldOwner, owner)) { 926 // existing mismatched lock, flag failure 927 return new Lock(oldOwner, oldCreated, true); 928 } 929 // old owner should have matched -- there was a race condition 930 // TODO do better 931 throw new ConcurrentUpdateException("Unlock " + id); 932 } 933 // old owner null, should have matched -- there was a race condition 934 // TODO do better 935 throw new ConcurrentUpdateException("Unlock " + id); 936 } 937 } 938 939 @Override 940 public void closeLockManager() { 941 942 } 943 944 @Override 945 public void clearLockManagerCaches() { 946 } 947 948}