001/*
002 * (C) Copyright 2014-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Maxime Hilaire
018 *     Thierry Martins
019 */
020package org.nuxeo.ecm.core.cache;
021
022import static java.nio.charset.StandardCharsets.UTF_8;
023import static org.nuxeo.ecm.core.cache.CacheDescriptor.DEFAULT_MAX_SIZE;
024import static org.nuxeo.ecm.core.cache.CacheDescriptor.OPTION_MAX_SIZE;
025
026import java.io.IOException;
027import java.io.InputStream;
028import java.io.OutputStream;
029import java.util.Collection;
030import java.util.Map;
031import java.util.concurrent.ConcurrentHashMap;
032
033import org.apache.commons.io.IOUtils;
034import org.apache.logging.log4j.LogManager;
035import org.apache.logging.log4j.Logger;
036import org.nuxeo.ecm.core.api.NuxeoException;
037import org.nuxeo.runtime.api.Framework;
038import org.nuxeo.runtime.cluster.ClusterService;
039import org.nuxeo.runtime.model.ComponentContext;
040import org.nuxeo.runtime.model.ComponentInstance;
041import org.nuxeo.runtime.model.DefaultComponent;
042import org.nuxeo.runtime.pubsub.AbstractPubSubBroker;
043import org.nuxeo.runtime.pubsub.SerializableMessage;
044
045/**
 * Cache service implementation that registers, starts and stops the Nuxeo caches.
047 *
048 * @since 6.0
049 */
050public class CacheServiceImpl extends DefaultComponent implements CacheService {
051
    private static final Logger log = LogManager.getLogger(CacheServiceImpl.class);

    /**
     * Name under which cache descriptors are registered in, and retrieved from, the descriptor registry.
     *
     * @since 10.3
     */
    public static final String XP_CACHES = "caches";

    /**
     * Name of the cache descriptor used as a template when a cache is registered programmatically.
     *
     * @since 8.2
     */
    public static final String DEFAULT_CACHE_ID = "default-cache";

    /**
     * Pub/sub topic used to initialize the cache invalidator when clustering is enabled.
     *
     * @since 9.3
     */
    public static final String CACHE_INVAL_PUBSUB_TOPIC = "cacheinval";

    // set to true at the end of start() and back to false by stop();
    // allows us to start caches registered programmatically through registerCache(name)
    protected boolean started;

    /** Currently registered caches, keyed by cache name; filled when a cache is started, cleared by stop(). */
    protected final Map<String, CacheManagement> caches = new ConcurrentHashMap<>();

    /** Cache invalidator created by start() when clustering is enabled; {@code null} otherwise. */
    protected CachePubSubInvalidator invalidator;
074
075    public static class CacheInvalidation implements SerializableMessage {
076
077        private static final long serialVersionUID = 1L;
078
079        protected static final String SEP = "/";
080
081        public final String cacheName;
082
083        public final String key;
084
085        public CacheInvalidation(String name, String key) {
086            this.cacheName = name;
087            this.key = key;
088        }
089
090        @Override
091        public void serialize(OutputStream out) throws IOException {
092            String string = cacheName + SEP + key;
093            IOUtils.write(string, out, UTF_8);
094        }
095
096        public static CacheInvalidation deserialize(InputStream in) throws IOException {
097            String string = IOUtils.toString(in, UTF_8);
098            String[] parts = string.split(SEP, 2);
099            if (parts.length != 2) {
100                throw new IOException("Invalid invalidation: " + string);
101            }
102            String cacheName = parts[0];
103            String key = parts[1];
104            return new CacheInvalidation(cacheName, key);
105        }
106
107        @Override
108        public String toString() {
109            return getClass().getSimpleName() + "(" + cacheName + "," + key + ")";
110        }
111    }
112
113    public static abstract class AbstractCachePubSubInvalidator extends AbstractPubSubBroker<CacheInvalidation> {
114
115        public static final String ALL_KEYS = "__ALL__";
116
117        @Override
118        public CacheInvalidation deserialize(InputStream in) throws IOException {
119            return CacheInvalidation.deserialize(in);
120        }
121
122        public void sendInvalidation(String cacheName, String key) {
123            sendMessage(new CacheInvalidation(cacheName, key));
124        }
125
126        public void sendInvalidationsAll(String cacheName) {
127            sendMessage(new CacheInvalidation(cacheName, ALL_KEYS));
128        }
129
130        @Override
131        public void receivedMessage(CacheInvalidation invalidation) {
132            CacheManagement cache = (CacheManagement) getCache(invalidation.cacheName);
133            if (cache != null) {
134                String key = invalidation.key;
135                if (ALL_KEYS.equals(key)) {
136                    cache.invalidateLocalAll();
137                } else {
138                    cache.invalidateLocal(key);
139                }
140            }
141        }
142
143        // for testability, we want an alternative implementation to return a test cache
144        protected abstract Cache getCache(String name);
145    }
146
147    protected class CachePubSubInvalidator extends AbstractCachePubSubInvalidator {
148
149        @Override
150        protected Cache getCache(String name) {
151            return CacheServiceImpl.this.getCache(name);
152        }
153    }
154
155    @Override
156    @Deprecated
157    public void registerCache(String name, int maxSize, int timeout) {
158        registerCache(name);
159    }
160
161    @Override
162    public void registerCache(String name) {
163        CacheDescriptor defaultDescriptor = getCacheDescriptor(DEFAULT_CACHE_ID);
164        if (defaultDescriptor == null) {
165            defaultDescriptor = new CacheDescriptor();
166            defaultDescriptor.name = DEFAULT_CACHE_ID;
167            defaultDescriptor.options.put(OPTION_MAX_SIZE, String.valueOf(DEFAULT_MAX_SIZE));
168            register(XP_CACHES, defaultDescriptor);
169        }
170        CacheDescriptor newDescriptor = (CacheDescriptor) new CacheDescriptor().merge(defaultDescriptor);
171        newDescriptor.name = name;
172        // add to registry (merging if needed)
173        register(XP_CACHES, newDescriptor);
174        // start if needed
175        maybeStart(name);
176    }
177
178    @Override
179    public int getApplicationStartedOrder() {
180        ComponentInstance repositoryComponent = Framework.getRuntime().getComponentInstance(
181                "org.nuxeo.ecm.core.repository.RepositoryServiceComponent");
182        if (repositoryComponent == null || repositoryComponent.getInstance() == null) {
183            return super.getApplicationStartedOrder();
184        }
185        return ((DefaultComponent) repositoryComponent.getInstance()).getApplicationStartedOrder() - 5;
186    }
187
188    @Override
189    public void start(ComponentContext context) {
190        super.start(context);
191        ClusterService clusterService = Framework.getService(ClusterService.class);
192        if (clusterService.isEnabled()) {
193            // register cache invalidator
194            String nodeId = clusterService.getNodeId();
195            invalidator = new CachePubSubInvalidator();
196            invalidator.initialize(CACHE_INVAL_PUBSUB_TOPIC, nodeId);
197            log.info("Registered cache invalidator for node: {}", nodeId);
198        } else {
199            log.info("Not registering a cache invalidator because clustering is not enabled");
200        }
201        // create and starts caches
202        Collection<CacheDescriptor> descriptors = getDescriptors(XP_CACHES);
203        descriptors.forEach(this::startCacheDescriptor);
204        started = true;
205    }
206
207    /** Creates and starts the cache. */
208    protected void startCacheDescriptor(CacheDescriptor desc) {
209        CacheManagement cache;
210        if (desc.klass == null) {
211            cache = new InMemoryCacheImpl(desc); // default cache implementation
212        } else {
213            try {
214                cache = desc.klass.getConstructor(CacheDescriptor.class).newInstance(desc);
215            } catch (ReflectiveOperationException e) {
216                throw new NuxeoException("Failed to instantiate class: " + desc.klass + " for cache: " + desc.name, e);
217            }
218        }
219        // wrap with checker, metrics and invalidator
220        cache = new CacheAttributesChecker(cache);
221        cache = new CacheMetrics(cache);
222        if (invalidator != null) {
223            cache = new CacheInvalidator(cache, invalidator);
224        }
225        cache.start();
226        caches.put(desc.name, cache);
227    }
228
229    @Override
230    public void stop(ComponentContext context) throws InterruptedException {
231        super.stop(context);
232        if (invalidator != null) {
233            invalidator.close();
234            invalidator = null;
235        }
236        for (CacheManagement cache : caches.values()) {
237            cache.stop();
238        }
239        caches.clear();
240        started = false;
241    }
242
243    protected void maybeStart(String name) {
244        if (!started) {
245            // cache will be started by start()
246            return;
247        }
248        // stop previous
249        CacheManagement cache = caches.get(name);
250        if (cache != null) {
251            cache.stop();
252        }
253        // start new one
254        startCacheDescriptor(getCacheDescriptor(name));
255    }
256
257    // --------------- API ---------------
258
259    @Override
260    public Cache getCache(String name) {
261        return caches.get(name);
262    }
263
264    /**
265     * @since 9.3
266     */
267    public CacheDescriptor getCacheDescriptor(String descriptor) {
268        return getDescriptor(XP_CACHES, descriptor);
269    }
270
271}