001/*
002 * (C) Copyright 2014-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.mem;
020
021import static org.nuxeo.ecm.core.query.sql.NXQL.ECM_UUID;
022import static org.nuxeo.ecm.core.storage.State.NOP;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
030
031import java.io.Serializable;
032import java.lang.reflect.Array;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Calendar;
036import java.util.Collection;
037import java.util.Collections;
038import java.util.List;
039import java.util.Map;
040import java.util.Map.Entry;
041import java.util.Objects;
042import java.util.Set;
043import java.util.UUID;
044import java.util.concurrent.ConcurrentHashMap;
045import java.util.concurrent.CopyOnWriteArrayList;
046import java.util.concurrent.atomic.AtomicLong;
047import java.util.stream.Stream;
048
049import javax.resource.spi.ConnectionManager;
050
051import org.apache.commons.logging.Log;
052import org.apache.commons.logging.LogFactory;
053import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
054import org.nuxeo.ecm.core.api.DocumentNotFoundException;
055import org.nuxeo.ecm.core.api.Lock;
056import org.nuxeo.ecm.core.api.NuxeoException;
057import org.nuxeo.ecm.core.api.PartialList;
058import org.nuxeo.ecm.core.api.ScrollResult;
059import org.nuxeo.ecm.core.api.ScrollResultImpl;
060import org.nuxeo.ecm.core.api.model.Delta;
061import org.nuxeo.ecm.core.blob.DocumentBlobManager;
062import org.nuxeo.ecm.core.model.LockManager;
063import org.nuxeo.ecm.core.model.Repository;
064import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
065import org.nuxeo.ecm.core.storage.State;
066import org.nuxeo.ecm.core.storage.State.ListDiff;
067import org.nuxeo.ecm.core.storage.State.StateDiff;
068import org.nuxeo.ecm.core.storage.StateHelper;
069import org.nuxeo.ecm.core.storage.dbs.DBSDocument;
070import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
071import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
072import org.nuxeo.ecm.core.storage.dbs.DBSSession.OrderByComparator;
073import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater;
074import org.nuxeo.runtime.api.Framework;
075
076/**
077 * In-memory implementation of a {@link Repository}.
078 * <p>
079 * Internally, the repository is a map from id to document object.
080 * <p>
081 * A document object is a JSON-like document stored as a Map recursively containing the data, see {@link DBSDocument}
082 * for the description of the document.
083 *
084 * @since 5.9.4
085 */
086public class MemRepository extends DBSRepositoryBase {
087
    private static final Log log = LogFactory.getLog(MemRepository.class);

    /**
     * Scroll id returned by {@link #scroll(DBSExpressionEvaluator, int, int)}, which delivers all ids in one batch.
     */
    protected static final String NOSCROLL_ID = "noscroll";

    // for debug: counter behind the sequential ids handed out by generateNewId() when DEBUG_UUIDS is set
    private final AtomicLong temporaryIdCounter = new AtomicLong(0);

    /**
     * The content of the repository, a map of document id -> object.
     * <p>
     * Created by {@link #initRepository()} and reset to {@code null} by {@link #shutdown()}.
     */
    protected Map<String, State> states;
099
    /**
     * Creates the in-memory repository and immediately initializes its content.
     *
     * @param cm the connection manager, passed through to the base class
     * @param descriptor the repository descriptor; its {@code name} is used as the repository name
     */
    public MemRepository(ConnectionManager cm, MemRepositoryDescriptor descriptor) {
        super(cm, descriptor.name, descriptor);
        // allocates the state map and calls initRoot()
        initRepository();
    }
104
105    @Override
106    public List<IdType> getAllowedIdTypes() {
107        return Collections.singletonList(IdType.varchar);
108    }
109
    /**
     * Shuts down the repository: delegates to the superclass, then drops the reference to the in-memory content.
     */
    @Override
    public void shutdown() {
        super.shutdown();
        // the whole content is discarded; the map is only re-created by initRepository()
        states = null;
    }
115
    /**
     * Allocates an empty, concurrent id -> state map and then calls {@code initRoot()}.
     */
    protected void initRepository() {
        states = new ConcurrentHashMap<>();
        initRoot();
    }
120
121    @Override
122    public String generateNewId() {
123        if (DEBUG_UUIDS) {
124            return "UUID_" + temporaryIdCounter.incrementAndGet();
125        } else {
126            return UUID.randomUUID().toString();
127        }
128    }
129
    /**
     * Reads the full state of a document.
     *
     * @param id the document id
     * @return the stored state, or {@code null} if the id is {@code null} or unknown
     */
    @Override
    public State readState(String id) {
        // a null key collection means "no restriction" in readPartialState
        return readPartialState(id, null);
    }
134
135    @Override
136    public State readPartialState(String id, Collection<String> keys) {
137        if (id == null) {
138            return null;
139        }
140        State state = states.get(id);
141        if (state != null) {
142            if (keys != null && !keys.isEmpty()) {
143                State partialState = new State();
144                for (String key : keys) {
145                    Serializable value = state.get(key);
146                    if (value != null) {
147                        partialState.put(key, value);
148                    }
149                }
150                state = partialState;
151            }
152            if (log.isTraceEnabled()) {
153                log.trace("Mem: READ  " + id + ": " + state);
154            }
155        }
156        return state;
157    }
158
159    @Override
160    public List<State> readStates(List<String> ids) {
161        List<State> list = new ArrayList<>();
162        for (String id : ids) {
163            list.add(readState(id));
164        }
165        return list;
166    }
167
168    @Override
169    public void createState(State state) {
170        String id = (String) state.get(KEY_ID);
171        if (log.isTraceEnabled()) {
172            log.trace("Mem: CREATE " + id + ": " + state);
173        }
174        if (states.containsKey(id)) {
175            throw new NuxeoException("Already exists: " + id);
176        }
177        state = StateHelper.deepCopy(state, true); // thread-safe
178        StateHelper.resetDeltas(state);
179        states.put(id, state);
180    }
181
182    @Override
183    public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) {
184        if (log.isTraceEnabled()) {
185            log.trace("Mem: UPDATE " + id + ": " + diff);
186        }
187        State state = states.get(id);
188        if (state == null) {
189            throw new ConcurrentUpdateException("Missing: " + id);
190        }
191        synchronized (state) {
192            // synchronization needed for atomic change token
193            if (changeTokenUpdater != null) {
194                for (Entry<String, Serializable> en : changeTokenUpdater.getConditions().entrySet()) {
195                    if (!Objects.equals(state.get(en.getKey()), en.getValue())) {
196                        throw new ConcurrentUpdateException((String) state.get(KEY_ID));
197                    }
198                }
199                for (Entry<String, Serializable> en : changeTokenUpdater.getUpdates().entrySet()) {
200                    applyDiff(state, en.getKey(), en.getValue());
201                }
202            }
203            applyDiff(state, diff);
204        }
205    }
206
207    @Override
208    public void deleteStates(Set<String> ids) {
209        if (log.isTraceEnabled()) {
210            log.trace("Mem: REMOVE " + ids);
211        }
212        for (String id : ids) {
213            if (states.remove(id) == null) {
214                log.debug("Missing on remove: " + id);
215            }
216        }
217    }
218
219    @Override
220    public State readChildState(String parentId, String name, Set<String> ignored) {
221        // TODO optimize by maintaining a parent/child index
222        for (State state : states.values()) {
223            if (ignored.contains(state.get(KEY_ID))) {
224                continue;
225            }
226            if (!parentId.equals(state.get(KEY_PARENT_ID))) {
227                continue;
228            }
229            if (!name.equals(state.get(KEY_NAME))) {
230                continue;
231            }
232            return state;
233        }
234        return null;
235    }
236
237    @Override
238    public boolean hasChild(String parentId, String name, Set<String> ignored) {
239        return readChildState(parentId, name, ignored) != null;
240    }
241
242    @Override
243    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
244        if (log.isTraceEnabled()) {
245            log.trace("Mem: QUERY " + key + " = " + value);
246        }
247        List<State> list = new ArrayList<>();
248        for (State state : states.values()) {
249            String id = (String) state.get(KEY_ID);
250            if (ignored.contains(id)) {
251                continue;
252            }
253            if (!value.equals(state.get(key))) {
254                continue;
255            }
256            list.add(state);
257        }
258        if (log.isTraceEnabled() && !list.isEmpty()) {
259            log.trace("Mem:    -> " + list.size());
260        }
261        return list;
262    }
263
264    @Override
265    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
266        if (log.isTraceEnabled()) {
267            log.trace("Mem: QUERY " + key1 + " = " + value1 + " AND " + key2 + " = " + value2);
268        }
269        List<State> list = new ArrayList<>();
270        for (State state : states.values()) {
271            String id = (String) state.get(KEY_ID);
272            if (ignored.contains(id)) {
273                continue;
274            }
275            if (!(value1.equals(state.get(key1)) && value2.equals(state.get(key2)))) {
276                continue;
277            }
278            list.add(state);
279        }
280        if (log.isTraceEnabled() && !list.isEmpty()) {
281            log.trace("Mem:    -> " + list.size());
282        }
283        return list;
284    }
285
    /**
     * Returns the descendants of a document, without any limit on their number.
     *
     * @param rootId the id of the root of the subtree
     * @param keys passed through to the three-argument variant
     * @return a stream of the descendant states
     */
    @Override
    public Stream<State> getDescendants(String rootId, Set<String> keys) {
        // 0 means "no limit" for the three-argument variant
        return getDescendants(rootId, keys, 0);
    }
