001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.transientstore.keyvalueblob;
020
021import static java.util.function.Function.identity;
022
023import java.io.IOException;
024import java.io.Serializable;
025import java.util.ArrayList;
026import java.util.Collection;
027import java.util.Collections;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.function.Function;
032import java.util.stream.Collectors;
033import java.util.stream.Stream;
034
035import org.apache.commons.lang3.SerializationUtils;
036import org.apache.commons.logging.Log;
037import org.apache.commons.logging.LogFactory;
038import org.nuxeo.ecm.core.api.Blob;
039import org.nuxeo.ecm.core.api.NuxeoException;
040import org.nuxeo.ecm.core.blob.BlobInfo;
041import org.nuxeo.ecm.core.blob.BlobManager;
042import org.nuxeo.ecm.core.blob.BlobProvider;
043import org.nuxeo.ecm.core.blob.ManagedBlob;
044import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
045import org.nuxeo.ecm.core.transientstore.api.MaximumTransientSpaceExceeded;
046import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig;
047import org.nuxeo.ecm.core.transientstore.api.TransientStoreProvider;
048import org.nuxeo.runtime.api.Framework;
049import org.nuxeo.runtime.kv.KeyValueService;
050import org.nuxeo.runtime.kv.KeyValueStore;
051import org.nuxeo.runtime.kv.KeyValueStoreProvider;
052
053import com.fasterxml.jackson.core.type.TypeReference;
054import com.fasterxml.jackson.databind.ObjectMapper;
055
056/**
057 * Transient Store storing properties in a Key/Value store, and storing blobs using a Blob Provider.
058 * <p>
059 * The key/value store used is the one with the same name as the transient store itself.
060 * <p>
061 * The blob provider used is the one with the same name as the transient store itself.
062 * <p>
063 * The storage format is the following:
064 *
065 * <pre>
066 *   __blobsize__:       storage size; because entries may expire without us being notified due to their TTL,
067 *                       this may be higher than the actual storage size
068 *
069 *   entryKey.completed: "true" if completed, "false" if not; presence of this key marks entry existence
070 *
071 *   entryKey.paraminfo: ["foo", "bar"]
072 *
073 *   entryKey.param.foo: value for param foo
074 *   entryKey.param.foo__format: "java" for java serializable format, otherwise string
075 *
076 *   entryKey.param.bar: value for param bar
077 *   etc.
078 *
079 *   entryKey.blobinfo:  {"count": number of blobs,
080 *                        "size": storage size of the blobs}
081 *   entryKey.blob.0:    {"key": key in blob provider for first blob,
082 *                        "mimetype": MIME Type,
083 *                        "encoding": encoding,
084 *                        "filename": filename,
085 *                        "digest": digest}
086 *   entryKey.blob.1:    {...} same for second blob
087 *   etc.
088 * </pre>
089 *
090 * @since 9.3
091 */
092public class KeyValueBlobTransientStore implements TransientStoreProvider {
093
094    private static final Log log = LogFactory.getLog(KeyValueBlobTransientStore.class);
095
096    public static final String SEP = ".";
097
098    public static final String STORAGE_SIZE = "__blobsize__";
099
100    public static final String DOT_COMPLETED = SEP + "completed";
101
102    public static final String DOT_PARAMINFO = SEP + "paraminfo";
103
104    public static final String DOT_PARAM_DOT = SEP + "param" + SEP;
105
106    public static final String FORMAT = "__format";
107
108    public static final String FORMAT_JAVA = "java";
109
110    public static final String DOT_BLOBINFO = SEP + "blobinfo";
111
112    public static final String COUNT = "count";
113
114    public static final String SIZE = "size";
115
116    public static final String DOT_BLOB_DOT = SEP + "blob" + SEP;
117
118    public static final String KEY = "key";
119
120    public static final String MIMETYPE = "mimetype";
121
122    public static final String ENCODING = "encoding";
123
124    public static final String FILENAME = "filename";
125
126    public static final String LENGTH = "length";
127
128    public static final String DIGEST = "digest";
129
130    protected String name;
131
132    /** Basic TTL for all entries. */
133    protected int ttl;
134
135    /** TTL used to keep objects around a bit longer if there's space for them, for caching. */
136    protected int releaseTTL;
137
138    protected long targetMaxSize;
139
140    protected long absoluteMaxSize;
141
142    protected ObjectMapper mapper;
143
144    // ---------- TransientStoreProvider ----------
145
146    @Override
147    public void init(TransientStoreConfig config) {
148        name = config.getName();
149        mapper = new ObjectMapper();
150        ttl = config.getFirstLevelTTL() * 60;
151        releaseTTL = config.getSecondLevelTTL() * 60;
152        targetMaxSize = config.getTargetMaxSizeMB() * 1024 * 1024;
153        absoluteMaxSize = config.getAbsoluteMaxSizeMB() * 1024 * 1024;
154    }
155
156    protected KeyValueStore getKeyValueStore() {
157        return Framework.getService(KeyValueService.class).getKeyValueStore(name);
158    }
159
160    protected BlobProvider getBlobProvider() {
161        return Framework.getService(BlobManager.class).getBlobProvider(name);
162    }
163
164    @Override
165    public void shutdown() {
166    }
167
168    @Override
169    public Stream<String> keyStream() {
170        KeyValueStoreProvider kvs = (KeyValueStoreProvider) getKeyValueStore();
171        int len = DOT_COMPLETED.length();
172        return kvs.keyStream() //
173                  .filter(key -> key.endsWith(DOT_COMPLETED))
174                  .map(key -> key.substring(0, key.length() - len));
175    }
176
177    @Override
178    public long getStorageSize() {
179        KeyValueStore kvs = getKeyValueStore();
180        String sizeStr = kvs.getString(STORAGE_SIZE);
181        return sizeStr == null ? 0 : Long.parseLong(sizeStr);
182    }
183
184    protected void addStorageSize(long delta) {
185        atomicUpdate(STORAGE_SIZE, size -> {
186            long s = size == null ? 0 : Long.parseLong(size);
187            return String.valueOf(s + delta);
188        }, 0);
189    }
190
191    /**
192     * Computes an exact value for the current storage size (sum of all blobs size). THIS METHOD IS COSTLY.
193     * <p>
194     * Does not take into account blob de-duplication that may be done by the blob provider.
195     * <p>
196     * Does not take into account blobs that still exist in the blob provider but are not referenced anymore (due to TTL
197     * expiration or GC not having been done).
198     */
199    protected void computeStorageSize() {
200        KeyValueStore kvs = getKeyValueStore();
201        long size = keyStream().map(this::getBlobs) //
202                               .flatMap(Collection::stream)
203                               .mapToLong(Blob::getLength)
204                               .sum();
205        kvs.put(STORAGE_SIZE, String.valueOf(size));
206    }
207
208    // also recomputes the exact storage size
209    @Override
210    public void doGC() {
211        BlobProvider bp = getBlobProvider();
212        BinaryGarbageCollector gc = bp.getBinaryManager().getGarbageCollector();
213        gc.start();
214        keyStream().map(this::getBlobs) //
215                   .flatMap(Collection::stream)
216                   .map(ManagedBlob.class::cast)
217                   .map(ManagedBlob::getKey)
218                   .forEach(gc::mark);
219        gc.stop(true); // delete
220        computeStorageSize();
221    }
222
223    @Override
224    public void removeAll() {
225        KeyValueStoreProvider kvs = (KeyValueStoreProvider) getKeyValueStore();
226        kvs.clear();
227        doGC();
228    }
229
230    // ---------- TransientStore ----------
231
232    protected static final TypeReference<List<String>> LIST_STRING = new TypeReference<List<String>>() {
233    };
234
235    protected static final TypeReference<Map<String, String>> MAP_STRING_STRING = new TypeReference<Map<String, String>>() {
236    };
237
238    protected List<String> jsonToList(String json) {
239        if (json == null) {
240            return null;
241        }
242        try {
243            return mapper.readValue(json, LIST_STRING);
244        } catch (IOException e) {
245            log.error("Invalid JSON array: " + json);
246            return null;
247        }
248    }
249
250    protected Map<String, String> jsonToMap(String json) {
251        if (json == null) {
252            return null;
253        }
254        try {
255            return mapper.readValue(json, MAP_STRING_STRING);
256        } catch (IOException e) {
257            log.error("Invalid JSON object: " + json);
258            return null;
259        }
260    }
261
262    protected String toJson(Object object) {
263        try {
264            return mapper.writeValueAsString(object);
265        } catch (IOException e) {
266            throw new NuxeoException(e);
267        }
268    }
269
270    public void atomicUpdate(String key, Function<String, String> updateFunction, long ttl) {
271        KeyValueStore kvs = getKeyValueStore();
272        for (;;) {
273            String oldValue = kvs.getString(key);
274            String newValue = updateFunction.apply(oldValue);
275            if (kvs.compareAndSet(key, oldValue, newValue, ttl)) {
276                break;
277            }
278        }
279    }
280
281    @Override
282    public boolean exists(String key) {
283        KeyValueStore kvs = getKeyValueStore();
284        return kvs.getString(key + DOT_COMPLETED) != null;
285    }
286
287    protected void markEntryExists(String key) {
288        KeyValueStore kvs = getKeyValueStore();
289        kvs.compareAndSet(key + DOT_COMPLETED, null, "false", ttl);
290    }
291
292    @Override
293    public void putParameter(String key, String parameter, Serializable value) {
294        KeyValueStore kvs = getKeyValueStore();
295        String k = key + DOT_PARAM_DOT + parameter;
296        if (value instanceof String) {
297            kvs.put(k, (String) value, ttl);
298            kvs.put(k + FORMAT, (String) null);
299        } else {
300            byte[] bytes = SerializationUtils.serialize(value);
301            kvs.put(k, bytes, ttl);
302            kvs.put(k + FORMAT, FORMAT_JAVA, ttl);
303        }
304        // atomically add key to param info
305        atomicUpdate(key + DOT_PARAMINFO, json -> {
306            List<String> parameters = jsonToList(json);
307            if (parameters == null) {
308                parameters = new ArrayList<>();
309            }
310            if (!parameters.contains(parameter)) {
311                parameters.add(parameter);
312            }
313            return toJson(parameters);
314        }, ttl);
315        markEntryExists(key);
316    }
317
318    @Override
319    public Serializable getParameter(String key, String parameter) {
320        KeyValueStore kvs = getKeyValueStore();
321        String k = key + DOT_PARAM_DOT + parameter;
322        String format = kvs.getString(k + FORMAT);
323        if (format == null) {
324            return kvs.getString(k);
325        } else {
326            byte[] bytes = kvs.get(k);
327            return SerializationUtils.deserialize(bytes);
328        }
329    }
330
331    @Override
332    public void putParameters(String key, Map<String, Serializable> parameters) {
333        parameters.forEach((param, value) -> putParameter(key, param, value));
334    }
335
336    @Override
337    public Map<String, Serializable> getParameters(String key) {
338        KeyValueStore kvs = getKeyValueStore();
339        // get the list of keys
340        String json = kvs.getString(key + DOT_PARAMINFO);
341        List<String> parameters = jsonToList(json);
342        if (parameters == null) {
343            // if the entry doesn't exist at all return null, otherwise empty
344            if (kvs.getString(key + DOT_COMPLETED) == null) {
345                return null;
346            } else {
347                return Collections.emptyMap();
348            }
349        }
350        // get values
351        return parameters.stream().collect(Collectors.toMap(identity(), p -> getParameter(key, p)));
352    }
353
354    protected void removeParameters(String key) {
355        KeyValueStore kvs = getKeyValueStore();
356        String json = kvs.getString(key + DOT_PARAMINFO);
357        List<String> parameters = jsonToList(json);
358        if (parameters != null) {
359            for (String parameter : parameters) {
360                String k = key + DOT_PARAM_DOT + parameter;
361                kvs.put(k, (String) null);
362                kvs.put(k + FORMAT, (String) null);
363            }
364        }
365        kvs.put(key + DOT_PARAMINFO, (String) null);
366    }
367
368    @Override
369    public void putBlobs(String key, List<Blob> blobs) {
370        if (absoluteMaxSize > 0 && getStorageSize() > absoluteMaxSize) {
371            // do the costly computation of the exact storage size if needed
372            doGC();
373            if (getStorageSize() > absoluteMaxSize) {
374                throw new MaximumTransientSpaceExceeded();
375            }
376        }
377
378        // remove previous blobs
379        removeBlobs(key);
380
381        KeyValueStore kvs = getKeyValueStore();
382        BlobProvider bp = getBlobProvider();
383        long totalSize = 0;
384        int i = 0;
385        for (Blob blob : blobs) {
386            long size = blob.getLength();
387            if (size >= 0) {
388                totalSize += size;
389            }
390            // store blob
391            String blobKey;
392            try {
393                blobKey = bp.writeBlob(blob);
394            } catch (IOException e) {
395                throw new NuxeoException(e);
396            }
397            // write blob data
398            Map<String, String> blobMap = new HashMap<>();
399            blobMap.put(KEY, blobKey);
400            blobMap.put(MIMETYPE, blob.getMimeType());
401            blobMap.put(ENCODING, blob.getEncoding());
402            blobMap.put(FILENAME, blob.getFilename());
403            blobMap.put(LENGTH, String.valueOf(size));
404            blobMap.put(DIGEST, blob.getDigest());
405            kvs.put(key + DOT_BLOB_DOT + i, toJson(blobMap), ttl);
406            i++;
407        }
408        Map<String, String> blobInfoMap = new HashMap<>();
409        blobInfoMap.put(COUNT, String.valueOf(blobs.size()));
410        blobInfoMap.put(SIZE, String.valueOf(totalSize));
411        kvs.put(key + DOT_BLOBINFO, toJson(blobInfoMap), ttl);
412        addStorageSize(totalSize);
413        markEntryExists(key);
414    }
415
416    protected void removeBlobs(String key) {
417        KeyValueStore kvs = getKeyValueStore();
418        String json = kvs.getString(key + DOT_BLOBINFO);
419        Map<String, String> map = jsonToMap(json);
420        if (map == null) {
421            return;
422        }
423        String countStr = map.get(COUNT);
424        int count = countStr == null ? 0 : Integer.parseInt(countStr);
425        String sizeStr = map.get(SIZE);
426        long size = sizeStr == null ? 0 : Long.parseLong(sizeStr);
427
428        // remove blobs
429        for (int i = 0; i < count; i++) {
430            kvs.put(key + DOT_BLOB_DOT + i, (String) null);
431        }
432        kvs.put(key + DOT_BLOBINFO, (String) null);
433        // fix storage size
434        addStorageSize(-size);
435    }
436
437    @Override
438    public List<Blob> getBlobs(String key) {
439        KeyValueStore kvs = getKeyValueStore();
440        BlobProvider bp = getBlobProvider();
441        String info = kvs.getString(key + DOT_BLOBINFO);
442        if (info == null) {
443            // if the entry doesn't exist at all return null, otherwise empty
444            if (kvs.getString(key + DOT_COMPLETED) == null) {
445                return null;
446            } else {
447                return Collections.emptyList();
448            }
449        }
450        Map<String, String> blobInfoMap = jsonToMap(info);
451        String countStr = blobInfoMap.get(COUNT);
452        if (countStr == null) {
453            return Collections.emptyList();
454        }
455        int count = Integer.parseInt(countStr);
456        List<Blob> blobs = new ArrayList<>();
457        for (int i = 0; i < count; i++) {
458            String blobMapJson = kvs.getString(key + DOT_BLOB_DOT + i);
459            if (blobMapJson == null) {
460                // corrupted entry, bail out
461                break;
462            }
463            Map<String, String> blobMap = jsonToMap(blobMapJson);
464            String blobKey = blobMap.get(KEY);
465            if (blobKey == null) {
466                // corrupted entry, bail out
467                break;
468            }
469            String mimeType = blobMap.get(MIMETYPE);
470            String encoding = blobMap.get(ENCODING);
471            String filename = blobMap.get(FILENAME);
472            String lengthStr = blobMap.get(LENGTH);
473            Long length = lengthStr == null ? null : Long.valueOf(lengthStr);
474            String digest = blobMap.get(DIGEST);
475            BlobInfo blobInfo = new BlobInfo();
476            blobInfo.key = blobKey;
477            blobInfo.mimeType = mimeType;
478            blobInfo.encoding = encoding;
479            blobInfo.filename = filename;
480            blobInfo.length = length;
481            blobInfo.digest = digest;
482            try {
483                Blob blob = bp.readBlob(blobInfo);
484                blobs.add(blob);
485            } catch (IOException e) {
486                throw new NuxeoException(e);
487            }
488        }
489        return blobs;
490    }
491
492    @Override
493    public long getSize(String key) {
494        KeyValueStore kvs = getKeyValueStore();
495        String json = kvs.getString(key + DOT_BLOBINFO);
496        Map<String, String> map = jsonToMap(json);
497        String size;
498        if (map == null || (size = map.get(SIZE)) == null) {
499            return -1;
500        }
501        return Long.parseLong(size);
502    }
503
504    @Override
505    public boolean isCompleted(String key) {
506        KeyValueStore kvs = getKeyValueStore();
507        String completed = kvs.getString(key + DOT_COMPLETED);
508        return Boolean.parseBoolean(completed);
509    }
510
511    @Override
512    public void setCompleted(String key, boolean completed) {
513        KeyValueStore kvs = getKeyValueStore();
514        kvs.put(key + DOT_COMPLETED, String.valueOf(completed), ttl);
515    }
516
517    protected void removeCompleted(String key) {
518        KeyValueStore kvs = getKeyValueStore();
519        kvs.put(key + DOT_COMPLETED, (String) null);
520    }
521
522    @Override
523    public void release(String key) {
524        if (targetMaxSize > 0 && getStorageSize() > targetMaxSize) {
525            // do the costly computation of the exact storage size if needed
526            doGC();
527            if (getStorageSize() > targetMaxSize) {
528                remove(key);
529                return;
530            }
531        }
532        setReleaseTTL(key);
533    }
534
535    // set TTL on all keys for this entry
536    protected void setReleaseTTL(String key) {
537        KeyValueStore kvs = getKeyValueStore();
538        kvs.setTTL(key + DOT_COMPLETED, releaseTTL);
539        String json = kvs.getString(key + DOT_PARAMINFO);
540        List<String> parameters = jsonToList(json);
541        if (parameters != null) {
542            parameters.stream().forEach(parameter -> {
543                String k = key + DOT_PARAM_DOT + parameter;
544                kvs.setTTL(k, releaseTTL);
545                kvs.setTTL(k + FORMAT, releaseTTL);
546            });
547        }
548        kvs.setTTL(key + DOT_PARAMINFO, releaseTTL);
549        json = kvs.getString(key + DOT_BLOBINFO);
550        Map<String, String> map = jsonToMap(json);
551        if (map != null) {
552            String countStr = map.get(COUNT);
553            int count = countStr == null ? 0 : Integer.parseInt(countStr);
554            for (int i = 0; i < count; i++) {
555                kvs.setTTL(key + DOT_BLOB_DOT + i, releaseTTL);
556            }
557        }
558        kvs.setTTL(key + DOT_BLOBINFO, releaseTTL);
559    }
560
561    @Override
562    public void remove(String key) {
563        removeBlobs(key);
564        removeParameters(key);
565        removeCompleted(key);
566    }
567
568}