/*
 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.storage.mem;

import static java.lang.Boolean.TRUE;
import static org.nuxeo.ecm.core.storage.State.NOP;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;

import java.io.Serializable;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
import org.nuxeo.ecm.core.api.DocumentNotFoundException;
import org.nuxeo.ecm.core.api.Lock;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.PartialList;
import org.nuxeo.ecm.core.api.model.Delta;
import org.nuxeo.ecm.core.blob.BlobManager;
import org.nuxeo.ecm.core.model.LockManager;
import org.nuxeo.ecm.core.model.Repository;
import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
import org.nuxeo.ecm.core.storage.State;
import org.nuxeo.ecm.core.storage.State.ListDiff;
import org.nuxeo.ecm.core.storage.State.StateDiff;
import org.nuxeo.ecm.core.storage.StateHelper;
import org.nuxeo.ecm.core.storage.dbs.DBSDocument;
import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
import org.nuxeo.ecm.core.storage.dbs.DBSSession.OrderByComparator;
import org.nuxeo.runtime.api.Framework;

/**
 * In-memory implementation of a {@link Repository}.
 * <p>
 * Internally, the repository is a map from id to document object.
 * <p>
 * A document object is a JSON-like document stored as a Map recursively containing the data, see {@link DBSDocument}
 * for the description of the document.
 *
 * @since 5.9.4
 */
public class MemRepository extends DBSRepositoryBase {

    private static final Log log = LogFactory.getLog(MemRepository.class);

    // for debug
    private final AtomicLong temporaryIdCounter = new AtomicLong(0);

    /**
     * The content of the repository, a map of document id -> object.
     */
    protected Map<String, State> states;

    /**
     * Creates the repository from its descriptor and initializes its in-memory content.
     *
     * @param descriptor the descriptor providing the repository name and the fulltext descriptor
     */
    public MemRepository(MemRepositoryDescriptor descriptor) {
        super(descriptor.name, descriptor.getFulltextDescriptor());
        initRepository();
    }

    /**
     * Shuts down through the superclass, then drops the reference to the in-memory content.
     */
    @Override
    public void shutdown() {
        super.shutdown();
        states = null;
    }

    /**
     * Allocates the (concurrent) map holding the states, then initializes the root.
     */
    protected void initRepository() {
        states = new ConcurrentHashMap<>();
        initRoot();
    }

    /**
     * Generates a new document id.
     *
     * @return {@code "UUID_"} followed by an incrementing counter when {@code DEBUG_UUIDS} is set, otherwise a random
     *         UUID string
     */
    @Override
    public String generateNewId() {
        if (DEBUG_UUIDS) {
            return "UUID_" + temporaryIdCounter.incrementAndGet();
        } else {
            return UUID.randomUUID().toString();
        }
    }

    /**
     * Reads the state stored under the given id.
     *
     * @param id the document id
     * @return the stored state instance itself (not a copy), or {@code null} if there is none
     */
    @Override
    public State readState(String id) {
        State state = states.get(id);
        if (state != null) {
            if (log.isTraceEnabled()) {
                log.trace("Mem: READ " + id + ": " + state);
            }
        }
        return state;
    }

    /**
     * Reads several states at once.
     *
     * @param ids the document ids
     * @return a list in the same order as {@code ids}, holding {@code null} at the position of each id that has no
     *         stored state
     */
    @Override
    public List<State> readStates(List<String> ids) {
        // the result has exactly one entry per requested id
        List<State> list = new ArrayList<>(ids.size());
        for (String id : ids) {
            list.add(readState(id));
        }
        return list;
    }

    /**
     * Stores a deep copy of the given state under the id found in its {@code KEY_ID} entry.
     *
     * @param state the state to store
     * @throws NuxeoException if a state is already stored under that id
     */
    @Override
    public void createState(State state) {
        String id = (String) state.get(KEY_ID);
        if (log.isTraceEnabled()) {
            log.trace("Mem: CREATE " + id + ": " + state);
        }
        if (states.containsKey(id)) {
            throw new NuxeoException("Already exists: " + id);
        }
        state = StateHelper.deepCopy(state, true); // thread-safe
        StateHelper.resetDeltas(state);
        states.put(id, state);
    }

