001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat <tdelprat@nuxeo.com> 018 * Antoine Taillefer <ataillefer@nuxeo.com> 019 */ 020 021package org.nuxeo.ecm.core.transientstore; 022 023import java.io.Serializable; 024import java.util.ArrayList; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicLong; 032import java.util.stream.Stream; 033 034import org.apache.commons.logging.Log; 035import org.apache.commons.logging.LogFactory; 036import org.nuxeo.ecm.core.api.Blob; 037import org.nuxeo.ecm.core.transientstore.api.TransientStore; 038import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig; 039 040import com.google.common.cache.Cache; 041import com.google.common.cache.CacheBuilder; 042 043/** 044 * Default implementation (i.e., not cluster aware) of the {@link TransientStore}. Uses {@link StorageEntry} as a 045 * representation of an entry in the store. 046 * 047 * @since 7.2 048 */ 049public class SimpleTransientStore extends AbstractTransientStore { 050 051 protected Log log = LogFactory.getLog(SimpleTransientStore.class); 052 053 protected Cache<String, Serializable> l1Cache; 054 055 protected Cache<String, Serializable> l2Cache; 056 057 protected AtomicLong storageSize = new AtomicLong(0); 058 059 public SimpleTransientStore() { 060 } 061 062 @Override 063 public void init(TransientStoreConfig config) { 064 log.debug("Initializing SimpleTransientStore: " + config.getName()); 065 super.init(config); 066 l1Cache = CacheBuilder.newBuilder().expireAfterWrite(config.getFirstLevelTTL(), TimeUnit.MINUTES).build(); 067 l2Cache = CacheBuilder.newBuilder().expireAfterWrite(config.getSecondLevelTTL(), TimeUnit.MINUTES).build(); 068 } 069 070 @Override 071 public void shutdown() { 072 log.debug("Shutting down SimpleTransientStore: " + config.getName()); 073 } 074 075 @Override 076 public boolean exists(String key) { 077 return getL1Cache().getIfPresent(key) != null || getL2Cache().getIfPresent(key) != null; 078 } 079 080 @Override 081 public Set<String> keySet() { 082 Set<String> keys = new HashSet<>(); 083 keys.addAll(getL1Cache().asMap().keySet()); 084 keys.addAll(getL2Cache().asMap().keySet()); 085 return keys; 086 } 087 088 @Override 089 public Stream<String> keyStream() { 090 return keySet().stream(); 091 } 092 093 @Override 094 public void putParameter(String key, String parameter, Serializable value) { 095 synchronized (this) { 096 StorageEntry entry = getStorageEntry(key); 097 if (entry == null) { 098 entry = new StorageEntry(); 099 } 100 entry.putParam(parameter, value); 101 if (log.isDebugEnabled()) { 102 log.debug(String.format("Setting parameter %s to value %s in StorageEntry stored at key %s", parameter, 103 value, key)); 104 } 105 putStorageEntry(key, entry); 106 } 107 } 108 109 @Override 110 public Serializable getParameter(String key, String parameter) { 111 StorageEntry entry = getStorageEntry(key); 112 if (entry == null) { 113 return null; 114 } 115 Serializable res = entry.getParam(parameter); 116 if (log.isDebugEnabled()) { 117 log.debug( 118 String.format("Fetched parameter %s from StorageEntry stored at key %s: %s", parameter, key, res)); 119 } 120 return res; 121 } 122 123 @Override 124 public void putParameters(String key, Map<String, Serializable> parameters) { 125 synchronized (this) { 126 StorageEntry entry = getStorageEntry(key); 127 if (entry == null) { 128 entry = new StorageEntry(); 129 } 130 entry.putParams(parameters); 131 if (log.isDebugEnabled()) { 132 log.debug(String.format("Setting parameters %s in StorageEntry stored at key %s", parameters, key)); 133 } 134 putStorageEntry(key, entry); 135 } 136 } 137 138 @Override 139 public Map<String, Serializable> getParameters(String key) { 140 StorageEntry entry = getStorageEntry(key); 141 if (entry == null) { 142 return null; 143 } 144 Map<String, Serializable> res = new HashMap<>(entry.getParams()); 145 if (log.isDebugEnabled()) { 146 log.debug(String.format("Fetched parameters from StorageEntry stored at key %s: %s", key, res)); 147 } 148 return res; 149 } 150 151 @Override 152 public List<Blob> getBlobs(String key) { 153 StorageEntry entry = getStorageEntry(key); 154 if (entry == null) { 155 return null; 156 } 157 // Get blob information from the store 158 List<Map<String, String>> blobInfos = entry.getBlobInfos(); 159 if (blobInfos == null) { 160 return new ArrayList<>(); 161 } 162 // Load blobs from the file system 163 return loadBlobs(blobInfos); 164 } 165 166 @Override 167 public long getSize(String key) { 168 StorageEntry entry = getStorageEntry(key); 169 if (entry == null) { 170 return -1; 171 } 172 long size = entry.getSize(); 173 if (log.isDebugEnabled()) { 174 log.debug(String.format("Fetched field \"size\" from StorageEntry stored at key %s: %d", key, size)); 175 } 176 return size; 177 } 178 179 @Override 180 public boolean isCompleted(String key) { 181 StorageEntry entry = getStorageEntry(key); 182 boolean completed = entry != null && entry.isCompleted(); 183 if (log.isDebugEnabled()) { 184 log.debug(String.format("Fetched field \"completed\" from StorageEntry stored at key %s: %s", key, 185 completed)); 186 } 187 return completed; 188 } 189 190 @Override 191 public void setCompleted(String key, boolean completed) { 192 synchronized (this) { 193 StorageEntry entry = getStorageEntry(key); 194 if (entry == null) { 195 entry = new StorageEntry(); 196 } 197 entry.setCompleted(completed); 198 if (log.isDebugEnabled()) { 199 log.debug(String.format("Setting field \"completed\" to value %s in StorageEntry stored at key %s", 200 completed, key)); 201 } 202 putStorageEntry(key, entry); 203 } 204 } 205 206 @Override 207 public void release(String key) { 208 StorageEntry entry = (StorageEntry) getL1Cache().getIfPresent(key); 209 if (entry != null) { 210 if (log.isDebugEnabled()) { 211 log.debug(String.format("Invalidating StorageEntry stored at key %s form L1 cache", key)); 212 } 213 getL1Cache().invalidate(key); 214 if (getStorageSize() <= config.getTargetMaxSizeMB() * (1024 * 1024) || config.getTargetMaxSizeMB() < 0) { 215 if (log.isDebugEnabled()) { 216 log.debug(String.format("Putting StorageEntry at key %s in L2 cache", key)); 217 } 218 getL2Cache().put(key, entry); 219 } 220 } 221 } 222 223 @Override 224 protected void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos) { 225 synchronized (this) { 226 StorageEntry entry = getStorageEntry(key); 227 // Update storage size 228 if (entry == null) { 229 if (sizeOfBlobs > 0) { 230 incrementStorageSize(sizeOfBlobs); 231 } 232 entry = new StorageEntry(); 233 } else { 234 incrementStorageSize(sizeOfBlobs - entry.getSize()); 235 } 236 // Update entry size 237 entry.setSize(sizeOfBlobs); 238 // Set blob information 239 entry.setBlobInfos(blobInfos); 240 if (log.isDebugEnabled()) { 241 log.debug(String.format("Setting blobs %s in StorageEntry stored at key %s", blobInfos, key)); 242 } 243 putStorageEntry(key, entry); 244 } 245 } 246 247 @Override 248 public long getStorageSize() { 249 long size = storageSize.get(); 250 if (log.isDebugEnabled()) { 251 log.debug(String.format("Fetched storage size of store %s: %d", config.getName(), size)); 252 } 253 return size; 254 } 255 256 @Override 257 protected void setStorageSize(long newSize) { 258 if (log.isDebugEnabled()) { 259 log.debug(String.format("Setting storage size of store %s to %d", config.getName(), newSize)); 260 } 261 storageSize.set(newSize); 262 } 263 264 @Override 265 protected long incrementStorageSize(long size) { 266 long incremented = storageSize.addAndGet(size); 267 if (log.isDebugEnabled()) { 268 log.debug(String.format("Incremented storage size of store %s to %s", config.getName(), incremented)); 269 } 270 return incremented; 271 } 272 273 @Override 274 protected long decrementStorageSize(long size) { 275 long decremented = storageSize.addAndGet(-size); 276 if (log.isDebugEnabled()) { 277 log.debug(String.format("Decremented storage size of store %s to %s", config.getName(), decremented)); 278 } 279 return decremented; 280 } 281 282 @Override 283 protected void removeEntry(String key) { 284 synchronized (this) { 285 StorageEntry entry = (StorageEntry) getL1Cache().getIfPresent(key); 286 if (entry == null) { 287 entry = (StorageEntry) getL2Cache().getIfPresent(key); 288 if (log.isDebugEnabled()) { 289 log.debug(String.format("Invalidating StorageEntry stored at key %s form L2 cache", key)); 290 } 291 getL2Cache().invalidate(key); 292 } else { 293 if (log.isDebugEnabled()) { 294 log.debug(String.format("Invalidating StorageEntry stored at key %s form L1 cache", key)); 295 } 296 getL1Cache().invalidate(key); 297 } 298 if (entry != null) { 299 long entrySize = entry.getSize(); 300 if (entrySize > 0) { 301 decrementStorageSize(entrySize); 302 } 303 } 304 } 305 } 306 307 @Override 308 protected void removeAllEntries() { 309 log.debug("Invalidating all entries from L1 and L2 caches"); 310 getL1Cache().invalidateAll(); 311 getL2Cache().invalidateAll(); 312 } 313 314 public Cache<String, Serializable> getL1Cache() { 315 return l1Cache; 316 } 317 318 public Cache<String, Serializable> getL2Cache() { 319 return l2Cache; 320 } 321 322 /** 323 * Returns the {@link StorageEntry} representing the entry with the given {@code key} or {@code null} if it doesn't 324 * exist. 325 */ 326 protected StorageEntry getStorageEntry(String key) { 327 StorageEntry entry = (StorageEntry) getL1Cache().getIfPresent(key); 328 if (entry == null) { 329 entry = (StorageEntry) getL2Cache().getIfPresent(key); 330 } 331 return entry; 332 } 333 334 /** 335 * Stores the given {@code entry} with the given {@code key}. 336 * <p> 337 * If an entry exists with the given {@code key} it is overwritten. 338 */ 339 protected void putStorageEntry(String key, StorageEntry entry) { 340 getL1Cache().put(key, entry); 341 } 342 343}