001/* 002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.mem; 020 021import static java.lang.Boolean.TRUE; 022import static org.nuxeo.ecm.core.storage.State.NOP; 023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA; 024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY; 026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED; 027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER; 028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS; 031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID; 032 033import java.io.Serializable; 034import java.lang.reflect.Array; 035import java.util.ArrayList; 036import java.util.Arrays; 037import java.util.Calendar; 038import java.util.Collections; 039import java.util.List; 040import java.util.Map; 041import java.util.Map.Entry; 042import java.util.Set; 043import java.util.UUID; 044import java.util.concurrent.ConcurrentHashMap; 045import java.util.concurrent.CopyOnWriteArrayList; 046import java.util.concurrent.atomic.AtomicLong; 047 048import javax.resource.spi.ConnectionManager; 049 050import org.apache.commons.logging.Log; 051import org.apache.commons.logging.LogFactory; 052import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 053import org.nuxeo.ecm.core.api.DocumentNotFoundException; 054import org.nuxeo.ecm.core.api.Lock; 055import org.nuxeo.ecm.core.api.NuxeoException; 056import org.nuxeo.ecm.core.api.PartialList; 057import org.nuxeo.ecm.core.api.model.Delta; 058import org.nuxeo.ecm.core.blob.BlobManager; 059import org.nuxeo.ecm.core.model.LockManager; 060import org.nuxeo.ecm.core.model.Repository; 061import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 062import org.nuxeo.ecm.core.storage.State; 063import org.nuxeo.ecm.core.storage.State.ListDiff; 064import org.nuxeo.ecm.core.storage.State.StateDiff; 065import org.nuxeo.ecm.core.storage.StateHelper; 066import org.nuxeo.ecm.core.storage.dbs.DBSDocument; 067import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator; 068import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase; 069import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase.IdType; 070import org.nuxeo.ecm.core.storage.dbs.DBSSession.OrderByComparator; 071import org.nuxeo.runtime.api.Framework; 072 073/** 074 * In-memory implementation of a {@link Repository}. 075 * <p> 076 * Internally, the repository is a map from id to document object. 077 * <p> 078 * A document object is a JSON-like document stored as a Map recursively containing the data, see {@link DBSDocument} 079 * for the description of the document. 080 * 081 * @since 5.9.4 082 */ 083public class MemRepository extends DBSRepositoryBase { 084 085 private static final Log log = LogFactory.getLog(MemRepository.class); 086 087 // for debug 088 private final AtomicLong temporaryIdCounter = new AtomicLong(0); 089 090 /** 091 * The content of the repository, a map of document id -> object. 092 */ 093 protected Map<String, State> states; 094 095 public MemRepository(ConnectionManager cm, MemRepositoryDescriptor descriptor) { 096 super(cm, descriptor.name, descriptor); 097 initRepository(); 098 } 099 100 @Override 101 public List<IdType> getAllowedIdTypes() { 102 return Collections.singletonList(IdType.varchar); 103 } 104 105 @Override 106 public void shutdown() { 107 super.shutdown(); 108 states = null; 109 } 110 111 protected void initRepository() { 112 states = new ConcurrentHashMap<>(); 113 initRoot(); 114 } 115 116 @Override 117 public String generateNewId() { 118 if (DEBUG_UUIDS) { 119 return "UUID_" + temporaryIdCounter.incrementAndGet(); 120 } else { 121 return UUID.randomUUID().toString(); 122 } 123 } 124 125 @Override 126 public State readState(String id) { 127 State state = states.get(id); 128 if (state != null) { 129 if (log.isTraceEnabled()) { 130 log.trace("Mem: READ " + id + ": " + state); 131 } 132 } 133 return state; 134 } 135 136 @Override 137 public List<State> readStates(List<String> ids) { 138 List<State> list = new ArrayList<>(); 139 for (String id : ids) { 140 list.add(readState(id)); 141 } 142 return list; 143 } 144 145 @Override 146 public void createState(State state) { 147 String id = (String) state.get(KEY_ID); 148 if (log.isTraceEnabled()) { 149 log.trace("Mem: CREATE " + id + ": " + state); 150 } 151 if (states.containsKey(id)) { 152 throw new NuxeoException("Already exists: " + id); 153 } 154 state = StateHelper.deepCopy(state, true); // thread-safe 155 StateHelper.resetDeltas(state); 156 states.put(id, state); 157 } 158 159 @Override 160 public void updateState(String id, StateDiff diff) { 161 if (log.isTraceEnabled()) { 162 log.trace("Mem: UPDATE " + id + ": " + diff); 163 } 164 State state = states.get(id); 165 if (state == null) { 166 throw new ConcurrentUpdateException("Missing: " + id); 167 } 168 applyDiff(state, diff); 169 } 170 171 @Override 172 public void deleteStates(Set<String> ids) { 173 if (log.isTraceEnabled()) { 174 log.trace("Mem: REMOVE " + ids); 175 } 176 for (String id : ids) { 177 if (states.remove(id) == null) { 178 log.debug("Missing on remove: " + id); 179 } 180 } 181 } 182 183 @Override 184 public State readChildState(String parentId, String name, Set<String> ignored) { 185 // TODO optimize by maintaining a parent/child index 186 for (State state : states.values()) { 187 if (ignored.contains(state.get(KEY_ID))) { 188 continue; 189 } 190 if (!parentId.equals(state.get(KEY_PARENT_ID))) { 191 continue; 192 } 193 if (!name.equals(state.get(KEY_NAME))) { 194 continue; 195 } 196 return state; 197 } 198 return null; 199 } 200 201 @Override 202 public boolean hasChild(String parentId, String name, Set<String> ignored) { 203 return readChildState(parentId, name, ignored) != null; 204 } 205 206 @Override 207 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 208 if (log.isTraceEnabled()) { 209 log.trace("Mem: QUERY " + key + " = " + value); 210 } 211 List<State> list = new ArrayList<>(); 212 for (State state : states.values()) { 213 String id = (String) state.get(KEY_ID); 214 if (ignored.contains(id)) { 215 continue; 216 } 217 if (!value.equals(state.get(key))) { 218 continue; 219 } 220 list.add(state); 221 } 222 if (log.isTraceEnabled() && !list.isEmpty()) { 223 log.trace("Mem: -> " + list.size()); 224 } 225 return list; 226 } 227 228 @Override 229 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 230 if (log.isTraceEnabled()) { 231 log.trace("Mem: QUERY " + key1 + " = " + value1 + " AND " + key2 + " = " + value2); 232 } 233 List<State> list = new ArrayList<>(); 234 for (State state : states.values()) { 235 String id = (String) state.get(KEY_ID); 236 if (ignored.contains(id)) { 237 continue; 238 } 239 if (!(value1.equals(state.get(key1)) && value2.equals(state.get(key2)))) { 240 continue; 241 } 242 list.add(state); 243 } 244 if (log.isTraceEnabled() && !list.isEmpty()) { 245 log.trace("Mem: -> " + list.size()); 246 } 247 return list; 248 } 249 250 @Override 251 public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets, 252 Map<String, Object[]> targetProxies) { 253 if (log.isTraceEnabled()) { 254 log.trace("Mem: QUERY " + key + " = " + value); 255 } 256 STATE: for (State state : states.values()) { 257 Object[] array = (Object[]) state.get(key); 258 String id = (String) state.get(KEY_ID); 259 if (array != null) { 260 for (Object v : array) { 261 if (value.equals(v)) { 262 ids.add(id); 263 if (proxyTargets != null && TRUE.equals(state.get(KEY_IS_PROXY))) { 264 String targetId = (String) state.get(KEY_PROXY_TARGET_ID); 265 proxyTargets.put(id, targetId); 266 } 267 if (targetProxies != null) { 268 Object[] proxyIds = (Object[]) state.get(KEY_PROXY_IDS); 269 if (proxyIds != null) { 270 targetProxies.put(id, proxyIds); 271 } 272 } 273 continue STATE; 274 } 275 } 276 } 277 } 278 if (log.isTraceEnabled() && !ids.isEmpty()) { 279 log.trace("Mem: -> " + ids.size()); 280 } 281 } 282 283 @Override 284 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 285 if (log.isTraceEnabled()) { 286 log.trace("Mem: QUERY " + key + " = " + value); 287 } 288 for (State state : states.values()) { 289 String id = (String) state.get(KEY_ID); 290 if (ignored.contains(id)) { 291 continue; 292 } 293 if (value.equals(state.get(key))) { 294 if (log.isTraceEnabled()) { 295 log.trace("Mem: -> present"); 296 } 297 return true; 298 } 299 } 300 if (log.isTraceEnabled()) { 301 log.trace("Mem: -> absent"); 302 } 303 return false; 304 } 305 306 @Override 307 public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator, 308 OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) { 309 if (log.isTraceEnabled()) { 310 log.trace("Mem: QUERY " + evaluator + " OFFSET " + offset + " LIMIT " + limit); 311 } 312 evaluator.parse(); 313 List<Map<String, Serializable>> projections = new ArrayList<>(); 314 for (State state : states.values()) { 315 List<Map<String, Serializable>> matches = evaluator.matches(state); 316 if (!matches.isEmpty()) { 317 if (distinctDocuments) { 318 projections.add(matches.get(0)); 319 } else { 320 projections.addAll(matches); 321 } 322 } 323 } 324 // ORDER BY 325 // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter 326 if (orderByClause != null) { 327 Collections.sort(projections, new OrderByComparator(orderByClause)); 328 } 329 // LIMIT / OFFSET 330 int totalSize = projections.size(); 331 if (countUpTo == -1) { 332 // count full size 333 } else if (countUpTo == 0) { 334 // no count 335 totalSize = -1; // not counted 336 } else { 337 // count only if less than countUpTo 338 if (totalSize > countUpTo) { 339 totalSize = -2; // truncated 340 } 341 } 342 if (limit != 0) { 343 int size = projections.size(); 344 projections.subList(0, offset > size ? size : offset).clear(); 345 size = projections.size(); 346 if (limit < size) { 347 projections.subList(limit, size).clear(); 348 } 349 } 350 // TODO DISTINCT 351 352 if (log.isTraceEnabled() && !projections.isEmpty()) { 353 log.trace("Mem: -> " + projections.size()); 354 } 355 return new PartialList<>(projections, totalSize); 356 } 357 358 /** 359 * Applies a {@link StateDiff} in-place onto a base {@link State}. 360 * <p> 361 * Uses thread-safe datastructures. 362 */ 363 public static void applyDiff(State state, StateDiff stateDiff) { 364 for (Entry<String, Serializable> en : stateDiff.entrySet()) { 365 String key = en.getKey(); 366 Serializable diffElem = en.getValue(); 367 if (diffElem instanceof StateDiff) { 368 Serializable old = state.get(key); 369 if (old == null) { 370 old = new State(true); // thread-safe 371 state.put(key, old); 372 // enter the next if 373 } 374 if (!(old instanceof State)) { 375 throw new UnsupportedOperationException("Cannot apply StateDiff on non-State: " + old); 376 } 377 applyDiff((State) old, (StateDiff) diffElem); 378 } else if (diffElem instanceof ListDiff) { 379 state.put(key, applyDiff(state.get(key), (ListDiff) diffElem)); 380 } else if (diffElem instanceof Delta) { 381 Delta delta = (Delta) diffElem; 382 Number oldValue = (Number) state.get(key); 383 Number value; 384 if (oldValue == null) { 385 value = delta.getFullValue(); 386 } else { 387 value = delta.add(oldValue); 388 } 389 state.put(key, value); 390 } else { 391 state.put(key, StateHelper.deepCopy(diffElem, true)); // thread-safe 392 } 393 } 394 } 395 396 /** 397 * Applies a {@link ListDiff} onto an array or {@link List}, and returns the resulting value. 398 * <p> 399 * Uses thread-safe datastructures. 400 */ 401 public static Serializable applyDiff(Serializable value, ListDiff listDiff) { 402 // internally work on a list 403 // TODO this is costly, use a separate code path for arrays 404 Class<?> arrayComponentType = null; 405 if (listDiff.isArray && value != null) { 406 if (!(value instanceof Object[])) { 407 throw new UnsupportedOperationException("Cannot apply ListDiff on non-array: " + value); 408 } 409 arrayComponentType = ((Object[]) value).getClass().getComponentType(); 410 value = new CopyOnWriteArrayList<>(Arrays.asList((Object[]) value)); 411 } 412 if (value == null) { 413 value = new CopyOnWriteArrayList<>(); 414 } 415 if (!(value instanceof List)) { 416 throw new UnsupportedOperationException("Cannot apply ListDiff on non-List: " + value); 417 } 418 @SuppressWarnings("unchecked") 419 List<Serializable> list = (List<Serializable>) value; 420 if (listDiff.diff != null) { 421 int i = 0; 422 for (Object diffElem : listDiff.diff) { 423 if (i >= list.size()) { 424 // TODO log error applying diff to shorter list 425 break; 426 } 427 if (diffElem instanceof StateDiff) { 428 applyDiff((State) list.get(i), (StateDiff) diffElem); 429 } else if (diffElem != NOP) { 430 list.set(i, StateHelper.deepCopy(diffElem, true)); // thread-safe 431 } 432 i++; 433 } 434 } 435 if (listDiff.rpush != null) { 436 // deepCopy of what we'll add 437 List<Serializable> add = new ArrayList<>(listDiff.rpush.size()); 438 for (Object v : listDiff.rpush) { 439 add.add(StateHelper.deepCopy(v, true)); // thread-safe 440 } 441 // update CopyOnWriteArrayList in one step 442 list.addAll(add); 443 } 444 // convert back to array if needed 445 if (listDiff.isArray) { 446 return list.isEmpty() ? null : list.toArray((Object[]) Array.newInstance(arrayComponentType, list.size())); 447 } else { 448 return list.isEmpty() ? null : (Serializable) list; 449 } 450 } 451 452 /* synchronized */ 453 @Override 454 public synchronized Lock getLock(String id) { 455 State state = states.get(id); 456 if (state == null) { 457 // document not found 458 throw new DocumentNotFoundException(id); 459 } 460 String owner = (String) state.get(KEY_LOCK_OWNER); 461 if (owner == null) { 462 return null; 463 } 464 Calendar created = (Calendar) state.get(KEY_LOCK_CREATED); 465 return new Lock(owner, created); 466 } 467 468 /* synchronized */ 469 @Override 470 public synchronized Lock setLock(String id, Lock lock) { 471 State state = states.get(id); 472 if (state == null) { 473 // document not found 474 throw new DocumentNotFoundException(id); 475 } 476 String owner = (String) state.get(KEY_LOCK_OWNER); 477 if (owner != null) { 478 // return old lock 479 Calendar created = (Calendar) state.get(KEY_LOCK_CREATED); 480 return new Lock(owner, created); 481 } 482 state.put(KEY_LOCK_OWNER, lock.getOwner()); 483 state.put(KEY_LOCK_CREATED, lock.getCreated()); 484 return null; 485 } 486 487 /* synchronized */ 488 @Override 489 public synchronized Lock removeLock(String id, String owner) { 490 State state = states.get(id); 491 if (state == null) { 492 // document not found 493 throw new DocumentNotFoundException(id); 494 } 495 String oldOwner = (String) state.get(KEY_LOCK_OWNER); 496 if (oldOwner == null) { 497 // no previous lock 498 return null; 499 } 500 Calendar oldCreated = (Calendar) state.get(KEY_LOCK_CREATED); 501 if (!LockManager.canLockBeRemoved(oldOwner, owner)) { 502 // existing mismatched lock, flag failure 503 return new Lock(oldOwner, oldCreated, true); 504 } 505 // remove lock 506 state.put(KEY_LOCK_OWNER, null); 507 state.put(KEY_LOCK_CREATED, null); 508 // return old lock 509 return new Lock(oldOwner, oldCreated); 510 } 511 512 @Override 513 public void closeLockManager() { 514 } 515 516 @Override 517 public void clearLockManagerCaches() { 518 } 519 520 protected List<List<String>> binaryPaths; 521 522 @Override 523 protected void initBlobsPaths() { 524 MemBlobFinder finder = new MemBlobFinder(); 525 finder.visit(); 526 binaryPaths = finder.binaryPaths; 527 } 528 529 protected static class MemBlobFinder extends BlobFinder { 530 protected List<List<String>> binaryPaths = new ArrayList<>(); 531 532 @Override 533 protected void recordBlobPath() { 534 binaryPaths.add(new ArrayList<>(path)); 535 } 536 } 537 538 @Override 539 public void markReferencedBinaries() { 540 BlobManager blobManager = Framework.getService(BlobManager.class); 541 for (State state : states.values()) { 542 for (List<String> path : binaryPaths) { 543 markReferencedBinaries(state, path, 0, blobManager); 544 } 545 } 546 } 547 548 protected void markReferencedBinaries(State state, List<String> path, int start, BlobManager blobManager) { 549 for (int i = start; i < path.size(); i++) { 550 String name = path.get(i); 551 Serializable value = state.get(name); 552 if (value instanceof State) { 553 state = (State) value; 554 } else { 555 if (value instanceof List) { 556 @SuppressWarnings("unchecked") 557 List<Object> list = (List<Object>) value; 558 for (Object v : list) { 559 if (v instanceof State) { 560 markReferencedBinaries((State) v, path, i + 1, blobManager); 561 } else { 562 markReferencedBinary(v, blobManager); 563 } 564 } 565 } 566 state = null; 567 break; 568 } 569 } 570 if (state != null) { 571 Serializable data = state.get(KEY_BLOB_DATA); 572 markReferencedBinary(data, blobManager); 573 } 574 } 575 576 protected void markReferencedBinary(Object value, BlobManager blobManager) { 577 if (!(value instanceof String)) { 578 return; 579 } 580 String key = (String) value; 581 blobManager.markReferencedBinary(key, repositoryName); 582 } 583 584}