001/*
002 * (C) Copyright 2016-2020 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.ecm.core.storage.dbs;
020
021import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
024
025import java.io.Serializable;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import java.util.stream.Stream;
032
033import org.nuxeo.ecm.core.api.Lock;
034import org.nuxeo.ecm.core.api.PartialList;
035import org.nuxeo.ecm.core.api.ScrollResult;
036import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
037import org.nuxeo.ecm.core.storage.State;
038import org.nuxeo.ecm.core.storage.State.StateDiff;
039import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater;
040
041import com.google.common.cache.Cache;
042import com.google.common.collect.ImmutableMap;
043
044/**
045 * The DBS Cache layer used to cache some method call of real repository
046 *
047 * @since 11.1 (introduced in 8.10 as DBSCachingRepository)
048 */
049public class DBSCachingConnection implements DBSConnection {
050
051    protected final DBSConnection connection;
052
053    private final Cache<String, State> cache;
054
055    private final Cache<String, String> childCache;
056
057    /**
058     * The local invalidations, due to writes to this connection, that should be propagated to other connections (and
059     * other cluster nodes) at post-commit time.
060     * <p>
061     * {@code null} if the repository is not transactional and there is no cluster.
062     */
063    private final DBSInvalidations invalidations;
064
065    /**
066     * The cluster invalidator, sending invalidations to other cluster nodes.
067     * <p>
068     * {@code null} if there is no cluster.
069     */
070    private final DBSClusterInvalidator clusterInvalidator;
071
072    /**
073     * The queue of invalidations received from other connections, to be processed at pre-transaction time.
074     * <p>
075     * {@code null} if the repository is not transactional.
076     */
077    private final DBSInvalidationsQueue invalidationsQueue;
078
079    /**
080     * The propagator of invalidations to other connections.
081     * <p>
082     * {@code null} if the repository is not transactional.
083     */
084    private final DBSInvalidationsPropagator invalidationsPropagator;
085
086    public DBSCachingConnection(DBSConnection connection, DBSCachingRepository repository) {
087        this.connection = connection;
088        // Init caches
089        if (repository.supportsTransactions()) {
090            // connection-local cache
091            cache = repository.newCache(false);
092            childCache = repository.newChildCache(false);
093        } else {
094            // no transaction, use a repository-wide cache
095            cache = repository.getCache();
096            childCache = repository.getChildCache();
097        }
098        // local invalidations
099        invalidationsPropagator = repository.getInvalidationsPropagator();
100        if (invalidationsPropagator == null) {
101            invalidationsQueue = null;
102        } else {
103            invalidationsQueue = new DBSInvalidationsQueue("dbs-" + this);
104            invalidationsPropagator.addQueue(invalidationsQueue);
105        }
106        // cluster invalidations
107        clusterInvalidator = repository.getClusterInvalidator();
108        // collected invalidations
109        if (invalidationsPropagator == null && clusterInvalidator == null) {
110            // no transactional backend and no cluster
111            invalidations = null;
112        } else {
113            invalidations = new DBSInvalidations();
114        }
115    }
116
117    @Override
118    public void close() {
119        connection.close();
120        if (invalidationsPropagator != null) {
121            invalidationsPropagator.removeQueue(invalidationsQueue);
122        }
123        if (cache != null) {
124            // Clear caches
125            cache.invalidateAll();
126            childCache.invalidateAll();
127        }
128        // Send invalidations
129        if (clusterInvalidator != null) {
130            clusterInvalidator.sendInvalidations(new DBSInvalidations(true));
131        }
132    }
133
134    @Override
135    public void begin() {
136        connection.begin();
137        processReceivedInvalidations();
138    }
139
140    @Override
141    public void commit() {
142        connection.commit();
143        sendInvalidationsToOthers();
144        processReceivedInvalidations();
145    }
146
147    @Override
148    public void rollback() {
149        connection.rollback();
150    }
151
152    @Override
153    public State readState(String id) {
154        State state = cache.getIfPresent(id);
155        if (state == null) {
156            state = connection.readState(id);
157            if (state != null) {
158                putInCache(state);
159            }
160        }
161        return state;
162    }
163
164    @Override
165    public State readPartialState(String id, Collection<String> keys) {
166        // bypass caches, as the goal of this method is to not trash caches for one-shot reads
167        return connection.readPartialState(id, keys);
168    }
169
170    @Override
171    public List<State> readStates(List<String> ids) {
172        ImmutableMap<String, State> statesMap = cache.getAllPresent(ids);
173        List<String> idsToRetrieve = new ArrayList<>(ids);
174        idsToRetrieve.removeAll(statesMap.keySet());
175        // Read missing states from repository
176        List<State> states = connection.readStates(idsToRetrieve);
177        // Cache them
178        states.forEach(this::putInCache);
179        // Add previous cached one
180        states.addAll(statesMap.values());
181        return states;
182    }
183
184    @Override
185    public void createState(State state) {
186        connection.createState(state);
187        // don't cache new state, it is inefficient on mass import
188    }
189
190    @Override
191    public void createStates(List<State> states) {
192        connection.createStates(states);
193        // don't cache new states, it is inefficient on mass import
194    }
195
196    @Override
197    public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) {
198        connection.updateState(id, diff, changeTokenUpdater);
199        invalidate(id);
200    }
201
202    @Override
203    public void deleteStates(Set<String> ids) {
204        connection.deleteStates(ids);
205        invalidate(ids);
206    }
207
208    @Override
209    public State readChildState(String parentId, String name, Set<String> ignored) {
210        processReceivedInvalidations();
211
212        String childCacheKey = computeChildCacheKey(parentId, name);
213        String stateId = childCache.getIfPresent(childCacheKey);
214        if (stateId != null) {
215            State state = cache.getIfPresent(stateId);
216            if (state != null) {
217                // As we don't have invalidation for childCache we need to check if retrieved state is the right one
218                // and not a previous document which was moved or renamed
219                if (parentId.equals(state.get(KEY_PARENT_ID)) && name.equals(state.get(KEY_NAME))) {
220                    return state;
221                } else {
222                    // We can invalidate the entry in cache as the document seemed to be moved or renamed
223                    childCache.invalidate(childCacheKey);
224                }
225            }
226        }
227        State state = connection.readChildState(parentId, name, ignored);
228        putInCache(state);
229        return state;
230    }
231
232    private void putInCache(State state) {
233        if (state != null) {
234            String stateId = state.get(KEY_ID).toString();
235            cache.put(stateId, state);
236            Object stateParentId = state.get(KEY_PARENT_ID);
237            if (stateParentId != null) {
238                childCache.put(computeChildCacheKey(stateParentId.toString(), state.get(KEY_NAME).toString()), stateId);
239            }
240        }
241    }
242
243    private String computeChildCacheKey(String parentId, String name) {
244        return parentId + '_' + name;
245    }
246
247    private void invalidate(String id) {
248        invalidate(List.of(id));
249    }
250
251    private void invalidate(Collection<String> ids) {
252        cache.invalidateAll(ids);
253        if (invalidations != null) {
254            invalidations.addAll(ids);
255        }
256    }
257
258    protected void sendInvalidationsToOthers() {
259        if (invalidations != null && !invalidations.isEmpty()) {
260            if (clusterInvalidator != null) {
261                // send to other cluster nodes
262                clusterInvalidator.sendInvalidations(invalidations);
263            }
264            if (invalidationsPropagator != null) {
265                // send to other connections
266                invalidationsPropagator.propagateInvalidations(invalidations, invalidationsQueue);
267            }
268            invalidations.clear();
269        }
270    }
271
272    protected void processReceivedInvalidations() {
273        DBSInvalidations invals;
274        // invalidations from other cluster nodes
275        if (clusterInvalidator != null) {
276            invals = clusterInvalidator.receiveInvalidations();
277            // send cluster invalidations to all other connections
278            if (invals != null && !invals.isEmpty() && invalidationsPropagator != null) {
279                invalidationsPropagator.propagateInvalidations(invals, invalidationsQueue);
280            }
281        } else {
282            invals = null;
283        }
284        // invalidations from other connections
285        if (invalidationsQueue != null) {
286            DBSInvalidations inv = invalidationsQueue.getInvalidations();
287            if (invals == null) {
288                invals = inv;
289            } else {
290                invals.add(inv);
291            }
292        }
293        // apply invalidations to the cache (connection-local or repository-wide)
294        if (invals != null && !invals.isEmpty()) {
295            if (invals.all) {
296                cache.invalidateAll();
297                childCache.invalidateAll();
298            } else if (invals.ids != null) {
299                cache.invalidateAll(invals.ids);
300            }
301        }
302    }
303
304    @Override
305    public String getRootId() {
306        return connection.getRootId();
307    }
308
309    @Override
310    public String generateNewId() {
311        return connection.generateNewId();
312    }
313
314    @Override
315    public boolean hasChild(String parentId, String name, Set<String> ignored) {
316        return connection.hasChild(parentId, name, ignored);
317    }
318
319    @Override
320    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
321        return connection.queryKeyValue(key, value, ignored);
322    }
323
324    @Override
325    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
326        return connection.queryKeyValue(key1, value1, key2, value2, ignored);
327    }
328
329    @Override
330    public Stream<State> getDescendants(String id, Set<String> keys) {
331        return connection.getDescendants(id, keys);
332    }
333
334    @Override
335    public Stream<State> getDescendants(String id, Set<String> keys, int limit) {
336        return connection.getDescendants(id, keys, limit);
337    }
338
339    @Override
340    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
341        return connection.queryKeyValuePresence(key, value, ignored);
342    }
343
344    @Override
345    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
346            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
347        return connection.queryAndFetch(evaluator, orderByClause, distinctDocuments, limit, offset, countUpTo);
348    }
349
350    @Override
351    public ScrollResult<String> scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) {
352        return connection.scroll(evaluator, batchSize, keepAliveSeconds);
353    }
354
355    @Override
356    public ScrollResult<String> scroll(String scrollId) {
357        return connection.scroll(scrollId);
358    }
359
360    @Override
361    public Lock getLock(String id) {
362        return connection.getLock(id);
363    }
364
365    @Override
366    public Lock setLock(String id, Lock lock) {
367        return connection.setLock(id, lock);
368    }
369
370    @Override
371    public Lock removeLock(String id, String owner) {
372        return connection.removeLock(id, owner);
373    }
374
375    @Override
376    public List<State> queryKeyValueWithOperator(String key1, Object value1, String key2, DBSQueryOperator operator,
377            Object value2, Set<String> ignored) {
378        return connection.queryKeyValueWithOperator(key1, value1, key2, operator, value2, ignored);
379    }
380
381}