001/*
002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.ecm.core.storage.dbs;
020
021import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
022import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
023import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;
024
025import java.io.Serializable;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.Collections;
029import java.util.Comparator;
030import java.util.List;
031import java.util.Map;
032import java.util.Random;
033import java.util.Set;
034import java.util.concurrent.TimeUnit;
035
036import org.apache.commons.lang.StringUtils;
037import org.apache.commons.logging.Log;
038import org.apache.commons.logging.LogFactory;
039import org.nuxeo.ecm.core.api.Lock;
040import org.nuxeo.ecm.core.api.NuxeoException;
041import org.nuxeo.ecm.core.api.PartialList;
042import org.nuxeo.ecm.core.api.ScrollResult;
043import org.nuxeo.ecm.core.blob.BlobManager;
044import org.nuxeo.ecm.core.model.LockManager;
045import org.nuxeo.ecm.core.model.Session;
046import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
047import org.nuxeo.ecm.core.storage.FulltextConfiguration;
048import org.nuxeo.ecm.core.storage.State;
049import org.nuxeo.ecm.core.storage.State.StateDiff;
050import org.nuxeo.runtime.metrics.MetricsService;
051
052import com.codahale.metrics.MetricRegistry;
053import com.codahale.metrics.SharedMetricRegistries;
054import com.google.common.cache.Cache;
055import com.google.common.cache.CacheBuilder;
056import com.google.common.collect.ImmutableMap;
057import com.google.common.collect.Ordering;
058
059/**
060 * The DBS Cache layer used to cache some method call of real repository
061 *
062 * @since 8.10
063 */
064public class DBSCachingRepository implements DBSRepository {
065
066    private static final Log log = LogFactory.getLog(DBSCachingRepository.class);
067
068    private static final Random RANDOM = new Random();
069
070    private final DBSRepository repository;
071
072    private final Cache<String, State> cache;
073
074    private final Cache<String, String> childCache;
075
076    private DBSClusterInvalidator clusterInvalidator;
077
078    private final DBSInvalidations invalidations;
079
080    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
081
082    public DBSCachingRepository(DBSRepository repository, DBSRepositoryDescriptor descriptor) {
083        this.repository = repository;
084        // Init caches
085        cache = newCache(descriptor);
086        registry.registerAll(GuavaCacheMetric.of(cache, "nuxeo", "repositories", repository.getName(), "cache"));
087        childCache = newCache(descriptor);
088        registry.registerAll(
089                GuavaCacheMetric.of(childCache, "nuxeo", "repositories", repository.getName(), "childCache"));
090        if (log.isInfoEnabled()) {
091            log.info(String.format("DBS cache activated on '%s' repository", repository.getName()));
092        }
093        invalidations = new DBSInvalidations();
094        if (descriptor.isClusteringEnabled()) {
095            initClusterInvalidator(descriptor);
096        }
097    }
098
099    protected <T> Cache<String, T> newCache(DBSRepositoryDescriptor descriptor) {
100        CacheBuilder<Object, Object> builder = CacheBuilder.newBuilder();
101        builder = builder.expireAfterWrite(descriptor.cacheTTL.longValue(), TimeUnit.MINUTES);
102        if (descriptor.cacheConcurrencyLevel != null) {
103            builder = builder.concurrencyLevel(descriptor.cacheConcurrencyLevel.intValue());
104        }
105        if (descriptor.cacheMaxSize != null) {
106            builder = builder.maximumSize(descriptor.cacheMaxSize.longValue());
107        }
108        return builder.build();
109    }
110
111    protected void initClusterInvalidator(DBSRepositoryDescriptor descriptor) {
112        String nodeId = descriptor.clusterNodeId;
113        if (StringUtils.isBlank(nodeId)) {
114            nodeId = String.valueOf(RANDOM.nextInt(Integer.MAX_VALUE));
115            log.warn("Missing cluster node id configuration, please define it explicitly "
116                    + "(usually through repository.clustering.id). Using random cluster node id instead: " + nodeId);
117        } else {
118            nodeId = nodeId.trim();
119        }
120        clusterInvalidator = createClusterInvalidator(descriptor);
121        clusterInvalidator.initialize(nodeId, this);
122    }
123
124    protected DBSClusterInvalidator createClusterInvalidator(DBSRepositoryDescriptor descriptor) {
125        Class<? extends DBSClusterInvalidator> klass = descriptor.clusterInvalidatorClass;
126        if (klass == null) {
127            throw new NuxeoException(
128                    "Unable to get cluster invalidator class from descriptor whereas clustering is enabled");
129        }
130        try {
131            return klass.newInstance();
132        } catch (ReflectiveOperationException e) {
133            throw new NuxeoException(e);
134        }
135
136    }
137
138    public void begin() {
139        repository.begin();
140        processReceivedInvalidations();
141    }
142
143    @Override
144    public void commit() {
145        repository.commit();
146        sendInvalidationsToOther();
147        processReceivedInvalidations();
148    }
149
150    @Override
151    public void rollback() {
152        repository.rollback();
153    }
154
155    @Override
156    public void shutdown() {
157        repository.shutdown();
158        // Clear caches
159        cache.invalidateAll();
160        childCache.invalidateAll();
161        // Remove metrics
162        String cacheName = MetricRegistry.name("nuxeo", "repositories", repository.getName(), "cache");
163        String childCacheName = MetricRegistry.name("nuxeo", "repositories", repository.getName(), "childCache");
164        registry.removeMatching((name, metric) -> name.startsWith(cacheName) || name.startsWith(childCacheName));
165        if (log.isInfoEnabled()) {
166            log.info(String.format("DBS cache deactivated on '%s' repository", repository.getName()));
167        }
168        // Send invalidations
169        if (clusterInvalidator != null) {
170            clusterInvalidator.sendInvalidations(new DBSInvalidations(true));
171        }
172
173    }
174
175    @Override
176    public State readState(String id) {
177        State state = cache.getIfPresent(id);
178        if (state == null) {
179            state = repository.readState(id);
180            if (state != null) {
181                putInCache(state);
182            }
183        }
184        return state;
185    }
186
187    @Override
188    public List<State> readStates(List<String> ids) {
189        ImmutableMap<String, State> statesMap = cache.getAllPresent(ids);
190        List<String> idsToRetrieve = new ArrayList<>(ids);
191        idsToRetrieve.removeAll(statesMap.keySet());
192        // Read missing states from repository
193        List<State> states = repository.readStates(idsToRetrieve);
194        // Cache them
195        states.forEach(this::putInCache);
196        // Add previous cached one
197        states.addAll(statesMap.values());
198        // Sort them
199        states.sort(Comparator.comparing(state -> state.get(KEY_ID).toString(), Ordering.explicit(ids)));
200        return states;
201    }
202
203    @Override
204    public void createState(State state) {
205        repository.createState(state);
206        // TODO check if it's relevant
207        putInCache(state);
208    }
209
210    @Override
211    public void createStates(List<State> states) {
212        repository.createStates(states);
213        states.forEach(this::putInCache);
214    }
215
216    @Override
217    public void updateState(String id, StateDiff diff) {
218        repository.updateState(id, diff);
219        invalidate(id);
220    }
221
222    @Override
223    public void deleteStates(Set<String> ids) {
224        repository.deleteStates(ids);
225        invalidateAll(ids);
226    }
227
228    @Override
229    public State readChildState(String parentId, String name, Set<String> ignored) {
230        processReceivedInvalidations();
231
232        String childCacheKey = computeChildCacheKey(parentId, name);
233        String stateId = childCache.getIfPresent(childCacheKey);
234        if (stateId != null) {
235            State state = cache.getIfPresent(stateId);
236            if (state != null) {
237                // As we don't have invalidation for childCache we need to check if retrieved state is the right one
238                // and not a previous document which was moved or renamed
239                if (parentId.equals(state.get(KEY_PARENT_ID)) && name.equals(state.get(KEY_NAME))) {
240                    return state;
241                } else {
242                    // We can invalidate the entry in cache as the document seemed to be moved or renamed
243                    childCache.invalidate(childCacheKey);
244                }
245            }
246        }
247        State state = repository.readChildState(parentId, name, ignored);
248        putInCache(state);
249        return state;
250    }
251
252    private void putInCache(State state) {
253        if (state != null) {
254            String stateId = state.get(KEY_ID).toString();
255            cache.put(stateId, state);
256            Object stateParentId = state.get(KEY_PARENT_ID);
257            if (stateParentId != null) {
258                childCache.put(computeChildCacheKey(stateParentId.toString(), state.get(KEY_NAME).toString()), stateId);
259            }
260        }
261    }
262
263    private String computeChildCacheKey(String parentId, String name) {
264        return parentId + '_' + name;
265    }
266
267    private void invalidate(String id) {
268        invalidateAll(Collections.singleton(id));
269    }
270
271    private void invalidateAll(Collection<String> ids) {
272        cache.invalidateAll(ids);
273        if (clusterInvalidator != null) {
274            DBSInvalidations invalidations = new DBSInvalidations();
275            invalidations.addAll(ids);
276        }
277    }
278
279    protected void sendInvalidationsToOther() {
280        if (!invalidations.isEmpty()) {
281            synchronized (invalidations) {
282                if (clusterInvalidator != null) {
283                    clusterInvalidator.sendInvalidations(invalidations);
284                }
285                invalidations.clear();
286            }
287        }
288    }
289
290    protected void processReceivedInvalidations() {
291        if (clusterInvalidator != null) {
292            DBSInvalidations invalidations = clusterInvalidator.receiveInvalidations();
293            if (invalidations.all) {
294                cache.invalidateAll();
295                childCache.invalidateAll();
296            } else if (invalidations.ids != null) {
297                cache.invalidateAll(invalidations.ids);
298            }
299        }
300    }
301
302    @Override
303    public BlobManager getBlobManager() {
304        return repository.getBlobManager();
305    }
306
307    @Override
308    public FulltextConfiguration getFulltextConfiguration() {
309        return repository.getFulltextConfiguration();
310    }
311
312    @Override
313    public boolean isFulltextDisabled() {
314        return repository.isFulltextDisabled();
315    }
316
317    @Override
318    public String getRootId() {
319        return repository.getRootId();
320    }
321
322    @Override
323    public String generateNewId() {
324        return repository.generateNewId();
325    }
326
327    @Override
328    public boolean hasChild(String parentId, String name, Set<String> ignored) {
329        return repository.hasChild(parentId, name, ignored);
330    }
331
332    @Override
333    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
334        return repository.queryKeyValue(key, value, ignored);
335    }
336
337    @Override
338    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
339        return repository.queryKeyValue(key1, value1, key2, value2, ignored);
340    }
341
342    @Override
343    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
344            Map<String, Object[]> targetProxies) {
345        repository.queryKeyValueArray(key, value, ids, proxyTargets, targetProxies);
346    }
347
348    @Override
349    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
350        return repository.queryKeyValuePresence(key, value, ignored);
351    }
352
353    @Override
354    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
355            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
356        return repository.queryAndFetch(evaluator, orderByClause, distinctDocuments, limit, offset, countUpTo);
357    }
358
359    @Override
360    public LockManager getLockManager() {
361        return repository.getLockManager();
362    }
363
364    @Override
365    public ScrollResult scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) {
366        return repository.scroll(evaluator, batchSize, keepAliveSeconds);
367    }
368
369    @Override
370    public ScrollResult scroll(String scrollId) {
371        return repository.scroll(scrollId);
372    }
373
374    @Override
375    public Lock getLock(String id) {
376        return repository.getLock(id);
377    }
378
379    @Override
380    public Lock setLock(String id, Lock lock) {
381        return repository.setLock(id, lock);
382    }
383
384    @Override
385    public Lock removeLock(String id, String owner) {
386        return repository.removeLock(id, owner);
387    }
388
389    @Override
390    public void closeLockManager() {
391        repository.closeLockManager();
392    }
393
394    @Override
395    public void clearLockManagerCaches() {
396        repository.clearLockManagerCaches();
397    }
398
399    @Override
400    public String getName() {
401        return repository.getName();
402    }
403
404    @Override
405    public Session getSession() {
406        if (repository instanceof DBSRepositoryBase) {
407            return ((DBSRepositoryBase) repository).getSession(this);
408        }
409        return repository.getSession();
410    }
411
412    @Override
413    public int getActiveSessionsCount() {
414        return repository.getActiveSessionsCount();
415    }
416
417    @Override
418    public void markReferencedBinaries() {
419        repository.markReferencedBinaries();
420    }
421
422}