001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl-2.1.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Thierry Delprat <tdelprat@nuxeo.com> 016 * Antoine Taillefer <ataillefer@nuxeo.com> 017 */ 018 019package org.nuxeo.ecm.core.transientstore; 020 021import java.io.Serializable; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.atomic.AtomicLong; 026 027import org.apache.commons.logging.Log; 028import org.apache.commons.logging.LogFactory; 029import org.nuxeo.ecm.core.api.Blob; 030import org.nuxeo.ecm.core.cache.Cache; 031import org.nuxeo.ecm.core.cache.CacheDescriptor; 032import org.nuxeo.ecm.core.cache.CacheService; 033import org.nuxeo.ecm.core.cache.CacheServiceImpl; 034import org.nuxeo.ecm.core.cache.InMemoryCacheImpl; 035import org.nuxeo.ecm.core.transientstore.api.TransientStore; 036import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig; 037import org.nuxeo.runtime.api.Framework; 038 039/** 040 * Default implementation (i.e., not cluster aware) of the {@link TransientStore}. Uses {@link StorageEntry} as a 041 * representation of an entry in the store. 042 * 043 * @since 7.2 044 */ 045public class SimpleTransientStore extends AbstractTransientStore { 046 047 protected Log log = LogFactory.getLog(SimpleTransientStore.class); 048 049 protected Cache l1Cache; 050 051 protected Cache l2Cache; 052 053 protected CacheDescriptor l1cd; 054 055 protected CacheDescriptor l2cd; 056 057 protected AtomicLong storageSize = new AtomicLong(0); 058 059 public SimpleTransientStore() { 060 } 061 062 @Override 063 public void init(TransientStoreConfig config) { 064 log.debug("Initializing SimpleTransientStore: " + config.getName()); 065 super.init(config); 066 CacheService cs = Framework.getService(CacheService.class); 067 if (cs == null) { 068 throw new UnsupportedOperationException("Cache service is required"); 069 } 070 // register the caches 071 l1cd = getL1CacheConfig(); 072 l2cd = getL2CacheConfig(); 073 ((CacheServiceImpl) cs).registerCache(l1cd); 074 ((CacheServiceImpl) cs).registerCache(l2cd); 075 l1cd.start(); 076 l2cd.start(); 077 078 // get caches 079 l1Cache = cs.getCache(l1cd.name); 080 l2Cache = cs.getCache(l2cd.name); 081 } 082 083 @Override 084 public void shutdown() { 085 log.debug("Shutting down SimpleTransientStore: " + config.getName()); 086 CacheService cs = Framework.getService(CacheService.class); 087 if (cs != null) { 088 if (l1cd != null) { 089 ((CacheServiceImpl) cs).unregisterCache(l1cd); 090 } 091 if (l2cd != null) { 092 ((CacheServiceImpl) cs).unregisterCache(l2cd); 093 } 094 } 095 } 096 097 @Override 098 public boolean exists(String key) { 099 return getL1Cache().hasEntry(key) || getL2Cache().hasEntry(key); 100 } 101 102 @Override 103 public void putParameter(String key, String parameter, Serializable value) { 104 synchronized (this) { 105 StorageEntry entry = getStorageEntry(key); 106 if (entry == null) { 107 entry = new StorageEntry(); 108 } 109 entry.putParam(parameter, value); 110 if (log.isDebugEnabled()) { 111 log.debug(String.format("Setting parameter %s to value %s in StorageEntry stored at key %s", parameter, 112 value, key)); 113 } 114 putStorageEntry(key, entry); 115 } 116 } 117 118 @Override 119 public Serializable getParameter(String key, String parameter) { 120 StorageEntry entry = getStorageEntry(key); 121 if (entry == null) { 122 return null; 123 } 124 Serializable res = entry.getParam(parameter); 125 if (log.isDebugEnabled()) { 126 log.debug(String.format("Fetched parameter %s from StorageEntry stored at key %s: %s", parameter, key, res)); 127 } 128 return res; 129 } 130 131 @Override 132 public void putParameters(String key, Map<String, Serializable> parameters) { 133 synchronized (this) { 134 StorageEntry entry = getStorageEntry(key); 135 if (entry == null) { 136 entry = new StorageEntry(); 137 } 138 entry.putParams(parameters); 139 if (log.isDebugEnabled()) { 140 log.debug(String.format("Setting parameters %s in StorageEntry stored at key %s", parameters, key)); 141 } 142 putStorageEntry(key, entry); 143 } 144 } 145 146 @Override 147 public Map<String, Serializable> getParameters(String key) { 148 StorageEntry entry = getStorageEntry(key); 149 if (entry == null) { 150 return null; 151 } 152 Map<String, Serializable> res = entry.getParams(); 153 if (log.isDebugEnabled()) { 154 log.debug(String.format("Fetched parameters from StorageEntry stored at key %s: %s", key, res)); 155 } 156 return res; 157 } 158 159 @Override 160 public List<Blob> getBlobs(String key) { 161 StorageEntry entry = getStorageEntry(key); 162 if (entry == null) { 163 return null; 164 } 165 // Get blob information from the store 166 List<Map<String, String>> blobInfos = entry.getBlobInfos(); 167 if (blobInfos == null) { 168 return new ArrayList<>(); 169 } 170 // Load blobs from the file system 171 return loadBlobs(blobInfos); 172 } 173 174 @Override 175 public long getSize(String key) { 176 StorageEntry entry = getStorageEntry(key); 177 if (entry == null) { 178 return -1; 179 } 180 long size = entry.getSize(); 181 if (log.isDebugEnabled()) { 182 log.debug(String.format("Fetched field \"size\" from StorageEntry stored at key %s: %d", key, size)); 183 } 184 return size; 185 } 186 187 @Override 188 public boolean isCompleted(String key) { 189 StorageEntry entry = getStorageEntry(key); 190 boolean completed = entry != null && entry.isCompleted(); 191 if (log.isDebugEnabled()) { 192 log.debug(String.format("Fetched field \"completed\" from StorageEntry stored at key %s: %s", key, 193 completed)); 194 } 195 return completed; 196 } 197 198 @Override 199 public void setCompleted(String key, boolean completed) { 200 synchronized (this) { 201 StorageEntry entry = getStorageEntry(key); 202 if (entry == null) { 203 entry = new StorageEntry(); 204 } 205 entry.setCompleted(completed); 206 if (log.isDebugEnabled()) { 207 log.debug(String.format("Setting field \"completed\" to value %s in StorageEntry stored at key %s", 208 completed, key)); 209 } 210 putStorageEntry(key, entry); 211 } 212 } 213 214 @Override 215 public void remove(String key) { 216 synchronized (this) { 217 StorageEntry entry = (StorageEntry) getL1Cache().get(key); 218 if (entry == null) { 219 entry = (StorageEntry) getL2Cache().get(key); 220 if (log.isDebugEnabled()) { 221 log.debug(String.format("Invalidating StorageEntry stored at key %s form L2 cache", key)); 222 } 223 getL2Cache().invalidate(key); 224 } else { 225 if (log.isDebugEnabled()) { 226 log.debug(String.format("Invalidating StorageEntry stored at key %s form L1 cache", key)); 227 } 228 getL1Cache().invalidate(key); 229 } 230 if (entry != null) { 231 long entrySize = entry.getSize(); 232 if (entrySize > 0) { 233 decrementStorageSize(entrySize); 234 } 235 } 236 } 237 } 238 239 @Override 240 public void release(String key) { 241 StorageEntry entry = (StorageEntry) getL1Cache().get(key); 242 if (entry != null) { 243 if (log.isDebugEnabled()) { 244 log.debug(String.format("Invalidating StorageEntry stored at key %s form L1 cache", key)); 245 } 246 getL1Cache().invalidate(key); 247 if (getStorageSize() <= config.getTargetMaxSizeMB() * (1024 * 1024) || config.getTargetMaxSizeMB() < 0) { 248 if (log.isDebugEnabled()) { 249 log.debug(String.format("Putting StorageEntry at key %s in L2 cache", key)); 250 } 251 getL2Cache().put(key, entry); 252 } 253 } 254 } 255 256 @Override 257 protected void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos) { 258 synchronized (this) { 259 StorageEntry entry = getStorageEntry(key); 260 // Update storage size 261 if (entry == null) { 262 if (sizeOfBlobs > 0) { 263 incrementStorageSize(sizeOfBlobs); 264 } 265 entry = new StorageEntry(); 266 } else { 267 incrementStorageSize(sizeOfBlobs - entry.getSize()); 268 } 269 // Update entry size 270 entry.setSize(sizeOfBlobs); 271 // Set blob information 272 entry.setBlobInfos(blobInfos); 273 if (log.isDebugEnabled()) { 274 log.debug(String.format("Setting blobs %s in StorageEntry stored at key %s", blobInfos, key)); 275 } 276 putStorageEntry(key, entry); 277 } 278 } 279 280 @Override 281 public long getStorageSize() { 282 int intStorageSize = (int) storageSize.get(); 283 if (log.isDebugEnabled()) { 284 log.debug(String.format("Fetched storage size of store %s: %d", config.getName(), intStorageSize)); 285 } 286 return intStorageSize; 287 } 288 289 @Override 290 protected void setStorageSize(long newSize) { 291 if (log.isDebugEnabled()) { 292 log.debug(String.format("Setting storage size of store %s to %d", config.getName(), newSize)); 293 } 294 storageSize.set(newSize); 295 } 296 297 @Override 298 protected long incrementStorageSize(long size) { 299 long incremented = storageSize.addAndGet(size); 300 if (log.isDebugEnabled()) { 301 log.debug(String.format("Incremented storage size of store %s to %s", config.getName(), incremented)); 302 } 303 return incremented; 304 } 305 306 @Override 307 protected long decrementStorageSize(long size) { 308 long decremented = storageSize.addAndGet(-size); 309 if (log.isDebugEnabled()) { 310 log.debug(String.format("Decremented storage size of store %s to %s", config.getName(), decremented)); 311 } 312 return decremented; 313 } 314 315 @Override 316 protected void removeAllEntries() { 317 log.debug("Invalidating all entries from L1 and L2 caches"); 318 getL1Cache().invalidateAll(); 319 getL2Cache().invalidateAll(); 320 } 321 322 public Cache getL1Cache() { 323 return l1Cache; 324 } 325 326 public Cache getL2Cache() { 327 return l2Cache; 328 } 329 330 protected CacheDescriptor getL1CacheConfig() { 331 return new TransientCacheConfig(config.getName() + "L1", config.getFirstLevelTTL()); 332 } 333 334 protected CacheDescriptor getL2CacheConfig() { 335 return new TransientCacheConfig(config.getName() + "L2", config.getSecondLevelTTL()); 336 } 337 338 protected class TransientCacheConfig extends CacheDescriptor { 339 340 TransientCacheConfig(String name, int ttl) { 341 super(); 342 super.name = name; 343 super.implClass = getCacheImplClass(); 344 super.ttl = ttl; 345 } 346 } 347 348 protected Class<? extends Cache> getCacheImplClass() { 349 return InMemoryCacheImpl.class; 350 } 351 352 /** 353 * Returns the {@link StorageEntry} representing the entry with the given {@code key} or {@code null} if it doesn't 354 * exist. 355 */ 356 protected StorageEntry getStorageEntry(String key) { 357 StorageEntry entry = (StorageEntry) getL1Cache().get(key); 358 if (entry == null) { 359 entry = (StorageEntry) getL2Cache().get(key); 360 } 361 return entry; 362 } 363 364 /** 365 * Stores the given {@code entry} with the given {@code key}. 366 * <p> 367 * If an entry exists with the given {@code key} it is overwritten. 368 */ 369 protected void putStorageEntry(String key, StorageEntry entry) { 370 getL1Cache().put(key, entry); 371 } 372 373}