001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.mongodb; 020 021import static java.lang.Boolean.TRUE; 022import static org.nuxeo.ecm.core.storage.State.NOP; 023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_STATUS; 024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_USER; 025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACL; 026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACP; 027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS; 028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA; 029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY; 030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_JOBID; 031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_SIMPLE; 032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 033import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY; 034import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LIFECYCLE_STATE; 035import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED; 036import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER; 037import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 038import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 039import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PREFIX; 040import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PRIMARY_TYPE; 041import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS; 042import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID; 043import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_VERSION_SERIES_ID; 044import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_READ_ACL; 045import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_VERSION_SERIES_ID; 046 047import java.io.Serializable; 048import java.lang.reflect.Array; 049import java.net.UnknownHostException; 050import java.util.ArrayList; 051import java.util.Arrays; 052import java.util.Calendar; 053import java.util.Date; 054import java.util.HashMap; 055import java.util.HashSet; 056import java.util.List; 057import java.util.Map; 058import java.util.Map.Entry; 059import java.util.Set; 060import java.util.UUID; 061 062import org.apache.commons.lang.StringUtils; 063import org.apache.commons.logging.Log; 064import org.apache.commons.logging.LogFactory; 065import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 066import org.nuxeo.ecm.core.api.DocumentNotFoundException; 067import org.nuxeo.ecm.core.api.Lock; 068import org.nuxeo.ecm.core.api.NuxeoException; 069import org.nuxeo.ecm.core.api.PartialList; 070import org.nuxeo.ecm.core.api.model.Delta; 071import org.nuxeo.ecm.core.blob.BlobManager; 072import org.nuxeo.ecm.core.model.LockManager; 073import org.nuxeo.ecm.core.model.Repository; 074import org.nuxeo.ecm.core.query.QueryParseException; 075import org.nuxeo.ecm.core.query.sql.model.Expression; 076import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 077import org.nuxeo.ecm.core.query.sql.model.SelectClause; 078import org.nuxeo.ecm.core.storage.State; 079import org.nuxeo.ecm.core.storage.State.ListDiff; 080import org.nuxeo.ecm.core.storage.State.StateDiff; 081import org.nuxeo.ecm.core.storage.dbs.DBSDocument; 082import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator; 083import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase; 084import org.nuxeo.ecm.core.storage.dbs.DBSSession; 085import org.nuxeo.runtime.api.Framework; 086 087import com.mongodb.BasicDBObject; 088import com.mongodb.DB; 089import com.mongodb.DBCollection; 090import com.mongodb.DBCursor; 091import com.mongodb.DBObject; 092import com.mongodb.MongoClient; 093import com.mongodb.MongoClientURI; 094import com.mongodb.QueryOperators; 095import com.mongodb.ServerAddress; 096import com.mongodb.WriteResult; 097 098/** 099 * MongoDB implementation of a {@link Repository}. 100 * 101 * @since 5.9.4 102 */ 103public class MongoDBRepository extends DBSRepositoryBase { 104 105 private static final Log log = LogFactory.getLog(MongoDBRepository.class); 106 107 private static final Long ZERO = Long.valueOf(0); 108 109 private static final Long ONE = Long.valueOf(1); 110 111 private static final Long MINUS_ONE = Long.valueOf(-11); 112 113 public static final String DB_DEFAULT = "nuxeo"; 114 115 public static final String MONGODB_ID = "_id"; 116 117 public static final String MONGODB_INC = "$inc"; 118 119 public static final String MONGODB_SET = "$set"; 120 121 public static final String MONGODB_UNSET = "$unset"; 122 123 public static final String MONGODB_PUSH = "$push"; 124 125 public static final String MONGODB_EACH = "$each"; 126 127 public static final String MONGODB_META = "$meta"; 128 129 public static final String MONGODB_TEXT_SCORE = "textScore"; 130 131 private static final String MONGODB_INDEX_TEXT = "text"; 132 133 private static final String MONGODB_INDEX_NAME = "name"; 134 135 private static final String MONGODB_LANGUAGE_OVERRIDE = "language_override"; 136 137 private static final String FULLTEXT_INDEX_NAME = "fulltext"; 138 139 private static final String LANGUAGE_FIELD = "__language"; 140 141 protected static final String COUNTER_NAME_UUID = "ecm:id"; 142 143 protected static final String COUNTER_FIELD = "seq"; 144 145 protected MongoClient mongoClient; 146 147 protected DBCollection coll; 148 149 protected DBCollection countersColl; 150 151 public MongoDBRepository(MongoDBRepositoryDescriptor descriptor) { 152 super(descriptor.name, descriptor.getFulltextDescriptor()); 153 try { 154 mongoClient = newMongoClient(descriptor); 155 coll = getCollection(descriptor, mongoClient); 156 countersColl = getCountersCollection(descriptor, mongoClient); 157 } catch (UnknownHostException e) { 158 throw new RuntimeException(e); 159 } 160 initRepository(); 161 } 162 163 @Override 164 public void shutdown() { 165 super.shutdown(); 166 mongoClient.close(); 167 } 168 169 // used also by unit tests 170 public static MongoClient newMongoClient(MongoDBRepositoryDescriptor descriptor) throws UnknownHostException { 171 String server = descriptor.server; 172 if (StringUtils.isBlank(server)) { 173 throw new NuxeoException("Missing <server> in MongoDB repository descriptor"); 174 } 175 if (server.startsWith("mongodb://")) { 176 // allow mongodb:// URI syntax for the server, to pass everything in one string 177 return new MongoClient(new MongoClientURI(server)); 178 } else { 179 return new MongoClient(new ServerAddress(server)); 180 } 181 } 182 183 protected static DBCollection getCollection(MongoClient mongoClient, String dbname, String collection) { 184 if (StringUtils.isBlank(dbname)) { 185 dbname = DB_DEFAULT; 186 } 187 DB db = mongoClient.getDB(dbname); 188 return db.getCollection(collection); 189 } 190 191 // used also by unit tests 192 public static DBCollection getCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) { 193 return getCollection(mongoClient, descriptor.dbname, descriptor.name); 194 } 195 196 // used also by unit tests 197 public static DBCollection getCountersCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) { 198 return getCollection(mongoClient, descriptor.dbname, descriptor.name + ".counters"); 199 } 200 201 protected Object valueToBson(Object value) { 202 if (value instanceof State) { 203 return stateToBson((State) value); 204 } else if (value instanceof List) { 205 @SuppressWarnings("unchecked") 206 List<Object> values = (List<Object>) value; 207 return listToBson(values); 208 } else if (value instanceof Object[]) { 209 return listToBson(Arrays.asList((Object[]) value)); 210 } else { 211 return serializableToBson(value); 212 } 213 } 214 215 protected DBObject stateToBson(State state) { 216 DBObject ob = new BasicDBObject(); 217 for (Entry<String, Serializable> en : state.entrySet()) { 218 Object val = valueToBson(en.getValue()); 219 if (val != null) { 220 ob.put(en.getKey(), val); 221 } 222 } 223 return ob; 224 } 225 226 protected List<Object> listToBson(List<Object> values) { 227 ArrayList<Object> objects = new ArrayList<Object>(values.size()); 228 for (Object value : values) { 229 objects.add(valueToBson(value)); 230 } 231 return objects; 232 } 233 234 protected State bsonToState(DBObject ob) { 235 if (ob == null) { 236 return null; 237 } 238 State state = new State(ob.keySet().size()); 239 for (String key : ob.keySet()) { 240 Object val = ob.get(key); 241 Serializable value; 242 if (val instanceof List) { 243 @SuppressWarnings("unchecked") 244 List<Object> list = (List<Object>) val; 245 if (list.isEmpty()) { 246 value = null; 247 } else { 248 if (list.get(0) instanceof DBObject) { 249 List<Serializable> l = new ArrayList<>(list.size()); 250 for (Object el : list) { 251 l.add(bsonToState((DBObject) el)); 252 } 253 value = (Serializable) l; 254 } else { 255 // turn the list into a properly-typed array 256 Class<?> klass = Object.class; 257 for (Object o : list) { 258 if (o != null) { 259 klass = scalarToSerializableClass(o.getClass()); 260 break; 261 } 262 } 263 Object[] ar = (Object[]) Array.newInstance(klass, list.size()); 264 int i = 0; 265 for (Object el : list) { 266 ar[i++] = scalarToSerializable(el); 267 } 268 value = ar; 269 } 270 } 271 } else if (val instanceof DBObject) { 272 value = bsonToState((DBObject) val); 273 } else { 274 if (MONGODB_ID.equals(key)) { 275 // skip ObjectId 276 continue; 277 } 278 value = scalarToSerializable(val); 279 } 280 state.put(key, value); 281 } 282 return state; 283 } 284 285 public static class Updates { 286 public BasicDBObject set = new BasicDBObject(); 287 288 public BasicDBObject unset = new BasicDBObject(); 289 290 public BasicDBObject push = new BasicDBObject(); 291 292 public BasicDBObject inc = new BasicDBObject(); 293 } 294 295 /** 296 * Constructs a list of MongoDB updates from the given {@link StateDiff}. 297 * <p> 298 * We need a list because some cases need two operations to avoid conflicts. 299 */ 300 protected List<DBObject> diffToBson(StateDiff diff) { 301 Updates updates = new Updates(); 302 diffToUpdates(diff, null, updates); 303 UpdateListBuilder builder = new UpdateListBuilder(); 304 for (Entry<String, Object> en : updates.set.entrySet()) { 305 builder.update(MONGODB_SET, en.getKey(), en.getValue()); 306 } 307 for (Entry<String, Object> en : updates.unset.entrySet()) { 308 builder.update(MONGODB_UNSET, en.getKey(), en.getValue()); 309 } 310 for (Entry<String, Object> en : updates.push.entrySet()) { 311 builder.update(MONGODB_PUSH, en.getKey(), en.getValue()); 312 } 313 for (Entry<String, Object> en : updates.inc.entrySet()) { 314 builder.update(MONGODB_INC, en.getKey(), en.getValue()); 315 } 316 return builder.updateList; 317 } 318 319 /** 320 * Update list builder to prevent several updates of the same field. 321 * <p> 322 * This happens if two operations act on two fields where one is a prefix of the other. 323 * <p> 324 * Example: Cannot update 'mylist.0.string' and 'mylist' at the same time (error 16837) 325 * 326 * @since 5.9.5 327 */ 328 protected static class UpdateListBuilder { 329 330 protected List<DBObject> updateList = new ArrayList<>(1); 331 332 protected DBObject update; 333 334 protected List<String> keys; 335 336 protected UpdateListBuilder() { 337 newUpdate(); 338 } 339 340 protected void newUpdate() { 341 updateList.add(update = new BasicDBObject()); 342 keys = new ArrayList<>(); 343 } 344 345 protected void update(String op, String key, Object value) { 346 if (conflicts(key, keys)) { 347 newUpdate(); 348 } 349 keys.add(key); 350 DBObject map = (DBObject) update.get(op); 351 if (map == null) { 352 update.put(op, map = new BasicDBObject()); 353 } 354 map.put(key, value); 355 } 356 357 /** 358 * Checks if the key conflicts with one of the previous keys. 359 * <p> 360 * A conflict occurs if one key is equals to or is a prefix of the other. 361 */ 362 protected boolean conflicts(String key, List<String> previousKeys) { 363 String keydot = key + '.'; 364 for (String prev : previousKeys) { 365 if (prev.equals(key) || prev.startsWith(keydot) || key.startsWith(prev + '.')) { 366 return true; 367 } 368 } 369 return false; 370 } 371 } 372 373 protected void diffToUpdates(StateDiff diff, String prefix, Updates updates) { 374 String elemPrefix = prefix == null ? "" : prefix + '.'; 375 for (Entry<String, Serializable> en : diff.entrySet()) { 376 String name = elemPrefix + en.getKey(); 377 Serializable value = en.getValue(); 378 if (value instanceof StateDiff) { 379 diffToUpdates((StateDiff) value, name, updates); 380 } else if (value instanceof ListDiff) { 381 diffToUpdates((ListDiff) value, name, updates); 382 } else if (value instanceof Delta) { 383 diffToUpdates((Delta) value, name, updates); 384 } else { 385 // not a diff 386 updates.set.put(name, valueToBson(value)); 387 } 388 } 389 } 390 391 protected void diffToUpdates(ListDiff listDiff, String prefix, Updates updates) { 392 if (listDiff.diff != null) { 393 String elemPrefix = prefix == null ? "" : prefix + '.'; 394 int i = 0; 395 for (Object value : listDiff.diff) { 396 String name = elemPrefix + i; 397 if (value instanceof StateDiff) { 398 diffToUpdates((StateDiff) value, name, updates); 399 } else if (value != NOP) { 400 // set value 401 updates.set.put(name, valueToBson(value)); 402 } 403 i++; 404 } 405 } 406 if (listDiff.rpush != null) { 407 Object pushed; 408 if (listDiff.rpush.size() == 1) { 409 // no need to use $each for one element 410 pushed = valueToBson(listDiff.rpush.get(0)); 411 } else { 412 pushed = new BasicDBObject(MONGODB_EACH, listToBson(listDiff.rpush)); 413 } 414 updates.push.put(prefix, pushed); 415 } 416 } 417 418 protected void diffToUpdates(Delta delta, String prefix, Updates updates) { 419 Object inc = valueToBson(delta.getDeltaValue()); 420 updates.inc.put(prefix, inc); 421 } 422 423 protected Object serializableToBson(Object value) { 424 if (value instanceof Calendar) { 425 return ((Calendar) value).getTime(); 426 } 427 return value; 428 } 429 430 protected Serializable scalarToSerializable(Object val) { 431 if (val instanceof Date) { 432 Calendar cal = Calendar.getInstance(); 433 cal.setTime((Date) val); 434 return cal; 435 } 436 return (Serializable) val; 437 } 438 439 protected Class<?> scalarToSerializableClass(Class<?> klass) { 440 if (Date.class.isAssignableFrom(klass)) { 441 return Calendar.class; 442 } 443 return klass; 444 } 445 446 protected void initRepository() { 447 // create required indexes 448 // code does explicit queries on those 449 coll.createIndex(new BasicDBObject(KEY_ID, ONE)); 450 coll.createIndex(new BasicDBObject(KEY_PARENT_ID, ONE)); 451 coll.createIndex(new BasicDBObject(KEY_ANCESTOR_IDS, ONE)); 452 coll.createIndex(new BasicDBObject(KEY_VERSION_SERIES_ID, ONE)); 453 coll.createIndex(new BasicDBObject(KEY_PROXY_TARGET_ID, ONE)); 454 coll.createIndex(new BasicDBObject(KEY_PROXY_VERSION_SERIES_ID, ONE)); 455 coll.createIndex(new BasicDBObject(KEY_READ_ACL, ONE)); 456 DBObject parentChild = new BasicDBObject(); 457 parentChild.put(KEY_PARENT_ID, ONE); 458 parentChild.put(KEY_NAME, ONE); 459 coll.createIndex(parentChild); 460 // often used in user-generated queries 461 coll.createIndex(new BasicDBObject(KEY_PRIMARY_TYPE, ONE)); 462 coll.createIndex(new BasicDBObject(KEY_LIFECYCLE_STATE, ONE)); 463 coll.createIndex(new BasicDBObject(KEY_FULLTEXT_JOBID, ONE)); 464 coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_USER, ONE)); 465 coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_STATUS, ONE)); 466 // TODO configure these from somewhere else 467 coll.createIndex(new BasicDBObject("dc:modified", MINUS_ONE)); 468 coll.createIndex(new BasicDBObject("rend:renditionName", ONE)); 469 coll.createIndex(new BasicDBObject("drv:subscriptions.enabled", ONE)); 470 coll.createIndex(new BasicDBObject("collectionMember:collectionIds", ONE)); 471 if (!isFulltextDisabled()) { 472 DBObject indexKeys = new BasicDBObject(); 473 indexKeys.put(KEY_FULLTEXT_SIMPLE, MONGODB_INDEX_TEXT); 474 indexKeys.put(KEY_FULLTEXT_BINARY, MONGODB_INDEX_TEXT); 475 DBObject indexOptions = new BasicDBObject(); 476 indexOptions.put(MONGODB_INDEX_NAME, FULLTEXT_INDEX_NAME); 477 indexOptions.put(MONGODB_LANGUAGE_OVERRIDE, LANGUAGE_FIELD); 478 coll.createIndex(indexKeys, indexOptions); 479 } 480 // check root presence 481 DBObject query = new BasicDBObject(KEY_ID, getRootId()); 482 if (coll.findOne(query, justPresenceField()) != null) { 483 return; 484 } 485 // create basic repository structure needed 486 if (DEBUG_UUIDS) { 487 // create the id counter 488 DBObject idCounter = new BasicDBObject(); 489 idCounter.put(MONGODB_ID, COUNTER_NAME_UUID); 490 idCounter.put(COUNTER_FIELD, ZERO); 491 countersColl.insert(idCounter); 492 } 493 initRoot(); 494 } 495 496 protected Long getNextUuidSeq() { 497 DBObject query = new BasicDBObject(MONGODB_ID, COUNTER_NAME_UUID); 498 DBObject update = new BasicDBObject(MONGODB_INC, new BasicDBObject(COUNTER_FIELD, ONE)); 499 boolean returnNew = true; 500 DBObject idCounter = countersColl.findAndModify(query, null, null, false, update, returnNew, false); 501 if (idCounter == null) { 502 throw new RuntimeException("Repository id counter not initialized"); 503 } 504 return (Long) idCounter.get(COUNTER_FIELD); 505 } 506 507 @Override 508 public String generateNewId() { 509 if (DEBUG_UUIDS) { 510 Long id = getNextUuidSeq(); 511 return "UUID_" + id; 512 } else { 513 return UUID.randomUUID().toString(); 514 } 515 } 516 517 @Override 518 public void createState(State state) { 519 DBObject ob = stateToBson(state); 520 if (log.isTraceEnabled()) { 521 log.trace("MongoDB: CREATE " + ob.get(KEY_ID) + ": " + ob); 522 } 523 coll.insert(ob); 524 // TODO dupe exception 525 // throw new DocumentException("Already exists: " + id); 526 } 527 528 @Override 529 public State readState(String id) { 530 DBObject query = new BasicDBObject(KEY_ID, id); 531 return findOne(query); 532 } 533 534 @Override 535 public List<State> readStates(List<String> ids) { 536 DBObject query = new BasicDBObject(KEY_ID, new BasicDBObject(QueryOperators.IN, ids)); 537 return findAll(query, ids.size()); 538 } 539 540 @Override 541 public void updateState(String id, StateDiff diff) { 542 DBObject query = new BasicDBObject(KEY_ID, id); 543 for (DBObject update : diffToBson(diff)) { 544 if (log.isTraceEnabled()) { 545 log.trace("MongoDB: UPDATE " + id + ": " + update); 546 } 547 coll.update(query, update); 548 // TODO dupe exception 549 // throw new DocumentException("Missing: " + id); 550 } 551 } 552 553 @Override 554 public void deleteStates(Set<String> ids) { 555 DBObject query = new BasicDBObject(KEY_ID, new BasicDBObject(QueryOperators.IN, ids)); 556 if (log.isTraceEnabled()) { 557 log.trace("MongoDB: REMOVE " + ids); 558 } 559 WriteResult w = coll.remove(query); 560 if (w.getN() != ids.size()) { 561 log.error("Removed " + w.getN() + " docs for " + ids.size() + " ids: " + ids); 562 } 563 } 564 565 @Override 566 public State readChildState(String parentId, String name, Set<String> ignored) { 567 DBObject query = getChildQuery(parentId, name, ignored); 568 return findOne(query); 569 } 570 571 protected void logQuery(String id, DBObject fields) { 572 logQuery(new BasicDBObject(KEY_ID, id), fields); 573 } 574 575 protected void logQuery(DBObject query, DBObject fields) { 576 if (fields == null) { 577 log.trace("MongoDB: QUERY " + query); 578 } else { 579 log.trace("MongoDB: QUERY " + query + " KEYS " + fields); 580 } 581 } 582 583 protected void logQuery(DBObject query, DBObject fields, DBObject orderBy, int limit, int offset) { 584 log.trace("MongoDB: QUERY " + query + " KEYS " + fields + (orderBy == null ? "" : " ORDER BY " + orderBy) 585 + " OFFSET " + offset + " LIMIT " + limit); 586 } 587 588 @Override 589 public boolean hasChild(String parentId, String name, Set<String> ignored) { 590 DBObject query = getChildQuery(parentId, name, ignored); 591 if (log.isTraceEnabled()) { 592 logQuery(query, justPresenceField()); 593 } 594 return coll.findOne(query, justPresenceField()) != null; 595 } 596 597 protected DBObject getChildQuery(String parentId, String name, Set<String> ignored) { 598 DBObject query = new BasicDBObject(); 599 query.put(KEY_PARENT_ID, parentId); 600 query.put(KEY_NAME, name); 601 addIgnoredIds(query, ignored); 602 return query; 603 } 604 605 protected void addIgnoredIds(DBObject query, Set<String> ignored) { 606 if (!ignored.isEmpty()) { 607 DBObject notInIds = new BasicDBObject(QueryOperators.NIN, new ArrayList<String>(ignored)); 608 query.put(KEY_ID, notInIds); 609 } 610 } 611 612 @Override 613 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 614 DBObject query = new BasicDBObject(key, value); 615 addIgnoredIds(query, ignored); 616 return findAll(query, 0); 617 } 618 619 @Override 620 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 621 DBObject query = new BasicDBObject(key1, value1); 622 query.put(key2, value2); 623 addIgnoredIds(query, ignored); 624 return findAll(query, 0); 625 } 626 627 @Override 628 public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets, 629 Map<String, Object[]> targetProxies) { 630 DBObject query = new BasicDBObject(key, value); 631 DBObject fields = new BasicDBObject(); 632 fields.put(MONGODB_ID, ZERO); 633 fields.put(KEY_ID, ONE); 634 fields.put(KEY_IS_PROXY, ONE); 635 fields.put(KEY_PROXY_TARGET_ID, ONE); 636 fields.put(KEY_PROXY_IDS, ONE); 637 if (log.isTraceEnabled()) { 638 logQuery(query, fields); 639 } 640 DBCursor cursor = coll.find(query, fields); 641 try { 642 for (DBObject ob : cursor) { 643 String id = (String) ob.get(KEY_ID); 644 ids.add(id); 645 if (proxyTargets != null && TRUE.equals(ob.get(KEY_IS_PROXY))) { 646 String targetId = (String) ob.get(KEY_PROXY_TARGET_ID); 647 proxyTargets.put(id, targetId); 648 } 649 if (targetProxies != null) { 650 Object[] proxyIds = (Object[]) ob.get(KEY_PROXY_IDS); 651 if (proxyIds != null) { 652 targetProxies.put(id, proxyIds); 653 } 654 } 655 } 656 } finally { 657 cursor.close(); 658 } 659 } 660 661 @Override 662 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 663 DBObject query = new BasicDBObject(key, value); 664 addIgnoredIds(query, ignored); 665 if (log.isTraceEnabled()) { 666 logQuery(query, justPresenceField()); 667 } 668 return coll.findOne(query, justPresenceField()) != null; 669 } 670 671 protected State findOne(DBObject query) { 672 if (log.isTraceEnabled()) { 673 logQuery(query, null); 674 } 675 return bsonToState(coll.findOne(query)); 676 } 677 678 protected List<State> findAll(DBObject query, int sizeHint) { 679 if (log.isTraceEnabled()) { 680 logQuery(query, null); 681 } 682 DBCursor cursor = coll.find(query); 683 Set<String> seen = new HashSet<>(); 684 try { 685 List<State> list = new ArrayList<>(sizeHint); 686 for (DBObject ob : cursor) { 687 if (!seen.add((String) ob.get(KEY_ID))) { 688 // MongoDB cursors may return the same 689 // object several times 690 continue; 691 } 692 list.add(bsonToState(ob)); 693 } 694 return list; 695 } finally { 696 cursor.close(); 697 } 698 } 699 700 protected DBObject justPresenceField() { 701 return new BasicDBObject(MONGODB_ID, ONE); 702 } 703 704 @Override 705 public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator, 706 OrderByClause orderByClause, int limit, int offset, int countUpTo) { 707 // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter 708 MongoDBQueryBuilder builder = new MongoDBQueryBuilder(evaluator.getExpression(), evaluator.getSelectClause(), 709 orderByClause, evaluator.pathResolver); 710 builder.walk(); 711 if (builder.hasFulltext && isFulltextDisabled()) { 712 throw new QueryParseException("Fulltext search disabled by configuration"); 713 } 714 DBObject query = builder.getQuery(); 715 addPrincipals(query, evaluator.principals); 716 DBObject orderBy = builder.getOrderBy(); 717 DBObject keys = builder.getProjection(); 718 boolean projectionHasWildcard = builder.hasProjectionWildcard(); 719 if (projectionHasWildcard) { 720 // we'll do post-treatment to re-evaluate the query to get proper wildcard projections 721 // so we need the full state from the database 722 keys = new BasicDBObject(); 723 evaluator.parse(); 724 } 725 726 if (log.isTraceEnabled()) { 727 logQuery(query, keys, orderBy, limit, offset); 728 } 729 730 List<Map<String, Serializable>> projections; 731 long totalSize; 732 DBCursor cursor = coll.find(query, keys).skip(offset).limit(limit); 733 try { 734 if (orderBy != null) { 735 cursor.sort(orderBy); 736 } 737 projections = new ArrayList<>(); 738 for (DBObject ob : cursor) { 739 State state = bsonToState(ob); 740 if (projectionHasWildcard) { 741 projections.addAll(evaluator.matches(state)); 742 } else { 743 projections.add(flatten(state)); 744 } 745 } 746 if (countUpTo == -1) { 747 // count full size 748 if (limit == 0) { 749 totalSize = projections.size(); 750 } else { 751 totalSize = cursor.count(); 752 } 753 } else if (countUpTo == 0) { 754 // no count 755 totalSize = -1; // not counted 756 } else { 757 // count only if less than countUpTo 758 if (limit == 0) { 759 totalSize = projections.size(); 760 } else { 761 totalSize = cursor.copy().limit(countUpTo + 1).count(); 762 } 763 if (totalSize > countUpTo) { 764 totalSize = -2; // truncated 765 } 766 } 767 } finally { 768 cursor.close(); 769 } 770 if (log.isTraceEnabled() && projections.size() != 0) { 771 log.trace("MongoDB: -> " + projections.size()); 772 } 773 return new PartialList<>(projections, totalSize); 774 } 775 776 protected void addPrincipals(DBObject query, Set<String> principals) { 777 if (principals != null) { 778 DBObject inPrincipals = new BasicDBObject(QueryOperators.IN, new ArrayList<String>(principals)); 779 query.put(DBSDocument.KEY_READ_ACL, inPrincipals); 780 } 781 } 782 783 /** 784 * Flatten and convert from internal names to NXQL. 785 */ 786 protected Map<String, Serializable> flatten(State state) { 787 Map<String, Serializable> map = new HashMap<>(); 788 flatten(map, state, null); 789 return map; 790 } 791 792 protected void flatten(Map<String, Serializable> map, State state, String prefix) { 793 for (Entry<String, Serializable> en : state.entrySet()) { 794 String key = en.getKey(); 795 Serializable value = en.getValue(); 796 String name; 797 if (key.startsWith(KEY_PREFIX)) { 798 name = DBSSession.convToNXQL(key); 799 if (name == null) { 800 // present in state but not returned to caller 801 continue; 802 } 803 } else { 804 name = key; 805 } 806 name = prefix == null ? name : prefix + name; 807 if (value instanceof State) { 808 flatten(map, (State) value, name + '/'); 809 } else if (value instanceof List) { 810 String nameSlash = name + '/'; 811 int i = 0; 812 for (Object v : (List<?>) value) { 813 if (v instanceof State) { 814 flatten(map, (State) v, nameSlash + i + '/'); 815 } else { 816 map.put(nameSlash + i, (Serializable) v); 817 } 818 i++; 819 } 820 } else if (value instanceof Object[]) { 821 String nameSlash = name + '/'; 822 int i = 0; 823 for (Object v : (Object[]) value) { 824 map.put(nameSlash + i, (Serializable) v); 825 i++; 826 } 827 } else { 828 map.put(name, value); 829 } 830 } 831 } 832 833 /** Keys used for document projection when marking all binaries for GC. */ 834 protected DBObject binaryKeys; 835 836 @Override 837 protected void initBlobsPaths() { 838 MongoDBBlobFinder finder = new MongoDBBlobFinder(); 839 finder.visit(); 840 binaryKeys = finder.binaryKeys; 841 } 842 843 protected static class MongoDBBlobFinder extends BlobFinder { 844 protected DBObject binaryKeys = new BasicDBObject(MONGODB_ID, ZERO); 845 846 @Override 847 protected void recordBlobPath() { 848 path.addLast(KEY_BLOB_DATA); 849 binaryKeys.put(StringUtils.join(path, "."), ONE); 850 path.removeLast(); 851 } 852 } 853 854 @Override 855 public void markReferencedBinaries() { 856 BlobManager blobManager = Framework.getService(BlobManager.class); 857 // TODO add a query to not scan all documents 858 if (log.isTraceEnabled()) { 859 logQuery(new BasicDBObject(), binaryKeys); 860 } 861 DBCursor cursor = coll.find(new BasicDBObject(), binaryKeys); 862 try { 863 for (DBObject ob : cursor) { 864 markReferencedBinaries(ob, blobManager); 865 } 866 } finally { 867 cursor.close(); 868 } 869 } 870 871 protected void markReferencedBinaries(DBObject ob, BlobManager blobManager) { 872 for (String key : ob.keySet()) { 873 Object value = ob.get(key); 874 if (value instanceof List) { 875 @SuppressWarnings("unchecked") 876 List<Object> list = (List<Object>) value; 877 for (Object v : list) { 878 if (v instanceof DBObject) { 879 markReferencedBinaries((DBObject) v, blobManager); 880 } else { 881 markReferencedBinary(v, blobManager); 882 } 883 } 884 } else if (value instanceof Object[]) { 885 for (Object v : (Object[]) value) { 886 markReferencedBinary(v, blobManager); 887 } 888 } else if (value instanceof DBObject) { 889 markReferencedBinaries((DBObject) value, blobManager); 890 } else { 891 markReferencedBinary(value, blobManager); 892 } 893 } 894 } 895 896 protected void markReferencedBinary(Object value, BlobManager blobManager) { 897 if (!(value instanceof String)) { 898 return; 899 } 900 String key = (String) value; 901 blobManager.markReferencedBinary(key, repositoryName); 902 } 903 904 protected static final DBObject LOCK_FIELDS; 905 906 static { 907 LOCK_FIELDS = new BasicDBObject(); 908 LOCK_FIELDS.put(KEY_LOCK_OWNER, ONE); 909 LOCK_FIELDS.put(KEY_LOCK_CREATED, ONE); 910 } 911 912 protected static final DBObject UNSET_LOCK_UPDATE = new BasicDBObject(MONGODB_UNSET, LOCK_FIELDS); 913 914 @Override 915 public Lock getLock(String id) { 916 if (log.isTraceEnabled()) { 917 logQuery(id, LOCK_FIELDS); 918 } 919 DBObject res = coll.findOne(new BasicDBObject(KEY_ID, id), LOCK_FIELDS); 920 if (res == null) { 921 // document not found 922 throw new DocumentNotFoundException(id); 923 } 924 String owner = (String) res.get(KEY_LOCK_OWNER); 925 if (owner == null) { 926 // not locked 927 return null; 928 } 929 Calendar created = (Calendar) scalarToSerializable(res.get(KEY_LOCK_CREATED)); 930 return new Lock(owner, created); 931 } 932 933 @Override 934 public Lock setLock(String id, Lock lock) { 935 DBObject query = new BasicDBObject(KEY_ID, id); 936 query.put(KEY_LOCK_OWNER, null); // select doc if no lock is set 937 DBObject setLock = new BasicDBObject(); 938 setLock.put(KEY_LOCK_OWNER, lock.getOwner()); 939 setLock.put(KEY_LOCK_CREATED, serializableToBson(lock.getCreated())); 940 DBObject setLockUpdate = new BasicDBObject(MONGODB_SET, setLock); 941 if (log.isTraceEnabled()) { 942 log.trace("MongoDB: FINDANDMODIFY " + query + " UPDATE " + setLockUpdate); 943 } 944 DBObject res = coll.findAndModify(query, null, null, false, setLockUpdate, false, false); 945 if (res != null) { 946 // found a doc to lock 947 return null; 948 } else { 949 // doc not found, or lock owner already set 950 // get the old lock 951 if (log.isTraceEnabled()) { 952 logQuery(id, LOCK_FIELDS); 953 } 954 DBObject old = coll.findOne(new BasicDBObject(KEY_ID, id), LOCK_FIELDS); 955 if (old == null) { 956 // document not found 957 throw new DocumentNotFoundException(id); 958 } 959 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 960 Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED)); 961 if (oldOwner != null) { 962 return new Lock(oldOwner, oldCreated); 963 } 964 // no lock -- there was a race condition 965 // TODO do better 966 throw new ConcurrentUpdateException("Lock " + id); 967 } 968 } 969 970 @Override 971 public Lock removeLock(String id, String owner) { 972 DBObject query = new BasicDBObject(KEY_ID, id); 973 if (owner != null) { 974 // remove if owner matches or null 975 // implements LockManager.canLockBeRemoved inside MongoDB 976 Object ownerOrNull = Arrays.asList(owner, null); 977 query.put(KEY_LOCK_OWNER, new BasicDBObject(QueryOperators.IN, ownerOrNull)); 978 } // else unconditional remove 979 // remove the lock 980 DBObject old = coll.findAndModify(query, null, null, false, UNSET_LOCK_UPDATE, false, false); 981 if (old != null) { 982 // found a doc and removed the lock, return previous lock 983 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 984 if (oldOwner == null) { 985 // was not locked 986 return null; 987 } else { 988 // return previous lock 989 Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED)); 990 return new Lock(oldOwner, oldCreated); 991 } 992 } else { 993 // doc not found, or lock owner didn't match 994 // get the old lock 995 if (log.isTraceEnabled()) { 996 logQuery(id, LOCK_FIELDS); 997 } 998 old = coll.findOne(new BasicDBObject(KEY_ID, id), LOCK_FIELDS); 999 if (old == null) { 1000 // document not found 1001 throw new DocumentNotFoundException(id); 1002 } 1003 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 1004 Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED)); 1005 if (oldOwner != null) { 1006 if (!LockManager.canLockBeRemoved(oldOwner, owner)) { 1007 // existing mismatched lock, flag failure 1008 return new Lock(oldOwner, oldCreated, true); 1009 } 1010 // old owner should have matched -- there was a race condition 1011 // TODO do better 1012 throw new ConcurrentUpdateException("Unlock " + id); 1013 } 1014 // old owner null, should have matched -- there was a race condition 1015 // TODO do better 1016 throw new ConcurrentUpdateException("Unlock " + id); 1017 } 1018 } 1019 1020 @Override 1021 public void closeLockManager() { 1022 1023 } 1024 1025 @Override 1026 public void clearLockManagerCaches() { 1027 } 1028 1029}