001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat <tdelprat@nuxeo.com>
018 *     Antoine Taillefer <ataillefer@nuxeo.com>
019 */
020
021package org.nuxeo.ecm.core.transientstore;
022
023import java.io.Serializable;
024import java.util.ArrayList;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicLong;
032import java.util.stream.Stream;
033
034import org.apache.commons.logging.Log;
035import org.apache.commons.logging.LogFactory;
036import org.nuxeo.ecm.core.api.Blob;
037import org.nuxeo.ecm.core.transientstore.api.TransientStore;
038import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig;
039
040import com.google.common.cache.Cache;
041import com.google.common.cache.CacheBuilder;
042
043/**
044 * Default implementation (i.e., not cluster aware) of the {@link TransientStore}. Uses {@link StorageEntry} as a
045 * representation of an entry in the store.
046 *
047 * @since 7.2
048 */
049public class SimpleTransientStore extends AbstractTransientStore {
050
051    protected Log log = LogFactory.getLog(SimpleTransientStore.class);
052
053    protected Cache<String, Serializable> l1Cache;
054
055    protected Cache<String, Serializable> l2Cache;
056
057    protected AtomicLong storageSize = new AtomicLong(0);
058
059    public SimpleTransientStore() {
060    }
061
062    @Override
063    public void init(TransientStoreConfig config) {
064        log.debug("Initializing SimpleTransientStore: " + config.getName());
065        super.init(config);
066        l1Cache = CacheBuilder.newBuilder().expireAfterWrite(config.getFirstLevelTTL(), TimeUnit.MINUTES).build();
067        l2Cache = CacheBuilder.newBuilder().expireAfterWrite(config.getSecondLevelTTL(), TimeUnit.MINUTES).build();
068    }
069
070    @Override
071    public void shutdown() {
072        log.debug("Shutting down SimpleTransientStore: " + config.getName());
073    }
074
075    @Override
076    public boolean exists(String key) {
077        return getL1Cache().getIfPresent(key) != null || getL2Cache().getIfPresent(key) != null;
078    }
079
080    @Override
081    public Set<String> keySet() {
082        Set<String> keys = new HashSet<>();
083        keys.addAll(getL1Cache().asMap().keySet());
084        keys.addAll(getL2Cache().asMap().keySet());
085        return keys;
086    }
087
088    @Override
089    public Stream<String> keyStream() {
090        return keySet().stream();
091    }
092
093    @Override
094    public void putParameter(String key, String parameter, Serializable value) {
095        synchronized (this) {
096            StorageEntry entry = getStorageEntry(key);
097            if (entry == null) {
098                entry = new StorageEntry();
099            }
100            entry.putParam(parameter, value);
101            if (log.isDebugEnabled()) {
102                log.debug(String.format("Setting parameter %s to value %s in StorageEntry stored at key %s", parameter,
103                        value, key));
104            }
105            putStorageEntry(key, entry);
106        }
107    }
108
109    @Override
110    public Serializable getParameter(String key, String parameter) {
111        StorageEntry entry = getStorageEntry(key);
112        if (entry == null) {
113            return null;
114        }
115        Serializable res = entry.getParam(parameter);
116        if (log.isDebugEnabled()) {
117            log.debug(
118                    String.format("Fetched parameter %s from StorageEntry stored at key %s: %s", parameter, key, res));
119        }
120        return res;
121    }
122
123    @Override
124    public void putParameters(String key, Map<String, Serializable> parameters) {
125        synchronized (this) {
126            StorageEntry entry = getStorageEntry(key);
127            if (entry == null) {
128                entry = new StorageEntry();
129            }
130            entry.putParams(parameters);
131            if (log.isDebugEnabled()) {
132                log.debug(String.format("Setting parameters %s in StorageEntry stored at key %s", parameters, key));
133            }
134            putStorageEntry(key, entry);
135        }
136    }
137
138    @Override
139    public Map<String, Serializable> getParameters(String key) {
140        StorageEntry entry = getStorageEntry(key);
141        if (entry == null) {
142            return null;
143        }
144        Map<String, Serializable> res = new HashMap<>(entry.getParams());
145        if (log.isDebugEnabled()) {
146            log.debug(String.format("Fetched parameters from StorageEntry stored at key %s: %s", key, res));
147        }
148        return res;
149    }
150
151    @Override
152    public List<Blob> getBlobs(String key) {
153        StorageEntry entry = getStorageEntry(key);
154        if (entry == null) {
155            return null;
156        }
157        // Get blob information from the store
158        List<Map<String, String>> blobInfos = entry.getBlobInfos();
159        if (blobInfos == null) {
160            return new ArrayList<>();
161        }
162        // Load blobs from the file system
163        return loadBlobs(blobInfos);
164    }
165
166    @Override
167    public long getSize(String key) {
168        StorageEntry entry = getStorageEntry(key);
169        if (entry == null) {
170            return -1;
171        }
172        long size = entry.getSize();
173        if (log.isDebugEnabled()) {
174            log.debug(String.format("Fetched field \"size\" from StorageEntry stored at key %s: %d", key, size));
175        }
176        return size;
177    }
178
179    @Override
180    public boolean isCompleted(String key) {
181        StorageEntry entry = getStorageEntry(key);
182        boolean completed = entry != null && entry.isCompleted();
183        if (log.isDebugEnabled()) {
184            log.debug(String.format("Fetched field \"completed\" from StorageEntry stored at key %s: %s", key,
185                    completed));
186        }
187        return completed;
188    }
189
190    @Override
191    public void setCompleted(String key, boolean completed) {
192        synchronized (this) {
193            StorageEntry entry = getStorageEntry(key);
194            if (entry == null) {
195                entry = new StorageEntry();
196            }
197            entry.setCompleted(completed);
198            if (log.isDebugEnabled()) {
199                log.debug(String.format("Setting field \"completed\" to value %s in StorageEntry stored at key %s",
200                        completed, key));
201            }
202            putStorageEntry(key, entry);
203        }
204    }
205
206    @Override
207    public void release(String key) {
208        StorageEntry entry = (StorageEntry) getL1Cache().getIfPresent(key);
209        if (entry != null) {
210            if (log.isDebugEnabled()) {
211                log.debug(String.format("Invalidating StorageEntry stored at key %s form L1 cache", key));
212            }
213            getL1Cache().invalidate(key);
214            if (getStorageSize() <= config.getTargetMaxSizeMB() * (1024 * 1024) || config.getTargetMaxSizeMB() < 0) {
215                if (log.isDebugEnabled()) {
216                    log.debug(String.format("Putting StorageEntry at key %s in L2 cache", key));
217                }
218                getL2Cache().put(key, entry);
219            }
220        }
221    }
222
223    @Override
224    protected void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos) {
225        synchronized (this) {
226            StorageEntry entry = getStorageEntry(key);
227            // Update storage size
228            if (entry == null) {
229                if (sizeOfBlobs > 0) {
230                    incrementStorageSize(sizeOfBlobs);
231                }
232                entry = new StorageEntry();
233            } else {
234                incrementStorageSize(sizeOfBlobs - entry.getSize());
235            }
236            // Update entry size
237            entry.setSize(sizeOfBlobs);
238            // Set blob information
239            entry.setBlobInfos(blobInfos);
240            if (log.isDebugEnabled()) {
241                log.debug(String.format("Setting blobs %s in StorageEntry stored at key %s", blobInfos, key));
242            }
243            putStorageEntry(key, entry);
244        }
245    }
246
247    @Override
248    public long getStorageSize() {
249        long size = storageSize.get();
250        if (log.isDebugEnabled()) {
251            log.debug(String.format("Fetched storage size of store %s: %d", config.getName(), size));
252        }
253        return size;
254    }
255
256    @Override
257    protected void setStorageSize(long newSize) {
258        if (log.isDebugEnabled()) {
259            log.debug(String.format("Setting storage size of store %s to %d", config.getName(), newSize));
260        }
261        storageSize.set(newSize);
262    }
263
264    @Override
265    protected long incrementStorageSize(long size) {
266        long incremented = storageSize.addAndGet(size);
267        if (log.isDebugEnabled()) {
268            log.debug(String.format("Incremented storage size of store %s to %s", config.getName(), incremented));
269        }
270        return incremented;
271    }
272
273    @Override
274    protected long decrementStorageSize(long size) {
275        long decremented = storageSize.addAndGet(-size);
276        if (log.isDebugEnabled()) {
277            log.debug(String.format("Decremented storage size of store %s to %s", config.getName(), decremented));
278        }
279        return decremented;
280    }
281
282    @Override
283    protected void removeEntry(String key) {
284        synchronized (this) {
285            StorageEntry entry = (StorageEntry) getL1Cache().getIfPresent(key);
286            if (entry == null) {
287                entry = (StorageEntry) getL2Cache().getIfPresent(key);
288                if (log.isDebugEnabled()) {
289                    log.debug(String.format("Invalidating StorageEntry stored at key %s form L2 cache", key));
290                }
291                getL2Cache().invalidate(key);
292            } else {
293                if (log.isDebugEnabled()) {
294                    log.debug(String.format("Invalidating StorageEntry stored at key %s form L1 cache", key));
295                }
296                getL1Cache().invalidate(key);
297            }
298            if (entry != null) {
299                long entrySize = entry.getSize();
300                if (entrySize > 0) {
301                    decrementStorageSize(entrySize);
302                }
303            }
304        }
305    }
306
307    @Override
308    protected void removeAllEntries() {
309        log.debug("Invalidating all entries from L1 and L2 caches");
310        getL1Cache().invalidateAll();
311        getL2Cache().invalidateAll();
312    }
313
314    public Cache<String, Serializable> getL1Cache() {
315        return l1Cache;
316    }
317
318    public Cache<String, Serializable> getL2Cache() {
319        return l2Cache;
320    }
321
322    /**
323     * Returns the {@link StorageEntry} representing the entry with the given {@code key} or {@code null} if it doesn't
324     * exist.
325     */
326    protected StorageEntry getStorageEntry(String key) {
327        StorageEntry entry = (StorageEntry) getL1Cache().getIfPresent(key);
328        if (entry == null) {
329            entry = (StorageEntry) getL2Cache().getIfPresent(key);
330        }
331        return entry;
332    }
333
334    /**
335     * Stores the given {@code entry} with the given {@code key}.
336     * <p>
337     * If an entry exists with the given {@code key} it is overwritten.
338     */
339    protected void putStorageEntry(String key, StorageEntry entry) {
340        getL1Cache().put(key, entry);
341    }
342
343}