001/* 002 * Copyright (c) 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the Eclipse Public License v1.0 006 * which accompanies this distribution, and is available at 007 * http://www.eclipse.org/legal/epl-v10.html 008 * 009 * Contributors: 010 * Florent Guillaume 011 */ 012package org.nuxeo.ecm.core.storage.mem; 013 014import static java.lang.Boolean.TRUE; 015import static org.nuxeo.ecm.core.storage.State.NOP; 016import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA; 017import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 018import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY; 019import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED; 020import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER; 021import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS; 024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID; 025 026import java.io.Serializable; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Calendar; 030import java.util.Collections; 031import java.util.List; 032import java.util.Map; 033import java.util.Set; 034import java.util.UUID; 035import java.util.Map.Entry; 036import java.util.concurrent.ConcurrentHashMap; 037import java.util.concurrent.CopyOnWriteArrayList; 038import java.util.concurrent.atomic.AtomicLong; 039 040import org.apache.commons.logging.Log; 041import org.apache.commons.logging.LogFactory; 042import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 043import org.nuxeo.ecm.core.api.DocumentNotFoundException; 044import org.nuxeo.ecm.core.api.Lock; 045import org.nuxeo.ecm.core.api.NuxeoException; 046import org.nuxeo.ecm.core.api.PartialList; 047import org.nuxeo.ecm.core.api.model.Delta; 048import org.nuxeo.ecm.core.blob.BlobManager; 049import org.nuxeo.ecm.core.model.LockManager; 050import org.nuxeo.ecm.core.model.Repository; 051import org.nuxeo.ecm.core.query.sql.model.Expression; 052import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 053import org.nuxeo.ecm.core.query.sql.model.SelectClause; 054import org.nuxeo.ecm.core.storage.State.ListDiff; 055import org.nuxeo.ecm.core.storage.State.StateDiff; 056import org.nuxeo.ecm.core.storage.StateHelper; 057import org.nuxeo.ecm.core.storage.State; 058import org.nuxeo.ecm.core.storage.dbs.DBSDocument; 059import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator; 060import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator.OrderByComparator; 061import org.nuxeo.runtime.api.Framework; 062 063import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase; 064 065/** 066 * In-memory implementation of a {@link Repository}. 067 * <p> 068 * Internally, the repository is a map from id to document object. 069 * <p> 070 * A document object is a JSON-like document stored as a Map recursively containing the data, see {@link DBSDocument} 071 * for the description of the document. 072 * 073 * @since 5.9.4 074 */ 075public class MemRepository extends DBSRepositoryBase { 076 077 private static final Log log = LogFactory.getLog(MemRepository.class); 078 079 // for debug 080 private final AtomicLong temporaryIdCounter = new AtomicLong(0); 081 082 /** 083 * The content of the repository, a map of document id -> object. 084 */ 085 protected Map<String, State> states; 086 087 public MemRepository(String repositoryName) { 088 super(repositoryName, false); 089 initRepository(); 090 } 091 092 @Override 093 public void shutdown() { 094 super.shutdown(); 095 states = null; 096 } 097 098 protected void initRepository() { 099 states = new ConcurrentHashMap<>(); 100 initRoot(); 101 } 102 103 @Override 104 public String generateNewId() { 105 if (DEBUG_UUIDS) { 106 return "UUID_" + temporaryIdCounter.incrementAndGet(); 107 } else { 108 return UUID.randomUUID().toString(); 109 } 110 } 111 112 @Override 113 public State readState(String id) { 114 State state = states.get(id); 115 if (state != null) { 116 if (log.isTraceEnabled()) { 117 log.trace("read " + id + ": " + state); 118 } 119 } 120 return state; 121 } 122 123 @Override 124 public List<State> readStates(List<String> ids) { 125 List<State> list = new ArrayList<>(); 126 for (String id : ids) { 127 list.add(readState(id)); 128 } 129 return list; 130 } 131 132 @Override 133 public void createState(State state) { 134 String id = (String) state.get(KEY_ID); 135 if (log.isTraceEnabled()) { 136 log.trace("create " + id + ": " + state); 137 } 138 if (states.containsKey(id)) { 139 throw new NuxeoException("Already exists: " + id); 140 } 141 state = StateHelper.deepCopy(state, true); // thread-safe 142 StateHelper.resetDeltas(state); 143 states.put(id, state); 144 } 145 146 @Override 147 public void updateState(String id, StateDiff diff) { 148 if (log.isTraceEnabled()) { 149 log.trace("update " + id + ": " + diff); 150 } 151 State state = states.get(id); 152 if (state == null) { 153 throw new ConcurrentUpdateException("Missing: " + id); 154 } 155 applyDiff(state, diff); 156 } 157 158 @Override 159 public void deleteStates(Set<String> ids) { 160 if (log.isTraceEnabled()) { 161 log.trace("delete " + ids); 162 } 163 for (String id : ids) { 164 if (states.remove(id) == null) { 165 log.debug("Missing on remove: " + id); 166 } 167 } 168 } 169 170 @Override 171 public State readChildState(String parentId, String name, Set<String> ignored) { 172 // TODO optimize by maintaining a parent/child index 173 for (State state : states.values()) { 174 if (ignored.contains(state.get(KEY_ID))) { 175 continue; 176 } 177 if (!parentId.equals(state.get(KEY_PARENT_ID))) { 178 continue; 179 } 180 if (!name.equals(state.get(KEY_NAME))) { 181 continue; 182 } 183 return state; 184 } 185 return null; 186 } 187 188 @Override 189 public boolean hasChild(String parentId, String name, Set<String> ignored) { 190 return readChildState(parentId, name, ignored) != null; 191 } 192 193 @Override 194 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 195 List<State> list = new ArrayList<>(); 196 for (State state : states.values()) { 197 String id = (String) state.get(KEY_ID); 198 if (ignored.contains(id)) { 199 continue; 200 } 201 if (!value.equals(state.get(key))) { 202 continue; 203 } 204 list.add(state); 205 } 206 return list; 207 } 208 209 @Override 210 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 211 List<State> list = new ArrayList<>(); 212 for (State state : states.values()) { 213 String id = (String) state.get(KEY_ID); 214 if (ignored.contains(id)) { 215 continue; 216 } 217 if (!(value1.equals(state.get(key1)) && value2.equals(state.get(key2)))) { 218 continue; 219 } 220 list.add(state); 221 } 222 return list; 223 } 224 225 @Override 226 public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets, 227 Map<String, Object[]> targetProxies) { 228 STATE: for (State state : states.values()) { 229 Object[] array = (Object[]) state.get(key); 230 String id = (String) state.get(KEY_ID); 231 if (array != null) { 232 for (Object v : array) { 233 if (value.equals(v)) { 234 ids.add(id); 235 if (proxyTargets != null && TRUE.equals(state.get(KEY_IS_PROXY))) { 236 String targetId = (String) state.get(KEY_PROXY_TARGET_ID); 237 proxyTargets.put(id, targetId); 238 } 239 if (targetProxies != null) { 240 Object[] proxyIds = (Object[]) state.get(KEY_PROXY_IDS); 241 if (proxyIds != null) { 242 targetProxies.put(id, proxyIds); 243 } 244 } 245 continue STATE; 246 } 247 } 248 } 249 } 250 } 251 252 @Override 253 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 254 for (State state : states.values()) { 255 String id = (String) state.get(KEY_ID); 256 if (ignored.contains(id)) { 257 continue; 258 } 259 if (value.equals(state.get(key))) { 260 return true; 261 } 262 } 263 return false; 264 } 265 266 @Override 267 public PartialList<State> queryAndFetch(Expression expression, SelectClause selectClause, OrderByClause orderByClause, 268 int limit, int offset, int countUpTo, DBSExpressionEvaluator evaluator, boolean deepCopy) { 269 // TODO projection 270 List<State> maps = new ArrayList<>(); 271 for (State state : states.values()) { 272 if (evaluator.matches(state)) { 273 if (deepCopy) { 274 state = StateHelper.deepCopy(state); 275 } 276 maps.add(state); 277 } 278 } 279 // ORDER BY 280 if (orderByClause != null) { 281 Collections.sort(maps, new OrderByComparator(orderByClause, evaluator)); 282 } 283 // LIMIT / OFFSET 284 int totalSize = maps.size(); 285 if (countUpTo == -1) { 286 // count full size 287 } else if (countUpTo == 0) { 288 // no count 289 totalSize = -1; // not counted 290 } else { 291 // count only if less than countUpTo 292 if (totalSize > countUpTo) { 293 totalSize = -2; // truncated 294 } 295 } 296 if (limit != 0) { 297 int size = maps.size(); 298 maps.subList(0, offset > size ? size : offset).clear(); 299 size = maps.size(); 300 if (limit < size) { 301 maps.subList(limit, size).clear(); 302 } 303 } 304 // TODO DISTINCT 305 306 return new PartialList<>(maps, totalSize); 307 } 308 309 /** 310 * Applies a {@link StateDiff} in-place onto a base {@link State}. 311 * <p> 312 * Uses thread-safe datastructures. 313 */ 314 public static void applyDiff(State state, StateDiff stateDiff) { 315 for (Entry<String, Serializable> en : stateDiff.entrySet()) { 316 String key = en.getKey(); 317 Serializable diffElem = en.getValue(); 318 if (diffElem instanceof StateDiff) { 319 Serializable old = state.get(key); 320 if (old == null) { 321 old = new State(true); // thread-safe 322 state.put(key, old); 323 // enter the next if 324 } 325 if (!(old instanceof State)) { 326 throw new UnsupportedOperationException("Cannot apply StateDiff on non-State: " + old); 327 } 328 applyDiff((State) old, (StateDiff) diffElem); 329 } else if (diffElem instanceof ListDiff) { 330 state.put(key, applyDiff(state.get(key), (ListDiff) diffElem)); 331 } else if (diffElem instanceof Delta) { 332 Delta delta = (Delta) diffElem; 333 Number oldValue = (Number) state.get(key); 334 Number value; 335 if (oldValue == null) { 336 value = delta.getFullValue(); 337 } else { 338 value = delta.add(oldValue); 339 } 340 state.put(key, value); 341 } else { 342 state.put(key, diffElem); 343 } 344 } 345 } 346 347 /** 348 * Applies a {@link ListDiff} onto an array or {@link List}, and returns the resulting value. 349 * <p> 350 * Uses thread-safe datastructures. 351 */ 352 public static Serializable applyDiff(Serializable value, ListDiff listDiff) { 353 // internally work on a list 354 // TODO this is costly, use a separate code path for arrays 355 if (listDiff.isArray && value != null) { 356 if (!(value instanceof Object[])) { 357 throw new UnsupportedOperationException("Cannot apply ListDiff on non-array: " + value); 358 } 359 value = new CopyOnWriteArrayList<>(Arrays.asList((Object[]) value)); 360 } 361 if (value == null) { 362 value = new CopyOnWriteArrayList<>(); 363 } 364 if (!(value instanceof List)) { 365 throw new UnsupportedOperationException("Cannot apply ListDiff on non-List: " + value); 366 } 367 @SuppressWarnings("unchecked") 368 List<Serializable> list = (List<Serializable>) value; 369 if (listDiff.diff != null) { 370 int i = 0; 371 for (Object diffElem : listDiff.diff) { 372 if (i >= list.size()) { 373 // TODO log error applying diff to shorter list 374 break; 375 } 376 if (diffElem instanceof StateDiff) { 377 applyDiff((State) list.get(i), (StateDiff) diffElem); 378 } else if (diffElem != NOP) { 379 list.set(i, StateHelper.deepCopy(diffElem, true)); // thread-safe 380 } 381 i++; 382 } 383 } 384 if (listDiff.rpush != null) { 385 // deepCopy of what we'll add 386 List<Serializable> add = new ArrayList<>(listDiff.rpush.size()); 387 for (Object v : listDiff.rpush) { 388 add.add(StateHelper.deepCopy(v, true)); // thread-safe 389 } 390 // update CopyOnWriteArrayList in one step 391 list.addAll(add); 392 } 393 // convert back to array if needed 394 if (listDiff.isArray) { 395 return list.isEmpty() ? null : list.toArray(new Object[0]); 396 } else { 397 return list.isEmpty() ? null : (Serializable) list; 398 } 399 } 400 401 /* synchronized */ 402 @Override 403 public synchronized Lock getLock(String id) { 404 State state = states.get(id); 405 if (state == null) { 406 // document not found 407 throw new DocumentNotFoundException(id); 408 } 409 String owner = (String) state.get(KEY_LOCK_OWNER); 410 if (owner == null) { 411 return null; 412 } 413 Calendar created = (Calendar) state.get(KEY_LOCK_CREATED); 414 return new Lock(owner, created); 415 } 416 417 /* synchronized */ 418 @Override 419 public synchronized Lock setLock(String id, Lock lock) { 420 State state = states.get(id); 421 if (state == null) { 422 // document not found 423 throw new DocumentNotFoundException(id); 424 } 425 String owner = (String) state.get(KEY_LOCK_OWNER); 426 if (owner != null) { 427 // return old lock 428 Calendar created = (Calendar) state.get(KEY_LOCK_CREATED); 429 return new Lock(owner, created); 430 } 431 state.put(KEY_LOCK_OWNER, lock.getOwner()); 432 state.put(KEY_LOCK_CREATED, lock.getCreated()); 433 return null; 434 } 435 436 /* synchronized */ 437 @Override 438 public synchronized Lock removeLock(String id, String owner) { 439 State state = states.get(id); 440 if (state == null) { 441 // document not found 442 throw new DocumentNotFoundException(id); 443 } 444 String oldOwner = (String) state.get(KEY_LOCK_OWNER); 445 if (oldOwner == null) { 446 // no previous lock 447 return null; 448 } 449 Calendar oldCreated = (Calendar) state.get(KEY_LOCK_CREATED); 450 if (!LockManager.canLockBeRemoved(oldOwner, owner)) { 451 // existing mismatched lock, flag failure 452 return new Lock(oldOwner, oldCreated, true); 453 } 454 // remove lock 455 state.put(KEY_LOCK_OWNER, null); 456 state.put(KEY_LOCK_CREATED, null); 457 // return old lock 458 return new Lock(oldOwner, oldCreated); 459 } 460 461 @Override 462 public void closeLockManager() { 463 } 464 465 @Override 466 public void clearLockManagerCaches() { 467 } 468 469 protected List<List<String>> binaryPaths; 470 471 @Override 472 protected void initBlobsPaths() { 473 MemBlobFinder finder = new MemBlobFinder(); 474 finder.visit(); 475 binaryPaths = finder.binaryPaths; 476 } 477 478 protected static class MemBlobFinder extends BlobFinder { 479 protected List<List<String>> binaryPaths = new ArrayList<>(); 480 481 @Override 482 protected void recordBlobPath() { 483 binaryPaths.add(new ArrayList<>(path)); 484 } 485 } 486 487 @Override 488 public void markReferencedBinaries() { 489 BlobManager blobManager = Framework.getService(BlobManager.class); 490 for (State state : states.values()) { 491 for (List<String> path : binaryPaths) { 492 markReferencedBinaries(state, path, 0, blobManager); 493 } 494 } 495 } 496 497 protected void markReferencedBinaries(State state, List<String> path, int start, BlobManager blobManager) { 498 for (int i = start; i < path.size(); i++) { 499 String name = path.get(i); 500 Serializable value = state.get(name); 501 if (value instanceof State) { 502 state = (State) value; 503 } else { 504 if (value instanceof List) { 505 @SuppressWarnings("unchecked") 506 List<Object> list = (List<Object>) value; 507 for (Object v : list) { 508 if (v instanceof State) { 509 markReferencedBinaries((State) v, path, i + 1, blobManager); 510 } else { 511 markReferencedBinary(v, blobManager); 512 } 513 } 514 } 515 state = null; 516 break; 517 } 518 } 519 if (state != null) { 520 Serializable data = state.get(KEY_BLOB_DATA); 521 markReferencedBinary(data, blobManager); 522 } 523 } 524 525 protected void markReferencedBinary(Object value, BlobManager blobManager) { 526 if (!(value instanceof String)) { 527 return; 528 } 529 String key = (String) value; 530 blobManager.markReferencedBinary(key, repositoryName); 531 } 532 533}