001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat <tdelprat@nuxeo.com>
018 *     Antoine Taillefer <ataillefer@nuxeo.com>
019 */
020
021package org.nuxeo.ecm.core.transientstore;
022
023import java.io.Serializable;
024import java.util.ArrayList;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.atomic.AtomicLong;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.nuxeo.ecm.core.api.Blob;
034import org.nuxeo.ecm.core.cache.Cache;
035import org.nuxeo.ecm.core.cache.CacheDescriptor;
036import org.nuxeo.ecm.core.cache.CacheService;
037import org.nuxeo.ecm.core.cache.CacheServiceImpl;
038import org.nuxeo.ecm.core.cache.InMemoryCacheImpl;
039import org.nuxeo.ecm.core.transientstore.api.TransientStore;
040import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig;
041import org.nuxeo.runtime.api.Framework;
042
043/**
044 * Default implementation (i.e., not cluster aware) of the {@link TransientStore}. Uses {@link StorageEntry} as a
045 * representation of an entry in the store.
046 *
047 * @since 7.2
048 */
049public class SimpleTransientStore extends AbstractTransientStore {
050
051    protected Log log = LogFactory.getLog(SimpleTransientStore.class);
052
053    protected Cache l1Cache;
054
055    protected Cache l2Cache;
056
057    protected CacheDescriptor l1cd;
058
059    protected CacheDescriptor l2cd;
060
061    protected AtomicLong storageSize = new AtomicLong(0);
062
063    public SimpleTransientStore() {
064    }
065
066    @Override
067    public void init(TransientStoreConfig config) {
068        log.debug("Initializing SimpleTransientStore: " + config.getName());
069        super.init(config);
070        CacheService cs = Framework.getService(CacheService.class);
071        if (cs == null) {
072            throw new UnsupportedOperationException("Cache service is required");
073        }
074        // register the caches
075        l1cd = getL1CacheConfig();
076        l2cd = getL2CacheConfig();
077        ((CacheServiceImpl) cs).registerCache(l1cd);
078        ((CacheServiceImpl) cs).registerCache(l2cd);
079        l1cd.start();
080        l2cd.start();
081
082        // get caches
083        l1Cache = cs.getCache(l1cd.name);
084        l2Cache = cs.getCache(l2cd.name);
085    }
086
087    @Override
088    public void shutdown() {
089        log.debug("Shutting down SimpleTransientStore: " + config.getName());
090        CacheService cs = Framework.getService(CacheService.class);
091        if (cs != null) {
092            if (l1cd != null) {
093                ((CacheServiceImpl) cs).unregisterCache(l1cd);
094            }
095            if (l2cd != null) {
096                ((CacheServiceImpl) cs).unregisterCache(l2cd);
097            }
098        }
099    }
100
101    @Override
102    public boolean exists(String key) {
103        return getL1Cache().hasEntry(key) || getL2Cache().hasEntry(key);
104    }
105
106    @Override
107    public Set<String> keySet() {
108        Set<String> keys = new HashSet<>();
109        keys.addAll(getL1Cache().keySet());
110        keys.addAll(getL2Cache().keySet());
111        return keys;
112    }
113
114    @Override
115    public void putParameter(String key, String parameter, Serializable value) {
116        synchronized (this) {
117            StorageEntry entry = getStorageEntry(key);
118            if (entry == null) {
119                entry = new StorageEntry();
120            }
121            entry.putParam(parameter, value);
122            if (log.isDebugEnabled()) {
123                log.debug(String.format("Setting parameter %s to value %s in StorageEntry stored at key %s", parameter,
124                        value, key));
125            }
126            putStorageEntry(key, entry);
127        }
128    }
129
130    @Override
131    public Serializable getParameter(String key, String parameter) {
132        StorageEntry entry = getStorageEntry(key);
133        if (entry == null) {
134            return null;
135        }
136        Serializable res = entry.getParam(parameter);
137        if (log.isDebugEnabled()) {
138            log.debug(String.format("Fetched parameter %s from StorageEntry stored at key %s: %s", parameter, key, res));
139        }
140        return res;
141    }
142
143    @Override
144    public void putParameters(String key, Map<String, Serializable> parameters) {
145        synchronized (this) {
146            StorageEntry entry = getStorageEntry(key);
147            if (entry == null) {
148                entry = new StorageEntry();
149            }
150            entry.putParams(parameters);
151            if (log.isDebugEnabled()) {
152                log.debug(String.format("Setting parameters %s in StorageEntry stored at key %s", parameters, key));
153            }
154            putStorageEntry(key, entry);
155        }
156    }
157
158    @Override
159    public Map<String, Serializable> getParameters(String key) {
160        StorageEntry entry = getStorageEntry(key);
161        if (entry == null) {
162            return null;
163        }
164        Map<String, Serializable> res = entry.getParams();
165        if (log.isDebugEnabled()) {
166            log.debug(String.format("Fetched parameters from StorageEntry stored at key %s: %s", key, res));
167        }
168        return res;
169    }
170
171    @Override
172    public List<Blob> getBlobs(String key) {
173        StorageEntry entry = getStorageEntry(key);
174        if (entry == null) {
175            return null;
176        }
177        // Get blob information from the store
178        List<Map<String, String>> blobInfos = entry.getBlobInfos();
179        if (blobInfos == null) {
180            return new ArrayList<>();
181        }
182        // Load blobs from the file system
183        return loadBlobs(blobInfos);
184    }
185
186    @Override
187    public long getSize(String key) {
188        StorageEntry entry = getStorageEntry(key);
189        if (entry == null) {
190            return -1;
191        }
192        long size = entry.getSize();
193        if (log.isDebugEnabled()) {
194            log.debug(String.format("Fetched field \"size\" from StorageEntry stored at key %s: %d", key, size));
195        }
196        return size;
197    }
198
199    @Override
200    public boolean isCompleted(String key) {
201        StorageEntry entry = getStorageEntry(key);
202        boolean completed = entry != null && entry.isCompleted();
203        if (log.isDebugEnabled()) {
204            log.debug(String.format("Fetched field \"completed\" from StorageEntry stored at key %s: %s", key,
205                    completed));
206        }
207        return completed;
208    }
209
210    @Override
211    public void setCompleted(String key, boolean completed) {
212        synchronized (this) {
213            StorageEntry entry = getStorageEntry(key);
214            if (entry == null) {
215                entry = new StorageEntry();
216            }
217            entry.setCompleted(completed);
218            if (log.isDebugEnabled()) {
219                log.debug(String.format("Setting field \"completed\" to value %s in StorageEntry stored at key %s",
220                        completed, key));
221            }
222            putStorageEntry(key, entry);
223        }
224    }
225
226    @Override
227    public void remove(String key) {
228        synchronized (this) {
229            StorageEntry entry = (StorageEntry) getL1Cache().get(key);
230            if (entry == null) {
231                entry = (StorageEntry) getL2Cache().get(key);
232                if (log.isDebugEnabled()) {
233                    log.debug(String.format("Invalidating StorageEntry stored at key %s form L2 cache", key));
234                }
235                getL2Cache().invalidate(key);
236            } else {
237                if (log.isDebugEnabled()) {
238                    log.debug(String.format("Invalidating StorageEntry stored at key %s form L1 cache", key));
239                }
240                getL1Cache().invalidate(key);
241            }
242            if (entry != null) {
243                long entrySize = entry.getSize();
244                if (entrySize > 0) {
245                    decrementStorageSize(entrySize);
246                }
247            }
248        }
249    }
250
251    @Override
252    public void release(String key) {
253        StorageEntry entry = (StorageEntry) getL1Cache().get(key);
254        if (entry != null) {
255            if (log.isDebugEnabled()) {
256                log.debug(String.format("Invalidating StorageEntry stored at key %s form L1 cache", key));
257            }
258            getL1Cache().invalidate(key);
259            if (getStorageSize() <= config.getTargetMaxSizeMB() * (1024 * 1024) || config.getTargetMaxSizeMB() < 0) {
260                if (log.isDebugEnabled()) {
261                    log.debug(String.format("Putting StorageEntry at key %s in L2 cache", key));
262                }
263                getL2Cache().put(key, entry);
264            }
265        }
266    }
267
268    @Override
269    protected void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos) {
270        synchronized (this) {
271            StorageEntry entry = getStorageEntry(key);
272            // Update storage size
273            if (entry == null) {
274                if (sizeOfBlobs > 0) {
275                    incrementStorageSize(sizeOfBlobs);
276                }
277                entry = new StorageEntry();
278            } else {
279                incrementStorageSize(sizeOfBlobs - entry.getSize());
280            }
281            // Update entry size
282            entry.setSize(sizeOfBlobs);
283            // Set blob information
284            entry.setBlobInfos(blobInfos);
285            if (log.isDebugEnabled()) {
286                log.debug(String.format("Setting blobs %s in StorageEntry stored at key %s", blobInfos, key));
287            }
288            putStorageEntry(key, entry);
289        }
290    }
291
292    @Override
293    public long getStorageSize() {
294        int intStorageSize = (int) storageSize.get();
295        if (log.isDebugEnabled()) {
296            log.debug(String.format("Fetched storage size of store %s: %d", config.getName(), intStorageSize));
297        }
298        return intStorageSize;
299    }
300
301    @Override
302    protected void setStorageSize(long newSize) {
303        if (log.isDebugEnabled()) {
304            log.debug(String.format("Setting storage size of store %s to %d", config.getName(), newSize));
305        }
306        storageSize.set(newSize);
307    }
308
309    @Override
310    protected long incrementStorageSize(long size) {
311        long incremented = storageSize.addAndGet(size);
312        if (log.isDebugEnabled()) {
313            log.debug(String.format("Incremented storage size of store %s to %s", config.getName(), incremented));
314        }
315        return incremented;
316    }
317
318    @Override
319    protected long decrementStorageSize(long size) {
320        long decremented = storageSize.addAndGet(-size);
321        if (log.isDebugEnabled()) {
322            log.debug(String.format("Decremented storage size of store %s to %s", config.getName(), decremented));
323        }
324        return decremented;
325    }
326
327    @Override
328    protected void removeAllEntries() {
329        log.debug("Invalidating all entries from L1 and L2 caches");
330        getL1Cache().invalidateAll();
331        getL2Cache().invalidateAll();
332    }
333
334    public Cache getL1Cache() {
335        return l1Cache;
336    }
337
338    public Cache getL2Cache() {
339        return l2Cache;
340    }
341
342    protected CacheDescriptor getL1CacheConfig() {
343        return new TransientCacheConfig(config.getName() + "L1", config.getFirstLevelTTL());
344    }
345
346    protected CacheDescriptor getL2CacheConfig() {
347        return new TransientCacheConfig(config.getName() + "L2", config.getSecondLevelTTL());
348    }
349
350    protected class TransientCacheConfig extends CacheDescriptor {
351
352        TransientCacheConfig(String name, int ttl) {
353            super();
354            super.name = name;
355            super.implClass = getCacheImplClass();
356            super.ttl = ttl;
357        }
358    }
359
360    protected Class<? extends Cache> getCacheImplClass() {
361        return InMemoryCacheImpl.class;
362    }
363
364    /**
365     * Returns the {@link StorageEntry} representing the entry with the given {@code key} or {@code null} if it doesn't
366     * exist.
367     */
368    protected StorageEntry getStorageEntry(String key) {
369        StorageEntry entry = (StorageEntry) getL1Cache().get(key);
370        if (entry == null) {
371            entry = (StorageEntry) getL2Cache().get(key);
372        }
373        return entry;
374    }
375
376    /**
377     * Stores the given {@code entry} with the given {@code key}.
378     * <p>
379     * If an entry exists with the given {@code key} it is overwritten.
380     */
381    protected void putStorageEntry(String key, StorageEntry entry) {
382        getL1Cache().put(key, entry);
383    }
384
385}