290
291    @Override
292    public Stream<State> getDescendants(String rootId, Set<String> keys, int limit) {
293        if (log.isTraceEnabled()) {
294            log.trace("Mem: QUERY " + KEY_ANCESTOR_IDS + " = " + rootId);
295        }
296        Stream<State> stream = states.values() //
297                                     .stream()
298                                     .filter(state -> hasAncestor(state, rootId));
299        if (limit != 0) {
300            stream = stream.limit(limit);
301        }
302        return stream;
303    }
304
305    protected static boolean hasAncestor(State state, String id) {
306        Object[] array = (Object[]) state.get(KEY_ANCESTOR_IDS);
307        return array == null ? false : Arrays.asList(array).contains(id);
308    }
309
310    @Override
311    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
312        if (log.isTraceEnabled()) {
313            log.trace("Mem: QUERY " + key + " = " + value);
314        }
315        for (State state : states.values()) {
316            String id = (String) state.get(KEY_ID);
317            if (ignored.contains(id)) {
318                continue;
319            }
320            if (value.equals(state.get(key))) {
321                if (log.isTraceEnabled()) {
322                    log.trace("Mem:    -> present");
323                }
324                return true;
325            }
326        }
327        if (log.isTraceEnabled()) {
328            log.trace("Mem:    -> absent");
329        }
330        return false;
331    }
332
333    @Override
334    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
335            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
336        if (log.isTraceEnabled()) {
337            log.trace("Mem: QUERY " + evaluator + " OFFSET " + offset + " LIMIT " + limit);
338        }
339        evaluator.parse();
340        List<Map<String, Serializable>> projections = new ArrayList<>();
341        for (State state : states.values()) {
342            List<Map<String, Serializable>> matches = evaluator.matches(state);
343            if (!matches.isEmpty()) {
344                if (distinctDocuments) {
345                    projections.add(matches.get(0));
346                } else {
347                    projections.addAll(matches);
348                }
349            }
350        }
351        // ORDER BY
352        // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter
353        if (orderByClause != null) {
354            Collections.sort(projections, new OrderByComparator(orderByClause));
355        }
356        // LIMIT / OFFSET
357        int totalSize = projections.size();
358        if (countUpTo == -1) {
359            // count full size
360        } else if (countUpTo == 0) {
361            // no count
362            totalSize = -1; // not counted
363        } else {
364            // count only if less than countUpTo
365            if (totalSize > countUpTo) {
366                totalSize = -2; // truncated
367            }
368        }
369        if (limit != 0) {
370            int size = projections.size();
371            projections.subList(0, offset > size ? size : offset).clear();
372            size = projections.size();
373            if (limit < size) {
374                projections.subList(limit, size).clear();
375            }
376        }
377        // TODO DISTINCT
378
379        if (log.isTraceEnabled() && !projections.isEmpty()) {
380            log.trace("Mem:    -> " + projections.size());
381        }
382        return new PartialList<>(projections, totalSize);
383    }
384
385    @Override
386    public ScrollResult<String> scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) {
387        if (log.isTraceEnabled()) {
388            log.trace("Mem: QUERY " + evaluator);
389        }
390        evaluator.parse();
391        List<String> ids = new ArrayList<>();
392        for (State state : states.values()) {
393            List<Map<String, Serializable>> matches = evaluator.matches(state);
394            if (!matches.isEmpty()) {
395                String id = matches.get(0).get(ECM_UUID).toString();
396                ids.add(id);
397            }
398        }
399        return new ScrollResultImpl<>(NOSCROLL_ID, ids);
400    }
401
402    @Override
403    public ScrollResult<String> scroll(String scrollId) {
404        if (NOSCROLL_ID.equals(scrollId)) {
405            // Id are already in memory, they are returned as a single batch
406            return ScrollResultImpl.emptyResult();
407        }
408        throw new NuxeoException("Unknown or timed out scrollId");
409    }
410
411    /**
412     * Applies a {@link StateDiff} in-place onto a base {@link State}.
413     * <p>
414     * Uses thread-safe datastructures.
415     */
416    public static void applyDiff(State state, StateDiff stateDiff) {
417        for (Entry<String, Serializable> en : stateDiff.entrySet()) {
418            applyDiff(state, en.getKey(), en.getValue());
419        }
420    }
421
422    /**
423     * Applies a key/value diff in-place onto a base {@link State}.
424     * <p>
425     * Uses thread-safe datastructures.
426     */
427    protected static void applyDiff(State state, String key, Serializable value) {
428        if (value instanceof StateDiff) {
429            Serializable old = state.get(key);
430            if (old == null) {
431                old = new State(true); // thread-safe
432                state.put(key, old);
433                // enter the next if
434            }
435            if (!(old instanceof State)) {
436                throw new UnsupportedOperationException("Cannot apply StateDiff on non-State: " + old);
437            }
438            applyDiff((State) old, (StateDiff) value);
439        } else if (value instanceof ListDiff) {
440            state.put(key, applyDiff(state.get(key), (ListDiff) value));
441        } else if (value instanceof Delta) {
442            Delta delta = (Delta) value;
443            Number oldValue = (Number) state.get(key);
444            Number newValue;
445            if (oldValue == null) {
446                newValue = delta.getFullValue();
447            } else {
448                newValue = delta.add(oldValue);
449            }
450            state.put(key, newValue);
451        } else {
452            state.put(key, StateHelper.deepCopy(value, true)); // thread-safe
453        }
454    }
455
456    /**
457     * Applies a {@link ListDiff} onto an array or {@link List}, and returns the resulting value.
458     * <p>
459     * Uses thread-safe datastructures.
460     */
461    public static Serializable applyDiff(Serializable value, ListDiff listDiff) {
462        // internally work on a list
463        // TODO this is costly, use a separate code path for arrays
464        Class<?> arrayComponentType = null;
465        if (listDiff.isArray && value != null) {
466            if (!(value instanceof Object[])) {
467                throw new UnsupportedOperationException("Cannot apply ListDiff on non-array: " + value);
468            }
469            arrayComponentType = ((Object[]) value).getClass().getComponentType();
470            value = new CopyOnWriteArrayList<>(Arrays.asList((Object[]) value));
471        }
472        if (value == null) {
473            value = new CopyOnWriteArrayList<>();
474        }
475        if (!(value instanceof List)) {
476            throw new UnsupportedOperationException("Cannot apply ListDiff on non-List: " + value);
477        }
478        @SuppressWarnings("unchecked")
479        List<Serializable> list = (List<Serializable>) value;
480        if (listDiff.diff != null) {
481            int i = 0;
482            for (Object diffElem : listDiff.diff) {
483                if (i >= list.size()) {
484                    // TODO log error applying diff to shorter list
485                    break;
486                }
487                if (diffElem instanceof StateDiff) {
488                    applyDiff((State) list.get(i), (StateDiff) diffElem);
489                } else if (diffElem != NOP) {
490                    list.set(i, StateHelper.deepCopy(diffElem, true)); // thread-safe
491                }
492                i++;
493            }
494        }
495        if (listDiff.rpush != null) {
496            // deepCopy of what we'll add
497            List<Serializable> add = new ArrayList<>(listDiff.rpush.size());
498            for (Object v : listDiff.rpush) {
499                add.add(StateHelper.deepCopy(v, true)); // thread-safe
500            }
501            // update CopyOnWriteArrayList in one step
502            list.addAll(add);
503        }
504        // convert back to array if needed
505        if (listDiff.isArray) {
506            return list.isEmpty() ? null : list.toArray((Object[]) Array.newInstance(arrayComponentType, list.size()));
507        } else {
508            return list.isEmpty() ? null : (Serializable) list;
509        }
510    }
511
512    /* synchronized */
513    @Override
514    public synchronized Lock getLock(String id) {
515        State state = states.get(id);
516        if (state == null) {
517            // document not found
518            throw new DocumentNotFoundException(id);
519        }
520        String owner = (String) state.get(KEY_LOCK_OWNER);
521        if (owner == null) {
522            return null;
523        }
524        Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
525        return new Lock(owner, created);
526    }
527
528    /* synchronized */
529    @Override
530    public synchronized Lock setLock(String id, Lock lock) {
531        State state = states.get(id);
532        if (state == null) {
533            // document not found
534            throw new DocumentNotFoundException(id);
535        }
536        String owner = (String) state.get(KEY_LOCK_OWNER);
537        if (owner != null) {
538            // return old lock
539            Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
540            return new Lock(owner, created);
541        }
542        state.put(KEY_LOCK_OWNER, lock.getOwner());
543        state.put(KEY_LOCK_CREATED, lock.getCreated());
544        return null;
545    }
546
547    /* synchronized */
548    @Override
549    public synchronized Lock removeLock(String id, String owner) {
550        State state = states.get(id);
551        if (state == null) {
552            // document not found
553            throw new DocumentNotFoundException(id);
554        }
555        String oldOwner = (String) state.get(KEY_LOCK_OWNER);
556        if (oldOwner == null) {
557            // no previous lock
558            return null;
559        }
560        Calendar oldCreated = (Calendar) state.get(KEY_LOCK_CREATED);
561        if (!LockManager.canLockBeRemoved(oldOwner, owner)) {
562            // existing mismatched lock, flag failure
563            return new Lock(oldOwner, oldCreated, true);
564        }
565        // remove lock
566        state.put(KEY_LOCK_OWNER, null);
567        state.put(KEY_LOCK_CREATED, null);
568        // return old lock
569        return new Lock(oldOwner, oldCreated);
570    }
571
    /**
     * Does nothing in this implementation.
     */
    @Override
    public void closeLockManager() {
        // intentionally empty: locks are stored in the document states themselves (see getLock / setLock)
    }
