001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.transientstore.keyvalueblob;
020
021import static java.util.function.Function.identity;
022import static org.apache.commons.lang3.StringUtils.defaultIfBlank;
023
024import java.io.ByteArrayInputStream;
025import java.io.IOException;
026import java.io.ObjectInput;
027import java.io.ObjectInputStream;
028import java.io.Serializable;
029import java.util.ArrayList;
030import java.util.Collection;
031import java.util.Collections;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.function.Function;
036import java.util.stream.Collectors;
037import java.util.stream.Stream;
038
039import org.apache.commons.lang3.SerializationUtils;
040import org.apache.commons.logging.Log;
041import org.apache.commons.logging.LogFactory;
042import org.nuxeo.ecm.core.api.Blob;
043import org.nuxeo.ecm.core.api.NuxeoException;
044import org.nuxeo.ecm.core.blob.BlobInfo;
045import org.nuxeo.ecm.core.blob.BlobManager;
046import org.nuxeo.ecm.core.blob.BlobProvider;
047import org.nuxeo.ecm.core.blob.ManagedBlob;
048import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
049import org.nuxeo.ecm.core.transientstore.api.MaximumTransientSpaceExceeded;
050import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig;
051import org.nuxeo.ecm.core.transientstore.api.TransientStoreProvider;
052import org.nuxeo.runtime.api.Framework;
053import org.nuxeo.runtime.kv.KeyValueService;
054import org.nuxeo.runtime.kv.KeyValueStore;
055import org.nuxeo.runtime.kv.KeyValueStoreProvider;
056
057import com.fasterxml.jackson.core.type.TypeReference;
058import com.fasterxml.jackson.databind.ObjectMapper;
059
060/**
061 * Transient Store storing properties in a Key/Value store, and storing blobs using a Blob Provider.
062 * <p>
063 * The key/value store used is the one with the same name as the transient store itself.
064 * <p>
065 * The blob provider used is the one with the same name as the transient store itself.
066 * <p>
067 * The storage format is the following:
068 *
069 * <pre>
070 *   __blobsize__:       storage size; because entries may expire without us being notified due to their TTL,
071 *                       this may be higher than the actual storage size
072 *
073 *   entryKey.completed: "true" if completed, "false" if not; presence of this key marks entry existence
074 *
075 *   entryKey.paraminfo: ["foo", "bar"]
076 *
077 *   entryKey.param.foo: value for param foo
078 *   entryKey.param.foo__format: "java" for java serializable format, otherwise string
079 *
080 *   entryKey.param.bar: value for param bar
081 *   etc.
082 *
083 *   entryKey.blobinfo:  {"count": number of blobs,
084 *                        "size": storage size of the blobs}
085 *   entryKey.blob.0:    {"key": key in blob provider for first blob,
086 *                        "mimetype": MIME Type,
087 *                        "encoding": encoding,
088 *                        "filename": filename,
089 *                        "digest": digest}
090 *   entryKey.blob.1:    {...} same for second blob
091 *   etc.
092 * </pre>
093 *
094 * @since 9.3
095 */
096public class KeyValueBlobTransientStore implements TransientStoreProvider {
097
098    private static final Log log = LogFactory.getLog(KeyValueBlobTransientStore.class);
099
100    public static final String SEP = ".";
101
102    public static final String STORAGE_SIZE = "__blobsize__";
103
104    public static final String DOT_COMPLETED = SEP + "completed";
105
106    public static final String DOT_PARAMINFO = SEP + "paraminfo";
107
108    public static final String DOT_PARAM_DOT = SEP + "param" + SEP;
109
110    public static final String FORMAT = "__format";
111
112    public static final String FORMAT_JAVA = "java";
113
114    public static final String DOT_BLOBINFO = SEP + "blobinfo";
115
116    public static final String COUNT = "count";
117
118    public static final String SIZE = "size";
119
120    public static final String DOT_BLOB_DOT = SEP + "blob" + SEP;
121
122    public static final String KEY = "key";
123
124    public static final String MIMETYPE = "mimetype";
125
126    public static final String ENCODING = "encoding";
127
128    public static final String FILENAME = "filename";
129
130    public static final String LENGTH = "length";
131
132    public static final String DIGEST = "digest";
133
134    public static final String CONFIG_KEY_VALUE_STORE = "keyValueStore";
135
136    public static final String CONFIG_BLOB_PROVIDER = "blobProvider";
137
138    protected String keyValueStoreName;
139
140    protected String blobProviderId;
141
142    /** Basic TTL for all entries. */
143    protected int ttl;
144
145    /** TTL used to keep objects around a bit longer if there's space for them, for caching. */
146    protected int releaseTTL;
147
148    protected long targetMaxSize;
149
150    protected long absoluteMaxSize;
151
152    protected ObjectMapper mapper;
153
154    // ---------- TransientStoreProvider ----------
155
156    @Override
157    public void init(TransientStoreConfig config) {
158        String name = config.getName();
159        Map<String, String> properties = config.getProperties();
160        if (properties == null) {
161            properties = Collections.emptyMap();
162        }
163        keyValueStoreName = defaultIfBlank(properties.get(CONFIG_KEY_VALUE_STORE), name);
164        blobProviderId = defaultIfBlank(properties.get(CONFIG_BLOB_PROVIDER), name);
165        mapper = new ObjectMapper();
166        ttl = config.getFirstLevelTTL() * 60;
167        releaseTTL = config.getSecondLevelTTL() * 60;
168        targetMaxSize = config.getTargetMaxSizeMB() * 1024L * 1024;
169        absoluteMaxSize = config.getAbsoluteMaxSizeMB() * 1024L * 1024;
170    }
171
172    protected KeyValueStore getKeyValueStore() {
173        return Framework.getService(KeyValueService.class).getKeyValueStore(keyValueStoreName);
174    }
175
176    protected BlobProvider getBlobProvider() {
177        BlobProvider blobProvider = Framework.getService(BlobManager.class).getBlobProvider(blobProviderId);
178        if (blobProvider == null) {
179            throw new NuxeoException("No blob provider with id: " + blobProviderId);
180        }
181        return blobProvider;
182    }
183
184    @Override
185    public void shutdown() {
186        // nothing to do
187    }
188
189    @Override
190    public Stream<String> keyStream() {
191        KeyValueStoreProvider kvs = (KeyValueStoreProvider) getKeyValueStore();
192        int len = DOT_COMPLETED.length();
193        return kvs.keyStream() //
194                  .filter(key -> key.endsWith(DOT_COMPLETED))
195                  .map(key -> key.substring(0, key.length() - len));
196    }
197
198    @Override
199    public long getStorageSize() {
200        KeyValueStore kvs = getKeyValueStore();
201        String sizeStr = kvs.getString(STORAGE_SIZE);
202        return sizeStr == null ? 0 : Long.parseLong(sizeStr);
203    }
204
205    protected void addStorageSize(long delta) {
206        atomicUpdate(STORAGE_SIZE, size -> {
207            long s = size == null ? 0 : Long.parseLong(size);
208            return String.valueOf(s + delta);
209        }, 0);
210    }
211
212    /**
213     * Computes an exact value for the current storage size (sum of all blobs size). THIS METHOD IS COSTLY.
214     * <p>
215     * Does not take into account blob de-duplication that may be done by the blob provider.
216     * <p>
217     * Does not take into account blobs that still exist in the blob provider but are not referenced anymore (due to TTL
218     * expiration or GC not having been done).
219     */
220    protected void computeStorageSize() {
221        KeyValueStore kvs = getKeyValueStore();
222        long size = keyStream().map(this::getBlobs) //
223                               .flatMap(Collection::stream)
224                               .mapToLong(Blob::getLength)
225                               .sum();
226        kvs.put(STORAGE_SIZE, String.valueOf(size));
227    }
228
229    // also recomputes the exact storage size
230    @Override
231    public void doGC() {
232        BlobProvider bp = getBlobProvider();
233        BinaryGarbageCollector gc = bp.getBinaryManager().getGarbageCollector();
234        gc.start();
235        keyStream().map(this::getBlobs) //
236                   .flatMap(Collection::stream)
237                   .map(ManagedBlob.class::cast)
238                   .map(ManagedBlob::getKey)
239                   .forEach(gc::mark);
240        gc.stop(true); // delete
241        computeStorageSize();
242    }
243
244    @Override
245    public void removeAll() {
246        KeyValueStoreProvider kvs = (KeyValueStoreProvider) getKeyValueStore();
247        kvs.clear();
248        doGC();
249    }
250
251    // ---------- TransientStore ----------
252
253    protected static final TypeReference<List<String>> LIST_STRING = new TypeReference<List<String>>() {
254    };
255
256    protected static final TypeReference<Map<String, String>> MAP_STRING_STRING = new TypeReference<Map<String, String>>() {
257    };
258
259    protected List<String> jsonToList(String json) {
260        if (json == null) {
261            return null;
262        }
263        try {
264            return mapper.readValue(json, LIST_STRING);
265        } catch (IOException e) {
266            log.error("Invalid JSON array: " + json);
267            return null;
268        }
269    }
270
271    protected Map<String, String> jsonToMap(String json) {
272        if (json == null) {
273            return null;
274        }
275        try {
276            return mapper.readValue(json, MAP_STRING_STRING);
277        } catch (IOException e) {
278            log.error("Invalid JSON object: " + json);
279            return null;
280        }
281    }
282
283    protected String toJson(Object object) {
284        try {
285            return mapper.writeValueAsString(object);
286        } catch (IOException e) {
287            throw new NuxeoException(e);
288        }
289    }
290
291    public void atomicUpdate(String key, Function<String, String> updateFunction, long ttl) {
292        KeyValueStore kvs = getKeyValueStore();
293        for (;;) {
294            String oldValue = kvs.getString(key);
295            String newValue = updateFunction.apply(oldValue);
296            if (kvs.compareAndSet(key, oldValue, newValue, ttl)) {
297                break;
298            }
299        }
300    }
301
302    @Override
303    public boolean exists(String key) {
304        KeyValueStore kvs = getKeyValueStore();
305        return kvs.getString(key + DOT_COMPLETED) != null;
306    }
307
308    protected void markEntryExists(String key) {
309        KeyValueStore kvs = getKeyValueStore();
310        kvs.compareAndSet(key + DOT_COMPLETED, null, "false", ttl);
311    }
312
313    @Override
314    public void putParameter(String key, String parameter, Serializable value) {
315        KeyValueStore kvs = getKeyValueStore();
316        String k = key + DOT_PARAM_DOT + parameter;
317        if (value instanceof String) {
318            kvs.put(k, (String) value, ttl);
319            kvs.put(k + FORMAT, (String) null);
320        } else {
321            byte[] bytes = SerializationUtils.serialize(value);
322            kvs.put(k, bytes, ttl);
323            kvs.put(k + FORMAT, FORMAT_JAVA, ttl);
324        }
325        // atomically add key to param info
326        atomicUpdate(key + DOT_PARAMINFO, json -> {
327            List<String> parameters = jsonToList(json);
328            if (parameters == null) {
329                parameters = new ArrayList<>();
330            }
331            if (!parameters.contains(parameter)) {
332                parameters.add(parameter);
333            }
334            return toJson(parameters);
335        }, ttl);
336        markEntryExists(key);
337    }
338
339    @Override
340    public Serializable getParameter(String key, String parameter) {
341        KeyValueStore kvs = getKeyValueStore();
342        String k = key + DOT_PARAM_DOT + parameter;
343        String format = kvs.getString(k + FORMAT);
344        if (format == null) {
345            return kvs.getString(k);
346        } else {
347            byte[] bytes = kvs.get(k);
348            try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
349                    ObjectInput in = new ObjectInputStream(bis)) {
350                return (Serializable) in.readObject();
351            } catch (IOException | ClassNotFoundException e) {
352                throw new NuxeoException(e);
353            }
354        }
355    }
356
357    @Override
358    public void putParameters(String key, Map<String, Serializable> parameters) {
359        parameters.forEach((param, value) -> putParameter(key, param, value));
360    }
361
362    @Override
363    public Map<String, Serializable> getParameters(String key) {
364        KeyValueStore kvs = getKeyValueStore();
365        // get the list of keys
366        String json = kvs.getString(key + DOT_PARAMINFO);
367        List<String> parameters = jsonToList(json);
368        if (parameters == null) {
369            // if the entry doesn't exist at all return null, otherwise empty
370            if (kvs.getString(key + DOT_COMPLETED) == null) {
371                return null;
372            } else {
373                return Collections.emptyMap();
374            }
375        }
376        // get values
377        return parameters.stream().collect(Collectors.toMap(identity(), p -> getParameter(key, p)));
378    }
379
380    protected void removeParameters(String key) {
381        KeyValueStore kvs = getKeyValueStore();
382        String json = kvs.getString(key + DOT_PARAMINFO);
383        List<String> parameters = jsonToList(json);
384        if (parameters != null) {
385            for (String parameter : parameters) {
386                String k = key + DOT_PARAM_DOT + parameter;
387                kvs.put(k, (String) null);
388                kvs.put(k + FORMAT, (String) null);
389            }
390        }
391        kvs.put(key + DOT_PARAMINFO, (String) null);
392    }
393
394    @Override
395    public void putBlobs(String key, List<Blob> blobs) {
396        if (absoluteMaxSize > 0 && getStorageSize() > absoluteMaxSize) {
397            // do the costly computation of the exact storage size if needed
398            doGC();
399            if (getStorageSize() > absoluteMaxSize) {
400                throw new MaximumTransientSpaceExceeded();
401            }
402        }
403
404        // remove previous blobs
405        removeBlobs(key);
406
407        KeyValueStore kvs = getKeyValueStore();
408        BlobProvider bp = getBlobProvider();
409        long totalSize = 0;
410        int i = 0;
411        for (Blob blob : blobs) {
412            long size = blob.getLength();
413            if (size >= 0) {
414                totalSize += size;
415            }
416            // store blob
417            String blobKey;
418            try {
419                blobKey = bp.writeBlob(blob);
420            } catch (IOException e) {
421                throw new NuxeoException(e);
422            }
423            // write blob data
424            Map<String, String> blobMap = new HashMap<>();
425            blobMap.put(KEY, blobKey);
426            blobMap.put(MIMETYPE, blob.getMimeType());
427            blobMap.put(ENCODING, blob.getEncoding());
428            blobMap.put(FILENAME, blob.getFilename());
429            blobMap.put(LENGTH, String.valueOf(size));
430            blobMap.put(DIGEST, blob.getDigest());
431            kvs.put(key + DOT_BLOB_DOT + i, toJson(blobMap), ttl);
432            i++;
433        }
434        Map<String, String> blobInfoMap = new HashMap<>();
435        blobInfoMap.put(COUNT, String.valueOf(blobs.size()));
436        blobInfoMap.put(SIZE, String.valueOf(totalSize));
437        kvs.put(key + DOT_BLOBINFO, toJson(blobInfoMap), ttl);
438        addStorageSize(totalSize);
439        markEntryExists(key);
440    }
441
442    protected void removeBlobs(String key) {
443        KeyValueStore kvs = getKeyValueStore();
444        String json = kvs.getString(key + DOT_BLOBINFO);
445        Map<String, String> map = jsonToMap(json);
446        if (map == null) {
447            return;
448        }
449        String countStr = map.get(COUNT);
450        int count = countStr == null ? 0 : Integer.parseInt(countStr);
451        String sizeStr = map.get(SIZE);
452        long size = sizeStr == null ? 0 : Long.parseLong(sizeStr);
453
454        // remove blobs
455        for (int i = 0; i < count; i++) {
456            kvs.put(key + DOT_BLOB_DOT + i, (String) null);
457        }
458        kvs.put(key + DOT_BLOBINFO, (String) null);
459        // fix storage size
460        addStorageSize(-size);
461    }
462
463    @Override
464    public List<Blob> getBlobs(String key) {
465        KeyValueStore kvs = getKeyValueStore();
466        BlobProvider bp = getBlobProvider();
467        String info = kvs.getString(key + DOT_BLOBINFO);
468        if (info == null) {
469            // if the entry doesn't exist at all return null, otherwise empty
470            if (kvs.getString(key + DOT_COMPLETED) == null) {
471                return null;
472            } else {
473                return Collections.emptyList();
474            }
475        }
476        Map<String, String> blobInfoMap = jsonToMap(info);
477        String countStr = blobInfoMap.get(COUNT);
478        if (countStr == null) {
479            return Collections.emptyList();
480        }
481        int count = Integer.parseInt(countStr);
482        List<Blob> blobs = new ArrayList<>();
483        for (int i = 0; i < count; i++) {
484            String blobMapJson = kvs.getString(key + DOT_BLOB_DOT + i);
485            if (blobMapJson == null) {
486                // corrupted entry, bail out
487                break;
488            }
489            Map<String, String> blobMap = jsonToMap(blobMapJson);
490            String blobKey = blobMap.get(KEY);
491            if (blobKey == null) {
492                // corrupted entry, bail out
493                break;
494            }
495            String mimeType = blobMap.get(MIMETYPE);
496            String encoding = blobMap.get(ENCODING);
497            String filename = blobMap.get(FILENAME);
498            String lengthStr = blobMap.get(LENGTH);
499            Long length = lengthStr == null ? null : Long.valueOf(lengthStr);
500            String digest = blobMap.get(DIGEST);
501            BlobInfo blobInfo = new BlobInfo();
502            blobInfo.key = blobKey;
503            blobInfo.mimeType = mimeType;
504            blobInfo.encoding = encoding;
505            blobInfo.filename = filename;
506            blobInfo.length = length;
507            blobInfo.digest = digest;
508            try {
509                Blob blob = bp.readBlob(blobInfo);
510                blobs.add(blob);
511            } catch (IOException e) {
512                throw new NuxeoException(e);
513            }
514        }
515        return blobs;
516    }
517
518    @Override
519    public long getSize(String key) {
520        KeyValueStore kvs = getKeyValueStore();
521        String json = kvs.getString(key + DOT_BLOBINFO);
522        Map<String, String> map = jsonToMap(json);
523        String size;
524        if (map == null || (size = map.get(SIZE)) == null) {
525            return -1;
526        }
527        return Long.parseLong(size);
528    }
529
530    @Override
531    public boolean isCompleted(String key) {
532        KeyValueStore kvs = getKeyValueStore();
533        String completed = kvs.getString(key + DOT_COMPLETED);
534        return Boolean.parseBoolean(completed);
535    }
536
537    @Override
538    public void setCompleted(String key, boolean completed) {
539        KeyValueStore kvs = getKeyValueStore();
540        kvs.put(key + DOT_COMPLETED, String.valueOf(completed), ttl);
541    }
542
543    protected void removeCompleted(String key) {
544        KeyValueStore kvs = getKeyValueStore();
545        kvs.put(key + DOT_COMPLETED, (String) null);
546    }
547
548    @Override
549    public void release(String key) {
550        if (targetMaxSize > 0 && getStorageSize() > targetMaxSize) {
551            // do the costly computation of the exact storage size if needed
552            doGC();
553            if (getStorageSize() > targetMaxSize) {
554                remove(key);
555                return;
556            }
557        }
558        setReleaseTTL(key);
559    }
560
561    // set TTL on all keys for this entry
562    protected void setReleaseTTL(String key) {
563        KeyValueStore kvs = getKeyValueStore();
564        kvs.setTTL(key + DOT_COMPLETED, releaseTTL);
565        String json = kvs.getString(key + DOT_PARAMINFO);
566        List<String> parameters = jsonToList(json);
567        if (parameters != null) {
568            parameters.stream().forEach(parameter -> {
569                String k = key + DOT_PARAM_DOT + parameter;
570                kvs.setTTL(k, releaseTTL);
571                kvs.setTTL(k + FORMAT, releaseTTL);
572            });
573        }
574        kvs.setTTL(key + DOT_PARAMINFO, releaseTTL);
575        json = kvs.getString(key + DOT_BLOBINFO);
576        Map<String, String> map = jsonToMap(json);
577        if (map != null) {
578            String countStr = map.get(COUNT);
579            int count = countStr == null ? 0 : Integer.parseInt(countStr);
580            for (int i = 0; i < count; i++) {
581                kvs.setTTL(key + DOT_BLOB_DOT + i, releaseTTL);
582            }
583        }
584        kvs.setTTL(key + DOT_BLOBINFO, releaseTTL);
585    }
586
587    @Override
588    public void remove(String key) {
589        removeBlobs(key);
590        removeParameters(key);
591        removeCompleted(key);
592    }
593
594}