001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.mem;
020
021import static java.lang.Boolean.TRUE;
022import static org.nuxeo.ecm.core.query.sql.NXQL.ECM_UUID;
023import static org.nuxeo.ecm.core.storage.State.NOP;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS;
032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;
033
034import java.io.Serializable;
035import java.lang.reflect.Array;
036import java.util.ArrayList;
037import java.util.Arrays;
038import java.util.Calendar;
039import java.util.Collections;
040import java.util.List;
041import java.util.Map;
042import java.util.Map.Entry;
043import java.util.Objects;
044import java.util.Set;
045import java.util.UUID;
046import java.util.concurrent.ConcurrentHashMap;
047import java.util.concurrent.CopyOnWriteArrayList;
048import java.util.concurrent.atomic.AtomicLong;
049
050import javax.resource.spi.ConnectionManager;
051
052import org.apache.commons.logging.Log;
053import org.apache.commons.logging.LogFactory;
054import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
055import org.nuxeo.ecm.core.api.DocumentNotFoundException;
056import org.nuxeo.ecm.core.api.Lock;
057import org.nuxeo.ecm.core.api.NuxeoException;
058import org.nuxeo.ecm.core.api.PartialList;
059import org.nuxeo.ecm.core.api.ScrollResult;
060import org.nuxeo.ecm.core.api.ScrollResultImpl;
061import org.nuxeo.ecm.core.api.model.Delta;
062import org.nuxeo.ecm.core.blob.DocumentBlobManager;
063import org.nuxeo.ecm.core.model.LockManager;
064import org.nuxeo.ecm.core.model.Repository;
065import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
066import org.nuxeo.ecm.core.storage.State;
067import org.nuxeo.ecm.core.storage.State.ListDiff;
068import org.nuxeo.ecm.core.storage.State.StateDiff;
069import org.nuxeo.ecm.core.storage.StateHelper;
070import org.nuxeo.ecm.core.storage.dbs.DBSDocument;
071import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
072import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
073import org.nuxeo.ecm.core.storage.dbs.DBSSession.OrderByComparator;
074import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater;
075import org.nuxeo.runtime.api.Framework;
076
077/**
078 * In-memory implementation of a {@link Repository}.
079 * <p>
080 * Internally, the repository is a map from id to document object.
081 * <p>
082 * A document object is a JSON-like document stored as a Map recursively containing the data, see {@link DBSDocument}
083 * for the description of the document.
084 *
085 * @since 5.9.4
086 */
087public class MemRepository extends DBSRepositoryBase {
088
089    private static final Log log = LogFactory.getLog(MemRepository.class);
090
091    protected static final String NOSCROLL_ID = "noscroll";
092
093    // for debug
094    private final AtomicLong temporaryIdCounter = new AtomicLong(0);
095
096    /**
097     * The content of the repository, a map of document id -> object.
098     */
099    protected Map<String, State> states;
100
101    public MemRepository(ConnectionManager cm, MemRepositoryDescriptor descriptor) {
102        super(cm, descriptor.name, descriptor);
103        initRepository();
104    }
105
106    @Override
107    public List<IdType> getAllowedIdTypes() {
108        return Collections.singletonList(IdType.varchar);
109    }
110
111    @Override
112    public void shutdown() {
113        super.shutdown();
114        states = null;
115    }
116
117    protected void initRepository() {
118        states = new ConcurrentHashMap<>();
119        initRoot();
120    }
121
122    @Override
123    public String generateNewId() {
124        if (DEBUG_UUIDS) {
125            return "UUID_" + temporaryIdCounter.incrementAndGet();
126        } else {
127            return UUID.randomUUID().toString();
128        }
129    }
130
131    @Override
132    public State readState(String id) {
133        State state = states.get(id);
134        if (state != null) {
135            if (log.isTraceEnabled()) {
136                log.trace("Mem: READ  " + id + ": " + state);
137            }
138        }
139        return state;
140    }
141
142    @Override
143    public List<State> readStates(List<String> ids) {
144        List<State> list = new ArrayList<>();
145        for (String id : ids) {
146            list.add(readState(id));
147        }
148        return list;
149    }
150
151    @Override
152    public void createState(State state) {
153        String id = (String) state.get(KEY_ID);
154        if (log.isTraceEnabled()) {
155            log.trace("Mem: CREATE " + id + ": " + state);
156        }
157        if (states.containsKey(id)) {
158            throw new NuxeoException("Already exists: " + id);
159        }
160        state = StateHelper.deepCopy(state, true); // thread-safe
161        StateHelper.resetDeltas(state);
162        states.put(id, state);
163    }
164
165    @Override
166    public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) {
167        if (log.isTraceEnabled()) {
168            log.trace("Mem: UPDATE " + id + ": " + diff);
169        }
170        State state = states.get(id);
171        if (state == null) {
172            throw new ConcurrentUpdateException("Missing: " + id);
173        }
174        synchronized (state) {
175            // synchronization needed for atomic change token
176            if (changeTokenUpdater != null) {
177                for (Entry<String, Serializable> en : changeTokenUpdater.getConditions().entrySet()) {
178                    if (!Objects.equals(state.get(en.getKey()), en.getValue())) {
179                        throw new ConcurrentUpdateException((String) state.get(KEY_ID));
180                    }
181                }
182                for (Entry<String, Serializable> en : changeTokenUpdater.getUpdates().entrySet()) {
183                    applyDiff(state, en.getKey(), en.getValue());
184                }
185            }
186            applyDiff(state, diff);
187        }
188    }
189
190    @Override
191    public void deleteStates(Set<String> ids) {
192        if (log.isTraceEnabled()) {
193            log.trace("Mem: REMOVE " + ids);
194        }
195        for (String id : ids) {
196            if (states.remove(id) == null) {
197                log.debug("Missing on remove: " + id);
198            }
199        }
200    }
201
202    @Override
203    public State readChildState(String parentId, String name, Set<String> ignored) {
204        // TODO optimize by maintaining a parent/child index
205        for (State state : states.values()) {
206            if (ignored.contains(state.get(KEY_ID))) {
207                continue;
208            }
209            if (!parentId.equals(state.get(KEY_PARENT_ID))) {
210                continue;
211            }
212            if (!name.equals(state.get(KEY_NAME))) {
213                continue;
214            }
215            return state;
216        }
217        return null;
218    }
219
220    @Override
221    public boolean hasChild(String parentId, String name, Set<String> ignored) {
222        return readChildState(parentId, name, ignored) != null;
223    }
224
225    @Override
226    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
227        if (log.isTraceEnabled()) {
228            log.trace("Mem: QUERY " + key + " = " + value);
229        }
230        List<State> list = new ArrayList<>();
231        for (State state : states.values()) {
232            String id = (String) state.get(KEY_ID);
233            if (ignored.contains(id)) {
234                continue;
235            }
236            if (!value.equals(state.get(key))) {
237                continue;
238            }
239            list.add(state);
240        }
241        if (log.isTraceEnabled() && !list.isEmpty()) {
242            log.trace("Mem:    -> " + list.size());
243        }
244        return list;
245    }
246
247    @Override
248    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
249        if (log.isTraceEnabled()) {
250            log.trace("Mem: QUERY " + key1 + " = " + value1 + " AND " + key2 + " = " + value2);
251        }
252        List<State> list = new ArrayList<>();
253        for (State state : states.values()) {
254            String id = (String) state.get(KEY_ID);
255            if (ignored.contains(id)) {
256                continue;
257            }
258            if (!(value1.equals(state.get(key1)) && value2.equals(state.get(key2)))) {
259                continue;
260            }
261            list.add(state);
262        }
263        if (log.isTraceEnabled() && !list.isEmpty()) {
264            log.trace("Mem:    -> " + list.size());
265        }
266        return list;
267    }
268
269    @Override
270    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
271            Map<String, Object[]> targetProxies) {
272        if (log.isTraceEnabled()) {
273            log.trace("Mem: QUERY " + key + " = " + value);
274        }
275        STATE: for (State state : states.values()) {
276            Object[] array = (Object[]) state.get(key);
277            String id = (String) state.get(KEY_ID);
278            if (array != null) {
279                for (Object v : array) {
280                    if (value.equals(v)) {
281                        ids.add(id);
282                        if (proxyTargets != null && TRUE.equals(state.get(KEY_IS_PROXY))) {
283                            String targetId = (String) state.get(KEY_PROXY_TARGET_ID);
284                            proxyTargets.put(id, targetId);
285                        }
286                        if (targetProxies != null) {
287                            Object[] proxyIds = (Object[]) state.get(KEY_PROXY_IDS);
288                            if (proxyIds != null) {
289                                targetProxies.put(id, proxyIds);
290                            }
291                        }
292                        continue STATE;
293                    }
294                }
295            }
296        }
297        if (log.isTraceEnabled() && !ids.isEmpty()) {
298            log.trace("Mem:    -> " + ids.size());
299        }
300    }
301
302    @Override
303    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
304        if (log.isTraceEnabled()) {
305            log.trace("Mem: QUERY " + key + " = " + value);
306        }
307        for (State state : states.values()) {
308            String id = (String) state.get(KEY_ID);
309            if (ignored.contains(id)) {
310                continue;
311            }
312            if (value.equals(state.get(key))) {
313                if (log.isTraceEnabled()) {
314                    log.trace("Mem:    -> present");
315                }
316                return true;
317            }
318        }
319        if (log.isTraceEnabled()) {
320            log.trace("Mem:    -> absent");
321        }
322        return false;
323    }
324
325    @Override
326    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
327            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
328        if (log.isTraceEnabled()) {
329            log.trace("Mem: QUERY " + evaluator + " OFFSET " + offset + " LIMIT " + limit);
330        }
331        evaluator.parse();
332        List<Map<String, Serializable>> projections = new ArrayList<>();
333        for (State state : states.values()) {
334            List<Map<String, Serializable>> matches = evaluator.matches(state);
335            if (!matches.isEmpty()) {
336                if (distinctDocuments) {
337                    projections.add(matches.get(0));
338                } else {
339                    projections.addAll(matches);
340                }
341            }
342        }
343        // ORDER BY
344        // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter
345        if (orderByClause != null) {
346            Collections.sort(projections, new OrderByComparator(orderByClause));
347        }
348        // LIMIT / OFFSET
349        int totalSize = projections.size();
350        if (countUpTo == -1) {
351            // count full size
352        } else if (countUpTo == 0) {
353            // no count
354            totalSize = -1; // not counted
355        } else {
356            // count only if less than countUpTo
357            if (totalSize > countUpTo) {
358                totalSize = -2; // truncated
359            }
360        }
361        if (limit != 0) {
362            int size = projections.size();
363            projections.subList(0, offset > size ? size : offset).clear();
364            size = projections.size();
365            if (limit < size) {
366                projections.subList(limit, size).clear();
367            }
368        }
369        // TODO DISTINCT
370
371        if (log.isTraceEnabled() && !projections.isEmpty()) {
372            log.trace("Mem:    -> " + projections.size());
373        }
374        return new PartialList<>(projections, totalSize);
375    }
376
377    @Override
378    public ScrollResult scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) {
379        if (log.isTraceEnabled()) {
380            log.trace("Mem: QUERY " + evaluator);
381        }
382        evaluator.parse();
383        List<String> ids = new ArrayList<>();
384        for (State state : states.values()) {
385            List<Map<String, Serializable>> matches = evaluator.matches(state);
386            if (!matches.isEmpty()) {
387                String id = matches.get(0).get(ECM_UUID).toString();
388                ids.add(id);
389            }
390        }
391        return new ScrollResultImpl(NOSCROLL_ID, ids);
392    }
393
394    @Override
395    public ScrollResult scroll(String scrollId) {
396        if (NOSCROLL_ID.equals(scrollId)) {
397            // Id are already in memory, they are returned as a single batch
398            return ScrollResultImpl.emptyResult();
399        }
400        throw new NuxeoException("Unknown or timed out scrollId");
401    }
402
403    /**
404     * Applies a {@link StateDiff} in-place onto a base {@link State}.
405     * <p>
406     * Uses thread-safe datastructures.
407     */
408    public static void applyDiff(State state, StateDiff stateDiff) {
409        for (Entry<String, Serializable> en : stateDiff.entrySet()) {
410            applyDiff(state, en.getKey(), en.getValue());
411        }
412    }
413
414    /**
415     * Applies a key/value diff in-place onto a base {@link State}.
416     * <p>
417     * Uses thread-safe datastructures.
418     */
419    protected static void applyDiff(State state, String key, Serializable value) {
420        if (value instanceof StateDiff) {
421            Serializable old = state.get(key);
422            if (old == null) {
423                old = new State(true); // thread-safe
424                state.put(key, old);
425                // enter the next if
426            }
427            if (!(old instanceof State)) {
428                throw new UnsupportedOperationException("Cannot apply StateDiff on non-State: " + old);
429            }
430            applyDiff((State) old, (StateDiff) value);
431        } else if (value instanceof ListDiff) {
432            state.put(key, applyDiff(state.get(key), (ListDiff) value));
433        } else if (value instanceof Delta) {
434            Delta delta = (Delta) value;
435            Number oldValue = (Number) state.get(key);
436            Number newValue;
437            if (oldValue == null) {
438                newValue = delta.getFullValue();
439            } else {
440                newValue = delta.add(oldValue);
441            }
442            state.put(key, newValue);
443        } else {
444            state.put(key, StateHelper.deepCopy(value, true)); // thread-safe
445        }
446    }
447
448    /**
449     * Applies a {@link ListDiff} onto an array or {@link List}, and returns the resulting value.
450     * <p>
451     * Uses thread-safe datastructures.
452     */
453    public static Serializable applyDiff(Serializable value, ListDiff listDiff) {
454        // internally work on a list
455        // TODO this is costly, use a separate code path for arrays
456        Class<?> arrayComponentType = null;
457        if (listDiff.isArray && value != null) {
458            if (!(value instanceof Object[])) {
459                throw new UnsupportedOperationException("Cannot apply ListDiff on non-array: " + value);
460            }
461            arrayComponentType = ((Object[]) value).getClass().getComponentType();
462            value = new CopyOnWriteArrayList<>(Arrays.asList((Object[]) value));
463        }
464        if (value == null) {
465            value = new CopyOnWriteArrayList<>();
466        }
467        if (!(value instanceof List)) {
468            throw new UnsupportedOperationException("Cannot apply ListDiff on non-List: " + value);
469        }
470        @SuppressWarnings("unchecked")
471        List<Serializable> list = (List<Serializable>) value;
472        if (listDiff.diff != null) {
473            int i = 0;
474            for (Object diffElem : listDiff.diff) {
475                if (i >= list.size()) {
476                    // TODO log error applying diff to shorter list
477                    break;
478                }
479                if (diffElem instanceof StateDiff) {
480                    applyDiff((State) list.get(i), (StateDiff) diffElem);
481                } else if (diffElem != NOP) {
482                    list.set(i, StateHelper.deepCopy(diffElem, true)); // thread-safe
483                }
484                i++;
485            }
486        }
487        if (listDiff.rpush != null) {
488            // deepCopy of what we'll add
489            List<Serializable> add = new ArrayList<>(listDiff.rpush.size());
490            for (Object v : listDiff.rpush) {
491                add.add(StateHelper.deepCopy(v, true)); // thread-safe
492            }
493            // update CopyOnWriteArrayList in one step
494            list.addAll(add);
495        }
496        // convert back to array if needed
497        if (listDiff.isArray) {
498            return list.isEmpty() ? null : list.toArray((Object[]) Array.newInstance(arrayComponentType, list.size()));
499        } else {
500            return list.isEmpty() ? null : (Serializable) list;
501        }
502    }
503
504    /* synchronized */
505    @Override
506    public synchronized Lock getLock(String id) {
507        State state = states.get(id);
508        if (state == null) {
509            // document not found
510            throw new DocumentNotFoundException(id);
511        }
512        String owner = (String) state.get(KEY_LOCK_OWNER);
513        if (owner == null) {
514            return null;
515        }
516        Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
517        return new Lock(owner, created);
518    }
519
520    /* synchronized */
521    @Override
522    public synchronized Lock setLock(String id, Lock lock) {
523        State state = states.get(id);
524        if (state == null) {
525            // document not found
526            throw new DocumentNotFoundException(id);
527        }
528        String owner = (String) state.get(KEY_LOCK_OWNER);
529        if (owner != null) {
530            // return old lock
531            Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
532            return new Lock(owner, created);
533        }
534        state.put(KEY_LOCK_OWNER, lock.getOwner());
535        state.put(KEY_LOCK_CREATED, lock.getCreated());
536        return null;
537    }
538
539    /* synchronized */
540    @Override
541    public synchronized Lock removeLock(String id, String owner) {
542        State state = states.get(id);
543        if (state == null) {
544            // document not found
545            throw new DocumentNotFoundException(id);
546        }
547        String oldOwner = (String) state.get(KEY_LOCK_OWNER);
548        if (oldOwner == null) {
549            // no previous lock
550            return null;
551        }
552        Calendar oldCreated = (Calendar) state.get(KEY_LOCK_CREATED);
553        if (!LockManager.canLockBeRemoved(oldOwner, owner)) {
554            // existing mismatched lock, flag failure
555            return new Lock(oldOwner, oldCreated, true);
556        }
557        // remove lock
558        state.put(KEY_LOCK_OWNER, null);
559        state.put(KEY_LOCK_CREATED, null);
560        // return old lock
561        return new Lock(oldOwner, oldCreated);
562    }
563
564    @Override
565    public void closeLockManager() {
566    }
567
568    @Override
569    public void clearLockManagerCaches() {
570    }
571
572    protected List<List<String>> binaryPaths;
573
574    @Override
575    protected void initBlobsPaths() {
576        MemBlobFinder finder = new MemBlobFinder();
577        finder.visit();
578        binaryPaths = finder.binaryPaths;
579    }
580
581    protected static class MemBlobFinder extends BlobFinder {
582        protected List<List<String>> binaryPaths = new ArrayList<>();
583
584        @Override
585        protected void recordBlobPath() {
586            binaryPaths.add(new ArrayList<>(path));
587        }
588    }
589
590    @Override
591    public void markReferencedBinaries() {
592        DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class);
593        for (State state : states.values()) {
594            for (List<String> path : binaryPaths) {
595                markReferencedBinaries(state, path, 0, blobManager);
596            }
597        }
598    }
599
600    protected void markReferencedBinaries(State state, List<String> path, int start, DocumentBlobManager blobManager) {
601        for (int i = start; i < path.size(); i++) {
602            String name = path.get(i);
603            Serializable value = state.get(name);
604            if (value instanceof State) {
605                state = (State) value;
606            } else {
607                if (value instanceof List) {
608                    @SuppressWarnings("unchecked")
609                    List<Object> list = (List<Object>) value;
610                    for (Object v : list) {
611                        if (v instanceof State) {
612                            markReferencedBinaries((State) v, path, i + 1, blobManager);
613                        } else {
614                            markReferencedBinary(v, blobManager);
615                        }
616                    }
617                }
618                state = null;
619                break;
620            }
621        }
622        if (state != null) {
623            Serializable data = state.get(KEY_BLOB_DATA);
624            markReferencedBinary(data, blobManager);
625        }
626    }
627
628    protected void markReferencedBinary(Object value, DocumentBlobManager blobManager) {
629        if (!(value instanceof String)) {
630            return;
631        }
632        String key = (String) value;
633        blobManager.markReferencedBinary(key, repositoryName);
634    }
635
636}