001/* 002 * (C) Copyright 2014-2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Maxime Hilaire 018 * Thierry Martins 019 */ 020package org.nuxeo.ecm.core.cache; 021 022import static java.nio.charset.StandardCharsets.UTF_8; 023import static org.nuxeo.ecm.core.cache.CacheDescriptor.DEFAULT_MAX_SIZE; 024import static org.nuxeo.ecm.core.cache.CacheDescriptor.OPTION_MAX_SIZE; 025 026import java.io.IOException; 027import java.io.InputStream; 028import java.io.OutputStream; 029import java.util.Collection; 030import java.util.Map; 031import java.util.Random; 032import java.util.concurrent.ConcurrentHashMap; 033 034import org.apache.commons.io.IOUtils; 035import org.apache.commons.lang3.StringUtils; 036import org.apache.logging.log4j.LogManager; 037import org.apache.logging.log4j.Logger; 038import org.nuxeo.ecm.core.api.NuxeoException; 039import org.nuxeo.runtime.api.Framework; 040import org.nuxeo.runtime.model.ComponentContext; 041import org.nuxeo.runtime.model.ComponentInstance; 042import org.nuxeo.runtime.model.DefaultComponent; 043import org.nuxeo.runtime.pubsub.AbstractPubSubBroker; 044import org.nuxeo.runtime.pubsub.SerializableMessage; 045 046/** 047 * Cache service implementation to manage nuxeo cache 048 * 049 * @since 6.0 050 */ 051public class CacheServiceImpl extends DefaultComponent implements CacheService { 052 053 private static final Logger log = LogManager.getLogger(CacheServiceImpl.class); 054 055 /** 056 * @since 10.3 057 */ 058 public static final String XP_CACHES = "caches"; 059 060 protected static final Random RANDOM = new Random(); // NOSONAR (doesn't need cryptographic strength) 061 062 /** 063 * @since 8.2 064 */ 065 public static final String DEFAULT_CACHE_ID = "default-cache"; 066 067 /** @since 9.3 */ 068 public static final String CACHE_INVAL_PUBSUB_TOPIC = "cacheinval"; 069 070 /** 071 * Framework property defining whether clustering is enabled. 072 * 073 * @since 9.3 074 */ 075 public static final String CLUSTERING_ENABLED_PROP = "repository.clustering.enabled"; 076 077 /** 078 * Framework property containing the node id. 079 * 080 * @since 9.3 081 */ 082 public static final String NODE_ID_PROP = "repository.clustering.id"; 083 084 // allows us to start caches registered programmatically through registerCache(name) 085 protected boolean started; 086 087 /** Currently registered caches. */ 088 protected final Map<String, CacheManagement> caches = new ConcurrentHashMap<>(); 089 090 protected CachePubSubInvalidator invalidator; 091 092 public static class CacheInvalidation implements SerializableMessage { 093 094 private static final long serialVersionUID = 1L; 095 096 protected static final String SEP = "/"; 097 098 public final String cacheName; 099 100 public final String key; 101 102 public CacheInvalidation(String name, String key) { 103 this.cacheName = name; 104 this.key = key; 105 } 106 107 @Override 108 public void serialize(OutputStream out) throws IOException { 109 String string = cacheName + SEP + key; 110 IOUtils.write(string, out, UTF_8); 111 } 112 113 public static CacheInvalidation deserialize(InputStream in) throws IOException { 114 String string = IOUtils.toString(in, UTF_8); 115 String[] parts = string.split(SEP, 2); 116 if (parts.length != 2) { 117 throw new IOException("Invalid invalidation: " + string); 118 } 119 String cacheName = parts[0]; 120 String key = parts[1]; 121 return new CacheInvalidation(cacheName, key); 122 } 123 124 @Override 125 public String toString() { 126 return getClass().getSimpleName() + "(" + cacheName + "," + key + ")"; 127 } 128 } 129 130 public static abstract class AbstractCachePubSubInvalidator extends AbstractPubSubBroker<CacheInvalidation> { 131 132 public static final String ALL_KEYS = "__ALL__"; 133 134 @Override 135 public CacheInvalidation deserialize(InputStream in) throws IOException { 136 return CacheInvalidation.deserialize(in); 137 } 138 139 public void sendInvalidation(String cacheName, String key) { 140 sendMessage(new CacheInvalidation(cacheName, key)); 141 } 142 143 public void sendInvalidationsAll(String cacheName) { 144 sendMessage(new CacheInvalidation(cacheName, ALL_KEYS)); 145 } 146 147 @Override 148 public void receivedMessage(CacheInvalidation invalidation) { 149 CacheManagement cache = (CacheManagement) getCache(invalidation.cacheName); 150 if (cache != null) { 151 String key = invalidation.key; 152 if (ALL_KEYS.equals(key)) { 153 cache.invalidateLocalAll(); 154 } else { 155 cache.invalidateLocal(key); 156 } 157 } 158 } 159 160 // for testability, we want an alternative implementation to return a test cache 161 protected abstract Cache getCache(String name); 162 } 163 164 protected class CachePubSubInvalidator extends AbstractCachePubSubInvalidator { 165 166 @Override 167 protected Cache getCache(String name) { 168 return CacheServiceImpl.this.getCache(name); 169 } 170 } 171 172 @Override 173 @Deprecated 174 public void registerCache(String name, int maxSize, int timeout) { 175 registerCache(name); 176 } 177 178 @Override 179 public void registerCache(String name) { 180 CacheDescriptor defaultDescriptor = getCacheDescriptor(DEFAULT_CACHE_ID); 181 if (defaultDescriptor == null) { 182 defaultDescriptor = new CacheDescriptor(); 183 defaultDescriptor.name = DEFAULT_CACHE_ID; 184 defaultDescriptor.options.put(OPTION_MAX_SIZE, String.valueOf(DEFAULT_MAX_SIZE)); 185 register(XP_CACHES, defaultDescriptor); 186 } 187 CacheDescriptor newDescriptor = (CacheDescriptor) new CacheDescriptor().merge(defaultDescriptor); 188 newDescriptor.name = name; 189 // add to registry (merging if needed) 190 register(XP_CACHES, newDescriptor); 191 // start if needed 192 maybeStart(name); 193 } 194 195 @Override 196 public int getApplicationStartedOrder() { 197 ComponentInstance repositoryComponent = Framework.getRuntime().getComponentInstance( 198 "org.nuxeo.ecm.core.repository.RepositoryServiceComponent"); 199 if (repositoryComponent == null || repositoryComponent.getInstance() == null) { 200 return super.getApplicationStartedOrder(); 201 } 202 return ((DefaultComponent) repositoryComponent.getInstance()).getApplicationStartedOrder() - 5; 203 } 204 205 @Override 206 public void start(ComponentContext context) { 207 super.start(context); 208 if (Framework.isBooleanPropertyTrue(CLUSTERING_ENABLED_PROP)) { 209 // register cache invalidator 210 String nodeId = Framework.getProperty(NODE_ID_PROP); 211 if (StringUtils.isBlank(nodeId)) { 212 nodeId = String.valueOf(RANDOM.nextLong()); 213 log.warn("Missing cluster node id configuration, please define it explicitly " 214 + "(usually through repository.clustering.id). Using random cluster node id instead: {}", 215 nodeId); 216 } else { 217 nodeId = nodeId.trim(); 218 } 219 invalidator = new CachePubSubInvalidator(); 220 invalidator.initialize(CACHE_INVAL_PUBSUB_TOPIC, nodeId); 221 log.info("Registered cache invalidator for node: {}", nodeId); 222 } else { 223 log.info("Not registering a cache invalidator because clustering is not enabled"); 224 } 225 // create and starts caches 226 Collection<CacheDescriptor> descriptors = getDescriptors(XP_CACHES); 227 descriptors.forEach(this::startCacheDescriptor); 228 started = true; 229 } 230 231 /** Creates and starts the cache. */ 232 protected void startCacheDescriptor(CacheDescriptor desc) { 233 CacheManagement cache; 234 if (desc.klass == null) { 235 cache = new InMemoryCacheImpl(desc); // default cache implementation 236 } else { 237 try { 238 cache = desc.klass.getConstructor(CacheDescriptor.class).newInstance(desc); 239 } catch (ReflectiveOperationException e) { 240 throw new NuxeoException("Failed to instantiate class: " + desc.klass + " for cache: " + desc.name, e); 241 } 242 } 243 // wrap with checker, metrics and invalidator 244 cache = new CacheAttributesChecker(cache); 245 cache = new CacheMetrics(cache); 246 if (invalidator != null) { 247 cache = new CacheInvalidator(cache, invalidator); 248 } 249 cache.start(); 250 caches.put(desc.name, cache); 251 } 252 253 @Override 254 public void stop(ComponentContext context) throws InterruptedException { 255 super.stop(context); 256 if (invalidator != null) { 257 invalidator.close(); 258 invalidator = null; 259 } 260 for (CacheManagement cache : caches.values()) { 261 cache.stop(); 262 } 263 caches.clear(); 264 started = false; 265 } 266 267 protected void maybeStart(String name) { 268 if (!started) { 269 // cache will be started by start() 270 return; 271 } 272 // stop previous 273 CacheManagement cache = caches.get(name); 274 if (cache != null) { 275 cache.stop(); 276 } 277 // start new one 278 startCacheDescriptor(getCacheDescriptor(name)); 279 } 280 281 // --------------- API --------------- 282 283 @Override 284 public Cache getCache(String name) { 285 return caches.get(name); 286 } 287 288 /** 289 * @since 9.3 290 */ 291 public CacheDescriptor getCacheDescriptor(String descriptor) { 292 return getDescriptor(XP_CACHES, descriptor); 293 } 294 295}