001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.mem; 020 021import static java.lang.Boolean.TRUE; 022import static org.nuxeo.ecm.core.query.sql.NXQL.ECM_UUID; 023import static org.nuxeo.ecm.core.storage.State.NOP; 024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA; 025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY; 027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED; 028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER; 029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS; 032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID; 033 034import java.io.Serializable; 035import java.lang.reflect.Array; 036import java.util.ArrayList; 037import java.util.Arrays; 038import java.util.Calendar; 039import java.util.Collections; 040import java.util.List; 041import java.util.Map; 042import java.util.Map.Entry; 043import java.util.Objects; 044import java.util.Set; 045import java.util.UUID; 046import java.util.concurrent.ConcurrentHashMap; 047import java.util.concurrent.CopyOnWriteArrayList; 048import java.util.concurrent.atomic.AtomicLong; 049 050import javax.resource.spi.ConnectionManager; 051 052import org.apache.commons.logging.Log; 053import org.apache.commons.logging.LogFactory; 054import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 055import org.nuxeo.ecm.core.api.DocumentNotFoundException; 056import org.nuxeo.ecm.core.api.Lock; 057import org.nuxeo.ecm.core.api.NuxeoException; 058import org.nuxeo.ecm.core.api.PartialList; 059import org.nuxeo.ecm.core.api.ScrollResult; 060import org.nuxeo.ecm.core.api.ScrollResultImpl; 061import org.nuxeo.ecm.core.api.model.Delta; 062import org.nuxeo.ecm.core.blob.DocumentBlobManager; 063import org.nuxeo.ecm.core.model.LockManager; 064import org.nuxeo.ecm.core.model.Repository; 065import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 066import org.nuxeo.ecm.core.storage.State; 067import org.nuxeo.ecm.core.storage.State.ListDiff; 068import org.nuxeo.ecm.core.storage.State.StateDiff; 069import org.nuxeo.ecm.core.storage.StateHelper; 070import org.nuxeo.ecm.core.storage.dbs.DBSDocument; 071import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator; 072import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase; 073import org.nuxeo.ecm.core.storage.dbs.DBSSession.OrderByComparator; 074import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater; 075import org.nuxeo.runtime.api.Framework; 076 077/** 078 * In-memory implementation of a {@link Repository}. 079 * <p> 080 * Internally, the repository is a map from id to document object. 081 * <p> 082 * A document object is a JSON-like document stored as a Map recursively containing the data, see {@link DBSDocument} 083 * for the description of the document. 084 * 085 * @since 5.9.4 086 */ 087public class MemRepository extends DBSRepositoryBase { 088 089 private static final Log log = LogFactory.getLog(MemRepository.class); 090 091 protected static final String NOSCROLL_ID = "noscroll"; 092 093 // for debug 094 private final AtomicLong temporaryIdCounter = new AtomicLong(0); 095 096 /** 097 * The content of the repository, a map of document id -> object. 098 */ 099 protected Map<String, State> states; 100 101 public MemRepository(ConnectionManager cm, MemRepositoryDescriptor descriptor) { 102 super(cm, descriptor.name, descriptor); 103 initRepository(); 104 } 105 106 @Override 107 public List<IdType> getAllowedIdTypes() { 108 return Collections.singletonList(IdType.varchar); 109 } 110 111 @Override 112 public void shutdown() { 113 super.shutdown(); 114 states = null; 115 } 116 117 protected void initRepository() { 118 states = new ConcurrentHashMap<>(); 119 initRoot(); 120 } 121 122 @Override 123 public String generateNewId() { 124 if (DEBUG_UUIDS) { 125 return "UUID_" + temporaryIdCounter.incrementAndGet(); 126 } else { 127 return UUID.randomUUID().toString(); 128 } 129 } 130 131 @Override 132 public State readState(String id) { 133 State state = states.get(id); 134 if (state != null) { 135 if (log.isTraceEnabled()) { 136 log.trace("Mem: READ " + id + ": " + state); 137 } 138 } 139 return state; 140 } 141 142 @Override 143 public List<State> readStates(List<String> ids) { 144 List<State> list = new ArrayList<>(); 145 for (String id : ids) { 146 list.add(readState(id)); 147 } 148 return list; 149 } 150 151 @Override 152 public void createState(State state) { 153 String id = (String) state.get(KEY_ID); 154 if (log.isTraceEnabled()) { 155 log.trace("Mem: CREATE " + id + ": " + state); 156 } 157 if (states.containsKey(id)) { 158 throw new NuxeoException("Already exists: " + id); 159 } 160 state = StateHelper.deepCopy(state, true); // thread-safe 161 StateHelper.resetDeltas(state); 162 states.put(id, state); 163 } 164 165 @Override 166 public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) { 167 if (log.isTraceEnabled()) { 168 log.trace("Mem: UPDATE " + id + ": " + diff); 169 } 170 State state = states.get(id); 171 if (state == null) { 172 throw new ConcurrentUpdateException("Missing: " + id); 173 } 174 synchronized (state) { 175 // synchronization needed for atomic change token 176 if (changeTokenUpdater != null) { 177 for (Entry<String, Serializable> en : changeTokenUpdater.getConditions().entrySet()) { 178 if (!Objects.equals(state.get(en.getKey()), en.getValue())) { 179 throw new ConcurrentUpdateException((String) state.get(KEY_ID)); 180 } 181 } 182 for (Entry<String, Serializable> en : changeTokenUpdater.getUpdates().entrySet()) { 183 applyDiff(state, en.getKey(), en.getValue()); 184 } 185 } 186 applyDiff(state, diff); 187 } 188 } 189 190 @Override 191 public void deleteStates(Set<String> ids) { 192 if (log.isTraceEnabled()) { 193 log.trace("Mem: REMOVE " + ids); 194 } 195 for (String id : ids) { 196 if (states.remove(id) == null) { 197 log.debug("Missing on remove: " + id); 198 } 199 } 200 } 201 202 @Override 203 public State readChildState(String parentId, String name, Set<String> ignored) { 204 // TODO optimize by maintaining a parent/child index 205 for (State state : states.values()) { 206 if (ignored.contains(state.get(KEY_ID))) { 207 continue; 208 } 209 if (!parentId.equals(state.get(KEY_PARENT_ID))) { 210 continue; 211 } 212 if (!name.equals(state.get(KEY_NAME))) { 213 continue; 214 } 215 return state; 216 } 217 return null; 218 } 219 220 @Override 221 public boolean hasChild(String parentId, String name, Set<String> ignored) { 222 return readChildState(parentId, name, ignored) != null; 223 } 224 225 @Override 226 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 227 if (log.isTraceEnabled()) { 228 log.trace("Mem: QUERY " + key + " = " + value); 229 } 230 List<State> list = new ArrayList<>(); 231 for (State state : states.values()) { 232 String id = (String) state.get(KEY_ID); 233 if (ignored.contains(id)) { 234 continue; 235 } 236 if (!value.equals(state.get(key))) { 237 continue; 238 } 239 list.add(state); 240 } 241 if (log.isTraceEnabled() && !list.isEmpty()) { 242 log.trace("Mem: -> " + list.size()); 243 } 244 return list; 245 } 246 247 @Override 248 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 249 if (log.isTraceEnabled()) { 250 log.trace("Mem: QUERY " + key1 + " = " + value1 + " AND " + key2 + " = " + value2); 251 } 252 List<State> list = new ArrayList<>(); 253 for (State state : states.values()) { 254 String id = (String) state.get(KEY_ID); 255 if (ignored.contains(id)) { 256 continue; 257 } 258 if (!(value1.equals(state.get(key1)) && value2.equals(state.get(key2)))) { 259 continue; 260 } 261 list.add(state); 262 } 263 if (log.isTraceEnabled() && !list.isEmpty()) { 264 log.trace("Mem: -> " + list.size()); 265 } 266 return list; 267 } 268 269 @Override 270 public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets, 271 Map<String, Object[]> targetProxies) { 272 if (log.isTraceEnabled()) { 273 log.trace("Mem: QUERY " + key + " = " + value); 274 } 275 STATE: for (State state : states.values()) { 276 Object[] array = (Object[]) state.get(key); 277 String id = (String) state.get(KEY_ID); 278 if (array != null) { 279 for (Object v : array) { 280 if (value.equals(v)) { 281 ids.add(id); 282 if (proxyTargets != null && TRUE.equals(state.get(KEY_IS_PROXY))) { 283 String targetId = (String) state.get(KEY_PROXY_TARGET_ID); 284 proxyTargets.put(id, targetId); 285 } 286 if (targetProxies != null) { 287 Object[] proxyIds = (Object[]) state.get(KEY_PROXY_IDS); 288 if (proxyIds != null) { 289 targetProxies.put(id, proxyIds); 290 } 291 } 292 continue STATE; 293 } 294 } 295 } 296 } 297 if (log.isTraceEnabled() && !ids.isEmpty()) { 298 log.trace("Mem: -> " + ids.size()); 299 } 300 } 301 302 @Override 303 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 304 if (log.isTraceEnabled()) { 305 log.trace("Mem: QUERY " + key + " = " + value); 306 } 307 for (State state : states.values()) { 308 String id = (String) state.get(KEY_ID); 309 if (ignored.contains(id)) { 310 continue; 311 } 312 if (value.equals(state.get(key))) { 313 if (log.isTraceEnabled()) { 314 log.trace("Mem: -> present"); 315 } 316 return true; 317 } 318 } 319 if (log.isTraceEnabled()) { 320 log.trace("Mem: -> absent"); 321 } 322 return false; 323 } 324 325 @Override 326 public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator, 327 OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) { 328 if (log.isTraceEnabled()) { 329 log.trace("Mem: QUERY " + evaluator + " OFFSET " + offset + " LIMIT " + limit); 330 } 331 evaluator.parse(); 332 List<Map<String, Serializable>> projections = new ArrayList<>(); 333 for (State state : states.values()) { 334 List<Map<String, Serializable>> matches = evaluator.matches(state); 335 if (!matches.isEmpty()) { 336 if (distinctDocuments) { 337 projections.add(matches.get(0)); 338 } else { 339 projections.addAll(matches); 340 } 341 } 342 } 343 // ORDER BY 344 // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter 345 if (orderByClause != null) { 346 Collections.sort(projections, new OrderByComparator(orderByClause)); 347 } 348 // LIMIT / OFFSET 349 int totalSize = projections.size(); 350 if (countUpTo == -1) { 351 // count full size 352 } else if (countUpTo == 0) { 353 // no count 354 totalSize = -1; // not counted 355 } else { 356 // count only if less than countUpTo 357 if (totalSize > countUpTo) { 358 totalSize = -2; // truncated 359 } 360 } 361 if (limit != 0) { 362 int size = projections.size(); 363 projections.subList(0, offset > size ? size : offset).clear(); 364 size = projections.size(); 365 if (limit < size) { 366 projections.subList(limit, size).clear(); 367 } 368 } 369 // TODO DISTINCT 370 371 if (log.isTraceEnabled() && !projections.isEmpty()) { 372 log.trace("Mem: -> " + projections.size()); 373 } 374 return new PartialList<>(projections, totalSize); 375 } 376 377 @Override 378 public ScrollResult scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) { 379 if (log.isTraceEnabled()) { 380 log.trace("Mem: QUERY " + evaluator); 381 } 382 evaluator.parse(); 383 List<String> ids = new ArrayList<>(); 384 for (State state : states.values()) { 385 List<Map<String, Serializable>> matches = evaluator.matches(state); 386 if (!matches.isEmpty()) { 387 String id = matches.get(0).get(ECM_UUID).toString(); 388 ids.add(id); 389 } 390 } 391 return new ScrollResultImpl(NOSCROLL_ID, ids); 392 } 393 394 @Override 395 public ScrollResult scroll(String scrollId) { 396 if (NOSCROLL_ID.equals(scrollId)) { 397 // Id are already in memory, they are returned as a single batch 398 return ScrollResultImpl.emptyResult(); 399 } 400 throw new NuxeoException("Unknown or timed out scrollId"); 401 } 402 403 /** 404 * Applies a {@link StateDiff} in-place onto a base {@link State}. 405 * <p> 406 * Uses thread-safe datastructures. 407 */ 408 public static void applyDiff(State state, StateDiff stateDiff) { 409 for (Entry<String, Serializable> en : stateDiff.entrySet()) { 410 applyDiff(state, en.getKey(), en.getValue()); 411 } 412 } 413 414 /** 415 * Applies a key/value diff in-place onto a base {@link State}. 416 * <p> 417 * Uses thread-safe datastructures. 418 */ 419 protected static void applyDiff(State state, String key, Serializable value) { 420 if (value instanceof StateDiff) { 421 Serializable old = state.get(key); 422 if (old == null) { 423 old = new State(true); // thread-safe 424 state.put(key, old); 425 // enter the next if 426 } 427 if (!(old instanceof State)) { 428 throw new UnsupportedOperationException("Cannot apply StateDiff on non-State: " + old); 429 } 430 applyDiff((State) old, (StateDiff) value); 431 } else if (value instanceof ListDiff) { 432 state.put(key, applyDiff(state.get(key), (ListDiff) value)); 433 } else if (value instanceof Delta) { 434 Delta delta = (Delta) value; 435 Number oldValue = (Number) state.get(key); 436 Number newValue; 437 if (oldValue == null) { 438 newValue = delta.getFullValue(); 439 } else { 440 newValue = delta.add(oldValue); 441 } 442 state.put(key, newValue); 443 } else { 444 state.put(key, StateHelper.deepCopy(value, true)); // thread-safe 445 } 446 } 447 448 /** 449 * Applies a {@link ListDiff} onto an array or {@link List}, and returns the resulting value. 450 * <p> 451 * Uses thread-safe datastructures. 452 */ 453 public static Serializable applyDiff(Serializable value, ListDiff listDiff) { 454 // internally work on a list 455 // TODO this is costly, use a separate code path for arrays 456 Class<?> arrayComponentType = null; 457 if (listDiff.isArray && value != null) { 458 if (!(value instanceof Object[])) { 459 throw new UnsupportedOperationException("Cannot apply ListDiff on non-array: " + value); 460 } 461 arrayComponentType = ((Object[]) value).getClass().getComponentType(); 462 value = new CopyOnWriteArrayList<>(Arrays.asList((Object[]) value)); 463 } 464 if (value == null) { 465 value = new CopyOnWriteArrayList<>(); 466 } 467 if (!(value instanceof List)) { 468 throw new UnsupportedOperationException("Cannot apply ListDiff on non-List: " + value); 469 } 470 @SuppressWarnings("unchecked") 471 List<Serializable> list = (List<Serializable>) value; 472 if (listDiff.diff != null) { 473 int i = 0; 474 for (Object diffElem : listDiff.diff) { 475 if (i >= list.size()) { 476 // TODO log error applying diff to shorter list 477 break; 478 } 479 if (diffElem instanceof StateDiff) { 480 applyDiff((State) list.get(i), (StateDiff) diffElem); 481 } else if (diffElem != NOP) { 482 list.set(i, StateHelper.deepCopy(diffElem, true)); // thread-safe 483 } 484 i++; 485 } 486 } 487 if (listDiff.rpush != null) { 488 // deepCopy of what we'll add 489 List<Serializable> add = new ArrayList<>(listDiff.rpush.size()); 490 for (Object v : listDiff.rpush) { 491 add.add(StateHelper.deepCopy(v, true)); // thread-safe 492 } 493 // update CopyOnWriteArrayList in one step 494 list.addAll(add); 495 } 496 // convert back to array if needed 497 if (listDiff.isArray) { 498 return list.isEmpty() ? null : list.toArray((Object[]) Array.newInstance(arrayComponentType, list.size())); 499 } else { 500 return list.isEmpty() ? null : (Serializable) list; 501 } 502 } 503 504 /* synchronized */ 505 @Override 506 public synchronized Lock getLock(String id) { 507 State state = states.get(id); 508 if (state == null) { 509 // document not found 510 throw new DocumentNotFoundException(id); 511 } 512 String owner = (String) state.get(KEY_LOCK_OWNER); 513 if (owner == null) { 514 return null; 515 } 516 Calendar created = (Calendar) state.get(KEY_LOCK_CREATED); 517 return new Lock(owner, created); 518 } 519 520 /* synchronized */ 521 @Override 522 public synchronized Lock setLock(String id, Lock lock) { 523 State state = states.get(id); 524 if (state == null) { 525 // document not found 526 throw new DocumentNotFoundException(id); 527 } 528 String owner = (String) state.get(KEY_LOCK_OWNER); 529 if (owner != null) { 530 // return old lock 531 Calendar created = (Calendar) state.get(KEY_LOCK_CREATED); 532 return new Lock(owner, created); 533 } 534 state.put(KEY_LOCK_OWNER, lock.getOwner()); 535 state.put(KEY_LOCK_CREATED, lock.getCreated()); 536 return null; 537 } 538 539 /* synchronized */ 540 @Override 541 public synchronized Lock removeLock(String id, String owner) { 542 State state = states.get(id); 543 if (state == null) { 544 // document not found 545 throw new DocumentNotFoundException(id); 546 } 547 String oldOwner = (String) state.get(KEY_LOCK_OWNER); 548 if (oldOwner == null) { 549 // no previous lock 550 return null; 551 } 552 Calendar oldCreated = (Calendar) state.get(KEY_LOCK_CREATED); 553 if (!LockManager.canLockBeRemoved(oldOwner, owner)) { 554 // existing mismatched lock, flag failure 555 return new Lock(oldOwner, oldCreated, true); 556 } 557 // remove lock 558 state.put(KEY_LOCK_OWNER, null); 559 state.put(KEY_LOCK_CREATED, null); 560 // return old lock 561 return new Lock(oldOwner, oldCreated); 562 } 563 564 @Override 565 public void closeLockManager() { 566 } 567 568 @Override 569 public void clearLockManagerCaches() { 570 } 571 572 protected List<List<String>> binaryPaths; 573 574 @Override 575 protected void initBlobsPaths() { 576 MemBlobFinder finder = new MemBlobFinder(); 577 finder.visit(); 578 binaryPaths = finder.binaryPaths; 579 } 580 581 protected static class MemBlobFinder extends BlobFinder { 582 protected List<List<String>> binaryPaths = new ArrayList<>(); 583 584 @Override 585 protected void recordBlobPath() { 586 binaryPaths.add(new ArrayList<>(path)); 587 } 588 } 589 590 @Override 591 public void markReferencedBinaries() { 592 DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class); 593 for (State state : states.values()) { 594 for (List<String> path : binaryPaths) { 595 markReferencedBinaries(state, path, 0, blobManager); 596 } 597 } 598 } 599 600 protected void markReferencedBinaries(State state, List<String> path, int start, DocumentBlobManager blobManager) { 601 for (int i = start; i < path.size(); i++) { 602 String name = path.get(i); 603 Serializable value = state.get(name); 604 if (value instanceof State) { 605 state = (State) value; 606 } else { 607 if (value instanceof List) { 608 @SuppressWarnings("unchecked") 609 List<Object> list = (List<Object>) value; 610 for (Object v : list) { 611 if (v instanceof State) { 612 markReferencedBinaries((State) v, path, i + 1, blobManager); 613 } else { 614 markReferencedBinary(v, blobManager); 615 } 616 } 617 } 618 state = null; 619 break; 620 } 621 } 622 if (state != null) { 623 Serializable data = state.get(KEY_BLOB_DATA); 624 markReferencedBinary(data, blobManager); 625 } 626 } 627 628 protected void markReferencedBinary(Object value, DocumentBlobManager blobManager) { 629 if (!(value instanceof String)) { 630 return; 631 } 632 String key = (String) value; 633 blobManager.markReferencedBinary(key, repositoryName); 634 } 635 636}