    /**
     * Applies a diff in-place to the state stored under the given id.
     *
     * @param id the document id
     * @param diff the diff to apply
     * @throws ConcurrentUpdateException if no state is stored under that id
     */
    @Override
    public void updateState(String id, StateDiff diff) {
        if (log.isTraceEnabled()) {
            log.trace("Mem: UPDATE " + id + ": " + diff);
        }
        State state = states.get(id);
        if (state == null) {
            throw new ConcurrentUpdateException("Missing: " + id);
        }
        applyDiff(state, diff);
    }

    /**
     * Removes the states stored under the given ids. An id with no stored state is only logged at debug level.
     *
     * @param ids the document ids to remove
     */
    @Override
    public void deleteStates(Set<String> ids) {
        if (log.isTraceEnabled()) {
            log.trace("Mem: REMOVE " + ids);
        }
        for (String id : ids) {
            if (states.remove(id) == null) {
                log.debug("Missing on remove: " + id);
            }
        }
    }

    /**
     * Finds a state by parent id and name, by scanning all the stored states.
     *
     * @param parentId the value to match against {@code KEY_PARENT_ID}
     * @param name the value to match against {@code KEY_NAME}
     * @param ignored the ids of the states to skip
     * @return the first matching state encountered, or {@code null} if there is none
     */
    @Override
    public State readChildState(String parentId, String name, Set<String> ignored) {
        // TODO optimize by maintaining a parent/child index
        for (State state : states.values()) {
            if (ignored.contains(state.get(KEY_ID))) {
                continue;
            }
            if (!parentId.equals(state.get(KEY_PARENT_ID))) {
                continue;
            }
            if (!name.equals(state.get(KEY_NAME))) {
                continue;
            }
            return state;
        }
        return null;
    }

    /**
     * Checks whether {@link #readChildState} finds a state for the given parent id and name.
     *
     * @param parentId the value to match against {@code KEY_PARENT_ID}
     * @param name the value to match against {@code KEY_NAME}
     * @param ignored the ids of the states to skip
     * @return {@code true} if a matching state exists
     */
    @Override
    public boolean hasChild(String parentId, String name, Set<String> ignored) {
        return readChildState(parentId, name, ignored) != null;
    }

