/*
 * (C) Copyright 2014-2017 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Maxime Hilaire
 *     Thierry Martins
 */
package org.nuxeo.ecm.core.cache;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.nuxeo.ecm.core.cache.CacheDescriptor.DEFAULT_MAX_SIZE;
import static org.nuxeo.ecm.core.cache.CacheDescriptor.DEFAULT_TTL;
import static org.nuxeo.ecm.core.cache.CacheDescriptor.OPTION_MAX_SIZE;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.model.ComponentInstance;
import org.nuxeo.runtime.model.ComponentName;
import org.nuxeo.runtime.model.DefaultComponent;
import org.nuxeo.runtime.pubsub.AbstractPubSubBroker;
import org.nuxeo.runtime.pubsub.SerializableMessage;

/**
 * Cache service implementation to manage nuxeo cache.
 * <p>
 * Cache descriptors are collected in a {@link CacheDescriptorRegistry}; the caches themselves are instantiated and
 * started when the component starts, and stopped when it stops. When clustering is enabled (see
 * {@link #CLUSTERING_ENABLED_PROP}), a pub/sub based invalidator is created and handed to every cache instance.
 *
 * @since 6.0
 */
public class CacheServiceImpl extends DefaultComponent implements CacheService {

    private static final Log log = LogFactory.getLog(CacheServiceImpl.class);

    public static final ComponentName NAME = new ComponentName(CacheServiceImpl.class.getName());

    /** Source of the fallback node id used when none is configured. */
    protected static final Random RANDOM = new Random();

    /**
     * @since 8.2
     */
    public static final String DEFAULT_CACHE_ID = "default-cache";

    /** @since 9.3 */
    public static final String CACHE_INVAL_PUBSUB_TOPIC = "cacheinval";

    /**
     * Framework property defining whether clustering is enabled.
     *
     * @since 9.3
     */
    public static final String CLUSTERING_ENABLED_PROP = "repository.clustering.enabled";

    /**
     * Framework property containing the node id.
     *
     * @since 9.3
     */
    public static final String NODE_ID_PROP = "repository.clustering.id";

    protected final CacheDescriptorRegistry registry = new CacheDescriptorRegistry();

    /** Currently registered caches. */
    protected final Map<String, CacheManagement> caches = new ConcurrentHashMap<>();

    /** Invalidator shared by all caches; {@code null} while clustering is disabled or the component is stopped. */
    protected CachePubSubInvalidator invalidator;

    /**
     * Registry of cache descriptors, keeping every contribution per cache name together with the effective
     * (merged) descriptor computed from them.
     */
    // SimpleContributionRegistry is overkill and does not deal well with a "remove" feature
    protected static class CacheDescriptorRegistry {

        /** Every contribution received, grouped by cache name, in registration order. */
        protected Map<String, List<CacheDescriptor>> allDescriptors = new ConcurrentHashMap<>();

        /** Result of {@link #recompute(String)} for each cache name that currently has an effective descriptor. */
        protected Map<String, CacheDescriptor> effectiveDescriptors = new ConcurrentHashMap<>();

        /**
         * Records a contribution and recomputes the effective descriptor for its cache name.
         *
         * @param descriptor the contributed descriptor
         */
        public void addContribution(CacheDescriptor descriptor) {
            String name = descriptor.name;
            allDescriptors.computeIfAbsent(name, n -> new CopyOnWriteArrayList<>()).add(descriptor);
            recompute(name);
        }

        /**
         * Forgets a contribution and recomputes the effective descriptor for its cache name.
         *
         * @param descriptor the descriptor to remove
         */
        public void removeContribution(CacheDescriptor descriptor) {
            String name = descriptor.name;
            allDescriptors.getOrDefault(name, Collections.emptyList()).remove(descriptor);
            recompute(name);
        }

