001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.mem;
020
021import static java.lang.Boolean.TRUE;
022import static org.nuxeo.ecm.core.storage.State.NOP;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS;
031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;
032
033import java.io.Serializable;
034import java.lang.reflect.Array;
035import java.util.ArrayList;
036import java.util.Arrays;
037import java.util.Calendar;
038import java.util.Collections;
039import java.util.List;
040import java.util.Map;
041import java.util.Map.Entry;
042import java.util.Set;
043import java.util.UUID;
044import java.util.concurrent.ConcurrentHashMap;
045import java.util.concurrent.CopyOnWriteArrayList;
046import java.util.concurrent.atomic.AtomicLong;
047
048import org.apache.commons.logging.Log;
049import org.apache.commons.logging.LogFactory;
050import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
051import org.nuxeo.ecm.core.api.DocumentNotFoundException;
052import org.nuxeo.ecm.core.api.Lock;
053import org.nuxeo.ecm.core.api.NuxeoException;
054import org.nuxeo.ecm.core.api.PartialList;
055import org.nuxeo.ecm.core.api.model.Delta;
056import org.nuxeo.ecm.core.blob.BlobManager;
057import org.nuxeo.ecm.core.model.LockManager;
058import org.nuxeo.ecm.core.model.Repository;
059import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
060import org.nuxeo.ecm.core.storage.State;
061import org.nuxeo.ecm.core.storage.State.ListDiff;
062import org.nuxeo.ecm.core.storage.State.StateDiff;
063import org.nuxeo.ecm.core.storage.StateHelper;
064import org.nuxeo.ecm.core.storage.dbs.DBSDocument;
065import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
066import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
067import org.nuxeo.ecm.core.storage.dbs.DBSSession.OrderByComparator;
068import org.nuxeo.runtime.api.Framework;
069
070/**
071 * In-memory implementation of a {@link Repository}.
072 * <p>
073 * Internally, the repository is a map from id to document object.
074 * <p>
075 * A document object is a JSON-like document stored as a Map recursively containing the data, see {@link DBSDocument}
076 * for the description of the document.
077 *
078 * @since 5.9.4
079 */
080public class MemRepository extends DBSRepositoryBase {
081
    /** Logger used by this class for its trace and debug messages. */
    private static final Log log = LogFactory.getLog(MemRepository.class);

    /**
     * Counter backing the sequential ids returned by {@link #generateNewId()} when {@code DEBUG_UUIDS} is set (for
     * debug).
     */
    private final AtomicLong temporaryIdCounter = new AtomicLong(0);

    /**
     * The content of the repository, a map of document id -> object.
     * <p>
     * Created by {@link #initRepository()} and set to {@code null} by {@link #shutdown()}.
     */
    protected Map<String, State> states;
091
    /**
     * Creates the repository from its descriptor.
     * <p>
     * The descriptor's name and fulltext descriptor are passed to the superclass, then {@link #initRepository()} is
     * called.
     *
     * @param descriptor the repository descriptor
     */
    public MemRepository(MemRepositoryDescriptor descriptor) {
        super(descriptor.name, descriptor.getFulltextDescriptor());
        initRepository();
    }
096
    /**
     * Shuts down the repository: delegates to the superclass, then drops the reference to the in-memory content.
     * <p>
     * After this call {@link #states} is {@code null}.
     */
    @Override
    public void shutdown() {
        super.shutdown();
        states = null;
    }
102
    /**
     * Allocates an empty concurrent map for the repository content, then calls {@code initRoot()}.
     */
    protected void initRepository() {
        states = new ConcurrentHashMap<>();
        initRoot();
    }
107
108    @Override
109    public String generateNewId() {
110        if (DEBUG_UUIDS) {
111            return "UUID_" + temporaryIdCounter.incrementAndGet();
112        } else {
113            return UUID.randomUUID().toString();
114        }
115    }
116
117    @Override
118    public State readState(String id) {
119        State state = states.get(id);
120        if (state != null) {
121            if (log.isTraceEnabled()) {
122                log.trace("Mem: READ  " + id + ": " + state);
123            }
124        }
125        return state;
126    }
127
128    @Override
129    public List<State> readStates(List<String> ids) {
130        List<State> list = new ArrayList<>();
131        for (String id : ids) {
132            list.add(readState(id));
133        }
134        return list;
135    }
136
137    @Override
138    public void createState(State state) {
139        String id = (String) state.get(KEY_ID);
140        if (log.isTraceEnabled()) {
141            log.trace("Mem: CREATE " + id + ": " + state);
142        }
143        if (states.containsKey(id)) {
144            throw new NuxeoException("Already exists: " + id);
145        }
146        state = StateHelper.deepCopy(state, true); // thread-safe
147        StateHelper.resetDeltas(state);
148        states.put(id, state);
149    }
150
151    @Override
152    public void updateState(String id, StateDiff diff) {
153        if (log.isTraceEnabled()) {
154            log.trace("Mem: UPDATE " + id + ": " + diff);
155        }
156        State state = states.get(id);
157        if (state == null) {
158            throw new ConcurrentUpdateException("Missing: " + id);
159        }
160        applyDiff(state, diff);
161    }
162
163    @Override
164    public void deleteStates(Set<String> ids) {
165        if (log.isTraceEnabled()) {
166            log.trace("Mem: REMOVE " + ids);
167        }
168        for (String id : ids) {
169            if (states.remove(id) == null) {
170                log.debug("Missing on remove: " + id);
171            }
172        }
173    }
174
175    @Override
176    public State readChildState(String parentId, String name, Set<String> ignored) {
177        // TODO optimize by maintaining a parent/child index
178        for (State state : states.values()) {
179            if (ignored.contains(state.get(KEY_ID))) {
180                continue;
181            }
182            if (!parentId.equals(state.get(KEY_PARENT_ID))) {
183                continue;
184            }
185            if (!name.equals(state.get(KEY_NAME))) {
186                continue;
187            }
188            return state;
189        }
190        return null;
191    }
192
    /**
     * Checks whether a child with the given parent id and name exists, by delegating to
     * {@link #readChildState(String, String, Set)}.
     *
     * @param parentId the parent id
     * @param name the child name
     * @param ignored ids of states to skip
     * @return {@code true} if a matching child state was found
     */
    @Override
    public boolean hasChild(String parentId, String name, Set<String> ignored) {
        return readChildState(parentId, name, ignored) != null;
    }
197
198    @Override
199    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
200        if (log.isTraceEnabled()) {
201            log.trace("Mem: QUERY " + key + " = " + value);
202        }
203        List<State> list = new ArrayList<>();
204        for (State state : states.values()) {
205            String id = (String) state.get(KEY_ID);
206            if (ignored.contains(id)) {
207                continue;
208            }
209            if (!value.equals(state.get(key))) {
210                continue;
211            }
212            list.add(state);
213        }
214        if (log.isTraceEnabled() && !list.isEmpty()) {
215            log.trace("Mem:    -> " + list.size());
216        }
217        return list;
218    }
219
220    @Override
221    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
222        if (log.isTraceEnabled()) {
223            log.trace("Mem: QUERY " + key1 + " = " + value1 + " AND " + key2 + " = " + value2);
224        }
225        List<State> list = new ArrayList<>();
226        for (State state : states.values()) {
227            String id = (String) state.get(KEY_ID);
228            if (ignored.contains(id)) {
229                continue;
230            }
231            if (!(value1.equals(state.get(key1)) && value2.equals(state.get(key2)))) {
232                continue;
233            }
234            list.add(state);
235        }
236        if (log.isTraceEnabled() && !list.isEmpty()) {
237            log.trace("Mem:    -> " + list.size());
238        }
239        return list;
240    }
241
242    @Override
243    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
244            Map<String, Object[]> targetProxies) {
245        if (log.isTraceEnabled()) {
246            log.trace("Mem: QUERY " + key + " = " + value);
247        }
248        STATE: for (State state : states.values()) {
249            Object[] array = (Object[]) state.get(key);
250            String id = (String) state.get(KEY_ID);
251            if (array != null) {
252                for (Object v : array) {
253                    if (value.equals(v)) {
254                        ids.add(id);
255                        if (proxyTargets != null && TRUE.equals(state.get(KEY_IS_PROXY))) {
256                            String targetId = (String) state.get(KEY_PROXY_TARGET_ID);
257                            proxyTargets.put(id, targetId);
258                        }
259                        if (targetProxies != null) {
260                            Object[] proxyIds = (Object[]) state.get(KEY_PROXY_IDS);
261                            if (proxyIds != null) {
262                                targetProxies.put(id, proxyIds);
263                            }
264                        }
265                        continue STATE;
266                    }
267                }
268            }
269        }
270        if (log.isTraceEnabled() && !ids.isEmpty()) {
271            log.trace("Mem:    -> " + ids.size());
272        }
273    }
274
275    @Override
276    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
277        if (log.isTraceEnabled()) {
278            log.trace("Mem: QUERY " + key + " = " + value);
279        }
280        for (State state : states.values()) {
281            String id = (String) state.get(KEY_ID);
282            if (ignored.contains(id)) {
283                continue;
284            }
285            if (value.equals(state.get(key))) {
286                if (log.isTraceEnabled()) {
287                    log.trace("Mem:    -> present");
288                }
289                return true;
290            }
291        }
292        if (log.isTraceEnabled()) {
293            log.trace("Mem:    -> absent");
294        }
295        return false;
296    }
297
298    @Override
299    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
300            OrderByClause orderByClause, int limit, int offset, int countUpTo) {
301        if (log.isTraceEnabled()) {
302            log.trace("Mem: QUERY " + evaluator + " OFFSET " + offset + " LIMIT " + limit);
303        }
304        evaluator.parse();
305        boolean wildcardProjection = evaluator.hasWildcardProjection();
306        List<Map<String, Serializable>> projections = new ArrayList<>();
307        for (State state : states.values()) {
308            List<Map<String, Serializable>> matches = evaluator.matches(state);
309            if (!matches.isEmpty()) {
310                if (wildcardProjection) {
311                    // all projections are relevant
312                    projections.addAll(matches);
313                } else {
314                    // no wildcard in projection, all projections are identical, keep the first
315                    projections.add(matches.get(0));
316                }
317            }
318        }
319        // ORDER BY
320        // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter
321        if (orderByClause != null) {
322            Collections.sort(projections, new OrderByComparator(orderByClause));
323        }
324        // LIMIT / OFFSET
325        int totalSize = projections.size();
326        if (countUpTo == -1) {
327            // count full size
328        } else if (countUpTo == 0) {
329            // no count
330            totalSize = -1; // not counted
331        } else {
332            // count only if less than countUpTo
333            if (totalSize > countUpTo) {
334                totalSize = -2; // truncated
335            }
336        }
337        if (limit != 0) {
338            int size = projections.size();
339            projections.subList(0, offset > size ? size : offset).clear();
340            size = projections.size();
341            if (limit < size) {
342                projections.subList(limit, size).clear();
343            }
344        }
345        // TODO DISTINCT
346
347        if (log.isTraceEnabled() && !projections.isEmpty()) {
348            log.trace("Mem:    -> " + projections.size());
349        }
350        return new PartialList<>(projections, totalSize);
351    }
352
353    /**
354     * Applies a {@link StateDiff} in-place onto a base {@link State}.
355     * <p>
356     * Uses thread-safe datastructures.
357     */
358    public static void applyDiff(State state, StateDiff stateDiff) {
359        for (Entry<String, Serializable> en : stateDiff.entrySet()) {
360            String key = en.getKey();
361            Serializable diffElem = en.getValue();
362            if (diffElem instanceof StateDiff) {
363                Serializable old = state.get(key);
364                if (old == null) {
365                    old = new State(true); // thread-safe
366                    state.put(key, old);
367                    // enter the next if
368                }
369                if (!(old instanceof State)) {
370                    throw new UnsupportedOperationException("Cannot apply StateDiff on non-State: " + old);
371                }
372                applyDiff((State) old, (StateDiff) diffElem);
373            } else if (diffElem instanceof ListDiff) {
374                state.put(key, applyDiff(state.get(key), (ListDiff) diffElem));
375            } else if (diffElem instanceof Delta) {
376                Delta delta = (Delta) diffElem;
377                Number oldValue = (Number) state.get(key);
378                Number value;
379                if (oldValue == null) {
380                    value = delta.getFullValue();
381                } else {
382                    value = delta.add(oldValue);
383                }
384                state.put(key, value);
385            } else {
386                state.put(key, diffElem);
387            }
388        }
389    }
390
391    /**
392     * Applies a {@link ListDiff} onto an array or {@link List}, and returns the resulting value.
393     * <p>
394     * Uses thread-safe datastructures.
395     */
396    public static Serializable applyDiff(Serializable value, ListDiff listDiff) {
397        // internally work on a list
398        // TODO this is costly, use a separate code path for arrays
399        Class<?> arrayComponentType = null;
400        if (listDiff.isArray && value != null) {
401            if (!(value instanceof Object[])) {
402                throw new UnsupportedOperationException("Cannot apply ListDiff on non-array: " + value);
403            }
404            arrayComponentType = ((Object[]) value).getClass().getComponentType();
405            value = new CopyOnWriteArrayList<>(Arrays.asList((Object[]) value));
406        }
407        if (value == null) {
408            value = new CopyOnWriteArrayList<>();
409        }
410        if (!(value instanceof List)) {
411            throw new UnsupportedOperationException("Cannot apply ListDiff on non-List: " + value);
412        }
413        @SuppressWarnings("unchecked")
414        List<Serializable> list = (List<Serializable>) value;
415        if (listDiff.diff != null) {
416            int i = 0;
417            for (Object diffElem : listDiff.diff) {
418                if (i >= list.size()) {
419                    // TODO log error applying diff to shorter list
420                    break;
421                }
422                if (diffElem instanceof StateDiff) {
423                    applyDiff((State) list.get(i), (StateDiff) diffElem);
424                } else if (diffElem != NOP) {
425                    list.set(i, StateHelper.deepCopy(diffElem, true)); // thread-safe
426                }
427                i++;
428            }
429        }
430        if (listDiff.rpush != null) {
431            // deepCopy of what we'll add
432            List<Serializable> add = new ArrayList<>(listDiff.rpush.size());
433            for (Object v : listDiff.rpush) {
434                add.add(StateHelper.deepCopy(v, true)); // thread-safe
435            }
436            // update CopyOnWriteArrayList in one step
437            list.addAll(add);
438        }
439        // convert back to array if needed
440        if (listDiff.isArray) {
441            return list.isEmpty() ? null : list.toArray((Object[]) Array.newInstance(arrayComponentType, list.size()));
442        } else {
443            return list.isEmpty() ? null : (Serializable) list;
444        }
445    }
446
447    /* synchronized */
448    @Override
449    public synchronized Lock getLock(String id) {
450        State state = states.get(id);
451        if (state == null) {
452            // document not found
453            throw new DocumentNotFoundException(id);
454        }
455        String owner = (String) state.get(KEY_LOCK_OWNER);
456        if (owner == null) {
457            return null;
458        }
459        Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
460        return new Lock(owner, created);
461    }
462
463    /* synchronized */
464    @Override
465    public synchronized Lock setLock(String id, Lock lock) {
466        State state = states.get(id);
467        if (state == null) {
468            // document not found
469            throw new DocumentNotFoundException(id);
470        }
471        String owner = (String) state.get(KEY_LOCK_OWNER);
472        if (owner != null) {
473            // return old lock
474            Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
475            return new Lock(owner, created);
476        }
477        state.put(KEY_LOCK_OWNER, lock.getOwner());
478        state.put(KEY_LOCK_CREATED, lock.getCreated());
479        return null;
480    }
481
482    /* synchronized */
483    @Override
484    public synchronized Lock removeLock(String id, String owner) {
485        State state = states.get(id);
486        if (state == null) {
487            // document not found
488            throw new DocumentNotFoundException(id);
489        }
490        String oldOwner = (String) state.get(KEY_LOCK_OWNER);
491        if (oldOwner == null) {
492            // no previous lock
493            return null;
494        }
495        Calendar oldCreated = (Calendar) state.get(KEY_LOCK_CREATED);
496        if (!LockManager.canLockBeRemoved(oldOwner, owner)) {
497            // existing mismatched lock, flag failure
498            return new Lock(oldOwner, oldCreated, true);
499        }
500        // remove lock
501        state.put(KEY_LOCK_OWNER, null);
502        state.put(KEY_LOCK_CREATED, null);
503        // return old lock
504        return new Lock(oldOwner, oldCreated);
505    }
506
    /**
     * Does nothing in this implementation.
     */
    @Override
    public void closeLockManager() {
    }
