001/*
002 * (C) Copyright 2014-2020 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.mem;
020
021import static org.nuxeo.ecm.core.query.sql.NXQL.ECM_UUID;
022import static org.nuxeo.ecm.core.storage.State.NOP;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ANCESTOR_IDS;
024import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
025import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_CREATED;
026import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_LOCK_OWNER;
027import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
028import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
029
030import java.io.Serializable;
031import java.lang.reflect.Array;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.Calendar;
035import java.util.Collection;
036import java.util.List;
037import java.util.Map;
038import java.util.Map.Entry;
039import java.util.Objects;
040import java.util.Set;
041import java.util.concurrent.CopyOnWriteArrayList;
042import java.util.stream.Stream;
043
044import org.apache.commons.logging.Log;
045import org.apache.commons.logging.LogFactory;
046import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
047import org.nuxeo.ecm.core.api.DocumentNotFoundException;
048import org.nuxeo.ecm.core.api.Lock;
049import org.nuxeo.ecm.core.api.NuxeoException;
050import org.nuxeo.ecm.core.api.PartialList;
051import org.nuxeo.ecm.core.api.ScrollResult;
052import org.nuxeo.ecm.core.api.ScrollResultImpl;
053import org.nuxeo.ecm.core.api.lock.LockManager;
054import org.nuxeo.ecm.core.api.model.Delta;
055import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
056import org.nuxeo.ecm.core.storage.State;
057import org.nuxeo.ecm.core.storage.State.ListDiff;
058import org.nuxeo.ecm.core.storage.State.StateDiff;
059import org.nuxeo.ecm.core.storage.StateHelper;
060import org.nuxeo.ecm.core.storage.dbs.DBSConnection;
061import org.nuxeo.ecm.core.storage.dbs.DBSConnectionBase;
062import org.nuxeo.ecm.core.storage.dbs.DBSExpressionEvaluator;
063import org.nuxeo.ecm.core.storage.dbs.DBSSession.OrderByComparator;
064import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater;
065
066/**
067 * In-memory implementation of a {@link DBSConnection}.
068 *
069 * @since 11.1 (introduced in 5.9.4 as MemRepository)
070 */
071public class MemConnection extends DBSConnectionBase {
072
073    private static final Log log = LogFactory.getLog(MemRepository.class);
074
075    protected static final String NOSCROLL_ID = "noscroll";
076
077    // the global state, from the repository (thread-safe map)
078    protected Map<String, State> states;
079
080    public MemConnection(MemRepository repository) {
081        super(repository);
082        states = repository.states;
083    }
084
085    @Override
086    public void begin() {
087        // nothing
088    }
089
090    @Override
091    public void commit() {
092        // nothing
093    }
094
095    @Override
096    public void rollback() {
097        // nothing
098    }
099
100    protected void initRepository() {
101        initRoot();
102    }
103
104    @Override
105    public void close() {
106        // nothing
107    }
108
109    @Override
110    public String generateNewId() {
111        return ((MemRepository) repository).generateNewId();
112    }
113
114    @Override
115    public State readState(String id) {
116        return readPartialState(id, null);
117    }
118
119    @Override
120    public State readPartialState(String id, Collection<String> keys) {
121        if (id == null) {
122            return null;
123        }
124        State state = states.get(id);
125        if (state != null) {
126            if (keys != null && !keys.isEmpty()) {
127                State partialState = new State();
128                for (String key : keys) {
129                    Serializable value = state.get(key);
130                    if (value != null) {
131                        partialState.put(key, value);
132                    }
133                }
134                state = partialState;
135            }
136            if (log.isTraceEnabled()) {
137                log.trace("Mem: READ  " + id + ": " + state);
138            }
139        }
140        return state;
141    }
142
143    @Override
144    public List<State> readStates(List<String> ids) {
145        List<State> list = new ArrayList<>();
146        for (String id : ids) {
147            list.add(readState(id));
148        }
149        return list;
150    }
151
152    @Override
153    public void createState(State state) {
154        String id = (String) state.get(KEY_ID);
155        if (log.isTraceEnabled()) {
156            log.trace("Mem: CREATE " + id + ": " + state);
157        }
158        if (states.containsKey(id)) {
159            throw new NuxeoException("Already exists: " + id);
160        }
161        state = StateHelper.deepCopy(state, true); // thread-safe
162        StateHelper.resetDeltas(state);
163        states.put(id, state);
164    }
165
166    @Override
167    public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) {
168        if (log.isTraceEnabled()) {
169            log.trace("Mem: UPDATE " + id + ": " + diff);
170        }
171        State state = states.get(id);
172        if (state == null) {
173            throw new ConcurrentUpdateException("Missing: " + id);
174        }
175        synchronized (state) {
176            // synchronization needed for atomic change token
177            if (changeTokenUpdater != null) {
178                for (Entry<String, Serializable> en : changeTokenUpdater.getConditions().entrySet()) {
179                    if (!Objects.equals(state.get(en.getKey()), en.getValue())) {
180                        throw new ConcurrentUpdateException((String) state.get(KEY_ID));
181                    }
182                }
183                for (Entry<String, Serializable> en : changeTokenUpdater.getUpdates().entrySet()) {
184                    applyDiff(state, en.getKey(), en.getValue());
185                }
186            }
187            applyDiff(state, diff);
188        }
189    }
190
191    @Override
192    public void deleteStates(Set<String> ids) {
193        if (log.isTraceEnabled()) {
194            log.trace("Mem: REMOVE " + ids);
195        }
196        for (String id : ids) {
197            if (states.remove(id) == null) {
198                log.debug("Missing on remove: " + id);
199            }
200        }
201    }
202
203    @Override
204    public State readChildState(String parentId, String name, Set<String> ignored) {
205        // TODO optimize by maintaining a parent/child index
206        for (State state : states.values()) {
207            if (ignored.contains(state.get(KEY_ID))) {
208                continue;
209            }
210            if (!parentId.equals(state.get(KEY_PARENT_ID))) {
211                continue;
212            }
213            if (!name.equals(state.get(KEY_NAME))) {
214                continue;
215            }
216            return state;
217        }
218        return null;
219    }
220
221    @Override
222    public boolean hasChild(String parentId, String name, Set<String> ignored) {
223        return readChildState(parentId, name, ignored) != null;
224    }
225
226    @Override
227    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
228        if (log.isTraceEnabled()) {
229            log.trace("Mem: QUERY " + key + " = " + value);
230        }
231        List<State> list = new ArrayList<>();
232        for (State state : states.values()) {
233            String id = (String) state.get(KEY_ID);
234            if (ignored.contains(id)) {
235                continue;
236            }
237            if (!value.equals(state.get(key))) {
238                continue;
239            }
240            list.add(state);
241        }
242        if (log.isTraceEnabled() && !list.isEmpty()) {
243            log.trace("Mem:    -> " + list.size());
244        }
245        return list;
246    }
247
248    @Override
249    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
250        if (log.isTraceEnabled()) {
251            log.trace("Mem: QUERY " + key1 + " = " + value1 + " AND " + key2 + " = " + value2);
252        }
253        List<State> list = new ArrayList<>();
254        for (State state : states.values()) {
255            String id = (String) state.get(KEY_ID);
256            if (ignored.contains(id)) {
257                continue;
258            }
259            if (!(value1.equals(state.get(key1)) && value2.equals(state.get(key2)))) {
260                continue;
261            }
262            list.add(state);
263        }
264        if (log.isTraceEnabled() && !list.isEmpty()) {
265            log.trace("Mem:    -> " + list.size());
266        }
267        return list;
268    }
269
270    @Override
271    public Stream<State> getDescendants(String rootId, Set<String> keys) {
272        return getDescendants(rootId, keys, 0);
273    }
274
275    @Override
276    public Stream<State> getDescendants(String rootId, Set<String> keys, int limit) {
277        if (log.isTraceEnabled()) {
278            log.trace("Mem: QUERY " + KEY_ANCESTOR_IDS + " = " + rootId);
279        }
280        Stream<State> stream = states.values() //
281                                     .stream()
282                                     .filter(state -> hasAncestor(state, rootId));
283        if (limit != 0) {
284            stream = stream.limit(limit);
285        }
286        return stream;
287    }
288
289    protected static boolean hasAncestor(State state, String id) {
290        Object[] array = (Object[]) state.get(KEY_ANCESTOR_IDS);
291        return array == null ? false : Arrays.asList(array).contains(id);
292    }
293
294    @Override
295    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
296        if (log.isTraceEnabled()) {
297            log.trace("Mem: QUERY " + key + " = " + value);
298        }
299        for (State state : states.values()) {
300            String id = (String) state.get(KEY_ID);
301            if (ignored.contains(id)) {
302                continue;
303            }
304            if (value.equals(state.get(key))) {
305                if (log.isTraceEnabled()) {
306                    log.trace("Mem:    -> present");
307                }
308                return true;
309            }
310        }
311        if (log.isTraceEnabled()) {
312            log.trace("Mem:    -> absent");
313        }
314        return false;
315    }
316
317    @Override
318    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
319            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
320        if (log.isTraceEnabled()) {
321            log.trace("Mem: QUERY " + evaluator + " OFFSET " + offset + " LIMIT " + limit);
322        }
323        evaluator.parse();
324        List<Map<String, Serializable>> projections = new ArrayList<>();
325        for (State state : states.values()) {
326            List<Map<String, Serializable>> matches = evaluator.matches(state);
327            if (!matches.isEmpty()) {
328                if (distinctDocuments) {
329                    projections.add(matches.get(0));
330                } else {
331                    projections.addAll(matches);
332                }
333            }
334        }
335        // ORDER BY
336        // orderByClause may be null and different from evaluator.getOrderByClause() in case we want to post-filter
337        if (orderByClause != null) {
338            projections.sort(new OrderByComparator(orderByClause));
339        }
340        // LIMIT / OFFSET
341        int totalSize = projections.size();
342        if (countUpTo == -1) {
343            // count full size
344        } else if (countUpTo == 0) {
345            // no count
346            totalSize = -1; // not counted
347        } else {
348            // count only if less than countUpTo
349            if (totalSize > countUpTo) {
350                totalSize = -2; // truncated
351            }
352        }
353        if (limit != 0) {
354            int size = projections.size();
355            projections.subList(0, offset > size ? size : offset).clear();
356            size = projections.size();
357            if (limit < size) {
358                projections.subList(limit, size).clear();
359            }
360        }
361        // TODO DISTINCT
362
363        if (log.isTraceEnabled() && !projections.isEmpty()) {
364            log.trace("Mem:    -> " + projections.size());
365        }
366        return new PartialList<>(projections, totalSize);
367    }
368
369    @Override
370    public ScrollResult<String> scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) {
371        if (log.isTraceEnabled()) {
372            log.trace("Mem: QUERY " + evaluator);
373        }
374        evaluator.parse();
375        List<String> ids = new ArrayList<>();
376        for (State state : states.values()) {
377            List<Map<String, Serializable>> matches = evaluator.matches(state);
378            if (!matches.isEmpty()) {
379                String id = matches.get(0).get(ECM_UUID).toString();
380                ids.add(id);
381            }
382        }
383        return new ScrollResultImpl<>(NOSCROLL_ID, ids);
384    }
385
386    @Override
387    public ScrollResult<String> scroll(String scrollId) {
388        if (NOSCROLL_ID.equals(scrollId)) {
389            // Id are already in memory, they are returned as a single batch
390            return ScrollResultImpl.emptyResult();
391        }
392        throw new NuxeoException("Unknown or timed out scrollId");
393    }
394
395    /**
396     * Applies a {@link StateDiff} in-place onto a base {@link State}.
397     * <p>
398     * Uses thread-safe datastructures.
399     */
400    public static void applyDiff(State state, StateDiff stateDiff) {
401        for (Entry<String, Serializable> en : stateDiff.entrySet()) {
402            applyDiff(state, en.getKey(), en.getValue());
403        }
404    }
405
406    /**
407     * Applies a key/value diff in-place onto a base {@link State}.
408     * <p>
409     * Uses thread-safe datastructures.
410     */
411    protected static void applyDiff(State state, String key, Serializable value) {
412        if (value instanceof StateDiff) {
413            Serializable old = state.get(key);
414            if (old == null) {
415                old = new State(true); // thread-safe
416                state.put(key, old);
417                // enter the next if
418            }
419            if (!(old instanceof State)) {
420                throw new UnsupportedOperationException("Cannot apply StateDiff on non-State: " + old);
421            }
422            applyDiff((State) old, (StateDiff) value);
423        } else if (value instanceof ListDiff) {
424            state.put(key, applyDiff(state.get(key), (ListDiff) value));
425        } else if (value instanceof Delta) {
426            Delta delta = (Delta) value;
427            Number oldValue = (Number) state.get(key);
428            Number newValue;
429            if (oldValue == null) {
430                newValue = delta.getFullValue();
431            } else {
432                newValue = delta.add(oldValue);
433            }
434            state.put(key, newValue);
435        } else {
436            state.put(key, StateHelper.deepCopy(value, true)); // thread-safe
437        }
438    }
439
440    /**
441     * Applies a {@link ListDiff} onto an array or {@link List}, and returns the resulting value.
442     * <p>
443     * Uses thread-safe datastructures.
444     */
445    public static Serializable applyDiff(Serializable value, ListDiff listDiff) {
446        // internally work on a list
447        // TODO this is costly, use a separate code path for arrays
448        Class<?> arrayComponentType = null;
449        if (listDiff.isArray && value != null) {
450            if (!(value instanceof Object[])) {
451                throw new UnsupportedOperationException("Cannot apply ListDiff on non-array: " + value);
452            }
453            arrayComponentType = ((Object[]) value).getClass().getComponentType();
454            value = new CopyOnWriteArrayList<>(Arrays.asList((Object[]) value));
455        }
456        if (value == null) {
457            value = new CopyOnWriteArrayList<>();
458        }
459        if (!(value instanceof List)) {
460            throw new UnsupportedOperationException("Cannot apply ListDiff on non-List: " + value);
461        }
462        @SuppressWarnings("unchecked")
463        List<Serializable> list = (List<Serializable>) value;
464        if (listDiff.diff != null) {
465            int i = 0;
466            for (Object diffElem : listDiff.diff) {
467                if (i >= list.size()) {
468                    // TODO log error applying diff to shorter list
469                    break;
470                }
471                if (diffElem instanceof StateDiff) {
472                    applyDiff((State) list.get(i), (StateDiff) diffElem);
473                } else if (diffElem != NOP) {
474                    list.set(i, StateHelper.deepCopy(diffElem, true)); // thread-safe
475                }
476                i++;
477            }
478        }
479        if (listDiff.rpush != null) {
480            // deepCopy of what we'll add
481            List<Serializable> add = new ArrayList<>(listDiff.rpush.size());
482            for (Object v : listDiff.rpush) {
483                add.add(StateHelper.deepCopy(v, true)); // thread-safe
484            }
485            // update CopyOnWriteArrayList in one step
486            list.addAll(add);
487        }
488        if (listDiff.pull != null) {
489            list.removeAll(listDiff.pull);
490        }
491        if (list.isEmpty()) {
492            return null;
493        }
494        // convert back to array if needed
495        if (listDiff.isArray) {
496            if (arrayComponentType == null) {
497                // initial value was null (empty) so we couldn't get the type
498                // instead get the type from the first new element (added through rpush)
499                arrayComponentType = list.get(0).getClass();
500            }
501            return list.toArray((Object[]) Array.newInstance(arrayComponentType, list.size()));
502        } else {
503            return (Serializable) list;
504        }
505    }
506
507    /* synchronized */
508    @Override
509    public synchronized Lock getLock(String id) {
510        State state = states.get(id);
511        if (state == null) {
512            // document not found
513            throw new DocumentNotFoundException(id);
514        }
515        String owner = (String) state.get(KEY_LOCK_OWNER);
516        if (owner == null) {
517            return null;
518        }
519        Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
520        return new Lock(owner, created);
521    }
522
523    /* synchronized */
524    @Override
525    public synchronized Lock setLock(String id, Lock lock) {
526        State state = states.get(id);
527        if (state == null) {
528            // document not found
529            throw new DocumentNotFoundException(id);
530        }
531        String owner = (String) state.get(KEY_LOCK_OWNER);
532        if (owner != null) {
533            // return old lock
534            Calendar created = (Calendar) state.get(KEY_LOCK_CREATED);
535            return new Lock(owner, created);
536        }
537        state.put(KEY_LOCK_OWNER, lock.getOwner());
538        state.put(KEY_LOCK_CREATED, lock.getCreated());
539        return null;
540    }
541
542    /* synchronized */
543    @Override
544    public synchronized Lock removeLock(String id, String owner) {
545        State state = states.get(id);
546        if (state == null) {
547            // document not found
548            throw new DocumentNotFoundException(id);
549        }
550        String oldOwner = (String) state.get(KEY_LOCK_OWNER);
551        if (oldOwner == null) {
552            // no previous lock
553            return null;
554        }
555        Calendar oldCreated = (Calendar) state.get(KEY_LOCK_CREATED);
556        if (!LockManager.canLockBeRemoved(oldOwner, owner)) {
557            // existing mismatched lock, flag failure
558            return new Lock(oldOwner, oldCreated, true);
559        }
560        // remove lock
561        state.put(KEY_LOCK_OWNER, null);
562        state.put(KEY_LOCK_CREATED, null);
563        // return old lock
564        return new Lock(oldOwner, oldCreated);
565    }
566
567    @Override
568    public List<State> queryKeyValueWithOperator(String key1, Object value1, String key2, DBSQueryOperator operator,
569            Object value2, Set<String> ignored) {
570        throw new UnsupportedOperationException();
571    }
572
573}