001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.transientstore.keyvalueblob;
020
021import static org.apache.commons.lang3.StringUtils.defaultIfBlank;
022
023import java.io.ByteArrayInputStream;
024import java.io.IOException;
025import java.io.ObjectInput;
026import java.io.ObjectInputStream;
027import java.io.Serializable;
028import java.util.ArrayList;
029import java.util.Collection;
030import java.util.Collections;
031import java.util.HashMap;
032import java.util.List;
033import java.util.Map;
034import java.util.concurrent.TimeUnit;
035import java.util.function.BooleanSupplier;
036import java.util.function.Function;
037import java.util.stream.Stream;
038
039import org.apache.commons.lang3.SerializationUtils;
040import org.apache.commons.logging.Log;
041import org.apache.commons.logging.LogFactory;
042import org.nuxeo.ecm.core.api.Blob;
043import org.nuxeo.ecm.core.api.NuxeoException;
044import org.nuxeo.ecm.core.blob.BlobInfo;
045import org.nuxeo.ecm.core.blob.BlobManager;
046import org.nuxeo.ecm.core.blob.BlobManagerComponent;
047import org.nuxeo.ecm.core.blob.BlobProvider;
048import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
049import org.nuxeo.ecm.core.transientstore.api.MaximumTransientSpaceExceeded;
050import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig;
051import org.nuxeo.ecm.core.transientstore.api.TransientStoreProvider;
052import org.nuxeo.runtime.api.Framework;
053import org.nuxeo.runtime.kv.KeyValueService;
054import org.nuxeo.runtime.kv.KeyValueStore;
055import org.nuxeo.runtime.kv.KeyValueStoreProvider;
056
057import com.fasterxml.jackson.core.type.TypeReference;
058import com.fasterxml.jackson.databind.ObjectMapper;
059
060/**
061 * Transient Store storing properties in a Key/Value store, and storing blobs using a Blob Provider.
062 * <p>
063 * This transient store is configured with the following properties:
064 * <ul>
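 * <li><em>keyValueStore</em>: the name of the key/value store to use. If not provided, it defaults to "transient_" +
 * the transient store name (or to the transient store name itself if it already starts with the transient prefix).
 * <li><em>blobProvider</em>: the name of the blob provider to use. If not provided, it defaults to "transient_" + the
 * transient store name (or to the transient store name itself if it already starts with the transient prefix).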
069 * <li><em>defaultBlobProvider</em>: if the configured or defaulted blob provider doesn't exist, a namespaced copy of
070 * this one will be used instead. The default is "default".
071 * </ul>
072 * <p>
073 * The storage format is the following:
074 *
075 * <pre>
076 *   __blobsize__:       storage size; because entries may expire without us being notified due to their TTL,
077 *                       this may be higher than the actual storage size
078 *
079 *   entryKey.completed: "true" if completed, "false" if not; presence of this key marks entry existence
080 *
081 *   entryKey.paraminfo: ["foo", "bar"]
082 *
083 *   entryKey.param.foo: value for param foo
084 *   entryKey.param.foo__format: "java" for java serializable format, otherwise string
085 *
086 *   entryKey.param.bar: value for param bar
087 *   etc.
088 *
089 *   entryKey.bloblock:  "true" if there is a blob read/write in progress, null otherwise
090 *   entryKey.blobinfo:  {"count": number of blobs,
091 *                        "size": storage size of the blobs}
 *   entryKey.blob.0:    {"key": key in blob provider for first blob,
 *                        "mimetype": MIME Type,
 *                        "encoding": encoding,
 *                        "filename": filename,
 *                        "length": length of the blob (stored as a string),
 *                        "digest": digest}
097 *   entryKey.blob.1:    {...} same for second blob
098 *   etc.
099 * </pre>
100 *
101 * @since 9.3
102 */
103public class KeyValueBlobTransientStore implements TransientStoreProvider {
104
    /** Logger of this class. */
    private static final Log log = LogFactory.getLog(KeyValueBlobTransientStore.class);

    /** Separator between an entry key and the suffixes of its sub-keys in the key/value store. */
    public static final String SEP = ".";

    /** Key under which the total blob storage size is kept in the key/value store. */
    public static final String STORAGE_SIZE = "__blobsize__";

    /** Suffix of the key holding the completion flag of an entry; presence of this key marks entry existence. */
    public static final String DOT_COMPLETED = SEP + "completed";

    /** Suffix of the key holding the JSON array of the parameter names of an entry. */
    public static final String DOT_PARAMINFO = SEP + "paraminfo";

    /** Prefix (after the entry key) of the keys holding parameter values; followed by the parameter name. */
    public static final String DOT_PARAM_DOT = SEP + "param" + SEP;

    /** Suffix added to a parameter value key to get the key holding the format of that value. */
    public static final String FORMAT = "__format";

    /** Format value marking a parameter stored in Java serialized form. */
    public static final String FORMAT_JAVA = "java";

    /**
     * Suffix of the key used as a lock while the blobs of an entry are read or written.
     *
     * @since 11.1
     */
    public static final String DOT_BLOBLOCK = SEP + "bloblock";

    /** Suffix of the key holding the JSON object with the blob count and total blob size of an entry. */
    public static final String DOT_BLOBINFO = SEP + "blobinfo";

    /** Name of the blob count field in the blob info JSON object. */
    public static final String COUNT = "count";

    /** Name of the total size field in the blob info JSON object. */
    public static final String SIZE = "size";

    /** Prefix (after the entry key) of the keys holding the blob maps; followed by the blob index. */
    public static final String DOT_BLOB_DOT = SEP + "blob" + SEP;

    /** Name of the blob provider key field in a blob map. */
    public static final String KEY = "key";

    /** Name of the MIME type field in a blob map. */
    public static final String MIMETYPE = "mimetype";

    /** Name of the encoding field in a blob map. */
    public static final String ENCODING = "encoding";

    /** Name of the filename field in a blob map. */
    public static final String FILENAME = "filename";

    /** Name of the length field in a blob map. */
    public static final String LENGTH = "length";

    /** Name of the digest field in a blob map. */
    public static final String DIGEST = "digest";

    /** Configuration property giving the name of the key/value store to use. */
    public static final String CONFIG_KEY_VALUE_STORE = "keyValueStore";

    /** Configuration property giving the id of the blob provider to use. */
    public static final String CONFIG_BLOB_PROVIDER = "blobProvider";

    /**
     * Configuration property giving the id of the fallback blob provider.
     *
     * @since 11.1
     */
    public static final String CONFIG_DEFAULT_BLOB_PROVIDER = "defaultBlobProvider";

    /**
     * Value used for the fallback blob provider id when {@link #CONFIG_DEFAULT_BLOB_PROVIDER} is not configured.
     *
     * @since 11.1
     */
    public static final String CONFIG_DEFAULT_BLOB_PROVIDER_DEFAULT = "default";

    /**
     * TTL, in seconds, set on the blob lock key when it is acquired.
     *
     * @since 11.1
     */
    protected static final int BLOB_LOCK_TTL = 60; // don't keep any lock longer than 60s

    /**
     * Time, in nanoseconds, during which lock acquisition is retried before giving up.
     *
     * @since 11.1
     */
    protected static final long LOCK_ACQUIRE_TIME_NANOS = TimeUnit.SECONDS.toNanos(5);

    /**
     * Time, in nanoseconds, after which the sleep between two lock acquisition attempts starts doubling.
     *
     * @since 11.1
     */
    protected static final long LOCK_EXPONENTIAL_BACKOFF_AFTER_NANOS = TimeUnit.MILLISECONDS.toNanos(100);
162
    /** Name of this transient store, taken from its configuration. */
    protected String name;

    /** Name of the key/value store holding the entries metadata. */
    protected String keyValueStoreName;

    /** Id of the blob provider holding the blobs. */
    protected String blobProviderId;

    /** Id of the blob provider passed as fallback when looking up the blob provider. */
    protected String defaultBlobProviderId;

    /** Basic TTL for all entries (the configured first-level TTL multiplied by 60). */
    protected int ttl;

    /**
     * TTL used to keep objects around a bit longer if there's space for them, for caching (the configured
     * second-level TTL multiplied by 60).
     */
    protected int releaseTTL;

    /** Storage size, in bytes, above which released entries are removed instead of being kept; ignored if not > 0. */
    protected long targetMaxSize;

    /** Storage size, in bytes, above which new blobs are refused; ignored if not > 0. */
    protected long absoluteMaxSize;

    /** JSON mapper used to serialize and parse the values stored in the key/value store. */
    protected ObjectMapper mapper;
182
183    // ---------- TransientStoreProvider ----------
184
185    @Override
186    public void init(TransientStoreConfig config) {
187        name = config.getName();
188        String defaultName = name;
189        if (!defaultName.startsWith(BlobManagerComponent.TRANSIENT_ID_PREFIX)) {
190            defaultName = BlobManagerComponent.TRANSIENT_ID_PREFIX + "_" + defaultName;
191        }
192        Map<String, String> properties = config.getProperties();
193        if (properties == null) {
194            properties = Collections.emptyMap();
195        }
196        keyValueStoreName = defaultIfBlank(properties.get(CONFIG_KEY_VALUE_STORE), defaultName);
197        blobProviderId = defaultIfBlank(properties.get(CONFIG_BLOB_PROVIDER), defaultName);
198        defaultBlobProviderId = defaultIfBlank(properties.get(CONFIG_DEFAULT_BLOB_PROVIDER),
199                CONFIG_DEFAULT_BLOB_PROVIDER_DEFAULT);
200        mapper = new ObjectMapper();
201        ttl = config.getFirstLevelTTL() * 60;
202        releaseTTL = config.getSecondLevelTTL() * 60;
203        targetMaxSize = config.getTargetMaxSizeMB() * 1024L * 1024;
204        absoluteMaxSize = config.getAbsoluteMaxSizeMB() * 1024L * 1024;
205    }
206
207    protected KeyValueStore getKeyValueStore() {
208        return Framework.getService(KeyValueService.class).getKeyValueStore(keyValueStoreName);
209    }
210
211    protected BlobProvider getBlobProvider() {
212        BlobProvider blobProvider = Framework.getService(BlobManager.class)
213                                             .getBlobProviderWithNamespace(blobProviderId, defaultBlobProviderId);
214        if (blobProvider == null) {
215            throw new NuxeoException("No blob provider with id: " + blobProviderId);
216        }
217        if (!blobProvider.isTransient()) {
218            throw new NuxeoException("Blob provider: " + blobProviderId + " used for Key/Value store: "
219                    + keyValueStoreName + " must be configured as transient");
220        }
221        return blobProvider;
222    }
223
    /** Does nothing: this class holds no resource of its own to release. */
    @Override
    public void shutdown() {
        // nothing to do
    }
228
229    @Override
230    public Stream<String> keyStream() {
231        KeyValueStoreProvider kvs = (KeyValueStoreProvider) getKeyValueStore();
232        int len = DOT_COMPLETED.length();
233        return kvs.keyStream() //
234                  .filter(key -> key.endsWith(DOT_COMPLETED))
235                  .map(key -> key.substring(0, key.length() - len));
236    }
237
238    @Override
239    public long getStorageSize() {
240        KeyValueStore kvs = getKeyValueStore();
241        String sizeStr = kvs.getString(STORAGE_SIZE);
242        return sizeStr == null ? 0 : Long.parseLong(sizeStr);
243    }
244
245    /** @deprecated since 11.1 */
246    @Deprecated
247    protected void addStorageSize(long delta) {
248        KeyValueStore kvs = getKeyValueStore();
249        addStorageSize(delta, kvs);
250    }
251
252    protected void addStorageSize(long delta, KeyValueStore kvs) {
253        atomicUpdate(STORAGE_SIZE, size -> {
254            long s = size == null ? 0 : Long.parseLong(size);
255            return String.valueOf(s + delta);
256        }, 0, kvs);
257    }
258
259    /**
260     * Computes an exact value for the current storage size (sum of all blobs size). THIS METHOD IS COSTLY.
261     * <p>
262     * Does not take into account blob de-duplication that may be done by the blob provider.
263     * <p>
264     * Does not take into account blobs that still exist in the blob provider but are not referenced anymore (due to TTL
265     * expiration or GC not having been done).
266     */
267    protected void computeStorageSize() {
268        KeyValueStore kvs = getKeyValueStore();
269        long size = keyStream().map(this::getBlobs) //
270                               .flatMap(Collection::stream)
271                               .mapToLong(Blob::getLength)
272                               .sum();
273        kvs.put(STORAGE_SIZE, String.valueOf(size));
274    }
275
276    // also recomputes the exact storage size
277    @Override
278    public void doGC() {
279        BlobProvider bp = getBlobProvider();
280        BinaryGarbageCollector gc = bp.getBinaryGarbageCollector();
281        boolean delete = false;
282        gc.start();
283        try {
284            keyStream().map(this::getBlobKeys) //
285                       .flatMap(Collection::stream)
286                       .forEach(gc::mark);
287            delete = true;
288        } finally {
289            // don't delete if there's an exception, but still stop the GC
290            gc.stop(delete);
291        }
292        computeStorageSize();
293    }
294
295    @Override
296    public void removeAll() {
297        KeyValueStoreProvider kvs = (KeyValueStoreProvider) getKeyValueStore();
298        kvs.clear();
299        doGC();
300    }
301
302    // ---------- TransientStore ----------
303
    /** Jackson type token used to parse a JSON array of strings. */
    protected static final TypeReference<List<String>> LIST_STRING = new TypeReference<List<String>>() {
    };

    /** Jackson type token used to parse a JSON object whose values are strings. */
    protected static final TypeReference<Map<String, String>> MAP_STRING_STRING = new TypeReference<Map<String, String>>() {
    };
309
310    protected List<String> jsonToList(String json) {
311        if (json == null) {
312            return null;
313        }
314        try {
315            return mapper.readValue(json, LIST_STRING);
316        } catch (IOException e) {
317            log.error("Invalid JSON array: " + json);
318            return null;
319        }
320    }
321
322    protected Map<String, String> jsonToMap(String json) {
323        if (json == null) {
324            return null;
325        }
326        try {
327            return mapper.readValue(json, MAP_STRING_STRING);
328        } catch (IOException e) {
329            log.error("Invalid JSON object: " + json);
330            return null;
331        }
332    }
333
334    protected String toJson(Object object) {
335        try {
336            return mapper.writeValueAsString(object);
337        } catch (IOException e) {
338            throw new NuxeoException(e);
339        }
340    }
341
342    /** @deprecated since 11.1 */
343    @Deprecated
344    public void atomicUpdate(String key, Function<String, String> updateFunction, long ttl) {
345        KeyValueStore kvs = getKeyValueStore();
346        atomicUpdate(key, updateFunction, ttl, kvs);
347    }
348
349    protected void atomicUpdate(String key, Function<String, String> updateFunction, long ttl, KeyValueStore kvs) {
350        for (;;) {
351            String oldValue = kvs.getString(key);
352            String newValue = updateFunction.apply(oldValue);
353            if (kvs.compareAndSet(key, oldValue, newValue, ttl)) {
354                break;
355            }
356        }
357    }
358
359    @Override
360    public boolean exists(String key) {
361        KeyValueStore kvs = getKeyValueStore();
362        return kvs.getString(key + DOT_COMPLETED) != null;
363    }
364
365    /** @deprecated since 11.1 */
366    @Deprecated
367    protected void markEntryExists(String key) {
368        KeyValueStore kvs = getKeyValueStore();
369        markEntryExists(key, kvs);
370    }
371
372    protected void markEntryExists(String key, KeyValueStore kvs) {
373        kvs.compareAndSet(key + DOT_COMPLETED, null, "false", ttl);
374    }
375
376    @Override
377    public void putParameter(String key, String parameter, Serializable value) {
378        KeyValueStore kvs = getKeyValueStore();
379        String k = key + DOT_PARAM_DOT + parameter;
380        if (value instanceof String) {
381            kvs.put(k, (String) value, ttl);
382            kvs.put(k + FORMAT, (String) null);
383        } else {
384            byte[] bytes = SerializationUtils.serialize(value);
385            kvs.put(k, bytes, ttl);
386            kvs.put(k + FORMAT, FORMAT_JAVA, ttl);
387        }
388        // atomically add key to param info
389        atomicUpdate(key + DOT_PARAMINFO, json -> {
390            List<String> parameters = jsonToList(json);
391            if (parameters == null) {
392                parameters = new ArrayList<>();
393            }
394            if (!parameters.contains(parameter)) {
395                parameters.add(parameter);
396            }
397            return toJson(parameters);
398        }, ttl, kvs);
399        markEntryExists(key, kvs);
400    }
401
402    @Override
403    public Serializable getParameter(String key, String parameter) {
404        KeyValueStore kvs = getKeyValueStore();
405        String k = key + DOT_PARAM_DOT + parameter;
406        String format = kvs.getString(k + FORMAT);
407        if (format == null) {
408            return kvs.getString(k);
409        } else {
410            byte[] bytes = kvs.get(k);
411            try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
412                    ObjectInput in = new ObjectInputStream(bis)) {
413                return (Serializable) in.readObject();
414            } catch (IOException | ClassNotFoundException e) {
415                throw new NuxeoException(e);
416            }
417        }
418    }
419
420    @Override
421    public void putParameters(String key, Map<String, Serializable> parameters) {
422        parameters.forEach((param, value) -> putParameter(key, param, value));
423    }
424
425    @Override
426    public Map<String, Serializable> getParameters(String key) {
427        KeyValueStore kvs = getKeyValueStore();
428        // get the list of keys
429        String json = kvs.getString(key + DOT_PARAMINFO);
430        List<String> parameters = jsonToList(json);
431        if (parameters == null) {
432            // if the entry doesn't exist at all return null, otherwise empty
433            if (kvs.getString(key + DOT_COMPLETED) == null) {
434                return null;
435            } else {
436                return Collections.emptyMap();
437            }
438        }
439        // get values
440        Map<String, Serializable> map = new HashMap<>();
441        for (String p : parameters) {
442            Serializable value = getParameter(key, p);
443            if (value != null) {
444                map.put(p, value);
445            }
446        }
447        return map;
448    }
449
450    /** @deprecated since 11.1 */
451    @Deprecated
452    protected void removeParameters(String key) {
453        KeyValueStore kvs = getKeyValueStore();
454        removeParameters(key, kvs);
455    }
456
457    protected void removeParameters(String key, KeyValueStore kvs) {
458        String json = kvs.getString(key + DOT_PARAMINFO);
459        List<String> parameters = jsonToList(json);
460        if (parameters != null) {
461            for (String parameter : parameters) {
462                String k = key + DOT_PARAM_DOT + parameter;
463                kvs.put(k, (String) null);
464                kvs.put(k + FORMAT, (String) null);
465            }
466        }
467        kvs.put(key + DOT_PARAMINFO, (String) null);
468    }
469
470    @Override
471    public void putBlobs(String key, List<Blob> blobs) {
472        if (absoluteMaxSize > 0 && getStorageSize() > absoluteMaxSize) {
473            // do the costly computation of the exact storage size if needed
474            doGC();
475            if (getStorageSize() > absoluteMaxSize) {
476                throw new MaximumTransientSpaceExceeded();
477            }
478        }
479
480        // first, outside the lock
481        // store the blobs, and compute the total size and the blob maps
482        BlobProvider bp = getBlobProvider();
483        long totalSize = 0;
484        List<String> blobMapJsons = new ArrayList<>();
485        for (Blob blob : blobs) {
486            long size = blob.getLength();
487            if (size >= 0) {
488                totalSize += size;
489            }
490            // store blob
491            String blobKey;
492            try {
493                blobKey = bp.writeBlob(blob);
494            } catch (IOException e) {
495                throw new NuxeoException(e);
496            }
497            // compute blob data
498            Map<String, String> blobMap = new HashMap<>();
499            blobMap.put(KEY, blobKey);
500            blobMap.put(MIMETYPE, blob.getMimeType());
501            blobMap.put(ENCODING, blob.getEncoding());
502            blobMap.put(FILENAME, blob.getFilename());
503            blobMap.put(LENGTH, String.valueOf(size));
504            blobMap.put(DIGEST, blob.getDigest());
505            String blobMapJson = toJson(blobMap);
506            blobMapJsons.add(blobMapJson);
507        }
508        Map<String, String> blobInfoMap = new HashMap<>();
509        blobInfoMap.put(COUNT, String.valueOf(blobs.size()));
510        blobInfoMap.put(SIZE, String.valueOf(totalSize));
511        String blobInfoMapJson = toJson(blobInfoMap);
512
513        // acquire a lock while writing
514        KeyValueStore kvs = getKeyValueStore();
515        acquireBlobLockOrThrow(key, kvs);
516        try {
517            // remove previous blobs
518            removeBlobs(key, kvs);
519            // write new blobs maps
520            int i = 0;
521            for (String blobMapJson : blobMapJsons) {
522                kvs.put(key + DOT_BLOB_DOT + i, blobMapJson, ttl);
523                i++;
524            }
525            // write blob info
526            kvs.put(key + DOT_BLOBINFO, blobInfoMapJson, ttl);
527            addStorageSize(totalSize, kvs);
528            markEntryExists(key, kvs);
529        } finally {
530            releaseBlobLock(key, kvs);
531        }
532    }
533
534    /** @deprecated since 11.1 */
535    @Deprecated
536    protected void removeBlobs(String key) {
537        KeyValueStore kvs = getKeyValueStore();
538        removeBlobs(key, kvs);
539    }
540
541    protected void removeBlobs(String key, KeyValueStore kvs) {
542        String json = kvs.getString(key + DOT_BLOBINFO);
543        Map<String, String> map = jsonToMap(json);
544        if (map == null) {
545            return;
546        }
547        String countStr = map.get(COUNT);
548        int count = countStr == null ? 0 : Integer.parseInt(countStr);
549        String sizeStr = map.get(SIZE);
550        long size = sizeStr == null ? 0 : Long.parseLong(sizeStr);
551
552        // remove blobs
553        for (int i = 0; i < count; i++) {
554            kvs.put(key + DOT_BLOB_DOT + i, (String) null);
555        }
556        kvs.put(key + DOT_BLOBINFO, (String) null);
557        // fix storage size
558        addStorageSize(-size, kvs);
559    }
560
561    @Override
562    public List<Blob> getBlobs(String key) {
563        KeyValueStore kvs = getKeyValueStore();
564        BlobProvider bp = getBlobProvider();
565        List<String> blobMapJsons = new ArrayList<>();
566
567        // try to acquire a lock but still proceed without the lock (best effort)
568        boolean lockAcquired = tryAcquireBlobLock(key, kvs);
569        try {
570            String info = kvs.getString(key + DOT_BLOBINFO);
571            if (info == null) {
572                // if the entry doesn't exist at all return null, otherwise empty
573                if (kvs.getString(key + DOT_COMPLETED) == null) {
574                    return null;
575                } else {
576                    return Collections.emptyList();
577                }
578            }
579            Map<String, String> blobInfoMap = jsonToMap(info);
580            String countStr = blobInfoMap.get(COUNT);
581            if (countStr == null) {
582                return Collections.emptyList();
583            }
584            int count = Integer.parseInt(countStr);
585            for (int i = 0; i < count; i++) {
586                String blobMapJson = kvs.getString(key + DOT_BLOB_DOT + i);
587                blobMapJsons.add(blobMapJson);
588            }
589        } finally {
590            if (lockAcquired) {
591                releaseBlobLock(key, kvs);
592            }
593        }
594
595        // compute blobs from read blob maps
596        List<Blob> blobs = new ArrayList<>();
597        for (String blobMapJson : blobMapJsons) {
598            if (blobMapJson == null) {
599                // corrupted entry, bail out
600                break;
601            }
602            Map<String, String> blobMap = jsonToMap(blobMapJson);
603            String blobKey = blobMap.get(KEY);
604            if (blobKey == null) {
605                // corrupted entry, bail out
606                break;
607            }
608            String mimeType = blobMap.get(MIMETYPE);
609            String encoding = blobMap.get(ENCODING);
610            String filename = blobMap.get(FILENAME);
611            String lengthStr = blobMap.get(LENGTH);
612            Long length = lengthStr == null ? null : Long.valueOf(lengthStr);
613            String digest = blobMap.get(DIGEST);
614            BlobInfo blobInfo = new BlobInfo();
615            blobInfo.key = blobKey;
616            blobInfo.mimeType = mimeType;
617            blobInfo.encoding = encoding;
618            blobInfo.filename = filename;
619            blobInfo.length = length;
620            blobInfo.digest = digest;
621            try {
622                Blob blob = bp.readBlob(blobInfo);
623                blobs.add(blob);
624            } catch (IOException e) {
625                // ignore, the blob was removed from the blob provider
626                // maybe by a concurrent GC from this transient store
627                // or from the blob provider itself (if it's incorrectly shared)
628                log.debug("Failed to read blob: " + digest + " in blob provider: " + blobProviderId
629                        + " for transient store: " + name);
630            }
631        }
632        return blobs;
633    }
634
635    // used by GC
636    protected List<String> getBlobKeys(String key) {
637        KeyValueStore kvs = getKeyValueStore();
638        String info = kvs.getString(key + DOT_BLOBINFO);
639        if (info == null) {
640            return Collections.emptyList();
641        }
642        Map<String, String> blobInfoMap = jsonToMap(info);
643        String countStr = blobInfoMap.get(COUNT);
644        if (countStr == null) {
645            return Collections.emptyList();
646        }
647        int count = Integer.parseInt(countStr);
648        List<String> blobKeys = new ArrayList<>(count);
649        for (int i = 0; i < count; i++) {
650            String blobMapJson = kvs.getString(key + DOT_BLOB_DOT + i);
651            if (blobMapJson == null) {
652                // corrupted entry, bail out
653                break;
654            }
655            Map<String, String> blobMap = jsonToMap(blobMapJson);
656            String blobKey = blobMap.get(KEY);
657            if (blobKey == null) {
658                // corrupted entry, bail out
659                break;
660            }
661            blobKeys.add(blobKey);
662        }
663        return blobKeys;
664    }
665
666    protected void acquireBlobLockOrThrow(String key, KeyValueStore kvs) {
667        if (tryAcquireBlobLock(key, kvs)) {
668            return;
669        }
670        throw new NuxeoException("Failed to acquire blob lock for: " + key);
671    }
672
673    protected boolean tryAcquireBlobLock(String key, KeyValueStore kvs) {
674        return acquireLock(() -> tryAcquireOnceBlobLock(key, kvs));
675    }
676
677    protected boolean tryAcquireOnceBlobLock(String key, KeyValueStore kvs) {
678        return kvs.compareAndSet(key + DOT_BLOBLOCK, null, "true", BLOB_LOCK_TTL);
679    }
680
681    protected void releaseBlobLock(String key, KeyValueStore kvs) {
682        kvs.put(key + DOT_BLOBLOCK, (String) null);
683    }
684
685    protected boolean acquireLock(BooleanSupplier tryAcquireOnce) {
686        long start = System.nanoTime();
687        long sleep = 1; // ms
688        long elapsed;
689        while ((elapsed = System.nanoTime() - start) < LOCK_ACQUIRE_TIME_NANOS) {
690            if (tryAcquireOnce.getAsBoolean()) {
691                return true;
692            }
693            try {
694                Thread.sleep(sleep);
695                if (elapsed > LOCK_EXPONENTIAL_BACKOFF_AFTER_NANOS) {
696                    sleep *= 2;
697                }
698            } catch (InterruptedException e) {
699                Thread.currentThread().interrupt();
700                throw new RuntimeException(e);
701            }
702        }
703        return false;
704    }
705
706    @Override
707    public long getSize(String key) {
708        KeyValueStore kvs = getKeyValueStore();
709        String json = kvs.getString(key + DOT_BLOBINFO);
710        Map<String, String> map = jsonToMap(json);
711        String size;
712        if (map == null || (size = map.get(SIZE)) == null) {
713            return -1;
714        }
715        return Long.parseLong(size);
716    }
717
718    @Override
719    public boolean isCompleted(String key) {
720        KeyValueStore kvs = getKeyValueStore();
721        String completed = kvs.getString(key + DOT_COMPLETED);
722        return Boolean.parseBoolean(completed);
723    }
724
725    @Override
726    public void setCompleted(String key, boolean completed) {
727        KeyValueStore kvs = getKeyValueStore();
728        kvs.put(key + DOT_COMPLETED, String.valueOf(completed), ttl);
729    }
730
731    /** @deprecated since 11.1 */
732    @Deprecated
733    protected void removeCompleted(String key) {
734        KeyValueStore kvs = getKeyValueStore();
735        removeCompleted(key, kvs);
736    }
737
738    protected void removeCompleted(String key, KeyValueStore kvs) {
739        kvs.put(key + DOT_COMPLETED, (String) null);
740    }
741
742    @Override
743    public void release(String key) {
744        if (targetMaxSize > 0 && getStorageSize() > targetMaxSize) {
745            // do the costly computation of the exact storage size if needed
746            doGC();
747            if (getStorageSize() > targetMaxSize) {
748                remove(key);
749                return;
750            }
751        }
752        setReleaseTTL(key);
753    }
754
755    // set TTL on all keys for this entry
756    protected void setReleaseTTL(String key) {
757        KeyValueStore kvs = getKeyValueStore();
758        kvs.setTTL(key + DOT_COMPLETED, releaseTTL);
759        String json = kvs.getString(key + DOT_PARAMINFO);
760        List<String> parameters = jsonToList(json);
761        if (parameters != null) {
762            parameters.stream().forEach(parameter -> {
763                String k = key + DOT_PARAM_DOT + parameter;
764                kvs.setTTL(k, releaseTTL);
765                kvs.setTTL(k + FORMAT, releaseTTL);
766            });
767        }
768        kvs.setTTL(key + DOT_PARAMINFO, releaseTTL);
769        json = kvs.getString(key + DOT_BLOBINFO);
770        Map<String, String> map = jsonToMap(json);
771        if (map != null) {
772            String countStr = map.get(COUNT);
773            int count = countStr == null ? 0 : Integer.parseInt(countStr);
774            for (int i = 0; i < count; i++) {
775                kvs.setTTL(key + DOT_BLOB_DOT + i, releaseTTL);
776            }
777        }
778        kvs.setTTL(key + DOT_BLOBINFO, releaseTTL);
779    }
780
781    @Override
782    public void remove(String key) {
783        KeyValueStore kvs = getKeyValueStore();
784        removeBlobs(key, kvs);
785        removeParameters(key, kvs);
786        removeCompleted(key, kvs);
787    }
788
789}