510
    /**
     * Does nothing in this implementation.
     */
    @Override
    public void clearLockManagerCaches() {
    }
514
    /**
     * The blob paths recorded by {@link MemBlobFinder}, each path being a list of key names.
     * <p>
     * Set by {@link #initBlobsPaths()} and walked by {@link #markReferencedBinaries()}.
     */
    protected List<List<String>> binaryPaths;
516
    /**
     * Computes the blob paths by running a {@link MemBlobFinder} visit, and keeps the paths it recorded in
     * {@link #binaryPaths}.
     */
    @Override
    protected void initBlobsPaths() {
        MemBlobFinder finder = new MemBlobFinder();
        finder.visit();
        binaryPaths = finder.binaryPaths;
    }
523
    /**
     * A {@code BlobFinder} that accumulates the blob paths it is notified of.
     */
    protected static class MemBlobFinder extends BlobFinder {
        /** The recorded paths, in the order they were reported. */
        protected List<List<String>> binaryPaths = new ArrayList<>();

        /**
         * Records a snapshot copy of {@code path}.
         * <p>
         * NOTE(review): {@code path} is not declared here; presumably it is the current path maintained by
         * {@code BlobFinder} during the visit, hence the copy -- confirm against the superclass.
         */
        @Override
        protected void recordBlobPath() {
            binaryPaths.add(new ArrayList<>(path));
        }
    }
