001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.mongodb; 020 021import static java.lang.Boolean.TRUE; 022import static org.nuxeo.ecm.core.storage.State.NOP; 023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_STATUS; 024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACE_USER; 025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACL; 026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ACP; 027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS; 028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA; 029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_BINARY; 030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_JOBID; 031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_FULLTEXT_SIMPLE; 032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 033import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY; 034import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LIFECYCLE_STATE; 035import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED; 036import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER; 037import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 038import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 039import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PREFIX; 040import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PRIMARY_TYPE; 041import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS; 042import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID; 043import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_VERSION_SERIES_ID; 044import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_READ_ACL; 045import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_VERSION_SERIES_ID; 046 047import java.io.Serializable; 048import java.lang.reflect.Array; 049import java.net.UnknownHostException; 050import java.util.ArrayList; 051import java.util.Arrays; 052import java.util.Calendar; 053import java.util.Date; 054import java.util.HashMap; 055import java.util.HashSet; 056import java.util.List; 057import java.util.Map; 058import java.util.Map.Entry; 059import java.util.Set; 060import java.util.UUID; 061 062import javax.resource.spi.ConnectionManager; 063 064import org.apache.commons.lang.StringUtils; 065import org.apache.commons.logging.Log; 066import org.apache.commons.logging.LogFactory; 067import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 068import org.nuxeo.ecm.core.api.DocumentNotFoundException; 069import org.nuxeo.ecm.core.api.Lock; 070import org.nuxeo.ecm.core.api.NuxeoException; 071import org.nuxeo.ecm.core.api.PartialList; 072import org.nuxeo.ecm.core.api.model.Delta; 073import org.nuxeo.ecm.core.blob.BlobManager; 074import org.nuxeo.ecm.core.model.LockManager; 075import org.nuxeo.ecm.core.model.Repository; 076import org.nuxeo.ecm.core.query.QueryParseException; 077import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 078import org.nuxeo.ecm.core.storage.State; 079import org.nuxeo.ecm.core.storage.State.ListDiff; 080import org.nuxeo.ecm.core.storage.State.StateDiff; 081import org.nuxeo.ecm.core.storage.dbs.DBSDocument; 082import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator; 083import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase; 084import org.nuxeo.ecm.core.storage.dbs.DBSSession; 085import org.nuxeo.runtime.api.Framework; 086 087import com.mongodb.BasicDBObject; 088import com.mongodb.DB; 089import com.mongodb.DBCollection; 090import com.mongodb.DBCursor; 091import com.mongodb.DBObject; 092import com.mongodb.MongoClient; 093import com.mongodb.MongoClientURI; 094import com.mongodb.QueryOperators; 095import com.mongodb.ServerAddress; 096import com.mongodb.WriteResult; 097 098/** 099 * MongoDB implementation of a {@link Repository}. 100 * 101 * @since 5.9.4 102 */ 103public class MongoDBRepository extends DBSRepositoryBase { 104 105 private static final Log log = LogFactory.getLog(MongoDBRepository.class); 106 107 private static final Long ZERO = Long.valueOf(0); 108 109 private static final Long ONE = Long.valueOf(1); 110 111 private static final Long MINUS_ONE = Long.valueOf(-11); 112 113 public static final String DB_DEFAULT = "nuxeo"; 114 115 public static final String MONGODB_ID = "_id"; 116 117 public static final String MONGODB_INC = "$inc"; 118 119 public static final String MONGODB_SET = "$set"; 120 121 public static final String MONGODB_UNSET = "$unset"; 122 123 public static final String MONGODB_PUSH = "$push"; 124 125 public static final String MONGODB_EACH = "$each"; 126 127 public static final String MONGODB_META = "$meta"; 128 129 public static final String MONGODB_TEXT_SCORE = "textScore"; 130 131 private static final String MONGODB_INDEX_TEXT = "text"; 132 133 private static final String MONGODB_INDEX_NAME = "name"; 134 135 private static final String MONGODB_LANGUAGE_OVERRIDE = "language_override"; 136 137 private static final String FULLTEXT_INDEX_NAME = "fulltext"; 138 139 private static final String LANGUAGE_FIELD = "__language"; 140 141 protected static final String COUNTER_NAME_UUID = "ecm:id"; 142 143 protected static final String COUNTER_FIELD = "seq"; 144 145 protected MongoClient mongoClient; 146 147 protected DBCollection coll; 148 149 protected DBCollection countersColl; 150 151 public MongoDBRepository(ConnectionManager cm, MongoDBRepositoryDescriptor descriptor) { 152 super(cm, descriptor.name, descriptor.getFulltextDescriptor()); 153 try { 154 mongoClient = newMongoClient(descriptor); 155 coll = getCollection(descriptor, mongoClient); 156 countersColl = getCountersCollection(descriptor, mongoClient); 157 } catch (UnknownHostException e) { 158 throw new RuntimeException(e); 159 } 160 initRepository(); 161 } 162 163 @Override 164 public void shutdown() { 165 super.shutdown(); 166 mongoClient.close(); 167 } 168 169 // used also by unit tests 170 public static MongoClient newMongoClient(MongoDBRepositoryDescriptor descriptor) throws UnknownHostException { 171 String server = descriptor.server; 172 if (StringUtils.isBlank(server)) { 173 throw new NuxeoException("Missing <server> in MongoDB repository descriptor"); 174 } 175 if (server.startsWith("mongodb://")) { 176 // allow mongodb:// URI syntax for the server, to pass everything in one string 177 return new MongoClient(new MongoClientURI(server)); 178 } else { 179 return new MongoClient(new ServerAddress(server)); 180 } 181 } 182 183 protected static DBCollection getCollection(MongoClient mongoClient, String dbname, String collection) { 184 if (StringUtils.isBlank(dbname)) { 185 dbname = DB_DEFAULT; 186 } 187 DB db = mongoClient.getDB(dbname); 188 return db.getCollection(collection); 189 } 190 191 // used also by unit tests 192 public static DBCollection getCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) { 193 return getCollection(mongoClient, descriptor.dbname, descriptor.name); 194 } 195 196 // used also by unit tests 197 public static DBCollection getCountersCollection(MongoDBRepositoryDescriptor descriptor, MongoClient mongoClient) { 198 return getCollection(mongoClient, descriptor.dbname, descriptor.name + ".counters"); 199 } 200 201 protected Object valueToBson(Object value) { 202 if (value instanceof State) { 203 return stateToBson((State) value); 204 } else if (value instanceof List) { 205 @SuppressWarnings("unchecked") 206 List<Object> values = (List<Object>) value; 207 return listToBson(values); 208 } else if (value instanceof Object[]) { 209 return listToBson(Arrays.asList((Object[]) value)); 210 } else { 211 return serializableToBson(value); 212 } 213 } 214 215 protected DBObject stateToBson(State state) { 216 DBObject ob = new BasicDBObject(); 217 for (Entry<String, Serializable> en : state.entrySet()) { 218 Object val = valueToBson(en.getValue()); 219 if (val != null) { 220 ob.put(en.getKey(), val); 221 } 222 } 223 return ob; 224 } 225 226 protected List<Object> listToBson(List<Object> values) { 227 ArrayList<Object> objects = new ArrayList<Object>(values.size()); 228 for (Object value : values) { 229 objects.add(valueToBson(value)); 230 } 231 return objects; 232 } 233 234 protected State bsonToState(DBObject ob) { 235 if (ob == null) { 236 return null; 237 } 238 State state = new State(ob.keySet().size()); 239 for (String key : ob.keySet()) { 240 if (MONGODB_ID.equals(key)) { 241 // skip ObjectId 242 continue; 243 } 244 state.put(key, bsonToValue(ob.get(key))); 245 } 246 return state; 247 } 248 249 protected Serializable bsonToValue(Object value) { 250 if (value instanceof List) { 251 @SuppressWarnings("unchecked") 252 List<Object> list = (List<Object>) value; 253 if (list.isEmpty()) { 254 return null; 255 } else { 256 if (list.get(0) instanceof DBObject) { 257 List<Serializable> l = new ArrayList<>(list.size()); 258 for (Object el : list) { 259 l.add(bsonToState((DBObject) el)); 260 } 261 return (Serializable) l; 262 } else { 263 // turn the list into a properly-typed array 264 Class<?> klass = Object.class; 265 for (Object o : list) { 266 if (o != null) { 267 klass = scalarToSerializableClass(o.getClass()); 268 break; 269 } 270 } 271 Object[] ar = (Object[]) Array.newInstance(klass, list.size()); 272 int i = 0; 273 for (Object el : list) { 274 ar[i++] = scalarToSerializable(el); 275 } 276 return ar; 277 } 278 } 279 } else if (value instanceof DBObject) { 280 return bsonToState((DBObject) value); 281 } else { 282 return scalarToSerializable(value); 283 } 284 } 285 286 public static class Updates { 287 public BasicDBObject set = new BasicDBObject(); 288 289 public BasicDBObject unset = new BasicDBObject(); 290 291 public BasicDBObject push = new BasicDBObject(); 292 293 public BasicDBObject inc = new BasicDBObject(); 294 } 295 296 /** 297 * Constructs a list of MongoDB updates from the given {@link StateDiff}. 298 * <p> 299 * We need a list because some cases need two operations to avoid conflicts. 300 */ 301 protected List<DBObject> diffToBson(StateDiff diff) { 302 Updates updates = new Updates(); 303 diffToUpdates(diff, null, updates); 304 UpdateListBuilder builder = new UpdateListBuilder(); 305 for (Entry<String, Object> en : updates.set.entrySet()) { 306 builder.update(MONGODB_SET, en.getKey(), en.getValue()); 307 } 308 for (Entry<String, Object> en : updates.unset.entrySet()) { 309 builder.update(MONGODB_UNSET, en.getKey(), en.getValue()); 310 } 311 for (Entry<String, Object> en : updates.push.entrySet()) { 312 builder.update(MONGODB_PUSH, en.getKey(), en.getValue()); 313 } 314 for (Entry<String, Object> en : updates.inc.entrySet()) { 315 builder.update(MONGODB_INC, en.getKey(), en.getValue()); 316 } 317 return builder.updateList; 318 } 319 320 /** 321 * Update list builder to prevent several updates of the same field. 322 * <p> 323 * This happens if two operations act on two fields where one is a prefix of the other. 324 * <p> 325 * Example: Cannot update 'mylist.0.string' and 'mylist' at the same time (error 16837) 326 * 327 * @since 5.9.5 328 */ 329 protected static class UpdateListBuilder { 330 331 protected List<DBObject> updateList = new ArrayList<>(1); 332 333 protected DBObject update; 334 335 protected List<String> keys; 336 337 protected UpdateListBuilder() { 338 newUpdate(); 339 } 340 341 protected void newUpdate() { 342 updateList.add(update = new BasicDBObject()); 343 keys = new ArrayList<>(); 344 } 345 346 protected void update(String op, String key, Object value) { 347 if (conflicts(key, keys)) { 348 newUpdate(); 349 } 350 keys.add(key); 351 DBObject map = (DBObject) update.get(op); 352 if (map == null) { 353 update.put(op, map = new BasicDBObject()); 354 } 355 map.put(key, value); 356 } 357 358 /** 359 * Checks if the key conflicts with one of the previous keys. 360 * <p> 361 * A conflict occurs if one key is equals to or is a prefix of the other. 362 */ 363 protected boolean conflicts(String key, List<String> previousKeys) { 364 String keydot = key + '.'; 365 for (String prev : previousKeys) { 366 if (prev.equals(key) || prev.startsWith(keydot) || key.startsWith(prev + '.')) { 367 return true; 368 } 369 } 370 return false; 371 } 372 } 373 374 protected void diffToUpdates(StateDiff diff, String prefix, Updates updates) { 375 String elemPrefix = prefix == null ? "" : prefix + '.'; 376 for (Entry<String, Serializable> en : diff.entrySet()) { 377 String name = elemPrefix + en.getKey(); 378 Serializable value = en.getValue(); 379 if (value instanceof StateDiff) { 380 diffToUpdates((StateDiff) value, name, updates); 381 } else if (value instanceof ListDiff) { 382 diffToUpdates((ListDiff) value, name, updates); 383 } else if (value instanceof Delta) { 384 diffToUpdates((Delta) value, name, updates); 385 } else { 386 // not a diff 387 updates.set.put(name, valueToBson(value)); 388 } 389 } 390 } 391 392 protected void diffToUpdates(ListDiff listDiff, String prefix, Updates updates) { 393 if (listDiff.diff != null) { 394 String elemPrefix = prefix == null ? "" : prefix + '.'; 395 int i = 0; 396 for (Object value : listDiff.diff) { 397 String name = elemPrefix + i; 398 if (value instanceof StateDiff) { 399 diffToUpdates((StateDiff) value, name, updates); 400 } else if (value != NOP) { 401 // set value 402 updates.set.put(name, valueToBson(value)); 403 } 404 i++; 405 } 406 } 407 if (listDiff.rpush != null) { 408 Object pushed; 409 if (listDiff.rpush.size() == 1) { 410 // no need to use $each for one element 411 pushed = valueToBson(listDiff.rpush.get(0)); 412 } else { 413 pushed = new BasicDBObject(MONGODB_EACH, listToBson(listDiff.rpush)); 414 } 415 updates.push.put(prefix, pushed); 416 } 417 } 418 419 protected void diffToUpdates(Delta delta, String prefix, Updates updates) { 420 Object inc = valueToBson(delta.getDeltaValue()); 421 updates.inc.put(prefix, inc); 422 } 423 424 protected Object serializableToBson(Object value) { 425 if (value instanceof Calendar) { 426 return ((Calendar) value).getTime(); 427 } 428 return value; 429 } 430 431 protected Serializable scalarToSerializable(Object val) { 432 if (val instanceof Date) { 433 Calendar cal = Calendar.getInstance(); 434 cal.setTime((Date) val); 435 return cal; 436 } 437 return (Serializable) val; 438 } 439 440 protected Class<?> scalarToSerializableClass(Class<?> klass) { 441 if (Date.class.isAssignableFrom(klass)) { 442 return Calendar.class; 443 } 444 return klass; 445 } 446 447 protected void initRepository() { 448 // create required indexes 449 // code does explicit queries on those 450 coll.createIndex(new BasicDBObject(KEY_ID, ONE)); 451 coll.createIndex(new BasicDBObject(KEY_PARENT_ID, ONE)); 452 coll.createIndex(new BasicDBObject(KEY_ANCESTOR_IDS, ONE)); 453 coll.createIndex(new BasicDBObject(KEY_VERSION_SERIES_ID, ONE)); 454 coll.createIndex(new BasicDBObject(KEY_PROXY_TARGET_ID, ONE)); 455 coll.createIndex(new BasicDBObject(KEY_PROXY_VERSION_SERIES_ID, ONE)); 456 coll.createIndex(new BasicDBObject(KEY_READ_ACL, ONE)); 457 DBObject parentChild = new BasicDBObject(); 458 parentChild.put(KEY_PARENT_ID, ONE); 459 parentChild.put(KEY_NAME, ONE); 460 coll.createIndex(parentChild); 461 // often used in user-generated queries 462 coll.createIndex(new BasicDBObject(KEY_PRIMARY_TYPE, ONE)); 463 coll.createIndex(new BasicDBObject(KEY_LIFECYCLE_STATE, ONE)); 464 coll.createIndex(new BasicDBObject(KEY_FULLTEXT_JOBID, ONE)); 465 coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_USER, ONE)); 466 coll.createIndex(new BasicDBObject(KEY_ACP + "." + KEY_ACL + "." + KEY_ACE_STATUS, ONE)); 467 // TODO configure these from somewhere else 468 coll.createIndex(new BasicDBObject("dc:modified", MINUS_ONE)); 469 coll.createIndex(new BasicDBObject("rend:renditionName", ONE)); 470 coll.createIndex(new BasicDBObject("drv:subscriptions.enabled", ONE)); 471 coll.createIndex(new BasicDBObject("collectionMember:collectionIds", ONE)); 472 if (!isFulltextDisabled()) { 473 DBObject indexKeys = new BasicDBObject(); 474 indexKeys.put(KEY_FULLTEXT_SIMPLE, MONGODB_INDEX_TEXT); 475 indexKeys.put(KEY_FULLTEXT_BINARY, MONGODB_INDEX_TEXT); 476 DBObject indexOptions = new BasicDBObject(); 477 indexOptions.put(MONGODB_INDEX_NAME, FULLTEXT_INDEX_NAME); 478 indexOptions.put(MONGODB_LANGUAGE_OVERRIDE, LANGUAGE_FIELD); 479 coll.createIndex(indexKeys, indexOptions); 480 } 481 // check root presence 482 DBObject query = new BasicDBObject(KEY_ID, getRootId()); 483 if (coll.findOne(query, justPresenceField()) != null) { 484 return; 485 } 486 // create basic repository structure needed 487 if (DEBUG_UUIDS) { 488 // create the id counter 489 DBObject idCounter = new BasicDBObject(); 490 idCounter.put(MONGODB_ID, COUNTER_NAME_UUID); 491 idCounter.put(COUNTER_FIELD, ZERO); 492 countersColl.insert(idCounter); 493 } 494 initRoot(); 495 } 496 497 protected Long getNextUuidSeq() { 498 DBObject query = new BasicDBObject(MONGODB_ID, COUNTER_NAME_UUID); 499 DBObject update = new BasicDBObject(MONGODB_INC, new BasicDBObject(COUNTER_FIELD, ONE)); 500 boolean returnNew = true; 501 DBObject idCounter = countersColl.findAndModify(query, null, null, false, update, returnNew, false); 502 if (idCounter == null) { 503 throw new RuntimeException("Repository id counter not initialized"); 504 } 505 return (Long) idCounter.get(COUNTER_FIELD); 506 } 507 508 @Override 509 public String generateNewId() { 510 if (DEBUG_UUIDS) { 511 Long id = getNextUuidSeq(); 512 return "UUID_" + id; 513 } else { 514 return UUID.randomUUID().toString(); 515 } 516 } 517 518 @Override 519 public void createState(State state) { 520 DBObject ob = stateToBson(state); 521 if (log.isTraceEnabled()) { 522 log.trace("MongoDB: CREATE " + ob.get(KEY_ID) + ": " + ob); 523 } 524 coll.insert(ob); 525 // TODO dupe exception 526 // throw new DocumentException("Already exists: " + id); 527 } 528 529 @Override 530 public State readState(String id) { 531 DBObject query = new BasicDBObject(KEY_ID, id); 532 return findOne(query); 533 } 534 535 @Override 536 public List<State> readStates(List<String> ids) { 537 DBObject query = new BasicDBObject(KEY_ID, new BasicDBObject(QueryOperators.IN, ids)); 538 return findAll(query, ids.size()); 539 } 540 541 @Override 542 public void updateState(String id, StateDiff diff) { 543 DBObject query = new BasicDBObject(KEY_ID, id); 544 for (DBObject update : diffToBson(diff)) { 545 if (log.isTraceEnabled()) { 546 log.trace("MongoDB: UPDATE " + id + ": " + update); 547 } 548 coll.update(query, update); 549 // TODO dupe exception 550 // throw new DocumentException("Missing: " + id); 551 } 552 } 553 554 @Override 555 public void deleteStates(Set<String> ids) { 556 DBObject query = new BasicDBObject(KEY_ID, new BasicDBObject(QueryOperators.IN, ids)); 557 if (log.isTraceEnabled()) { 558 log.trace("MongoDB: REMOVE " + ids); 559 } 560 WriteResult w = coll.remove(query); 561 if (w.getN() != ids.size()) { 562 log.error("Removed " + w.getN() + " docs for " + ids.size() + " ids: " + ids); 563 } 564 } 565 566 @Override 567 public State readChildState(String parentId, String name, Set<String> ignored) { 568 DBObject query = getChildQuery(parentId, name, ignored); 569 return findOne(query); 570 } 571 572 protected void logQuery(String id, DBObject fields) { 573 logQuery(new BasicDBObject(KEY_ID, id), fields); 574 } 575 576 protected void logQuery(DBObject query, DBObject fields) { 577 if (fields == null) { 578 log.trace("MongoDB: QUERY " + query); 579 } else { 580 log.trace("MongoDB: QUERY " + query + " KEYS " + fields); 581 } 582 } 583 584 protected void logQuery(DBObject query, DBObject fields, DBObject orderBy, int limit, int offset) { 585 log.trace("MongoDB: QUERY " + query + " KEYS " + fields + (orderBy == null ? "" : " ORDER BY " + orderBy) 586 + " OFFSET " + offset + " LIMIT " + limit); 587 } 588 589 @Override 590 public boolean hasChild(String parentId, String name, Set<String> ignored) { 591 DBObject query = getChildQuery(parentId, name, ignored); 592 if (log.isTraceEnabled()) { 593 logQuery(query, justPresenceField()); 594 } 595 return coll.findOne(query, justPresenceField()) != null; 596 } 597 598 protected DBObject getChildQuery(String parentId, String name, Set<String> ignored) { 599 DBObject query = new BasicDBObject(); 600 query.put(KEY_PARENT_ID, parentId); 601 query.put(KEY_NAME, name); 602 addIgnoredIds(query, ignored); 603 return query; 604 } 605 606 protected void addIgnoredIds(DBObject query, Set<String> ignored) { 607 if (!ignored.isEmpty()) { 608 DBObject notInIds = new BasicDBObject(QueryOperators.NIN, new ArrayList<String>(ignored)); 609 query.put(KEY_ID, notInIds); 610 } 611 } 612 613 @Override 614 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 615 DBObject query = new BasicDBObject(key, value); 616 addIgnoredIds(query, ignored); 617 return findAll(query, 0); 618 } 619 620 @Override 621 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 622 DBObject query = new BasicDBObject(key1, value1); 623 query.put(key2, value2); 624 addIgnoredIds(query, ignored); 625 return findAll(query, 0); 626 } 627 628 @Override 629 public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets, 630 Map<String, Object[]> targetProxies) { 631 DBObject query = new BasicDBObject(key, value); 632 DBObject fields = new BasicDBObject(); 633 fields.put(MONGODB_ID, ZERO); 634 fields.put(KEY_ID, ONE); 635 fields.put(KEY_IS_PROXY, ONE); 636 fields.put(KEY_PROXY_TARGET_ID, ONE); 637 fields.put(KEY_PROXY_IDS, ONE); 638 if (log.isTraceEnabled()) { 639 logQuery(query, fields); 640 } 641 DBCursor cursor = coll.find(query, fields); 642 try { 643 for (DBObject ob : cursor) { 644 String id = (String) ob.get(KEY_ID); 645 ids.add(id); 646 if (proxyTargets != null && TRUE.equals(ob.get(KEY_IS_PROXY))) { 647 String targetId = (String) ob.get(KEY_PROXY_TARGET_ID); 648 proxyTargets.put(id, targetId); 649 } 650 if (targetProxies != null) { 651 Object[] proxyIds = (Object[]) bsonToValue(ob.get(KEY_PROXY_IDS)); 652 if (proxyIds != null) { 653 targetProxies.put(id, proxyIds); 654 } 655 } 656 } 657 } finally { 658 cursor.close(); 659 } 660 } 661 662 @Override 663 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 664 DBObject query = new BasicDBObject(key, value); 665 addIgnoredIds(query, ignored); 666 if (log.isTraceEnabled()) { 667 logQuery(query, justPresenceField()); 668 } 669 return coll.findOne(query, justPresenceField()) != null; 670 } 671 672 protected State findOne(DBObject query) { 673 if (log.isTraceEnabled()) { 674 logQuery(query, null); 675 } 676 return bsonToState(coll.findOne(query)); 677 } 678 679 protected List<State> findAll(DBObject query, int sizeHint) { 680 if (log.isTraceEnabled()) { 681 logQuery(query, null); 682 } 683 DBCursor cursor = coll.find(query); 684 Set<String> seen = new HashSet<>(); 685 try { 686 List<State> list = new ArrayList<>(sizeHint); 687 for (DBObject ob : cursor) { 688 if (!seen.add((String) ob.get(KEY_ID))) { 689 // MongoDB cursors may return the same 690 // object several times 691 continue; 692 } 693 list.add(bsonToState(ob)); 694 } 695 return list; 696 } finally { 697 cursor.close(); 698 } 699 } 700 701 protected DBObject justPresenceField() { 702 return new BasicDBObject(MONGODB_ID, ONE); 703 } 704 705 @Override 706 public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator, 707 OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) { 708 // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter 709 MongoDBQueryBuilder builder = new MongoDBQueryBuilder(evaluator.getExpression(), evaluator.getSelectClause(), 710 orderByClause, evaluator.pathResolver, evaluator.fulltextSearchDisabled); 711 builder.walk(); 712 if (builder.hasFulltext && isFulltextDisabled()) { 713 throw new QueryParseException("Fulltext search disabled by configuration"); 714 } 715 DBObject query = builder.getQuery(); 716 addPrincipals(query, evaluator.principals); 717 DBObject orderBy = builder.getOrderBy(); 718 DBObject keys = builder.getProjection(); 719 // Don't do manual projection if there are no projection wildcards, as this brings no new 720 // information and is costly. The only difference is several identical rows instead of one. 721 boolean manualProjection = !distinctDocuments && builder.hasProjectionWildcard(); 722 if (manualProjection) { 723 // we'll do post-treatment to re-evaluate the query to get proper wildcard projections 724 // so we need the full state from the database 725 keys = new BasicDBObject(); 726 evaluator.parse(); 727 } 728 729 if (log.isTraceEnabled()) { 730 logQuery(query, keys, orderBy, limit, offset); 731 } 732 733 List<Map<String, Serializable>> projections; 734 long totalSize; 735 DBCursor cursor = coll.find(query, keys).skip(offset).limit(limit); 736 try { 737 if (orderBy != null) { 738 cursor.sort(orderBy); 739 } 740 projections = new ArrayList<>(); 741 for (DBObject ob : cursor) { 742 State state = bsonToState(ob); 743 if (manualProjection) { 744 projections.addAll(evaluator.matches(state)); 745 } else { 746 projections.add(flatten(state)); 747 } 748 } 749 if (countUpTo == -1) { 750 // count full size 751 if (limit == 0) { 752 totalSize = projections.size(); 753 } else { 754 totalSize = cursor.count(); 755 } 756 } else if (countUpTo == 0) { 757 // no count 758 totalSize = -1; // not counted 759 } else { 760 // count only if less than countUpTo 761 if (limit == 0) { 762 totalSize = projections.size(); 763 } else { 764 totalSize = cursor.copy().limit(countUpTo + 1).count(); 765 } 766 if (totalSize > countUpTo) { 767 totalSize = -2; // truncated 768 } 769 } 770 } finally { 771 cursor.close(); 772 } 773 if (log.isTraceEnabled() && projections.size() != 0) { 774 log.trace("MongoDB: -> " + projections.size()); 775 } 776 return new PartialList<>(projections, totalSize); 777 } 778 779 protected void addPrincipals(DBObject query, Set<String> principals) { 780 if (principals != null) { 781 DBObject inPrincipals = new BasicDBObject(QueryOperators.IN, new ArrayList<String>(principals)); 782 query.put(DBSDocument.KEY_READ_ACL, inPrincipals); 783 } 784 } 785 786 /** 787 * Flatten and convert from internal names to NXQL. 788 */ 789 protected Map<String, Serializable> flatten(State state) { 790 Map<String, Serializable> map = new HashMap<>(); 791 flatten(map, state, null); 792 return map; 793 } 794 795 protected void flatten(Map<String, Serializable> map, State state, String prefix) { 796 for (Entry<String, Serializable> en : state.entrySet()) { 797 String key = en.getKey(); 798 Serializable value = en.getValue(); 799 String name; 800 if (key.startsWith(KEY_PREFIX)) { 801 name = DBSSession.convToNXQL(key); 802 if (name == null) { 803 // present in state but not returned to caller 804 continue; 805 } 806 } else { 807 name = key; 808 } 809 name = prefix == null ? name : prefix + name; 810 if (value instanceof State) { 811 flatten(map, (State) value, name + '/'); 812 } else if (value instanceof List) { 813 String nameSlash = name + '/'; 814 int i = 0; 815 for (Object v : (List<?>) value) { 816 if (v instanceof State) { 817 flatten(map, (State) v, nameSlash + i + '/'); 818 } else { 819 map.put(nameSlash + i, (Serializable) v); 820 } 821 i++; 822 } 823 } else if (value instanceof Object[]) { 824 String nameSlash = name + '/'; 825 int i = 0; 826 for (Object v : (Object[]) value) { 827 map.put(nameSlash + i, (Serializable) v); 828 i++; 829 } 830 } else { 831 map.put(name, value); 832 } 833 } 834 } 835 836 /** Keys used for document projection when marking all binaries for GC. */ 837 protected DBObject binaryKeys; 838 839 @Override 840 protected void initBlobsPaths() { 841 MongoDBBlobFinder finder = new MongoDBBlobFinder(); 842 finder.visit(); 843 binaryKeys = finder.binaryKeys; 844 } 845 846 protected static class MongoDBBlobFinder extends BlobFinder { 847 protected DBObject binaryKeys = new BasicDBObject(MONGODB_ID, ZERO); 848 849 @Override 850 protected void recordBlobPath() { 851 path.addLast(KEY_BLOB_DATA); 852 binaryKeys.put(StringUtils.join(path, "."), ONE); 853 path.removeLast(); 854 } 855 } 856 857 @Override 858 public void markReferencedBinaries() { 859 BlobManager blobManager = Framework.getService(BlobManager.class); 860 // TODO add a query to not scan all documents 861 if (log.isTraceEnabled()) { 862 logQuery(new BasicDBObject(), binaryKeys); 863 } 864 DBCursor cursor = coll.find(new BasicDBObject(), binaryKeys); 865 try { 866 for (DBObject ob : cursor) { 867 markReferencedBinaries(ob, blobManager); 868 } 869 } finally { 870 cursor.close(); 871 } 872 } 873 874 protected void markReferencedBinaries(DBObject ob, BlobManager blobManager) { 875 for (String key : ob.keySet()) { 876 Object value = ob.get(key); 877 if (value instanceof List) { 878 @SuppressWarnings("unchecked") 879 List<Object> list = (List<Object>) value; 880 for (Object v : list) { 881 if (v instanceof DBObject) { 882 markReferencedBinaries((DBObject) v, blobManager); 883 } else { 884 markReferencedBinary(v, blobManager); 885 } 886 } 887 } else if (value instanceof Object[]) { 888 for (Object v : (Object[]) value) { 889 markReferencedBinary(v, blobManager); 890 } 891 } else if (value instanceof DBObject) { 892 markReferencedBinaries((DBObject) value, blobManager); 893 } else { 894 markReferencedBinary(value, blobManager); 895 } 896 } 897 } 898 899 protected void markReferencedBinary(Object value, BlobManager blobManager) { 900 if (!(value instanceof String)) { 901 return; 902 } 903 String key = (String) value; 904 blobManager.markReferencedBinary(key, repositoryName); 905 } 906 907 protected static final DBObject LOCK_FIELDS; 908 909 static { 910 LOCK_FIELDS = new BasicDBObject(); 911 LOCK_FIELDS.put(KEY_LOCK_OWNER, ONE); 912 LOCK_FIELDS.put(KEY_LOCK_CREATED, ONE); 913 } 914 915 protected static final DBObject UNSET_LOCK_UPDATE = new BasicDBObject(MONGODB_UNSET, LOCK_FIELDS); 916 917 @Override 918 public Lock getLock(String id) { 919 if (log.isTraceEnabled()) { 920 logQuery(id, LOCK_FIELDS); 921 } 922 DBObject res = coll.findOne(new BasicDBObject(KEY_ID, id), LOCK_FIELDS); 923 if (res == null) { 924 // document not found 925 throw new DocumentNotFoundException(id); 926 } 927 String owner = (String) res.get(KEY_LOCK_OWNER); 928 if (owner == null) { 929 // not locked 930 return null; 931 } 932 Calendar created = (Calendar) scalarToSerializable(res.get(KEY_LOCK_CREATED)); 933 return new Lock(owner, created); 934 } 935 936 @Override 937 public Lock setLock(String id, Lock lock) { 938 DBObject query = new BasicDBObject(KEY_ID, id); 939 query.put(KEY_LOCK_OWNER, null); // select doc if no lock is set 940 DBObject setLock = new BasicDBObject(); 941 setLock.put(KEY_LOCK_OWNER, lock.getOwner()); 942 setLock.put(KEY_LOCK_CREATED, serializableToBson(lock.getCreated())); 943 DBObject setLockUpdate = new BasicDBObject(MONGODB_SET, setLock); 944 if (log.isTraceEnabled()) { 945 log.trace("MongoDB: FINDANDMODIFY " + query + " UPDATE " + setLockUpdate); 946 } 947 DBObject res = coll.findAndModify(query, null, null, false, setLockUpdate, false, false); 948 if (res != null) { 949 // found a doc to lock 950 return null; 951 } else { 952 // doc not found, or lock owner already set 953 // get the old lock 954 if (log.isTraceEnabled()) { 955 logQuery(id, LOCK_FIELDS); 956 } 957 DBObject old = coll.findOne(new BasicDBObject(KEY_ID, id), LOCK_FIELDS); 958 if (old == null) { 959 // document not found 960 throw new DocumentNotFoundException(id); 961 } 962 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 963 Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED)); 964 if (oldOwner != null) { 965 return new Lock(oldOwner, oldCreated); 966 } 967 // no lock -- there was a race condition 968 // TODO do better 969 throw new ConcurrentUpdateException("Lock " + id); 970 } 971 } 972 973 @Override 974 public Lock removeLock(String id, String owner) { 975 DBObject query = new BasicDBObject(KEY_ID, id); 976 if (owner != null) { 977 // remove if owner matches or null 978 // implements LockManager.canLockBeRemoved inside MongoDB 979 Object ownerOrNull = Arrays.asList(owner, null); 980 query.put(KEY_LOCK_OWNER, new BasicDBObject(QueryOperators.IN, ownerOrNull)); 981 } // else unconditional remove 982 // remove the lock 983 DBObject old = coll.findAndModify(query, null, null, false, UNSET_LOCK_UPDATE, false, false); 984 if (old != null) { 985 // found a doc and removed the lock, return previous lock 986 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 987 if (oldOwner == null) { 988 // was not locked 989 return null; 990 } else { 991 // return previous lock 992 Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED)); 993 return new Lock(oldOwner, oldCreated); 994 } 995 } else { 996 // doc not found, or lock owner didn't match 997 // get the old lock 998 if (log.isTraceEnabled()) { 999 logQuery(id, LOCK_FIELDS); 1000 } 1001 old = coll.findOne(new BasicDBObject(KEY_ID, id), LOCK_FIELDS); 1002 if (old == null) { 1003 // document not found 1004 throw new DocumentNotFoundException(id); 1005 } 1006 String oldOwner = (String) old.get(KEY_LOCK_OWNER); 1007 Calendar oldCreated = (Calendar) scalarToSerializable(old.get(KEY_LOCK_CREATED)); 1008 if (oldOwner != null) { 1009 if (!LockManager.canLockBeRemoved(oldOwner, owner)) { 1010 // existing mismatched lock, flag failure 1011 return new Lock(oldOwner, oldCreated, true); 1012 } 1013 // old owner should have matched -- there was a race condition 1014 // TODO do better 1015 throw new ConcurrentUpdateException("Unlock " + id); 1016 } 1017 // old owner null, should have matched -- there was a race condition 1018 // TODO do better 1019 throw new ConcurrentUpdateException("Unlock " + id); 1020 } 1021 } 1022 1023 @Override 1024 public void closeLockManager() { 1025 1026 } 1027 1028 @Override 1029 public void clearLockManagerCaches() { 1030 } 1031 1032}