        /**
         * Rebuilds the effective descriptor of a cache by walking its contributions in order: a contribution
         * flagged {@code remove} discards whatever was accumulated so far, the first remaining contribution is
         * cloned, and later ones are merged into that clone. When nothing remains, the cache has no effective
         * descriptor.
         *
         * @param name the cache name
         */
        protected void recompute(String name) {
            CacheDescriptor desc = null;
            for (CacheDescriptor d : allDescriptors.getOrDefault(name, Collections.emptyList())) {
                if (d.remove) {
                    desc = null;
                } else if (desc == null) {
                    // clone so that merging later contributions never alters the contributed instance
                    desc = d.clone();
                } else {
                    desc.merge(d);
                }
            }
            if (desc == null) {
                effectiveDescriptors.remove(name);
            } else {
                effectiveDescriptors.put(name, desc);
            }
        }

        /**
         * @param name the cache name
         * @return the effective descriptor for this name, or {@code null} if there is none
         */
        public CacheDescriptor getCacheDescriptor(String name) {
            return effectiveDescriptors.get(name);
        }

        /**
         * @return the effective descriptors (a live view of the underlying map values)
         */
        public Collection<CacheDescriptor> getCacheDescriptors() {
            return effectiveDescriptors.values();
        }

        /**
         * Gets the descriptor registered under {@link CacheServiceImpl#DEFAULT_CACHE_ID}, or a freshly built one
         * using the default TTL and max size when none is registered.
         * <p>
         * When a default descriptor is registered, the registry's own instance is returned; callers that need to
         * modify it must clone it first.
         *
         * @return the default descriptor, never {@code null}
         */
        public CacheDescriptor getDefaultDescriptor() {
            CacheDescriptor defaultDescriptor = getCacheDescriptor(DEFAULT_CACHE_ID);
            if (defaultDescriptor == null) {
                defaultDescriptor = new CacheDescriptor();
                defaultDescriptor.ttl = DEFAULT_TTL;
                defaultDescriptor.options.put(OPTION_MAX_SIZE, String.valueOf(DEFAULT_MAX_SIZE));
            }
            return defaultDescriptor;
        }
    }

    /**
     * Invalidation message exchanged between nodes: a cache name and the key to invalidate in that cache.
     * <p>
     * The wire format is {@code cacheName + "/" + key} in UTF-8. Deserialization splits on the first separator
     * only, so the key may contain the separator but the cache name may not.
     */
    protected static class CacheInvalidation implements SerializableMessage {

        private static final long serialVersionUID = 1L;

        protected static final String SEP = "/";

        public final String cacheName;

        public final String key;

        public CacheInvalidation(String name, String key) {
            this.cacheName = name;
            this.key = key;
        }

        /**
         * Writes this invalidation as {@code cacheName/key} in UTF-8.
         *
         * @param out the stream to write to
         * @throws IOException if writing fails
         */
        @Override
        public void serialize(OutputStream out) throws IOException {
            String string = cacheName + SEP + key;
            IOUtils.write(string, out, UTF_8);
        }

        /**
         * Reads an invalidation previously written by {@link #serialize(OutputStream)}.
         *
         * @param in the stream to read from (consumed entirely)
         * @return the decoded invalidation
         * @throws IOException if reading fails or if the content holds no separator
         */
        public static CacheInvalidation deserialize(InputStream in) throws IOException {
            String string = IOUtils.toString(in, UTF_8);
            // limit of 2: everything after the first separator belongs to the key
            String[] parts = string.split(SEP, 2);
            if (parts.length != 2) {
                throw new IOException("Invalid invalidation: " + string);
            }
            String cacheName = parts[0];
            String key = parts[1];
            return new CacheInvalidation(cacheName, key);
        }

        @Override
        public String toString() {
            return getClass().getSimpleName() + "(" + cacheName + "," + key + ")";
        }
    }

