001/*
002 * (C) Copyright 2016-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.ecm.core.storage.dbs;
020
021import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
024
025import java.io.Serializable;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.Collections;
029import java.util.Comparator;
030import java.util.List;
031import java.util.Map;
032import java.util.Random;
033import java.util.Set;
034import java.util.concurrent.TimeUnit;
035import java.util.stream.Stream;
036
037import org.apache.commons.lang.StringUtils;
038import org.apache.commons.logging.Log;
039import org.apache.commons.logging.LogFactory;
040import org.nuxeo.ecm.core.api.Lock;
041import org.nuxeo.ecm.core.api.NuxeoException;
042import org.nuxeo.ecm.core.api.PartialList;
043import org.nuxeo.ecm.core.api.ScrollResult;
044import org.nuxeo.ecm.core.blob.BlobManager;
045import org.nuxeo.ecm.core.model.LockManager;
046import org.nuxeo.ecm.core.model.Session;
047import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
048import org.nuxeo.ecm.core.storage.FulltextConfiguration;
049import org.nuxeo.ecm.core.storage.State;
050import org.nuxeo.ecm.core.storage.State.StateDiff;
051import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater;
052import org.nuxeo.runtime.metrics.MetricsService;
053
054import com.codahale.metrics.MetricRegistry;
055import com.codahale.metrics.SharedMetricRegistries;
056import com.google.common.cache.Cache;
057import com.google.common.cache.CacheBuilder;
058import com.google.common.collect.ImmutableMap;
059import com.google.common.collect.Ordering;
060
061/**
062 * The DBS Cache layer used to cache some method call of real repository
063 *
064 * @since 8.10
065 */
066public class DBSCachingRepository implements DBSRepository {
067
068    private static final Log log = LogFactory.getLog(DBSCachingRepository.class);
069
070    private static final Random RANDOM = new Random();
071
072    private final DBSRepository repository;
073
074    private final Cache<String, State> cache;
075
076    private final Cache<String, String> childCache;
077
078    private DBSClusterInvalidator clusterInvalidator;
079
080    private final DBSInvalidations invalidations;
081
082    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
083
084    public DBSCachingRepository(DBSRepository repository, DBSRepositoryDescriptor descriptor) {
085        this.repository = repository;
086        // Init caches
087        cache = newCache(descriptor);
088        registry.registerAll(GuavaCacheMetric.of(cache, "nuxeo", "repositories", repository.getName(), "cache"));
089        childCache = newCache(descriptor);
090        registry.registerAll(
091                GuavaCacheMetric.of(childCache, "nuxeo", "repositories", repository.getName(), "childCache"));
092        if (log.isInfoEnabled()) {
093            log.info(String.format("DBS cache activated on '%s' repository", repository.getName()));
094        }
095        invalidations = new DBSInvalidations();
096        if (descriptor.isClusteringEnabled()) {
097            initClusterInvalidator(descriptor);
098        }
099    }
100
101    protected <T> Cache<String, T> newCache(DBSRepositoryDescriptor descriptor) {
102        CacheBuilder<Object, Object> builder = CacheBuilder.newBuilder();
103        builder = builder.expireAfterWrite(descriptor.cacheTTL.longValue(), TimeUnit.MINUTES).recordStats();
104        if (descriptor.cacheConcurrencyLevel != null) {
105            builder = builder.concurrencyLevel(descriptor.cacheConcurrencyLevel.intValue());
106        }
107        if (descriptor.cacheMaxSize != null) {
108            builder = builder.maximumSize(descriptor.cacheMaxSize.longValue());
109        }
110        return builder.build();
111    }
112
113    protected void initClusterInvalidator(DBSRepositoryDescriptor descriptor) {
114        String nodeId = descriptor.clusterNodeId;
115        if (StringUtils.isBlank(nodeId)) {
116            nodeId = String.valueOf(RANDOM.nextInt(Integer.MAX_VALUE));
117            log.warn("Missing cluster node id configuration, please define it explicitly "
118                    + "(usually through repository.clustering.id). Using random cluster node id instead: " + nodeId);
119        } else {
120            nodeId = nodeId.trim();
121        }
122        clusterInvalidator = createClusterInvalidator(descriptor);
123        clusterInvalidator.initialize(nodeId, getName());
124    }
125
126    protected DBSClusterInvalidator createClusterInvalidator(DBSRepositoryDescriptor descriptor) {
127        Class<? extends DBSClusterInvalidator> klass = descriptor.clusterInvalidatorClass;
128        if (klass == null) {
129            throw new NuxeoException(
130                    "Unable to get cluster invalidator class from descriptor whereas clustering is enabled");
131        }
132        try {
133            return klass.newInstance();
134        } catch (ReflectiveOperationException e) {
135            throw new NuxeoException(e);
136        }
137
138    }
139
140    public void begin() {
141        repository.begin();
142        processReceivedInvalidations();
143    }
144
145    @Override
146    public void commit() {
147        repository.commit();
148        sendInvalidationsToOther();
149        processReceivedInvalidations();
150    }
151
152    @Override
153    public void rollback() {
154        repository.rollback();
155    }
156
157    @Override
158    public void shutdown() {
159        repository.shutdown();
160        // Clear caches
161        cache.invalidateAll();
162        childCache.invalidateAll();
163        // Remove metrics
164        String cacheName = MetricRegistry.name("nuxeo", "repositories", repository.getName(), "cache");
165        String childCacheName = MetricRegistry.name("nuxeo", "repositories", repository.getName(), "childCache");
166        registry.removeMatching((name, metric) -> name.startsWith(cacheName) || name.startsWith(childCacheName));
167        if (log.isInfoEnabled()) {
168            log.info(String.format("DBS cache deactivated on '%s' repository", repository.getName()));
169        }
170        // Send invalidations
171        if (clusterInvalidator != null) {
172            clusterInvalidator.sendInvalidations(new DBSInvalidations(true));
173        }
174
175    }
176
177    @Override
178    public State readState(String id) {
179        State state = cache.getIfPresent(id);
180        if (state == null) {
181            state = repository.readState(id);
182            if (state != null) {
183                putInCache(state);
184            }
185        }
186        return state;
187    }
188
189    @Override
190    public State readPartialState(String id, Collection<String> keys) {
191        // bypass caches, as the goal of this method is to not trash caches for one-shot reads
192        return repository.readPartialState(id, keys);
193    }
194
195    @Override
196    public List<State> readStates(List<String> ids) {
197        ImmutableMap<String, State> statesMap = cache.getAllPresent(ids);
198        List<String> idsToRetrieve = new ArrayList<>(ids);
199        idsToRetrieve.removeAll(statesMap.keySet());
200        // Read missing states from repository
201        List<State> states = repository.readStates(idsToRetrieve);
202        // Cache them
203        states.forEach(this::putInCache);
204        // Add previous cached one
205        states.addAll(statesMap.values());
206        // Sort them
207        states.sort(Comparator.comparing(state -> state.get(KEY_ID).toString(), Ordering.explicit(ids)));
208        return states;
209    }
210
211    @Override
212    public void createState(State state) {
213        repository.createState(state);
214        // don't cache new state, it is inefficient on mass import
215    }
216
217    @Override
218    public void createStates(List<State> states) {
219        repository.createStates(states);
220        // don't cache new states, it is inefficient on mass import
221    }
222
223    @Override
224    public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) {
225        repository.updateState(id, diff, changeTokenUpdater);
226        invalidate(id);
227    }
228
229    @Override
230    public void deleteStates(Set<String> ids) {
231        repository.deleteStates(ids);
232        invalidateAll(ids);
233    }
234
235    @Override
236    public State readChildState(String parentId, String name, Set<String> ignored) {
237        processReceivedInvalidations();
238
239        String childCacheKey = computeChildCacheKey(parentId, name);
240        String stateId = childCache.getIfPresent(childCacheKey);
241        if (stateId != null) {
242            State state = cache.getIfPresent(stateId);
243            if (state != null) {
244                // As we don't have invalidation for childCache we need to check if retrieved state is the right one
245                // and not a previous document which was moved or renamed
246                if (parentId.equals(state.get(KEY_PARENT_ID)) && name.equals(state.get(KEY_NAME))) {
247                    return state;
248                } else {
249                    // We can invalidate the entry in cache as the document seemed to be moved or renamed
250                    childCache.invalidate(childCacheKey);
251                }
252            }
253        }
254        State state = repository.readChildState(parentId, name, ignored);
255        putInCache(state);
256        return state;
257    }
258
259    private void putInCache(State state) {
260        if (state != null) {
261            String stateId = state.get(KEY_ID).toString();
262            cache.put(stateId, state);
263            Object stateParentId = state.get(KEY_PARENT_ID);
264            if (stateParentId != null) {
265                childCache.put(computeChildCacheKey(stateParentId.toString(), state.get(KEY_NAME).toString()), stateId);
266            }
267        }
268    }
269
270    private String computeChildCacheKey(String parentId, String name) {
271        return parentId + '_' + name;
272    }
273
274    private void invalidate(String id) {
275        invalidateAll(Collections.singleton(id));
276    }
277
278    private void invalidateAll(Collection<String> ids) {
279        cache.invalidateAll(ids);
280        if (clusterInvalidator != null) {
281            synchronized (invalidations) {
282                invalidations.addAll(ids);
283            }
284        }
285    }
286
287    protected void sendInvalidationsToOther() {
288        synchronized (invalidations) {
289            if (!invalidations.isEmpty()) {
290                if (clusterInvalidator != null) {
291                    clusterInvalidator.sendInvalidations(invalidations);
292                }
293                invalidations.clear();
294            }
295        }
296    }
297
298    protected void processReceivedInvalidations() {
299        if (clusterInvalidator != null) {
300            DBSInvalidations invalidations = clusterInvalidator.receiveInvalidations();
301            if (invalidations.all) {
302                cache.invalidateAll();
303                childCache.invalidateAll();
304            } else if (invalidations.ids != null) {
305                cache.invalidateAll(invalidations.ids);
306            }
307        }
308    }
309
310    @Override
311    public BlobManager getBlobManager() {
312        return repository.getBlobManager();
313    }
314
315    @Override
316    public FulltextConfiguration getFulltextConfiguration() {
317        return repository.getFulltextConfiguration();
318    }
319
320    @Override
321    public boolean isFulltextDisabled() {
322        return repository.isFulltextDisabled();
323    }
324
325    @Override
326    public boolean isChangeTokenEnabled() {
327        return repository.isChangeTokenEnabled();
328    }
329
330    @Override
331    public String getRootId() {
332        return repository.getRootId();
333    }
334
335    @Override
336    public String generateNewId() {
337        return repository.generateNewId();
338    }
339
340    @Override
341    public boolean hasChild(String parentId, String name, Set<String> ignored) {
342        return repository.hasChild(parentId, name, ignored);
343    }
344
345    @Override
346    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
347        return repository.queryKeyValue(key, value, ignored);
348    }
349
350    @Override
351    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
352        return repository.queryKeyValue(key1, value1, key2, value2, ignored);
353    }
354
355    @Override
356    public Stream<State> getDescendants(String id, Set<String> keys) {
357        return repository.getDescendants(id, keys);
358    }
359
360    @Override
361    public Stream<State> getDescendants(String id, Set<String> keys, int limit) {
362        return repository.getDescendants(id, keys, limit);
363    }
364
365    @Override
366    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
367        return repository.queryKeyValuePresence(key, value, ignored);
368    }
369
370    @Override
371    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
372            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
373        return repository.queryAndFetch(evaluator, orderByClause, distinctDocuments, limit, offset, countUpTo);
374    }
375
376    @Override
377    public LockManager getLockManager() {
378        return repository.getLockManager();
379    }
380
381    @Override
382    public ScrollResult<String> scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) {
383        return repository.scroll(evaluator, batchSize, keepAliveSeconds);
384    }
385
386    @Override
387    public ScrollResult<String> scroll(String scrollId) {
388        return repository.scroll(scrollId);
389    }
390
391    @Override
392    public Lock getLock(String id) {
393        return repository.getLock(id);
394    }
395
396    @Override
397    public Lock setLock(String id, Lock lock) {
398        return repository.setLock(id, lock);
399    }
400
401    @Override
402    public Lock removeLock(String id, String owner) {
403        return repository.removeLock(id, owner);
404    }
405
406    @Override
407    public void closeLockManager() {
408        repository.closeLockManager();
409    }
410
411    @Override
412    public void clearLockManagerCaches() {
413        repository.clearLockManagerCaches();
414    }
415
416    @Override
417    public String getName() {
418        return repository.getName();
419    }
420
421    @Override
422    public Session getSession() {
423        if (repository instanceof DBSRepositoryBase) {
424            return ((DBSRepositoryBase) repository).getSession(this);
425        }
426        return repository.getSession();
427    }
428
429    @Override
430    public int getActiveSessionsCount() {
431        return repository.getActiveSessionsCount();
432    }
433
434    @Override
435    public void markReferencedBinaries() {
436        repository.markReferencedBinaries();
437    }
438
439}