001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.transientstore.keyvalueblob;
020
021import static org.apache.commons.lang3.StringUtils.defaultIfBlank;
022
023import java.io.ByteArrayInputStream;
024import java.io.IOException;
025import java.io.ObjectInput;
026import java.io.ObjectInputStream;
027import java.io.Serializable;
028import java.util.ArrayList;
029import java.util.Collection;
030import java.util.Collections;
031import java.util.HashMap;
032import java.util.List;
033import java.util.Map;
034import java.util.function.Function;
035import java.util.stream.Stream;
036
037import org.apache.commons.lang3.SerializationUtils;
038import org.apache.commons.logging.Log;
039import org.apache.commons.logging.LogFactory;
040import org.nuxeo.ecm.core.api.Blob;
041import org.nuxeo.ecm.core.api.NuxeoException;
042import org.nuxeo.ecm.core.blob.BlobInfo;
043import org.nuxeo.ecm.core.blob.BlobManager;
044import org.nuxeo.ecm.core.blob.BlobManagerComponent;
045import org.nuxeo.ecm.core.blob.BlobProvider;
046import org.nuxeo.ecm.core.blob.ManagedBlob;
047import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
048import org.nuxeo.ecm.core.transientstore.api.MaximumTransientSpaceExceeded;
049import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig;
050import org.nuxeo.ecm.core.transientstore.api.TransientStoreProvider;
051import org.nuxeo.runtime.api.Framework;
052import org.nuxeo.runtime.kv.KeyValueService;
053import org.nuxeo.runtime.kv.KeyValueStore;
054import org.nuxeo.runtime.kv.KeyValueStoreProvider;
055
056import com.fasterxml.jackson.core.type.TypeReference;
057import com.fasterxml.jackson.databind.ObjectMapper;
058
059/**
060 * Transient Store storing properties in a Key/Value store, and storing blobs using a Blob Provider.
061 * <p>
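 * By default the key/value store used is the one with the same name as the transient store itself (prefixed with
 * the transient id prefix if the name does not already start with it); this can be overridden with the
 * {@code keyValueStore} configuration property.
 * <p>
 * By default the blob provider used is the one with that same name; this can be overridden with the
 * {@code blobProvider} configuration property.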
065 * <p>
066 * The storage format is the following:
067 *
068 * <pre>
069 *   __blobsize__:       storage size; because entries may expire without us being notified due to their TTL,
070 *                       this may be higher than the actual storage size
071 *
072 *   entryKey.completed: "true" if completed, "false" if not; presence of this key marks entry existence
073 *
074 *   entryKey.paraminfo: ["foo", "bar"]
075 *
076 *   entryKey.param.foo: value for param foo
077 *   entryKey.param.foo__format: "java" for java serializable format, otherwise string
078 *
079 *   entryKey.param.bar: value for param bar
080 *   etc.
081 *
082 *   entryKey.blobinfo:  {"count": number of blobs,
083 *                        "size": storage size of the blobs}
 *   entryKey.blob.0:    {"key": key in blob provider for first blob,
 *                        "mimetype": MIME Type,
 *                        "encoding": encoding,
 *                        "filename": filename,
 *                        "length": length,
 *                        "digest": digest}
089 *   entryKey.blob.1:    {...} same for second blob
090 *   etc.
091 * </pre>
092 *
093 * @since 9.3
094 */
095public class KeyValueBlobTransientStore implements TransientStoreProvider {
096
    private static final Log log = LogFactory.getLog(KeyValueBlobTransientStore.class);

    /** Separator between an entry key and the suffixes of the keys derived from it. */
    public static final String SEP = ".";

    /** Key under which the global (approximate) storage size is kept. */
    public static final String STORAGE_SIZE = "__blobsize__";

    /** Suffix of the key holding the completion flag; its presence marks the entry existence. */
    public static final String DOT_COMPLETED = SEP + "completed";

    /** Suffix of the key holding the JSON list of parameter names of an entry. */
    public static final String DOT_PARAMINFO = SEP + "paraminfo";

    /** Prefix (after the entry key) of the keys holding parameter values. */
    public static final String DOT_PARAM_DOT = SEP + "param" + SEP;

    /** Suffix appended to a parameter key to get the key holding its storage format. */
    public static final String FORMAT = "__format";

    /** Format value denoting a parameter stored using Java serialization. */
    public static final String FORMAT_JAVA = "java";

    /** Suffix of the key holding the JSON blob info (count and size) of an entry. */
    public static final String DOT_BLOBINFO = SEP + "blobinfo";

    /** Blob info JSON field: number of blobs. */
    public static final String COUNT = "count";

    /** Blob info JSON field: total size of the blobs. */
    public static final String SIZE = "size";

    /** Prefix (after the entry key) of the keys holding the JSON description of each blob, followed by its index. */
    public static final String DOT_BLOB_DOT = SEP + "blob" + SEP;

    /** Blob JSON field: key of the blob in the blob provider. */
    public static final String KEY = "key";

    /** Blob JSON field: MIME type. */
    public static final String MIMETYPE = "mimetype";

    /** Blob JSON field: encoding. */
    public static final String ENCODING = "encoding";

    /** Blob JSON field: filename. */
    public static final String FILENAME = "filename";

    /** Blob JSON field: length. */
    public static final String LENGTH = "length";

    /** Blob JSON field: digest. */
    public static final String DIGEST = "digest";

    /** Configuration property giving the name of the key/value store to use. */
    public static final String CONFIG_KEY_VALUE_STORE = "keyValueStore";

    /** Configuration property giving the id of the blob provider to use. */
    public static final String CONFIG_BLOB_PROVIDER = "blobProvider";

    /** Name of the key/value store used, resolved in {@link #init}. */
    protected String keyValueStoreName;

    /** Id of the blob provider used, resolved in {@link #init}. */
    protected String blobProviderId;

    /** Basic TTL for all entries (configured first-level TTL times 60; presumably minutes converted to seconds). */
    protected int ttl;

    /**
     * TTL used to keep objects around a bit longer if there's space for them, for caching (configured second-level
     * TTL times 60).
     */
    protected int releaseTTL;

    /** Target maximum storage size, computed from the configured size in MB; checked on release if positive. */
    protected long targetMaxSize;

    /** Absolute maximum storage size, computed from the configured size in MB; checked on blob write if positive. */
    protected long absoluteMaxSize;

    /** JSON mapper used to read and write the structured values kept in the key/value store. */
    protected ObjectMapper mapper;
152
153    // ---------- TransientStoreProvider ----------
154
155    @Override
156    public void init(TransientStoreConfig config) {
157        String defaultName = config.getName();
158        if (!defaultName.startsWith(BlobManagerComponent.TRANSIENT_ID_PREFIX)) {
159            defaultName = BlobManagerComponent.TRANSIENT_ID_PREFIX + "_" + defaultName;
160        }
161        Map<String, String> properties = config.getProperties();
162        if (properties == null) {
163            properties = Collections.emptyMap();
164        }
165        keyValueStoreName = defaultIfBlank(properties.get(CONFIG_KEY_VALUE_STORE), defaultName);
166        blobProviderId = defaultIfBlank(properties.get(CONFIG_BLOB_PROVIDER), defaultName);
167        mapper = new ObjectMapper();
168        ttl = config.getFirstLevelTTL() * 60;
169        releaseTTL = config.getSecondLevelTTL() * 60;
170        targetMaxSize = config.getTargetMaxSizeMB() * 1024L * 1024;
171        absoluteMaxSize = config.getAbsoluteMaxSizeMB() * 1024L * 1024;
172    }
173
174    protected KeyValueStore getKeyValueStore() {
175        return Framework.getService(KeyValueService.class).getKeyValueStore(keyValueStoreName);
176    }
177
178    protected BlobProvider getBlobProvider() {
179        BlobProvider blobProvider = Framework.getService(BlobManager.class)
180                                             .getBlobProviderWithNamespace(blobProviderId);
181        if (blobProvider == null) {
182            throw new NuxeoException("No blob provider with id: " + blobProviderId);
183        }
184        if (!blobProvider.isTransient()) {
185            throw new NuxeoException("Blob provider: " + blobProviderId + " used for Key/Value store: "
186                    + keyValueStoreName + " must be configured as transient");
187        }
188        return blobProvider;
189    }
190
    /**
     * Shuts down this store. This implementation does nothing.
     */
    @Override
    public void shutdown() {
        // nothing to do
    }
195
196    @Override
197    public Stream<String> keyStream() {
198        KeyValueStoreProvider kvs = (KeyValueStoreProvider) getKeyValueStore();
199        int len = DOT_COMPLETED.length();
200        return kvs.keyStream() //
201                  .filter(key -> key.endsWith(DOT_COMPLETED))
202                  .map(key -> key.substring(0, key.length() - len));
203    }
204
205    @Override
206    public long getStorageSize() {
207        KeyValueStore kvs = getKeyValueStore();
208        String sizeStr = kvs.getString(STORAGE_SIZE);
209        return sizeStr == null ? 0 : Long.parseLong(sizeStr);
210    }
211
212    protected void addStorageSize(long delta) {
213        atomicUpdate(STORAGE_SIZE, size -> {
214            long s = size == null ? 0 : Long.parseLong(size);
215            return String.valueOf(s + delta);
216        }, 0);
217    }
218
219    /**
220     * Computes an exact value for the current storage size (sum of all blobs size). THIS METHOD IS COSTLY.
221     * <p>
222     * Does not take into account blob de-duplication that may be done by the blob provider.
223     * <p>
224     * Does not take into account blobs that still exist in the blob provider but are not referenced anymore (due to TTL
225     * expiration or GC not having been done).
226     */
227    protected void computeStorageSize() {
228        KeyValueStore kvs = getKeyValueStore();
229        long size = keyStream().map(this::getBlobs) //
230                               .flatMap(Collection::stream)
231                               .mapToLong(Blob::getLength)
232                               .sum();
233        kvs.put(STORAGE_SIZE, String.valueOf(size));
234    }
235
236    // also recomputes the exact storage size
237    @Override
238    public void doGC() {
239        BlobProvider bp = getBlobProvider();
240        BinaryGarbageCollector gc = bp.getBinaryManager().getGarbageCollector();
241        gc.start();
242        keyStream().map(this::getBlobs) //
243                   .flatMap(Collection::stream)
244                   .map(ManagedBlob.class::cast)
245                   .map(ManagedBlob::getKey)
246                   .forEach(gc::mark);
247        gc.stop(true); // delete
248        computeStorageSize();
249    }
250
251    @Override
252    public void removeAll() {
253        KeyValueStoreProvider kvs = (KeyValueStoreProvider) getKeyValueStore();
254        kvs.clear();
255        doGC();
256    }
257
258    // ---------- TransientStore ----------
259
    /** Jackson type token used to read a JSON array as a list of strings. */
    protected static final TypeReference<List<String>> LIST_STRING = new TypeReference<List<String>>() {
    };

    /** Jackson type token used to read a JSON object as a map from string to string. */
    protected static final TypeReference<Map<String, String>> MAP_STRING_STRING = new TypeReference<Map<String, String>>() {
    };
265
266    protected List<String> jsonToList(String json) {
267        if (json == null) {
268            return null;
269        }
270        try {
271            return mapper.readValue(json, LIST_STRING);
272        } catch (IOException e) {
273            log.error("Invalid JSON array: " + json);
274            return null;
275        }
276    }
277
278    protected Map<String, String> jsonToMap(String json) {
279        if (json == null) {
280            return null;
281        }
282        try {
283            return mapper.readValue(json, MAP_STRING_STRING);
284        } catch (IOException e) {
285            log.error("Invalid JSON object: " + json);
286            return null;
287        }
288    }
289
290    protected String toJson(Object object) {
291        try {
292            return mapper.writeValueAsString(object);
293        } catch (IOException e) {
294            throw new NuxeoException(e);
295        }
296    }
297
298    public void atomicUpdate(String key, Function<String, String> updateFunction, long ttl) {
299        KeyValueStore kvs = getKeyValueStore();
300        for (;;) {
301            String oldValue = kvs.getString(key);
302            String newValue = updateFunction.apply(oldValue);
303            if (kvs.compareAndSet(key, oldValue, newValue, ttl)) {
304                break;
305            }
306        }
307    }
308
309    @Override
310    public boolean exists(String key) {
311        KeyValueStore kvs = getKeyValueStore();
312        return kvs.getString(key + DOT_COMPLETED) != null;
313    }
314
315    protected void markEntryExists(String key) {
316        KeyValueStore kvs = getKeyValueStore();
317        kvs.compareAndSet(key + DOT_COMPLETED, null, "false", ttl);
318    }
319
320    @Override
321    public void putParameter(String key, String parameter, Serializable value) {
322        KeyValueStore kvs = getKeyValueStore();
323        String k = key + DOT_PARAM_DOT + parameter;
324        if (value instanceof String) {
325            kvs.put(k, (String) value, ttl);
326            kvs.put(k + FORMAT, (String) null);
327        } else {
328            byte[] bytes = SerializationUtils.serialize(value);
329            kvs.put(k, bytes, ttl);
330            kvs.put(k + FORMAT, FORMAT_JAVA, ttl);
331        }
332        // atomically add key to param info
333        atomicUpdate(key + DOT_PARAMINFO, json -> {
334            List<String> parameters = jsonToList(json);
335            if (parameters == null) {
336                parameters = new ArrayList<>();
337            }
338            if (!parameters.contains(parameter)) {
339                parameters.add(parameter);
340            }
341            return toJson(parameters);
342        }, ttl);
343        markEntryExists(key);
344    }
345
346    @Override
347    public Serializable getParameter(String key, String parameter) {
348        KeyValueStore kvs = getKeyValueStore();
349        String k = key + DOT_PARAM_DOT + parameter;
350        String format = kvs.getString(k + FORMAT);
351        if (format == null) {
352            return kvs.getString(k);
353        } else {
354            byte[] bytes = kvs.get(k);
355            try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
356                    ObjectInput in = new ObjectInputStream(bis)) {
357                return (Serializable) in.readObject();
358            } catch (IOException | ClassNotFoundException e) {
359                throw new NuxeoException(e);
360            }
361        }
362    }
363
364    @Override
365    public void putParameters(String key, Map<String, Serializable> parameters) {
366        parameters.forEach((param, value) -> putParameter(key, param, value));
367    }
368
369    @Override
370    public Map<String, Serializable> getParameters(String key) {
371        KeyValueStore kvs = getKeyValueStore();
372        // get the list of keys
373        String json = kvs.getString(key + DOT_PARAMINFO);
374        List<String> parameters = jsonToList(json);
375        if (parameters == null) {
376            // if the entry doesn't exist at all return null, otherwise empty
377            if (kvs.getString(key + DOT_COMPLETED) == null) {
378                return null;
379            } else {
380                return Collections.emptyMap();
381            }
382        }
383        // get values
384        Map<String, Serializable> map = new HashMap<>();
385        for (String p : parameters) {
386            Serializable value = getParameter(key, p);
387            if (value != null) {
388                map.put(p, value);
389            }
390        }
391        return map;
392    }
393
394    protected void removeParameters(String key) {
395        KeyValueStore kvs = getKeyValueStore();
396        String json = kvs.getString(key + DOT_PARAMINFO);
397        List<String> parameters = jsonToList(json);
398        if (parameters != null) {
399            for (String parameter : parameters) {
400                String k = key + DOT_PARAM_DOT + parameter;
401                kvs.put(k, (String) null);
402                kvs.put(k + FORMAT, (String) null);
403            }
404        }
405        kvs.put(key + DOT_PARAMINFO, (String) null);
406    }
407
408    @Override
409    public void putBlobs(String key, List<Blob> blobs) {
410        if (absoluteMaxSize > 0 && getStorageSize() > absoluteMaxSize) {
411            // do the costly computation of the exact storage size if needed
412            doGC();
413            if (getStorageSize() > absoluteMaxSize) {
414                throw new MaximumTransientSpaceExceeded();
415            }
416        }
417
418        // remove previous blobs
419        removeBlobs(key);
420
421        KeyValueStore kvs = getKeyValueStore();
422        BlobProvider bp = getBlobProvider();
423        long totalSize = 0;
424        int i = 0;
425        for (Blob blob : blobs) {
426            long size = blob.getLength();
427            if (size >= 0) {
428                totalSize += size;
429            }
430            // store blob
431            String blobKey;
432            try {
433                blobKey = bp.writeBlob(blob);
434            } catch (IOException e) {
435                throw new NuxeoException(e);
436            }
437            // write blob data
438            Map<String, String> blobMap = new HashMap<>();
439            blobMap.put(KEY, blobKey);
440            blobMap.put(MIMETYPE, blob.getMimeType());
441            blobMap.put(ENCODING, blob.getEncoding());
442            blobMap.put(FILENAME, blob.getFilename());
443            blobMap.put(LENGTH, String.valueOf(size));
444            blobMap.put(DIGEST, blob.getDigest());
445            kvs.put(key + DOT_BLOB_DOT + i, toJson(blobMap), ttl);
446            i++;
447        }
448        Map<String, String> blobInfoMap = new HashMap<>();
449        blobInfoMap.put(COUNT, String.valueOf(blobs.size()));
450        blobInfoMap.put(SIZE, String.valueOf(totalSize));
451        kvs.put(key + DOT_BLOBINFO, toJson(blobInfoMap), ttl);
452        addStorageSize(totalSize);
453        markEntryExists(key);
454    }
455
456    protected void removeBlobs(String key) {
457        KeyValueStore kvs = getKeyValueStore();
458        String json = kvs.getString(key + DOT_BLOBINFO);
459        Map<String, String> map = jsonToMap(json);
460        if (map == null) {
461            return;
462        }
463        String countStr = map.get(COUNT);
464        int count = countStr == null ? 0 : Integer.parseInt(countStr);
465        String sizeStr = map.get(SIZE);
466        long size = sizeStr == null ? 0 : Long.parseLong(sizeStr);
467
468        // remove blobs
469        for (int i = 0; i < count; i++) {
470            kvs.put(key + DOT_BLOB_DOT + i, (String) null);
471        }
472        kvs.put(key + DOT_BLOBINFO, (String) null);
473        // fix storage size
474        addStorageSize(-size);
475    }
476
477    @Override
478    public List<Blob> getBlobs(String key) {
479        KeyValueStore kvs = getKeyValueStore();
480        BlobProvider bp = getBlobProvider();
481        String info = kvs.getString(key + DOT_BLOBINFO);
482        if (info == null) {
483            // if the entry doesn't exist at all return null, otherwise empty
484            if (kvs.getString(key + DOT_COMPLETED) == null) {
485                return null;
486            } else {
487                return Collections.emptyList();
488            }
489        }
490        Map<String, String> blobInfoMap = jsonToMap(info);
491        String countStr = blobInfoMap.get(COUNT);
492        if (countStr == null) {
493            return Collections.emptyList();
494        }
495        int count = Integer.parseInt(countStr);
496        List<Blob> blobs = new ArrayList<>();
497        for (int i = 0; i < count; i++) {
498            String blobMapJson = kvs.getString(key + DOT_BLOB_DOT + i);
499            if (blobMapJson == null) {
500                // corrupted entry, bail out
501                break;
502            }
503            Map<String, String> blobMap = jsonToMap(blobMapJson);
504            String blobKey = blobMap.get(KEY);
505            if (blobKey == null) {
506                // corrupted entry, bail out
507                break;
508            }
509            String mimeType = blobMap.get(MIMETYPE);
510            String encoding = blobMap.get(ENCODING);
511            String filename = blobMap.get(FILENAME);
512            String lengthStr = blobMap.get(LENGTH);
513            Long length = lengthStr == null ? null : Long.valueOf(lengthStr);
514            String digest = blobMap.get(DIGEST);
515            BlobInfo blobInfo = new BlobInfo();
516            blobInfo.key = blobKey;
517            blobInfo.mimeType = mimeType;
518            blobInfo.encoding = encoding;
519            blobInfo.filename = filename;
520            blobInfo.length = length;
521            blobInfo.digest = digest;
522            try {
523                Blob blob = bp.readBlob(blobInfo);
524                blobs.add(blob);
525            } catch (IOException e) {
526                throw new NuxeoException(e);
527            }
528        }
529        return blobs;
530    }
531
532    @Override
533    public long getSize(String key) {
534        KeyValueStore kvs = getKeyValueStore();
535        String json = kvs.getString(key + DOT_BLOBINFO);
536        Map<String, String> map = jsonToMap(json);
537        String size;
538        if (map == null || (size = map.get(SIZE)) == null) {
539            return -1;
540        }
541        return Long.parseLong(size);
542    }
543
544    @Override
545    public boolean isCompleted(String key) {
546        KeyValueStore kvs = getKeyValueStore();
547        String completed = kvs.getString(key + DOT_COMPLETED);
548        return Boolean.parseBoolean(completed);
549    }
550
551    @Override
552    public void setCompleted(String key, boolean completed) {
553        KeyValueStore kvs = getKeyValueStore();
554        kvs.put(key + DOT_COMPLETED, String.valueOf(completed), ttl);
555    }
556
557    protected void removeCompleted(String key) {
558        KeyValueStore kvs = getKeyValueStore();
559        kvs.put(key + DOT_COMPLETED, (String) null);
560    }
561
562    @Override
563    public void release(String key) {
564        if (targetMaxSize > 0 && getStorageSize() > targetMaxSize) {
565            // do the costly computation of the exact storage size if needed
566            doGC();
567            if (getStorageSize() > targetMaxSize) {
568                remove(key);
569                return;
570            }
571        }
572        setReleaseTTL(key);
573    }
574
575    // set TTL on all keys for this entry
576    protected void setReleaseTTL(String key) {
577        KeyValueStore kvs = getKeyValueStore();
578        kvs.setTTL(key + DOT_COMPLETED, releaseTTL);
579        String json = kvs.getString(key + DOT_PARAMINFO);
580        List<String> parameters = jsonToList(json);
581        if (parameters != null) {
582            parameters.stream().forEach(parameter -> {
583                String k = key + DOT_PARAM_DOT + parameter;
584                kvs.setTTL(k, releaseTTL);
585                kvs.setTTL(k + FORMAT, releaseTTL);
586            });
587        }
588        kvs.setTTL(key + DOT_PARAMINFO, releaseTTL);
589        json = kvs.getString(key + DOT_BLOBINFO);
590        Map<String, String> map = jsonToMap(json);
591        if (map != null) {
592            String countStr = map.get(COUNT);
593            int count = countStr == null ? 0 : Integer.parseInt(countStr);
594            for (int i = 0; i < count; i++) {
595                kvs.setTTL(key + DOT_BLOB_DOT + i, releaseTTL);
596            }
597        }
598        kvs.setTTL(key + DOT_BLOBINFO, releaseTTL);
599    }
600
    /**
     * Removes an entry: its blob keys (also fixing the recorded storage size), then its parameters, then its
     * ".completed" marker.
     *
     * @param key the entry key
     */
    @Override
    public void remove(String key) {
        removeBlobs(key);
        removeParameters(key);
        // the marker goes last, so the entry still counts as existing while its other keys are being removed
        removeCompleted(key);
    }
607
608}