    /**
     * Pub/sub broker for {@link CacheInvalidation} messages. Received messages are applied locally to the cache
     * returned by {@link #getCache(String)}.
     */
    protected static abstract class AbstractCachePubSubInvalidator extends AbstractPubSubBroker<CacheInvalidation> {

        /** Special key meaning that the whole cache must be invalidated. */
        public static final String ALL_KEYS = "__ALL__";

        @Override
        public CacheInvalidation deserialize(InputStream in) throws IOException {
            return CacheInvalidation.deserialize(in);
        }

        /**
         * Sends an invalidation for a single key of a cache.
         *
         * @param cacheName the cache name
         * @param key the key to invalidate
         */
        public void sendInvalidation(String cacheName, String key) {
            sendMessage(new CacheInvalidation(cacheName, key));
        }

        /**
         * Sends an invalidation for all the keys of a cache.
         *
         * @param cacheName the cache name
         */
        public void sendInvalidationsAll(String cacheName) {
            sendMessage(new CacheInvalidation(cacheName, ALL_KEYS));
        }

        /**
         * Applies a received invalidation to the local cache, if a cache with that name exists here; messages
         * for unknown caches are ignored.
         *
         * @param invalidation the received invalidation
         */
        @Override
        public void receivedMessage(CacheInvalidation invalidation) {
            CacheManagement cache = (CacheManagement) getCache(invalidation.cacheName);
            if (cache != null) {
                String key = invalidation.key;
                if (ALL_KEYS.equals(key)) {
                    cache.invalidateLocalAll();
                } else {
                    cache.invalidateLocal(key);
                }
            }
        }

        // for testability, we want an alternative implementation to return a test cache
        protected abstract Cache getCache(String name);
    }

    /** Invalidator resolving caches through the enclosing service. */
    protected class CachePubSubInvalidator extends AbstractCachePubSubInvalidator {

        @Override
        protected Cache getCache(String name) {
            return CacheServiceImpl.this.getCache(name);
        }
    }

    @Override
    public void registerContribution(Object contrib, String extensionPoint, ComponentInstance contributor) {
        registerCacheDescriptor((CacheDescriptor) contrib);
    }

    /**
     * Registers a cache based on the default descriptor.
     *
     * @param name the cache name
     * @param maxSize ignored by this implementation
     * @param timeout ignored by this implementation
     * @deprecated this overload ignores {@code maxSize} and {@code timeout} and simply delegates to
     *             {@link #registerCache(String)}
     */
    @Override
    @Deprecated
    public void registerCache(String name, int maxSize, int timeout) {
        registerCache(name);
    }

    /**
     * Registers a cache whose descriptor is a copy of the default descriptor carrying the given name, then
     * (re)starts it if the runtime components are already started.
     *
     * @param name the cache name
     */
    @Override
    public void registerCache(String name) {
        CacheDescriptor defaultDescriptor = registry.getDefaultDescriptor().clone();
        defaultDescriptor.name = name;
        // add to registry (merging if needed)
        registerCacheDescriptor(defaultDescriptor);
        // start if needed
        maybeStart(name);
    }

    /**
     * Adds a descriptor to the registry. The corresponding cache is not started by this method.
     *
     * @param descriptor the descriptor to register
     */
    public void registerCacheDescriptor(CacheDescriptor descriptor) {
        registry.addContribution(descriptor);
        log.info("Cache registered: " + descriptor.name);
    }

