001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.mem;
020
021import static java.lang.Boolean.TRUE;
022import static org.nuxeo.ecm.core.storage.State.NOP;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS;
031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;
032
033import java.io.Serializable;
034import java.lang.reflect.Array;
035import java.util.ArrayList;
036import java.util.Arrays;
037import java.util.Calendar;
038import java.util.Collections;
039import java.util.List;
040import java.util.Map;
041import java.util.Map.Entry;
042import java.util.Set;
043import java.util.UUID;
044import java.util.concurrent.ConcurrentHashMap;
045import java.util.concurrent.CopyOnWriteArrayList;
046import java.util.concurrent.atomic.AtomicLong;
047
048import javax.resource.spi.ConnectionManager;
049
050import org.apache.commons.logging.Log;
051import org.apache.commons.logging.LogFactory;
052import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
053import org.nuxeo.ecm.core.api.DocumentNotFoundException;
054import org.nuxeo.ecm.core.api.Lock;
055import org.nuxeo.ecm.core.api.NuxeoException;
056import org.nuxeo.ecm.core.api.PartialList;
057import org.nuxeo.ecm.core.api.model.Delta;
058import org.nuxeo.ecm.core.blob.BlobManager;
059import org.nuxeo.ecm.core.model.LockManager;
060import org.nuxeo.ecm.core.model.Repository;
061import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
062import org.nuxeo.ecm.core.storage.State;
063import org.nuxeo.ecm.core.storage.State.ListDiff;
064import org.nuxeo.ecm.core.storage.State.StateDiff;
065import org.nuxeo.ecm.core.storage.StateHelper;
066import org.nuxeo.ecm.core.storage.dbs.DBSDocument;
067import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
068import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
069import org.nuxeo.ecm.core.storage.dbs.DBSSession.OrderByComparator;
070import org.nuxeo.runtime.api.Framework;
071
072/**
073 * In-memory implementation of a {@link Repository}.
074 * <p>
075 * Internally, the repository is a map from id to document object.
076 * <p>
077 * A document object is a JSON-like document stored as a Map recursively containing the data, see {@link DBSDocument}
078 * for the description of the document.
079 *
080 * @since 5.9.4
081 */
082public class MemRepository extends DBSRepositoryBase {
083
084    private static final Log log = LogFactory.getLog(MemRepository.class);
085
086    // for debug
087    private final AtomicLong temporaryIdCounter = new AtomicLong(0);
088
089    /**
090     * The content of the repository, a map of document id -> object.
091     */
092    protected Map<String, State> states;
093
094    public MemRepository(ConnectionManager cm, MemRepositoryDescriptor descriptor) {
095        super(cm, descriptor.name, descriptor.getFulltextDescriptor());
096        initRepository();
097    }
098
099    @Override
100    public void shutdown() {
101        super.shutdown();
102        states = null;
103    }
104
105    protected void initRepository() {
106        states = new ConcurrentHashMap<>();
107        initRoot();
108    }
109
110    @Override
111    public String generateNewId() {
112        if (DEBUG_UUIDS) {
113            return "UUID_" + temporaryIdCounter.incrementAndGet();
114        } else {
115            return UUID.randomUUID().toString();
116        }
117    }
118
119    @Override
120    public State readState(String id) {
121        State state = states.get(id);
122        if (state != null) {
123            if (log.isTraceEnabled()) {
124                log.trace("Mem: READ  " + id + ": " + state);
125            }
126        }
127        return state;
128    }
129
130    @Override
131    public List<State> readStates(List<String> ids) {
132        List<State> list = new ArrayList<>();
133        for (String id : ids) {
134            list.add(readState(id));
135        }
136        return list;
137    }
138
139    @Override
140    public void createState(State state) {
141        String id = (String) state.get(KEY_ID);
142        if (log.isTraceEnabled()) {
143            log.trace("Mem: CREATE " + id + ": " + state);
144        }
145        if (states.containsKey(id)) {
146            throw new NuxeoException("Already exists: " + id);
147        }
148        state = StateHelper.deepCopy(state, true); // thread-safe
149        StateHelper.resetDeltas(state);
150        states.put(id, state);
151    }
152
153    @Override
154    public void updateState(String id, StateDiff diff) {
155        if (log.isTraceEnabled()) {
156            log.trace("Mem: UPDATE " + id + ": " + diff);
157        }
158        State state = states.get(id);
159        if (state == null) {
160            throw new ConcurrentUpdateException("Missing: " + id);
161        }
162        applyDiff(state, diff);
163    }
164
165    @Override
166    public void deleteStates(Set<String> ids) {
167        if (log.isTraceEnabled()) {
168            log.trace("Mem: REMOVE " + ids);
169        }
170        for (String id : ids) {
171            if (states.remove(id) == null) {
172                log.debug("Missing on remove: " + id);
173            }
174        }
175    }
176
177    @Override
178    public State readChildState(String parentId, String name, Set<String> ignored) {
179        // TODO optimize by maintaining a parent/child index
180        for (State state : states.values()) {
181            if (ignored.contains(state.get(KEY_ID))) {
182                continue;
183            }
184            if (!parentId.equals(state.get(KEY_PARENT_ID))) {
185                continue;
186            }
187            if (!name.equals(state.get(KEY_NAME))) {
188                continue;
189            }
190            return state;
191        }
192        return null;
193    }
194
195    @Override
196    public boolean hasChild(String parentId, String name, Set<String> ignored) {
197        return readChildState(parentId, name, ignored) != null;
198    }
199
200    @Override
201    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
202        if (log.isTraceEnabled()) {
203            log.trace("Mem: QUERY " + key + " = " + value);
204        }
205        List<State> list = new ArrayList<>();
206        for (State state : states.values()) {
207            String id = (String) state.get(KEY_ID);
208            if (ignored.contains(id)) {
209                continue;
210            }
211            if (!value.equals(state.get(key))) {
212                continue;
213            }
214            list.add(state);
215        }
216        if (log.isTraceEnabled() && !list.isEmpty()) {
217            log.trace("Mem:    -> " + list.size());
218        }
219        return list;
220    }
221
222    @Override
223    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
224        if (log.isTraceEnabled()) {
225            log.trace("Mem: QUERY " + key1 + " = " + value1 + " AND " + key2 + " = " + value2);
226        }
227        List<State> list = new ArrayList<>();
228        for (State state : states.values()) {
229            String id = (String) state.get(KEY_ID);
230            if (ignored.contains(id)) {
231                continue;
232            }
233            if (!(value1.equals(state.get(key1)) && value2.equals(state.get(key2)))) {
234                continue;
235            }
236            list.add(state);
237        }
238        if (log.isTraceEnabled() && !list.isEmpty()) {
239            log.trace("Mem:    -> " + list.size());
240        }
241        return list;
242    }
243
244    @Override
245    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
246            Map<String, Object[]> targetProxies) {
247        if (log.isTraceEnabled()) {
248            log.trace("Mem: QUERY " + key + " = " + value);
249        }
250        STATE: for (State state : states.values()) {
251            Object[] array = (Object[]) state.get(key);
252            String id = (String) state.get(KEY_ID);
253            if (array != null) {
254                for (Object v : array) {
255                    if (value.equals(v)) {
256                        ids.add(id);
257                        if (proxyTargets != null && TRUE.equals(state.get(KEY_IS_PROXY))) {
258                            String targetId = (String) state.get(KEY_PROXY_TARGET_ID);
259                            proxyTargets.put(id, targetId);
260                        }
261                        if (targetProxies != null) {
262                            Object[] proxyIds = (Object[]) state.get(KEY_PROXY_IDS);
263                            if (proxyIds != null) {
264                                targetProxies.put(id, proxyIds);
265                            }
266                        }
267                        continue STATE;
268                    }
269                }
270            }
271        }
272        if (log.isTraceEnabled() && !ids.isEmpty()) {
273            log.trace("Mem:    -> " + ids.size());
274        }
275    }
276
277    @Override
278    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
279        if (log.isTraceEnabled()) {
280            log.trace("Mem: QUERY " + key + " = " + value);
281        }
282        for (State state : states.values()) {
283            String id = (String) state.get(KEY_ID);
284            if (ignored.contains(id)) {
285                continue;
286            }
287            if (value.equals(state.get(key))) {
288                if (log.isTraceEnabled()) {
289                    log.trace("Mem:    -> present");
290                }
291                return true;
292            }
293        }
294        if (log.isTraceEnabled()) {
295            log.trace("Mem:    -> absent");
296        }
297        return false;
298    }
299
300    @Override
301    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
302            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
303        if (log.isTraceEnabled()) {
304            log.trace("Mem: QUERY " + evaluator + " OFFSET " + offset + " LIMIT " + limit);
305        }
306        evaluator.parse();
307        List<Map<String, Serializable>> projections = new ArrayList<>();
308        for (State state : states.values()) {
309            List<Map<String, Serializable>> matches = evaluator.matches(state);
310            if (!matches.isEmpty()) {
311                if (distinctDocuments) {
312                    projections.add(matches.get(0));
313                } else {
314                    projections.addAll(matches);
315                }
316            }
317        }
318        // ORDER BY
319        // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter
320        if (orderByClause != null) {
321            Collections.sort(projections, new OrderByComparator(orderByClause));
322        }
323        // LIMIT / OFFSET
324        int totalSize = projections.size();
325        if (countUpTo == -1) {
326            // count full size
327        } else if (countUpTo == 0) {
328            // no count
329            totalSize = -1; // not counted
330        } else {
331            // count only if less than countUpTo
332            if (totalSize > countUpTo) {
333                totalSize = -2; // truncated
334            }
335        }
336        if (limit != 0) {
337            int size = projections.size();
338            projections.subList(0, offset > size ? size : offset).clear();
339            size = projections.size();
340            if (limit < size) {
341                projections.subList(limit, size).clear();
342            }
343        }
344        // TODO DISTINCT
345
346        if (log.isTraceEnabled() && !projections.isEmpty()) {
347            log.trace("Mem:    -> " + projections.size());
348        }
349        return new PartialList<>(projections, totalSize);
350    }
351
352    /**
353     * Applies a {@link StateDiff} in-place onto a base {@link State}.
354     * <p>
355     * Uses thread-safe datastructures.
356     */
357    public static void applyDiff(State state, StateDiff stateDiff) {
358        for (Entry<String, Serializable> en : stateDiff.entrySet()) {
359            String key = en.getKey();
360            Serializable diffElem = en.getValue();
361            if (diffElem instanceof StateDiff) {
362                Serializable old = state.get(key);
363                if (old == null) {
364                    old = new State(true); // thread-safe
365                    state.put(key, old);
366                    // enter the next if
367                }
368                if (!(old instanceof State)) {
369                    throw new UnsupportedOperationException("Cannot apply StateDiff on non-State: " + old);
370                }
371                applyDiff((State) old, (StateDiff) diffElem);
372            } else if (diffElem instanceof ListDiff) {
373                state.put(key, applyDiff(state.get(key), (ListDiff) diffElem));
374            } else if (diffElem instanceof Delta) {
375                Delta delta = (Delta) diffElem;
376                Number oldValue = (Number) state.get(key);
377                Number value;
378                if (oldValue == null) {
379                    value = delta.getFullValue();
380                } else {
381                    value = delta.add(oldValue);
382                }
383                state.put(key, value);
384            } else {
385                state.put(key, StateHelper.deepCopy(diffElem, true)); // thread-safe
386            }
387        }
388    }
389
390    /**
391     * Applies a {@link ListDiff} onto an array or {@link List}, and returns the resulting value.
392     * <p>
393     * Uses thread-safe datastructures.
394     */
395    public static Serializable applyDiff(Serializable value, ListDiff listDiff) {
396        // internally work on a list
397        // TODO this is costly, use a separate code path for arrays
398        Class<?> arrayComponentType = null;
399        if (listDiff.isArray && value != null) {
400            if (!(value instanceof Object[])) {
401                throw new UnsupportedOperationException("Cannot apply ListDiff on non-array: " + value);
402            }
403            arrayComponentType = ((Object[]) value).getClass().getComponentType();
404            value = new CopyOnWriteArrayList<>(Arrays.asList((Object[]) value));
405        }
406        if (value == null) {
407            value = new CopyOnWriteArrayList<>();
408        }
409        if (!(value instanceof List)) {
410            throw new UnsupportedOperationException("Cannot apply ListDiff on non-List: " + value);
411        }
412        @SuppressWarnings("unchecked")
413        List<Serializable> list = (List<Serializable>) value;
414        if (listDiff.diff != null) {
415            int i = 0;
416            for (Object diffElem : listDiff.diff) {
417                if (i >= list.size()) {
418                    // TODO log error applying diff to shorter list
419                    break;
420                }
421                if (diffElem instanceof StateDiff) {
422                    applyDiff((State) list.get(i), (StateDiff) diffElem);
423                } else if (diffElem != NOP) {
424                    list.set(i, StateHelper.deepCopy(diffElem, true)); // thread-safe
425                }
426                i++;
427            }
428        }
429        if (listDiff.rpush != null) {
430            // deepCopy of what we'll add
431            List<Serializable> add = new ArrayList<>(listDiff.rpush.size());
432            for (Object v : listDiff.rpush) {
433                add.add(StateHelper.deepCopy(v, true)); // thread-safe
434            }
435            // update CopyOnWriteArrayList in one step
436            list.addAll(add);
437        }
438        // convert back to array if needed
439        if (listDiff.isArray) {
440            return list.isEmpty() ? null : list.toArray((Object[]) Array.newInstance(arrayComponentType, list.size()));
441        } else {
442            return list.isEmpty() ? null : (Serializable) list;
443        }
444    }
445
446    /* synchronized */
447    @Override
448    public synchronized Lock getLock(String id) {
449        State state = states.get(id);
450        if (state == null) {
451            // document not found
452            throw new DocumentNotFoundException(id);
453        }
454        String owner = (String) state.get(KEY_LOCK_OWNER);
455        if (owner == null) {
456            return null;
457        }
458        Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
459        return new Lock(owner, created);
460    }
461
462    /* synchronized */
463    @Override
464    public synchronized Lock setLock(String id, Lock lock) {
465        State state = states.get(id);
466        if (state == null) {
467            // document not found
468            throw new DocumentNotFoundException(id);
469        }
470        String owner = (String) state.get(KEY_LOCK_OWNER);
471        if (owner != null) {
472            // return old lock
473            Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
474            return new Lock(owner, created);
475        }
476        state.put(KEY_LOCK_OWNER, lock.getOwner());
477        state.put(KEY_LOCK_CREATED, lock.getCreated());
478        return null;
479    }
480
481    /* synchronized */
482    @Override
483    public synchronized Lock removeLock(String id, String owner) {
484        State state = states.get(id);
485        if (state == null) {
486            // document not found
487            throw new DocumentNotFoundException(id);
488        }
489        String oldOwner = (String) state.get(KEY_LOCK_OWNER);
490        if (oldOwner == null) {
491            // no previous lock
492            return null;
493        }
494        Calendar oldCreated = (Calendar) state.get(KEY_LOCK_CREATED);
495        if (!LockManager.canLockBeRemoved(oldOwner, owner)) {
496            // existing mismatched lock, flag failure
497            return new Lock(oldOwner, oldCreated, true);
498        }
499        // remove lock
500        state.put(KEY_LOCK_OWNER, null);
501        state.put(KEY_LOCK_CREATED, null);
502        // return old lock
503        return new Lock(oldOwner, oldCreated);
504    }
505
506    @Override
507    public void closeLockManager() {
508    }
509
510    @Override
511    public void clearLockManagerCaches() {
512    }
513
514    protected List<List<String>> binaryPaths;
515
516    @Override
517    protected void initBlobsPaths() {
518        MemBlobFinder finder = new MemBlobFinder();
519        finder.visit();
520        binaryPaths = finder.binaryPaths;
521    }
522
523    protected static class MemBlobFinder extends BlobFinder {
524        protected List<List<String>> binaryPaths = new ArrayList<>();
525
526        @Override
527        protected void recordBlobPath() {
528            binaryPaths.add(new ArrayList<>(path));
529        }
530    }
531
532    @Override
533    public void markReferencedBinaries() {
534        BlobManager blobManager = Framework.getService(BlobManager.class);
535        for (State state : states.values()) {
536            for (List<String> path : binaryPaths) {
537                markReferencedBinaries(state, path, 0, blobManager);
538            }
539        }
540    }
541
542    protected void markReferencedBinaries(State state, List<String> path, int start, BlobManager blobManager) {
543        for (int i = start; i < path.size(); i++) {
544            String name = path.get(i);
545            Serializable value = state.get(name);
546            if (value instanceof State) {
547                state = (State) value;
548            } else {
549                if (value instanceof List) {
550                    @SuppressWarnings("unchecked")
551                    List<Object> list = (List<Object>) value;
552                    for (Object v : list) {
553                        if (v instanceof State) {
554                            markReferencedBinaries((State) v, path, i + 1, blobManager);
555                        } else {
556                            markReferencedBinary(v, blobManager);
557                        }
558                    }
559                }
560                state = null;
561                break;
562            }
563        }
564        if (state != null) {
565            Serializable data = state.get(KEY_BLOB_DATA);
566            markReferencedBinary(data, blobManager);
567        }
568    }
569
570    protected void markReferencedBinary(Object value, BlobManager blobManager) {
571        if (!(value instanceof String)) {
572            return;
573        }
574        String key = (String) value;
575        blobManager.markReferencedBinary(key, repositoryName);
576    }
577
578}