001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.mem;
020
021import static java.lang.Boolean.TRUE;
022import static org.nuxeo.ecm.core.query.sql.NXQL.ECM_UUID;
023import static org.nuxeo.ecm.core.storage.State.NOP;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_BLOB_DATA;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_IS_PROXY;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
029import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
030import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
031import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_IDS;
032import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PROXY_TARGET_ID;
033
034import java.io.Serializable;
035import java.lang.reflect.Array;
036import java.util.ArrayList;
037import java.util.Arrays;
038import java.util.Calendar;
039import java.util.Collections;
040import java.util.List;
041import java.util.Map;
042import java.util.Map.Entry;
043import java.util.Set;
044import java.util.UUID;
045import java.util.concurrent.ConcurrentHashMap;
046import java.util.concurrent.CopyOnWriteArrayList;
047import java.util.concurrent.atomic.AtomicLong;
048
049import javax.resource.spi.ConnectionManager;
050
051import org.apache.commons.logging.Log;
052import org.apache.commons.logging.LogFactory;
053import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
054import org.nuxeo.ecm.core.api.DocumentNotFoundException;
055import org.nuxeo.ecm.core.api.Lock;
056import org.nuxeo.ecm.core.api.NuxeoException;
057import org.nuxeo.ecm.core.api.PartialList;
058import org.nuxeo.ecm.core.api.ScrollResult;
059import org.nuxeo.ecm.core.api.ScrollResultImpl;
060import org.nuxeo.ecm.core.api.model.Delta;
061import org.nuxeo.ecm.core.blob.BlobManager;
062import org.nuxeo.ecm.core.model.LockManager;
063import org.nuxeo.ecm.core.model.Repository;
064import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
065import org.nuxeo.ecm.core.storage.State;
066import org.nuxeo.ecm.core.storage.State.ListDiff;
067import org.nuxeo.ecm.core.storage.State.StateDiff;
068import org.nuxeo.ecm.core.storage.StateHelper;
069import org.nuxeo.ecm.core.storage.dbs.DBSDocument;
070import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
071import org.nuxeo.ecm.core.storage.dbs.DBSRepositoryBase;
072import org.nuxeo.ecm.core.storage.dbs.DBSSession.OrderByComparator;
073import org.nuxeo.runtime.api.Framework;
074
075/**
076 * In-memory implementation of a {@link Repository}.
077 * <p>
078 * Internally, the repository is a map from id to document object.
079 * <p>
080 * A document object is a JSON-like document stored as a Map recursively containing the data, see {@link DBSDocument}
081 * for the description of the document.
082 *
083 * @since 5.9.4
084 */
085public class MemRepository extends DBSRepositoryBase {
086
087    private static final Log log = LogFactory.getLog(MemRepository.class);
088
089    protected static final String NOSCROLL_ID = "noscroll";
090
091    // for debug
092    private final AtomicLong temporaryIdCounter = new AtomicLong(0);
093
094    /**
095     * The content of the repository, a map of document id -> object.
096     */
097    protected Map<String, State> states;
098
099    public MemRepository(ConnectionManager cm, MemRepositoryDescriptor descriptor) {
100        super(cm, descriptor.name, descriptor);
101        initRepository();
102    }
103
104    @Override
105    public List<IdType> getAllowedIdTypes() {
106        return Collections.singletonList(IdType.varchar);
107    }
108
109    @Override
110    public void shutdown() {
111        super.shutdown();
112        states = null;
113    }
114
115    protected void initRepository() {
116        states = new ConcurrentHashMap<>();
117        initRoot();
118    }
119
120    @Override
121    public String generateNewId() {
122        if (DEBUG_UUIDS) {
123            return "UUID_" + temporaryIdCounter.incrementAndGet();
124        } else {
125            return UUID.randomUUID().toString();
126        }
127    }
128
129    @Override
130    public State readState(String id) {
131        State state = states.get(id);
132        if (state != null) {
133            if (log.isTraceEnabled()) {
134                log.trace("Mem: READ  " + id + ": " + state);
135            }
136        }
137        return state;
138    }
139
140    @Override
141    public List<State> readStates(List<String> ids) {
142        List<State> list = new ArrayList<>();
143        for (String id : ids) {
144            list.add(readState(id));
145        }
146        return list;
147    }
148
149    @Override
150    public void createState(State state) {
151        String id = (String) state.get(KEY_ID);
152        if (log.isTraceEnabled()) {
153            log.trace("Mem: CREATE " + id + ": " + state);
154        }
155        if (states.containsKey(id)) {
156            throw new NuxeoException("Already exists: " + id);
157        }
158        state = StateHelper.deepCopy(state, true); // thread-safe
159        StateHelper.resetDeltas(state);
160        states.put(id, state);
161    }
162
163    @Override
164    public void updateState(String id, StateDiff diff) {
165        if (log.isTraceEnabled()) {
166            log.trace("Mem: UPDATE " + id + ": " + diff);
167        }
168        State state = states.get(id);
169        if (state == null) {
170            throw new ConcurrentUpdateException("Missing: " + id);
171        }
172        applyDiff(state, diff);
173    }
174
175    @Override
176    public void deleteStates(Set<String> ids) {
177        if (log.isTraceEnabled()) {
178            log.trace("Mem: REMOVE " + ids);
179        }
180        for (String id : ids) {
181            if (states.remove(id) == null) {
182                log.debug("Missing on remove: " + id);
183            }
184        }
185    }
186
187    @Override
188    public State readChildState(String parentId, String name, Set<String> ignored) {
189        // TODO optimize by maintaining a parent/child index
190        for (State state : states.values()) {
191            if (ignored.contains(state.get(KEY_ID))) {
192                continue;
193            }
194            if (!parentId.equals(state.get(KEY_PARENT_ID))) {
195                continue;
196            }
197            if (!name.equals(state.get(KEY_NAME))) {
198                continue;
199            }
200            return state;
201        }
202        return null;
203    }
204
205    @Override
206    public boolean hasChild(String parentId, String name, Set<String> ignored) {
207        return readChildState(parentId, name, ignored) != null;
208    }
209
210    @Override
211    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
212        if (log.isTraceEnabled()) {
213            log.trace("Mem: QUERY " + key + " = " + value);
214        }
215        List<State> list = new ArrayList<>();
216        for (State state : states.values()) {
217            String id = (String) state.get(KEY_ID);
218            if (ignored.contains(id)) {
219                continue;
220            }
221            if (!value.equals(state.get(key))) {
222                continue;
223            }
224            list.add(state);
225        }
226        if (log.isTraceEnabled() && !list.isEmpty()) {
227            log.trace("Mem:    -> " + list.size());
228        }
229        return list;
230    }
231
232    @Override
233    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
234        if (log.isTraceEnabled()) {
235            log.trace("Mem: QUERY " + key1 + " = " + value1 + " AND " + key2 + " = " + value2);
236        }
237        List<State> list = new ArrayList<>();
238        for (State state : states.values()) {
239            String id = (String) state.get(KEY_ID);
240            if (ignored.contains(id)) {
241                continue;
242            }
243            if (!(value1.equals(state.get(key1)) && value2.equals(state.get(key2)))) {
244                continue;
245            }
246            list.add(state);
247        }
248        if (log.isTraceEnabled() && !list.isEmpty()) {
249            log.trace("Mem:    -> " + list.size());
250        }
251        return list;
252    }
253
254    @Override
255    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
256            Map<String, Object[]> targetProxies) {
257        if (log.isTraceEnabled()) {
258            log.trace("Mem: QUERY " + key + " = " + value);
259        }
260        STATE: for (State state : states.values()) {
261            Object[] array = (Object[]) state.get(key);
262            String id = (String) state.get(KEY_ID);
263            if (array != null) {
264                for (Object v : array) {
265                    if (value.equals(v)) {
266                        ids.add(id);
267                        if (proxyTargets != null && TRUE.equals(state.get(KEY_IS_PROXY))) {
268                            String targetId = (String) state.get(KEY_PROXY_TARGET_ID);
269                            proxyTargets.put(id, targetId);
270                        }
271                        if (targetProxies != null) {
272                            Object[] proxyIds = (Object[]) state.get(KEY_PROXY_IDS);
273                            if (proxyIds != null) {
274                                targetProxies.put(id, proxyIds);
275                            }
276                        }
277                        continue STATE;
278                    }
279                }
280            }
281        }
282        if (log.isTraceEnabled() && !ids.isEmpty()) {
283            log.trace("Mem:    -> " + ids.size());
284        }
285    }
286
287    @Override
288    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
289        if (log.isTraceEnabled()) {
290            log.trace("Mem: QUERY " + key + " = " + value);
291        }
292        for (State state : states.values()) {
293            String id = (String) state.get(KEY_ID);
294            if (ignored.contains(id)) {
295                continue;
296            }
297            if (value.equals(state.get(key))) {
298                if (log.isTraceEnabled()) {
299                    log.trace("Mem:    -> present");
300                }
301                return true;
302            }
303        }
304        if (log.isTraceEnabled()) {
305            log.trace("Mem:    -> absent");
306        }
307        return false;
308    }
309
310    @Override
311    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
312            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
313        if (log.isTraceEnabled()) {
314            log.trace("Mem: QUERY " + evaluator + " OFFSET " + offset + " LIMIT " + limit);
315        }
316        evaluator.parse();
317        List<Map<String, Serializable>> projections = new ArrayList<>();
318        for (State state : states.values()) {
319            List<Map<String, Serializable>> matches = evaluator.matches(state);
320            if (!matches.isEmpty()) {
321                if (distinctDocuments) {
322                    projections.add(matches.get(0));
323                } else {
324                    projections.addAll(matches);
325                }
326            }
327        }
328        // ORDER BY
329        // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter
330        if (orderByClause != null) {
331            Collections.sort(projections, new OrderByComparator(orderByClause));
332        }
333        // LIMIT / OFFSET
334        int totalSize = projections.size();
335        if (countUpTo == -1) {
336            // count full size
337        } else if (countUpTo == 0) {
338            // no count
339            totalSize = -1; // not counted
340        } else {
341            // count only if less than countUpTo
342            if (totalSize > countUpTo) {
343                totalSize = -2; // truncated
344            }
345        }
346        if (limit != 0) {
347            int size = projections.size();
348            projections.subList(0, offset > size ? size : offset).clear();
349            size = projections.size();
350            if (limit < size) {
351                projections.subList(limit, size).clear();
352            }
353        }
354        // TODO DISTINCT
355
356        if (log.isTraceEnabled() && !projections.isEmpty()) {
357            log.trace("Mem:    -> " + projections.size());
358        }
359        return new PartialList<>(projections, totalSize);
360    }
361
362    @Override
363    public ScrollResult scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) {
364        if (log.isTraceEnabled()) {
365            log.trace("Mem: QUERY " + evaluator);
366        }
367        evaluator.parse();
368        List<String> ids = new ArrayList<>();
369        for (State state : states.values()) {
370            List<Map<String, Serializable>> matches = evaluator.matches(state);
371            if (!matches.isEmpty()) {
372                String id = matches.get(0).get(ECM_UUID).toString();
373                ids.add(id);
374            }
375        }
376        return new ScrollResultImpl(NOSCROLL_ID, ids);
377    }
378
379    @Override
380    public ScrollResult scroll(String scrollId) {
381        if (NOSCROLL_ID.equals(scrollId)) {
382            // Id are already in memory, they are returned as a single batch
383            return ScrollResultImpl.emptyResult();
384        }
385        throw new NuxeoException("Unknown or timed out scrollId");
386    }
387
388    /**
389     * Applies a {@link StateDiff} in-place onto a base {@link State}.
390     * <p>
391     * Uses thread-safe datastructures.
392     */
393    public static void applyDiff(State state, StateDiff stateDiff) {
394        for (Entry<String, Serializable> en : stateDiff.entrySet()) {
395            String key = en.getKey();
396            Serializable diffElem = en.getValue();
397            if (diffElem instanceof StateDiff) {
398                Serializable old = state.get(key);
399                if (old == null) {
400                    old = new State(true); // thread-safe
401                    state.put(key, old);
402                    // enter the next if
403                }
404                if (!(old instanceof State)) {
405                    throw new UnsupportedOperationException("Cannot apply StateDiff on non-State: " + old);
406                }
407                applyDiff((State) old, (StateDiff) diffElem);
408            } else if (diffElem instanceof ListDiff) {
409                state.put(key, applyDiff(state.get(key), (ListDiff) diffElem));
410            } else if (diffElem instanceof Delta) {
411                Delta delta = (Delta) diffElem;
412                Number oldValue = (Number) state.get(key);
413                Number value;
414                if (oldValue == null) {
415                    value = delta.getFullValue();
416                } else {
417                    value = delta.add(oldValue);
418                }
419                state.put(key, value);
420            } else {
421                state.put(key, StateHelper.deepCopy(diffElem, true)); // thread-safe
422            }
423        }
424    }
425
426    /**
427     * Applies a {@link ListDiff} onto an array or {@link List}, and returns the resulting value.
428     * <p>
429     * Uses thread-safe datastructures.
430     */
431    public static Serializable applyDiff(Serializable value, ListDiff listDiff) {
432        // internally work on a list
433        // TODO this is costly, use a separate code path for arrays
434        Class<?> arrayComponentType = null;
435        if (listDiff.isArray && value != null) {
436            if (!(value instanceof Object[])) {
437                throw new UnsupportedOperationException("Cannot apply ListDiff on non-array: " + value);
438            }
439            arrayComponentType = ((Object[]) value).getClass().getComponentType();
440            value = new CopyOnWriteArrayList<>(Arrays.asList((Object[]) value));
441        }
442        if (value == null) {
443            value = new CopyOnWriteArrayList<>();
444        }
445        if (!(value instanceof List)) {
446            throw new UnsupportedOperationException("Cannot apply ListDiff on non-List: " + value);
447        }
448        @SuppressWarnings("unchecked")
449        List<Serializable> list = (List<Serializable>) value;
450        if (listDiff.diff != null) {
451            int i = 0;
452            for (Object diffElem : listDiff.diff) {
453                if (i >= list.size()) {
454                    // TODO log error applying diff to shorter list
455                    break;
456                }
457                if (diffElem instanceof StateDiff) {
458                    applyDiff((State) list.get(i), (StateDiff) diffElem);
459                } else if (diffElem != NOP) {
460                    list.set(i, StateHelper.deepCopy(diffElem, true)); // thread-safe
461                }
462                i++;
463            }
464        }
465        if (listDiff.rpush != null) {
466            // deepCopy of what we'll add
467            List<Serializable> add = new ArrayList<>(listDiff.rpush.size());
468            for (Object v : listDiff.rpush) {
469                add.add(StateHelper.deepCopy(v, true)); // thread-safe
470            }
471            // update CopyOnWriteArrayList in one step
472            list.addAll(add);
473        }
474        // convert back to array if needed
475        if (listDiff.isArray) {
476            return list.isEmpty() ? null : list.toArray((Object[]) Array.newInstance(arrayComponentType, list.size()));
477        } else {
478            return list.isEmpty() ? null : (Serializable) list;
479        }
480    }
481
482    /* synchronized */
483    @Override
484    public synchronized Lock getLock(String id) {
485        State state = states.get(id);
486        if (state == null) {
487            // document not found
488            throw new DocumentNotFoundException(id);
489        }
490        String owner = (String) state.get(KEY_LOCK_OWNER);
491        if (owner == null) {
492            return null;
493        }
494        Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
495        return new Lock(owner, created);
496    }
497
498    /* synchronized */
499    @Override
500    public synchronized Lock setLock(String id, Lock lock) {
501        State state = states.get(id);
502        if (state == null) {
503            // document not found
504            throw new DocumentNotFoundException(id);
505        }
506        String owner = (String) state.get(KEY_LOCK_OWNER);
507        if (owner != null) {
508            // return old lock
509            Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
510            return new Lock(owner, created);
511        }
512        state.put(KEY_LOCK_OWNER, lock.getOwner());
513        state.put(KEY_LOCK_CREATED, lock.getCreated());
514        return null;
515    }
516
517    /* synchronized */
518    @Override
519    public synchronized Lock removeLock(String id, String owner) {
520        State state = states.get(id);
521        if (state == null) {
522            // document not found
523            throw new DocumentNotFoundException(id);
524        }
525        String oldOwner = (String) state.get(KEY_LOCK_OWNER);
526        if (oldOwner == null) {
527            // no previous lock
528            return null;
529        }
530        Calendar oldCreated = (Calendar) state.get(KEY_LOCK_CREATED);
531        if (!LockManager.canLockBeRemoved(oldOwner, owner)) {
532            // existing mismatched lock, flag failure
533            return new Lock(oldOwner, oldCreated, true);
534        }
535        // remove lock
536        state.put(KEY_LOCK_OWNER, null);
537        state.put(KEY_LOCK_CREATED, null);
538        // return old lock
539        return new Lock(oldOwner, oldCreated);
540    }
541
542    @Override
543    public void closeLockManager() {
544    }
545
546    @Override
547    public void clearLockManagerCaches() {
548    }
549
550    protected List<List<String>> binaryPaths;
551
552    @Override
553    protected void initBlobsPaths() {
554        MemBlobFinder finder = new MemBlobFinder();
555        finder.visit();
556        binaryPaths = finder.binaryPaths;
557    }
558
559    protected static class MemBlobFinder extends BlobFinder {
560        protected List<List<String>> binaryPaths = new ArrayList<>();
561
562        @Override
563        protected void recordBlobPath() {
564            binaryPaths.add(new ArrayList<>(path));
565        }
566    }
567
568    @Override
569    public void markReferencedBinaries() {
570        BlobManager blobManager = Framework.getService(BlobManager.class);
571        for (State state : states.values()) {
572            for (List<String> path : binaryPaths) {
573                markReferencedBinaries(state, path, 0, blobManager);
574            }
575        }
576    }
577
578    protected void markReferencedBinaries(State state, List<String> path, int start, BlobManager blobManager) {
579        for (int i = start; i < path.size(); i++) {
580            String name = path.get(i);
581            Serializable value = state.get(name);
582            if (value instanceof State) {
583                state = (State) value;
584            } else {
585                if (value instanceof List) {
586                    @SuppressWarnings("unchecked")
587                    List<Object> list = (List<Object>) value;
588                    for (Object v : list) {
589                        if (v instanceof State) {
590                            markReferencedBinaries((State) v, path, i + 1, blobManager);
591                        } else {
592                            markReferencedBinary(v, blobManager);
593                        }
594                    }
595                }
596                state = null;
597                break;
598            }
599        }
600        if (state != null) {
601            Serializable data = state.get(KEY_BLOB_DATA);
602            markReferencedBinary(data, blobManager);
603        }
604    }
605
606    protected void markReferencedBinary(Object value, BlobManager blobManager) {
607        if (!(value instanceof String)) {
608            return;
609        }
610        String key = (String) value;
611        blobManager.markReferencedBinary(key, repositoryName);
612    }
613
614}