/*
 * (C) Copyright 2016-2018 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Kevin Leturc
 */
package org.nuxeo.ecm.core.storage.dbs;

import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;

import java.io.Serializable;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.Lock;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.PartialList;
import org.nuxeo.ecm.core.api.ScrollResult;
import org.nuxeo.ecm.core.api.repository.FulltextConfiguration;
import org.nuxeo.ecm.core.blob.BlobManager;
import org.nuxeo.ecm.core.model.LockManager;
import org.nuxeo.ecm.core.model.Session;
import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
import org.nuxeo.ecm.core.storage.State;
import org.nuxeo.ecm.core.storage.State.StateDiff;
import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater;
import org.nuxeo.runtime.metrics.MetricsService;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Ordering;

/**
 * The DBS cache layer: a wrapper that caches the results of some method calls of the underlying {@link DBSRepository},
 * and keeps its caches consistent across cluster nodes through a {@link DBSClusterInvalidator}.
 *
 * @since 8.10
 */
public class DBSCachingRepository implements DBSRepository {

    private static final Log log = LogFactory.getLog(DBSCachingRepository.class);

    private static final Random RANDOM = new SecureRandom();

    private final DBSRepository repository;

    private final Cache<String, State> cache;

    private final Cache<String, String> childCache;

    private DBSClusterInvalidator clusterInvalidator;

    private final DBSInvalidations invalidations;

    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());

    public DBSCachingRepository(DBSRepository repository, DBSRepositoryDescriptor descriptor) {
        this.repository = repository;
        // Init caches
        cache = newCache(descriptor);
        registry.registerAll(GuavaCacheMetric.of(cache, "nuxeo", "repositories", repository.getName(), "cache"));
        childCache = newCache(descriptor);
        registry.registerAll(
                GuavaCacheMetric.of(childCache, "nuxeo", "repositories", repository.getName(), "childCache"));
        if (log.isInfoEnabled()) {
            log.info(String.format("DBS cache activated on '%s' repository", repository.getName()));
        }
        invalidations = new DBSInvalidations();
        if (descriptor.isClusteringEnabled()) {
            initClusterInvalidator(descriptor);
        }
    }

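    /**
     * Builds a Guava cache configured from the repository descriptor: entries expire {@code cacheTTL} minutes after
     * write, stats are recorded for metrics, and the concurrency level and maximum size are applied when configured.
     */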
    protected <T> Cache<String, T> newCache(DBSRepositoryDescriptor descriptor) {
        CacheBuilder<Object, Object> builder = CacheBuilder.newBuilder();
        builder = builder.expireAfterWrite(descriptor.cacheTTL.longValue(), TimeUnit.MINUTES).recordStats();
        if (descriptor.cacheConcurrencyLevel != null) {
            builder = builder.concurrencyLevel(descriptor.cacheConcurrencyLevel.intValue());
        }
        if (descriptor.cacheMaxSize != null) {
            builder = builder.maximumSize(descriptor.cacheMaxSize.longValue());
        }
        return builder.build();
    }

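    /**
     * Initializes the cluster invalidator with the configured cluster node id, or with a random id (and a warning)
     * when none is configured.
     */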
    protected void initClusterInvalidator(DBSRepositoryDescriptor descriptor) {
        String nodeId = descriptor.clusterNodeId;
        if (StringUtils.isBlank(nodeId)) {
            nodeId = String.valueOf(RANDOM.nextInt(Integer.MAX_VALUE));
            log.warn("Missing cluster node id configuration, please define it explicitly "
                    + "(usually through repository.clustering.id). Using random cluster node id instead: " + nodeId);
        } else {
            nodeId = nodeId.trim();
        }
        clusterInvalidator = createClusterInvalidator(descriptor);
        clusterInvalidator.initialize(nodeId, getName());
    }

    protected DBSClusterInvalidator createClusterInvalidator(DBSRepositoryDescriptor descriptor) {
        Class<? extends DBSClusterInvalidator> klass = descriptor.clusterInvalidatorClass;
        if (klass == null) {
            throw new NuxeoException(
                    "Unable to get the cluster invalidator class from the descriptor although clustering is enabled");
        }
        try {
            return klass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new NuxeoException(e);
        }
    }

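    /**
     * Begins a transaction on the underlying repository and processes the invalidations received from the other
     * cluster nodes, so that the transaction does not see stale cached states.
     */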
    @Override
    public void begin() {
        repository.begin();
        processReceivedInvalidations();
    }

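    /**
     * Commits on the underlying repository, then sends the invalidations accumulated during the transaction to the
     * other cluster nodes and processes the ones received from them.
     */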
    @Override
    public void commit() {
        repository.commit();
        sendInvalidationsToOther();
        processReceivedInvalidations();
    }

    @Override
    public void rollback() {
        repository.rollback();
    }

    @Override
    public void shutdown() {
        repository.shutdown();
        // Clear caches
        cache.invalidateAll();
        childCache.invalidateAll();
        // Remove metrics
        String cacheName = MetricRegistry.name("nuxeo", "repositories", repository.getName(), "cache");
        String childCacheName = MetricRegistry.name("nuxeo", "repositories", repository.getName(), "childCache");
        registry.removeMatching((name, metric) -> name.startsWith(cacheName) || name.startsWith(childCacheName));
        if (log.isInfoEnabled()) {
            log.info(String.format("DBS cache deactivated on '%s' repository", repository.getName()));
        }
        // Send an "invalidate all" message to the other cluster nodes
        if (clusterInvalidator != null) {
            clusterInvalidator.sendInvalidations(new DBSInvalidations(true));
        }
    }

    @Override
    public State readState(String id) {
        State state = cache.getIfPresent(id);
        if (state == null) {
            state = repository.readState(id);
            if (state != null) {
                putInCache(state);
            }
        }
        return state;
    }

    @Override
    public State readPartialState(String id, Collection<String> keys) {
        // bypass the caches, as the goal of this method is to avoid trashing them for one-shot reads
        return repository.readPartialState(id, keys);
    }

    @Override
    public List<State> readStates(List<String> ids) {
        ImmutableMap<String, State> statesMap = cache.getAllPresent(ids);
        List<String> idsToRetrieve = new ArrayList<>(ids);
        idsToRetrieve.removeAll(statesMap.keySet());
        // Read the missing states from the repository
        List<State> states = repository.readStates(idsToRetrieve);
        // Cache them
        states.forEach(this::putInCache);
        // Add the states that were already cached
        states.addAll(statesMap.values());
        // Sort them back into the order of the requested ids
        states.sort(Comparator.comparing(state -> state.get(KEY_ID).toString(), Ordering.explicit(ids)));
        return states;
    }

    @Override
    public void createState(State state) {
        repository.createState(state);
        // don't cache the new state, as it is inefficient on mass import
    }

    @Override
    public void createStates(List<State> states) {
        repository.createStates(states);
        // don't cache the new states, as it is inefficient on mass import
    }

    @Override
    public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) {
        repository.updateState(id, diff, changeTokenUpdater);
        invalidate(id);
    }

    @Override
    public void deleteStates(Set<String> ids) {
        repository.deleteStates(ids);
        invalidateAll(ids);
    }

    @Override
    public State readChildState(String parentId, String name, Set<String> ignored) {
        processReceivedInvalidations();

        String childCacheKey = computeChildCacheKey(parentId, name);
        String stateId = childCache.getIfPresent(childCacheKey);
        if (stateId != null) {
            State state = cache.getIfPresent(stateId);
            if (state != null) {
                // As there is no invalidation for the childCache, check that the retrieved state is the right one
                // and not a previously cached document that has since been moved or renamed
                if (parentId.equals(state.get(KEY_PARENT_ID)) && name.equals(state.get(KEY_NAME))) {
                    return state;
                } else {
                    // Invalidate the cache entry, as the document appears to have been moved or renamed
                    childCache.invalidate(childCacheKey);
                }
            }
        }
        State state = repository.readChildState(parentId, name, ignored);
        putInCache(state);
        return state;
    }

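    /**
     * Caches the state under its id and, when it has a parent, records the parentId/name to id mapping in the child
     * cache.
     */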
    private void putInCache(State state) {
        if (state != null) {
            String stateId = state.get(KEY_ID).toString();
            cache.put(stateId, state);
            Object stateParentId = state.get(KEY_PARENT_ID);
            if (stateParentId != null) {
                childCache.put(computeChildCacheKey(stateParentId.toString(), state.get(KEY_NAME).toString()), stateId);
            }
        }
    }

    private String computeChildCacheKey(String parentId, String name) {
        return parentId + '_' + name;
    }

    private void invalidate(String id) {
        invalidateAll(Collections.singleton(id));
    }

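    /**
     * Invalidates the given ids in the local cache and, when clustering is enabled, records them so they can be sent
     * to the other cluster nodes at commit time.
     */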
    private void invalidateAll(Collection<String> ids) {
        cache.invalidateAll(ids);
        if (clusterInvalidator != null) {
            synchronized (invalidations) {
                invalidations.addAll(ids);
            }
        }
    }

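    /**
     * Sends the invalidations accumulated during the transaction to the other cluster nodes, then clears them.
     */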
    protected void sendInvalidationsToOther() {
        synchronized (invalidations) {
            if (!invalidations.isEmpty()) {
                if (clusterInvalidator != null) {
                    clusterInvalidator.sendInvalidations(invalidations);
                }
                invalidations.clear();
            }
        }
    }

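    /**
     * Applies the invalidations received from the other cluster nodes to the local caches. Per-id invalidations only
     * touch the main cache, since the child cache is keyed by parentId/name; stale child cache entries are detected
     * and evicted in {@link #readChildState}.
     */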
    protected void processReceivedInvalidations() {
        if (clusterInvalidator != null) {
            DBSInvalidations invalidations = clusterInvalidator.receiveInvalidations();
            if (invalidations.all) {
                cache.invalidateAll();
                childCache.invalidateAll();
            } else if (invalidations.ids != null) {
                cache.invalidateAll(invalidations.ids);
            }
        }
    }

    @Override
    public BlobManager getBlobManager() {
        return repository.getBlobManager();
    }

    @Override
    public FulltextConfiguration getFulltextConfiguration() {
        return repository.getFulltextConfiguration();
    }

    @Override
    public boolean isFulltextDisabled() {
        return repository.isFulltextDisabled();
    }

    @Override
    public boolean isFulltextSearchDisabled() {
        return repository.isFulltextSearchDisabled();
    }

    @Override
    public boolean isChangeTokenEnabled() {
        return repository.isChangeTokenEnabled();
    }

    @Override
    public String getRootId() {
        return repository.getRootId();
    }

    @Override
    public String generateNewId() {
        return repository.generateNewId();
    }

    @Override
    public boolean hasChild(String parentId, String name, Set<String> ignored) {
        return repository.hasChild(parentId, name, ignored);
    }

    @Override
    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
        return repository.queryKeyValue(key, value, ignored);
    }

    @Override
    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
        return repository.queryKeyValue(key1, value1, key2, value2, ignored);
    }

    @Override
    public Stream<State> getDescendants(String id, Set<String> keys) {
        return repository.getDescendants(id, keys);
    }

    @Override
    public Stream<State> getDescendants(String id, Set<String> keys, int limit) {
        return repository.getDescendants(id, keys, limit);
    }

    @Override
    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
        return repository.queryKeyValuePresence(key, value, ignored);
    }

    @Override
    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
        return repository.queryAndFetch(evaluator, orderByClause, distinctDocuments, limit, offset, countUpTo);
    }

    @Override
    public LockManager getLockManager() {
        return repository.getLockManager();
    }

    @Override
    public ScrollResult<String> scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) {
        return repository.scroll(evaluator, batchSize, keepAliveSeconds);
    }

    @Override
    public ScrollResult<String> scroll(String scrollId) {
        return repository.scroll(scrollId);
    }

    @Override
    public Lock getLock(String id) {
        return repository.getLock(id);
    }

    @Override
    public Lock setLock(String id, Lock lock) {
        return repository.setLock(id, lock);
    }

    @Override
    public Lock removeLock(String id, String owner) {
        return repository.removeLock(id, owner);
    }

    @Override
    public void closeLockManager() {
        repository.closeLockManager();
    }

    @Override
    public void clearLockManagerCaches() {
        repository.clearLockManagerCaches();
    }

    @Override
    public String getName() {
        return repository.getName();
    }

    @Override
    public Session getSession() {
        if (repository instanceof DBSRepositoryBase) {
            return ((DBSRepositoryBase) repository).getSession(this);
        }
        return repository.getSession();
    }

    @Override
    public int getActiveSessionsCount() {
        return repository.getActiveSessionsCount();
    }

    @Override
    public void markReferencedBinaries() {
        repository.markReferencedBinaries();
    }

}