001/* 002 * Copyright (c) 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the Eclipse Public License v1.0 006 * which accompanies this distribution, and is available at 007 * http://www.eclipse.org/legal/epl-v10.html 008 * 009 * Contributors: 010 * Florent Guillaume 011 */ 012package org.nuxeo.ecm.core.storage.mongodb; 013 014import static java.lang.Boolean.TRUE; 015import static org.nuxeo.ecm.core.storage.State.NOP; 016import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_STATUS; 017import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_USER; 018import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACL; 019import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACP; 020import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS; 021import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA; 022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY; 023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_JOBID; 024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_SIMPLE; 025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY; 027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LIFECYCLE_STATE; 028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED; 029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER; 030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PRIMARY_TYPE; 033import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS; 034import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID; 035import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_VERSION_SERIES_ID; 036import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_READ_ACL; 037import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_VERSION_SERIES_ID; 038 039import java.io.Serializable; 040import java.lang.reflect.Array; 041import java.net.UnknownHostException; 042import java.util.ArrayList; 043import java.util.Arrays; 044import java.util.Calendar; 045import java.util.Date; 046import java.util.HashSet; 047import java.util.List; 048import java.util.Map; 049import java.util.Map.Entry; 050import java.util.Set; 051import java.util.UUID; 052 053import org.apache.commons.lang.StringUtils; 054import org.apache.commons.logging.Log; 055import org.apache.commons.logging.LogFactory; 056import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 057import org.nuxeo.ecm.core.api.DocumentNotFoundException; 058import org.nuxeo.ecm.core.api.Lock; 059import org.nuxeo.ecm.core.api.NuxeoException; 060import org.nuxeo.ecm.core.api.PartialList; 061import org.nuxeo.ecm.core.api.model.Delta; 062import org.nuxeo.ecm.core.blob.BlobManager; 063import org.nuxeo.ecm.core.model.LockManager; 064import org.nuxeo.ecm.core.model.Repository; 065import org.nuxeo.ecm.core.query.sql.model.Expression; 066import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 067import org.nuxeo.ecm.core.query.sql.model.SelectClause; 068import org.nuxeo.ecm.core.storage.State; 069import org.nuxeo.ecm.core.storage.State.ListDiff; 070import org.nuxeo.ecm.core.storage.State.StateDiff; 071import org.nuxeo.ecm.core.storage.dbs.DBSDocument; 072import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator; 073import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase; 074import org.nuxeo.runtime.api.Framework; 075 076import com.mongodb.BasicDBObject; 077import com.mongodb.DB; 078import com.mongodb.DBCollection; 079import com.mongodb.DBCursor; 080import com.mongodb.DBObject; 081import com.mongodb.MongoClient; 082import com.mongodb.MongoClientURI; 083import com.mongodb.QueryOperators; 084import com.mongodb.ServerAddress; 085import com.mongodb.WriteResult; 086 087/** 088 * MongoDB implementation of a {@link Repository}. 089 * 090 * @since 5.9.4 091 */ 092public class MongoDBRepository extends DBSRepositoryBase { 093 094 private static final Log log = LogFactory.getLog(MongoDBRepository.class); 095 096 private static final Long ZERO = Long.valueOf(0); 097 098 private static final Long ONE = Long.valueOf(1); 099 100 private static final Long MINUS_ONE = Long.valueOf(-11); 101 102 public static final String DB_DEFAULT = "nuxeo"; 103 104 public static final String MONGODB_ID = "_id"; 105 106 public static final String MONGODB_INC = "$inc"; 107 108 public static final String MONGODB_SET = "$set"; 109 110 public static final String MONGODB_UNSET = "$unset"; 111 112 public static final String MONGODB_PUSH = "$push"; 113 114 public static final String MONGODB_EACH = "$each"; 115 116 public static final String MONGODB_META = "$meta"; 117 118 public static final String MONGODB_TEXT_SCORE = "textScore"; 119 120 private static final String MONGODB_INDEX_TEXT = "text"; 121 122 private static final String MONGODB_INDEX_NAME = "name"; 123 124 private static final String MONGODB_LANGUAGE_OVERRIDE = "language_override"; 125 126 private static final String FULLTEXT_INDEX_NAME = "fulltext"; 127 128 private static final String LANGUAGE_FIELD = "__language"; 129 130 protected static final String COUNTER_NAME_UUID = "ecm:id"; 131 132 protected static final String COUNTER_FIELD = "seq"; 133 134 protected MongoClient mongoClient; 135 136 protected DBCollection coll; 137 138 protected DBCollection countersColl; 139 140 public MongoDBRepository(MongoDBRepositoryDescriptor descriptor) { 141 super(descriptor.name, descriptor.getFulltextDisabled()); 142 try { 143 mongoClient = newMongoClient(descriptor); 144 coll = getCollection(descriptor, mongoClient); 145 countersColl = getCountersCollection(descriptor, mongoClient); 146 } catch (UnknownHostException e) { 147 throw new RuntimeException(e); 148 } 149 initRepository(); 150 } 151 152 @Override 153 public void shutdown() { 154 super.shutdown(); 155 mongoClient.close(); 156 } 157 158 // used also by unit tests 159 public static MongoClient newMongoClient(MongoDBRepositoryDescriptor descriptor) throws UnknownHostException { 160 String server = descriptor.server; 161 if (StringUtils.isBlank(server)) { 162 throw new NuxeoException("Missing <server> in MongoDB repository descriptor"); 163 } 164 if (server.startsWith("mongodb://")) { 165 // allow mongodb:// URI syntax for the server, to pass everything in one string 166 return new MongoClient(new MongoClientURI(server)); 167 } else { 168 return new MongoClient(new ServerAddress(server)); 169 } 170 } 171 172 protected static DBCollection getCollection(MongoClient mongoClient, String dbname, String collection) { 173 if (StringUtils.isBlank(dbname)) { 174 dbname = DB_DEFAULT; 175 } 176 DB db = mongoClient.getDB(dbname); 177 return db.getCollection(collection); 178 } 179 180 // used also by unit tests 181 public static DBCollection getCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) { 182 return getCollection(mongoClient, descriptor.dbname, descriptor.name); 183 } 184 185 // used also by unit tests 186 public static DBCollection getCountersCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) { 187 return getCollection(mongoClient, descriptor.dbname, descriptor.name + ".counters"); 188 } 189 190 protected Object valueToBson(Object value) { 191 if (value instanceof State) { 192 return stateToBson((State) value); 193 } else if (value instanceof List) { 194 @SuppressWarnings("unchecked") 195 List<Object> values = (List<Object>) value; 196 return listToBson(values); 197 } else if (value instanceof Object[]) { 198 return listToBson(Arrays.asList((Object[]) value)); 199 } else { 200 return serializableToBson(value); 201 } 202 } 203 204 protected DBObject stateToBson(State state) { 205 DBObject ob = new BasicDBObject(); 206 for (Entry<String, Serializable> en : state.entrySet()) { 207 Object val = valueToBson(en.getValue()); 208 if (val != null) { 209 ob.put(en.getKey(), val); 210 } 211 } 212 return ob; 213 } 214 215 protected List<Object> listToBson(List<Object> values) { 216 ArrayList<Object> objects = new ArrayList<Object>(values.size()); 217 for (Object value : values) { 218 objects.add(valueToBson(value)); 219 } 220 return objects; 221 } 222 223 protected State bsonToState(DBObject ob) { 224 if (ob == null) { 225 return null; 226 } 227 State state = new State(ob.keySet().size()); 228 for (String key : ob.keySet()) { 229 Object val = ob.get(key); 230 Serializable value; 231 if (val instanceof List) { 232 @SuppressWarnings("unchecked") 233 List<Object> list = (List<Object>) val; 234 if (list.isEmpty()) { 235 value = null; 236 } else { 237 if (list.get(0) instanceof DBObject) { 238 List<Serializable> l = new ArrayList<>(list.size()); 239 for (Object el : list) { 240 l.add(bsonToState((DBObject) el)); 241 } 242 value = (Serializable) l; 243 } else { 244 // turn the list into a properly-typed array 245 Class<?> klass = list.get(0).getClass(); 246 Object[] ar = (Object[]) Array.newInstance(klass, list.size()); 247 int i = 0; 248 for (Object el : list) { 249 ar[i++] = scalarToSerializable(el); 250 } 251 value = ar; 252 } 253 } 254 } else if (val instanceof DBObject) { 255 value = bsonToState((DBObject) val); 256 } else { 257 if (MONGODB_ID.equals(key)) { 258 // skip ObjectId 259 continue; 260 } 261 value = scalarToSerializable(val); 262 } 263 state.put(key, value); 264 } 265 return state; 266 } 267 268 public static class Updates { 269 public BasicDBObject set = new BasicDBObject(); 270 271 public BasicDBObject unset = new BasicDBObject(); 272 273 public BasicDBObject push = new BasicDBObject(); 274 275 public BasicDBObject inc = new BasicDBObject(); 276 } 277 278 /** 279 * Constructs a list of MongoDB updates from the given {@link StateDiff}. 280 * <p> 281 * We need a list because some cases need two operations to avoid conflicts. 282 */ 283 protected List<DBObject> diffToBson(StateDiff diff) { 284 Updates updates = new Updates(); 285 diffToUpdates(diff, null, updates); 286 UpdateListBuilder builder = new UpdateListBuilder(); 287 for (Entry<String, Object> en : updates.set.entrySet()) { 288 builder.update(MONGODB_SET, en.getKey(), en.getValue()); 289 } 290 for (Entry<String, Object> en : updates.unset.entrySet()) { 291 builder.update(MONGODB_UNSET, en.getKey(), en.getValue()); 292 } 293 for (Entry<String, Object> en : updates.push.entrySet()) { 294 builder.update(MONGODB_PUSH, en.getKey(), en.getValue()); 295 } 296 for (Entry<String, Object> en : updates.inc.entrySet()) { 297 builder.update(MONGODB_INC, en.getKey(), en.getValue()); 298 } 299 return builder.updateList; 300 } 301 302 /** 303 * Update list builder to prevent several updates of the same field. 304 * <p> 305 * This happens if two operations act on two fields where one is a prefix of the other. 306 * <p> 307 * Example: Cannot update 'mylist.0.string' and 'mylist' at the same time (error 16837) 308 * 309 * @since 5.9.5 310 */ 311 protected static class UpdateListBuilder { 312 313 protected List<DBObject> updateList = new ArrayList<>(1); 314 315 protected DBObject update; 316 317 protected List<String> keys; 318 319 protected UpdateListBuilder() { 320 newUpdate(); 321 } 322 323 protected void newUpdate() { 324 updateList.add(update = new BasicDBObject()); 325 keys = new ArrayList<>(); 326 } 327 328 protected void update(String op, String key, Object value) { 329 if (conflicts(key, keys)) { 330 newUpdate(); 331 } 332 keys.add(key); 333 DBObject map = (DBObject) update.get(op); 334 if (map == null) { 335 update.put(op, map = new BasicDBObject()); 336 } 337 map.put(key, value); 338 } 339 340 /** 341 * Checks if the key conflicts with one of the previous keys. 342 * <p> 343 * A conflict occurs if one key is equals to or is a prefix of the other. 344 */ 345 protected boolean conflicts(String key, List<String> previousKeys) { 346 String keydot = key + '.'; 347 for (String prev : previousKeys) { 348 if (prev.equals(key) || prev.startsWith(keydot) || key.startsWith(prev + '.')) { 349 return true; 350 } 351 } 352 return false; 353 } 354 } 355 356 protected void diffToUpdates(StateDiff diff, String prefix, Updates updates) { 357 String elemPrefix = prefix == null ? "" : prefix + '.'; 358 for (Entry<String, Serializable> en : diff.entrySet()) { 359 String name = elemPrefix + en.getKey(); 360 Serializable value = en.getValue(); 361 if (value instanceof StateDiff) { 362 diffToUpdates((StateDiff) value, name, updates); 363 } else if (value instanceof ListDiff) { 364 diffToUpdates((ListDiff) value, name, updates); 365 } else if (value instanceof Delta) { 366 diffToUpdates((Delta) value, name, updates); 367 } else { 368 // not a diff 369 updates.set.put(name, valueToBson(value)); 370 } 371 } 372 } 373 374 protected void diffToUpdates(ListDiff listDiff, String prefix, Updates updates) { 375 if (listDiff.diff != null) { 376 String elemPrefix = prefix == null ? "" : prefix + '.'; 377 int i = 0; 378 for (Object value : listDiff.diff) { 379 String name = elemPrefix + i; 380 if (value instanceof StateDiff) { 381 diffToUpdates((StateDiff) value, name, updates); 382 } else if (value != NOP) { 383 // set value 384 updates.set.put(name, valueToBson(value)); 385 } 386 i++; 387 } 388 } 389 if (listDiff.rpush != null) { 390 Object pushed; 391 if (listDiff.rpush.size() == 1) { 392 // no need to use $each for one element 393 pushed = valueToBson(listDiff.rpush.get(0)); 394 } else { 395 pushed = new BasicDBObject(MONGODB_EACH, listToBson(listDiff.rpush)); 396 } 397 updates.push.put(prefix, pushed); 398 } 399 } 400 401 protected void diffToUpdates(Delta delta, String prefix, Updates updates) { 402 Object inc = valueToBson(delta.getDeltaValue()); 403 updates.inc.put(prefix, inc); 404 } 405 406 protected Object serializableToBson(Object value) { 407 if (value instanceof Calendar) { 408 return ((Calendar) value).getTime(); 409 } 410 return value; 411 } 412 413 protected Serializable scalarToSerializable(Object val) { 414 if (val instanceof Date) { 415 Calendar cal = Calendar.getInstance(); 416 cal.setTime((Date) val); 417 return cal; 418 } 419 return (Serializable) val; 420 } 421 422 protected void initRepository() { 423 // create required indexes 424 // code does explicit queries on those 425 coll.createIndex(new BasicDBObject(KEY_ID, ONE)); 426 coll.createIndex(new BasicDBObject(KEY_PARENT_ID, ONE)); 427 coll.createIndex(new BasicDBObject(KEY_ANCESTOR_IDS, ONE)); 428 coll.createIndex(new BasicDBObject(KEY_VERSION_SERIES_ID, ONE)); 429 coll.createIndex(new BasicDBObject(KEY_PROXY_TARGET_ID, ONE)); 430 coll.createIndex(new BasicDBObject(KEY_PROXY_VERSION_SERIES_ID, ONE)); 431 coll.createIndex(new BasicDBObject(KEY_READ_ACL, ONE)); 432 DBObject parentChild = new BasicDBObject(); 433 parentChild.put(KEY_PARENT_ID, ONE); 434 parentChild.put(KEY_NAME, ONE); 435 coll.createIndex(parentChild); 436 // often used in user-generated queries 437 coll.createIndex(new BasicDBObject(KEY_PRIMARY_TYPE, ONE)); 438 coll.createIndex(new BasicDBObject(KEY_LIFECYCLE_STATE, ONE)); 439 coll.createIndex(new BasicDBObject(KEY_FULLTEXT_JOBID, ONE)); 440 coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_USER, ONE)); 441 coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_STATUS, ONE)); 442 // TODO configure these from somewhere else 443 coll.createIndex(new BasicDBObject("dc:modified", MINUS_ONE)); 444 coll.createIndex(new BasicDBObject("rend:renditionName", ONE)); 445 coll.createIndex(new BasicDBObject("drv:subscriptions.enabled", ONE)); 446 coll.createIndex(new BasicDBObject("collectionMember:collectionIds", ONE)); 447 if (!fulltextDisabled) { 448 DBObject indexKeys = new BasicDBObject(); 449 indexKeys.put(KEY_FULLTEXT_SIMPLE, MONGODB_INDEX_TEXT); 450 indexKeys.put(KEY_FULLTEXT_BINARY, MONGODB_INDEX_TEXT); 451 DBObject indexOptions = new BasicDBObject(); 452 indexOptions.put(MONGODB_INDEX_NAME, FULLTEXT_INDEX_NAME); 453 indexOptions.put(MONGODB_LANGUAGE_OVERRIDE, LANGUAGE_FIELD); 454 coll.createIndex(indexKeys, indexOptions); 455 } 456 // check root presence 457 DBObject query = new BasicDBObject(KEY_ID, getRootId()); 458 if (coll.findOne(query, justPresenceField()) != null) { 459 return; 460 } 461 // create basic repository structure needed 462 if (DEBUG_UUIDS) { 463 // create the id counter 464 DBObject idCounter = new BasicDBObject(); 465 idCounter.put(MONGODB_ID, COUNTER_NAME_UUID); 466 idCounter.put(COUNTER_FIELD, ZERO); 467 countersColl.insert(idCounter); 468 } 469 initRoot(); 470 } 471 472 protected Long getNextUuidSeq() { 473 DBObject query = new BasicDBObject(MONGODB_ID, COUNTER_NAME_UUID); 474 DBObject update = new BasicDBObject(MONGODB_INC, new BasicDBObject(COUNTER_FIELD, ONE)); 475 boolean returnNew = true; 476 DBObject idCounter = countersColl.findAndModify(query, null, null, false, update, returnNew, false); 477 if (idCounter == null) { 478 throw new RuntimeException("Repository id counter not initialized"); 479 } 480 return (Long) idCounter.get(COUNTER_FIELD); 481 } 482 483 @Override 484 public String generateNewId() { 485 if (DEBUG_UUIDS) { 486 Long id = getNextUuidSeq(); 487 return "UUID_" + id; 488 } else { 489 return UUID.randomUUID().toString(); 490 } 491 } 492 493 @Override 494 public void createState(State state) { 495 DBObject ob = stateToBson(state); 496 if (log.isTraceEnabled()) { 497 log.trace("MongoDB: CREATE " + ob); 498 } 499 coll.insert(ob); 500 // TODO dupe exception 501 // throw new DocumentException("Already exists: " + id); 502 } 503 504 @Override 505 public State readState(String id) { 506 DBObject query = new BasicDBObject(KEY_ID, id); 507 return findOne(query); 508 } 509 510 @Override 511 public List<State> readStates(List<String> ids) { 512 DBObject query = new BasicDBObject(KEY_ID, new BasicDBObject(QueryOperators.IN, ids)); 513 return findAll(query, ids.size()); 514 } 515 516 @Override 517 public void updateState(String id, StateDiff diff) { 518 DBObject query = new BasicDBObject(KEY_ID, id); 519 for (DBObject update : diffToBson(diff)) { 520 if (log.isTraceEnabled()) { 521 log.trace("MongoDB: UPDATE " + id + ": " + update); 522 } 523 coll.update(query, update); 524 // TODO dupe exception 525 // throw new DocumentException("Missing: " + id); 526 } 527 } 528 529 @Override 530 public void deleteStates(Set<String> ids) { 531 DBObject query = new BasicDBObject(KEY_ID, new BasicDBObject(QueryOperators.IN, ids)); 532 if (log.isTraceEnabled()) { 533 log.trace("MongoDB: REMOVE " + ids); 534 } 535 WriteResult w = coll.remove(query); 536 if (w.getN() != ids.size()) { 537 log.error("Removed " + w.getN() + " docs for " + ids.size() + " ids: " + ids); 538 } 539 } 540 541 @Override 542 public State readChildState(String parentId, String name, Set<String> ignored) { 543 DBObject query = getChildQuery(parentId, name, ignored); 544 return findOne(query); 545 } 546 547 protected void logQuery(String id, DBObject fields) { 548 logQuery(new BasicDBObject(KEY_ID, id), fields); 549 } 550 551 protected void logQuery(DBObject query, DBObject fields) { 552 if (fields == null) { 553 log.trace("MongoDB: QUERY " + query); 554 } else { 555 log.trace("MongoDB: QUERY " + query + " KEYS " + fields); 556 } 557 } 558 559 protected void logQuery(DBObject query, DBObject fields, DBObject orderBy, int limit, int offset) { 560 log.trace("MongoDB: QUERY " + query + " KEYS " + fields + (orderBy == null ? "" : " ORDER BY " + orderBy) 561 + " OFFSET " + offset + " LIMIT " + limit); 562 } 563 564 @Override 565 public boolean hasChild(String parentId, String name, Set<String> ignored) { 566 DBObject query = getChildQuery(parentId, name, ignored); 567 if (log.isTraceEnabled()) { 568 logQuery(query, justPresenceField()); 569 } 570 return coll.findOne(query, justPresenceField()) != null; 571 } 572 573 protected DBObject getChildQuery(String parentId, String name, Set<String> ignored) { 574 DBObject query = new BasicDBObject(); 575 query.put(KEY_PARENT_ID, parentId); 576 query.put(KEY_NAME, name); 577 addIgnoredIds(query, ignored); 578 return query; 579 } 580 581 protected void addIgnoredIds(DBObject query, Set<String> ignored) { 582 if (!ignored.isEmpty()) { 583 DBObject notInIds = new BasicDBObject(QueryOperators.NIN, new ArrayList<String>(ignored)); 584 query.put(KEY_ID, notInIds); 585 } 586 } 587 588 @Override 589 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 590 DBObject query = new BasicDBObject(key, value); 591 addIgnoredIds(query, ignored); 592 return findAll(query, 0); 593 } 594 595 @Override 596 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 597 DBObject query = new BasicDBObject(key1, value1); 598 query.put(key2, value2); 599 addIgnoredIds(query, ignored); 600 return findAll(query, 0); 601 } 602 603 @Override 604 public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets, 605 Map<String, Object[]> targetProxies) { 606 DBObject query = new BasicDBObject(key, value); 607 DBObject fields = new BasicDBObject(); 608 fields.put(MONGODB_ID, ZERO); 609 fields.put(KEY_ID, ONE); 610 fields.put(KEY_IS_PROXY, ONE); 611 fields.put(KEY_PROXY_TARGET_ID, ONE); 612 fields.put(KEY_PROXY_IDS, ONE); 613 if (log.isTraceEnabled()) { 614 logQuery(query, fields); 615 } 616 DBCursor cursor = coll.find(query, fields); 617 try { 618 for (DBObject ob : cursor) { 619 String id = (String) ob.get(KEY_ID); 620 ids.add(id); 621 if (proxyTargets != null && TRUE.equals(ob.get(KEY_IS_PROXY))) { 622 String targetId = (String) ob.get(KEY_PROXY_TARGET_ID); 623 proxyTargets.put(id, targetId); 624 } 625 if (targetProxies != null) { 626 Object[] proxyIds = (Object[]) ob.get(KEY_PROXY_IDS); 627 if (proxyIds != null) { 628 targetProxies.put(id, proxyIds); 629 } 630 } 631 } 632 } finally { 633 cursor.close(); 634 } 635 } 636 637 @Override 638 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 639 DBObject query = new BasicDBObject(key, value); 640 addIgnoredIds(query, ignored); 641 if (log.isTraceEnabled()) { 642 logQuery(query, justPresenceField()); 643 } 644 return coll.findOne(query, justPresenceField()) != null; 645 } 646 647 protected State findOne(DBObject query) { 648 if (log.isTraceEnabled()) { 649 logQuery(query, null); 650 } 651 return bsonToState(coll.findOne(query)); 652 } 653 654 protected List<State> findAll(DBObject query, int sizeHint) { 655 if (log.isTraceEnabled()) { 656 logQuery(query, null); 657 } 658 DBCursor cursor = coll.find(query); 659 Set<String> seen = new HashSet<>(); 660 try { 661 List<State> list = new ArrayList<>(sizeHint); 662 for (DBObject ob : cursor) { 663 if (!seen.add((String) ob.get(KEY_ID))) { 664 // MongoDB cursors may return the same 665 // object several times 666 continue; 667 } 668 list.add(bsonToState(ob)); 669 } 670 return list; 671 } finally { 672 cursor.close(); 673 } 674 } 675 676 protected DBObject justPresenceField() { 677 return new BasicDBObject(MONGODB_ID, ONE); 678 } 679 680 @Override 681 public PartialList<State> queryAndFetch(Expression expression, SelectClause selectClause, OrderByClause orderByClause, 682 int limit, int offset, int countUpTo, DBSExpressionEvaluator evaluator, boolean deepCopy) { 683 MongoDBQueryBuilder builder = new MongoDBQueryBuilder(expression, selectClause, orderByClause, 684 evaluator.pathResolver); 685 builder.walk(); 686 if (builder.hasFulltext && fulltextDisabled) { 687 throw new RuntimeException("Fulltext disabled by configuration"); 688 } 689 DBObject query = builder.getQuery(); 690 addPrincipals(query, evaluator.principals); 691 DBObject orderBy = builder.getOrderBy(); 692 DBObject keys = builder.getProjection(); 693 694 if (log.isTraceEnabled()) { 695 logQuery(query, keys, orderBy, limit, offset); 696 } 697 698 List<State> list; 699 long totalSize; 700 DBCursor cursor = coll.find(query, keys).skip(offset).limit(limit); 701 try { 702 if (orderBy != null) { 703 cursor = cursor.sort(orderBy); 704 } 705 list = new ArrayList<>(); 706 for (DBObject ob : cursor) { 707 list.add(bsonToState(ob)); 708 } 709 if (countUpTo == -1) { 710 // count full size 711 if (limit == 0) { 712 totalSize = list.size(); 713 } else { 714 totalSize = cursor.count(); 715 } 716 } else if (countUpTo == 0) { 717 // no count 718 totalSize = -1; // not counted 719 } else { 720 // count only if less than countUpTo 721 if (limit == 0) { 722 totalSize = list.size(); 723 } else { 724 totalSize = cursor.copy().limit(countUpTo + 1).count(); 725 } 726 if (totalSize > countUpTo) { 727 totalSize = -2; // truncated 728 } 729 } 730 } finally { 731 cursor.close(); 732 } 733 if (log.isTraceEnabled() && list.size() != 0) { 734 log.trace("MongoDB: -> " + list.size()); 735 } 736 return new PartialList<>(list, totalSize); 737 } 738 739 protected void addPrincipals(DBObject query, Set<String> principals) { 740 if (principals != null) { 741 DBObject inPrincipals = new BasicDBObject(QueryOperators.IN, new ArrayList<String>(principals)); 742 query.put(DBSDocument.KEY_READ_ACL, inPrincipals); 743 } 744 } 745 746 /** Keys used for document projection when marking all binaries for GC. */ 747 protected DBObject binaryKeys; 748 749 @Override 750 protected void initBlobsPaths() { 751 MongoDBBlobFinder finder = new MongoDBBlobFinder(); 752 finder.visit(); 753 binaryKeys = finder.binaryKeys; 754 } 755 756 protected static class MongoDBBlobFinder extends BlobFinder { 757 protected DBObject binaryKeys = new BasicDBObject(MONGODB_ID, ZERO); 758 759 @Override 760 protected void recordBlobPath() { 761 path.addLast(KEY_BLOB_DATA); 762 binaryKeys.put(StringUtils.join(path, "."), ONE); 763 path.removeLast(); 764 } 765 } 766 767 @Override 768 public void markReferencedBinaries() { 769 BlobManager blobManager = Framework.getService(BlobManager.class); 770 // TODO add a query to not scan all documents 771 if (log.isTraceEnabled()) { 772 logQuery(new BasicDBObject(), binaryKeys); 773 } 774 DBCursor cursor = coll.find(new BasicDBObject(), binaryKeys); 775 try { 776 for (DBObject ob : cursor) { 777 markReferencedBinaries(ob, blobManager); 778 } 779 } finally { 780 cursor.close(); 781 } 782 } 783 784 protected void markReferencedBinaries(DBObject ob, BlobManager blobManager) { 785 for (String key : ob.keySet()) { 786 Object value = ob.get(key); 787 if (value instanceof List) { 788 @SuppressWarnings("unchecked") 789 List<Object> list = (List<Object>) value; 790 for (Object v : list) { 791 if (v instanceof DBObject) { 792 markReferencedBinaries((DBObject) v, blobManager); 793 } else { 794 markReferencedBinary(v, blobManager); 795 } 796 } 797 } else if (value instanceof Object[]) { 798 for (Object v : (Object[]) value) { 799 markReferencedBinary(v, blobManager); 800 } 801 } else if (value instanceof DBObject) { 802 markReferencedBinaries((DBObject) value, blobManager); 803 } else { 804 markReferencedBinary(value, blobManager); 805 } 806 } 807 } 808 809 protected void markReferencedBinary(Object value, BlobManager blobManager) { 810 if (!(value instanceof String)) { 811 return; 812 } 813 String key = (String) value; 814 blobManager.markReferencedBinary(key, repositoryName); 815 } 816 817 protected static final DBObject LOCK_FIELDS; 818 819 static { 820 LOCK_FIELDS = new BasicDBObject(); 821 LOCK_FIELDS.put(KEY_LOCK_OWNER, ONE); 822 LOCK_FIELDS.put(KEY_LOCK_CREATED, ONE); 823 } 824 825 protected static final DBObject UNSET_LOCK_UPDATE = new BasicDBObject(MONGODB_UNSET, LOCK_FIELDS); 826 827 @Override 828 public Lock getLock(String id) { 829 if (log.isTraceEnabled()) { 830 logQuery(id, LOCK_FIELDS); 831 } 832 DBObject res = coll.findOne(new BasicDBObject(KEY_ID, id), LOCK_FIELDS); 833 if (res == null) { 834 // document not found 835 throw new DocumentNotFoundException(id); 836 } 837 String owner = (String) res.get(KEY_LOCK_OWNER); 838 if (owner == null) { 839 // not locked 840 return null; 841 } 842 Calendar created = (Calendar) scalarToSerializable(res.get(KEY_LOCK_CREATED)); 843 return new Lock(owner, created); 844 } 845 846 @Override 847 public Lock setLock(String id, Lock lock) { 848 DBObject query = new BasicDBObject(KEY_ID, id); 849 query.put(KEY_LOCK_OWNER, null); // select doc if no lock is set 850 DBObject setLock = new BasicDBObject(); 851 setLock.put(KEY_LOCK_OWNER, lock.getOwner()); 852 setLock.put(KEY_LOCK_CREATED, serializableToBson(lock.getCreated())); 853 DBObject setLockUpdate = new BasicDBObject(MONGODB_SET, setLock); 854 if (log.isTraceEnabled()) { 855 log.trace("MongoDB: FINDANDMODIFY " + query + " UPDATE " + setLockUpdate); 856 } 857 DBObject res = coll.findAndModify(query, null, null, false, setLockUpdate, false, false); 858 if (res != null) { 859 // found a doc to lock 860 return null; 861 } else { 862 // doc not found, or lock owner already set 863 // get the old lock 864 if (log.isTraceEnabled()) { 865 logQuery(id, LOCK_FIELDS); 866 } 867 DBObject old = coll.findOne(new BasicDBObject(KEY_ID, id), LOCK_FIELDS); 868 if (old == null) { 869 // document not found 870 throw new DocumentNotFoundException(id); 871 } 872 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 873 Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED)); 874 if (oldOwner != null) { 875 return new Lock(oldOwner, oldCreated); 876 } 877 // no lock -- there was a race condition 878 // TODO do better 879 throw new ConcurrentUpdateException("Lock " + id); 880 } 881 } 882 883 @Override 884 public Lock removeLock(String id, String owner) { 885 DBObject query = new BasicDBObject(KEY_ID, id); 886 if (owner != null) { 887 // remove if owner matches or null 888 // implements LockManager.canLockBeRemoved inside MongoDB 889 Object ownerOrNull = Arrays.asList(owner, null); 890 query.put(KEY_LOCK_OWNER, new BasicDBObject(QueryOperators.IN, ownerOrNull)); 891 } // else unconditional remove 892 // remove the lock 893 DBObject old = coll.findAndModify(query, null, null, false, UNSET_LOCK_UPDATE, false, false); 894 if (old != null) { 895 // found a doc and removed the lock, return previous lock 896 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 897 if (oldOwner == null) { 898 // was not locked 899 return null; 900 } else { 901 // return previous lock 902 Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED)); 903 return new Lock(oldOwner, oldCreated); 904 } 905 } else { 906 // doc not found, or lock owner didn't match 907 // get the old lock 908 if (log.isTraceEnabled()) { 909 logQuery(id, LOCK_FIELDS); 910 } 911 old = coll.findOne(new BasicDBObject(KEY_ID, id), LOCK_FIELDS); 912 if (old == null) { 913 // document not found 914 throw new DocumentNotFoundException(id); 915 } 916 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 917 Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED)); 918 if (oldOwner != null) { 919 if (!LockManager.canLockBeRemoved(oldOwner, owner)) { 920 // existing mismatched lock, flag failure 921 return new Lock(oldOwner, oldCreated, true); 922 } 923 // old owner should have matched -- there was a race condition 924 // TODO do better 925 throw new ConcurrentUpdateException("Unlock " + id); 926 } 927 // old owner null, should have matched -- there was a race condition 928 // TODO do better 929 throw new ConcurrentUpdateException("Unlock " + id); 930 } 931 } 932 933 @Override 934 public void closeLockManager() { 935 936 } 937 938 @Override 939 public void clearLockManagerCaches() { 940 } 941 942}