/*
 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.storage.mem;

import static java.lang.Boolean.TRUE;
import static org.nuxeo.ecm.core.query.sql.NXQL.ECM_UUID;
import static org.nuxeo.ecm.core.storage.State.NOP;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;

import java.io.Serializable;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

import javax.resource.spi.ConnectionManager;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
import org.nuxeo.ecm.core.api.DocumentNotFoundException;
import org.nuxeo.ecm.core.api.Lock;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.PartialList;
import org.nuxeo.ecm.core.api.ScrollResult;
import org.nuxeo.ecm.core.api.ScrollResultImpl;
import org.nuxeo.ecm.core.api.model.Delta;
import org.nuxeo.ecm.core.blob.BlobManager;
import org.nuxeo.ecm.core.model.LockManager;
import org.nuxeo.ecm.core.model.Repository;
import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
import org.nuxeo.ecm.core.storage.State;
import org.nuxeo.ecm.core.storage.State.ListDiff;
import org.nuxeo.ecm.core.storage.State.StateDiff;
import org.nuxeo.ecm.core.storage.StateHelper;
import org.nuxeo.ecm.core.storage.dbs.DBSDocument;
import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
import org.nuxeo.ecm.core.storage.dbs.DBSSession.OrderByComparator;
import org.nuxeo.runtime.api.Framework;

/**
 * In-memory implementation of a {@link Repository}.
 * <p>
 * Internally, the repository is a map from id to document object.
 * <p>
 * A document object is a JSON-like document stored as a Map recursively containing the data, see {@link DBSDocument}
 * for the description of the document.
 *
 * @since 5.9.4
 */
public class MemRepository extends DBSRepositoryBase {

    private static final Log log = LogFactory.getLog(MemRepository.class);

    /** Scroll id handed out by {@link #scroll(DBSExpressionEvaluator, int, int)}; all ids come in the first batch. */
    protected static final String NOSCROLL_ID = "noscroll";

    // for debug
    private final AtomicLong temporaryIdCounter = new AtomicLong(0);

    /**
     * The content of the repository, a map of document id -> object.
     */
    protected Map<String, State> states;

    /**
     * Creates the repository and immediately initializes its in-memory content.
     *
     * @param cm the connection manager, passed to the superclass
     * @param descriptor the repository descriptor, whose {@code name} is used as repository name
     */
    public MemRepository(ConnectionManager cm, MemRepositoryDescriptor descriptor) {
        super(cm, descriptor.name, descriptor);
        initRepository();
    }

    /**
     * @return a single-element list containing {@link IdType#varchar}
     */
    @Override
    public List<IdType> getAllowedIdTypes() {
        return Collections.singletonList(IdType.varchar);
    }

    /**
     * Shuts down the superclass and drops the reference to the in-memory content.
     */
    @Override
    public void shutdown() {
        super.shutdown();
        states = null;
    }

    /**
     * Allocates a fresh, empty content map and initializes the root through {@code initRoot()}.
     */
    protected void initRepository() {
        states = new ConcurrentHashMap<>();
        initRoot();
    }

    /**
     * Generates a new document id.
     *
     * @return {@code "UUID_" + counter} when {@code DEBUG_UUIDS} is set, otherwise a random UUID string
     */
    @Override
    public String generateNewId() {
        if (DEBUG_UUIDS) {
            return "UUID_" + temporaryIdCounter.incrementAndGet();
        }
        return UUID.randomUUID().toString();
    }

    /**
     * Reads the stored state for an id.
     *
     * @param id the document id
     * @return the stored state itself (not a copy), or {@code null} if there is none
     */
    @Override
    public State readState(String id) {
        State state = states.get(id);
        if (state != null && log.isTraceEnabled()) {
            log.trace("Mem: READ " + id + ": " + state);
        }
        return state;
    }

    /**
     * Reads the stored states for several ids.
     *
     * @param ids the document ids
     * @return a list in the same order as {@code ids}, containing {@code null} for ids that have no state
     */
    @Override
    public List<State> readStates(List<String> ids) {
        List<State> result = new ArrayList<>(ids.size());
        for (String id : ids) {
            result.add(readState(id));
        }
        return result;
    }

