001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.mem;
020
021import static java.lang.Boolean.TRUE;
022import static org.nuxeo.ecm.core.storage.State.NOP;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS;
031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;
032
033import java.io.Serializable;
034import java.lang.reflect.Array;
035import java.util.ArrayList;
036import java.util.Arrays;
037import java.util.Calendar;
038import java.util.Collections;
039import java.util.List;
040import java.util.Map;
041import java.util.Map.Entry;
042import java.util.Set;
043import java.util.UUID;
044import java.util.concurrent.ConcurrentHashMap;
045import java.util.concurrent.CopyOnWriteArrayList;
046import java.util.concurrent.atomic.AtomicLong;
047
048import org.apache.commons.logging.Log;
049import org.apache.commons.logging.LogFactory;
050import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
051import org.nuxeo.ecm.core.api.DocumentNotFoundException;
052import org.nuxeo.ecm.core.api.Lock;
053import org.nuxeo.ecm.core.api.NuxeoException;
054import org.nuxeo.ecm.core.api.PartialList;
055import org.nuxeo.ecm.core.api.model.Delta;
056import org.nuxeo.ecm.core.blob.BlobManager;
057import org.nuxeo.ecm.core.model.LockManager;
058import org.nuxeo.ecm.core.model.Repository;
059import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
060import org.nuxeo.ecm.core.storage.State;
061import org.nuxeo.ecm.core.storage.State.ListDiff;
062import org.nuxeo.ecm.core.storage.State.StateDiff;
063import org.nuxeo.ecm.core.storage.StateHelper;
064import org.nuxeo.ecm.core.storage.dbs.DBSDocument;
065import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
066import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
067import org.nuxeo.ecm.core.storage.dbs.DBSSession.OrderByComparator;
068import org.nuxeo.runtime.api.Framework;
069
070/**
071 * In-memory implementation of a {@link Repository}.
072 * <p>
073 * Internally, the repository is a map from id to document object.
074 * <p>
075 * A document object is a JSON-like document stored as a Map recursively containing the data, see {@link DBSDocument}
076 * for the description of the document.
077 *
078 * @since 5.9.4
079 */
080public class MemRepository extends DBSRepositoryBase {
081
082    private static final Log log = LogFactory.getLog(MemRepository.class);
083
084    // for debug
085    private final AtomicLong temporaryIdCounter = new AtomicLong(0);
086
087    /**
088     * The content of the repository, a map of document id -> object.
089     */
090    protected Map<String, State> states;
091
092    public MemRepository(MemRepositoryDescriptor descriptor) {
093        super(descriptor.name, descriptor.getFulltextDescriptor());
094        initRepository();
095    }
096
097    @Override
098    public void shutdown() {
099        super.shutdown();
100        states = null;
101    }
102
103    protected void initRepository() {
104        states = new ConcurrentHashMap<>();
105        initRoot();
106    }
107
108    @Override
109    public String generateNewId() {
110        if (DEBUG_UUIDS) {
111            return "UUID_" + temporaryIdCounter.incrementAndGet();
112        } else {
113            return UUID.randomUUID().toString();
114        }
115    }
116
117    @Override
118    public State readState(String id) {
119        State state = states.get(id);
120        if (state != null) {
121            if (log.isTraceEnabled()) {
122                log.trace("Mem: READ  " + id + ": " + state);
123            }
124        }
125        return state;
126    }
127
128    @Override
129    public List<State> readStates(List<String> ids) {
130        List<State> list = new ArrayList<>();
131        for (String id : ids) {
132            list.add(readState(id));
133        }
134        return list;
135    }
136
137    @Override
138    public void createState(State state) {
139        String id = (String) state.get(KEY_ID);
140        if (log.isTraceEnabled()) {
141            log.trace("Mem: CREATE " + id + ": " + state);
142        }
143        if (states.containsKey(id)) {
144            throw new NuxeoException("Already exists: " + id);
145        }
146        state = StateHelper.deepCopy(state, true); // thread-safe
147        StateHelper.resetDeltas(state);
148        states.put(id, state);
149    }
150
151    @Override
152    public void updateState(String id, StateDiff diff) {
153        if (log.isTraceEnabled()) {
154            log.trace("Mem: UPDATE " + id + ": " + diff);
155        }
156        State state = states.get(id);
157        if (state == null) {
158            throw new ConcurrentUpdateException("Missing: " + id);
159        }
160        applyDiff(state, diff);
161    }
162
163    @Override
164    public void deleteStates(Set<String> ids) {
165        if (log.isTraceEnabled()) {
166            log.trace("Mem: REMOVE " + ids);
167        }
168        for (String id : ids) {
169            if (states.remove(id) == null) {
170                log.debug("Missing on remove: " + id);
171            }
172        }
173    }
174
175    @Override
176    public State readChildState(String parentId, String name, Set<String> ignored) {
177        // TODO optimize by maintaining a parent/child index
178        for (State state : states.values()) {
179            if (ignored.contains(state.get(KEY_ID))) {
180                continue;
181            }
182            if (!parentId.equals(state.get(KEY_PARENT_ID))) {
183                continue;
184            }
185            if (!name.equals(state.get(KEY_NAME))) {
186                continue;
187            }
188            return state;
189        }
190        return null;
191    }
192
193    @Override
194    public boolean hasChild(String parentId, String name, Set<String> ignored) {
195        return readChildState(parentId, name, ignored) != null;
196    }
197
198    @Override
199    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
200        if (log.isTraceEnabled()) {
201            log.trace("Mem: QUERY " + key + " = " + value);
202        }
203        List<State> list = new ArrayList<>();
204        for (State state : states.values()) {
205            String id = (String) state.get(KEY_ID);
206            if (ignored.contains(id)) {
207                continue;
208            }
209            if (!value.equals(state.get(key))) {
210                continue;
211            }
212            list.add(state);
213        }
214        if (log.isTraceEnabled() && !list.isEmpty()) {
215            log.trace("Mem:    -> " + list.size());
216        }
217        return list;
218    }
219
220    @Override
221    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
222        if (log.isTraceEnabled()) {
223            log.trace("Mem: QUERY " + key1 + " = " + value1 + " AND " + key2 + " = " + value2);
224        }
225        List<State> list = new ArrayList<>();
226        for (State state : states.values()) {
227            String id = (String) state.get(KEY_ID);
228            if (ignored.contains(id)) {
229                continue;
230            }
231            if (!(value1.equals(state.get(key1)) && value2.equals(state.get(key2)))) {
232                continue;
233            }
234            list.add(state);
235        }
236        if (log.isTraceEnabled() && !list.isEmpty()) {
237            log.trace("Mem:    -> " + list.size());
238        }
239        return list;
240    }
241
242    @Override
243    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
244            Map<String, Object[]> targetProxies) {
245        if (log.isTraceEnabled()) {
246            log.trace("Mem: QUERY " + key + " = " + value);
247        }
248        STATE: for (State state : states.values()) {
249            Object[] array = (Object[]) state.get(key);
250            String id = (String) state.get(KEY_ID);
251            if (array != null) {
252                for (Object v : array) {
253                    if (value.equals(v)) {
254                        ids.add(id);
255                        if (proxyTargets != null && TRUE.equals(state.get(KEY_IS_PROXY))) {
256                            String targetId = (String) state.get(KEY_PROXY_TARGET_ID);
257                            proxyTargets.put(id, targetId);
258                        }
259                        if (targetProxies != null) {
260                            Object[] proxyIds = (Object[]) state.get(KEY_PROXY_IDS);
261                            if (proxyIds != null) {
262                                targetProxies.put(id, proxyIds);
263                            }
264                        }
265                        continue STATE;
266                    }
267                }
268            }
269        }
270        if (log.isTraceEnabled() && !ids.isEmpty()) {
271            log.trace("Mem:    -> " + ids.size());
272        }
273    }
274
275    @Override
276    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
277        if (log.isTraceEnabled()) {
278            log.trace("Mem: QUERY " + key + " = " + value);
279        }
280        for (State state : states.values()) {
281            String id = (String) state.get(KEY_ID);
282            if (ignored.contains(id)) {
283                continue;
284            }
285            if (value.equals(state.get(key))) {
286                if (log.isTraceEnabled()) {
287                    log.trace("Mem:    -> present");
288                }
289                return true;
290            }
291        }
292        if (log.isTraceEnabled()) {
293            log.trace("Mem:    -> absent");
294        }
295        return false;
296    }
297
298    @Override
299    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
300            OrderByClause orderByClause, boolean selectDocuments, int limit, int offset, int countUpTo) {
301        if (log.isTraceEnabled()) {
302            log.trace("Mem: QUERY " + evaluator + " OFFSET " + offset + " LIMIT " + limit);
303        }
304        evaluator.parse();
305        List<Map<String, Serializable>> projections = new ArrayList<>();
306        for (State state : states.values()) {
307            List<Map<String, Serializable>> matches = evaluator.matches(state);
308            if (!matches.isEmpty()) {
309                if (selectDocuments) {
310                    projections.add(matches.get(0));
311                } else {
312                    projections.addAll(matches);
313                }
314            }
315        }
316        // ORDER BY
317        // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter
318        if (orderByClause != null) {
319            Collections.sort(projections, new OrderByComparator(orderByClause));
320        }
321        // LIMIT / OFFSET
322        int totalSize = projections.size();
323        if (countUpTo == -1) {
324            // count full size
325        } else if (countUpTo == 0) {
326            // no count
327            totalSize = -1; // not counted
328        } else {
329            // count only if less than countUpTo
330            if (totalSize > countUpTo) {
331                totalSize = -2; // truncated
332            }
333        }
334        if (limit != 0) {
335            int size = projections.size();
336            projections.subList(0, offset > size ? size : offset).clear();
337            size = projections.size();
338            if (limit < size) {
339                projections.subList(limit, size).clear();
340            }
341        }
342        // TODO DISTINCT
343
344        if (log.isTraceEnabled() && !projections.isEmpty()) {
345            log.trace("Mem:    -> " + projections.size());
346        }
347        return new PartialList<>(projections, totalSize);
348    }
349
350    /**
351     * Applies a {@link StateDiff} in-place onto a base {@link State}.
352     * <p>
353     * Uses thread-safe datastructures.
354     */
355    public static void applyDiff(State state, StateDiff stateDiff) {
356        for (Entry<String, Serializable> en : stateDiff.entrySet()) {
357            String key = en.getKey();
358            Serializable diffElem = en.getValue();
359            if (diffElem instanceof StateDiff) {
360                Serializable old = state.get(key);
361                if (old == null) {
362                    old = new State(true); // thread-safe
363                    state.put(key, old);
364                    // enter the next if
365                }
366                if (!(old instanceof State)) {
367                    throw new UnsupportedOperationException("Cannot apply StateDiff on non-State: " + old);
368                }
369                applyDiff((State) old, (StateDiff) diffElem);
370            } else if (diffElem instanceof ListDiff) {
371                state.put(key, applyDiff(state.get(key), (ListDiff) diffElem));
372            } else if (diffElem instanceof Delta) {
373                Delta delta = (Delta) diffElem;
374                Number oldValue = (Number) state.get(key);
375                Number value;
376                if (oldValue == null) {
377                    value = delta.getFullValue();
378                } else {
379                    value = delta.add(oldValue);
380                }
381                state.put(key, value);
382            } else {
383                state.put(key, diffElem);
384            }
385        }
386    }
387
388    /**
389     * Applies a {@link ListDiff} onto an array or {@link List}, and returns the resulting value.
390     * <p>
391     * Uses thread-safe datastructures.
392     */
393    public static Serializable applyDiff(Serializable value, ListDiff listDiff) {
394        // internally work on a list
395        // TODO this is costly, use a separate code path for arrays
396        Class<?> arrayComponentType = null;
397        if (listDiff.isArray && value != null) {
398            if (!(value instanceof Object[])) {
399                throw new UnsupportedOperationException("Cannot apply ListDiff on non-array: " + value);
400            }
401            arrayComponentType = ((Object[]) value).getClass().getComponentType();
402            value = new CopyOnWriteArrayList<>(Arrays.asList((Object[]) value));
403        }
404        if (value == null) {
405            value = new CopyOnWriteArrayList<>();
406        }
407        if (!(value instanceof List)) {
408            throw new UnsupportedOperationException("Cannot apply ListDiff on non-List: " + value);
409        }
410        @SuppressWarnings("unchecked")
411        List<Serializable> list = (List<Serializable>) value;
412        if (listDiff.diff != null) {
413            int i = 0;
414            for (Object diffElem : listDiff.diff) {
415                if (i >= list.size()) {
416                    // TODO log error applying diff to shorter list
417                    break;
418                }
419                if (diffElem instanceof StateDiff) {
420                    applyDiff((State) list.get(i), (StateDiff) diffElem);
421                } else if (diffElem != NOP) {
422                    list.set(i, StateHelper.deepCopy(diffElem, true)); // thread-safe
423                }
424                i++;
425            }
426        }
427        if (listDiff.rpush != null) {
428            // deepCopy of what we'll add
429            List<Serializable> add = new ArrayList<>(listDiff.rpush.size());
430            for (Object v : listDiff.rpush) {
431                add.add(StateHelper.deepCopy(v, true)); // thread-safe
432            }
433            // update CopyOnWriteArrayList in one step
434            list.addAll(add);
435        }
436        // convert back to array if needed
437        if (listDiff.isArray) {
438            return list.isEmpty() ? null : list.toArray((Object[]) Array.newInstance(arrayComponentType, list.size()));
439        } else {
440            return list.isEmpty() ? null : (Serializable) list;
441        }
442    }
443
444    /* synchronized */
445    @Override
446    public synchronized Lock getLock(String id) {
447        State state = states.get(id);
448        if (state == null) {
449            // document not found
450            throw new DocumentNotFoundException(id);
451        }
452        String owner = (String) state.get(KEY_LOCK_OWNER);
453        if (owner == null) {
454            return null;
455        }
456        Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
457        return new Lock(owner, created);
458    }
459
460    /* synchronized */
461    @Override
462    public synchronized Lock setLock(String id, Lock lock) {
463        State state = states.get(id);
464        if (state == null) {
465            // document not found
466            throw new DocumentNotFoundException(id);
467        }
468        String owner = (String) state.get(KEY_LOCK_OWNER);
469        if (owner != null) {
470            // return old lock
471            Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
472            return new Lock(owner, created);
473        }
474        state.put(KEY_LOCK_OWNER, lock.getOwner());
475        state.put(KEY_LOCK_CREATED, lock.getCreated());
476        return null;
477    }
478
479    /* synchronized */
480    @Override
481    public synchronized Lock removeLock(String id, String owner) {
482        State state = states.get(id);
483        if (state == null) {
484            // document not found
485            throw new DocumentNotFoundException(id);
486        }
487        String oldOwner = (String) state.get(KEY_LOCK_OWNER);
488        if (oldOwner == null) {
489            // no previous lock
490            return null;
491        }
492        Calendar oldCreated = (Calendar) state.get(KEY_LOCK_CREATED);
493        if (!LockManager.canLockBeRemoved(oldOwner, owner)) {
494            // existing mismatched lock, flag failure
495            return new Lock(oldOwner, oldCreated, true);
496        }
497        // remove lock
498        state.put(KEY_LOCK_OWNER, null);
499        state.put(KEY_LOCK_CREATED, null);
500        // return old lock
501        return new Lock(oldOwner, oldCreated);
502    }
503
504    @Override
505    public void closeLockManager() {
506    }
507
508    @Override
509    public void clearLockManagerCaches() {
510    }
511
512    protected List<List<String>> binaryPaths;
513
514    @Override
515    protected void initBlobsPaths() {
516        MemBlobFinder finder = new MemBlobFinder();
517        finder.visit();
518        binaryPaths = finder.binaryPaths;
519    }
520
521    protected static class MemBlobFinder extends BlobFinder {
522        protected List<List<String>> binaryPaths = new ArrayList<>();
523
524        @Override
525        protected void recordBlobPath() {
526            binaryPaths.add(new ArrayList<>(path));
527        }
528    }
529
530    @Override
531    public void markReferencedBinaries() {
532        BlobManager blobManager = Framework.getService(BlobManager.class);
533        for (State state : states.values()) {
534            for (List<String> path : binaryPaths) {
535                markReferencedBinaries(state, path, 0, blobManager);
536            }
537        }
538    }
539
540    protected void markReferencedBinaries(State state, List<String> path, int start, BlobManager blobManager) {
541        for (int i = start; i < path.size(); i++) {
542            String name = path.get(i);
543            Serializable value = state.get(name);
544            if (value instanceof State) {
545                state = (State) value;
546            } else {
547                if (value instanceof List) {
548                    @SuppressWarnings("unchecked")
549                    List<Object> list = (List<Object>) value;
550                    for (Object v : list) {
551                        if (v instanceof State) {
552                            markReferencedBinaries((State) v, path, i + 1, blobManager);
553                        } else {
554                            markReferencedBinary(v, blobManager);
555                        }
556                    }
557                }
558                state = null;
559                break;
560            }
561        }
562        if (state != null) {
563            Serializable data = state.get(KEY_BLOB_DATA);
564            markReferencedBinary(data, blobManager);
565        }
566    }
567
568    protected void markReferencedBinary(Object value, BlobManager blobManager) {
569        if (!(value instanceof String)) {
570            return;
571        }
572        String key = (String) value;
573        blobManager.markReferencedBinary(key, repositoryName);
574    }
575
576}