    @Override
    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
        unregisterCacheDescriptor((CacheDescriptor) contribution);
    }

    /**
     * Removes a descriptor from the registry. A running cache of that name is not stopped by this method.
     *
     * @param descriptor the descriptor to unregister
     */
    public void unregisterCacheDescriptor(CacheDescriptor descriptor) {
        registry.removeContribution(descriptor);
        log.info("Cache unregistered: " + descriptor.name);
    }

    /**
     * Returns the started order of the repository service component minus 5, or the inherited default when that
     * component is not available.
     */
    // NOTE(review): presumably the offset makes caches start before the repository service - confirm
    @Override
    public int getApplicationStartedOrder() {
        ComponentInstance repositoryComponent = Framework.getRuntime().getComponentInstance(
                "org.nuxeo.ecm.core.repository.RepositoryServiceComponent");
        if (repositoryComponent == null || repositoryComponent.getInstance() == null) {
            return super.getApplicationStartedOrder();
        }
        return ((DefaultComponent) repositoryComponent.getInstance()).getApplicationStartedOrder() - 5;
    }

    /**
     * Sets up the cluster invalidator when clustering is enabled, then creates and starts one cache per
     * effective descriptor.
     */
    @Override
    public void start(ComponentContext context) {
        if (Framework.isBooleanPropertyTrue(CLUSTERING_ENABLED_PROP)) {
            // register cache invalidator
            String nodeId = Framework.getProperty(NODE_ID_PROP);
            if (StringUtils.isBlank(nodeId)) {
                nodeId = String.valueOf(RANDOM.nextLong());
                log.warn("Missing cluster node id configuration, please define it explicitly "
                        + "(usually through repository.clustering.id). Using random cluster node id instead: "
                        + nodeId);
            } else {
                nodeId = nodeId.trim();
            }
            invalidator = new CachePubSubInvalidator();
            invalidator.initialize(CACHE_INVAL_PUBSUB_TOPIC, nodeId);
            log.info("Registered cache invalidator for node: " + nodeId);
        } else {
            log.info("Not registering a cache invalidator because clustering is not enabled");
        }
        // create and starts caches
        registry.getCacheDescriptors().forEach(this::startCacheDescriptor);
    }

    /**
     * Creates and starts the cache, and records it under the descriptor's name (replacing any previous entry).
     *
     * @param desc the effective descriptor of the cache, must not be {@code null}
     */
    protected void startCacheDescriptor(CacheDescriptor desc) {
        // invalidator is null when clustering is disabled
        CacheManagement cache = desc.newInstance(invalidator);
        cache.start();
        caches.put(desc.name, cache);
    }

    /** Closes the invalidator, if any, then stops and forgets every running cache. */
    @Override
    public void stop(ComponentContext context) {
        if (invalidator != null) {
            invalidator.close();
            invalidator = null;
        }
        for (CacheManagement cache : caches.values()) {
            cache.stop();
        }
        caches.clear();
    }

    /**
     * (Re)starts the named cache if the runtime components are already started; otherwise does nothing, the
     * cache being created later by {@link #start(ComponentContext)}.
     * <p>
     * Any running cache of that name is stopped first. If the registry holds no effective descriptor for the
     * name, the stopped cache is dropped from the running caches instead of being replaced.
     *
     * @param name the cache name
     */
    protected void maybeStart(String name) {
        if (!Framework.getRuntime().getComponentManager().isStarted()) {
            return;
        }
        // stop previous
        CacheManagement cache = caches.get(name);
        if (cache != null) {
            cache.stop();
        }
        CacheDescriptor desc = registry.getCacheDescriptor(name);
        if (desc == null) {
            // no effective descriptor (e.g. overridden by a "remove" contribution): there is nothing to
            // start, and the stopped instance must not remain reachable through getCache()
            caches.remove(name);
            return;
        }
        // start new one
        startCacheDescriptor(desc);
    }

    // --------------- API ---------------

    /**
     * @param name the cache name
     * @return the running cache of that name, or {@code null} if there is none
     */
    @Override
    public Cache getCache(String name) {
        return caches.get(name);
    }

    /**
     * Gets the effective descriptor registered for a cache.
     *
     * @param descriptor the cache name
     * @return the effective descriptor, or {@code null} if there is none
     * @since 9.3
     */
    public CacheDescriptor getCacheDescriptor(String descriptor) {
        return registry.getCacheDescriptor(descriptor);
    }

}