    /**
     * Stores a deep copy of the given state under its {@code KEY_ID}.
     * <p>
     * The insertion is atomic: if two callers try to create the same id, exactly one succeeds and the other gets the
     * exception, instead of one silently overwriting the other.
     *
     * @param state the state to store; it is copied, not retained
     * @throws NuxeoException if a state already exists for this id
     */
    @Override
    public void createState(State state) {
        String id = (String) state.get(KEY_ID);
        if (log.isTraceEnabled()) {
            log.trace("Mem: CREATE " + id + ": " + state);
        }
        State copy = StateHelper.deepCopy(state, true); // thread-safe
        StateHelper.resetDeltas(copy);
        // putIfAbsent rather than containsKey + put, to avoid a check-then-act race
        if (states.putIfAbsent(id, copy) != null) {
            throw new NuxeoException("Already exists: " + id);
        }
    }

    /**
     * Applies a diff in-place to the stored state for an id.
     *
     * @param id the document id
     * @param diff the diff to apply
     * @throws ConcurrentUpdateException if there is no state for this id
     */
    @Override
    public void updateState(String id, StateDiff diff) {
        if (log.isTraceEnabled()) {
            log.trace("Mem: UPDATE " + id + ": " + diff);
        }
        State state = states.get(id);
        if (state == null) {
            throw new ConcurrentUpdateException("Missing: " + id);
        }
        applyDiff(state, diff);
    }

    /**
     * Removes the states for the given ids; an id without a state is only logged at debug level.
     *
     * @param ids the document ids to remove
     */
    @Override
    public void deleteStates(Set<String> ids) {
        if (log.isTraceEnabled()) {
            log.trace("Mem: REMOVE " + ids);
        }
        for (String id : ids) {
            if (states.remove(id) == null) {
                log.debug("Missing on remove: " + id);
            }
        }
    }

    /**
     * Finds a state by parent id and name, using a linear scan of all states.
     *
     * @param parentId the parent id to match against {@code KEY_PARENT_ID}
     * @param name the name to match against {@code KEY_NAME}
     * @param ignored ids of states to skip
     * @return the first matching state encountered, or {@code null} if none matches
     */
    @Override
    public State readChildState(String parentId, String name, Set<String> ignored) {
        // TODO optimize by maintaining a parent/child index
        for (State state : states.values()) {
            boolean candidate = !ignored.contains(state.get(KEY_ID));
            if (candidate && parentId.equals(state.get(KEY_PARENT_ID)) && name.equals(state.get(KEY_NAME))) {
                return state;
            }
        }
        return null;
    }

    /**
     * @return {@code true} if {@link #readChildState} finds a state for these arguments
     */
    @Override
    public boolean hasChild(String parentId, String name, Set<String> ignored) {
        return readChildState(parentId, name, ignored) != null;
    }

