001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.mem; 020 021import static java.lang.Boolean.TRUE; 022import static org.nuxeo.ecm.core.storage.State.NOP; 023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA; 024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY; 026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED; 027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER; 028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS; 031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID; 032 033import java.io.Serializable; 034import java.lang.reflect.Array; 035import java.util.ArrayList; 036import java.util.Arrays; 037import java.util.Calendar; 038import java.util.Collections; 039import java.util.List; 040import java.util.Map; 041import java.util.Map.Entry; 042import java.util.Set; 043import java.util.UUID; 044import java.util.concurrent.ConcurrentHashMap; 045import java.util.concurrent.CopyOnWriteArrayList; 046import java.util.concurrent.atomic.AtomicLong; 047 048import org.apache.commons.logging.Log; 049import org.apache.commons.logging.LogFactory; 050import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 051import org.nuxeo.ecm.core.api.DocumentNotFoundException; 052import org.nuxeo.ecm.core.api.Lock; 053import org.nuxeo.ecm.core.api.NuxeoException; 054import org.nuxeo.ecm.core.api.PartialList; 055import org.nuxeo.ecm.core.api.model.Delta; 056import org.nuxeo.ecm.core.blob.BlobManager; 057import org.nuxeo.ecm.core.model.LockManager; 058import org.nuxeo.ecm.core.model.Repository; 059import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 060import org.nuxeo.ecm.core.storage.State; 061import org.nuxeo.ecm.core.storage.State.ListDiff; 062import org.nuxeo.ecm.core.storage.State.StateDiff; 063import org.nuxeo.ecm.core.storage.StateHelper; 064import org.nuxeo.ecm.core.storage.dbs.DBSDocument; 065import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator; 066import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase; 067import org.nuxeo.ecm.core.storage.dbs.DBSSession.OrderByComparator; 068import org.nuxeo.runtime.api.Framework; 069 070/** 071 * In-memory implementation of a {@link Repository}. 072 * <p> 073 * Internally, the repository is a map from id to document object. 074 * <p> 075 * A document object is a JSON-like document stored as a Map recursively containing the data, see {@link DBSDocument} 076 * for the description of the document. 077 * 078 * @since 5.9.4 079 */ 080public class MemRepository extends DBSRepositoryBase { 081 082 private static final Log log = LogFactory.getLog(MemRepository.class); 083 084 // for debug 085 private final AtomicLong temporaryIdCounter = new AtomicLong(0); 086 087 /** 088 * The content of the repository, a map of document id -> object. 089 */ 090 protected Map<String, State> states; 091 092 public MemRepository(MemRepositoryDescriptor descriptor) { 093 super(descriptor.name, descriptor.getFulltextDescriptor()); 094 initRepository(); 095 } 096 097 @Override 098 public void shutdown() { 099 super.shutdown(); 100 states = null; 101 } 102 103 protected void initRepository() { 104 states = new ConcurrentHashMap<>(); 105 initRoot(); 106 } 107 108 @Override 109 public String generateNewId() { 110 if (DEBUG_UUIDS) { 111 return "UUID_" + temporaryIdCounter.incrementAndGet(); 112 } else { 113 return UUID.randomUUID().toString(); 114 } 115 } 116 117 @Override 118 public State readState(String id) { 119 State state = states.get(id); 120 if (state != null) { 121 if (log.isTraceEnabled()) { 122 log.trace("Mem: READ " + id + ": " + state); 123 } 124 } 125 return state; 126 } 127 128 @Override 129 public List<State> readStates(List<String> ids) { 130 List<State> list = new ArrayList<>(); 131 for (String id : ids) { 132 list.add(readState(id)); 133 } 134 return list; 135 } 136 137 @Override 138 public void createState(State state) { 139 String id = (String) state.get(KEY_ID); 140 if (log.isTraceEnabled()) { 141 log.trace("Mem: CREATE " + id + ": " + state); 142 } 143 if (states.containsKey(id)) { 144 throw new NuxeoException("Already exists: " + id); 145 } 146 state = StateHelper.deepCopy(state, true); // thread-safe 147 StateHelper.resetDeltas(state); 148 states.put(id, state); 149 } 150 151 @Override 152 public void updateState(String id, StateDiff diff) { 153 if (log.isTraceEnabled()) { 154 log.trace("Mem: UPDATE " + id + ": " + diff); 155 } 156 State state = states.get(id); 157 if (state == null) { 158 throw new ConcurrentUpdateException("Missing: " + id); 159 } 160 applyDiff(state, diff); 161 } 162 163 @Override 164 public void deleteStates(Set<String> ids) { 165 if (log.isTraceEnabled()) { 166 log.trace("Mem: REMOVE " + ids); 167 } 168 for (String id : ids) { 169 if (states.remove(id) == null) { 170 log.debug("Missing on remove: " + id); 171 } 172 } 173 } 174 175 @Override 176 public State readChildState(String parentId, String name, Set<String> ignored) { 177 // TODO optimize by maintaining a parent/child index 178 for (State state : states.values()) { 179 if (ignored.contains(state.get(KEY_ID))) { 180 continue; 181 } 182 if (!parentId.equals(state.get(KEY_PARENT_ID))) { 183 continue; 184 } 185 if (!name.equals(state.get(KEY_NAME))) { 186 continue; 187 } 188 return state; 189 } 190 return null; 191 } 192 193 @Override 194 public boolean hasChild(String parentId, String name, Set<String> ignored) { 195 return readChildState(parentId, name, ignored) != null; 196 } 197 198 @Override 199 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 200 if (log.isTraceEnabled()) { 201 log.trace("Mem: QUERY " + key + " = " + value); 202 } 203 List<State> list = new ArrayList<>(); 204 for (State state : states.values()) { 205 String id = (String) state.get(KEY_ID); 206 if (ignored.contains(id)) { 207 continue; 208 } 209 if (!value.equals(state.get(key))) { 210 continue; 211 } 212 list.add(state); 213 } 214 if (log.isTraceEnabled() && !list.isEmpty()) { 215 log.trace("Mem: -> " + list.size()); 216 } 217 return list; 218 } 219 220 @Override 221 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 222 if (log.isTraceEnabled()) { 223 log.trace("Mem: QUERY " + key1 + " = " + value1 + " AND " + key2 + " = " + value2); 224 } 225 List<State> list = new ArrayList<>(); 226 for (State state : states.values()) { 227 String id = (String) state.get(KEY_ID); 228 if (ignored.contains(id)) { 229 continue; 230 } 231 if (!(value1.equals(state.get(key1)) && value2.equals(state.get(key2)))) { 232 continue; 233 } 234 list.add(state); 235 } 236 if (log.isTraceEnabled() && !list.isEmpty()) { 237 log.trace("Mem: -> " + list.size()); 238 } 239 return list; 240 } 241 242 @Override 243 public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets, 244 Map<String, Object[]> targetProxies) { 245 if (log.isTraceEnabled()) { 246 log.trace("Mem: QUERY " + key + " = " + value); 247 } 248 STATE: for (State state : states.values()) { 249 Object[] array = (Object[]) state.get(key); 250 String id = (String) state.get(KEY_ID); 251 if (array != null) { 252 for (Object v : array) { 253 if (value.equals(v)) { 254 ids.add(id); 255 if (proxyTargets != null && TRUE.equals(state.get(KEY_IS_PROXY))) { 256 String targetId = (String) state.get(KEY_PROXY_TARGET_ID); 257 proxyTargets.put(id, targetId); 258 } 259 if (targetProxies != null) { 260 Object[] proxyIds = (Object[]) state.get(KEY_PROXY_IDS); 261 if (proxyIds != null) { 262 targetProxies.put(id, proxyIds); 263 } 264 } 265 continue STATE; 266 } 267 } 268 } 269 } 270 if (log.isTraceEnabled() && !ids.isEmpty()) { 271 log.trace("Mem: -> " + ids.size()); 272 } 273 } 274 275 @Override 276 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 277 if (log.isTraceEnabled()) { 278 log.trace("Mem: QUERY " + key + " = " + value); 279 } 280 for (State state : states.values()) { 281 String id = (String) state.get(KEY_ID); 282 if (ignored.contains(id)) { 283 continue; 284 } 285 if (value.equals(state.get(key))) { 286 if (log.isTraceEnabled()) { 287 log.trace("Mem: -> present"); 288 } 289 return true; 290 } 291 } 292 if (log.isTraceEnabled()) { 293 log.trace("Mem: -> absent"); 294 } 295 return false; 296 } 297 298 @Override 299 public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator, 300 OrderByClause orderByClause, int limit, int offset, int countUpTo) { 301 if (log.isTraceEnabled()) { 302 log.trace("Mem: QUERY " + evaluator + " OFFSET " + offset + " LIMIT " + limit); 303 } 304 evaluator.parse(); 305 boolean wildcardProjection = evaluator.hasWildcardProjection(); 306 List<Map<String, Serializable>> projections = new ArrayList<>(); 307 for (State state : states.values()) { 308 List<Map<String, Serializable>> matches = evaluator.matches(state); 309 if (!matches.isEmpty()) { 310 if (wildcardProjection) { 311 // all projections are relevant 312 projections.addAll(matches); 313 } else { 314 // no wildcard in projection, all projections are identical, keep the first 315 projections.add(matches.get(0)); 316 } 317 } 318 } 319 // ORDER BY 320 // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter 321 if (orderByClause != null) { 322 Collections.sort(projections, new OrderByComparator(orderByClause)); 323 } 324 // LIMIT / OFFSET 325 int totalSize = projections.size(); 326 if (countUpTo == -1) { 327 // count full size 328 } else if (countUpTo == 0) { 329 // no count 330 totalSize = -1; // not counted 331 } else { 332 // count only if less than countUpTo 333 if (totalSize > countUpTo) { 334 totalSize = -2; // truncated 335 } 336 } 337 if (limit != 0) { 338 int size = projections.size(); 339 projections.subList(0, offset > size ? size : offset).clear(); 340 size = projections.size(); 341 if (limit < size) { 342 projections.subList(limit, size).clear(); 343 } 344 } 345 // TODO DISTINCT 346 347 if (log.isTraceEnabled() && !projections.isEmpty()) { 348 log.trace("Mem: -> " + projections.size()); 349 } 350 return new PartialList<>(projections, totalSize); 351 } 352 353 /** 354 * Applies a {@link StateDiff} in-place onto a base {@link State}. 355 * <p> 356 * Uses thread-safe datastructures. 357 */ 358 public static void applyDiff(State state, StateDiff stateDiff) { 359 for (Entry<String, Serializable> en : stateDiff.entrySet()) { 360 String key = en.getKey(); 361 Serializable diffElem = en.getValue(); 362 if (diffElem instanceof StateDiff) { 363 Serializable old = state.get(key); 364 if (old == null) { 365 old = new State(true); // thread-safe 366 state.put(key, old); 367 // enter the next if 368 } 369 if (!(old instanceof State)) { 370 throw new UnsupportedOperationException("Cannot apply StateDiff on non-State: " + old); 371 } 372 applyDiff((State) old, (StateDiff) diffElem); 373 } else if (diffElem instanceof ListDiff) { 374 state.put(key, applyDiff(state.get(key), (ListDiff) diffElem)); 375 } else if (diffElem instanceof Delta) { 376 Delta delta = (Delta) diffElem; 377 Number oldValue = (Number) state.get(key); 378 Number value; 379 if (oldValue == null) { 380 value = delta.getFullValue(); 381 } else { 382 value = delta.add(oldValue); 383 } 384 state.put(key, value); 385 } else { 386 state.put(key, diffElem); 387 } 388 } 389 } 390 391 /** 392 * Applies a {@link ListDiff} onto an array or {@link List}, and returns the resulting value. 393 * <p> 394 * Uses thread-safe datastructures. 395 */ 396 public static Serializable applyDiff(Serializable value, ListDiff listDiff) { 397 // internally work on a list 398 // TODO this is costly, use a separate code path for arrays 399 Class<?> arrayComponentType = null; 400 if (listDiff.isArray && value != null) { 401 if (!(value instanceof Object[])) { 402 throw new UnsupportedOperationException("Cannot apply ListDiff on non-array: " + value); 403 } 404 arrayComponentType = ((Object[]) value).getClass().getComponentType(); 405 value = new CopyOnWriteArrayList<>(Arrays.asList((Object[]) value)); 406 } 407 if (value == null) { 408 value = new CopyOnWriteArrayList<>(); 409 } 410 if (!(value instanceof List)) { 411 throw new UnsupportedOperationException("Cannot apply ListDiff on non-List: " + value); 412 } 413 @SuppressWarnings("unchecked") 414 List<Serializable> list = (List<Serializable>) value; 415 if (listDiff.diff != null) { 416 int i = 0; 417 for (Object diffElem : listDiff.diff) { 418 if (i >= list.size()) { 419 // TODO log error applying diff to shorter list 420 break; 421 } 422 if (diffElem instanceof StateDiff) { 423 applyDiff((State) list.get(i), (StateDiff) diffElem); 424 } else if (diffElem != NOP) { 425 list.set(i, StateHelper.deepCopy(diffElem, true)); // thread-safe 426 } 427 i++; 428 } 429 } 430 if (listDiff.rpush != null) { 431 // deepCopy of what we'll add 432 List<Serializable> add = new ArrayList<>(listDiff.rpush.size()); 433 for (Object v : listDiff.rpush) { 434 add.add(StateHelper.deepCopy(v, true)); // thread-safe 435 } 436 // update CopyOnWriteArrayList in one step 437 list.addAll(add); 438 } 439 // convert back to array if needed 440 if (listDiff.isArray) { 441 return list.isEmpty() ? null : list.toArray((Object[]) Array.newInstance(arrayComponentType, list.size())); 442 } else { 443 return list.isEmpty() ? null : (Serializable) list; 444 } 445 } 446 447 /* synchronized */ 448 @Override 449 public synchronized Lock getLock(String id) { 450 State state = states.get(id); 451 if (state == null) { 452 // document not found 453 throw new DocumentNotFoundException(id); 454 } 455 String owner = (String) state.get(KEY_LOCK_OWNER); 456 if (owner == null) { 457 return null; 458 } 459 Calendar created = (Calendar) state.get(KEY_LOCK_CREATED); 460 return new Lock(owner, created); 461 } 462 463 /* synchronized */ 464 @Override 465 public synchronized Lock setLock(String id, Lock lock) { 466 State state = states.get(id); 467 if (state == null) { 468 // document not found 469 throw new DocumentNotFoundException(id); 470 } 471 String owner = (String) state.get(KEY_LOCK_OWNER); 472 if (owner != null) { 473 // return old lock 474 Calendar created = (Calendar) state.get(KEY_LOCK_CREATED); 475 return new Lock(owner, created); 476 } 477 state.put(KEY_LOCK_OWNER, lock.getOwner()); 478 state.put(KEY_LOCK_CREATED, lock.getCreated()); 479 return null; 480 } 481 482 /* synchronized */ 483 @Override 484 public synchronized Lock removeLock(String id, String owner) { 485 State state = states.get(id); 486 if (state == null) { 487 // document not found 488 throw new DocumentNotFoundException(id); 489 } 490 String oldOwner = (String) state.get(KEY_LOCK_OWNER); 491 if (oldOwner == null) { 492 // no previous lock 493 return null; 494 } 495 Calendar oldCreated = (Calendar) state.get(KEY_LOCK_CREATED); 496 if (!LockManager.canLockBeRemoved(oldOwner, owner)) { 497 // existing mismatched lock, flag failure 498 return new Lock(oldOwner, oldCreated, true); 499 } 500 // remove lock 501 state.put(KEY_LOCK_OWNER, null); 502 state.put(KEY_LOCK_CREATED, null); 503 // return old lock 504 return new Lock(oldOwner, oldCreated); 505 } 506 507 @Override 508 public void closeLockManager() { 509 } 510 511 @Override 512 public void clearLockManagerCaches() { 513 } 514 515 protected List<List<String>> binaryPaths; 516 517 @Override 518 protected void initBlobsPaths() { 519 MemBlobFinder finder = new MemBlobFinder(); 520 finder.visit(); 521 binaryPaths = finder.binaryPaths; 522 } 523 524 protected static class MemBlobFinder extends BlobFinder { 525 protected List<List<String>> binaryPaths = new ArrayList<>(); 526 527 @Override 528 protected void recordBlobPath() { 529 binaryPaths.add(new ArrayList<>(path)); 530 } 531 } 532 533 @Override 534 public void markReferencedBinaries() { 535 BlobManager blobManager = Framework.getService(BlobManager.class); 536 for (State state : states.values()) { 537 for (List<String> path : binaryPaths) { 538 markReferencedBinaries(state, path, 0, blobManager); 539 } 540 } 541 } 542 543 protected void markReferencedBinaries(State state, List<String> path, int start, BlobManager blobManager) { 544 for (int i = start; i < path.size(); i++) { 545 String name = path.get(i); 546 Serializable value = state.get(name); 547 if (value instanceof State) { 548 state = (State) value; 549 } else { 550 if (value instanceof List) { 551 @SuppressWarnings("unchecked") 552 List<Object> list = (List<Object>) value; 553 for (Object v : list) { 554 if (v instanceof State) { 555 markReferencedBinaries((State) v, path, i + 1, blobManager); 556 } else { 557 markReferencedBinary(v, blobManager); 558 } 559 } 560 } 561 state = null; 562 break; 563 } 564 } 565 if (state != null) { 566 Serializable data = state.get(KEY_BLOB_DATA); 567 markReferencedBinary(data, blobManager); 568 } 569 } 570 571 protected void markReferencedBinary(Object value, BlobManager blobManager) { 572 if (!(value instanceof String)) { 573 return; 574 } 575 String key = (String) value; 576 blobManager.markReferencedBinary(key, repositoryName); 577 } 578 579}