001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat <tdelprat@nuxeo.com>
018 *     Antoine Taillefer <ataillefer@nuxeo.com>
019 */
020
021package org.nuxeo.ecm.core.transientstore;
022
023import java.io.Serializable;
024import java.util.ArrayList;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.atomic.AtomicLong;
030
031import org.apache.commons.io.FileUtils;
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.nuxeo.ecm.core.api.Blob;
035import org.nuxeo.ecm.core.cache.Cache;
036import org.nuxeo.ecm.core.cache.CacheDescriptor;
037import org.nuxeo.ecm.core.cache.CacheService;
038import org.nuxeo.ecm.core.cache.CacheServiceImpl;
039import org.nuxeo.ecm.core.cache.InMemoryCacheImpl;
040import org.nuxeo.ecm.core.transientstore.api.TransientStore;
041import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig;
042import org.nuxeo.runtime.api.Framework;
043
044/**
045 * Default implementation (i.e., not cluster aware) of the {@link TransientStore}. Uses {@link StorageEntry} as a
046 * representation of an entry in the store.
047 *
048 * @since 7.2
049 */
050public class SimpleTransientStore extends AbstractTransientStore {
051
052    protected Log log = LogFactory.getLog(SimpleTransientStore.class);
053
054    protected Cache l1Cache;
055
056    protected Cache l2Cache;
057
058    protected CacheDescriptor l1cd;
059
060    protected CacheDescriptor l2cd;
061
062    protected AtomicLong storageSize = new AtomicLong(0);
063
064    public SimpleTransientStore() {
065    }
066
067    @Override
068    public void init(TransientStoreConfig config) {
069        log.debug("Initializing SimpleTransientStore: " + config.getName());
070        super.init(config);
071        CacheService cs = Framework.getService(CacheService.class);
072        if (cs == null) {
073            throw new UnsupportedOperationException("Cache service is required");
074        }
075        // register the caches
076        l1cd = getL1CacheConfig();
077        l2cd = getL2CacheConfig();
078        ((CacheServiceImpl) cs).registerCache(l1cd);
079        ((CacheServiceImpl) cs).registerCache(l2cd);
080
081        // get caches
082        l1Cache = cs.getCache(l1cd.name);
083        l2Cache = cs.getCache(l2cd.name);
084    }
085
086    @Override
087    public void shutdown() {
088        log.debug("Shutting down SimpleTransientStore: " + config.getName());
089        CacheService cs = Framework.getService(CacheService.class);
090        if (cs != null) {
091            if (l1cd != null) {
092                ((CacheServiceImpl) cs).unregisterCache(l1cd);
093            }
094            if (l2cd != null) {
095                ((CacheServiceImpl) cs).unregisterCache(l2cd);
096            }
097        }
098    }
099
100    @Override
101    public boolean exists(String key) {
102        return getL1Cache().hasEntry(key) || getL2Cache().hasEntry(key);
103    }
104
105    @Override
106    public Set<String> keySet() {
107        Set<String> keys = new HashSet<>();
108        keys.addAll(getL1Cache().keySet());
109        keys.addAll(getL2Cache().keySet());
110        return keys;
111    }
112
113    @Override
114    public void putParameter(String key, String parameter, Serializable value) {
115        synchronized (this) {
116            StorageEntry entry = getStorageEntry(key);
117            if (entry == null) {
118                entry = new StorageEntry();
119            }
120            entry.putParam(parameter, value);
121            if (log.isDebugEnabled()) {
122                log.debug(String.format("Setting parameter %s to value %s in StorageEntry stored at key %s", parameter,
123                        value, key));
124            }
125            putStorageEntry(key, entry);
126        }
127    }
128
129    @Override
130    public Serializable getParameter(String key, String parameter) {
131        StorageEntry entry = getStorageEntry(key);
132        if (entry == null) {
133            return null;
134        }
135        Serializable res = entry.getParam(parameter);
136        if (log.isDebugEnabled()) {
137            log.debug(String.format("Fetched parameter %s from StorageEntry stored at key %s: %s", parameter, key, res));
138        }
139        return res;
140    }
141
142    @Override
143    public void putParameters(String key, Map<String, Serializable> parameters) {
144        synchronized (this) {
145            StorageEntry entry = getStorageEntry(key);
146            if (entry == null) {
147                entry = new StorageEntry();
148            }
149            entry.putParams(parameters);
150            if (log.isDebugEnabled()) {
151                log.debug(String.format("Setting parameters %s in StorageEntry stored at key %s", parameters, key));
152            }
153            putStorageEntry(key, entry);
154        }
155    }
156
157    @Override
158    public Map<String, Serializable> getParameters(String key) {
159        StorageEntry entry = getStorageEntry(key);
160        if (entry == null) {
161            return null;
162        }
163        Map<String, Serializable> res = entry.getParams();
164        if (log.isDebugEnabled()) {
165            log.debug(String.format("Fetched parameters from StorageEntry stored at key %s: %s", key, res));
166        }
167        return res;
168    }
169
170    @Override
171    public List<Blob> getBlobs(String key) {
172        StorageEntry entry = getStorageEntry(key);
173        if (entry == null) {
174            return null;
175        }
176        // Get blob information from the store
177        List<Map<String, String>> blobInfos = entry.getBlobInfos();
178        if (blobInfos == null) {
179            return new ArrayList<>();
180        }
181        // Load blobs from the file system
182        return loadBlobs(blobInfos);
183    }
184
185    @Override
186    public long getSize(String key) {
187        StorageEntry entry = getStorageEntry(key);
188        if (entry == null) {
189            return -1;
190        }
191        long size = entry.getSize();
192        if (log.isDebugEnabled()) {
193            log.debug(String.format("Fetched field \"size\" from StorageEntry stored at key %s: %d", key, size));
194        }
195        return size;
196    }
197
198    @Override
199    public boolean isCompleted(String key) {
200        StorageEntry entry = getStorageEntry(key);
201        boolean completed = entry != null && entry.isCompleted();
202        if (log.isDebugEnabled()) {
203            log.debug(String.format("Fetched field \"completed\" from StorageEntry stored at key %s: %s", key,
204                    completed));
205        }
206        return completed;
207    }
208
209    @Override
210    public void setCompleted(String key, boolean completed) {
211        synchronized (this) {
212            StorageEntry entry = getStorageEntry(key);
213            if (entry == null) {
214                entry = new StorageEntry();
215            }
216            entry.setCompleted(completed);
217            if (log.isDebugEnabled()) {
218                log.debug(String.format("Setting field \"completed\" to value %s in StorageEntry stored at key %s",
219                        completed, key));
220            }
221            putStorageEntry(key, entry);
222        }
223    }
224
225    @Override
226    public void remove(String key) {
227        synchronized (this) {
228            StorageEntry entry = (StorageEntry) getL1Cache().get(key);
229            if (entry == null) {
230                entry = (StorageEntry) getL2Cache().get(key);
231                if (log.isDebugEnabled()) {
232                    log.debug(String.format("Invalidating StorageEntry stored at key %s form L2 cache", key));
233                }
234                getL2Cache().invalidate(key);
235            } else {
236                if (log.isDebugEnabled()) {
237                    log.debug(String.format("Invalidating StorageEntry stored at key %s form L1 cache", key));
238                }
239                getL1Cache().invalidate(key);
240            }
241            if (entry != null) {
242                long entrySize = entry.getSize();
243                if (entrySize > 0) {
244                    decrementStorageSize(entrySize);
245                }
246            }
247            FileUtils.deleteQuietly(getCachingDirectory(key));
248        }
249    }
250
251    @Override
252    public void release(String key) {
253        StorageEntry entry = (StorageEntry) getL1Cache().get(key);
254        if (entry != null) {
255            if (log.isDebugEnabled()) {
256                log.debug(String.format("Invalidating StorageEntry stored at key %s form L1 cache", key));
257            }
258            getL1Cache().invalidate(key);
259            if (getStorageSize() <= config.getTargetMaxSizeMB() * (1024 * 1024) || config.getTargetMaxSizeMB() < 0) {
260                if (log.isDebugEnabled()) {
261                    log.debug(String.format("Putting StorageEntry at key %s in L2 cache", key));
262                }
263                getL2Cache().put(key, entry);
264            }
265        }
266    }
267
268    @Override
269    protected void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos) {
270        synchronized (this) {
271            StorageEntry entry = getStorageEntry(key);
272            // Update storage size
273            if (entry == null) {
274                if (sizeOfBlobs > 0) {
275                    incrementStorageSize(sizeOfBlobs);
276                }
277                entry = new StorageEntry();
278            } else {
279                incrementStorageSize(sizeOfBlobs - entry.getSize());
280            }
281            // Update entry size
282            entry.setSize(sizeOfBlobs);
283            // Set blob information
284            entry.setBlobInfos(blobInfos);
285            if (log.isDebugEnabled()) {
286                log.debug(String.format("Setting blobs %s in StorageEntry stored at key %s", blobInfos, key));
287            }
288            putStorageEntry(key, entry);
289        }
290    }
291
292    @Override
293    public long getStorageSize() {
294        int intStorageSize = (int) storageSize.get();
295        if (log.isDebugEnabled()) {
296            log.debug(String.format("Fetched storage size of store %s: %d", config.getName(), intStorageSize));
297        }
298        return intStorageSize;
299    }
300
301    @Override
302    protected void setStorageSize(long newSize) {
303        if (log.isDebugEnabled()) {
304            log.debug(String.format("Setting storage size of store %s to %d", config.getName(), newSize));
305        }
306        storageSize.set(newSize);
307    }
308
309    @Override
310    protected long incrementStorageSize(long size) {
311        long incremented = storageSize.addAndGet(size);
312        if (log.isDebugEnabled()) {
313            log.debug(String.format("Incremented storage size of store %s to %s", config.getName(), incremented));
314        }
315        return incremented;
316    }
317
318    @Override
319    protected long decrementStorageSize(long size) {
320        long decremented = storageSize.addAndGet(-size);
321        if (log.isDebugEnabled()) {
322            log.debug(String.format("Decremented storage size of store %s to %s", config.getName(), decremented));
323        }
324        return decremented;
325    }
326
327    @Override
328    protected void removeAllEntries() {
329        log.debug("Invalidating all entries from L1 and L2 caches");
330        getL1Cache().invalidateAll();
331        getL2Cache().invalidateAll();
332    }
333
334    public Cache getL1Cache() {
335        return l1Cache;
336    }
337
338    public Cache getL2Cache() {
339        return l2Cache;
340    }
341
342    protected CacheDescriptor getL1CacheConfig() {
343        return new TransientCacheConfig(config.getName() + "L1", config.getFirstLevelTTL());
344    }
345
346    protected CacheDescriptor getL2CacheConfig() {
347        return new TransientCacheConfig(config.getName() + "L2", config.getSecondLevelTTL());
348    }
349
350    protected class TransientCacheConfig extends CacheDescriptor {
351
352        TransientCacheConfig(String name, int ttl) {
353            super();
354            super.name = name;
355            super.implClass = getCacheImplClass();
356            super.ttl = ttl;
357        }
358    }
359
360    protected Class<? extends Cache> getCacheImplClass() {
361        return InMemoryCacheImpl.class;
362    }
363
364    /**
365     * Returns the {@link StorageEntry} representing the entry with the given {@code key} or {@code null} if it doesn't
366     * exist.
367     */
368    protected StorageEntry getStorageEntry(String key) {
369        StorageEntry entry = (StorageEntry) getL1Cache().get(key);
370        if (entry == null) {
371            entry = (StorageEntry) getL2Cache().get(key);
372        }
373        return entry;
374    }
375
376    /**
377     * Stores the given {@code entry} with the given {@code key}.
378     * <p>
379     * If an entry exists with the given {@code key} it is overwritten.
380     */
381    protected void putStorageEntry(String key, StorageEntry entry) {
382        getL1Cache().put(key, entry);
383    }
384
385}