001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.mem; 020 021import static java.lang.Boolean.TRUE; 022import static org.nuxeo.ecm.core.storage.State.NOP; 023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA; 024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY; 026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED; 027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER; 028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS; 031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID; 032 033import java.io.Serializable; 034import java.lang.reflect.Array; 035import java.util.ArrayList; 036import java.util.Arrays; 037import java.util.Calendar; 038import java.util.Collections; 039import java.util.List; 040import java.util.Map; 041import java.util.Map.Entry; 042import java.util.Set; 043import java.util.UUID; 044import java.util.concurrent.ConcurrentHashMap; 045import java.util.concurrent.CopyOnWriteArrayList; 046import java.util.concurrent.atomic.AtomicLong; 047 048import javax.resource.spi.ConnectionManager; 049 050import org.apache.commons.logging.Log; 051import org.apache.commons.logging.LogFactory; 052import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 053import org.nuxeo.ecm.core.api.DocumentNotFoundException; 054import org.nuxeo.ecm.core.api.Lock; 055import org.nuxeo.ecm.core.api.NuxeoException; 056import org.nuxeo.ecm.core.api.PartialList; 057import org.nuxeo.ecm.core.api.model.Delta; 058import org.nuxeo.ecm.core.blob.BlobManager; 059import org.nuxeo.ecm.core.model.LockManager; 060import org.nuxeo.ecm.core.model.Repository; 061import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 062import org.nuxeo.ecm.core.storage.State; 063import org.nuxeo.ecm.core.storage.State.ListDiff; 064import org.nuxeo.ecm.core.storage.State.StateDiff; 065import org.nuxeo.ecm.core.storage.StateHelper; 066import org.nuxeo.ecm.core.storage.dbs.DBSDocument; 067import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator; 068import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase; 069import org.nuxeo.ecm.core.storage.dbs.DBSSession.OrderByComparator; 070import org.nuxeo.runtime.api.Framework; 071 072/** 073 * In-memory implementation of a {@link Repository}. 074 * <p> 075 * Internally, the repository is a map from id to document object. 076 * <p> 077 * A document object is a JSON-like document stored as a Map recursively containing the data, see {@link DBSDocument} 078 * for the description of the document. 079 * 080 * @since 5.9.4 081 */ 082public class MemRepository extends DBSRepositoryBase { 083 084 private static final Log log = LogFactory.getLog(MemRepository.class); 085 086 // for debug 087 private final AtomicLong temporaryIdCounter = new AtomicLong(0); 088 089 /** 090 * The content of the repository, a map of document id -> object. 091 */ 092 protected Map<String, State> states; 093 094 public MemRepository(ConnectionManager cm, MemRepositoryDescriptor descriptor) { 095 super(cm, descriptor.name, descriptor.getFulltextDescriptor()); 096 initRepository(); 097 } 098 099 @Override 100 public void shutdown() { 101 super.shutdown(); 102 states = null; 103 } 104 105 protected void initRepository() { 106 states = new ConcurrentHashMap<>(); 107 initRoot(); 108 } 109 110 @Override 111 public String generateNewId() { 112 if (DEBUG_UUIDS) { 113 return "UUID_" + temporaryIdCounter.incrementAndGet(); 114 } else { 115 return UUID.randomUUID().toString(); 116 } 117 } 118 119 @Override 120 public State readState(String id) { 121 State state = states.get(id); 122 if (state != null) { 123 if (log.isTraceEnabled()) { 124 log.trace("Mem: READ " + id + ": " + state); 125 } 126 } 127 return state; 128 } 129 130 @Override 131 public List<State> readStates(List<String> ids) { 132 List<State> list = new ArrayList<>(); 133 for (String id : ids) { 134 list.add(readState(id)); 135 } 136 return list; 137 } 138 139 @Override 140 public void createState(State state) { 141 String id = (String) state.get(KEY_ID); 142 if (log.isTraceEnabled()) { 143 log.trace("Mem: CREATE " + id + ": " + state); 144 } 145 if (states.containsKey(id)) { 146 throw new NuxeoException("Already exists: " + id); 147 } 148 state = StateHelper.deepCopy(state, true); // thread-safe 149 StateHelper.resetDeltas(state); 150 states.put(id, state); 151 } 152 153 @Override 154 public void updateState(String id, StateDiff diff) { 155 if (log.isTraceEnabled()) { 156 log.trace("Mem: UPDATE " + id + ": " + diff); 157 } 158 State state = states.get(id); 159 if (state == null) { 160 throw new ConcurrentUpdateException("Missing: " + id); 161 } 162 applyDiff(state, diff); 163 } 164 165 @Override 166 public void deleteStates(Set<String> ids) { 167 if (log.isTraceEnabled()) { 168 log.trace("Mem: REMOVE " + ids); 169 } 170 for (String id : ids) { 171 if (states.remove(id) == null) { 172 log.debug("Missing on remove: " + id); 173 } 174 } 175 } 176 177 @Override 178 public State readChildState(String parentId, String name, Set<String> ignored) { 179 // TODO optimize by maintaining a parent/child index 180 for (State state : states.values()) { 181 if (ignored.contains(state.get(KEY_ID))) { 182 continue; 183 } 184 if (!parentId.equals(state.get(KEY_PARENT_ID))) { 185 continue; 186 } 187 if (!name.equals(state.get(KEY_NAME))) { 188 continue; 189 } 190 return state; 191 } 192 return null; 193 } 194 195 @Override 196 public boolean hasChild(String parentId, String name, Set<String> ignored) { 197 return readChildState(parentId, name, ignored) != null; 198 } 199 200 @Override 201 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 202 if (log.isTraceEnabled()) { 203 log.trace("Mem: QUERY " + key + " = " + value); 204 } 205 List<State> list = new ArrayList<>(); 206 for (State state : states.values()) { 207 String id = (String) state.get(KEY_ID); 208 if (ignored.contains(id)) { 209 continue; 210 } 211 if (!value.equals(state.get(key))) { 212 continue; 213 } 214 list.add(state); 215 } 216 if (log.isTraceEnabled() && !list.isEmpty()) { 217 log.trace("Mem: -> " + list.size()); 218 } 219 return list; 220 } 221 222 @Override 223 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 224 if (log.isTraceEnabled()) { 225 log.trace("Mem: QUERY " + key1 + " = " + value1 + " AND " + key2 + " = " + value2); 226 } 227 List<State> list = new ArrayList<>(); 228 for (State state : states.values()) { 229 String id = (String) state.get(KEY_ID); 230 if (ignored.contains(id)) { 231 continue; 232 } 233 if (!(value1.equals(state.get(key1)) && value2.equals(state.get(key2)))) { 234 continue; 235 } 236 list.add(state); 237 } 238 if (log.isTraceEnabled() && !list.isEmpty()) { 239 log.trace("Mem: -> " + list.size()); 240 } 241 return list; 242 } 243 244 @Override 245 public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets, 246 Map<String, Object[]> targetProxies) { 247 if (log.isTraceEnabled()) { 248 log.trace("Mem: QUERY " + key + " = " + value); 249 } 250 STATE: for (State state : states.values()) { 251 Object[] array = (Object[]) state.get(key); 252 String id = (String) state.get(KEY_ID); 253 if (array != null) { 254 for (Object v : array) { 255 if (value.equals(v)) { 256 ids.add(id); 257 if (proxyTargets != null && TRUE.equals(state.get(KEY_IS_PROXY))) { 258 String targetId = (String) state.get(KEY_PROXY_TARGET_ID); 259 proxyTargets.put(id, targetId); 260 } 261 if (targetProxies != null) { 262 Object[] proxyIds = (Object[]) state.get(KEY_PROXY_IDS); 263 if (proxyIds != null) { 264 targetProxies.put(id, proxyIds); 265 } 266 } 267 continue STATE; 268 } 269 } 270 } 271 } 272 if (log.isTraceEnabled() && !ids.isEmpty()) { 273 log.trace("Mem: -> " + ids.size()); 274 } 275 } 276 277 @Override 278 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 279 if (log.isTraceEnabled()) { 280 log.trace("Mem: QUERY " + key + " = " + value); 281 } 282 for (State state : states.values()) { 283 String id = (String) state.get(KEY_ID); 284 if (ignored.contains(id)) { 285 continue; 286 } 287 if (value.equals(state.get(key))) { 288 if (log.isTraceEnabled()) { 289 log.trace("Mem: -> present"); 290 } 291 return true; 292 } 293 } 294 if (log.isTraceEnabled()) { 295 log.trace("Mem: -> absent"); 296 } 297 return false; 298 } 299 300 @Override 301 public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator, 302 OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) { 303 if (log.isTraceEnabled()) { 304 log.trace("Mem: QUERY " + evaluator + " OFFSET " + offset + " LIMIT " + limit); 305 } 306 evaluator.parse(); 307 List<Map<String, Serializable>> projections = new ArrayList<>(); 308 for (State state : states.values()) { 309 List<Map<String, Serializable>> matches = evaluator.matches(state); 310 if (!matches.isEmpty()) { 311 if (distinctDocuments) { 312 projections.add(matches.get(0)); 313 } else { 314 projections.addAll(matches); 315 } 316 } 317 } 318 // ORDER BY 319 // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter 320 if (orderByClause != null) { 321 Collections.sort(projections, new OrderByComparator(orderByClause)); 322 } 323 // LIMIT / OFFSET 324 int totalSize = projections.size(); 325 if (countUpTo == -1) { 326 // count full size 327 } else if (countUpTo == 0) { 328 // no count 329 totalSize = -1; // not counted 330 } else { 331 // count only if less than countUpTo 332 if (totalSize > countUpTo) { 333 totalSize = -2; // truncated 334 } 335 } 336 if (limit != 0) { 337 int size = projections.size(); 338 projections.subList(0, offset > size ? size : offset).clear(); 339 size = projections.size(); 340 if (limit < size) { 341 projections.subList(limit, size).clear(); 342 } 343 } 344 // TODO DISTINCT 345 346 if (log.isTraceEnabled() && !projections.isEmpty()) { 347 log.trace("Mem: -> " + projections.size()); 348 } 349 return new PartialList<>(projections, totalSize); 350 } 351 352 /** 353 * Applies a {@link StateDiff} in-place onto a base {@link State}. 354 * <p> 355 * Uses thread-safe datastructures. 356 */ 357 public static void applyDiff(State state, StateDiff stateDiff) { 358 for (Entry<String, Serializable> en : stateDiff.entrySet()) { 359 String key = en.getKey(); 360 Serializable diffElem = en.getValue(); 361 if (diffElem instanceof StateDiff) { 362 Serializable old = state.get(key); 363 if (old == null) { 364 old = new State(true); // thread-safe 365 state.put(key, old); 366 // enter the next if 367 } 368 if (!(old instanceof State)) { 369 throw new UnsupportedOperationException("Cannot apply StateDiff on non-State: " + old); 370 } 371 applyDiff((State) old, (StateDiff) diffElem); 372 } else if (diffElem instanceof ListDiff) { 373 state.put(key, applyDiff(state.get(key), (ListDiff) diffElem)); 374 } else if (diffElem instanceof Delta) { 375 Delta delta = (Delta) diffElem; 376 Number oldValue = (Number) state.get(key); 377 Number value; 378 if (oldValue == null) { 379 value = delta.getFullValue(); 380 } else { 381 value = delta.add(oldValue); 382 } 383 state.put(key, value); 384 } else { 385 state.put(key, StateHelper.deepCopy(diffElem, true)); // thread-safe 386 } 387 } 388 } 389 390 /** 391 * Applies a {@link ListDiff} onto an array or {@link List}, and returns the resulting value. 392 * <p> 393 * Uses thread-safe datastructures. 394 */ 395 public static Serializable applyDiff(Serializable value, ListDiff listDiff) { 396 // internally work on a list 397 // TODO this is costly, use a separate code path for arrays 398 Class<?> arrayComponentType = null; 399 if (listDiff.isArray && value != null) { 400 if (!(value instanceof Object[])) { 401 throw new UnsupportedOperationException("Cannot apply ListDiff on non-array: " + value); 402 } 403 arrayComponentType = ((Object[]) value).getClass().getComponentType(); 404 value = new CopyOnWriteArrayList<>(Arrays.asList((Object[]) value)); 405 } 406 if (value == null) { 407 value = new CopyOnWriteArrayList<>(); 408 } 409 if (!(value instanceof List)) { 410 throw new UnsupportedOperationException("Cannot apply ListDiff on non-List: " + value); 411 } 412 @SuppressWarnings("unchecked") 413 List<Serializable> list = (List<Serializable>) value; 414 if (listDiff.diff != null) { 415 int i = 0; 416 for (Object diffElem : listDiff.diff) { 417 if (i >= list.size()) { 418 // TODO log error applying diff to shorter list 419 break; 420 } 421 if (diffElem instanceof StateDiff) { 422 applyDiff((State) list.get(i), (StateDiff) diffElem); 423 } else if (diffElem != NOP) { 424 list.set(i, StateHelper.deepCopy(diffElem, true)); // thread-safe 425 } 426 i++; 427 } 428 } 429 if (listDiff.rpush != null) { 430 // deepCopy of what we'll add 431 List<Serializable> add = new ArrayList<>(listDiff.rpush.size()); 432 for (Object v : listDiff.rpush) { 433 add.add(StateHelper.deepCopy(v, true)); // thread-safe 434 } 435 // update CopyOnWriteArrayList in one step 436 list.addAll(add); 437 } 438 // convert back to array if needed 439 if (listDiff.isArray) { 440 return list.isEmpty() ? null : list.toArray((Object[]) Array.newInstance(arrayComponentType, list.size())); 441 } else { 442 return list.isEmpty() ? null : (Serializable) list; 443 } 444 } 445 446 /* synchronized */ 447 @Override 448 public synchronized Lock getLock(String id) { 449 State state = states.get(id); 450 if (state == null) { 451 // document not found 452 throw new DocumentNotFoundException(id); 453 } 454 String owner = (String) state.get(KEY_LOCK_OWNER); 455 if (owner == null) { 456 return null; 457 } 458 Calendar created = (Calendar) state.get(KEY_LOCK_CREATED); 459 return new Lock(owner, created); 460 } 461 462 /* synchronized */ 463 @Override 464 public synchronized Lock setLock(String id, Lock lock) { 465 State state = states.get(id); 466 if (state == null) { 467 // document not found 468 throw new DocumentNotFoundException(id); 469 } 470 String owner = (String) state.get(KEY_LOCK_OWNER); 471 if (owner != null) { 472 // return old lock 473 Calendar created = (Calendar) state.get(KEY_LOCK_CREATED); 474 return new Lock(owner, created); 475 } 476 state.put(KEY_LOCK_OWNER, lock.getOwner()); 477 state.put(KEY_LOCK_CREATED, lock.getCreated()); 478 return null; 479 } 480 481 /* synchronized */ 482 @Override 483 public synchronized Lock removeLock(String id, String owner) { 484 State state = states.get(id); 485 if (state == null) { 486 // document not found 487 throw new DocumentNotFoundException(id); 488 } 489 String oldOwner = (String) state.get(KEY_LOCK_OWNER); 490 if (oldOwner == null) { 491 // no previous lock 492 return null; 493 } 494 Calendar oldCreated = (Calendar) state.get(KEY_LOCK_CREATED); 495 if (!LockManager.canLockBeRemoved(oldOwner, owner)) { 496 // existing mismatched lock, flag failure 497 return new Lock(oldOwner, oldCreated, true); 498 } 499 // remove lock 500 state.put(KEY_LOCK_OWNER, null); 501 state.put(KEY_LOCK_CREATED, null); 502 // return old lock 503 return new Lock(oldOwner, oldCreated); 504 } 505 506 @Override 507 public void closeLockManager() { 508 } 509 510 @Override 511 public void clearLockManagerCaches() { 512 } 513 514 protected List<List<String>> binaryPaths; 515 516 @Override 517 protected void initBlobsPaths() { 518 MemBlobFinder finder = new MemBlobFinder(); 519 finder.visit(); 520 binaryPaths = finder.binaryPaths; 521 } 522 523 protected static class MemBlobFinder extends BlobFinder { 524 protected List<List<String>> binaryPaths = new ArrayList<>(); 525 526 @Override 527 protected void recordBlobPath() { 528 binaryPaths.add(new ArrayList<>(path)); 529 } 530 } 531 532 @Override 533 public void markReferencedBinaries() { 534 BlobManager blobManager = Framework.getService(BlobManager.class); 535 for (State state : states.values()) { 536 for (List<String> path : binaryPaths) { 537 markReferencedBinaries(state, path, 0, blobManager); 538 } 539 } 540 } 541 542 protected void markReferencedBinaries(State state, List<String> path, int start, BlobManager blobManager) { 543 for (int i = start; i < path.size(); i++) { 544 String name = path.get(i); 545 Serializable value = state.get(name); 546 if (value instanceof State) { 547 state = (State) value; 548 } else { 549 if (value instanceof List) { 550 @SuppressWarnings("unchecked") 551 List<Object> list = (List<Object>) value; 552 for (Object v : list) { 553 if (v instanceof State) { 554 markReferencedBinaries((State) v, path, i + 1, blobManager); 555 } else { 556 markReferencedBinary(v, blobManager); 557 } 558 } 559 } 560 state = null; 561 break; 562 } 563 } 564 if (state != null) { 565 Serializable data = state.get(KEY_BLOB_DATA); 566 markReferencedBinary(data, blobManager); 567 } 568 } 569 570 protected void markReferencedBinary(Object value, BlobManager blobManager) { 571 if (!(value instanceof String)) { 572 return; 573 } 574 String key = (String) value; 575 blobManager.markReferencedBinary(key, repositoryName); 576 } 577 578}