001/*
002 * (C) Copyright 2014-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Maxime Hilaire
018 *     Thierry Martins
019 */
020package org.nuxeo.ecm.core.cache;
021
022import static java.nio.charset.StandardCharsets.UTF_8;
023import static org.nuxeo.ecm.core.cache.CacheDescriptor.DEFAULT_MAX_SIZE;
024import static org.nuxeo.ecm.core.cache.CacheDescriptor.OPTION_MAX_SIZE;
025
026import java.io.IOException;
027import java.io.InputStream;
028import java.io.OutputStream;
029import java.util.Collection;
030import java.util.Map;
031import java.util.Random;
032import java.util.concurrent.ConcurrentHashMap;
033
034import org.apache.commons.io.IOUtils;
035import org.apache.commons.lang3.StringUtils;
036import org.apache.logging.log4j.LogManager;
037import org.apache.logging.log4j.Logger;
038import org.nuxeo.ecm.core.api.NuxeoException;
039import org.nuxeo.runtime.api.Framework;
040import org.nuxeo.runtime.model.ComponentContext;
041import org.nuxeo.runtime.model.ComponentInstance;
042import org.nuxeo.runtime.model.DefaultComponent;
043import org.nuxeo.runtime.pubsub.AbstractPubSubBroker;
044import org.nuxeo.runtime.pubsub.SerializableMessage;
045
046/**
047 * Cache service implementation to manage nuxeo cache
048 *
049 * @since 6.0
050 */
051public class CacheServiceImpl extends DefaultComponent implements CacheService {
052
053    private static final Logger log = LogManager.getLogger(CacheServiceImpl.class);
054
055    /**
056     * @since 10.3
057     */
058    public static final String XP_CACHES = "caches";
059
060    protected static final Random RANDOM = new Random(); // NOSONAR (doesn't need cryptographic strength)
061
062    /**
063     * @since 8.2
064     */
065    public static final String DEFAULT_CACHE_ID = "default-cache";
066
067    /** @since 9.3 */
068    public static final String CACHE_INVAL_PUBSUB_TOPIC = "cacheinval";
069
070    /**
071     * Framework property defining whether clustering is enabled.
072     *
073     * @since 9.3
074     */
075    public static final String CLUSTERING_ENABLED_PROP = "repository.clustering.enabled";
076
077    /**
078     * Framework property containing the node id.
079     *
080     * @since 9.3
081     */
082    public static final String NODE_ID_PROP = "repository.clustering.id";
083
084    // allows us to start caches registered programmatically through registerCache(name)
085    protected boolean started;
086
087    /** Currently registered caches. */
088    protected final Map<String, CacheManagement> caches = new ConcurrentHashMap<>();
089
090    protected CachePubSubInvalidator invalidator;
091
092    public static class CacheInvalidation implements SerializableMessage {
093
094        private static final long serialVersionUID = 1L;
095
096        protected static final String SEP = "/";
097
098        public final String cacheName;
099
100        public final String key;
101
102        public CacheInvalidation(String name, String key) {
103            this.cacheName = name;
104            this.key = key;
105        }
106
107        @Override
108        public void serialize(OutputStream out) throws IOException {
109            String string = cacheName + SEP + key;
110            IOUtils.write(string, out, UTF_8);
111        }
112
113        public static CacheInvalidation deserialize(InputStream in) throws IOException {
114            String string = IOUtils.toString(in, UTF_8);
115            String[] parts = string.split(SEP, 2);
116            if (parts.length != 2) {
117                throw new IOException("Invalid invalidation: " + string);
118            }
119            String cacheName = parts[0];
120            String key = parts[1];
121            return new CacheInvalidation(cacheName, key);
122        }
123
124        @Override
125        public String toString() {
126            return getClass().getSimpleName() + "(" + cacheName + "," + key + ")";
127        }
128    }
129
130    public static abstract class AbstractCachePubSubInvalidator extends AbstractPubSubBroker<CacheInvalidation> {
131
132        public static final String ALL_KEYS = "__ALL__";
133
134        @Override
135        public CacheInvalidation deserialize(InputStream in) throws IOException {
136            return CacheInvalidation.deserialize(in);
137        }
138
139        public void sendInvalidation(String cacheName, String key) {
140            sendMessage(new CacheInvalidation(cacheName, key));
141        }
142
143        public void sendInvalidationsAll(String cacheName) {
144            sendMessage(new CacheInvalidation(cacheName, ALL_KEYS));
145        }
146
147        @Override
148        public void receivedMessage(CacheInvalidation invalidation) {
149            CacheManagement cache = (CacheManagement) getCache(invalidation.cacheName);
150            if (cache != null) {
151                String key = invalidation.key;
152                if (ALL_KEYS.equals(key)) {
153                    cache.invalidateLocalAll();
154                } else {
155                    cache.invalidateLocal(key);
156                }
157            }
158        }
159
160        // for testability, we want an alternative implementation to return a test cache
161        protected abstract Cache getCache(String name);
162    }
163
164    protected class CachePubSubInvalidator extends AbstractCachePubSubInvalidator {
165
166        @Override
167        protected Cache getCache(String name) {
168            return CacheServiceImpl.this.getCache(name);
169        }
170    }
171
172    @Override
173    @Deprecated
174    public void registerCache(String name, int maxSize, int timeout) {
175        registerCache(name);
176    }
177
178    @Override
179    public void registerCache(String name) {
180        CacheDescriptor defaultDescriptor = getCacheDescriptor(DEFAULT_CACHE_ID);
181        if (defaultDescriptor == null) {
182            defaultDescriptor = new CacheDescriptor();
183            defaultDescriptor.name = DEFAULT_CACHE_ID;
184            defaultDescriptor.options.put(OPTION_MAX_SIZE, String.valueOf(DEFAULT_MAX_SIZE));
185            register(XP_CACHES, defaultDescriptor);
186        }
187        CacheDescriptor newDescriptor = (CacheDescriptor) new CacheDescriptor().merge(defaultDescriptor);
188        newDescriptor.name = name;
189        // add to registry (merging if needed)
190        register(XP_CACHES, newDescriptor);
191        // start if needed
192        maybeStart(name);
193    }
194
195    @Override
196    public int getApplicationStartedOrder() {
197        ComponentInstance repositoryComponent = Framework.getRuntime().getComponentInstance(
198                "org.nuxeo.ecm.core.repository.RepositoryServiceComponent");
199        if (repositoryComponent == null || repositoryComponent.getInstance() == null) {
200            return super.getApplicationStartedOrder();
201        }
202        return ((DefaultComponent) repositoryComponent.getInstance()).getApplicationStartedOrder() - 5;
203    }
204
205    @Override
206    public void start(ComponentContext context) {
207        super.start(context);
208        if (Framework.isBooleanPropertyTrue(CLUSTERING_ENABLED_PROP)) {
209            // register cache invalidator
210            String nodeId = Framework.getProperty(NODE_ID_PROP);
211            if (StringUtils.isBlank(nodeId)) {
212                nodeId = String.valueOf(RANDOM.nextLong());
213                log.warn("Missing cluster node id configuration, please define it explicitly "
214                        + "(usually through repository.clustering.id). Using random cluster node id instead: {}",
215                        nodeId);
216            } else {
217                nodeId = nodeId.trim();
218            }
219            invalidator = new CachePubSubInvalidator();
220            invalidator.initialize(CACHE_INVAL_PUBSUB_TOPIC, nodeId);
221            log.info("Registered cache invalidator for node: {}", nodeId);
222        } else {
223            log.info("Not registering a cache invalidator because clustering is not enabled");
224        }
225        // create and starts caches
226        Collection<CacheDescriptor> descriptors = getDescriptors(XP_CACHES);
227        descriptors.forEach(this::startCacheDescriptor);
228        started = true;
229    }
230
231    /** Creates and starts the cache. */
232    protected void startCacheDescriptor(CacheDescriptor desc) {
233        CacheManagement cache;
234        if (desc.klass == null) {
235            cache = new InMemoryCacheImpl(desc); // default cache implementation
236        } else {
237            try {
238                cache = desc.klass.getConstructor(CacheDescriptor.class).newInstance(desc);
239            } catch (ReflectiveOperationException e) {
240                throw new NuxeoException("Failed to instantiate class: " + desc.klass + " for cache: " + desc.name, e);
241            }
242        }
243        // wrap with checker, metrics and invalidator
244        cache = new CacheAttributesChecker(cache);
245        cache = new CacheMetrics(cache);
246        if (invalidator != null) {
247            cache = new CacheInvalidator(cache, invalidator);
248        }
249        cache.start();
250        caches.put(desc.name, cache);
251    }
252
253    @Override
254    public void stop(ComponentContext context) throws InterruptedException {
255        super.stop(context);
256        if (invalidator != null) {
257            invalidator.close();
258            invalidator = null;
259        }
260        for (CacheManagement cache : caches.values()) {
261            cache.stop();
262        }
263        caches.clear();
264        started = false;
265    }
266
267    protected void maybeStart(String name) {
268        if (!started) {
269            // cache will be started by start()
270            return;
271        }
272        // stop previous
273        CacheManagement cache = caches.get(name);
274        if (cache != null) {
275            cache.stop();
276        }
277        // start new one
278        startCacheDescriptor(getCacheDescriptor(name));
279    }
280
281    // --------------- API ---------------
282
283    @Override
284    public Cache getCache(String name) {
285        return caches.get(name);
286    }
287
288    /**
289     * @since 9.3
290     */
291    public CacheDescriptor getCacheDescriptor(String descriptor) {
292        return getDescriptor(XP_CACHES, descriptor);
293    }
294
295}