532
533    @Override
534    public void markReferencedBinaries() {
535        BlobManager blobManager = Framework.getService(BlobManager.class);
536        for (State state : states.values()) {
537            for (List<String> path : binaryPaths) {
538                markReferencedBinaries(state, path, 0, blobManager);
539            }
540        }
541    }
542
543    protected void markReferencedBinaries(State state, List<String> path, int start, BlobManager blobManager) {
544        for (int i = start; i < path.size(); i++) {
545            String name = path.get(i);
546            Serializable value = state.get(name);
547            if (value instanceof State) {
548                state = (State) value;
549            } else {
550                if (value instanceof List) {
551                    @SuppressWarnings("unchecked")
552                    List<Object> list = (List<Object>) value;
553                    for (Object v : list) {
554                        if (v instanceof State) {
555                            markReferencedBinaries((State) v, path, i + 1, blobManager);
556                        } else {
557                            markReferencedBinary(v, blobManager);
558                        }
559                    }
560                }
561                state = null;
562                break;
563            }
564        }
565        if (state != null) {
566            Serializable data = state.get(KEY_BLOB_DATA);
567            markReferencedBinary(data, blobManager);
568        }
569    }
570
571    protected void markReferencedBinary(Object value, BlobManager blobManager) {
572        if (!(value instanceof String)) {
573            return;
574        }
575        String key = (String) value;
576        blobManager.markReferencedBinary(key, repositoryName);
577    }
578
579}