001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Thierry Delprat <tdelprat@nuxeo.com>
016 *     Antoine Taillefer <ataillefer@nuxeo.com>
017 */
018
019package org.nuxeo.ecm.core.transientstore;
020
021import java.io.Serializable;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.atomic.AtomicLong;
026
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import org.nuxeo.ecm.core.api.Blob;
030import org.nuxeo.ecm.core.cache.Cache;
031import org.nuxeo.ecm.core.cache.CacheDescriptor;
032import org.nuxeo.ecm.core.cache.CacheService;
033import org.nuxeo.ecm.core.cache.CacheServiceImpl;
034import org.nuxeo.ecm.core.cache.InMemoryCacheImpl;
035import org.nuxeo.ecm.core.transientstore.api.TransientStore;
036import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig;
037import org.nuxeo.runtime.api.Framework;
038
039/**
040 * Default implementation (i.e., not cluster aware) of the {@link TransientStore}. Uses {@link StorageEntry} as a
041 * representation of an entry in the store.
042 *
043 * @since 7.2
044 */
045public class SimpleTransientStore extends AbstractTransientStore {
046
047    protected Log log = LogFactory.getLog(SimpleTransientStore.class);
048
049    protected Cache l1Cache;
050
051    protected Cache l2Cache;
052
053    protected CacheDescriptor l1cd;
054
055    protected CacheDescriptor l2cd;
056
057    protected AtomicLong storageSize = new AtomicLong(0);
058
059    public SimpleTransientStore() {
060    }
061
062    @Override
063    public void init(TransientStoreConfig config) {
064        log.debug("Initializing SimpleTransientStore: " + config.getName());
065        super.init(config);
066        CacheService cs = Framework.getService(CacheService.class);
067        if (cs == null) {
068            throw new UnsupportedOperationException("Cache service is required");
069        }
070        // register the caches
071        l1cd = getL1CacheConfig();
072        l2cd = getL2CacheConfig();
073        ((CacheServiceImpl) cs).registerCache(l1cd);
074        ((CacheServiceImpl) cs).registerCache(l2cd);
075        l1cd.start();
076        l2cd.start();
077
078        // get caches
079        l1Cache = cs.getCache(l1cd.name);
080        l2Cache = cs.getCache(l2cd.name);
081    }
082
083    @Override
084    public void shutdown() {
085        log.debug("Shutting down SimpleTransientStore: " + config.getName());
086        CacheService cs = Framework.getService(CacheService.class);
087        if (cs != null) {
088            if (l1cd != null) {
089                ((CacheServiceImpl) cs).unregisterCache(l1cd);
090            }
091            if (l2cd != null) {
092                ((CacheServiceImpl) cs).unregisterCache(l2cd);
093            }
094        }
095    }
096
097    @Override
098    public boolean exists(String key) {
099        return getL1Cache().hasEntry(key) || getL2Cache().hasEntry(key);
100    }
101
102    @Override
103    public void putParameter(String key, String parameter, Serializable value) {
104        synchronized (this) {
105            StorageEntry entry = getStorageEntry(key);
106            if (entry == null) {
107                entry = new StorageEntry();
108            }
109            entry.putParam(parameter, value);
110            if (log.isDebugEnabled()) {
111                log.debug(String.format("Setting parameter %s to value %s in StorageEntry stored at key %s", parameter,
112                        value, key));
113            }
114            putStorageEntry(key, entry);
115        }
116    }
117
118    @Override
119    public Serializable getParameter(String key, String parameter) {
120        StorageEntry entry = getStorageEntry(key);
121        if (entry == null) {
122            return null;
123        }
124        Serializable res = entry.getParam(parameter);
125        if (log.isDebugEnabled()) {
126            log.debug(String.format("Fetched parameter %s from StorageEntry stored at key %s: %s", parameter, key, res));
127        }
128        return res;
129    }
130
131    @Override
132    public void putParameters(String key, Map<String, Serializable> parameters) {
133        synchronized (this) {
134            StorageEntry entry = getStorageEntry(key);
135            if (entry == null) {
136                entry = new StorageEntry();
137            }
138            entry.putParams(parameters);
139            if (log.isDebugEnabled()) {
140                log.debug(String.format("Setting parameters %s in StorageEntry stored at key %s", parameters, key));
141            }
142            putStorageEntry(key, entry);
143        }
144    }
145
146    @Override
147    public Map<String, Serializable> getParameters(String key) {
148        StorageEntry entry = getStorageEntry(key);
149        if (entry == null) {
150            return null;
151        }
152        Map<String, Serializable> res = entry.getParams();
153        if (log.isDebugEnabled()) {
154            log.debug(String.format("Fetched parameters from StorageEntry stored at key %s: %s", key, res));
155        }
156        return res;
157    }
158
159    @Override
160    public List<Blob> getBlobs(String key) {
161        StorageEntry entry = getStorageEntry(key);
162        if (entry == null) {
163            return null;
164        }
165        // Get blob information from the store
166        List<Map<String, String>> blobInfos = entry.getBlobInfos();
167        if (blobInfos == null) {
168            return new ArrayList<>();
169        }
170        // Load blobs from the file system
171        return loadBlobs(blobInfos);
172    }
173
174    @Override
175    public long getSize(String key) {
176        StorageEntry entry = getStorageEntry(key);
177        if (entry == null) {
178            return -1;
179        }
180        long size = entry.getSize();
181        if (log.isDebugEnabled()) {
182            log.debug(String.format("Fetched field \"size\" from StorageEntry stored at key %s: %d", key, size));
183        }
184        return size;
185    }
186
187    @Override
188    public boolean isCompleted(String key) {
189        StorageEntry entry = getStorageEntry(key);
190        boolean completed = entry != null && entry.isCompleted();
191        if (log.isDebugEnabled()) {
192            log.debug(String.format("Fetched field \"completed\" from StorageEntry stored at key %s: %s", key,
193                    completed));
194        }
195        return completed;
196    }
197
198    @Override
199    public void setCompleted(String key, boolean completed) {
200        synchronized (this) {
201            StorageEntry entry = getStorageEntry(key);
202            if (entry == null) {
203                entry = new StorageEntry();
204            }
205            entry.setCompleted(completed);
206            if (log.isDebugEnabled()) {
207                log.debug(String.format("Setting field \"completed\" to value %s in StorageEntry stored at key %s",
208                        completed, key));
209            }
210            putStorageEntry(key, entry);
211        }
212    }
213
214    @Override
215    public void remove(String key) {
216        synchronized (this) {
217            StorageEntry entry = (StorageEntry) getL1Cache().get(key);
218            if (entry == null) {
219                entry = (StorageEntry) getL2Cache().get(key);
220                if (log.isDebugEnabled()) {
221                    log.debug(String.format("Invalidating StorageEntry stored at key %s form L2 cache", key));
222                }
223                getL2Cache().invalidate(key);
224            } else {
225                if (log.isDebugEnabled()) {
226                    log.debug(String.format("Invalidating StorageEntry stored at key %s form L1 cache", key));
227                }
228                getL1Cache().invalidate(key);
229            }
230            if (entry != null) {
231                long entrySize = entry.getSize();
232                if (entrySize > 0) {
233                    decrementStorageSize(entrySize);
234                }
235            }
236        }
237    }
238
239    @Override
240    public void release(String key) {
241        StorageEntry entry = (StorageEntry) getL1Cache().get(key);
242        if (entry != null) {
243            if (log.isDebugEnabled()) {
244                log.debug(String.format("Invalidating StorageEntry stored at key %s form L1 cache", key));
245            }
246            getL1Cache().invalidate(key);
247            if (getStorageSize() <= config.getTargetMaxSizeMB() * (1024 * 1024) || config.getTargetMaxSizeMB() < 0) {
248                if (log.isDebugEnabled()) {
249                    log.debug(String.format("Putting StorageEntry at key %s in L2 cache", key));
250                }
251                getL2Cache().put(key, entry);
252            }
253        }
254    }
255
256    @Override
257    protected void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos) {
258        synchronized (this) {
259            StorageEntry entry = getStorageEntry(key);
260            // Update storage size
261            if (entry == null) {
262                if (sizeOfBlobs > 0) {
263                    incrementStorageSize(sizeOfBlobs);
264                }
265                entry = new StorageEntry();
266            } else {
267                incrementStorageSize(sizeOfBlobs - entry.getSize());
268            }
269            // Update entry size
270            entry.setSize(sizeOfBlobs);
271            // Set blob information
272            entry.setBlobInfos(blobInfos);
273            if (log.isDebugEnabled()) {
274                log.debug(String.format("Setting blobs %s in StorageEntry stored at key %s", blobInfos, key));
275            }
276            putStorageEntry(key, entry);
277        }
278    }
279
280    @Override
281    public long getStorageSize() {
282        int intStorageSize = (int) storageSize.get();
283        if (log.isDebugEnabled()) {
284            log.debug(String.format("Fetched storage size of store %s: %d", config.getName(), intStorageSize));
285        }
286        return intStorageSize;
287    }
288
289    @Override
290    protected void setStorageSize(long newSize) {
291        if (log.isDebugEnabled()) {
292            log.debug(String.format("Setting storage size of store %s to %d", config.getName(), newSize));
293        }
294        storageSize.set(newSize);
295    }
296
297    @Override
298    protected long incrementStorageSize(long size) {
299        long incremented = storageSize.addAndGet(size);
300        if (log.isDebugEnabled()) {
301            log.debug(String.format("Incremented storage size of store %s to %s", config.getName(), incremented));
302        }
303        return incremented;
304    }
305
306    @Override
307    protected long decrementStorageSize(long size) {
308        long decremented = storageSize.addAndGet(-size);
309        if (log.isDebugEnabled()) {
310            log.debug(String.format("Decremented storage size of store %s to %s", config.getName(), decremented));
311        }
312        return decremented;
313    }
314
315    @Override
316    protected void removeAllEntries() {
317        log.debug("Invalidating all entries from L1 and L2 caches");
318        getL1Cache().invalidateAll();
319        getL2Cache().invalidateAll();
320    }
321
322    public Cache getL1Cache() {
323        return l1Cache;
324    }
325
326    public Cache getL2Cache() {
327        return l2Cache;
328    }
329
330    protected CacheDescriptor getL1CacheConfig() {
331        return new TransientCacheConfig(config.getName() + "L1", config.getFirstLevelTTL());
332    }
333
334    protected CacheDescriptor getL2CacheConfig() {
335        return new TransientCacheConfig(config.getName() + "L2", config.getSecondLevelTTL());
336    }
337
338    protected class TransientCacheConfig extends CacheDescriptor {
339
340        TransientCacheConfig(String name, int ttl) {
341            super();
342            super.name = name;
343            super.implClass = getCacheImplClass();
344            super.ttl = ttl;
345        }
346    }
347
348    protected Class<? extends Cache> getCacheImplClass() {
349        return InMemoryCacheImpl.class;
350    }
351
352    /**
353     * Returns the {@link StorageEntry} representing the entry with the given {@code key} or {@code null} if it doesn't
354     * exist.
355     */
356    protected StorageEntry getStorageEntry(String key) {
357        StorageEntry entry = (StorageEntry) getL1Cache().get(key);
358        if (entry == null) {
359            entry = (StorageEntry) getL2Cache().get(key);
360        }
361        return entry;
362    }
363
364    /**
365     * Stores the given {@code entry} with the given {@code key}.
366     * <p>
367     * If an entry exists with the given {@code key} it is overwritten.
368     */
369    protected void putStorageEntry(String key, StorageEntry entry) {
370        getL1Cache().put(key, entry);
371    }
372
373}