001/* 002 * (C) Copyright 2014-2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Maxime Hilaire 018 * Thierry Martins 019 */ 020package org.nuxeo.ecm.core.cache; 021 022import static java.nio.charset.StandardCharsets.UTF_8; 023import static org.nuxeo.ecm.core.cache.CacheDescriptor.DEFAULT_MAX_SIZE; 024import static org.nuxeo.ecm.core.cache.CacheDescriptor.OPTION_MAX_SIZE; 025 026import java.io.IOException; 027import java.io.InputStream; 028import java.io.OutputStream; 029import java.util.Collection; 030import java.util.Map; 031import java.util.concurrent.ConcurrentHashMap; 032 033import org.apache.commons.io.IOUtils; 034import org.apache.logging.log4j.LogManager; 035import org.apache.logging.log4j.Logger; 036import org.nuxeo.ecm.core.api.NuxeoException; 037import org.nuxeo.runtime.api.Framework; 038import org.nuxeo.runtime.cluster.ClusterService; 039import org.nuxeo.runtime.model.ComponentContext; 040import org.nuxeo.runtime.model.ComponentInstance; 041import org.nuxeo.runtime.model.DefaultComponent; 042import org.nuxeo.runtime.pubsub.AbstractPubSubBroker; 043import org.nuxeo.runtime.pubsub.SerializableMessage; 044 045/** 046 * Cache service implementation to manage nuxeo cache 047 * 048 * @since 6.0 049 */ 050public class CacheServiceImpl extends DefaultComponent implements CacheService { 051 052 private static final Logger log = LogManager.getLogger(CacheServiceImpl.class); 053 054 /** 055 * @since 10.3 056 */ 057 public static final String XP_CACHES = "caches"; 058 059 /** 060 * @since 8.2 061 */ 062 public static final String DEFAULT_CACHE_ID = "default-cache"; 063 064 /** @since 9.3 */ 065 public static final String CACHE_INVAL_PUBSUB_TOPIC = "cacheinval"; 066 067 // allows us to start caches registered programmatically through registerCache(name) 068 protected boolean started; 069 070 /** Currently registered caches. */ 071 protected final Map<String, CacheManagement> caches = new ConcurrentHashMap<>(); 072 073 protected CachePubSubInvalidator invalidator; 074 075 public static class CacheInvalidation implements SerializableMessage { 076 077 private static final long serialVersionUID = 1L; 078 079 protected static final String SEP = "/"; 080 081 public final String cacheName; 082 083 public final String key; 084 085 public CacheInvalidation(String name, String key) { 086 this.cacheName = name; 087 this.key = key; 088 } 089 090 @Override 091 public void serialize(OutputStream out) throws IOException { 092 String string = cacheName + SEP + key; 093 IOUtils.write(string, out, UTF_8); 094 } 095 096 public static CacheInvalidation deserialize(InputStream in) throws IOException { 097 String string = IOUtils.toString(in, UTF_8); 098 String[] parts = string.split(SEP, 2); 099 if (parts.length != 2) { 100 throw new IOException("Invalid invalidation: " + string); 101 } 102 String cacheName = parts[0]; 103 String key = parts[1]; 104 return new CacheInvalidation(cacheName, key); 105 } 106 107 @Override 108 public String toString() { 109 return getClass().getSimpleName() + "(" + cacheName + "," + key + ")"; 110 } 111 } 112 113 public static abstract class AbstractCachePubSubInvalidator extends AbstractPubSubBroker<CacheInvalidation> { 114 115 public static final String ALL_KEYS = "__ALL__"; 116 117 @Override 118 public CacheInvalidation deserialize(InputStream in) throws IOException { 119 return CacheInvalidation.deserialize(in); 120 } 121 122 public void sendInvalidation(String cacheName, String key) { 123 sendMessage(new CacheInvalidation(cacheName, key)); 124 } 125 126 public void sendInvalidationsAll(String cacheName) { 127 sendMessage(new CacheInvalidation(cacheName, ALL_KEYS)); 128 } 129 130 @Override 131 public void receivedMessage(CacheInvalidation invalidation) { 132 CacheManagement cache = (CacheManagement) getCache(invalidation.cacheName); 133 if (cache != null) { 134 String key = invalidation.key; 135 if (ALL_KEYS.equals(key)) { 136 cache.invalidateLocalAll(); 137 } else { 138 cache.invalidateLocal(key); 139 } 140 } 141 } 142 143 // for testability, we want an alternative implementation to return a test cache 144 protected abstract Cache getCache(String name); 145 } 146 147 protected class CachePubSubInvalidator extends AbstractCachePubSubInvalidator { 148 149 @Override 150 protected Cache getCache(String name) { 151 return CacheServiceImpl.this.getCache(name); 152 } 153 } 154 155 @Override 156 @Deprecated 157 public void registerCache(String name, int maxSize, int timeout) { 158 registerCache(name); 159 } 160 161 @Override 162 public void registerCache(String name) { 163 CacheDescriptor defaultDescriptor = getCacheDescriptor(DEFAULT_CACHE_ID); 164 if (defaultDescriptor == null) { 165 defaultDescriptor = new CacheDescriptor(); 166 defaultDescriptor.name = DEFAULT_CACHE_ID; 167 defaultDescriptor.options.put(OPTION_MAX_SIZE, String.valueOf(DEFAULT_MAX_SIZE)); 168 register(XP_CACHES, defaultDescriptor); 169 } 170 CacheDescriptor newDescriptor = (CacheDescriptor) new CacheDescriptor().merge(defaultDescriptor); 171 newDescriptor.name = name; 172 // add to registry (merging if needed) 173 register(XP_CACHES, newDescriptor); 174 // start if needed 175 maybeStart(name); 176 } 177 178 @Override 179 public int getApplicationStartedOrder() { 180 ComponentInstance repositoryComponent = Framework.getRuntime().getComponentInstance( 181 "org.nuxeo.ecm.core.repository.RepositoryServiceComponent"); 182 if (repositoryComponent == null || repositoryComponent.getInstance() == null) { 183 return super.getApplicationStartedOrder(); 184 } 185 return ((DefaultComponent) repositoryComponent.getInstance()).getApplicationStartedOrder() - 5; 186 } 187 188 @Override 189 public void start(ComponentContext context) { 190 super.start(context); 191 ClusterService clusterService = Framework.getService(ClusterService.class); 192 if (clusterService.isEnabled()) { 193 // register cache invalidator 194 String nodeId = clusterService.getNodeId(); 195 invalidator = new CachePubSubInvalidator(); 196 invalidator.initialize(CACHE_INVAL_PUBSUB_TOPIC, nodeId); 197 log.info("Registered cache invalidator for node: {}", nodeId); 198 } else { 199 log.info("Not registering a cache invalidator because clustering is not enabled"); 200 } 201 // create and starts caches 202 Collection<CacheDescriptor> descriptors = getDescriptors(XP_CACHES); 203 descriptors.forEach(this::startCacheDescriptor); 204 started = true; 205 } 206 207 /** Creates and starts the cache. */ 208 protected void startCacheDescriptor(CacheDescriptor desc) { 209 CacheManagement cache; 210 if (desc.klass == null) { 211 cache = new InMemoryCacheImpl(desc); // default cache implementation 212 } else { 213 try { 214 cache = desc.klass.getConstructor(CacheDescriptor.class).newInstance(desc); 215 } catch (ReflectiveOperationException e) { 216 throw new NuxeoException("Failed to instantiate class: " + desc.klass + " for cache: " + desc.name, e); 217 } 218 } 219 // wrap with checker, metrics and invalidator 220 cache = new CacheAttributesChecker(cache); 221 cache = new CacheMetrics(cache); 222 if (invalidator != null) { 223 cache = new CacheInvalidator(cache, invalidator); 224 } 225 cache.start(); 226 caches.put(desc.name, cache); 227 } 228 229 @Override 230 public void stop(ComponentContext context) throws InterruptedException { 231 super.stop(context); 232 if (invalidator != null) { 233 invalidator.close(); 234 invalidator = null; 235 } 236 for (CacheManagement cache : caches.values()) { 237 cache.stop(); 238 } 239 caches.clear(); 240 started = false; 241 } 242 243 protected void maybeStart(String name) { 244 if (!started) { 245 // cache will be started by start() 246 return; 247 } 248 // stop previous 249 CacheManagement cache = caches.get(name); 250 if (cache != null) { 251 cache.stop(); 252 } 253 // start new one 254 startCacheDescriptor(getCacheDescriptor(name)); 255 } 256 257 // --------------- API --------------- 258 259 @Override 260 public Cache getCache(String name) { 261 return caches.get(name); 262 } 263 264 /** 265 * @since 9.3 266 */ 267 public CacheDescriptor getCacheDescriptor(String descriptor) { 268 return getDescriptor(XP_CACHES, descriptor); 269 } 270 271}