001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat <tdelprat@nuxeo.com>
018 *     Antoine Taillefer <ataillefer@nuxeo.com>
019 */
020
021package org.nuxeo.ecm.core.transientstore;
022
023import java.io.Serializable;
024import java.util.ArrayList;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicLong;
031import java.util.stream.Stream;
032
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.nuxeo.ecm.core.api.Blob;
036import org.nuxeo.ecm.core.transientstore.api.TransientStore;
037import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig;
038
039import com.google.common.cache.Cache;
040import com.google.common.cache.CacheBuilder;
041
042/**
043 * Default implementation (i.e., not cluster aware) of the {@link TransientStore}. Uses {@link StorageEntry} as a
044 * representation of an entry in the store.
045 *
046 * @since 7.2
047 */
048public class SimpleTransientStore extends AbstractTransientStore {
049
050    protected Log log = LogFactory.getLog(SimpleTransientStore.class);
051
052    protected Cache<String, Serializable> l1Cache;
053
054    protected Cache<String, Serializable> l2Cache;
055
056    protected AtomicLong storageSize = new AtomicLong(0);
057
058    public SimpleTransientStore() {
059    }
060
061    @Override
062    public void init(TransientStoreConfig config) {
063        log.debug("Initializing SimpleTransientStore: " + config.getName());
064        super.init(config);
065        l1Cache = CacheBuilder.newBuilder().expireAfterWrite(config.getFirstLevelTTL(), TimeUnit.MINUTES).build();
066        l2Cache = CacheBuilder.newBuilder().expireAfterWrite(config.getSecondLevelTTL(), TimeUnit.MINUTES).build();
067    }
068
069    @Override
070    public void shutdown() {
071        log.debug("Shutting down SimpleTransientStore: " + config.getName());
072    }
073
074    @Override
075    public boolean exists(String key) {
076        return getL1Cache().getIfPresent(key) != null || getL2Cache().getIfPresent(key) != null;
077    }
078
079    @Override
080    public Set<String> keySet() {
081        Set<String> keys = new HashSet<>();
082        keys.addAll(getL1Cache().asMap().keySet());
083        keys.addAll(getL2Cache().asMap().keySet());
084        return keys;
085    }
086
087    @Override
088    public Stream<String> keyStream() {
089        return keySet().stream();
090    }
091
092    @Override
093    public void putParameter(String key, String parameter, Serializable value) {
094        synchronized (this) {
095            StorageEntry entry = getStorageEntry(key);
096            if (entry == null) {
097                entry = new StorageEntry();
098            }
099            entry.putParam(parameter, value);
100            if (log.isDebugEnabled()) {
101                log.debug(String.format("Setting parameter %s to value %s in StorageEntry stored at key %s", parameter,
102                        value, key));
103            }
104            putStorageEntry(key, entry);
105        }
106    }
107
108    @Override
109    public Serializable getParameter(String key, String parameter) {
110        StorageEntry entry = getStorageEntry(key);
111        if (entry == null) {
112            return null;
113        }
114        Serializable res = entry.getParam(parameter);
115        if (log.isDebugEnabled()) {
116            log.debug(
117                    String.format("Fetched parameter %s from StorageEntry stored at key %s: %s", parameter, key, res));
118        }
119        return res;
120    }
121
122    @Override
123    public void putParameters(String key, Map<String, Serializable> parameters) {
124        synchronized (this) {
125            StorageEntry entry = getStorageEntry(key);
126            if (entry == null) {
127                entry = new StorageEntry();
128            }
129            entry.putParams(parameters);
130            if (log.isDebugEnabled()) {
131                log.debug(String.format("Setting parameters %s in StorageEntry stored at key %s", parameters, key));
132            }
133            putStorageEntry(key, entry);
134        }
135    }
136
137    @Override
138    public Map<String, Serializable> getParameters(String key) {
139        StorageEntry entry = getStorageEntry(key);
140        if (entry == null) {
141            return null;
142        }
143        Map<String, Serializable> res = entry.getParams();
144        if (log.isDebugEnabled()) {
145            log.debug(String.format("Fetched parameters from StorageEntry stored at key %s: %s", key, res));
146        }
147        return res;
148    }
149
150    @Override
151    public List<Blob> getBlobs(String key) {
152        StorageEntry entry = getStorageEntry(key);
153        if (entry == null) {
154            return null;
155        }
156        // Get blob information from the store
157        List<Map<String, String>> blobInfos = entry.getBlobInfos();
158        if (blobInfos == null) {
159            return new ArrayList<>();
160        }
161        // Load blobs from the file system
162        return loadBlobs(blobInfos);
163    }
164
165    @Override
166    public long getSize(String key) {
167        StorageEntry entry = getStorageEntry(key);
168        if (entry == null) {
169            return -1;
170        }
171        long size = entry.getSize();
172        if (log.isDebugEnabled()) {
173            log.debug(String.format("Fetched field \"size\" from StorageEntry stored at key %s: %d", key, size));
174        }
175        return size;
176    }
177
178    @Override
179    public boolean isCompleted(String key) {
180        StorageEntry entry = getStorageEntry(key);
181        boolean completed = entry != null && entry.isCompleted();
182        if (log.isDebugEnabled()) {
183            log.debug(String.format("Fetched field \"completed\" from StorageEntry stored at key %s: %s", key,
184                    completed));
185        }
186        return completed;
187    }
188
189    @Override
190    public void setCompleted(String key, boolean completed) {
191        synchronized (this) {
192            StorageEntry entry = getStorageEntry(key);
193            if (entry == null) {
194                entry = new StorageEntry();
195            }
196            entry.setCompleted(completed);
197            if (log.isDebugEnabled()) {
198                log.debug(String.format("Setting field \"completed\" to value %s in StorageEntry stored at key %s",
199                        completed, key));
200            }
201            putStorageEntry(key, entry);
202        }
203    }
204
205    @Override
206    public void release(String key) {
207        StorageEntry entry = (StorageEntry) getL1Cache().getIfPresent(key);
208        if (entry != null) {
209            if (log.isDebugEnabled()) {
210                log.debug(String.format("Invalidating StorageEntry stored at key %s form L1 cache", key));
211            }
212            getL1Cache().invalidate(key);
213            if (getStorageSize() <= config.getTargetMaxSizeMB() * (1024 * 1024) || config.getTargetMaxSizeMB() < 0) {
214                if (log.isDebugEnabled()) {
215                    log.debug(String.format("Putting StorageEntry at key %s in L2 cache", key));
216                }
217                getL2Cache().put(key, entry);
218            }
219        }
220    }
221
222    @Override
223    protected void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos) {
224        synchronized (this) {
225            StorageEntry entry = getStorageEntry(key);
226            // Update storage size
227            if (entry == null) {
228                if (sizeOfBlobs > 0) {
229                    incrementStorageSize(sizeOfBlobs);
230                }
231                entry = new StorageEntry();
232            } else {
233                incrementStorageSize(sizeOfBlobs - entry.getSize());
234            }
235            // Update entry size
236            entry.setSize(sizeOfBlobs);
237            // Set blob information
238            entry.setBlobInfos(blobInfos);
239            if (log.isDebugEnabled()) {
240                log.debug(String.format("Setting blobs %s in StorageEntry stored at key %s", blobInfos, key));
241            }
242            putStorageEntry(key, entry);
243        }
244    }
245
246    @Override
247    public long getStorageSize() {
248        long size = storageSize.get();
249        if (log.isDebugEnabled()) {
250            log.debug(String.format("Fetched storage size of store %s: %d", config.getName(), size));
251        }
252        return size;
253    }
254
255    @Override
256    protected void setStorageSize(long newSize) {
257        if (log.isDebugEnabled()) {
258            log.debug(String.format("Setting storage size of store %s to %d", config.getName(), newSize));
259        }
260        storageSize.set(newSize);
261    }
262
263    @Override
264    protected long incrementStorageSize(long size) {
265        long incremented = storageSize.addAndGet(size);
266        if (log.isDebugEnabled()) {
267            log.debug(String.format("Incremented storage size of store %s to %s", config.getName(), incremented));
268        }
269        return incremented;
270    }
271
272    @Override
273    protected long decrementStorageSize(long size) {
274        long decremented = storageSize.addAndGet(-size);
275        if (log.isDebugEnabled()) {
276            log.debug(String.format("Decremented storage size of store %s to %s", config.getName(), decremented));
277        }
278        return decremented;
279    }
280
281    @Override
282    protected void removeEntry(String key) {
283        synchronized (this) {
284            StorageEntry entry = (StorageEntry) getL1Cache().getIfPresent(key);
285            if (entry == null) {
286                entry = (StorageEntry) getL2Cache().getIfPresent(key);
287                if (log.isDebugEnabled()) {
288                    log.debug(String.format("Invalidating StorageEntry stored at key %s form L2 cache", key));
289                }
290                getL2Cache().invalidate(key);
291            } else {
292                if (log.isDebugEnabled()) {
293                    log.debug(String.format("Invalidating StorageEntry stored at key %s form L1 cache", key));
294                }
295                getL1Cache().invalidate(key);
296            }
297            if (entry != null) {
298                long entrySize = entry.getSize();
299                if (entrySize > 0) {
300                    decrementStorageSize(entrySize);
301                }
302            }
303        }
304    }
305
306    @Override
307    protected void removeAllEntries() {
308        log.debug("Invalidating all entries from L1 and L2 caches");
309        getL1Cache().invalidateAll();
310        getL2Cache().invalidateAll();
311    }
312
313    public Cache<String, Serializable> getL1Cache() {
314        return l1Cache;
315    }
316
317    public Cache<String, Serializable> getL2Cache() {
318        return l2Cache;
319    }
320
321    /**
322     * Returns the {@link StorageEntry} representing the entry with the given {@code key} or {@code null} if it doesn't
323     * exist.
324     */
325    protected StorageEntry getStorageEntry(String key) {
326        StorageEntry entry = (StorageEntry) getL1Cache().getIfPresent(key);
327        if (entry == null) {
328            entry = (StorageEntry) getL2Cache().getIfPresent(key);
329        }
330        return entry;
331    }
332
333    /**
334     * Stores the given {@code entry} with the given {@code key}.
335     * <p>
336     * If an entry exists with the given {@code key} it is overwritten.
337     */
338    protected void putStorageEntry(String key, StorageEntry entry) {
339        getL1Cache().put(key, entry);
340    }
341
342}