575
    /**
     * Does nothing in this implementation.
     */
    @Override
    public void clearLockManagerCaches() {
        // intentionally empty: this class keeps no lock cache, locks are read from the document states
    }
579
    /**
     * The blob paths recorded by {@link #initBlobsPaths()}, each one a list of keys to follow inside a state; used by
     * {@link #markReferencedBinaries()}.
     */
    protected List<List<String>> binaryPaths;
581
    /**
     * Runs a {@link MemBlobFinder} visit and keeps the blob paths it recorded in {@link #binaryPaths}.
     */
    @Override
    protected void initBlobsPaths() {
        MemBlobFinder finder = new MemBlobFinder();
        finder.visit();
        binaryPaths = finder.binaryPaths;
    }
588
    /**
     * A {@code BlobFinder} that collects every recorded blob path into {@link #binaryPaths}.
     */
    protected static class MemBlobFinder extends BlobFinder {
        /** The blob paths recorded so far. */
        protected List<List<String>> binaryPaths = new ArrayList<>();

        @Override
        protected void recordBlobPath() {
            // store a copy of the current path
            binaryPaths.add(new ArrayList<>(path));
        }
    }
597
598    @Override
599    public void markReferencedBinaries() {
600        DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class);
601        for (State state : states.values()) {
602            for (List<String> path : binaryPaths) {
603                markReferencedBinaries(state, path, 0, blobManager);
604            }
605        }
606    }
607
608    protected void markReferencedBinaries(State state, List<String> path, int start, DocumentBlobManager blobManager) {
609        for (int i = start; i < path.size(); i++) {
610            String name = path.get(i);
611            Serializable value = state.get(name);
612            if (value instanceof State) {
613                state = (State) value;
614            } else {
615                if (value instanceof List) {
616                    @SuppressWarnings("unchecked")
617                    List<Object> list = (List<Object>) value;
618                    for (Object v : list) {
619                        if (v instanceof State) {
620                            markReferencedBinaries((State) v, path, i + 1, blobManager);
621                        } else {
622                            markReferencedBinary(v, blobManager);
623                        }
624                    }
625                }
626                state = null;
627                break;
628            }
629        }
630        if (state != null) {
631            Serializable data = state.get(KEY_BLOB_DATA);
632            markReferencedBinary(data, blobManager);
633        }
634    }
635
636    protected void markReferencedBinary(Object value, DocumentBlobManager blobManager) {
637        if (!(value instanceof String)) {
638            return;
639        }
640        String key = (String) value;
641        blobManager.markReferencedBinary(key, repositoryName);
642    }
643
644}