001/*
002 * (C) Copyright 2016-2020 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Kevin Leturc
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.core.storage.dbs;
021
022import java.util.concurrent.TimeUnit;
023
024import org.apache.commons.logging.Log;
025import org.apache.commons.logging.LogFactory;
026import org.nuxeo.ecm.core.api.Lock;
027import org.nuxeo.ecm.core.api.NuxeoException;
028import org.nuxeo.ecm.core.api.lock.LockManager;
029import org.nuxeo.ecm.core.api.repository.FulltextConfiguration;
030import org.nuxeo.ecm.core.blob.BlobManager;
031import org.nuxeo.ecm.core.model.Session;
032import org.nuxeo.ecm.core.storage.State;
033import org.nuxeo.runtime.api.Framework;
034import org.nuxeo.runtime.cluster.ClusterService;
035import org.nuxeo.runtime.metrics.MetricsService;
036
037import com.google.common.cache.Cache;
038import com.google.common.cache.CacheBuilder;
039
040import io.dropwizard.metrics5.MetricName;
041import io.dropwizard.metrics5.MetricRegistry;
042import io.dropwizard.metrics5.SharedMetricRegistries;
043
044/**
045 * The DBS Cache layer used to cache some method call of real repository
046 *
047 * @since 8.10
048 */
049public class DBSCachingRepository implements DBSRepository {
050
051    private static final Log log = LogFactory.getLog(DBSCachingRepository.class);
052
053    protected static final String METRIC_CACHE_NAME = "nuxeo.repositories.repository.cache";
054
055    protected static final String METRIC_CHILD_CACHE_NAME = "nuxeo.repositories.repository.childCache";
056
057    private final DBSRepository repository;
058
059    protected final Cache<String, State> cache;
060
061    protected final Cache<String, String> childCache;
062
063    protected final DBSRepositoryDescriptor descriptor;
064
065    protected final DBSInvalidationsPropagator invalidationsPropagator;
066
067    protected final DBSClusterInvalidator clusterInvalidator;
068
069    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
070
071    public DBSCachingRepository(DBSRepository repository, DBSRepositoryDescriptor descriptor) {
072        this.repository = repository;
073        this.descriptor = descriptor;
074        // Init caches
075        if (supportsTransactions()) {
076            // each connection will have its own cache
077            cache = null;
078            childCache = null;
079        } else {
080            // one global cache held by the repository
081            cache = newCache(true);
082            childCache = newChildCache(true);
083        }
084        if (log.isInfoEnabled()) {
085            log.info(String.format("DBS cache activated on '%s' repository", getName()));
086        }
087        invalidationsPropagator = initInvalidationsPropagator();
088        clusterInvalidator = initClusterInvalidator(descriptor);
089    }
090
091    protected Cache<String, State> getCache() {
092        return cache;
093    }
094
095    protected Cache<String, String> getChildCache() {
096        return childCache;
097    }
098
099    protected DBSInvalidationsPropagator getInvalidationsPropagator() {
100        return invalidationsPropagator;
101    }
102
103    protected DBSClusterInvalidator getClusterInvalidator() {
104        return clusterInvalidator;
105    }
106
107    protected Cache<String, State> newCache(boolean metrics) {
108        Cache<String, State> c = newCache(descriptor);
109        if (metrics) {
110            registry.registerAll(GuavaCacheMetric.of(c,
111                    MetricName.build(METRIC_CACHE_NAME).tagged("repository", repository.getName())));
112        }
113        return c;
114    }
115
116    protected Cache<String, String> newChildCache(boolean metrics) {
117        Cache<String, String> c = newCache(descriptor);
118        if (metrics) {
119            registry.registerAll(GuavaCacheMetric.of(c,
120                    MetricName.build(METRIC_CHILD_CACHE_NAME).tagged("repository", repository.getName())));
121        }
122        return c;
123    }
124
125    protected void removeCacheMetrics() {
126        registry.removeMatching((name, metric) -> name.getKey().startsWith(METRIC_CACHE_NAME)
127                || name.getKey().startsWith(METRIC_CHILD_CACHE_NAME));
128    }
129
130    protected <T> Cache<String, T> newCache(DBSRepositoryDescriptor descriptor) {
131        CacheBuilder<Object, Object> builder = CacheBuilder.newBuilder();
132        builder = builder.expireAfterWrite(descriptor.cacheTTL.longValue(), TimeUnit.MINUTES).recordStats();
133        if (descriptor.cacheConcurrencyLevel != null) {
134            builder = builder.concurrencyLevel(descriptor.cacheConcurrencyLevel.intValue());
135        }
136        if (descriptor.cacheMaxSize != null) {
137            builder = builder.maximumSize(descriptor.cacheMaxSize.longValue());
138        }
139        return builder.build();
140    }
141
142    /**
143     * Invalidations need to be propagated between connection caches only if there is such a cache, which is the case
144     * only if transactions are used.
145     */
146    protected DBSInvalidationsPropagator initInvalidationsPropagator() {
147        ClusterService clusterService = Framework.getService(ClusterService.class);
148        if (clusterService.isEnabled() && supportsTransactions()) {
149            return new DBSInvalidationsPropagator();
150        } else {
151            return null;
152        }
153    }
154
155    protected DBSClusterInvalidator initClusterInvalidator(DBSRepositoryDescriptor descriptor) {
156        ClusterService clusterService = Framework.getService(ClusterService.class);
157        if (clusterService.isEnabled()) {
158            DBSClusterInvalidator ci = createClusterInvalidator(descriptor);
159            ci.initialize(clusterService.getNodeId(), getName());
160            return ci;
161        } else {
162            return null;
163        }
164    }
165
166    protected DBSClusterInvalidator createClusterInvalidator(DBSRepositoryDescriptor descriptor) {
167        Class<? extends DBSClusterInvalidator> klass = descriptor.clusterInvalidatorClass;
168        if (klass == null) {
169            throw new NuxeoException(
170                    "Unable to get cluster invalidator class from descriptor whereas clustering is enabled");
171        }
172        try {
173            return klass.getDeclaredConstructor().newInstance();
174        } catch (ReflectiveOperationException e) {
175            throw new NuxeoException(e);
176        }
177    }
178
179    @Override
180    public void shutdown() {
181        repository.shutdown();
182        if (cache != null) {
183            // Clear caches
184            cache.invalidateAll();
185            childCache.invalidateAll();
186        }
187        removeCacheMetrics();
188        if (log.isInfoEnabled()) {
189            log.info(String.format("DBS cache deactivated on '%s' repository", getName()));
190        }
191    }
192
193    @Override
194    @SuppressWarnings("resource") // connection closed by DBSCachingConnection.close
195    public DBSConnection getConnection() {
196        DBSConnection connection = repository.getConnection();
197        return new DBSCachingConnection(connection, this);
198    }
199
200    @Override
201    public boolean supportsTransactions() {
202        return repository.supportsTransactions();
203    }
204
205    @Override
206    public BlobManager getBlobManager() {
207        return repository.getBlobManager();
208    }
209
210    @Override
211    public FulltextConfiguration getFulltextConfiguration() {
212        return repository.getFulltextConfiguration();
213    }
214
215    @Override
216    public boolean isFulltextDisabled() {
217        return repository.isFulltextDisabled();
218    }
219
220    @Override
221    public boolean isFulltextStoredInBlob() {
222        return repository.isFulltextStoredInBlob();
223    }
224
225    @Override
226    public boolean isFulltextSearchDisabled() {
227        return repository.isFulltextSearchDisabled();
228    }
229
230    @Override
231    public boolean isChangeTokenEnabled() {
232        return repository.isChangeTokenEnabled();
233    }
234
235    @Override
236    public LockManager getLockManager() {
237        return repository.getLockManager();
238    }
239
240    @Override
241    public Lock getLock(String id) {
242        return repository.getLock(id);
243    }
244
245    @Override
246    public Lock setLock(String id, Lock lock) {
247        return repository.setLock(id, lock);
248    }
249
250    @Override
251    public Lock removeLock(String id, String owner) {
252        return repository.removeLock(id, owner);
253    }
254
255    @Override
256    public void closeLockManager() {
257        repository.closeLockManager();
258    }
259
260    @Override
261    public void clearLockManagerCaches() {
262        repository.clearLockManagerCaches();
263    }
264
265    @Override
266    public String getName() {
267        return repository.getName();
268    }
269
270    @Override
271    public Session getSession() {
272        // don't call repository.getSession() as we want the session to have
273        // a reference to the caching repository, not to the base one
274        return new DBSSession(this);
275    }
276
277    @Override
278    public void markReferencedBinaries() {
279        repository.markReferencedBinaries();
280    }
281
282}