/*
 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Thierry Delprat <tdelprat@nuxeo.com>
 *     Antoine Taillefer <ataillefer@nuxeo.com>
 */
020
021package org.nuxeo.ecm.core.transientstore;
022
023import java.io.Serializable;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Map;
027import java.util.concurrent.atomic.AtomicLong;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.nuxeo.ecm.core.api.Blob;
032import org.nuxeo.ecm.core.cache.Cache;
033import org.nuxeo.ecm.core.cache.CacheDescriptor;
034import org.nuxeo.ecm.core.cache.CacheService;
035import org.nuxeo.ecm.core.cache.CacheServiceImpl;
036import org.nuxeo.ecm.core.cache.InMemoryCacheImpl;
037import org.nuxeo.ecm.core.transientstore.api.TransientStore;
038import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig;
039import org.nuxeo.runtime.api.Framework;
040
/**
 * Default implementation (i.e., not cluster aware) of the {@link TransientStore}. Uses {@link StorageEntry} as a
 * representation of an entry in the store.
 *
 * @since 7.2
 */
047public class SimpleTransientStore extends AbstractTransientStore {
048
049    protected Log log = LogFactory.getLog(SimpleTransientStore.class);
050
051    protected Cache l1Cache;
052
053    protected Cache l2Cache;
054
055    protected CacheDescriptor l1cd;
056
057    protected CacheDescriptor l2cd;
058
059    protected AtomicLong storageSize = new AtomicLong(0);
060
061    public SimpleTransientStore() {
062    }
063
064    @Override
065    public void init(TransientStoreConfig config) {
066        log.debug("Initializing SimpleTransientStore: " + config.getName());
067        super.init(config);
068        CacheService cs = Framework.getService(CacheService.class);
069        if (cs == null) {
070            throw new UnsupportedOperationException("Cache service is required");
071        }
072        // register the caches
073        l1cd = getL1CacheConfig();
074        l2cd = getL2CacheConfig();
075        ((CacheServiceImpl) cs).registerCache(l1cd);
076        ((CacheServiceImpl) cs).registerCache(l2cd);
077        l1cd.start();
078        l2cd.start();
079
080        // get caches
081        l1Cache = cs.getCache(l1cd.name);
082        l2Cache = cs.getCache(l2cd.name);
083    }
084
085    @Override
086    public void shutdown() {
087        log.debug("Shutting down SimpleTransientStore: " + config.getName());
088        CacheService cs = Framework.getService(CacheService.class);
089        if (cs != null) {
090            if (l1cd != null) {
091                ((CacheServiceImpl) cs).unregisterCache(l1cd);
092            }
093            if (l2cd != null) {
094                ((CacheServiceImpl) cs).unregisterCache(l2cd);
095            }
096        }
097    }
098
099    @Override
100    public boolean exists(String key) {
101        return getL1Cache().hasEntry(key) || getL2Cache().hasEntry(key);
102    }
103
104    @Override
105    public void putParameter(String key, String parameter, Serializable value) {
106        synchronized (this) {
107            StorageEntry entry = getStorageEntry(key);
108            if (entry == null) {
109                entry = new StorageEntry();
110            }
111            entry.putParam(parameter, value);
112            if (log.isDebugEnabled()) {
113                log.debug(String.format("Setting parameter %s to value %s in StorageEntry stored at key %s", parameter,
114                        value, key));
115            }
116            putStorageEntry(key, entry);
117        }
118    }
119
120    @Override
121    public Serializable getParameter(String key, String parameter) {
122        StorageEntry entry = getStorageEntry(key);
123        if (entry == null) {
124            return null;
125        }
126        Serializable res = entry.getParam(parameter);
127        if (log.isDebugEnabled()) {
128            log.debug(String.format("Fetched parameter %s from StorageEntry stored at key %s: %s", parameter, key, res));
129        }
130        return res;
131    }
132
133    @Override
134    public void putParameters(String key, Map<String, Serializable> parameters) {
135        synchronized (this) {
136            StorageEntry entry = getStorageEntry(key);
137            if (entry == null) {
138                entry = new StorageEntry();
139            }
140            entry.putParams(parameters);
141            if (log.isDebugEnabled()) {
142                log.debug(String.format("Setting parameters %s in StorageEntry stored at key %s", parameters, key));
143            }
144            putStorageEntry(key, entry);
145        }
146    }
147
148    @Override
149    public Map<String, Serializable> getParameters(String key) {
150        StorageEntry entry = getStorageEntry(key);
151        if (entry == null) {
152            return null;
153        }
154        Map<String, Serializable> res = entry.getParams();
155        if (log.isDebugEnabled()) {
156            log.debug(String.format("Fetched parameters from StorageEntry stored at key %s: %s", key, res));
157        }
158        return res;
159    }
160
161    @Override
162    public List<Blob> getBlobs(String key) {
163        StorageEntry entry = getStorageEntry(key);
164        if (entry == null) {
165            return null;
166        }
167        // Get blob information from the store
168        List<Map<String, String>> blobInfos = entry.getBlobInfos();
169        if (blobInfos == null) {
170            return new ArrayList<>();
171        }
172        // Load blobs from the file system
173        return loadBlobs(blobInfos);
174    }
175
176    @Override
177    public long getSize(String key) {
178        StorageEntry entry = getStorageEntry(key);
179        if (entry == null) {
180            return -1;
181        }
182        long size = entry.getSize();
183        if (log.isDebugEnabled()) {
184            log.debug(String.format("Fetched field \"size\" from StorageEntry stored at key %s: %d", key, size));
185        }
186        return size;
187    }
188
189    @Override
190    public boolean isCompleted(String key) {
191        StorageEntry entry = getStorageEntry(key);
192        boolean completed = entry != null && entry.isCompleted();
193        if (log.isDebugEnabled()) {
194            log.debug(String.format("Fetched field \"completed\" from StorageEntry stored at key %s: %s", key,
195                    completed));
196        }
197        return completed;
198    }
199
200    @Override
201    public void setCompleted(String key, boolean completed) {
202        synchronized (this) {
203            StorageEntry entry = getStorageEntry(key);
204            if (entry == null) {
205                entry = new StorageEntry();
206            }
207            entry.setCompleted(completed);
208            if (log.isDebugEnabled()) {
209                log.debug(String.format("Setting field \"completed\" to value %s in StorageEntry stored at key %s",
210                        completed, key));
211            }
212            putStorageEntry(key, entry);
213        }
214    }
215
216    @Override
217    public void remove(String key) {
218        synchronized (this) {
219            StorageEntry entry = (StorageEntry) getL1Cache().get(key);
220            if (entry == null) {
221                entry = (StorageEntry) getL2Cache().get(key);
222                if (log.isDebugEnabled()) {
223                    log.debug(String.format("Invalidating StorageEntry stored at key %s form L2 cache", key));
224                }
225                getL2Cache().invalidate(key);
226            } else {
227                if (log.isDebugEnabled()) {
228                    log.debug(String.format("Invalidating StorageEntry stored at key %s form L1 cache", key));
229                }
230                getL1Cache().invalidate(key);
231            }
232            if (entry != null) {
233                long entrySize = entry.getSize();
234                if (entrySize > 0) {
235                    decrementStorageSize(entrySize);
236                }
237            }
238        }
239    }
240
241    @Override
242    public void release(String key) {
243        StorageEntry entry = (StorageEntry) getL1Cache().get(key);
244        if (entry != null) {
245            if (log.isDebugEnabled()) {
246                log.debug(String.format("Invalidating StorageEntry stored at key %s form L1 cache", key));
247            }
248            getL1Cache().invalidate(key);
249            if (getStorageSize() <= config.getTargetMaxSizeMB() * (1024 * 1024) || config.getTargetMaxSizeMB() < 0) {
250                if (log.isDebugEnabled()) {
251                    log.debug(String.format("Putting StorageEntry at key %s in L2 cache", key));
252                }
253                getL2Cache().put(key, entry);
254            }
255        }
256    }
257
258    @Override
259    protected void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos) {
260        synchronized (this) {
261            StorageEntry entry = getStorageEntry(key);
262            // Update storage size
263            if (entry == null) {
264                if (sizeOfBlobs > 0) {
265                    incrementStorageSize(sizeOfBlobs);
266                }
267                entry = new StorageEntry();
268            } else {
269                incrementStorageSize(sizeOfBlobs - entry.getSize());
270            }
271            // Update entry size
272            entry.setSize(sizeOfBlobs);
273            // Set blob information
274            entry.setBlobInfos(blobInfos);
275            if (log.isDebugEnabled()) {
276                log.debug(String.format("Setting blobs %s in StorageEntry stored at key %s", blobInfos, key));
277            }
278            putStorageEntry(key, entry);
279        }
280    }
281
282    @Override
283    public long getStorageSize() {
284        int intStorageSize = (int) storageSize.get();
285        if (log.isDebugEnabled()) {
286            log.debug(String.format("Fetched storage size of store %s: %d", config.getName(), intStorageSize));
287        }
288        return intStorageSize;
289    }
290
291    @Override
292    protected void setStorageSize(long newSize) {
293        if (log.isDebugEnabled()) {
294            log.debug(String.format("Setting storage size of store %s to %d", config.getName(), newSize));
295        }
296        storageSize.set(newSize);
297    }
298
299    @Override
300    protected long incrementStorageSize(long size) {
301        long incremented = storageSize.addAndGet(size);
302        if (log.isDebugEnabled()) {
303            log.debug(String.format("Incremented storage size of store %s to %s", config.getName(), incremented));
304        }
305        return incremented;
306    }
307
308    @Override
309    protected long decrementStorageSize(long size) {
310        long decremented = storageSize.addAndGet(-size);
311        if (log.isDebugEnabled()) {
312            log.debug(String.format("Decremented storage size of store %s to %s", config.getName(), decremented));
313        }
314        return decremented;
315    }
316
317    @Override
318    protected void removeAllEntries() {
319        log.debug("Invalidating all entries from L1 and L2 caches");
320        getL1Cache().invalidateAll();
321        getL2Cache().invalidateAll();
322    }
323
324    public Cache getL1Cache() {
325        return l1Cache;
326    }
327
328    public Cache getL2Cache() {
329        return l2Cache;
330    }
331
332    protected CacheDescriptor getL1CacheConfig() {
333        return new TransientCacheConfig(config.getName() + "L1", config.getFirstLevelTTL());
334    }
335
336    protected CacheDescriptor getL2CacheConfig() {
337        return new TransientCacheConfig(config.getName() + "L2", config.getSecondLevelTTL());
338    }
339
340    protected class TransientCacheConfig extends CacheDescriptor {
341
342        TransientCacheConfig(String name, int ttl) {
343            super();
344            super.name = name;
345            super.implClass = getCacheImplClass();
346            super.ttl = ttl;
347        }
348    }
349
350    protected Class<? extends Cache> getCacheImplClass() {
351        return InMemoryCacheImpl.class;
352    }
353
354    /**
355     * Returns the {@link StorageEntry} representing the entry with the given {@code key} or {@code null} if it doesn't
356     * exist.
357     */
358    protected StorageEntry getStorageEntry(String key) {
359        StorageEntry entry = (StorageEntry) getL1Cache().get(key);
360        if (entry == null) {
361            entry = (StorageEntry) getL2Cache().get(key);
362        }
363        return entry;
364    }
365
366    /**
367     * Stores the given {@code entry} with the given {@code key}.
368     * <p>
369     * If an entry exists with the given {@code key} it is overwritten.
370     */
371    protected void putStorageEntry(String key, StorageEntry entry) {
372        getL1Cache().put(key, entry);
373    }
374
375}