001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat <tdelprat@nuxeo.com> 018 * Antoine Taillefer <ataillefer@nuxeo.com> 019 */ 020 021package org.nuxeo.ecm.core.transientstore; 022 023import java.io.Serializable; 024import java.util.ArrayList; 025import java.util.HashSet; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.atomic.AtomicLong; 031import java.util.stream.Stream; 032 033import org.apache.commons.logging.Log; 034import org.apache.commons.logging.LogFactory; 035import org.nuxeo.ecm.core.api.Blob; 036import org.nuxeo.ecm.core.transientstore.api.TransientStore; 037import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig; 038 039import com.google.common.cache.Cache; 040import com.google.common.cache.CacheBuilder; 041 042/** 043 * Default implementation (i.e., not cluster aware) of the {@link TransientStore}. Uses {@link StorageEntry} as a 044 * representation of an entry in the store. 045 * 046 * @since 7.2 047 */ 048public class SimpleTransientStore extends AbstractTransientStore { 049 050 protected Log log = LogFactory.getLog(SimpleTransientStore.class); 051 052 protected Cache<String, Serializable> l1Cache; 053 054 protected Cache<String, Serializable> l2Cache; 055 056 protected AtomicLong storageSize = new AtomicLong(0); 057 058 public SimpleTransientStore() { 059 } 060 061 @Override 062 public void init(TransientStoreConfig config) { 063 log.debug("Initializing SimpleTransientStore: " + config.getName()); 064 super.init(config); 065 l1Cache = CacheBuilder.newBuilder().expireAfterWrite(config.getFirstLevelTTL(), TimeUnit.MINUTES).build(); 066 l2Cache = CacheBuilder.newBuilder().expireAfterWrite(config.getSecondLevelTTL(), TimeUnit.MINUTES).build(); 067 } 068 069 @Override 070 public void shutdown() { 071 log.debug("Shutting down SimpleTransientStore: " + config.getName()); 072 } 073 074 @Override 075 public boolean exists(String key) { 076 return getL1Cache().getIfPresent(key) != null || getL2Cache().getIfPresent(key) != null; 077 } 078 079 @Override 080 public Set<String> keySet() { 081 Set<String> keys = new HashSet<>(); 082 keys.addAll(getL1Cache().asMap().keySet()); 083 keys.addAll(getL2Cache().asMap().keySet()); 084 return keys; 085 } 086 087 @Override 088 public Stream<String> keyStream() { 089 return keySet().stream(); 090 } 091 092 @Override 093 public void putParameter(String key, String parameter, Serializable value) { 094 synchronized (this) { 095 StorageEntry entry = getStorageEntry(key); 096 if (entry == null) { 097 entry = new StorageEntry(); 098 } 099 entry.putParam(parameter, value); 100 if (log.isDebugEnabled()) { 101 log.debug(String.format("Setting parameter %s to value %s in StorageEntry stored at key %s", parameter, 102 value, key)); 103 } 104 putStorageEntry(key, entry); 105 } 106 } 107 108 @Override 109 public Serializable getParameter(String key, String parameter) { 110 StorageEntry entry = getStorageEntry(key); 111 if (entry == null) { 112 return null; 113 } 114 Serializable res = entry.getParam(parameter); 115 if (log.isDebugEnabled()) { 116 log.debug( 117 String.format("Fetched parameter %s from StorageEntry stored at key %s: %s", parameter, key, res)); 118 } 119 return res; 120 } 121 122 @Override 123 public void putParameters(String key, Map<String, Serializable> parameters) { 124 synchronized (this) { 125 StorageEntry entry = getStorageEntry(key); 126 if (entry == null) { 127 entry = new StorageEntry(); 128 } 129 entry.putParams(parameters); 130 if (log.isDebugEnabled()) { 131 log.debug(String.format("Setting parameters %s in StorageEntry stored at key %s", parameters, key)); 132 } 133 putStorageEntry(key, entry); 134 } 135 } 136 137 @Override 138 public Map<String, Serializable> getParameters(String key) { 139 StorageEntry entry = getStorageEntry(key); 140 if (entry == null) { 141 return null; 142 } 143 Map<String, Serializable> res = entry.getParams(); 144 if (log.isDebugEnabled()) { 145 log.debug(String.format("Fetched parameters from StorageEntry stored at key %s: %s", key, res)); 146 } 147 return res; 148 } 149 150 @Override 151 public List<Blob> getBlobs(String key) { 152 StorageEntry entry = getStorageEntry(key); 153 if (entry == null) { 154 return null; 155 } 156 // Get blob information from the store 157 List<Map<String, String>> blobInfos = entry.getBlobInfos(); 158 if (blobInfos == null) { 159 return new ArrayList<>(); 160 } 161 // Load blobs from the file system 162 return loadBlobs(blobInfos); 163 } 164 165 @Override 166 public long getSize(String key) { 167 StorageEntry entry = getStorageEntry(key); 168 if (entry == null) { 169 return -1; 170 } 171 long size = entry.getSize(); 172 if (log.isDebugEnabled()) { 173 log.debug(String.format("Fetched field \"size\" from StorageEntry stored at key %s: %d", key, size)); 174 } 175 return size; 176 } 177 178 @Override 179 public boolean isCompleted(String key) { 180 StorageEntry entry = getStorageEntry(key); 181 boolean completed = entry != null && entry.isCompleted(); 182 if (log.isDebugEnabled()) { 183 log.debug(String.format("Fetched field \"completed\" from StorageEntry stored at key %s: %s", key, 184 completed)); 185 } 186 return completed; 187 } 188 189 @Override 190 public void setCompleted(String key, boolean completed) { 191 synchronized (this) { 192 StorageEntry entry = getStorageEntry(key); 193 if (entry == null) { 194 entry = new StorageEntry(); 195 } 196 entry.setCompleted(completed); 197 if (log.isDebugEnabled()) { 198 log.debug(String.format("Setting field \"completed\" to value %s in StorageEntry stored at key %s", 199 completed, key)); 200 } 201 putStorageEntry(key, entry); 202 } 203 } 204 205 @Override 206 public void release(String key) { 207 StorageEntry entry = (StorageEntry) getL1Cache().getIfPresent(key); 208 if (entry != null) { 209 if (log.isDebugEnabled()) { 210 log.debug(String.format("Invalidating StorageEntry stored at key %s form L1 cache", key)); 211 } 212 getL1Cache().invalidate(key); 213 if (getStorageSize() <= config.getTargetMaxSizeMB() * (1024 * 1024) || config.getTargetMaxSizeMB() < 0) { 214 if (log.isDebugEnabled()) { 215 log.debug(String.format("Putting StorageEntry at key %s in L2 cache", key)); 216 } 217 getL2Cache().put(key, entry); 218 } 219 } 220 } 221 222 @Override 223 protected void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos) { 224 synchronized (this) { 225 StorageEntry entry = getStorageEntry(key); 226 // Update storage size 227 if (entry == null) { 228 if (sizeOfBlobs > 0) { 229 incrementStorageSize(sizeOfBlobs); 230 } 231 entry = new StorageEntry(); 232 } else { 233 incrementStorageSize(sizeOfBlobs - entry.getSize()); 234 } 235 // Update entry size 236 entry.setSize(sizeOfBlobs); 237 // Set blob information 238 entry.setBlobInfos(blobInfos); 239 if (log.isDebugEnabled()) { 240 log.debug(String.format("Setting blobs %s in StorageEntry stored at key %s", blobInfos, key)); 241 } 242 putStorageEntry(key, entry); 243 } 244 } 245 246 @Override 247 public long getStorageSize() { 248 long size = storageSize.get(); 249 if (log.isDebugEnabled()) { 250 log.debug(String.format("Fetched storage size of store %s: %d", config.getName(), size)); 251 } 252 return size; 253 } 254 255 @Override 256 protected void setStorageSize(long newSize) { 257 if (log.isDebugEnabled()) { 258 log.debug(String.format("Setting storage size of store %s to %d", config.getName(), newSize)); 259 } 260 storageSize.set(newSize); 261 } 262 263 @Override 264 protected long incrementStorageSize(long size) { 265 long incremented = storageSize.addAndGet(size); 266 if (log.isDebugEnabled()) { 267 log.debug(String.format("Incremented storage size of store %s to %s", config.getName(), incremented)); 268 } 269 return incremented; 270 } 271 272 @Override 273 protected long decrementStorageSize(long size) { 274 long decremented = storageSize.addAndGet(-size); 275 if (log.isDebugEnabled()) { 276 log.debug(String.format("Decremented storage size of store %s to %s", config.getName(), decremented)); 277 } 278 return decremented; 279 } 280 281 @Override 282 protected void removeEntry(String key) { 283 synchronized (this) { 284 StorageEntry entry = (StorageEntry) getL1Cache().getIfPresent(key); 285 if (entry == null) { 286 entry = (StorageEntry) getL2Cache().getIfPresent(key); 287 if (log.isDebugEnabled()) { 288 log.debug(String.format("Invalidating StorageEntry stored at key %s form L2 cache", key)); 289 } 290 getL2Cache().invalidate(key); 291 } else { 292 if (log.isDebugEnabled()) { 293 log.debug(String.format("Invalidating StorageEntry stored at key %s form L1 cache", key)); 294 } 295 getL1Cache().invalidate(key); 296 } 297 if (entry != null) { 298 long entrySize = entry.getSize(); 299 if (entrySize > 0) { 300 decrementStorageSize(entrySize); 301 } 302 } 303 } 304 } 305 306 @Override 307 protected void removeAllEntries() { 308 log.debug("Invalidating all entries from L1 and L2 caches"); 309 getL1Cache().invalidateAll(); 310 getL2Cache().invalidateAll(); 311 } 312 313 public Cache<String, Serializable> getL1Cache() { 314 return l1Cache; 315 } 316 317 public Cache<String, Serializable> getL2Cache() { 318 return l2Cache; 319 } 320 321 /** 322 * Returns the {@link StorageEntry} representing the entry with the given {@code key} or {@code null} if it doesn't 323 * exist. 324 */ 325 protected StorageEntry getStorageEntry(String key) { 326 StorageEntry entry = (StorageEntry) getL1Cache().getIfPresent(key); 327 if (entry == null) { 328 entry = (StorageEntry) getL2Cache().getIfPresent(key); 329 } 330 return entry; 331 } 332 333 /** 334 * Stores the given {@code entry} with the given {@code key}. 335 * <p> 336 * If an entry exists with the given {@code key} it is overwritten. 337 */ 338 protected void putStorageEntry(String key, StorageEntry entry) { 339 getL1Cache().put(key, entry); 340 } 341 342}