001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat <tdelprat@nuxeo.com> 018 * Antoine Taillefer <ataillefer@nuxeo.com> 019 */ 020 021package org.nuxeo.ecm.core.transientstore; 022 023import java.io.Serializable; 024import java.util.ArrayList; 025import java.util.HashSet; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029import java.util.concurrent.atomic.AtomicLong; 030 031import org.apache.commons.io.FileUtils; 032import org.apache.commons.logging.Log; 033import org.apache.commons.logging.LogFactory; 034import org.nuxeo.ecm.core.api.Blob; 035import org.nuxeo.ecm.core.cache.Cache; 036import org.nuxeo.ecm.core.cache.CacheDescriptor; 037import org.nuxeo.ecm.core.cache.CacheService; 038import org.nuxeo.ecm.core.cache.CacheServiceImpl; 039import org.nuxeo.ecm.core.cache.InMemoryCacheImpl; 040import org.nuxeo.ecm.core.transientstore.api.TransientStore; 041import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig; 042import org.nuxeo.runtime.api.Framework; 043 044/** 045 * Default implementation (i.e., not cluster aware) of the {@link TransientStore}. Uses {@link StorageEntry} as a 046 * representation of an entry in the store. 047 * 048 * @since 7.2 049 */ 050public class SimpleTransientStore extends AbstractTransientStore { 051 052 protected Log log = LogFactory.getLog(SimpleTransientStore.class); 053 054 protected Cache l1Cache; 055 056 protected Cache l2Cache; 057 058 protected CacheDescriptor l1cd; 059 060 protected CacheDescriptor l2cd; 061 062 protected AtomicLong storageSize = new AtomicLong(0); 063 064 public SimpleTransientStore() { 065 } 066 067 @Override 068 public void init(TransientStoreConfig config) { 069 log.debug("Initializing SimpleTransientStore: " + config.getName()); 070 super.init(config); 071 CacheService cs = Framework.getService(CacheService.class); 072 if (cs == null) { 073 throw new UnsupportedOperationException("Cache service is required"); 074 } 075 // register the caches 076 l1cd = getL1CacheConfig(); 077 l2cd = getL2CacheConfig(); 078 ((CacheServiceImpl) cs).registerCache(l1cd); 079 ((CacheServiceImpl) cs).registerCache(l2cd); 080 081 // get caches 082 l1Cache = cs.getCache(l1cd.name); 083 l2Cache = cs.getCache(l2cd.name); 084 } 085 086 @Override 087 public void shutdown() { 088 log.debug("Shutting down SimpleTransientStore: " + config.getName()); 089 CacheService cs = Framework.getService(CacheService.class); 090 if (cs != null) { 091 if (l1cd != null) { 092 ((CacheServiceImpl) cs).unregisterCache(l1cd); 093 } 094 if (l2cd != null) { 095 ((CacheServiceImpl) cs).unregisterCache(l2cd); 096 } 097 } 098 } 099 100 @Override 101 public boolean exists(String key) { 102 return getL1Cache().hasEntry(key) || getL2Cache().hasEntry(key); 103 } 104 105 @Override 106 public Set<String> keySet() { 107 Set<String> keys = new HashSet<>(); 108 keys.addAll(getL1Cache().keySet()); 109 keys.addAll(getL2Cache().keySet()); 110 return keys; 111 } 112 113 @Override 114 public void putParameter(String key, String parameter, Serializable value) { 115 synchronized (this) { 116 StorageEntry entry = getStorageEntry(key); 117 if (entry == null) { 118 entry = new StorageEntry(); 119 } 120 entry.putParam(parameter, value); 121 if (log.isDebugEnabled()) { 122 log.debug(String.format("Setting parameter %s to value %s in StorageEntry stored at key %s", parameter, 123 value, key)); 124 } 125 putStorageEntry(key, entry); 126 } 127 } 128 129 @Override 130 public Serializable getParameter(String key, String parameter) { 131 StorageEntry entry = getStorageEntry(key); 132 if (entry == null) { 133 return null; 134 } 135 Serializable res = entry.getParam(parameter); 136 if (log.isDebugEnabled()) { 137 log.debug(String.format("Fetched parameter %s from StorageEntry stored at key %s: %s", parameter, key, res)); 138 } 139 return res; 140 } 141 142 @Override 143 public void putParameters(String key, Map<String, Serializable> parameters) { 144 synchronized (this) { 145 StorageEntry entry = getStorageEntry(key); 146 if (entry == null) { 147 entry = new StorageEntry(); 148 } 149 entry.putParams(parameters); 150 if (log.isDebugEnabled()) { 151 log.debug(String.format("Setting parameters %s in StorageEntry stored at key %s", parameters, key)); 152 } 153 putStorageEntry(key, entry); 154 } 155 } 156 157 @Override 158 public Map<String, Serializable> getParameters(String key) { 159 StorageEntry entry = getStorageEntry(key); 160 if (entry == null) { 161 return null; 162 } 163 Map<String, Serializable> res = entry.getParams(); 164 if (log.isDebugEnabled()) { 165 log.debug(String.format("Fetched parameters from StorageEntry stored at key %s: %s", key, res)); 166 } 167 return res; 168 } 169 170 @Override 171 public List<Blob> getBlobs(String key) { 172 StorageEntry entry = getStorageEntry(key); 173 if (entry == null) { 174 return null; 175 } 176 // Get blob information from the store 177 List<Map<String, String>> blobInfos = entry.getBlobInfos(); 178 if (blobInfos == null) { 179 return new ArrayList<>(); 180 } 181 // Load blobs from the file system 182 return loadBlobs(blobInfos); 183 } 184 185 @Override 186 public long getSize(String key) { 187 StorageEntry entry = getStorageEntry(key); 188 if (entry == null) { 189 return -1; 190 } 191 long size = entry.getSize(); 192 if (log.isDebugEnabled()) { 193 log.debug(String.format("Fetched field \"size\" from StorageEntry stored at key %s: %d", key, size)); 194 } 195 return size; 196 } 197 198 @Override 199 public boolean isCompleted(String key) { 200 StorageEntry entry = getStorageEntry(key); 201 boolean completed = entry != null && entry.isCompleted(); 202 if (log.isDebugEnabled()) { 203 log.debug(String.format("Fetched field \"completed\" from StorageEntry stored at key %s: %s", key, 204 completed)); 205 } 206 return completed; 207 } 208 209 @Override 210 public void setCompleted(String key, boolean completed) { 211 synchronized (this) { 212 StorageEntry entry = getStorageEntry(key); 213 if (entry == null) { 214 entry = new StorageEntry(); 215 } 216 entry.setCompleted(completed); 217 if (log.isDebugEnabled()) { 218 log.debug(String.format("Setting field \"completed\" to value %s in StorageEntry stored at key %s", 219 completed, key)); 220 } 221 putStorageEntry(key, entry); 222 } 223 } 224 225 @Override 226 public void remove(String key) { 227 synchronized (this) { 228 StorageEntry entry = (StorageEntry) getL1Cache().get(key); 229 if (entry == null) { 230 entry = (StorageEntry) getL2Cache().get(key); 231 if (log.isDebugEnabled()) { 232 log.debug(String.format("Invalidating StorageEntry stored at key %s form L2 cache", key)); 233 } 234 getL2Cache().invalidate(key); 235 } else { 236 if (log.isDebugEnabled()) { 237 log.debug(String.format("Invalidating StorageEntry stored at key %s form L1 cache", key)); 238 } 239 getL1Cache().invalidate(key); 240 } 241 if (entry != null) { 242 long entrySize = entry.getSize(); 243 if (entrySize > 0) { 244 decrementStorageSize(entrySize); 245 } 246 } 247 FileUtils.deleteQuietly(getCachingDirectory(key)); 248 } 249 } 250 251 @Override 252 public void release(String key) { 253 StorageEntry entry = (StorageEntry) getL1Cache().get(key); 254 if (entry != null) { 255 if (log.isDebugEnabled()) { 256 log.debug(String.format("Invalidating StorageEntry stored at key %s form L1 cache", key)); 257 } 258 getL1Cache().invalidate(key); 259 if (getStorageSize() <= config.getTargetMaxSizeMB() * (1024 * 1024) || config.getTargetMaxSizeMB() < 0) { 260 if (log.isDebugEnabled()) { 261 log.debug(String.format("Putting StorageEntry at key %s in L2 cache", key)); 262 } 263 getL2Cache().put(key, entry); 264 } 265 } 266 } 267 268 @Override 269 protected void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos) { 270 synchronized (this) { 271 StorageEntry entry = getStorageEntry(key); 272 // Update storage size 273 if (entry == null) { 274 if (sizeOfBlobs > 0) { 275 incrementStorageSize(sizeOfBlobs); 276 } 277 entry = new StorageEntry(); 278 } else { 279 incrementStorageSize(sizeOfBlobs - entry.getSize()); 280 } 281 // Update entry size 282 entry.setSize(sizeOfBlobs); 283 // Set blob information 284 entry.setBlobInfos(blobInfos); 285 if (log.isDebugEnabled()) { 286 log.debug(String.format("Setting blobs %s in StorageEntry stored at key %s", blobInfos, key)); 287 } 288 putStorageEntry(key, entry); 289 } 290 } 291 292 @Override 293 public long getStorageSize() { 294 int intStorageSize = (int) storageSize.get(); 295 if (log.isDebugEnabled()) { 296 log.debug(String.format("Fetched storage size of store %s: %d", config.getName(), intStorageSize)); 297 } 298 return intStorageSize; 299 } 300 301 @Override 302 protected void setStorageSize(long newSize) { 303 if (log.isDebugEnabled()) { 304 log.debug(String.format("Setting storage size of store %s to %d", config.getName(), newSize)); 305 } 306 storageSize.set(newSize); 307 } 308 309 @Override 310 protected long incrementStorageSize(long size) { 311 long incremented = storageSize.addAndGet(size); 312 if (log.isDebugEnabled()) { 313 log.debug(String.format("Incremented storage size of store %s to %s", config.getName(), incremented)); 314 } 315 return incremented; 316 } 317 318 @Override 319 protected long decrementStorageSize(long size) { 320 long decremented = storageSize.addAndGet(-size); 321 if (log.isDebugEnabled()) { 322 log.debug(String.format("Decremented storage size of store %s to %s", config.getName(), decremented)); 323 } 324 return decremented; 325 } 326 327 @Override 328 protected void removeAllEntries() { 329 log.debug("Invalidating all entries from L1 and L2 caches"); 330 getL1Cache().invalidateAll(); 331 getL2Cache().invalidateAll(); 332 } 333 334 public Cache getL1Cache() { 335 return l1Cache; 336 } 337 338 public Cache getL2Cache() { 339 return l2Cache; 340 } 341 342 protected CacheDescriptor getL1CacheConfig() { 343 return new TransientCacheConfig(config.getName() + "L1", config.getFirstLevelTTL()); 344 } 345 346 protected CacheDescriptor getL2CacheConfig() { 347 return new TransientCacheConfig(config.getName() + "L2", config.getSecondLevelTTL()); 348 } 349 350 protected class TransientCacheConfig extends CacheDescriptor { 351 352 TransientCacheConfig(String name, int ttl) { 353 super(); 354 super.name = name; 355 super.implClass = getCacheImplClass(); 356 super.ttl = ttl; 357 } 358 } 359 360 protected Class<? extends Cache> getCacheImplClass() { 361 return InMemoryCacheImpl.class; 362 } 363 364 /** 365 * Returns the {@link StorageEntry} representing the entry with the given {@code key} or {@code null} if it doesn't 366 * exist. 367 */ 368 protected StorageEntry getStorageEntry(String key) { 369 StorageEntry entry = (StorageEntry) getL1Cache().get(key); 370 if (entry == null) { 371 entry = (StorageEntry) getL2Cache().get(key); 372 } 373 return entry; 374 } 375 376 /** 377 * Stores the given {@code entry} with the given {@code key}. 378 * <p> 379 * If an entry exists with the given {@code key} it is overwritten. 380 */ 381 protected void putStorageEntry(String key, StorageEntry entry) { 382 getL1Cache().put(key, entry); 383 } 384 385}