001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat <tdelprat@nuxeo.com> 018 * Antoine Taillefer <ataillefer@nuxeo.com> 019 */ 020 021package org.nuxeo.ecm.core.transientstore; 022 023import java.io.Serializable; 024import java.util.ArrayList; 025import java.util.HashSet; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029import java.util.concurrent.atomic.AtomicLong; 030 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033import org.nuxeo.ecm.core.api.Blob; 034import org.nuxeo.ecm.core.cache.Cache; 035import org.nuxeo.ecm.core.cache.CacheDescriptor; 036import org.nuxeo.ecm.core.cache.CacheService; 037import org.nuxeo.ecm.core.cache.CacheServiceImpl; 038import org.nuxeo.ecm.core.cache.InMemoryCacheImpl; 039import org.nuxeo.ecm.core.transientstore.api.TransientStore; 040import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig; 041import org.nuxeo.runtime.api.Framework; 042 043/** 044 * Default implementation (i.e., not cluster aware) of the {@link TransientStore}. Uses {@link StorageEntry} as a 045 * representation of an entry in the store. 046 * 047 * @since 7.2 048 */ 049public class SimpleTransientStore extends AbstractTransientStore { 050 051 protected Log log = LogFactory.getLog(SimpleTransientStore.class); 052 053 protected Cache l1Cache; 054 055 protected Cache l2Cache; 056 057 protected CacheDescriptor l1cd; 058 059 protected CacheDescriptor l2cd; 060 061 protected AtomicLong storageSize = new AtomicLong(0); 062 063 public SimpleTransientStore() { 064 } 065 066 @Override 067 public void init(TransientStoreConfig config) { 068 log.debug("Initializing SimpleTransientStore: " + config.getName()); 069 super.init(config); 070 CacheService cs = Framework.getService(CacheService.class); 071 if (cs == null) { 072 throw new UnsupportedOperationException("Cache service is required"); 073 } 074 // register the caches 075 l1cd = getL1CacheConfig(); 076 l2cd = getL2CacheConfig(); 077 ((CacheServiceImpl) cs).registerCache(l1cd); 078 ((CacheServiceImpl) cs).registerCache(l2cd); 079 l1cd.start(); 080 l2cd.start(); 081 082 // get caches 083 l1Cache = cs.getCache(l1cd.name); 084 l2Cache = cs.getCache(l2cd.name); 085 } 086 087 @Override 088 public void shutdown() { 089 log.debug("Shutting down SimpleTransientStore: " + config.getName()); 090 CacheService cs = Framework.getService(CacheService.class); 091 if (cs != null) { 092 if (l1cd != null) { 093 ((CacheServiceImpl) cs).unregisterCache(l1cd); 094 } 095 if (l2cd != null) { 096 ((CacheServiceImpl) cs).unregisterCache(l2cd); 097 } 098 } 099 } 100 101 @Override 102 public boolean exists(String key) { 103 return getL1Cache().hasEntry(key) || getL2Cache().hasEntry(key); 104 } 105 106 @Override 107 public Set<String> keySet() { 108 Set<String> keys = new HashSet<>(); 109 keys.addAll(getL1Cache().keySet()); 110 keys.addAll(getL2Cache().keySet()); 111 return keys; 112 } 113 114 @Override 115 public void putParameter(String key, String parameter, Serializable value) { 116 synchronized (this) { 117 StorageEntry entry = getStorageEntry(key); 118 if (entry == null) { 119 entry = new StorageEntry(); 120 } 121 entry.putParam(parameter, value); 122 if (log.isDebugEnabled()) { 123 log.debug(String.format("Setting parameter %s to value %s in StorageEntry stored at key %s", parameter, 124 value, key)); 125 } 126 putStorageEntry(key, entry); 127 } 128 } 129 130 @Override 131 public Serializable getParameter(String key, String parameter) { 132 StorageEntry entry = getStorageEntry(key); 133 if (entry == null) { 134 return null; 135 } 136 Serializable res = entry.getParam(parameter); 137 if (log.isDebugEnabled()) { 138 log.debug(String.format("Fetched parameter %s from StorageEntry stored at key %s: %s", parameter, key, res)); 139 } 140 return res; 141 } 142 143 @Override 144 public void putParameters(String key, Map<String, Serializable> parameters) { 145 synchronized (this) { 146 StorageEntry entry = getStorageEntry(key); 147 if (entry == null) { 148 entry = new StorageEntry(); 149 } 150 entry.putParams(parameters); 151 if (log.isDebugEnabled()) { 152 log.debug(String.format("Setting parameters %s in StorageEntry stored at key %s", parameters, key)); 153 } 154 putStorageEntry(key, entry); 155 } 156 } 157 158 @Override 159 public Map<String, Serializable> getParameters(String key) { 160 StorageEntry entry = getStorageEntry(key); 161 if (entry == null) { 162 return null; 163 } 164 Map<String, Serializable> res = entry.getParams(); 165 if (log.isDebugEnabled()) { 166 log.debug(String.format("Fetched parameters from StorageEntry stored at key %s: %s", key, res)); 167 } 168 return res; 169 } 170 171 @Override 172 public List<Blob> getBlobs(String key) { 173 StorageEntry entry = getStorageEntry(key); 174 if (entry == null) { 175 return null; 176 } 177 // Get blob information from the store 178 List<Map<String, String>> blobInfos = entry.getBlobInfos(); 179 if (blobInfos == null) { 180 return new ArrayList<>(); 181 } 182 // Load blobs from the file system 183 return loadBlobs(blobInfos); 184 } 185 186 @Override 187 public long getSize(String key) { 188 StorageEntry entry = getStorageEntry(key); 189 if (entry == null) { 190 return -1; 191 } 192 long size = entry.getSize(); 193 if (log.isDebugEnabled()) { 194 log.debug(String.format("Fetched field \"size\" from StorageEntry stored at key %s: %d", key, size)); 195 } 196 return size; 197 } 198 199 @Override 200 public boolean isCompleted(String key) { 201 StorageEntry entry = getStorageEntry(key); 202 boolean completed = entry != null && entry.isCompleted(); 203 if (log.isDebugEnabled()) { 204 log.debug(String.format("Fetched field \"completed\" from StorageEntry stored at key %s: %s", key, 205 completed)); 206 } 207 return completed; 208 } 209 210 @Override 211 public void setCompleted(String key, boolean completed) { 212 synchronized (this) { 213 StorageEntry entry = getStorageEntry(key); 214 if (entry == null) { 215 entry = new StorageEntry(); 216 } 217 entry.setCompleted(completed); 218 if (log.isDebugEnabled()) { 219 log.debug(String.format("Setting field \"completed\" to value %s in StorageEntry stored at key %s", 220 completed, key)); 221 } 222 putStorageEntry(key, entry); 223 } 224 } 225 226 @Override 227 public void remove(String key) { 228 synchronized (this) { 229 StorageEntry entry = (StorageEntry) getL1Cache().get(key); 230 if (entry == null) { 231 entry = (StorageEntry) getL2Cache().get(key); 232 if (log.isDebugEnabled()) { 233 log.debug(String.format("Invalidating StorageEntry stored at key %s form L2 cache", key)); 234 } 235 getL2Cache().invalidate(key); 236 } else { 237 if (log.isDebugEnabled()) { 238 log.debug(String.format("Invalidating StorageEntry stored at key %s form L1 cache", key)); 239 } 240 getL1Cache().invalidate(key); 241 } 242 if (entry != null) { 243 long entrySize = entry.getSize(); 244 if (entrySize > 0) { 245 decrementStorageSize(entrySize); 246 } 247 } 248 } 249 } 250 251 @Override 252 public void release(String key) { 253 StorageEntry entry = (StorageEntry) getL1Cache().get(key); 254 if (entry != null) { 255 if (log.isDebugEnabled()) { 256 log.debug(String.format("Invalidating StorageEntry stored at key %s form L1 cache", key)); 257 } 258 getL1Cache().invalidate(key); 259 if (getStorageSize() <= config.getTargetMaxSizeMB() * (1024 * 1024) || config.getTargetMaxSizeMB() < 0) { 260 if (log.isDebugEnabled()) { 261 log.debug(String.format("Putting StorageEntry at key %s in L2 cache", key)); 262 } 263 getL2Cache().put(key, entry); 264 } 265 } 266 } 267 268 @Override 269 protected void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos) { 270 synchronized (this) { 271 StorageEntry entry = getStorageEntry(key); 272 // Update storage size 273 if (entry == null) { 274 if (sizeOfBlobs > 0) { 275 incrementStorageSize(sizeOfBlobs); 276 } 277 entry = new StorageEntry(); 278 } else { 279 incrementStorageSize(sizeOfBlobs - entry.getSize()); 280 } 281 // Update entry size 282 entry.setSize(sizeOfBlobs); 283 // Set blob information 284 entry.setBlobInfos(blobInfos); 285 if (log.isDebugEnabled()) { 286 log.debug(String.format("Setting blobs %s in StorageEntry stored at key %s", blobInfos, key)); 287 } 288 putStorageEntry(key, entry); 289 } 290 } 291 292 @Override 293 public long getStorageSize() { 294 int intStorageSize = (int) storageSize.get(); 295 if (log.isDebugEnabled()) { 296 log.debug(String.format("Fetched storage size of store %s: %d", config.getName(), intStorageSize)); 297 } 298 return intStorageSize; 299 } 300 301 @Override 302 protected void setStorageSize(long newSize) { 303 if (log.isDebugEnabled()) { 304 log.debug(String.format("Setting storage size of store %s to %d", config.getName(), newSize)); 305 } 306 storageSize.set(newSize); 307 } 308 309 @Override 310 protected long incrementStorageSize(long size) { 311 long incremented = storageSize.addAndGet(size); 312 if (log.isDebugEnabled()) { 313 log.debug(String.format("Incremented storage size of store %s to %s", config.getName(), incremented)); 314 } 315 return incremented; 316 } 317 318 @Override 319 protected long decrementStorageSize(long size) { 320 long decremented = storageSize.addAndGet(-size); 321 if (log.isDebugEnabled()) { 322 log.debug(String.format("Decremented storage size of store %s to %s", config.getName(), decremented)); 323 } 324 return decremented; 325 } 326 327 @Override 328 protected void removeAllEntries() { 329 log.debug("Invalidating all entries from L1 and L2 caches"); 330 getL1Cache().invalidateAll(); 331 getL2Cache().invalidateAll(); 332 } 333 334 public Cache getL1Cache() { 335 return l1Cache; 336 } 337 338 public Cache getL2Cache() { 339 return l2Cache; 340 } 341 342 protected CacheDescriptor getL1CacheConfig() { 343 return new TransientCacheConfig(config.getName() + "L1", config.getFirstLevelTTL()); 344 } 345 346 protected CacheDescriptor getL2CacheConfig() { 347 return new TransientCacheConfig(config.getName() + "L2", config.getSecondLevelTTL()); 348 } 349 350 protected class TransientCacheConfig extends CacheDescriptor { 351 352 TransientCacheConfig(String name, int ttl) { 353 super(); 354 super.name = name; 355 super.implClass = getCacheImplClass(); 356 super.ttl = ttl; 357 } 358 } 359 360 protected Class<? extends Cache> getCacheImplClass() { 361 return InMemoryCacheImpl.class; 362 } 363 364 /** 365 * Returns the {@link StorageEntry} representing the entry with the given {@code key} or {@code null} if it doesn't 366 * exist. 367 */ 368 protected StorageEntry getStorageEntry(String key) { 369 StorageEntry entry = (StorageEntry) getL1Cache().get(key); 370 if (entry == null) { 371 entry = (StorageEntry) getL2Cache().get(key); 372 } 373 return entry; 374 } 375 376 /** 377 * Stores the given {@code entry} with the given {@code key}. 378 * <p> 379 * If an entry exists with the given {@code key} it is overwritten. 380 */ 381 protected void putStorageEntry(String key, StorageEntry entry) { 382 getL1Cache().put(key, entry); 383 } 384 385}