    /**
     * Collects the states whose value for {@code key} equals {@code value}, by scanning all the stored states.
     *
     * @param key the key to read in each state
     * @param value the expected value
     * @param ignored the ids of the states to skip
     * @return the matching states, possibly empty
     */
    @Override
    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
        if (log.isTraceEnabled()) {
            log.trace("Mem: QUERY " + key + " = " + value);
        }
        List<State> list = new ArrayList<>();
        for (State state : states.values()) {
            String id = (String) state.get(KEY_ID);
            if (ignored.contains(id)) {
                continue;
            }
            if (!value.equals(state.get(key))) {
                continue;
            }
            list.add(state);
        }
        if (log.isTraceEnabled() && !list.isEmpty()) {
            log.trace("Mem: -> " + list.size());
        }
        return list;
    }

    /**
     * Collects the states matching both {@code key1 = value1} and {@code key2 = value2}, by scanning all the stored
     * states.
     *
     * @param key1 the first key to read in each state
     * @param value1 the expected value for {@code key1}
     * @param key2 the second key to read in each state
     * @param value2 the expected value for {@code key2}
     * @param ignored the ids of the states to skip
     * @return the matching states, possibly empty
     */
    @Override
    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
        if (log.isTraceEnabled()) {
            log.trace("Mem: QUERY " + key1 + " = " + value1 + " AND " + key2 + " = " + value2);
        }
        List<State> list = new ArrayList<>();
        for (State state : states.values()) {
            String id = (String) state.get(KEY_ID);
            if (ignored.contains(id)) {
                continue;
            }
            if (!(value1.equals(state.get(key1)) && value2.equals(state.get(key2)))) {
                continue;
            }
            list.add(state);
        }
        if (log.isTraceEnabled() && !list.isEmpty()) {
            log.trace("Mem: -> " + list.size());
        }
        return list;
    }

    /**
     * Finds the states whose array stored under {@code key} contains {@code value}, and records the results into the
     * passed collections.
     *
     * @param key the key under which each state holds an array
     * @param value the value to look for in that array
     * @param ids receives the id of each matching state
     * @param proxyTargets if not {@code null}, receives id -> {@code KEY_PROXY_TARGET_ID} value for each matching state
     *            flagged with {@code KEY_IS_PROXY}
     * @param targetProxies if not {@code null}, receives id -> {@code KEY_PROXY_IDS} array for each matching state that
     *            has one
     */
    @Override
    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
            Map<String, Object[]> targetProxies) {
        if (log.isTraceEnabled()) {
            log.trace("Mem: QUERY " + key + " = " + value);
        }
        STATE: for (State state : states.values()) {
            Object[] array = (Object[]) state.get(key);
            String id = (String) state.get(KEY_ID);
            if (array != null) {
                for (Object v : array) {
                    if (value.equals(v)) {
                        ids.add(id);
                        if (proxyTargets != null && TRUE.equals(state.get(KEY_IS_PROXY))) {
                            String targetId = (String) state.get(KEY_PROXY_TARGET_ID);
                            proxyTargets.put(id, targetId);
                        }
                        if (targetProxies != null) {
                            Object[] proxyIds = (Object[]) state.get(KEY_PROXY_IDS);
                            if (proxyIds != null) {
                                targetProxies.put(id, proxyIds);
                            }
                        }
                        // one match is enough for this state, move on to the next one
                        continue STATE;
                    }
                }
            }
        }
        if (log.isTraceEnabled() && !ids.isEmpty()) {
            log.trace("Mem: -> " + ids.size());
        }
    }

    /**
     * Checks whether at least one non-ignored state has {@code value} stored under {@code key}.
     *
     * @param key the key to read in each state
     * @param value the expected value
     * @param ignored the ids of the states to skip
     * @return {@code true} if such a state exists
     */
    @Override
    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
        if (log.isTraceEnabled()) {
            log.trace("Mem: QUERY " + key + " = " + value);
        }
        for (State state : states.values()) {
            String id = (String) state.get(KEY_ID);
            if (ignored.contains(id)) {
                continue;
            }
            if (value.equals(state.get(key))) {
                if (log.isTraceEnabled()) {
                    log.trace("Mem: -> present");
                }
                return true;
            }
        }
        if (log.isTraceEnabled()) {
            log.trace("Mem: -> absent");
        }
        return false;
    }

    /**
     * Evaluates a query against every stored state, then sorts and truncates the resulting projections.
     *
     * @param evaluator the expression evaluator, parsed here and then matched against each state
     * @param orderByClause the ordering to apply, or {@code null} for none
     * @param selectDocuments if {@code true}, only the first match of each state is kept, otherwise all of them
     * @param limit the maximum number of projections to return; {@code 0} disables both limit and offset handling
     * @param offset the number of leading projections to drop (only applied when {@code limit} is not {@code 0})
     * @param countUpTo {@code -1} to report the full size, {@code 0} to report no size ({@code -1}), otherwise the
     *            threshold above which the size is reported as truncated ({@code -2})
     * @return the projections along with the total size computed according to {@code countUpTo}
     */
    @Override
    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
            OrderByClause orderByClause, boolean selectDocuments, int limit, int offset, int countUpTo) {
        if (log.isTraceEnabled()) {
            log.trace("Mem: QUERY " + evaluator + " OFFSET " + offset + " LIMIT " + limit);
        }
        evaluator.parse();
        List<Map<String, Serializable>> projections = new ArrayList<>();
        for (State state : states.values()) {
            List<Map<String, Serializable>> matches = evaluator.matches(state);
            if (!matches.isEmpty()) {
                if (selectDocuments) {
                    projections.add(matches.get(0));
                } else {
                    projections.addAll(matches);
                }
            }
        }
        // ORDER BY
        // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter
        if (orderByClause != null) {
            Collections.sort(projections, new OrderByComparator(orderByClause));
        }
        // LIMIT / OFFSET
        int totalSize = projections.size();
        if (countUpTo == -1) {
            // count full size
        } else if (countUpTo == 0) {
            // no count
            totalSize = -1; // not counted
        } else {
            // count only if less than countUpTo
            if (totalSize > countUpTo) {
                totalSize = -2; // truncated
            }
        }
        if (limit != 0) {
            int size = projections.size();
            projections.subList(0, offset > size ? size : offset).clear();
            size = projections.size();
            if (limit < size) {
                projections.subList(limit, size).clear();
            }
        }
        // TODO DISTINCT

        if (log.isTraceEnabled() && !projections.isEmpty()) {
            log.trace("Mem: -> " + projections.size());
        }
        return new PartialList<>(projections, totalSize);
    }

    /**
     * Applies a {@link StateDiff} in-place onto a base {@link State}.
     * <p>
     * Uses thread-safe datastructures.
     * <p>
     * For each entry of the diff: a nested {@link StateDiff} is applied recursively (creating the nested {@link State}
     * if absent), a {@link ListDiff} is applied through {@link #applyDiff(Serializable, ListDiff)}, a {@link Delta} is
     * added to the existing number (or its full value is used if there is none), and anything else replaces the
     * existing value.
     *
     * @param state the state to update
     * @param stateDiff the diff to apply
     * @throws UnsupportedOperationException if a nested {@link StateDiff} targets an existing value that is not a
     *             {@link State}
     */
    public static void applyDiff(State state, StateDiff stateDiff) {
        for (Entry<String, Serializable> en : stateDiff.entrySet()) {
            String key = en.getKey();
            Serializable diffElem = en.getValue();
            if (diffElem instanceof StateDiff) {
                Serializable old = state.get(key);
                if (old == null) {
                    old = new State(true); // thread-safe
                    state.put(key, old);
                    // enter the next if
                }
                if (!(old instanceof State)) {
                    throw new UnsupportedOperationException("Cannot apply StateDiff on non-State: " + old);
                }
                applyDiff((State) old, (StateDiff) diffElem);
            } else if (diffElem instanceof ListDiff) {
                state.put(key, applyDiff(state.get(key), (ListDiff) diffElem));
            } else if (diffElem instanceof Delta) {
                Delta delta = (Delta) diffElem;
                Number oldValue = (Number) state.get(key);
                Number value;
                if (oldValue == null) {
                    value = delta.getFullValue();
                } else {
                    value = delta.add(oldValue);
                }
                state.put(key, value);
            } else {
                state.put(key, diffElem);
            }
        }
    }

    /**
     * Applies a {@link ListDiff} onto an array or {@link List}, and returns the resulting value.
     * <p>
     * Uses thread-safe datastructures.
     * <p>
     * When the diff is an array diff and there is no previous array to take the component type from, the component
     * type of the returned array is inferred from the resulting elements (see {@link #commonElementType(List)}).
     *
     * @param value the current value (an array, a {@link List}, or {@code null})
     * @param listDiff the diff to apply
     * @return the new array or list, or {@code null} if the result is empty
     * @throws UnsupportedOperationException if the current value is not of the kind expected by the diff
     */
    public static Serializable applyDiff(Serializable value, ListDiff listDiff) {
        // internally work on a list
        // TODO this is costly, use a separate code path for arrays
        Class<?> arrayComponentType = null;
        if (listDiff.isArray && value != null) {
            if (!(value instanceof Object[])) {
                throw new UnsupportedOperationException("Cannot apply ListDiff on non-array: " + value);
            }
            arrayComponentType = ((Object[]) value).getClass().getComponentType();
            value = new CopyOnWriteArrayList<>(Arrays.asList((Object[]) value));
        }
        if (value == null) {
            value = new CopyOnWriteArrayList<>();
        }
        if (!(value instanceof List)) {
            throw new UnsupportedOperationException("Cannot apply ListDiff on non-List: " + value);
        }
        @SuppressWarnings("unchecked")
        List<Serializable> list = (List<Serializable>) value;
        if (listDiff.diff != null) {
            int i = 0;
            for (Object diffElem : listDiff.diff) {
                if (i >= list.size()) {
                    // TODO log error applying diff to shorter list
                    break;
                }
                if (diffElem instanceof StateDiff) {
                    applyDiff((State) list.get(i), (StateDiff) diffElem);
                } else if (diffElem != NOP) {
                    list.set(i, StateHelper.deepCopy(diffElem, true)); // thread-safe
                }
                i++;
            }
        }
        if (listDiff.rpush != null) {
            // deepCopy of what we'll add
            List<Serializable> add = new ArrayList<>(listDiff.rpush.size());
            for (Object v : listDiff.rpush) {
                add.add(StateHelper.deepCopy(v, true)); // thread-safe
            }
            // update CopyOnWriteArrayList in one step
            list.addAll(add);
        }
        // convert back to array if needed
        if (listDiff.isArray) {
            if (list.isEmpty()) {
                return null;
            }
            if (arrayComponentType == null) {
                // no previous array (value was null): Array.newInstance would throw a NullPointerException on a
                // null component type, so derive one from the elements we are about to store
                arrayComponentType = commonElementType(list);
            }
            return list.toArray((Object[]) Array.newInstance(arrayComponentType, list.size()));
        } else {
            return list.isEmpty() ? null : (Serializable) list;
        }
    }

    /**
     * Finds an array component type able to hold all the elements of the list: the class shared by all the non-null
     * elements if they have exactly the same class, otherwise {@code Object}.
     */
    private static Class<?> commonElementType(List<?> list) {
        Class<?> type = null;
        for (Object element : list) {
            if (element == null) {
                continue;
            }
            Class<?> elementType = element.getClass();
            if (type == null) {
                type = elementType;
            } else if (type != elementType) {
                // heterogeneous content, only Object[] is guaranteed to accept everything
                return Object.class;
            }
        }
        return type == null ? Object.class : type;
    }

    /**
     * Reads the lock recorded in the state of the given document.
     *
     * @param id the document id
     * @return the lock built from {@code KEY_LOCK_OWNER} and {@code KEY_LOCK_CREATED}, or {@code null} if no owner is
     *         recorded
     * @throws DocumentNotFoundException if no state is stored under that id
     */
    /* synchronized */
    @Override
    public synchronized Lock getLock(String id) {
        State state = states.get(id);
        if (state == null) {
            // document not found
            throw new DocumentNotFoundException(id);
        }
        String owner = (String) state.get(KEY_LOCK_OWNER);
        if (owner == null) {
            return null;
        }
        Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
        return new Lock(owner, created);
    }

    /**
     * Records a lock in the state of the given document, unless one is already recorded.
     *
     * @param id the document id
     * @param lock the lock to record
     * @return {@code null} if the lock was recorded, otherwise the already existing lock (which is left untouched)
     * @throws DocumentNotFoundException if no state is stored under that id
     */
    /* synchronized */
    @Override
    public synchronized Lock setLock(String id, Lock lock) {
        State state = states.get(id);
        if (state == null) {
            // document not found
            throw new DocumentNotFoundException(id);
        }
        String owner = (String) state.get(KEY_LOCK_OWNER);
        if (owner != null) {
            // return old lock
            Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
            return new Lock(owner, created);
        }
        state.put(KEY_LOCK_OWNER, lock.getOwner());
        state.put(KEY_LOCK_CREATED, lock.getCreated());
        return null;
    }

    /**
     * Removes the lock recorded in the state of the given document, if {@link LockManager#canLockBeRemoved} allows it
     * for the given owner.
     *
     * @param id the document id
     * @param owner the owner requesting the removal
     * @return {@code null} if there was no lock, the old lock flagged as failed if the removal was refused, otherwise
     *         the removed lock
     * @throws DocumentNotFoundException if no state is stored under that id
     */
    /* synchronized */
    @Override
    public synchronized Lock removeLock(String id, String owner) {
        State state = states.get(id);
        if (state == null) {
            // document not found
            throw new DocumentNotFoundException(id);
        }
        String oldOwner = (String) state.get(KEY_LOCK_OWNER);
        if (oldOwner == null) {
            // no previous lock
            return null;
        }
        Calendar oldCreated = (Calendar) state.get(KEY_LOCK_CREATED);
        if (!LockManager.canLockBeRemoved(oldOwner, owner)) {
            // existing mismatched lock, flag failure
            return new Lock(oldOwner, oldCreated, true);
        }
        // remove lock
        state.put(KEY_LOCK_OWNER, null);
        state.put(KEY_LOCK_CREATED, null);
        // return old lock
        return new Lock(oldOwner, oldCreated);
    }

    /** Does nothing in this implementation. */
    @Override
    public void closeLockManager() {
    }

    /** Does nothing in this implementation. */
    @Override
    public void clearLockManagerCaches() {
    }

    /** The blob paths recorded by {@link #initBlobsPaths()}, each one a list of path segments. */
    protected List<List<String>> binaryPaths;

    /**
     * Runs a {@link MemBlobFinder} visit and keeps the blob paths it recorded.
     */
    @Override
    protected void initBlobsPaths() {
        MemBlobFinder finder = new MemBlobFinder();
        finder.visit();
        binaryPaths = finder.binaryPaths;
    }

    /**
     * Blob finder accumulating a copy of the current path each time a blob path is recorded.
     */
    protected static class MemBlobFinder extends BlobFinder {
        protected List<List<String>> binaryPaths = new ArrayList<>();

        @Override
        protected void recordBlobPath() {
            binaryPaths.add(new ArrayList<>(path));
        }
    }

    /**
     * Marks as referenced, in the {@link BlobManager}, the binaries found along each known blob path of each stored
     * state.
     */
    @Override
    public void markReferencedBinaries() {
        BlobManager blobManager = Framework.getService(BlobManager.class);
        for (State state : states.values()) {
            for (List<String> path : binaryPaths) {
                markReferencedBinaries(state, path, 0, blobManager);
            }
        }
    }

    /**
     * Follows a blob path inside a state, starting at the given segment, and marks the binaries found at its end.
     * <p>
     * When a segment leads to a {@link List}, each {@link State} element is explored recursively from the next segment
     * and each other element is passed directly to {@link #markReferencedBinary}. When a segment leads to anything
     * else than a {@link State} or a {@link List}, the traversal stops without marking anything.
     *
     * @param state the state to explore
     * @param path the path segments
     * @param start the index of the first segment to follow
     * @param blobManager the blob manager to notify
     */
    protected void markReferencedBinaries(State state, List<String> path, int start, BlobManager blobManager) {
        for (int i = start; i < path.size(); i++) {
            String name = path.get(i);
            Serializable value = state.get(name);
            if (value instanceof State) {
                state = (State) value;
            } else {
                if (value instanceof List) {
                    @SuppressWarnings("unchecked")
                    List<Object> list = (List<Object>) value;
                    for (Object v : list) {
                        if (v instanceof State) {
                            markReferencedBinaries((State) v, path, i + 1, blobManager);
                        } else {
                            markReferencedBinary(v, blobManager);
                        }
                    }
                }
                // the path cannot be followed further from this state
                state = null;
                break;
            }
        }
        if (state != null) {
            Serializable data = state.get(KEY_BLOB_DATA);
            markReferencedBinary(data, blobManager);
        }
    }

    /**
     * Marks a binary as referenced for this repository, if the given value is a {@link String} key; any other value
     * (including {@code null}) is ignored.
     *
     * @param value the candidate blob key
     * @param blobManager the blob manager to notify
     */
    protected void markReferencedBinary(Object value, BlobManager blobManager) {
        if (!(value instanceof String)) {
            return;
        }
        String key = (String) value;
        blobManager.markReferencedBinary(key, repositoryName);
    }

}