001/* 002 * (C) Copyright 2014-2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.mem; 020 021import static org.nuxeo.ecm.core.query.sql.NXQL.ECM_UUID; 022import static org.nuxeo.ecm.core.storage.State.NOP; 023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS; 024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA; 025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED; 027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER; 028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 030 031import java.io.Serializable; 032import java.lang.reflect.Array; 033import java.util.ArrayList; 034import java.util.Arrays; 035import java.util.Calendar; 036import java.util.Collection; 037import java.util.Collections; 038import java.util.List; 039import java.util.Map; 040import java.util.Map.Entry; 041import java.util.Objects; 042import java.util.Set; 043import java.util.UUID; 044import java.util.concurrent.ConcurrentHashMap; 045import java.util.concurrent.CopyOnWriteArrayList; 046import java.util.concurrent.atomic.AtomicLong; 047import java.util.stream.Stream; 048 049import javax.resource.spi.ConnectionManager; 050 051import org.apache.commons.logging.Log; 052import org.apache.commons.logging.LogFactory; 053import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 054import org.nuxeo.ecm.core.api.DocumentNotFoundException; 055import org.nuxeo.ecm.core.api.Lock; 056import org.nuxeo.ecm.core.api.NuxeoException; 057import org.nuxeo.ecm.core.api.PartialList; 058import org.nuxeo.ecm.core.api.ScrollResult; 059import org.nuxeo.ecm.core.api.ScrollResultImpl; 060import org.nuxeo.ecm.core.api.model.Delta; 061import org.nuxeo.ecm.core.blob.DocumentBlobManager; 062import org.nuxeo.ecm.core.model.LockManager; 063import org.nuxeo.ecm.core.model.Repository; 064import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 065import org.nuxeo.ecm.core.storage.State; 066import org.nuxeo.ecm.core.storage.State.ListDiff; 067import org.nuxeo.ecm.core.storage.State.StateDiff; 068import org.nuxeo.ecm.core.storage.StateHelper; 069import org.nuxeo.ecm.core.storage.dbs.DBSDocument; 070import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator; 071import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase; 072import org.nuxeo.ecm.core.storage.dbs.DBSSession.OrderByComparator; 073import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater; 074import org.nuxeo.runtime.api.Framework; 075 076/** 077 * In-memory implementation of a {@link Repository}. 078 * <p> 079 * Internally, the repository is a map from id to document object. 080 * <p> 081 * A document object is a JSON-like document stored as a Map recursively containing the data, see {@link DBSDocument} 082 * for the description of the document. 083 * 084 * @since 5.9.4 085 */ 086public class MemRepository extends DBSRepositoryBase { 087 088 private static final Log log = LogFactory.getLog(MemRepository.class); 089 090 protected static final String NOSCROLL_ID = "noscroll"; 091 092 // for debug 093 private final AtomicLong temporaryIdCounter = new AtomicLong(0); 094 095 /** 096 * The content of the repository, a map of document id -> object. 097 */ 098 protected Map<String, State> states; 099 100 public MemRepository(ConnectionManager cm, MemRepositoryDescriptor descriptor) { 101 super(cm, descriptor.name, descriptor); 102 initRepository(); 103 } 104 105 @Override 106 public List<IdType> getAllowedIdTypes() { 107 return Collections.singletonList(IdType.varchar); 108 } 109 110 @Override 111 public void shutdown() { 112 super.shutdown(); 113 states = null; 114 } 115 116 protected void initRepository() { 117 states = new ConcurrentHashMap<>(); 118 initRoot(); 119 } 120 121 @Override 122 public String generateNewId() { 123 if (DEBUG_UUIDS) { 124 return "UUID_" + temporaryIdCounter.incrementAndGet(); 125 } else { 126 return UUID.randomUUID().toString(); 127 } 128 } 129 130 @Override 131 public State readState(String id) { 132 return readPartialState(id, null); 133 } 134 135 @Override 136 public State readPartialState(String id, Collection<String> keys) { 137 if (id == null) { 138 return null; 139 } 140 State state = states.get(id); 141 if (state != null) { 142 if (keys != null && !keys.isEmpty()) { 143 State partialState = new State(); 144 for (String key : keys) { 145 Serializable value = state.get(key); 146 if (value != null) { 147 partialState.put(key, value); 148 } 149 } 150 state = partialState; 151 } 152 if (log.isTraceEnabled()) { 153 log.trace("Mem: READ " + id + ": " + state); 154 } 155 } 156 return state; 157 } 158 159 @Override 160 public List<State> readStates(List<String> ids) { 161 List<State> list = new ArrayList<>(); 162 for (String id : ids) { 163 list.add(readState(id)); 164 } 165 return list; 166 } 167 168 @Override 169 public void createState(State state) { 170 String id = (String) state.get(KEY_ID); 171 if (log.isTraceEnabled()) { 172 log.trace("Mem: CREATE " + id + ": " + state); 173 } 174 if (states.containsKey(id)) { 175 throw new NuxeoException("Already exists: " + id); 176 } 177 state = StateHelper.deepCopy(state, true); // thread-safe 178 StateHelper.resetDeltas(state); 179 states.put(id, state); 180 } 181 182 @Override 183 public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) { 184 if (log.isTraceEnabled()) { 185 log.trace("Mem: UPDATE " + id + ": " + diff); 186 } 187 State state = states.get(id); 188 if (state == null) { 189 throw new ConcurrentUpdateException("Missing: " + id); 190 } 191 synchronized (state) { 192 // synchronization needed for atomic change token 193 if (changeTokenUpdater != null) { 194 for (Entry<String, Serializable> en : changeTokenUpdater.getConditions().entrySet()) { 195 if (!Objects.equals(state.get(en.getKey()), en.getValue())) { 196 throw new ConcurrentUpdateException((String) state.get(KEY_ID)); 197 } 198 } 199 for (Entry<String, Serializable> en : changeTokenUpdater.getUpdates().entrySet()) { 200 applyDiff(state, en.getKey(), en.getValue()); 201 } 202 } 203 applyDiff(state, diff); 204 } 205 } 206 207 @Override 208 public void deleteStates(Set<String> ids) { 209 if (log.isTraceEnabled()) { 210 log.trace("Mem: REMOVE " + ids); 211 } 212 for (String id : ids) { 213 if (states.remove(id) == null) { 214 log.debug("Missing on remove: " + id); 215 } 216 } 217 } 218 219 @Override 220 public State readChildState(String parentId, String name, Set<String> ignored) { 221 // TODO optimize by maintaining a parent/child index 222 for (State state : states.values()) { 223 if (ignored.contains(state.get(KEY_ID))) { 224 continue; 225 } 226 if (!parentId.equals(state.get(KEY_PARENT_ID))) { 227 continue; 228 } 229 if (!name.equals(state.get(KEY_NAME))) { 230 continue; 231 } 232 return state; 233 } 234 return null; 235 } 236 237 @Override 238 public boolean hasChild(String parentId, String name, Set<String> ignored) { 239 return readChildState(parentId, name, ignored) != null; 240 } 241 242 @Override 243 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 244 if (log.isTraceEnabled()) { 245 log.trace("Mem: QUERY " + key + " = " + value); 246 } 247 List<State> list = new ArrayList<>(); 248 for (State state : states.values()) { 249 String id = (String) state.get(KEY_ID); 250 if (ignored.contains(id)) { 251 continue; 252 } 253 if (!value.equals(state.get(key))) { 254 continue; 255 } 256 list.add(state); 257 } 258 if (log.isTraceEnabled() && !list.isEmpty()) { 259 log.trace("Mem: -> " + list.size()); 260 } 261 return list; 262 } 263 264 @Override 265 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 266 if (log.isTraceEnabled()) { 267 log.trace("Mem: QUERY " + key1 + " = " + value1 + " AND " + key2 + " = " + value2); 268 } 269 List<State> list = new ArrayList<>(); 270 for (State state : states.values()) { 271 String id = (String) state.get(KEY_ID); 272 if (ignored.contains(id)) { 273 continue; 274 } 275 if (!(value1.equals(state.get(key1)) && value2.equals(state.get(key2)))) { 276 continue; 277 } 278 list.add(state); 279 } 280 if (log.isTraceEnabled() && !list.isEmpty()) { 281 log.trace("Mem: -> " + list.size()); 282 } 283 return list; 284 } 285 286 @Override 287 public Stream<State> getDescendants(String rootId, Set<String> keys) { 288 return getDescendants(rootId, keys, 0); 289 } 290 291 @Override 292 public Stream<State> getDescendants(String rootId, Set<String> keys, int limit) { 293 if (log.isTraceEnabled()) { 294 log.trace("Mem: QUERY " + KEY_ANCESTOR_IDS + " = " + rootId); 295 } 296 Stream<State> stream = states.values() // 297 .stream() 298 .filter(state -> hasAncestor(state, rootId)); 299 if (limit != 0) { 300 stream = stream.limit(limit); 301 } 302 return stream; 303 } 304 305 protected static boolean hasAncestor(State state, String id) { 306 Object[] array = (Object[]) state.get(KEY_ANCESTOR_IDS); 307 return array == null ? false : Arrays.asList(array).contains(id); 308 } 309 310 @Override 311 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 312 if (log.isTraceEnabled()) { 313 log.trace("Mem: QUERY " + key + " = " + value); 314 } 315 for (State state : states.values()) { 316 String id = (String) state.get(KEY_ID); 317 if (ignored.contains(id)) { 318 continue; 319 } 320 if (value.equals(state.get(key))) { 321 if (log.isTraceEnabled()) { 322 log.trace("Mem: -> present"); 323 } 324 return true; 325 } 326 } 327 if (log.isTraceEnabled()) { 328 log.trace("Mem: -> absent"); 329 } 330 return false; 331 } 332 333 @Override 334 public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator, 335 OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) { 336 if (log.isTraceEnabled()) { 337 log.trace("Mem: QUERY " + evaluator + " OFFSET " + offset + " LIMIT " + limit); 338 } 339 evaluator.parse(); 340 List<Map<String, Serializable>> projections = new ArrayList<>(); 341 for (State state : states.values()) { 342 List<Map<String, Serializable>> matches = evaluator.matches(state); 343 if (!matches.isEmpty()) { 344 if (distinctDocuments) { 345 projections.add(matches.get(0)); 346 } else { 347 projections.addAll(matches); 348 } 349 } 350 } 351 // ORDER BY 352 // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter 353 if (orderByClause != null) { 354 Collections.sort(projections, new OrderByComparator(orderByClause)); 355 } 356 // LIMIT / OFFSET 357 int totalSize = projections.size(); 358 if (countUpTo == -1) { 359 // count full size 360 } else if (countUpTo == 0) { 361 // no count 362 totalSize = -1; // not counted 363 } else { 364 // count only if less than countUpTo 365 if (totalSize > countUpTo) { 366 totalSize = -2; // truncated 367 } 368 } 369 if (limit != 0) { 370 int size = projections.size(); 371 projections.subList(0, offset > size ? size : offset).clear(); 372 size = projections.size(); 373 if (limit < size) { 374 projections.subList(limit, size).clear(); 375 } 376 } 377 // TODO DISTINCT 378 379 if (log.isTraceEnabled() && !projections.isEmpty()) { 380 log.trace("Mem: -> " + projections.size()); 381 } 382 return new PartialList<>(projections, totalSize); 383 } 384 385 @Override 386 public ScrollResult<String> scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) { 387 if (log.isTraceEnabled()) { 388 log.trace("Mem: QUERY " + evaluator); 389 } 390 evaluator.parse(); 391 List<String> ids = new ArrayList<>(); 392 for (State state : states.values()) { 393 List<Map<String, Serializable>> matches = evaluator.matches(state); 394 if (!matches.isEmpty()) { 395 String id = matches.get(0).get(ECM_UUID).toString(); 396 ids.add(id); 397 } 398 } 399 return new ScrollResultImpl<>(NOSCROLL_ID, ids); 400 } 401 402 @Override 403 public ScrollResult<String> scroll(String scrollId) { 404 if (NOSCROLL_ID.equals(scrollId)) { 405 // Id are already in memory, they are returned as a single batch 406 return ScrollResultImpl.emptyResult(); 407 } 408 throw new NuxeoException("Unknown or timed out scrollId"); 409 } 410 411 /** 412 * Applies a {@link StateDiff} in-place onto a base {@link State}. 413 * <p> 414 * Uses thread-safe datastructures. 415 */ 416 public static void applyDiff(State state, StateDiff stateDiff) { 417 for (Entry<String, Serializable> en : stateDiff.entrySet()) { 418 applyDiff(state, en.getKey(), en.getValue()); 419 } 420 } 421 422 /** 423 * Applies a key/value diff in-place onto a base {@link State}. 424 * <p> 425 * Uses thread-safe datastructures. 426 */ 427 protected static void applyDiff(State state, String key, Serializable value) { 428 if (value instanceof StateDiff) { 429 Serializable old = state.get(key); 430 if (old == null) { 431 old = new State(true); // thread-safe 432 state.put(key, old); 433 // enter the next if 434 } 435 if (!(old instanceof State)) { 436 throw new UnsupportedOperationException("Cannot apply StateDiff on non-State: " + old); 437 } 438 applyDiff((State) old, (StateDiff) value); 439 } else if (value instanceof ListDiff) { 440 state.put(key, applyDiff(state.get(key), (ListDiff) value)); 441 } else if (value instanceof Delta) { 442 Delta delta = (Delta) value; 443 Number oldValue = (Number) state.get(key); 444 Number newValue; 445 if (oldValue == null) { 446 newValue = delta.getFullValue(); 447 } else { 448 newValue = delta.add(oldValue); 449 } 450 state.put(key, newValue); 451 } else { 452 state.put(key, StateHelper.deepCopy(value, true)); // thread-safe 453 } 454 } 455 456 /** 457 * Applies a {@link ListDiff} onto an array or {@link List}, and returns the resulting value. 458 * <p> 459 * Uses thread-safe datastructures. 460 */ 461 public static Serializable applyDiff(Serializable value, ListDiff listDiff) { 462 // internally work on a list 463 // TODO this is costly, use a separate code path for arrays 464 Class<?> arrayComponentType = null; 465 if (listDiff.isArray && value != null) { 466 if (!(value instanceof Object[])) { 467 throw new UnsupportedOperationException("Cannot apply ListDiff on non-array: " + value); 468 } 469 arrayComponentType = ((Object[]) value).getClass().getComponentType(); 470 value = new CopyOnWriteArrayList<>(Arrays.asList((Object[]) value)); 471 } 472 if (value == null) { 473 value = new CopyOnWriteArrayList<>(); 474 } 475 if (!(value instanceof List)) { 476 throw new UnsupportedOperationException("Cannot apply ListDiff on non-List: " + value); 477 } 478 @SuppressWarnings("unchecked") 479 List<Serializable> list = (List<Serializable>) value; 480 if (listDiff.diff != null) { 481 int i = 0; 482 for (Object diffElem : listDiff.diff) { 483 if (i >= list.size()) { 484 // TODO log error applying diff to shorter list 485 break; 486 } 487 if (diffElem instanceof StateDiff) { 488 applyDiff((State) list.get(i), (StateDiff) diffElem); 489 } else if (diffElem != NOP) { 490 list.set(i, StateHelper.deepCopy(diffElem, true)); // thread-safe 491 } 492 i++; 493 } 494 } 495 if (listDiff.rpush != null) { 496 // deepCopy of what we'll add 497 List<Serializable> add = new ArrayList<>(listDiff.rpush.size()); 498 for (Object v : listDiff.rpush) { 499 add.add(StateHelper.deepCopy(v, true)); // thread-safe 500 } 501 // update CopyOnWriteArrayList in one step 502 list.addAll(add); 503 } 504 // convert back to array if needed 505 if (listDiff.isArray) { 506 return list.isEmpty() ? null : list.toArray((Object[]) Array.newInstance(arrayComponentType, list.size())); 507 } else { 508 return list.isEmpty() ? null : (Serializable) list; 509 } 510 } 511 512 /* synchronized */ 513 @Override 514 public synchronized Lock getLock(String id) { 515 State state = states.get(id); 516 if (state == null) { 517 // document not found 518 throw new DocumentNotFoundException(id); 519 } 520 String owner = (String) state.get(KEY_LOCK_OWNER); 521 if (owner == null) { 522 return null; 523 } 524 Calendar created = (Calendar) state.get(KEY_LOCK_CREATED); 525 return new Lock(owner, created); 526 } 527 528 /* synchronized */ 529 @Override 530 public synchronized Lock setLock(String id, Lock lock) { 531 State state = states.get(id); 532 if (state == null) { 533 // document not found 534 throw new DocumentNotFoundException(id); 535 } 536 String owner = (String) state.get(KEY_LOCK_OWNER); 537 if (owner != null) { 538 // return old lock 539 Calendar created = (Calendar) state.get(KEY_LOCK_CREATED); 540 return new Lock(owner, created); 541 } 542 state.put(KEY_LOCK_OWNER, lock.getOwner()); 543 state.put(KEY_LOCK_CREATED, lock.getCreated()); 544 return null; 545 } 546 547 /* synchronized */ 548 @Override 549 public synchronized Lock removeLock(String id, String owner) { 550 State state = states.get(id); 551 if (state == null) { 552 // document not found 553 throw new DocumentNotFoundException(id); 554 } 555 String oldOwner = (String) state.get(KEY_LOCK_OWNER); 556 if (oldOwner == null) { 557 // no previous lock 558 return null; 559 } 560 Calendar oldCreated = (Calendar) state.get(KEY_LOCK_CREATED); 561 if (!LockManager.canLockBeRemoved(oldOwner, owner)) { 562 // existing mismatched lock, flag failure 563 return new Lock(oldOwner, oldCreated, true); 564 } 565 // remove lock 566 state.put(KEY_LOCK_OWNER, null); 567 state.put(KEY_LOCK_CREATED, null); 568 // return old lock 569 return new Lock(oldOwner, oldCreated); 570 } 571 572 @Override 573 public void closeLockManager() { 574 } 575 576 @Override 577 public void clearLockManagerCaches() { 578 } 579 580 protected List<List<String>> binaryPaths; 581 582 @Override 583 protected void initBlobsPaths() { 584 MemBlobFinder finder = new MemBlobFinder(); 585 finder.visit(); 586 binaryPaths = finder.binaryPaths; 587 } 588 589 protected static class MemBlobFinder extends BlobFinder { 590 protected List<List<String>> binaryPaths = new ArrayList<>(); 591 592 @Override 593 protected void recordBlobPath() { 594 binaryPaths.add(new ArrayList<>(path)); 595 } 596 } 597 598 @Override 599 public void markReferencedBinaries() { 600 DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class); 601 for (State state : states.values()) { 602 for (List<String> path : binaryPaths) { 603 markReferencedBinaries(state, path, 0, blobManager); 604 } 605 } 606 } 607 608 protected void markReferencedBinaries(State state, List<String> path, int start, DocumentBlobManager blobManager) { 609 for (int i = start; i < path.size(); i++) { 610 String name = path.get(i); 611 Serializable value = state.get(name); 612 if (value instanceof State) { 613 state = (State) value; 614 } else { 615 if (value instanceof List) { 616 @SuppressWarnings("unchecked") 617 List<Object> list = (List<Object>) value; 618 for (Object v : list) { 619 if (v instanceof State) { 620 markReferencedBinaries((State) v, path, i + 1, blobManager); 621 } else { 622 markReferencedBinary(v, blobManager); 623 } 624 } 625 } 626 state = null; 627 break; 628 } 629 } 630 if (state != null) { 631 Serializable data = state.get(KEY_BLOB_DATA); 632 markReferencedBinary(data, blobManager); 633 } 634 } 635 636 protected void markReferencedBinary(Object value, DocumentBlobManager blobManager) { 637 if (!(value instanceof String)) { 638 return; 639 } 640 String key = (String) value; 641 blobManager.markReferencedBinary(key, repositoryName); 642 } 643 644}