/*
 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Kevin Leturc
 */
package org.nuxeo.ecm.core.storage.dbs;

import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_ID;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_NAME;
import static org.nuxeo.ecm.core.storage.dbs.DBSDocument.KEY_PARENT_ID;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.Lock;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.PartialList;
import org.nuxeo.ecm.core.api.ScrollResult;
import org.nuxeo.ecm.core.blob.BlobManager;
import org.nuxeo.ecm.core.model.LockManager;
import org.nuxeo.ecm.core.model.Session;
import org.nuxeo.ecm.core.query.sql.model.OrderByClause;
import org.nuxeo.ecm.core.storage.FulltextConfiguration;
import org.nuxeo.ecm.core.storage.State;
import org.nuxeo.ecm.core.storage.State.StateDiff;
import org.nuxeo.ecm.core.storage.dbs.DBSTransactionState.ChangeTokenUpdater;
import org.nuxeo.runtime.metrics.MetricsService;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Ordering;

/**
 * The DBS cache layer, which caches the results of some method calls of the real (underlying) repository.
 *
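 * <p>
 * A minimal usage sketch (the {@code baseRepository}, {@code descriptor} and {@code docId} variables are
 * illustrative, not part of this class):
 *
 * <pre>{@code
 * DBSRepository cached = new DBSCachingRepository(baseRepository, descriptor);
 * State state = cached.readState(docId); // served from the cache when possible
 * }</pre>
 *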
 * @since 8.10
 */
public class DBSCachingRepository implements DBSRepository {

    private static final Log log = LogFactory.getLog(DBSCachingRepository.class);

    private static final Random RANDOM = new Random();

    private final DBSRepository repository;

    private final Cache<String, State> cache;

    private final Cache<String, String> childCache;

    private DBSClusterInvalidator clusterInvalidator;

    private final DBSInvalidations invalidations;

    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());

    public DBSCachingRepository(DBSRepository repository, DBSRepositoryDescriptor descriptor) {
        this.repository = repository;
        // Init caches
        cache = newCache(descriptor);
        registry.registerAll(GuavaCacheMetric.of(cache, "nuxeo", "repositories", repository.getName(), "cache"));
        childCache = newCache(descriptor);
        registry.registerAll(
                GuavaCacheMetric.of(childCache, "nuxeo", "repositories", repository.getName(), "childCache"));
        if (log.isInfoEnabled()) {
            log.info(String.format("DBS cache activated on '%s' repository", repository.getName()));
        }
        invalidations = new DBSInvalidations();
        if (descriptor.isClusteringEnabled()) {
            initClusterInvalidator(descriptor);
        }
    }

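    /**
     * Builds a Guava cache from the repository descriptor: entries expire {@code cacheTTL} minutes after write,
     * statistics are recorded, and the concurrency level and maximum size are applied when configured.
     */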
    protected <T> Cache<String, T> newCache(DBSRepositoryDescriptor descriptor) {
        CacheBuilder<Object, Object> builder = CacheBuilder.newBuilder();
        builder = builder.expireAfterWrite(descriptor.cacheTTL.longValue(), TimeUnit.MINUTES).recordStats();
        if (descriptor.cacheConcurrencyLevel != null) {
            builder = builder.concurrencyLevel(descriptor.cacheConcurrencyLevel.intValue());
        }
        if (descriptor.cacheMaxSize != null) {
            builder = builder.maximumSize(descriptor.cacheMaxSize.longValue());
        }
        return builder.build();
    }

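    /**
     * Initializes the cluster invalidator with the configured cluster node id, falling back to a random id (with a
     * warning) when none is configured.
     */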
    protected void initClusterInvalidator(DBSRepositoryDescriptor descriptor) {
        String nodeId = descriptor.clusterNodeId;
        if (StringUtils.isBlank(nodeId)) {
            nodeId = String.valueOf(RANDOM.nextInt(Integer.MAX_VALUE));
            log.warn("Missing cluster node id configuration, please define it explicitly "
                    + "(usually through repository.clustering.id). Using random cluster node id instead: " + nodeId);
        } else {
            nodeId = nodeId.trim();
        }
        clusterInvalidator = createClusterInvalidator(descriptor);
        clusterInvalidator.initialize(nodeId, getName());
    }

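    /**
     * Instantiates the {@link DBSClusterInvalidator} class declared in the descriptor.
     *
     * @throws NuxeoException if no class is declared or if it cannot be instantiated
     */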
    protected DBSClusterInvalidator createClusterInvalidator(DBSRepositoryDescriptor descriptor) {
        Class<? extends DBSClusterInvalidator> klass = descriptor.clusterInvalidatorClass;
        if (klass == null) {
            throw new NuxeoException(
                    "Unable to get cluster invalidator class from descriptor although clustering is enabled");
        }
        try {
            return klass.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new NuxeoException(e);
        }
    }

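    // Transaction demarcation: cluster invalidations are processed at begin() and commit(), and pending local
    // invalidations are sent to the other cluster nodes at commit().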
    @Override
    public void begin() {
        repository.begin();
        processReceivedInvalidations();
    }

    @Override
    public void commit() {
        repository.commit();
        sendInvalidationsToOther();
        processReceivedInvalidations();
    }

    @Override
    public void rollback() {
        repository.rollback();
    }

    @Override
    public void shutdown() {
        repository.shutdown();
        // Clear caches
        cache.invalidateAll();
        childCache.invalidateAll();
        // Remove metrics
        String cacheName = MetricRegistry.name("nuxeo", "repositories", repository.getName(), "cache");
        String childCacheName = MetricRegistry.name("nuxeo", "repositories", repository.getName(), "childCache");
        registry.removeMatching((name, metric) -> name.startsWith(cacheName) || name.startsWith(childCacheName));
        if (log.isInfoEnabled()) {
            log.info(String.format("DBS cache deactivated on '%s' repository", repository.getName()));
        }
        // Send invalidations
        if (clusterInvalidator != null) {
            clusterInvalidator.sendInvalidations(new DBSInvalidations(true));
        }
    }

    @Override
    public State readState(String id) {
        State state = cache.getIfPresent(id);
        if (state == null) {
            state = repository.readState(id);
            if (state != null) {
                putInCache(state);
            }
        }
        return state;
    }

    @Override
    public List<State> readStates(List<String> ids) {
        ImmutableMap<String, State> statesMap = cache.getAllPresent(ids);
        List<String> idsToRetrieve = new ArrayList<>(ids);
        idsToRetrieve.removeAll(statesMap.keySet());
        // Read the missing states from the repository
        List<State> states = repository.readStates(idsToRetrieve);
        // Cache them
        states.forEach(this::putInCache);
        // Add the states that were already cached
        states.addAll(statesMap.values());
        // Sort them in the same order as the requested ids
        states.sort(Comparator.comparing(state -> state.get(KEY_ID).toString(), Ordering.explicit(ids)));
        return states;
    }

    @Override
    public void createState(State state) {
        repository.createState(state);
        // don't cache the new state, it is inefficient on mass import
    }

    @Override
    public void createStates(List<State> states) {
        repository.createStates(states);
        // don't cache the new states, it is inefficient on mass import
    }

    @Override
    public void updateState(String id, StateDiff diff, ChangeTokenUpdater changeTokenUpdater) {
        repository.updateState(id, diff, changeTokenUpdater);
        invalidate(id);
    }

    @Override
    public void deleteStates(Set<String> ids) {
        repository.deleteStates(ids);
        invalidateAll(ids);
    }

    @Override
    public State readChildState(String parentId, String name, Set<String> ignored) {
        processReceivedInvalidations();

        String childCacheKey = computeChildCacheKey(parentId, name);
        String stateId = childCache.getIfPresent(childCacheKey);
        if (stateId != null) {
            State state = cache.getIfPresent(stateId);
            if (state != null) {
                // As there is no invalidation for the childCache, we need to check that the retrieved state is the
                // right one and not a previous document that was moved or renamed
                if (parentId.equals(state.get(KEY_PARENT_ID)) && name.equals(state.get(KEY_NAME))) {
                    return state;
                } else {
                    // Invalidate the cache entry, as the document appears to have been moved or renamed
                    childCache.invalidate(childCacheKey);
                }
            }
        }
        State state = repository.readChildState(parentId, name, ignored);
        putInCache(state);
        return state;
    }

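    /**
     * Caches the given state (if not null) under its id, and, when it has a parent, maps its (parent id, name) key
     * to its id in the childCache so that {@link #readChildState} can find it.
     */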
    private void putInCache(State state) {
        if (state != null) {
            String stateId = state.get(KEY_ID).toString();
            cache.put(stateId, state);
            Object stateParentId = state.get(KEY_PARENT_ID);
            if (stateParentId != null) {
                childCache.put(computeChildCacheKey(stateParentId.toString(), state.get(KEY_NAME).toString()), stateId);
            }
        }
    }

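    /**
     * Computes the childCache key from the parent id and the child name.
     */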
    private String computeChildCacheKey(String parentId, String name) {
        return parentId + '_' + name;
    }

    private void invalidate(String id) {
        invalidateAll(Collections.singleton(id));
    }

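    /**
     * Invalidates the given ids in the local cache and, when clustering is enabled, records them to be sent to the
     * other cluster nodes.
     */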
    private void invalidateAll(Collection<String> ids) {
        cache.invalidateAll(ids);
        if (clusterInvalidator != null) {
            synchronized (invalidations) {
                invalidations.addAll(ids);
            }
        }
    }

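    /**
     * Sends the pending invalidations, if any, to the other cluster nodes and clears them.
     */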
    protected void sendInvalidationsToOther() {
        synchronized (invalidations) {
            if (!invalidations.isEmpty()) {
                if (clusterInvalidator != null) {
                    clusterInvalidator.sendInvalidations(invalidations);
                }
                invalidations.clear();
            }
        }
    }

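    /**
     * Applies the invalidations received from the other cluster nodes: either both caches are fully invalidated, or
     * only the received ids are invalidated in the main cache.
     */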
    protected void processReceivedInvalidations() {
        if (clusterInvalidator != null) {
            DBSInvalidations invalidations = clusterInvalidator.receiveInvalidations();
            if (invalidations.all) {
                cache.invalidateAll();
                childCache.invalidateAll();
            } else if (invalidations.ids != null) {
                cache.invalidateAll(invalidations.ids);
            }
        }
    }

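    // The methods below simply delegate to the underlying repository and do not use the caches.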
    @Override
    public BlobManager getBlobManager() {
        return repository.getBlobManager();
    }

    @Override
    public FulltextConfiguration getFulltextConfiguration() {
        return repository.getFulltextConfiguration();
    }

    @Override
    public boolean isFulltextDisabled() {
        return repository.isFulltextDisabled();
    }

    @Override
    public boolean isChangeTokenEnabled() {
        return repository.isChangeTokenEnabled();
    }

    @Override
    public String getRootId() {
        return repository.getRootId();
    }

    @Override
    public String generateNewId() {
        return repository.generateNewId();
    }

    @Override
    public boolean hasChild(String parentId, String name, Set<String> ignored) {
        return repository.hasChild(parentId, name, ignored);
    }

    @Override
    public List<State> queryKeyValue(String key, Object value, Set<String> ignored) {
        return repository.queryKeyValue(key, value, ignored);
    }

    @Override
    public List<State> queryKeyValue(String key1, Object value1, String key2, Object value2, Set<String> ignored) {
        return repository.queryKeyValue(key1, value1, key2, value2, ignored);
    }

    @Override
    public void queryKeyValueArray(String key, Object value, Set<String> ids, Map<String, String> proxyTargets,
            Map<String, Object[]> targetProxies) {
        repository.queryKeyValueArray(key, value, ids, proxyTargets, targetProxies);
    }

    @Override
    public boolean queryKeyValuePresence(String key, String value, Set<String> ignored) {
        return repository.queryKeyValuePresence(key, value, ignored);
    }

    @Override
    public PartialList<Map<String, Serializable>> queryAndFetch(DBSExpressionEvaluator evaluator,
            OrderByClause orderByClause, boolean distinctDocuments, int limit, int offset, int countUpTo) {
        return repository.queryAndFetch(evaluator, orderByClause, distinctDocuments, limit, offset, countUpTo);
    }

    @Override
    public LockManager getLockManager() {
        return repository.getLockManager();
    }

    @Override
    public ScrollResult scroll(DBSExpressionEvaluator evaluator, int batchSize, int keepAliveSeconds) {
        return repository.scroll(evaluator, batchSize, keepAliveSeconds);
    }

    @Override
    public ScrollResult scroll(String scrollId) {
        return repository.scroll(scrollId);
    }

    @Override
    public Lock getLock(String id) {
        return repository.getLock(id);
    }

    @Override
    public Lock setLock(String id, Lock lock) {
        return repository.setLock(id, lock);
    }

    @Override
    public Lock removeLock(String id, String owner) {
        return repository.removeLock(id, owner);
    }

    @Override
    public void closeLockManager() {
        repository.closeLockManager();
    }

    @Override
    public void clearLockManagerCaches() {
        repository.clearLockManagerCaches();
    }

    @Override
    public String getName() {
        return repository.getName();
    }

    @Override
    public Session getSession() {
        if (repository instanceof DBSRepositoryBase) {
            return ((DBSRepositoryBase) repository).getSession(this);
        }
        return repository.getSession();
    }

    @Override
    public int getActiveSessionsCount() {
        return repository.getActiveSessionsCount();
    }

    @Override
    public void markReferencedBinaries() {
        repository.markReferencedBinaries();
    }

}