    /**
     * Collects all non-ignored states whose top-level {@code key} equals {@code value}.
     *
     * @param key the top-level key to read
     * @param value the expected value, compared with {@code value.equals(...)}
     * @param ignored ids of states to skip
     * @return the matching states (the stored objects, not copies)
     */
    @Override
    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
        if (log.isTraceEnabled()) {
            log.trace("Mem: QUERY " + key + " = " + value);
        }
        List<State> found = new ArrayList<>();
        for (State state : states.values()) {
            String id = (String) state.get(KEY_ID);
            if (!ignored.contains(id) && value.equals(state.get(key))) {
                found.add(state);
            }
        }
        if (log.isTraceEnabled() && !found.isEmpty()) {
            log.trace("Mem: -> " + found.size());
        }
        return found;
    }

    /**
     * Collects all non-ignored states for which both {@code key1} equals {@code value1} and {@code key2} equals
     * {@code value2}.
     *
     * @param key1 the first top-level key
     * @param value1 the expected value for {@code key1}
     * @param key2 the second top-level key
     * @param value2 the expected value for {@code key2}
     * @param ignored ids of states to skip
     * @return the matching states (the stored objects, not copies)
     */
    @Override
    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
        if (log.isTraceEnabled()) {
            log.trace("Mem: QUERY " + key1 + " = " + value1 + " AND " + key2 + " = " + value2);
        }
        List<State> found = new ArrayList<>();
        for (State state : states.values()) {
            String id = (String) state.get(KEY_ID);
            if (ignored.contains(id)) {
                continue;
            }
            if (value1.equals(state.get(key1)) && value2.equals(state.get(key2))) {
                found.add(state);
            }
        }
        if (log.isTraceEnabled() && !found.isEmpty()) {
            log.trace("Mem: -> " + found.size());
        }
        return found;
    }

    /**
     * Finds the states whose array stored under {@code key} contains {@code value}, and reports them through the
     * passed collections.
     *
     * @param key the top-level key holding an array
     * @param value the value looked for in the array, compared with {@code value.equals(...)}
     * @param ids receives the id of each matching state
     * @param proxyTargets if not {@code null}, receives id -> {@code KEY_PROXY_TARGET_ID} for each matching state
     *            flagged with {@code KEY_IS_PROXY}
     * @param targetProxies if not {@code null}, receives id -> {@code KEY_PROXY_IDS} for each matching state that
     *            has a non-null value for it
     */
    @Override
    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
            Map<String, Object[]> targetProxies) {
        if (log.isTraceEnabled()) {
            log.trace("Mem: QUERY " + key + " = " + value);
        }
        for (State state : states.values()) {
            Object[] array = (Object[]) state.get(key);
            String id = (String) state.get(KEY_ID);
            if (array == null) {
                continue;
            }
            boolean contained = false;
            for (Object v : array) {
                if (value.equals(v)) {
                    contained = true;
                    break;
                }
            }
            if (!contained) {
                continue;
            }
            ids.add(id);
            if (proxyTargets != null && TRUE.equals(state.get(KEY_IS_PROXY))) {
                String targetId = (String) state.get(KEY_PROXY_TARGET_ID);
                proxyTargets.put(id, targetId);
            }
            if (targetProxies != null) {
                Object[] proxyIds = (Object[]) state.get(KEY_PROXY_IDS);
                if (proxyIds != null) {
                    targetProxies.put(id, proxyIds);
                }
            }
        }
        if (log.isTraceEnabled() && !ids.isEmpty()) {
            log.trace("Mem: -> " + ids.size());
        }
    }

    /**
     * Checks whether any non-ignored state has {@code key} equal to {@code value}.
     *
     * @param key the top-level key to read
     * @param value the expected value
     * @param ignored ids of states to skip
     * @return {@code true} as soon as one matching state is found, {@code false} otherwise
     */
    @Override
    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
        if (log.isTraceEnabled()) {
            log.trace("Mem: QUERY " + key + " = " + value);
        }
        for (State state : states.values()) {
            String id = (String) state.get(KEY_ID);
            if (ignored.contains(id)) {
                continue;
            }
            if (value.equals(state.get(key))) {
                if (log.isTraceEnabled()) {
                    log.trace("Mem: -> present");
                }
                return true;
            }
        }
        if (log.isTraceEnabled()) {
            log.trace("Mem: -> absent");
        }
        return false;
    }

    /**
     * Evaluates a query against every state, then sorts and pages the projections.
     *
     * @param evaluator the expression evaluator; it is parsed here and asked for the matches of each state
     * @param orderByClause the ordering to apply, or {@code null} for no sorting
     * @param distinctDocuments if {@code true}, only the first match of each state is kept
     * @param limit the maximum number of results; {@code 0} disables both limit and offset
     * @param offset the number of leading results to drop (only used when {@code limit != 0})
     * @param countUpTo {@code -1} to report the full size, {@code 0} to report {@code -1} (not counted), otherwise
     *            the full size is reported unless it exceeds this value, in which case {@code -2} (truncated) is
     *            reported
     * @return the page of projections together with the total size computed as described above
     */
    @Override
    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
        if (log.isTraceEnabled()) {
            log.trace("Mem: QUERY " + evaluator + " OFFSET " + offset + " LIMIT " + limit);
        }
        evaluator.parse();
        List<Map<String, Serializable>> projections = new ArrayList<>();
        for (State state : states.values()) {
            List<Map<String, Serializable>> matches = evaluator.matches(state);
            if (matches.isEmpty()) {
                continue;
            }
            if (distinctDocuments) {
                projections.add(matches.get(0));
            } else {
                projections.addAll(matches);
            }
        }
        // ORDER BY
        // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter
        if (orderByClause != null) {
            Collections.sort(projections, new OrderByComparator(orderByClause));
        }
        // LIMIT / OFFSET
        // the total is taken before paging
        int totalSize = projections.size();
        if (countUpTo == 0) {
            totalSize = -1; // not counted
        } else if (countUpTo != -1 && totalSize > countUpTo) {
            totalSize = -2; // truncated
        }
        if (limit != 0) {
            projections.subList(0, Math.min(offset, projections.size())).clear();
            int remaining = projections.size();
            if (limit < remaining) {
                projections.subList(limit, remaining).clear();
            }
        }
        // TODO DISTINCT

        if (log.isTraceEnabled() && !projections.isEmpty()) {
            log.trace("Mem: -> " + projections.size());
        }
        return new PartialList<>(projections, totalSize);
    }

    /**
     * Evaluates a query against every state and returns all matching ids in a single batch.
     * <p>
     * {@code batchSize} and {@code keepAliveSeconds} are not used by this implementation.
     *
     * @param evaluator the expression evaluator; it is parsed here
     * @return a scroll result with id {@link #NOSCROLL_ID} holding the {@code ECM_UUID} of each matching state
     */
    @Override
    public ScrollResult scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) {
        if (log.isTraceEnabled()) {
            log.trace("Mem: QUERY " + evaluator);
        }
        evaluator.parse();
        List<String> ids = new ArrayList<>();
        for (State state : states.values()) {
            List<Map<String, Serializable>> matches = evaluator.matches(state);
            if (!matches.isEmpty()) {
                ids.add(matches.get(0).get(ECM_UUID).toString());
            }
        }
        return new ScrollResultImpl(NOSCROLL_ID, ids);
    }

    /**
     * Continues a scroll.
     *
     * @param scrollId the scroll id
     * @return an empty result for {@link #NOSCROLL_ID}
     * @throws NuxeoException for any other scroll id
     */
    @Override
    public ScrollResult scroll(String scrollId) {
        if (NOSCROLL_ID.equals(scrollId)) {
            // Id are already in memory, they are returned as a single batch
            return ScrollResultImpl.emptyResult();
        }
        throw new NuxeoException("Unknown or timed out scrollId");
    }

    /**
     * Applies a {@link StateDiff} in-place onto a base {@link State}.
     * <p>
     * Uses thread-safe datastructures.
     *
     * @param state the state to modify
     * @param stateDiff the diff; nested {@link StateDiff}s recurse, {@link ListDiff}s and {@link Delta}s are merged
     *            with the existing value, anything else replaces the existing value with a deep copy
     * @throws UnsupportedOperationException if a nested {@link StateDiff} targets a value that is not a {@link State}
     */
    public static void applyDiff(State state, StateDiff stateDiff) {
        for (Entry<String, Serializable> en : stateDiff.entrySet()) {
            String key = en.getKey();
            Serializable diffElem = en.getValue();
            if (diffElem instanceof StateDiff) {
                Serializable old = state.get(key);
                if (old == null) {
                    // nothing there yet: start from an empty state and apply the diff to it
                    old = new State(true); // thread-safe
                    state.put(key, old);
                }
                if (!(old instanceof State)) {
                    throw new UnsupportedOperationException("Cannot apply StateDiff on non-State: " + old);
                }
                applyDiff((State) old, (StateDiff) diffElem);
            } else if (diffElem instanceof ListDiff) {
                state.put(key, applyDiff(state.get(key), (ListDiff) diffElem));
            } else if (diffElem instanceof Delta) {
                Delta delta = (Delta) diffElem;
                Number oldValue = (Number) state.get(key);
                Number value = oldValue == null ? delta.getFullValue() : delta.add(oldValue);
                state.put(key, value);
            } else {
                state.put(key, StateHelper.deepCopy(diffElem, true)); // thread-safe
            }
        }
    }

    /**
     * Applies a {@link ListDiff} onto an array or {@link List}, and returns the resulting value.
     * <p>
     * Uses thread-safe datastructures.
     * <p>
     * For an array diff, the result keeps the component type of the previous array. When there was no previous array
     * (the stored value is {@code null}), the component type is inferred from the resulting elements.
     *
     * @param value the current value: an array if {@code listDiff.isArray}, otherwise a {@link List}; may be
     *            {@code null}
     * @param listDiff the diff to apply (positional {@code diff} first, then {@code rpush} appended)
     * @return the new array or list, or {@code null} if the result is empty
     * @throws UnsupportedOperationException if {@code value} is of the wrong kind for this diff
     */
    public static Serializable applyDiff(Serializable value, ListDiff listDiff) {
        // internally work on a list
        // TODO this is costly, use a separate code path for arrays
        Class<?> arrayComponentType = null;
        if (listDiff.isArray && value != null) {
            if (!(value instanceof Object[])) {
                throw new UnsupportedOperationException("Cannot apply ListDiff on non-array: " + value);
            }
            arrayComponentType = ((Object[]) value).getClass().getComponentType();
            value = new CopyOnWriteArrayList<>(Arrays.asList((Object[]) value));
        }
        if (value == null) {
            value = new CopyOnWriteArrayList<>();
        }
        if (!(value instanceof List)) {
            throw new UnsupportedOperationException("Cannot apply ListDiff on non-List: " + value);
        }
        @SuppressWarnings("unchecked")
        List<Serializable> list = (List<Serializable>) value;
        if (listDiff.diff != null) {
            int i = 0;
            for (Object diffElem : listDiff.diff) {
                if (i >= list.size()) {
                    // TODO log error applying diff to shorter list
                    break;
                }
                if (diffElem instanceof StateDiff) {
                    applyDiff((State) list.get(i), (StateDiff) diffElem);
                } else if (diffElem != NOP) {
                    list.set(i, StateHelper.deepCopy(diffElem, true)); // thread-safe
                }
                i++;
            }
        }
        if (listDiff.rpush != null) {
            // deepCopy of what we'll add
            List<Serializable> add = new ArrayList<>(listDiff.rpush.size());
            for (Object v : listDiff.rpush) {
                add.add(StateHelper.deepCopy(v, true)); // thread-safe
            }
            // update CopyOnWriteArrayList in one step
            list.addAll(add);
        }
        if (list.isEmpty()) {
            return null;
        }
        // convert back to array if needed
        if (listDiff.isArray) {
            if (arrayComponentType == null) {
                // no previous array gave us a type; Array.newInstance would throw on a null component type
                arrayComponentType = inferArrayComponentType(list);
            }
            return list.toArray((Object[]) Array.newInstance(arrayComponentType, list.size()));
        }
        return (Serializable) list;
    }

    /**
     * Picks an array component type able to hold every element of the list: the common class of the non-null
     * elements if they all share the exact same class, {@code Object} otherwise.
     */
    private static Class<?> inferArrayComponentType(List<?> elements) {
        Class<?> type = null;
        for (Object element : elements) {
            if (element == null) {
                continue;
            }
            if (type == null) {
                type = element.getClass();
            } else if (type != element.getClass()) {
                return Object.class;
            }
        }
        return type == null ? Object.class : type;
    }

    /**
     * Looks up the state on which a lock operation must work.
     *
     * @throws DocumentNotFoundException if there is no state for this id
     */
    private State stateForLocking(String id) {
        State state = states.get(id);
        if (state == null) {
            // document not found
            throw new DocumentNotFoundException(id);
        }
        return state;
    }

    /**
     * Gets the lock recorded on a document.
     *
     * @param id the document id
     * @return the lock built from {@code KEY_LOCK_OWNER} and {@code KEY_LOCK_CREATED}, or {@code null} if there is
     *         no owner
     * @throws DocumentNotFoundException if there is no state for this id
     */
    /* synchronized */
    @Override
    public synchronized Lock getLock(String id) {
        State state = stateForLocking(id);
        String owner = (String) state.get(KEY_LOCK_OWNER);
        if (owner == null) {
            return null;
        }
        Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
        return new Lock(owner, created);
    }

    /**
     * Sets a lock on a document if it has none.
     *
     * @param id the document id
     * @param lock the lock to record
     * @return {@code null} if the lock was recorded, otherwise the already existing lock (which is left untouched)
     * @throws DocumentNotFoundException if there is no state for this id
     */
    /* synchronized */
    @Override
    public synchronized Lock setLock(String id, Lock lock) {
        State state = stateForLocking(id);
        String owner = (String) state.get(KEY_LOCK_OWNER);
        if (owner != null) {
            // return old lock
            Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
            return new Lock(owner, created);
        }
        state.put(KEY_LOCK_OWNER, lock.getOwner());
        state.put(KEY_LOCK_CREATED, lock.getCreated());
        return null;
    }

    /**
     * Removes the lock from a document.
     *
     * @param id the document id
     * @param owner the owner asking for the removal, checked with {@link LockManager#canLockBeRemoved}
     * @return {@code null} if there was no lock; the previous lock if it was removed; or the existing lock flagged
     *         as failed if the removal was refused
     * @throws DocumentNotFoundException if there is no state for this id
     */
    /* synchronized */
    @Override
    public synchronized Lock removeLock(String id, String owner) {
        State state = stateForLocking(id);
        String oldOwner = (String) state.get(KEY_LOCK_OWNER);
        if (oldOwner == null) {
            // no previous lock
            return null;
        }
        Calendar oldCreated = (Calendar) state.get(KEY_LOCK_CREATED);
        if (!LockManager.canLockBeRemoved(oldOwner, owner)) {
            // existing mismatched lock, flag failure
            return new Lock(oldOwner, oldCreated, true);
        }
        // remove lock
        state.put(KEY_LOCK_OWNER, null);
        state.put(KEY_LOCK_CREATED, null);
        // return old lock
        return new Lock(oldOwner, oldCreated);
    }

    /** Does nothing in this implementation. */
    @Override
    public void closeLockManager() {
    }

    /** Does nothing in this implementation. */
    @Override
    public void clearLockManagerCaches() {
    }

    /** The blob paths collected by {@link #initBlobsPaths()}, each path being a list of keys. */
    protected List<List<String>> binaryPaths;

    /**
     * Collects the blob paths by running a {@link MemBlobFinder} and keeps them in {@link #binaryPaths}.
     */
    @Override
    protected void initBlobsPaths() {
        MemBlobFinder finder = new MemBlobFinder();
        finder.visit();
        binaryPaths = finder.binaryPaths;
    }

    /**
     * {@link BlobFinder} that records a copy of the current {@code path} each time a blob path is reported.
     */
    protected static class MemBlobFinder extends BlobFinder {
        protected List<List<String>> binaryPaths = new ArrayList<>();

        @Override
        protected void recordBlobPath() {
            // copy the path instead of keeping a reference to it
            binaryPaths.add(new ArrayList<>(path));
        }
    }

    /**
     * Walks every state along every recorded blob path and marks the blob keys found as referenced in the
     * {@link BlobManager}.
     */
    @Override
    public void markReferencedBinaries() {
        BlobManager blobManager = Framework.getService(BlobManager.class);
        for (State state : states.values()) {
            for (List<String> path : binaryPaths) {
                markReferencedBinaries(state, path, 0, blobManager);
            }
        }
    }

    /**
     * Follows {@code path} from index {@code start} inside {@code state} and marks the blob keys found at its end.
     * <p>
     * A nested {@link State} is descended into. A {@link List} is fanned out: each {@link State} element is walked
     * with the rest of the path, any other element is treated as a candidate blob key. Any other value stops the
     * walk without marking anything.
     *
     * @param state the state to start from
     * @param path the list of keys to follow
     * @param start the index in {@code path} of the first key to follow
     * @param blobManager the blob manager to notify
     */
    protected void markReferencedBinaries(State state, List<String> path, int start, BlobManager blobManager) {
        for (int i = start; i < path.size(); i++) {
            String name = path.get(i);
            Serializable value = state.get(name);
            if (value instanceof State) {
                state = (State) value;
                continue;
            }
            if (value instanceof List) {
                @SuppressWarnings("unchecked")
                List<Object> list = (List<Object>) value;
                for (Object v : list) {
                    if (v instanceof State) {
                        markReferencedBinaries((State) v, path, i + 1, blobManager);
                    } else {
                        markReferencedBinary(v, blobManager);
                    }
                }
            }
            // the path could not be followed to its end in this state
            return;
        }
        // whole path consumed: the state reached holds the blob data
        Serializable data = state.get(KEY_BLOB_DATA);
        markReferencedBinary(data, blobManager);
    }

    /**
     * Marks a blob key as referenced for this repository; values that are not a {@link String} are ignored.
     *
     * @param value the candidate blob key
     * @param blobManager the blob manager to notify
     */
    protected void markReferencedBinary(Object value, BlobManager blobManager) {
        if (!(value instanceof String)) {
            return;
        }
        String key = (String) value;
        blobManager.markReferencedBinary(key, repositoryName);
    }

}