001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat <tdelprat@nuxeo.com> 018 * Antoine Taillefer <ataillefer@nuxeo.com> 019 */ 020 021package org.nuxeo.ecm.core.transientstore; 022 023import java.io.Serializable; 024import java.util.ArrayList; 025import java.util.List; 026import java.util.Map; 027import java.util.concurrent.atomic.AtomicLong; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031import org.nuxeo.ecm.core.api.Blob; 032import org.nuxeo.ecm.core.cache.Cache; 033import org.nuxeo.ecm.core.cache.CacheDescriptor; 034import org.nuxeo.ecm.core.cache.CacheService; 035import org.nuxeo.ecm.core.cache.CacheServiceImpl; 036import org.nuxeo.ecm.core.cache.InMemoryCacheImpl; 037import org.nuxeo.ecm.core.transientstore.api.TransientStore; 038import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig; 039import org.nuxeo.runtime.api.Framework; 040 041/** 042 * Default implementation (i.e., not cluster aware) of the {@link TransientStore}. Uses {@link StorageEntry} as a 043 * representation of an entry in the store. 044 * 045 * @since 7.2 046 */ 047public class SimpleTransientStore extends AbstractTransientStore { 048 049 protected Log log = LogFactory.getLog(SimpleTransientStore.class); 050 051 protected Cache l1Cache; 052 053 protected Cache l2Cache; 054 055 protected CacheDescriptor l1cd; 056 057 protected CacheDescriptor l2cd; 058 059 protected AtomicLong storageSize = new AtomicLong(0); 060 061 public SimpleTransientStore() { 062 } 063 064 @Override 065 public void init(TransientStoreConfig config) { 066 log.debug("Initializing SimpleTransientStore: " + config.getName()); 067 super.init(config); 068 CacheService cs = Framework.getService(CacheService.class); 069 if (cs == null) { 070 throw new UnsupportedOperationException("Cache service is required"); 071 } 072 // register the caches 073 l1cd = getL1CacheConfig(); 074 l2cd = getL2CacheConfig(); 075 ((CacheServiceImpl) cs).registerCache(l1cd); 076 ((CacheServiceImpl) cs).registerCache(l2cd); 077 l1cd.start(); 078 l2cd.start(); 079 080 // get caches 081 l1Cache = cs.getCache(l1cd.name); 082 l2Cache = cs.getCache(l2cd.name); 083 } 084 085 @Override 086 public void shutdown() { 087 log.debug("Shutting down SimpleTransientStore: " + config.getName()); 088 CacheService cs = Framework.getService(CacheService.class); 089 if (cs != null) { 090 if (l1cd != null) { 091 ((CacheServiceImpl) cs).unregisterCache(l1cd); 092 } 093 if (l2cd != null) { 094 ((CacheServiceImpl) cs).unregisterCache(l2cd); 095 } 096 } 097 } 098 099 @Override 100 public boolean exists(String key) { 101 return getL1Cache().hasEntry(key) || getL2Cache().hasEntry(key); 102 } 103 104 @Override 105 public void putParameter(String key, String parameter, Serializable value) { 106 synchronized (this) { 107 StorageEntry entry = getStorageEntry(key); 108 if (entry == null) { 109 entry = new StorageEntry(); 110 } 111 entry.putParam(parameter, value); 112 if (log.isDebugEnabled()) { 113 log.debug(String.format("Setting parameter %s to value %s in StorageEntry stored at key %s", parameter, 114 value, key)); 115 } 116 putStorageEntry(key, entry); 117 } 118 } 119 120 @Override 121 public Serializable getParameter(String key, String parameter) { 122 StorageEntry entry = getStorageEntry(key); 123 if (entry == null) { 124 return null; 125 } 126 Serializable res = entry.getParam(parameter); 127 if (log.isDebugEnabled()) { 128 log.debug(String.format("Fetched parameter %s from StorageEntry stored at key %s: %s", parameter, key, res)); 129 } 130 return res; 131 } 132 133 @Override 134 public void putParameters(String key, Map<String, Serializable> parameters) { 135 synchronized (this) { 136 StorageEntry entry = getStorageEntry(key); 137 if (entry == null) { 138 entry = new StorageEntry(); 139 } 140 entry.putParams(parameters); 141 if (log.isDebugEnabled()) { 142 log.debug(String.format("Setting parameters %s in StorageEntry stored at key %s", parameters, key)); 143 } 144 putStorageEntry(key, entry); 145 } 146 } 147 148 @Override 149 public Map<String, Serializable> getParameters(String key) { 150 StorageEntry entry = getStorageEntry(key); 151 if (entry == null) { 152 return null; 153 } 154 Map<String, Serializable> res = entry.getParams(); 155 if (log.isDebugEnabled()) { 156 log.debug(String.format("Fetched parameters from StorageEntry stored at key %s: %s", key, res)); 157 } 158 return res; 159 } 160 161 @Override 162 public List<Blob> getBlobs(String key) { 163 StorageEntry entry = getStorageEntry(key); 164 if (entry == null) { 165 return null; 166 } 167 // Get blob information from the store 168 List<Map<String, String>> blobInfos = entry.getBlobInfos(); 169 if (blobInfos == null) { 170 return new ArrayList<>(); 171 } 172 // Load blobs from the file system 173 return loadBlobs(blobInfos); 174 } 175 176 @Override 177 public long getSize(String key) { 178 StorageEntry entry = getStorageEntry(key); 179 if (entry == null) { 180 return -1; 181 } 182 long size = entry.getSize(); 183 if (log.isDebugEnabled()) { 184 log.debug(String.format("Fetched field \"size\" from StorageEntry stored at key %s: %d", key, size)); 185 } 186 return size; 187 } 188 189 @Override 190 public boolean isCompleted(String key) { 191 StorageEntry entry = getStorageEntry(key); 192 boolean completed = entry != null && entry.isCompleted(); 193 if (log.isDebugEnabled()) { 194 log.debug(String.format("Fetched field \"completed\" from StorageEntry stored at key %s: %s", key, 195 completed)); 196 } 197 return completed; 198 } 199 200 @Override 201 public void setCompleted(String key, boolean completed) { 202 synchronized (this) { 203 StorageEntry entry = getStorageEntry(key); 204 if (entry == null) { 205 entry = new StorageEntry(); 206 } 207 entry.setCompleted(completed); 208 if (log.isDebugEnabled()) { 209 log.debug(String.format("Setting field \"completed\" to value %s in StorageEntry stored at key %s", 210 completed, key)); 211 } 212 putStorageEntry(key, entry); 213 } 214 } 215 216 @Override 217 public void remove(String key) { 218 synchronized (this) { 219 StorageEntry entry = (StorageEntry) getL1Cache().get(key); 220 if (entry == null) { 221 entry = (StorageEntry) getL2Cache().get(key); 222 if (log.isDebugEnabled()) { 223 log.debug(String.format("Invalidating StorageEntry stored at key %s form L2 cache", key)); 224 } 225 getL2Cache().invalidate(key); 226 } else { 227 if (log.isDebugEnabled()) { 228 log.debug(String.format("Invalidating StorageEntry stored at key %s form L1 cache", key)); 229 } 230 getL1Cache().invalidate(key); 231 } 232 if (entry != null) { 233 long entrySize = entry.getSize(); 234 if (entrySize > 0) { 235 decrementStorageSize(entrySize); 236 } 237 } 238 } 239 } 240 241 @Override 242 public void release(String key) { 243 StorageEntry entry = (StorageEntry) getL1Cache().get(key); 244 if (entry != null) { 245 if (log.isDebugEnabled()) { 246 log.debug(String.format("Invalidating StorageEntry stored at key %s form L1 cache", key)); 247 } 248 getL1Cache().invalidate(key); 249 if (getStorageSize() <= config.getTargetMaxSizeMB() * (1024 * 1024) || config.getTargetMaxSizeMB() < 0) { 250 if (log.isDebugEnabled()) { 251 log.debug(String.format("Putting StorageEntry at key %s in L2 cache", key)); 252 } 253 getL2Cache().put(key, entry); 254 } 255 } 256 } 257 258 @Override 259 protected void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos) { 260 synchronized (this) { 261 StorageEntry entry = getStorageEntry(key); 262 // Update storage size 263 if (entry == null) { 264 if (sizeOfBlobs > 0) { 265 incrementStorageSize(sizeOfBlobs); 266 } 267 entry = new StorageEntry(); 268 } else { 269 incrementStorageSize(sizeOfBlobs - entry.getSize()); 270 } 271 // Update entry size 272 entry.setSize(sizeOfBlobs); 273 // Set blob information 274 entry.setBlobInfos(blobInfos); 275 if (log.isDebugEnabled()) { 276 log.debug(String.format("Setting blobs %s in StorageEntry stored at key %s", blobInfos, key)); 277 } 278 putStorageEntry(key, entry); 279 } 280 } 281 282 @Override 283 public long getStorageSize() { 284 int intStorageSize = (int) storageSize.get(); 285 if (log.isDebugEnabled()) { 286 log.debug(String.format("Fetched storage size of store %s: %d", config.getName(), intStorageSize)); 287 } 288 return intStorageSize; 289 } 290 291 @Override 292 protected void setStorageSize(long newSize) { 293 if (log.isDebugEnabled()) { 294 log.debug(String.format("Setting storage size of store %s to %d", config.getName(), newSize)); 295 } 296 storageSize.set(newSize); 297 } 298 299 @Override 300 protected long incrementStorageSize(long size) { 301 long incremented = storageSize.addAndGet(size); 302 if (log.isDebugEnabled()) { 303 log.debug(String.format("Incremented storage size of store %s to %s", config.getName(), incremented)); 304 } 305 return incremented; 306 } 307 308 @Override 309 protected long decrementStorageSize(long size) { 310 long decremented = storageSize.addAndGet(-size); 311 if (log.isDebugEnabled()) { 312 log.debug(String.format("Decremented storage size of store %s to %s", config.getName(), decremented)); 313 } 314 return decremented; 315 } 316 317 @Override 318 protected void removeAllEntries() { 319 log.debug("Invalidating all entries from L1 and L2 caches"); 320 getL1Cache().invalidateAll(); 321 getL2Cache().invalidateAll(); 322 } 323 324 public Cache getL1Cache() { 325 return l1Cache; 326 } 327 328 public Cache getL2Cache() { 329 return l2Cache; 330 } 331 332 protected CacheDescriptor getL1CacheConfig() { 333 return new TransientCacheConfig(config.getName() + "L1", config.getFirstLevelTTL()); 334 } 335 336 protected CacheDescriptor getL2CacheConfig() { 337 return new TransientCacheConfig(config.getName() + "L2", config.getSecondLevelTTL()); 338 } 339 340 protected class TransientCacheConfig extends CacheDescriptor { 341 342 TransientCacheConfig(String name, int ttl) { 343 super(); 344 super.name = name; 345 super.implClass = getCacheImplClass(); 346 super.ttl = ttl; 347 } 348 } 349 350 protected Class<? extends Cache> getCacheImplClass() { 351 return InMemoryCacheImpl.class; 352 } 353 354 /** 355 * Returns the {@link StorageEntry} representing the entry with the given {@code key} or {@code null} if it doesn't 356 * exist. 357 */ 358 protected StorageEntry getStorageEntry(String key) { 359 StorageEntry entry = (StorageEntry) getL1Cache().get(key); 360 if (entry == null) { 361 entry = (StorageEntry) getL2Cache().get(key); 362 } 363 return entry; 364 } 365 366 /** 367 * Stores the given {@code entry} with the given {@code key}. 368 * <p> 369 * If an entry exists with the given {@code key} it is overwritten. 370 */ 371 protected void putStorageEntry(String key, StorageEntry entry) { 372 getL1Cache().put(key, entry); 373 } 374 375}