001/* 002 * (C) Copyright 2014-2020 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.mem; 020 021import static org.nuxeo.ecm.core.query.sql.NXQL.ECM_UUID; 022import static org.nuxeo.ecm.core.storage.State.NOP; 023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS; 024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID; 025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED; 026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER; 027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME; 028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID; 029 030import java.io.Serializable; 031import java.lang.reflect.Array; 032import java.util.ArrayList; 033import java.util.Arrays; 034import java.util.Calendar; 035import java.util.Collection; 036import java.util.List; 037import java.util.Map; 038import java.util.Map.Entry; 039import java.util.Objects; 040import java.util.Set; 041import java.util.concurrent.CopyOnWriteArrayList; 042import java.util.stream.Stream; 043 044import org.apache.commons.logging.Log; 045import org.apache.commons.logging.LogFactory; 046import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 047import org.nuxeo.ecm.core.api.DocumentNotFoundException; 048import org.nuxeo.ecm.core.api.Lock; 049import org.nuxeo.ecm.core.api.NuxeoException; 050import org.nuxeo.ecm.core.api.PartialList; 051import org.nuxeo.ecm.core.api.ScrollResult; 052import org.nuxeo.ecm.core.api.ScrollResultImpl; 053import org.nuxeo.ecm.core.api.lock.LockManager; 054import org.nuxeo.ecm.core.api.model.Delta; 055import org.nuxeo.ecm.core.query.sql.model.OrderByClause; 056import org.nuxeo.ecm.core.storage.State; 057import org.nuxeo.ecm.core.storage.State.ListDiff; 058import org.nuxeo.ecm.core.storage.State.StateDiff; 059import org.nuxeo.ecm.core.storage.StateHelper; 060import org.nuxeo.ecm.core.storage.dbs.DBSConnection; 061import org.nuxeo.ecm.core.storage.dbs.DBSConnectionBase; 062import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator; 063import org.nuxeo.ecm.core.storage.dbs.DBSSession.OrderByComparator; 064import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater; 065 066/** 067 * In-memory implementation of a {@link DBSConnection}. 068 * 069 * @since 11.1 (introduced in 5.9.4 as MemRepository) 070 */ 071public class MemConnection extends DBSConnectionBase { 072 073 private static final Log log = LogFactory.getLog(MemRepository.class); 074 075 protected static final String NOSCROLL_ID = "noscroll"; 076 077 // the global state, from the repository (thread-safe map) 078 protected Map<String, State> states; 079 080 public MemConnection(MemRepository repository) { 081 super(repository); 082 states = repository.states; 083 } 084 085 @Override 086 public void begin() { 087 // nothing 088 } 089 090 @Override 091 public void commit() { 092 // nothing 093 } 094 095 @Override 096 public void rollback() { 097 // nothing 098 } 099 100 protected void initRepository() { 101 initRoot(); 102 } 103 104 @Override 105 public void close() { 106 // nothing 107 } 108 109 @Override 110 public String generateNewId() { 111 return ((MemRepository) repository).generateNewId(); 112 } 113 114 @Override 115 public State readState(String id) { 116 return readPartialState(id, null); 117 } 118 119 @Override 120 public State readPartialState(String id, Collection<String> keys) { 121 if (id == null) { 122 return null; 123 } 124 State state = states.get(id); 125 if (state != null) { 126 if (keys != null && !keys.isEmpty()) { 127 State partialState = new State(); 128 for (String key : keys) { 129 Serializable value = state.get(key); 130 if (value != null) { 131 partialState.put(key, value); 132 } 133 } 134 state = partialState; 135 } 136 if (log.isTraceEnabled()) { 137 log.trace("Mem: READ " + id + ": " + state); 138 } 139 } 140 return state; 141 } 142 143 @Override 144 public List<State> readStates(List<String> ids) { 145 List<State> list = new ArrayList<>(); 146 for (String id : ids) { 147 list.add(readState(id)); 148 } 149 return list; 150 } 151 152 @Override 153 public void createState(State state) { 154 String id = (String) state.get(KEY_ID); 155 if (log.isTraceEnabled()) { 156 log.trace("Mem: CREATE " + id + ": " + state); 157 } 158 if (states.containsKey(id)) { 159 throw new NuxeoException("Already exists: " + id); 160 } 161 state = StateHelper.deepCopy(state, true); // thread-safe 162 StateHelper.resetDeltas(state); 163 states.put(id, state); 164 } 165 166 @Override 167 public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) { 168 if (log.isTraceEnabled()) { 169 log.trace("Mem: UPDATE " + id + ": " + diff); 170 } 171 State state = states.get(id); 172 if (state == null) { 173 throw new ConcurrentUpdateException("Missing: " + id); 174 } 175 synchronized (state) { 176 // synchronization needed for atomic change token 177 if (changeTokenUpdater != null) { 178 for (Entry<String, Serializable> en : changeTokenUpdater.getConditions().entrySet()) { 179 if (!Objects.equals(state.get(en.getKey()), en.getValue())) { 180 throw new ConcurrentUpdateException((String) state.get(KEY_ID)); 181 } 182 } 183 for (Entry<String, Serializable> en : changeTokenUpdater.getUpdates().entrySet()) { 184 applyDiff(state, en.getKey(), en.getValue()); 185 } 186 } 187 applyDiff(state, diff); 188 } 189 } 190 191 @Override 192 public void deleteStates(Set<String> ids) { 193 if (log.isTraceEnabled()) { 194 log.trace("Mem: REMOVE " + ids); 195 } 196 for (String id : ids) { 197 if (states.remove(id) == null) { 198 log.debug("Missing on remove: " + id); 199 } 200 } 201 } 202 203 @Override 204 public State readChildState(String parentId, String name, Set<String> ignored) { 205 // TODO optimize by maintaining a parent/child index 206 for (State state : states.values()) { 207 if (ignored.contains(state.get(KEY_ID))) { 208 continue; 209 } 210 if (!parentId.equals(state.get(KEY_PARENT_ID))) { 211 continue; 212 } 213 if (!name.equals(state.get(KEY_NAME))) { 214 continue; 215 } 216 return state; 217 } 218 return null; 219 } 220 221 @Override 222 public boolean hasChild(String parentId, String name, Set<String> ignored) { 223 return readChildState(parentId, name, ignored) != null; 224 } 225 226 @Override 227 public List<State> queryKeyValue(String key, Object value, Set<String> ignored) { 228 if (log.isTraceEnabled()) { 229 log.trace("Mem: QUERY " + key + " = " + value); 230 } 231 List<State> list = new ArrayList<>(); 232 for (State state : states.values()) { 233 String id = (String) state.get(KEY_ID); 234 if (ignored.contains(id)) { 235 continue; 236 } 237 if (!value.equals(state.get(key))) { 238 continue; 239 } 240 list.add(state); 241 } 242 if (log.isTraceEnabled() && !list.isEmpty()) { 243 log.trace("Mem: -> " + list.size()); 244 } 245 return list; 246 } 247 248 @Override 249 public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) { 250 if (log.isTraceEnabled()) { 251 log.trace("Mem: QUERY " + key1 + " = " + value1 + " AND " + key2 + " = " + value2); 252 } 253 List<State> list = new ArrayList<>(); 254 for (State state : states.values()) { 255 String id = (String) state.get(KEY_ID); 256 if (ignored.contains(id)) { 257 continue; 258 } 259 if (!(value1.equals(state.get(key1)) && value2.equals(state.get(key2)))) { 260 continue; 261 } 262 list.add(state); 263 } 264 if (log.isTraceEnabled() && !list.isEmpty()) { 265 log.trace("Mem: -> " + list.size()); 266 } 267 return list; 268 } 269 270 @Override 271 public Stream<State> getDescendants(String rootId, Set<String> keys) { 272 return getDescendants(rootId, keys, 0); 273 } 274 275 @Override 276 public Stream<State> getDescendants(String rootId, Set<String> keys, int limit) { 277 if (log.isTraceEnabled()) { 278 log.trace("Mem: QUERY " + KEY_ANCESTOR_IDS + " = " + rootId); 279 } 280 Stream<State> stream = states.values() // 281 .stream() 282 .filter(state -> hasAncestor(state, rootId)); 283 if (limit != 0) { 284 stream = stream.limit(limit); 285 } 286 return stream; 287 } 288 289 protected static boolean hasAncestor(State state, String id) { 290 Object[] array = (Object[]) state.get(KEY_ANCESTOR_IDS); 291 return array == null ? false : Arrays.asList(array).contains(id); 292 } 293 294 @Override 295 public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) { 296 if (log.isTraceEnabled()) { 297 log.trace("Mem: QUERY " + key + " = " + value); 298 } 299 for (State state : states.values()) { 300 String id = (String) state.get(KEY_ID); 301 if (ignored.contains(id)) { 302 continue; 303 } 304 if (value.equals(state.get(key))) { 305 if (log.isTraceEnabled()) { 306 log.trace("Mem: -> present"); 307 } 308 return true; 309 } 310 } 311 if (log.isTraceEnabled()) { 312 log.trace("Mem: -> absent"); 313 } 314 return false; 315 } 316 317 @Override 318 public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator, 319 OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) { 320 if (log.isTraceEnabled()) { 321 log.trace("Mem: QUERY " + evaluator + " OFFSET " + offset + " LIMIT " + limit); 322 } 323 evaluator.parse(); 324 List<Map<String, Serializable>> projections = new ArrayList<>(); 325 for (State state : states.values()) { 326 List<Map<String, Serializable>> matches = evaluator.matches(state); 327 if (!matches.isEmpty()) { 328 if (distinctDocuments) { 329 projections.add(matches.get(0)); 330 } else { 331 projections.addAll(matches); 332 } 333 } 334 } 335 // ORDER BY 336 // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter 337 if (orderByClause != null) { 338 projections.sort(new OrderByComparator(orderByClause)); 339 } 340 // LIMIT / OFFSET 341 int totalSize = projections.size(); 342 if (countUpTo == -1) { 343 // count full size 344 } else if (countUpTo == 0) { 345 // no count 346 totalSize = -1; // not counted 347 } else { 348 // count only if less than countUpTo 349 if (totalSize > countUpTo) { 350 totalSize = -2; // truncated 351 } 352 } 353 if (limit != 0) { 354 int size = projections.size(); 355 projections.subList(0, offset > size ? size : offset).clear(); 356 size = projections.size(); 357 if (limit < size) { 358 projections.subList(limit, size).clear(); 359 } 360 } 361 // TODO DISTINCT 362 363 if (log.isTraceEnabled() && !projections.isEmpty()) { 364 log.trace("Mem: -> " + projections.size()); 365 } 366 return new PartialList<>(projections, totalSize); 367 } 368 369 @Override 370 public ScrollResult<String> scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) { 371 if (log.isTraceEnabled()) { 372 log.trace("Mem: QUERY " + evaluator); 373 } 374 evaluator.parse(); 375 List<String> ids = new ArrayList<>(); 376 for (State state : states.values()) { 377 List<Map<String, Serializable>> matches = evaluator.matches(state); 378 if (!matches.isEmpty()) { 379 String id = matches.get(0).get(ECM_UUID).toString(); 380 ids.add(id); 381 } 382 } 383 return new ScrollResultImpl<>(NOSCROLL_ID, ids); 384 } 385 386 @Override 387 public ScrollResult<String> scroll(String scrollId) { 388 if (NOSCROLL_ID.equals(scrollId)) { 389 // Id are already in memory, they are returned as a single batch 390 return ScrollResultImpl.emptyResult(); 391 } 392 throw new NuxeoException("Unknown or timed out scrollId"); 393 } 394 395 /** 396 * Applies a {@link StateDiff} in-place onto a base {@link State}. 397 * <p> 398 * Uses thread-safe datastructures. 399 */ 400 public static void applyDiff(State state, StateDiff stateDiff) { 401 for (Entry<String, Serializable> en : stateDiff.entrySet()) { 402 applyDiff(state, en.getKey(), en.getValue()); 403 } 404 } 405 406 /** 407 * Applies a key/value diff in-place onto a base {@link State}. 408 * <p> 409 * Uses thread-safe datastructures. 410 */ 411 protected static void applyDiff(State state, String key, Serializable value) { 412 if (value instanceof StateDiff) { 413 Serializable old = state.get(key); 414 if (old == null) { 415 old = new State(true); // thread-safe 416 state.put(key, old); 417 // enter the next if 418 } 419 if (!(old instanceof State)) { 420 throw new UnsupportedOperationException("Cannot apply StateDiff on non-State: " + old); 421 } 422 applyDiff((State) old, (StateDiff) value); 423 } else if (value instanceof ListDiff) { 424 state.put(key, applyDiff(state.get(key), (ListDiff) value)); 425 } else if (value instanceof Delta) { 426 Delta delta = (Delta) value; 427 Number oldValue = (Number) state.get(key); 428 Number newValue; 429 if (oldValue == null) { 430 newValue = delta.getFullValue(); 431 } else { 432 newValue = delta.add(oldValue); 433 } 434 state.put(key, newValue); 435 } else { 436 state.put(key, StateHelper.deepCopy(value, true)); // thread-safe 437 } 438 } 439 440 /** 441 * Applies a {@link ListDiff} onto an array or {@link List}, and returns the resulting value. 442 * <p> 443 * Uses thread-safe datastructures. 444 */ 445 public static Serializable applyDiff(Serializable value, ListDiff listDiff) { 446 // internally work on a list 447 // TODO this is costly, use a separate code path for arrays 448 Class<?> arrayComponentType = null; 449 if (listDiff.isArray && value != null) { 450 if (!(value instanceof Object[])) { 451 throw new UnsupportedOperationException("Cannot apply ListDiff on non-array: " + value); 452 } 453 arrayComponentType = ((Object[]) value).getClass().getComponentType(); 454 value = new CopyOnWriteArrayList<>(Arrays.asList((Object[]) value)); 455 } 456 if (value == null) { 457 value = new CopyOnWriteArrayList<>(); 458 } 459 if (!(value instanceof List)) { 460 throw new UnsupportedOperationException("Cannot apply ListDiff on non-List: " + value); 461 } 462 @SuppressWarnings("unchecked") 463 List<Serializable> list = (List<Serializable>) value; 464 if (listDiff.diff != null) { 465 int i = 0; 466 for (Object diffElem : listDiff.diff) { 467 if (i >= list.size()) { 468 // TODO log error applying diff to shorter list 469 break; 470 } 471 if (diffElem instanceof StateDiff) { 472 applyDiff((State) list.get(i), (StateDiff) diffElem); 473 } else if (diffElem != NOP) { 474 list.set(i, StateHelper.deepCopy(diffElem, true)); // thread-safe 475 } 476 i++; 477 } 478 } 479 if (listDiff.rpush != null) { 480 // deepCopy of what we'll add 481 List<Serializable> add = new ArrayList<>(listDiff.rpush.size()); 482 for (Object v : listDiff.rpush) { 483 add.add(StateHelper.deepCopy(v, true)); // thread-safe 484 } 485 // update CopyOnWriteArrayList in one step 486 list.addAll(add); 487 } 488 if (listDiff.pull != null) { 489 list.removeAll(listDiff.pull); 490 } 491 if (list.isEmpty()) { 492 return null; 493 } 494 // convert back to array if needed 495 if (listDiff.isArray) { 496 if (arrayComponentType == null) { 497 // initial value was null (empty) so we couldn't get the type 498 // instead get the type from the first new element (added through rpush) 499 arrayComponentType = list.get(0).getClass(); 500 } 501 return list.toArray((Object[]) Array.newInstance(arrayComponentType, list.size())); 502 } else { 503 return (Serializable) list; 504 } 505 } 506 507 /* synchronized */ 508 @Override 509 public synchronized Lock getLock(String id) { 510 State state = states.get(id); 511 if (state == null) { 512 // document not found 513 throw new DocumentNotFoundException(id); 514 } 515 String owner = (String) state.get(KEY_LOCK_OWNER); 516 if (owner == null) { 517 return null; 518 } 519 Calendar created = (Calendar) state.get(KEY_LOCK_CREATED); 520 return new Lock(owner, created); 521 } 522 523 /* synchronized */ 524 @Override 525 public synchronized Lock setLock(String id, Lock lock) { 526 State state = states.get(id); 527 if (state == null) { 528 // document not found 529 throw new DocumentNotFoundException(id); 530 } 531 String owner = (String) state.get(KEY_LOCK_OWNER); 532 if (owner != null) { 533 // return old lock 534 Calendar created = (Calendar) state.get(KEY_LOCK_CREATED); 535 return new Lock(owner, created); 536 } 537 state.put(KEY_LOCK_OWNER, lock.getOwner()); 538 state.put(KEY_LOCK_CREATED, lock.getCreated()); 539 return null; 540 } 541 542 /* synchronized */ 543 @Override 544 public synchronized Lock removeLock(String id, String owner) { 545 State state = states.get(id); 546 if (state == null) { 547 // document not found 548 throw new DocumentNotFoundException(id); 549 } 550 String oldOwner = (String) state.get(KEY_LOCK_OWNER); 551 if (oldOwner == null) { 552 // no previous lock 553 return null; 554 } 555 Calendar oldCreated = (Calendar) state.get(KEY_LOCK_CREATED); 556 if (!LockManager.canLockBeRemoved(oldOwner, owner)) { 557 // existing mismatched lock, flag failure 558 return new Lock(oldOwner, oldCreated, true); 559 } 560 // remove lock 561 state.put(KEY_LOCK_OWNER, null); 562 state.put(KEY_LOCK_CREATED, null); 563 // return old lock 564 return new Lock(oldOwner, oldCreated); 565 } 566 567 @Override 568 public List<State> queryKeyValueWithOperator(String key1, Object value1, String key2, DBSQueryOperator operator, 569 Object value2, Set<String> ignored) { 570 throw new UnsupportedOperationException(); 571 } 572 573}