001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.mem;
020
021import static java.lang.Boolean.TRUE;
022import static org.nuxeo.ecm.core.storage.State.NOP;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS;
031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;
032
033import java.io.Serializable;
034import java.lang.reflect.Array;
035import java.util.ArrayList;
036import java.util.Arrays;
037import java.util.Calendar;
038import java.util.Collections;
039import java.util.List;
040import java.util.Map;
041import java.util.Map.Entry;
042import java.util.Set;
043import java.util.UUID;
044import java.util.concurrent.ConcurrentHashMap;
045import java.util.concurrent.CopyOnWriteArrayList;
046import java.util.concurrent.atomic.AtomicLong;
047
048import javax.resource.spi.ConnectionManager;
049
050import org.apache.commons.logging.Log;
051import org.apache.commons.logging.LogFactory;
052import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
053import org.nuxeo.ecm.core.api.DocumentNotFoundException;
054import org.nuxeo.ecm.core.api.Lock;
055import org.nuxeo.ecm.core.api.NuxeoException;
056import org.nuxeo.ecm.core.api.PartialList;
057import org.nuxeo.ecm.core.api.model.Delta;
058import org.nuxeo.ecm.core.blob.BlobManager;
059import org.nuxeo.ecm.core.model.LockManager;
060import org.nuxeo.ecm.core.model.Repository;
061import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
062import org.nuxeo.ecm.core.storage.State;
063import org.nuxeo.ecm.core.storage.State.ListDiff;
064import org.nuxeo.ecm.core.storage.State.StateDiff;
065import org.nuxeo.ecm.core.storage.StateHelper;
066import org.nuxeo.ecm.core.storage.dbs.DBSDocument;
067import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
068import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
069import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase.IdType;
070import org.nuxeo.ecm.core.storage.dbs.DBSSession.OrderByComparator;
071import org.nuxeo.runtime.api.Framework;
072
073/**
074 * In-memory implementation of a {@link Repository}.
075 * <p>
076 * Internally, the repository is a map from id to document object.
077 * <p>
078 * A document object is a JSON-like document stored as a Map recursively containing the data, see {@link DBSDocument}
079 * for the description of the document.
080 *
081 * @since 5.9.4
082 */
083public class MemRepository extends DBSRepositoryBase {
084
085    private static final Log log = LogFactory.getLog(MemRepository.class);
086
087    // for debug
088    private final AtomicLong temporaryIdCounter = new AtomicLong(0);
089
090    /**
091     * The content of the repository, a map of document id -> object.
092     */
093    protected Map<String, State> states;
094
/**
 * Creates the in-memory repository described by the given descriptor, then initializes its content through
 * {@link #initRepository()}.
 *
 * @param cm the connection manager, passed through to the base class
 * @param descriptor the repository descriptor; its {@code name} is passed to the base class as well
 */
public MemRepository(ConnectionManager cm, MemRepositoryDescriptor descriptor) {
    super(cm, descriptor.name, descriptor);
    // allocate the content map and run initRoot() right away
    this.initRepository();
}
099
100    @Override
101    public List<IdType> getAllowedIdTypes() {
102        return Collections.singletonList(IdType.varchar);
103    }
104
105    @Override
106    public void shutdown() {
107        super.shutdown();
108        states = null;
109    }
110
111    protected void initRepository() {
112        states = new ConcurrentHashMap<>();
113        initRoot();
114    }
115
116    @Override
117    public String generateNewId() {
118        if (DEBUG_UUIDS) {
119            return "UUID_" + temporaryIdCounter.incrementAndGet();
120        } else {
121            return UUID.randomUUID().toString();
122        }
123    }
124
125    @Override
126    public State readState(String id) {
127        State state = states.get(id);
128        if (state != null) {
129            if (log.isTraceEnabled()) {
130                log.trace("Mem: READ  " + id + ": " + state);
131            }
132        }
133        return state;
134    }
135
136    @Override
137    public List<State> readStates(List<String> ids) {
138        List<State> list = new ArrayList<>();
139        for (String id : ids) {
140            list.add(readState(id));
141        }
142        return list;
143    }
144
145    @Override
146    public void createState(State state) {
147        String id = (String) state.get(KEY_ID);
148        if (log.isTraceEnabled()) {
149            log.trace("Mem: CREATE " + id + ": " + state);
150        }
151        if (states.containsKey(id)) {
152            throw new NuxeoException("Already exists: " + id);
153        }
154        state = StateHelper.deepCopy(state, true); // thread-safe
155        StateHelper.resetDeltas(state);
156        states.put(id, state);
157    }
158
159    @Override
160    public void updateState(String id, StateDiff diff) {
161        if (log.isTraceEnabled()) {
162            log.trace("Mem: UPDATE " + id + ": " + diff);
163        }
164        State state = states.get(id);
165        if (state == null) {
166            throw new ConcurrentUpdateException("Missing: " + id);
167        }
168        applyDiff(state, diff);
169    }
170
171    @Override
172    public void deleteStates(Set<String> ids) {
173        if (log.isTraceEnabled()) {
174            log.trace("Mem: REMOVE " + ids);
175        }
176        for (String id : ids) {
177            if (states.remove(id) == null) {
178                log.debug("Missing on remove: " + id);
179            }
180        }
181    }
182
183    @Override
184    public State readChildState(String parentId, String name, Set<String> ignored) {
185        // TODO optimize by maintaining a parent/child index
186        for (State state : states.values()) {
187            if (ignored.contains(state.get(KEY_ID))) {
188                continue;
189            }
190            if (!parentId.equals(state.get(KEY_PARENT_ID))) {
191                continue;
192            }
193            if (!name.equals(state.get(KEY_NAME))) {
194                continue;
195            }
196            return state;
197        }
198        return null;
199    }
200
201    @Override
202    public boolean hasChild(String parentId, String name, Set<String> ignored) {
203        return readChildState(parentId, name, ignored) != null;
204    }
205
206    @Override
207    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
208        if (log.isTraceEnabled()) {
209            log.trace("Mem: QUERY " + key + " = " + value);
210        }
211        List<State> list = new ArrayList<>();
212        for (State state : states.values()) {
213            String id = (String) state.get(KEY_ID);
214            if (ignored.contains(id)) {
215                continue;
216            }
217            if (!value.equals(state.get(key))) {
218                continue;
219            }
220            list.add(state);
221        }
222        if (log.isTraceEnabled() && !list.isEmpty()) {
223            log.trace("Mem:    -> " + list.size());
224        }
225        return list;
226    }
227
228    @Override
229    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
230        if (log.isTraceEnabled()) {
231            log.trace("Mem: QUERY " + key1 + " = " + value1 + " AND " + key2 + " = " + value2);
232        }
233        List<State> list = new ArrayList<>();
234        for (State state : states.values()) {
235            String id = (String) state.get(KEY_ID);
236            if (ignored.contains(id)) {
237                continue;
238            }
239            if (!(value1.equals(state.get(key1)) && value2.equals(state.get(key2)))) {
240                continue;
241            }
242            list.add(state);
243        }
244        if (log.isTraceEnabled() && !list.isEmpty()) {
245            log.trace("Mem:    -> " + list.size());
246        }
247        return list;
248    }
249
250    @Override
251    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
252            Map<String, Object[]> targetProxies) {
253        if (log.isTraceEnabled()) {
254            log.trace("Mem: QUERY " + key + " = " + value);
255        }
256        STATE: for (State state : states.values()) {
257            Object[] array = (Object[]) state.get(key);
258            String id = (String) state.get(KEY_ID);
259            if (array != null) {
260                for (Object v : array) {
261                    if (value.equals(v)) {
262                        ids.add(id);
263                        if (proxyTargets != null && TRUE.equals(state.get(KEY_IS_PROXY))) {
264                            String targetId = (String) state.get(KEY_PROXY_TARGET_ID);
265                            proxyTargets.put(id, targetId);
266                        }
267                        if (targetProxies != null) {
268                            Object[] proxyIds = (Object[]) state.get(KEY_PROXY_IDS);
269                            if (proxyIds != null) {
270                                targetProxies.put(id, proxyIds);
271                            }
272                        }
273                        continue STATE;
274                    }
275                }
276            }
277        }
278        if (log.isTraceEnabled() && !ids.isEmpty()) {
279            log.trace("Mem:    -> " + ids.size());
280        }
281    }
282
283    @Override
284    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
285        if (log.isTraceEnabled()) {
286            log.trace("Mem: QUERY " + key + " = " + value);
287        }
288        for (State state : states.values()) {
289            String id = (String) state.get(KEY_ID);
290            if (ignored.contains(id)) {
291                continue;
292            }
293            if (value.equals(state.get(key))) {
294                if (log.isTraceEnabled()) {
295                    log.trace("Mem:    -> present");
296                }
297                return true;
298            }
299        }
300        if (log.isTraceEnabled()) {
301            log.trace("Mem:    -> absent");
302        }
303        return false;
304    }
305
306    @Override
307    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
308            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
309        if (log.isTraceEnabled()) {
310            log.trace("Mem: QUERY " + evaluator + " OFFSET " + offset + " LIMIT " + limit);
311        }
312        evaluator.parse();
313        List<Map<String, Serializable>> projections = new ArrayList<>();
314        for (State state : states.values()) {
315            List<Map<String, Serializable>> matches = evaluator.matches(state);
316            if (!matches.isEmpty()) {
317                if (distinctDocuments) {
318                    projections.add(matches.get(0));
319                } else {
320                    projections.addAll(matches);
321                }
322            }
323        }
324        // ORDER BY
325        // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter
326        if (orderByClause != null) {
327            Collections.sort(projections, new OrderByComparator(orderByClause));
328        }
329        // LIMIT / OFFSET
330        int totalSize = projections.size();
331        if (countUpTo == -1) {
332            // count full size
333        } else if (countUpTo == 0) {
334            // no count
335            totalSize = -1; // not counted
336        } else {
337            // count only if less than countUpTo
338            if (totalSize > countUpTo) {
339                totalSize = -2; // truncated
340            }
341        }
342        if (limit != 0) {
343            int size = projections.size();
344            projections.subList(0, offset > size ? size : offset).clear();
345            size = projections.size();
346            if (limit < size) {
347                projections.subList(limit, size).clear();
348            }
349        }
350        // TODO DISTINCT
351
352        if (log.isTraceEnabled() && !projections.isEmpty()) {
353            log.trace("Mem:    -> " + projections.size());
354        }
355        return new PartialList<>(projections, totalSize);
356    }
357
358    /**
359     * Applies a {@link StateDiff} in-place onto a base {@link State}.
360     * <p>
361     * Uses thread-safe datastructures.
362     */
363    public static void applyDiff(State state, StateDiff stateDiff) {
364        for (Entry<String, Serializable> en : stateDiff.entrySet()) {
365            String key = en.getKey();
366            Serializable diffElem = en.getValue();
367            if (diffElem instanceof StateDiff) {
368                Serializable old = state.get(key);
369                if (old == null) {
370                    old = new State(true); // thread-safe
371                    state.put(key, old);
372                    // enter the next if
373                }
374                if (!(old instanceof State)) {
375                    throw new UnsupportedOperationException("Cannot apply StateDiff on non-State: " + old);
376                }
377                applyDiff((State) old, (StateDiff) diffElem);
378            } else if (diffElem instanceof ListDiff) {
379                state.put(key, applyDiff(state.get(key), (ListDiff) diffElem));
380            } else if (diffElem instanceof Delta) {
381                Delta delta = (Delta) diffElem;
382                Number oldValue = (Number) state.get(key);
383                Number value;
384                if (oldValue == null) {
385                    value = delta.getFullValue();
386                } else {
387                    value = delta.add(oldValue);
388                }
389                state.put(key, value);
390            } else {
391                state.put(key, StateHelper.deepCopy(diffElem, true)); // thread-safe
392            }
393        }
394    }
395
396    /**
397     * Applies a {@link ListDiff} onto an array or {@link List}, and returns the resulting value.
398     * <p>
399     * Uses thread-safe datastructures.
400     */
401    public static Serializable applyDiff(Serializable value, ListDiff listDiff) {
402        // internally work on a list
403        // TODO this is costly, use a separate code path for arrays
404        Class<?> arrayComponentType = null;
405        if (listDiff.isArray && value != null) {
406            if (!(value instanceof Object[])) {
407                throw new UnsupportedOperationException("Cannot apply ListDiff on non-array: " + value);
408            }
409            arrayComponentType = ((Object[]) value).getClass().getComponentType();
410            value = new CopyOnWriteArrayList<>(Arrays.asList((Object[]) value));
411        }
412        if (value == null) {
413            value = new CopyOnWriteArrayList<>();
414        }
415        if (!(value instanceof List)) {
416            throw new UnsupportedOperationException("Cannot apply ListDiff on non-List: " + value);
417        }
418        @SuppressWarnings("unchecked")
419        List<Serializable> list = (List<Serializable>) value;
420        if (listDiff.diff != null) {
421            int i = 0;
422            for (Object diffElem : listDiff.diff) {
423                if (i >= list.size()) {
424                    // TODO log error applying diff to shorter list
425                    break;
426                }
427                if (diffElem instanceof StateDiff) {
428                    applyDiff((State) list.get(i), (StateDiff) diffElem);
429                } else if (diffElem != NOP) {
430                    list.set(i, StateHelper.deepCopy(diffElem, true)); // thread-safe
431                }
432                i++;
433            }
434        }
435        if (listDiff.rpush != null) {
436            // deepCopy of what we'll add
437            List<Serializable> add = new ArrayList<>(listDiff.rpush.size());
438            for (Object v : listDiff.rpush) {
439                add.add(StateHelper.deepCopy(v, true)); // thread-safe
440            }
441            // update CopyOnWriteArrayList in one step
442            list.addAll(add);
443        }
444        // convert back to array if needed
445        if (listDiff.isArray) {
446            return list.isEmpty() ? null : list.toArray((Object[]) Array.newInstance(arrayComponentType, list.size()));
447        } else {
448            return list.isEmpty() ? null : (Serializable) list;
449        }
450    }
451
452    /* synchronized */
453    @Override
454    public synchronized Lock getLock(String id) {
455        State state = states.get(id);
456        if (state == null) {
457            // document not found
458            throw new DocumentNotFoundException(id);
459        }
460        String owner = (String) state.get(KEY_LOCK_OWNER);
461        if (owner == null) {
462            return null;
463        }
464        Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
465        return new Lock(owner, created);
466    }
467
468    /* synchronized */
469    @Override
470    public synchronized Lock setLock(String id, Lock lock) {
471        State state = states.get(id);
472        if (state == null) {
473            // document not found
474            throw new DocumentNotFoundException(id);
475        }
476        String owner = (String) state.get(KEY_LOCK_OWNER);
477        if (owner != null) {
478            // return old lock
479            Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
480            return new Lock(owner, created);
481        }
482        state.put(KEY_LOCK_OWNER, lock.getOwner());
483        state.put(KEY_LOCK_CREATED, lock.getCreated());
484        return null;
485    }
486
487    /* synchronized */
488    @Override
489    public synchronized Lock removeLock(String id, String owner) {
490        State state = states.get(id);
491        if (state == null) {
492            // document not found
493            throw new DocumentNotFoundException(id);
494        }
495        String oldOwner = (String) state.get(KEY_LOCK_OWNER);
496        if (oldOwner == null) {
497            // no previous lock
498            return null;
499        }
500        Calendar oldCreated = (Calendar) state.get(KEY_LOCK_CREATED);
501        if (!LockManager.canLockBeRemoved(oldOwner, owner)) {
502            // existing mismatched lock, flag failure
503            return new Lock(oldOwner, oldCreated, true);
504        }
505        // remove lock
506        state.put(KEY_LOCK_OWNER, null);
507        state.put(KEY_LOCK_CREATED, null);
508        // return old lock
509        return new Lock(oldOwner, oldCreated);
510    }
511
512    @Override
513    public void closeLockManager() {
514    }
515
516    @Override
517    public void clearLockManagerCaches() {
518    }
519
520    protected List<List<String>> binaryPaths;
521
522    @Override
523    protected void initBlobsPaths() {
524        MemBlobFinder finder = new MemBlobFinder();
525        finder.visit();
526        binaryPaths = finder.binaryPaths;
527    }
528
529    protected static class MemBlobFinder extends BlobFinder {
530        protected List<List<String>> binaryPaths = new ArrayList<>();
531
532        @Override
533        protected void recordBlobPath() {
534            binaryPaths.add(new ArrayList<>(path));
535        }
536    }
537
538    @Override
539    public void markReferencedBinaries() {
540        BlobManager blobManager = Framework.getService(BlobManager.class);
541        for (State state : states.values()) {
542            for (List<String> path : binaryPaths) {
543                markReferencedBinaries(state, path, 0, blobManager);
544            }
545        }
546    }
547
548    protected void markReferencedBinaries(State state, List<String> path, int start, BlobManager blobManager) {
549        for (int i = start; i < path.size(); i++) {
550            String name = path.get(i);
551            Serializable value = state.get(name);
552            if (value instanceof State) {
553                state = (State) value;
554            } else {
555                if (value instanceof List) {
556                    @SuppressWarnings("unchecked")
557                    List<Object> list = (List<Object>) value;
558                    for (Object v : list) {
559                        if (v instanceof State) {
560                            markReferencedBinaries((State) v, path, i + 1, blobManager);
561                        } else {
562                            markReferencedBinary(v, blobManager);
563                        }
564                    }
565                }
566                state = null;
567                break;
568            }
569        }
570        if (state != null) {
571            Serializable data = state.get(KEY_BLOB_DATA);
572            markReferencedBinary(data, blobManager);
573        }
574    }
575
576    protected void markReferencedBinary(Object value, BlobManager blobManager) {
577        if (!(value instanceof String)) {
578            return;
579        }
580        String key = (String) value;
581        blobManager.markReferencedBinary(key, repositoryName);
582    }
583
584}