001/*
002 * (C) Copyright 2014-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Maxime Hilaire
018 *     Thierry Martins
019 */
020package org.nuxeo.ecm.core.cache;
021
022import static java.nio.charset.StandardCharsets.UTF_8;
023import static org.nuxeo.ecm.core.cache.CacheDescriptor.DEFAULT_MAX_SIZE;
024import static org.nuxeo.ecm.core.cache.CacheDescriptor.DEFAULT_TTL;
025import static org.nuxeo.ecm.core.cache.CacheDescriptor.OPTION_MAX_SIZE;
026
027import java.io.IOException;
028import java.io.InputStream;
029import java.io.OutputStream;
030import java.util.Collection;
031import java.util.Collections;
032import java.util.List;
033import java.util.Map;
034import java.util.Random;
035import java.util.concurrent.ConcurrentHashMap;
036import java.util.concurrent.CopyOnWriteArrayList;
037
038import org.apache.commons.io.IOUtils;
039import org.apache.commons.lang3.StringUtils;
040import org.apache.commons.logging.Log;
041import org.apache.commons.logging.LogFactory;
042import org.nuxeo.runtime.api.Framework;
043import org.nuxeo.runtime.model.ComponentContext;
044import org.nuxeo.runtime.model.ComponentInstance;
045import org.nuxeo.runtime.model.ComponentName;
046import org.nuxeo.runtime.model.DefaultComponent;
047import org.nuxeo.runtime.pubsub.AbstractPubSubBroker;
048import org.nuxeo.runtime.pubsub.SerializableMessage;
049
050/**
051 * Cache service implementation to manage nuxeo cache
052 *
053 * @since 6.0
054 */
055public class CacheServiceImpl extends DefaultComponent implements CacheService {
056
    /** Logger used for registration and clustering messages of this component. */
    private static final Log log = LogFactory.getLog(CacheServiceImpl.class);

    /** Component name, built from the fully-qualified name of this class. */
    public static final ComponentName NAME = new ComponentName(CacheServiceImpl.class.getName());

    /** Source of the fallback cluster node id generated at start when none is configured. */
    protected static final Random RANDOM = new Random();

    /**
     * Name of the descriptor used as a template when a cache is registered by name only.
     *
     * @since 8.2
     */
    public static final String DEFAULT_CACHE_ID = "default-cache";

    /**
     * Topic given to the pub/sub invalidator when it is initialized.
     *
     * @since 9.3
     */
    public static final String CACHE_INVAL_PUBSUB_TOPIC = "cacheinval";

    /**
     * Framework property defining whether clustering is enabled.
     *
     * @since 9.3
     */
    public static final String CLUSTERING_ENABLED_PROP = "repository.clustering.enabled";

    /**
     * Framework property containing the node id.
     *
     * @since 9.3
     */
    public static final String NODE_ID_PROP = "repository.clustering.id";

    /** Registry holding every contributed descriptor and the effective (merged) descriptor per cache name. */
    protected final CacheDescriptorRegistry registry = new CacheDescriptorRegistry();

    /** Currently registered caches, keyed by cache name. */
    protected final Map<String, CacheManagement> caches = new ConcurrentHashMap<>();

    /** Cluster invalidator; created at start when clustering is enabled, {@code null} otherwise and after stop. */
    protected CachePubSubInvalidator invalidator;
091
092    // SimpleContributionRegistry is overkill and does not deal well with a "remove" feature
093    protected static class CacheDescriptorRegistry {
094
095        protected Map<String, List<CacheDescriptor>> allDescriptors = new ConcurrentHashMap<>();
096
097        protected Map<String, CacheDescriptor> effectiveDescriptors = new ConcurrentHashMap<>();
098
099        public void addContribution(CacheDescriptor descriptor) {
100            String name = descriptor.name;
101            allDescriptors.computeIfAbsent(name, n -> new CopyOnWriteArrayList<>()).add(descriptor);
102            recompute(name);
103        }
104
105        public void removeContribution(CacheDescriptor descriptor) {
106            String name = descriptor.name;
107            allDescriptors.getOrDefault(name, Collections.emptyList()).remove(descriptor);
108            recompute(name);
109        }
110
111        protected void recompute(String name) {
112            CacheDescriptor desc = null;
113            for (CacheDescriptor d : allDescriptors.getOrDefault(name, Collections.emptyList())) {
114                if (d.remove) {
115                    desc = null;
116                } else {
117                    if (desc == null) {
118                        desc = d.clone();
119                    } else {
120                        desc.merge(d);
121                    }
122                }
123            }
124            if (desc == null) {
125                effectiveDescriptors.remove(name);
126            } else {
127                effectiveDescriptors.put(name, desc);
128            }
129        }
130
131        public CacheDescriptor getCacheDescriptor(String name) {
132            return effectiveDescriptors.get(name);
133        }
134
135        public Collection<CacheDescriptor> getCacheDescriptors() {
136            return effectiveDescriptors.values();
137        }
138
139        public CacheDescriptor getDefaultDescriptor() {
140            CacheDescriptor defaultDescriptor = getCacheDescriptor(DEFAULT_CACHE_ID);
141            if (defaultDescriptor == null) {
142                defaultDescriptor = new CacheDescriptor();
143                defaultDescriptor.ttl = DEFAULT_TTL;
144                defaultDescriptor.options.put(OPTION_MAX_SIZE, String.valueOf(DEFAULT_MAX_SIZE));
145            }
146            return defaultDescriptor;
147        }
148    }
149
150    public static class CacheInvalidation implements SerializableMessage {
151
152        private static final long serialVersionUID = 1L;
153
154        protected static final String SEP = "/";
155
156        public final String cacheName;
157
158        public final String key;
159
160        public CacheInvalidation(String name, String key) {
161            this.cacheName = name;
162            this.key = key;
163        }
164
165        @Override
166        public void serialize(OutputStream out) throws IOException {
167            String string = cacheName + SEP + key;
168            IOUtils.write(string, out, UTF_8);
169        }
170
171        public static CacheInvalidation deserialize(InputStream in) throws IOException {
172            String string = IOUtils.toString(in, UTF_8);
173            String[] parts = string.split(SEP, 2);
174            if (parts.length != 2) {
175                throw new IOException("Invalid invalidation: " + string);
176            }
177            String cacheName = parts[0];
178            String key = parts[1];
179            return new CacheInvalidation(cacheName, key);
180        }
181
182        @Override
183        public String toString() {
184            return getClass().getSimpleName() + "(" + cacheName + "," + key + ")";
185        }
186    }
187
188    public static abstract class AbstractCachePubSubInvalidator extends AbstractPubSubBroker<CacheInvalidation> {
189
190        public static final String ALL_KEYS = "__ALL__";
191
192        @Override
193        public CacheInvalidation deserialize(InputStream in) throws IOException {
194            return CacheInvalidation.deserialize(in);
195        }
196
197        public void sendInvalidation(String cacheName, String key) {
198            sendMessage(new CacheInvalidation(cacheName, key));
199        }
200
201        public void sendInvalidationsAll(String cacheName) {
202            sendMessage(new CacheInvalidation(cacheName, ALL_KEYS));
203        }
204
205        @Override
206        public void receivedMessage(CacheInvalidation invalidation) {
207            CacheManagement cache = (CacheManagement) getCache(invalidation.cacheName);
208            if (cache != null) {
209                String key = invalidation.key;
210                if (ALL_KEYS.equals(key)) {
211                    cache.invalidateLocalAll();
212                } else {
213                    cache.invalidateLocal(key);
214                }
215            }
216        }
217
218        // for testability, we want an alternative implementation to return a test cache
219        protected abstract Cache getCache(String name);
220    }
221
222    protected class CachePubSubInvalidator extends AbstractCachePubSubInvalidator {
223
224        @Override
225        protected Cache getCache(String name) {
226            return CacheServiceImpl.this.getCache(name);
227        }
228    }
229
230    @Override
231    public void registerContribution(Object contrib, String extensionPoint, ComponentInstance contributor) {
232        registerCacheDescriptor((CacheDescriptor) contrib);
233    }
234
235    @Override
236    @Deprecated
237    public void registerCache(String name, int maxSize, int timeout) {
238        registerCache(name);
239    }
240
241    @Override
242    public void registerCache(String name) {
243        CacheDescriptor defaultDescriptor = registry.getDefaultDescriptor().clone();
244        defaultDescriptor.name = name;
245        // add to registry (merging if needed)
246        registerCacheDescriptor(defaultDescriptor);
247        // start if needed
248        maybeStart(name);
249    }
250
251    public void registerCacheDescriptor(CacheDescriptor descriptor) {
252        registry.addContribution(descriptor);
253        log.info("Cache registered: " + descriptor.name);
254    }
255
256    @Override
257    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
258        unregisterCacheDescriptor((CacheDescriptor) contribution);
259    }
260
261    public void unregisterCacheDescriptor(CacheDescriptor descriptor) {
262        registry.removeContribution(descriptor);
263        log.info("Cache unregistered: " + descriptor.name);
264    }
265
266    @Override
267    public int getApplicationStartedOrder() {
268        ComponentInstance repositoryComponent = Framework.getRuntime().getComponentInstance(
269                "org.nuxeo.ecm.core.repository.RepositoryServiceComponent");
270        if (repositoryComponent == null || repositoryComponent.getInstance() == null) {
271            return super.getApplicationStartedOrder();
272        }
273        return ((DefaultComponent) repositoryComponent.getInstance()).getApplicationStartedOrder() - 5;
274    }
275
276    @Override
277    public void start(ComponentContext context) {
278        if (Framework.isBooleanPropertyTrue(CLUSTERING_ENABLED_PROP)) {
279            // register cache invalidator
280            String nodeId = Framework.getProperty(NODE_ID_PROP);
281            if (StringUtils.isBlank(nodeId)) {
282                nodeId = String.valueOf(RANDOM.nextLong());
283                log.warn("Missing cluster node id configuration, please define it explicitly "
284                        + "(usually through repository.clustering.id). Using random cluster node id instead: "
285                        + nodeId);
286            } else {
287                nodeId = nodeId.trim();
288            }
289            invalidator = new CachePubSubInvalidator();
290            invalidator.initialize(CACHE_INVAL_PUBSUB_TOPIC, nodeId);
291            log.info("Registered cache invalidator for node: " + nodeId);
292        } else {
293            log.info("Not registering a cache invalidator because clustering is not enabled");
294        }
295        // create and starts caches
296        registry.getCacheDescriptors().forEach(this::startCacheDescriptor);
297    }
298
299    /** Creates and starts the cache. */
300    protected void startCacheDescriptor(CacheDescriptor desc) {
301        CacheManagement cache = desc.newInstance(invalidator);
302        cache.start();
303        caches.put(desc.name, cache);
304    }
305
306    @Override
307    public void stop(ComponentContext context) {
308        if (invalidator != null) {
309            invalidator.close();
310            invalidator = null;
311        }
312        for (CacheManagement cache : caches.values()) {
313            cache.stop();
314        }
315        caches.clear();
316    }
317
318    protected void maybeStart(String name) {
319        if (!Framework.getRuntime().getComponentManager().isStarted()) {
320            return;
321        }
322        // stop previous
323        CacheManagement cache = caches.get(name);
324        if (cache != null) {
325            cache.stop();
326        }
327        // start new one
328        startCacheDescriptor(registry.getCacheDescriptor(name));
329    }
330
331    // --------------- API ---------------
332
333    @Override
334    public Cache getCache(String name) {
335        return caches.get(name);
336    }
337
338    /**
339     * @since 9.3
340     */
341    public CacheDescriptor getCacheDescriptor(String descriptor) {
342        return registry.getCacheDescriptor(descriptor);
343    }
344
345}