001/* 002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.transientstore.keyvalueblob; 020 021import static org.apache.commons.lang3.StringUtils.defaultIfBlank; 022 023import java.io.ByteArrayInputStream; 024import java.io.IOException; 025import java.io.ObjectInput; 026import java.io.ObjectInputStream; 027import java.io.Serializable; 028import java.util.ArrayList; 029import java.util.Collection; 030import java.util.Collections; 031import java.util.HashMap; 032import java.util.List; 033import java.util.Map; 034import java.util.concurrent.TimeUnit; 035import java.util.function.BooleanSupplier; 036import java.util.function.Function; 037import java.util.stream.Stream; 038 039import org.apache.commons.lang3.SerializationUtils; 040import org.apache.commons.logging.Log; 041import org.apache.commons.logging.LogFactory; 042import org.nuxeo.ecm.core.api.Blob; 043import org.nuxeo.ecm.core.api.NuxeoException; 044import org.nuxeo.ecm.core.blob.BlobInfo; 045import org.nuxeo.ecm.core.blob.BlobManager; 046import org.nuxeo.ecm.core.blob.BlobManagerComponent; 047import org.nuxeo.ecm.core.blob.BlobProvider; 048import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector; 049import org.nuxeo.ecm.core.transientstore.api.MaximumTransientSpaceExceeded; 050import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig; 051import org.nuxeo.ecm.core.transientstore.api.TransientStoreProvider; 052import org.nuxeo.runtime.api.Framework; 053import org.nuxeo.runtime.kv.KeyValueService; 054import org.nuxeo.runtime.kv.KeyValueStore; 055import org.nuxeo.runtime.kv.KeyValueStoreProvider; 056 057import com.fasterxml.jackson.core.type.TypeReference; 058import com.fasterxml.jackson.databind.ObjectMapper; 059 060/** 061 * Transient Store storing properties in a Key/Value store, and storing blobs using a Blob Provider. 062 * <p> 063 * This transient store is configured with the following properties: 064 * <ul> 065 * <li><em>keyValueStore</em>: the name of the key/value store to use. If not provided, it defaults to "transient_" + 066 * the transient store name. 067 * <li><em>blobProvider</em>: the name of the blob provider to use. If not provided, it defaults to "transient_" + the 068 * transient store name. 069 * <li><em>defaultBlobProvider</em>: if the configured or defaulted blob provider doesn't exist, a namespaced copy of 070 * this one will be used instead. The default is "default". 071 * </ul> 072 * <p> 073 * The storage format is the following: 074 * 075 * <pre> 076 * __blobsize__: storage size; because entries may expire without us being notified due to their TTL, 077 * this may be higher than the actual storage size 078 * 079 * entryKey.completed: "true" if completed, "false" if not; presence of this key marks entry existence 080 * 081 * entryKey.paraminfo: ["foo", "bar"] 082 * 083 * entryKey.param.foo: value for param foo 084 * entryKey.param.foo__format: "java" for java serializable format, otherwise string 085 * 086 * entryKey.param.bar: value for param bar 087 * etc. 088 * 089 * entryKey.bloblock: "true" if there is a blob read/write in progress, null otherwise 090 * entryKey.blobinfo: {"count": number of blobs, 091 * "size": storage size of the blobs} 092 * entryKey.blob.0: {"key": key in blob provider for first blob, 093 * "mimetype": MIME Type, 094 * "encoding": encoding, 095 * "filename": filename, 096 * "digest": digest} 097 * entryKey.blob.1: {...} same for second blob 098 * etc. 099 * </pre> 100 * 101 * @since 9.3 102 */ 103public class KeyValueBlobTransientStore implements TransientStoreProvider { 104 105 private static final Log log = LogFactory.getLog(KeyValueBlobTransientStore.class); 106 107 public static final String SEP = "."; 108 109 public static final String STORAGE_SIZE = "__blobsize__"; 110 111 public static final String DOT_COMPLETED = SEP + "completed"; 112 113 public static final String DOT_PARAMINFO = SEP + "paraminfo"; 114 115 public static final String DOT_PARAM_DOT = SEP + "param" + SEP; 116 117 public static final String FORMAT = "__format"; 118 119 public static final String FORMAT_JAVA = "java"; 120 121 /** @since 11.1 */ 122 public static final String DOT_BLOBLOCK = SEP + "bloblock"; 123 124 public static final String DOT_BLOBINFO = SEP + "blobinfo"; 125 126 public static final String COUNT = "count"; 127 128 public static final String SIZE = "size"; 129 130 public static final String DOT_BLOB_DOT = SEP + "blob" + SEP; 131 132 public static final String KEY = "key"; 133 134 public static final String MIMETYPE = "mimetype"; 135 136 public static final String ENCODING = "encoding"; 137 138 public static final String FILENAME = "filename"; 139 140 public static final String LENGTH = "length"; 141 142 public static final String DIGEST = "digest"; 143 144 public static final String CONFIG_KEY_VALUE_STORE = "keyValueStore"; 145 146 public static final String CONFIG_BLOB_PROVIDER = "blobProvider"; 147 148 /** @since 11.1 */ 149 public static final String CONFIG_DEFAULT_BLOB_PROVIDER = "defaultBlobProvider"; 150 151 /** @since 11.1 */ 152 public static final String CONFIG_DEFAULT_BLOB_PROVIDER_DEFAULT = "default"; 153 154 /** @since 11.1 */ 155 protected static final int BLOB_LOCK_TTL = 60; // don't keep any lock longer than 60s 156 157 /** @since 11.1 */ 158 protected static final long LOCK_ACQUIRE_TIME_NANOS = TimeUnit.SECONDS.toNanos(5); 159 160 /** @since 11.1 */ 161 protected static final long LOCK_EXPONENTIAL_BACKOFF_AFTER_NANOS = TimeUnit.MILLISECONDS.toNanos(100); 162 163 protected String name; 164 165 protected String keyValueStoreName; 166 167 protected String blobProviderId; 168 169 protected String defaultBlobProviderId; 170 171 /** Basic TTL for all entries. */ 172 protected int ttl; 173 174 /** TTL used to keep objects around a bit longer if there's space for them, for caching. */ 175 protected int releaseTTL; 176 177 protected long targetMaxSize; 178 179 protected long absoluteMaxSize; 180 181 protected ObjectMapper mapper; 182 183 // ---------- TransientStoreProvider ---------- 184 185 @Override 186 public void init(TransientStoreConfig config) { 187 name = config.getName(); 188 String defaultName = name; 189 if (!defaultName.startsWith(BlobManagerComponent.TRANSIENT_ID_PREFIX)) { 190 defaultName = BlobManagerComponent.TRANSIENT_ID_PREFIX + "_" + defaultName; 191 } 192 Map<String, String> properties = config.getProperties(); 193 if (properties == null) { 194 properties = Collections.emptyMap(); 195 } 196 keyValueStoreName = defaultIfBlank(properties.get(CONFIG_KEY_VALUE_STORE), defaultName); 197 blobProviderId = defaultIfBlank(properties.get(CONFIG_BLOB_PROVIDER), defaultName); 198 defaultBlobProviderId = defaultIfBlank(properties.get(CONFIG_DEFAULT_BLOB_PROVIDER), 199 CONFIG_DEFAULT_BLOB_PROVIDER_DEFAULT); 200 mapper = new ObjectMapper(); 201 ttl = config.getFirstLevelTTL() * 60; 202 releaseTTL = config.getSecondLevelTTL() * 60; 203 targetMaxSize = config.getTargetMaxSizeMB() * 1024L * 1024; 204 absoluteMaxSize = config.getAbsoluteMaxSizeMB() * 1024L * 1024; 205 } 206 207 protected KeyValueStore getKeyValueStore() { 208 return Framework.getService(KeyValueService.class).getKeyValueStore(keyValueStoreName); 209 } 210 211 protected BlobProvider getBlobProvider() { 212 BlobProvider blobProvider = Framework.getService(BlobManager.class) 213 .getBlobProviderWithNamespace(blobProviderId, defaultBlobProviderId); 214 if (blobProvider == null) { 215 throw new NuxeoException("No blob provider with id: " + blobProviderId); 216 } 217 if (!blobProvider.isTransient()) { 218 throw new NuxeoException("Blob provider: " + blobProviderId + " used for Key/Value store: " 219 + keyValueStoreName + " must be configured as transient"); 220 } 221 return blobProvider; 222 } 223 224 @Override 225 public void shutdown() { 226 // nothing to do 227 } 228 229 @Override 230 public Stream<String> keyStream() { 231 KeyValueStoreProvider kvs = (KeyValueStoreProvider) getKeyValueStore(); 232 int len = DOT_COMPLETED.length(); 233 return kvs.keyStream() // 234 .filter(key -> key.endsWith(DOT_COMPLETED)) 235 .map(key -> key.substring(0, key.length() - len)); 236 } 237 238 @Override 239 public long getStorageSize() { 240 KeyValueStore kvs = getKeyValueStore(); 241 String sizeStr = kvs.getString(STORAGE_SIZE); 242 return sizeStr == null ? 0 : Long.parseLong(sizeStr); 243 } 244 245 /** @deprecated since 11.1 */ 246 @Deprecated 247 protected void addStorageSize(long delta) { 248 KeyValueStore kvs = getKeyValueStore(); 249 addStorageSize(delta, kvs); 250 } 251 252 protected void addStorageSize(long delta, KeyValueStore kvs) { 253 atomicUpdate(STORAGE_SIZE, size -> { 254 long s = size == null ? 0 : Long.parseLong(size); 255 return String.valueOf(s + delta); 256 }, 0, kvs); 257 } 258 259 /** 260 * Computes an exact value for the current storage size (sum of all blobs size). THIS METHOD IS COSTLY. 261 * <p> 262 * Does not take into account blob de-duplication that may be done by the blob provider. 263 * <p> 264 * Does not take into account blobs that still exist in the blob provider but are not referenced anymore (due to TTL 265 * expiration or GC not having been done). 266 */ 267 protected void computeStorageSize() { 268 KeyValueStore kvs = getKeyValueStore(); 269 long size = keyStream().map(this::getBlobs) // 270 .flatMap(Collection::stream) 271 .mapToLong(Blob::getLength) 272 .sum(); 273 kvs.put(STORAGE_SIZE, String.valueOf(size)); 274 } 275 276 // also recomputes the exact storage size 277 @Override 278 public void doGC() { 279 BlobProvider bp = getBlobProvider(); 280 BinaryGarbageCollector gc = bp.getBinaryGarbageCollector(); 281 boolean delete = false; 282 gc.start(); 283 try { 284 keyStream().map(this::getBlobKeys) // 285 .flatMap(Collection::stream) 286 .forEach(gc::mark); 287 delete = true; 288 } finally { 289 // don't delete if there's an exception, but still stop the GC 290 gc.stop(delete); 291 } 292 computeStorageSize(); 293 } 294 295 @Override 296 public void removeAll() { 297 KeyValueStoreProvider kvs = (KeyValueStoreProvider) getKeyValueStore(); 298 kvs.clear(); 299 doGC(); 300 } 301 302 // ---------- TransientStore ---------- 303 304 protected static final TypeReference<List<String>> LIST_STRING = new TypeReference<List<String>>() { 305 }; 306 307 protected static final TypeReference<Map<String, String>> MAP_STRING_STRING = new TypeReference<Map<String, String>>() { 308 }; 309 310 protected List<String> jsonToList(String json) { 311 if (json == null) { 312 return null; 313 } 314 try { 315 return mapper.readValue(json, LIST_STRING); 316 } catch (IOException e) { 317 log.error("Invalid JSON array: " + json); 318 return null; 319 } 320 } 321 322 protected Map<String, String> jsonToMap(String json) { 323 if (json == null) { 324 return null; 325 } 326 try { 327 return mapper.readValue(json, MAP_STRING_STRING); 328 } catch (IOException e) { 329 log.error("Invalid JSON object: " + json); 330 return null; 331 } 332 } 333 334 protected String toJson(Object object) { 335 try { 336 return mapper.writeValueAsString(object); 337 } catch (IOException e) { 338 throw new NuxeoException(e); 339 } 340 } 341 342 /** @deprecated since 11.1 */ 343 @Deprecated 344 public void atomicUpdate(String key, Function<String, String> updateFunction, long ttl) { 345 KeyValueStore kvs = getKeyValueStore(); 346 atomicUpdate(key, updateFunction, ttl, kvs); 347 } 348 349 protected void atomicUpdate(String key, Function<String, String> updateFunction, long ttl, KeyValueStore kvs) { 350 for (;;) { 351 String oldValue = kvs.getString(key); 352 String newValue = updateFunction.apply(oldValue); 353 if (kvs.compareAndSet(key, oldValue, newValue, ttl)) { 354 break; 355 } 356 } 357 } 358 359 @Override 360 public boolean exists(String key) { 361 KeyValueStore kvs = getKeyValueStore(); 362 return kvs.getString(key + DOT_COMPLETED) != null; 363 } 364 365 /** @deprecated since 11.1 */ 366 @Deprecated 367 protected void markEntryExists(String key) { 368 KeyValueStore kvs = getKeyValueStore(); 369 markEntryExists(key, kvs); 370 } 371 372 protected void markEntryExists(String key, KeyValueStore kvs) { 373 kvs.compareAndSet(key + DOT_COMPLETED, null, "false", ttl); 374 } 375 376 @Override 377 public void putParameter(String key, String parameter, Serializable value) { 378 KeyValueStore kvs = getKeyValueStore(); 379 String k = key + DOT_PARAM_DOT + parameter; 380 if (value instanceof String) { 381 kvs.put(k, (String) value, ttl); 382 kvs.put(k + FORMAT, (String) null); 383 } else { 384 byte[] bytes = SerializationUtils.serialize(value); 385 kvs.put(k, bytes, ttl); 386 kvs.put(k + FORMAT, FORMAT_JAVA, ttl); 387 } 388 // atomically add key to param info 389 atomicUpdate(key + DOT_PARAMINFO, json -> { 390 List<String> parameters = jsonToList(json); 391 if (parameters == null) { 392 parameters = new ArrayList<>(); 393 } 394 if (!parameters.contains(parameter)) { 395 parameters.add(parameter); 396 } 397 return toJson(parameters); 398 }, ttl, kvs); 399 markEntryExists(key, kvs); 400 } 401 402 @Override 403 public Serializable getParameter(String key, String parameter) { 404 KeyValueStore kvs = getKeyValueStore(); 405 String k = key + DOT_PARAM_DOT + parameter; 406 String format = kvs.getString(k + FORMAT); 407 if (format == null) { 408 return kvs.getString(k); 409 } else { 410 byte[] bytes = kvs.get(k); 411 try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes); 412 ObjectInput in = new ObjectInputStream(bis)) { 413 return (Serializable) in.readObject(); 414 } catch (IOException | ClassNotFoundException e) { 415 throw new NuxeoException(e); 416 } 417 } 418 } 419 420 @Override 421 public void putParameters(String key, Map<String, Serializable> parameters) { 422 parameters.forEach((param, value) -> putParameter(key, param, value)); 423 } 424 425 @Override 426 public Map<String, Serializable> getParameters(String key) { 427 KeyValueStore kvs = getKeyValueStore(); 428 // get the list of keys 429 String json = kvs.getString(key + DOT_PARAMINFO); 430 List<String> parameters = jsonToList(json); 431 if (parameters == null) { 432 // if the entry doesn't exist at all return null, otherwise empty 433 if (kvs.getString(key + DOT_COMPLETED) == null) { 434 return null; 435 } else { 436 return Collections.emptyMap(); 437 } 438 } 439 // get values 440 Map<String, Serializable> map = new HashMap<>(); 441 for (String p : parameters) { 442 Serializable value = getParameter(key, p); 443 if (value != null) { 444 map.put(p, value); 445 } 446 } 447 return map; 448 } 449 450 /** @deprecated since 11.1 */ 451 @Deprecated 452 protected void removeParameters(String key) { 453 KeyValueStore kvs = getKeyValueStore(); 454 removeParameters(key, kvs); 455 } 456 457 protected void removeParameters(String key, KeyValueStore kvs) { 458 String json = kvs.getString(key + DOT_PARAMINFO); 459 List<String> parameters = jsonToList(json); 460 if (parameters != null) { 461 for (String parameter : parameters) { 462 String k = key + DOT_PARAM_DOT + parameter; 463 kvs.put(k, (String) null); 464 kvs.put(k + FORMAT, (String) null); 465 } 466 } 467 kvs.put(key + DOT_PARAMINFO, (String) null); 468 } 469 470 @Override 471 public void putBlobs(String key, List<Blob> blobs) { 472 if (absoluteMaxSize > 0 && getStorageSize() > absoluteMaxSize) { 473 // do the costly computation of the exact storage size if needed 474 doGC(); 475 if (getStorageSize() > absoluteMaxSize) { 476 throw new MaximumTransientSpaceExceeded(); 477 } 478 } 479 480 // first, outside the lock 481 // store the blobs, and compute the total size and the blob maps 482 BlobProvider bp = getBlobProvider(); 483 long totalSize = 0; 484 List<String> blobMapJsons = new ArrayList<>(); 485 for (Blob blob : blobs) { 486 long size = blob.getLength(); 487 if (size >= 0) { 488 totalSize += size; 489 } 490 // store blob 491 String blobKey; 492 try { 493 blobKey = bp.writeBlob(blob); 494 } catch (IOException e) { 495 throw new NuxeoException(e); 496 } 497 // compute blob data 498 Map<String, String> blobMap = new HashMap<>(); 499 blobMap.put(KEY, blobKey); 500 blobMap.put(MIMETYPE, blob.getMimeType()); 501 blobMap.put(ENCODING, blob.getEncoding()); 502 blobMap.put(FILENAME, blob.getFilename()); 503 blobMap.put(LENGTH, String.valueOf(size)); 504 blobMap.put(DIGEST, blob.getDigest()); 505 String blobMapJson = toJson(blobMap); 506 blobMapJsons.add(blobMapJson); 507 } 508 Map<String, String> blobInfoMap = new HashMap<>(); 509 blobInfoMap.put(COUNT, String.valueOf(blobs.size())); 510 blobInfoMap.put(SIZE, String.valueOf(totalSize)); 511 String blobInfoMapJson = toJson(blobInfoMap); 512 513 // acquire a lock while writing 514 KeyValueStore kvs = getKeyValueStore(); 515 acquireBlobLockOrThrow(key, kvs); 516 try { 517 // remove previous blobs 518 removeBlobs(key, kvs); 519 // write new blobs maps 520 int i = 0; 521 for (String blobMapJson : blobMapJsons) { 522 kvs.put(key + DOT_BLOB_DOT + i, blobMapJson, ttl); 523 i++; 524 } 525 // write blob info 526 kvs.put(key + DOT_BLOBINFO, blobInfoMapJson, ttl); 527 addStorageSize(totalSize, kvs); 528 markEntryExists(key, kvs); 529 } finally { 530 releaseBlobLock(key, kvs); 531 } 532 } 533 534 /** @deprecated since 11.1 */ 535 @Deprecated 536 protected void removeBlobs(String key) { 537 KeyValueStore kvs = getKeyValueStore(); 538 removeBlobs(key, kvs); 539 } 540 541 protected void removeBlobs(String key, KeyValueStore kvs) { 542 String json = kvs.getString(key + DOT_BLOBINFO); 543 Map<String, String> map = jsonToMap(json); 544 if (map == null) { 545 return; 546 } 547 String countStr = map.get(COUNT); 548 int count = countStr == null ? 0 : Integer.parseInt(countStr); 549 String sizeStr = map.get(SIZE); 550 long size = sizeStr == null ? 0 : Long.parseLong(sizeStr); 551 552 // remove blobs 553 for (int i = 0; i < count; i++) { 554 kvs.put(key + DOT_BLOB_DOT + i, (String) null); 555 } 556 kvs.put(key + DOT_BLOBINFO, (String) null); 557 // fix storage size 558 addStorageSize(-size, kvs); 559 } 560 561 @Override 562 public List<Blob> getBlobs(String key) { 563 KeyValueStore kvs = getKeyValueStore(); 564 BlobProvider bp = getBlobProvider(); 565 List<String> blobMapJsons = new ArrayList<>(); 566 567 // try to acquire a lock but still proceed without the lock (best effort) 568 boolean lockAcquired = tryAcquireBlobLock(key, kvs); 569 try { 570 String info = kvs.getString(key + DOT_BLOBINFO); 571 if (info == null) { 572 // if the entry doesn't exist at all return null, otherwise empty 573 if (kvs.getString(key + DOT_COMPLETED) == null) { 574 return null; 575 } else { 576 return Collections.emptyList(); 577 } 578 } 579 Map<String, String> blobInfoMap = jsonToMap(info); 580 String countStr = blobInfoMap.get(COUNT); 581 if (countStr == null) { 582 return Collections.emptyList(); 583 } 584 int count = Integer.parseInt(countStr); 585 for (int i = 0; i < count; i++) { 586 String blobMapJson = kvs.getString(key + DOT_BLOB_DOT + i); 587 blobMapJsons.add(blobMapJson); 588 } 589 } finally { 590 if (lockAcquired) { 591 releaseBlobLock(key, kvs); 592 } 593 } 594 595 // compute blobs from read blob maps 596 List<Blob> blobs = new ArrayList<>(); 597 for (String blobMapJson : blobMapJsons) { 598 if (blobMapJson == null) { 599 // corrupted entry, bail out 600 break; 601 } 602 Map<String, String> blobMap = jsonToMap(blobMapJson); 603 String blobKey = blobMap.get(KEY); 604 if (blobKey == null) { 605 // corrupted entry, bail out 606 break; 607 } 608 String mimeType = blobMap.get(MIMETYPE); 609 String encoding = blobMap.get(ENCODING); 610 String filename = blobMap.get(FILENAME); 611 String lengthStr = blobMap.get(LENGTH); 612 Long length = lengthStr == null ? null : Long.valueOf(lengthStr); 613 String digest = blobMap.get(DIGEST); 614 BlobInfo blobInfo = new BlobInfo(); 615 blobInfo.key = blobKey; 616 blobInfo.mimeType = mimeType; 617 blobInfo.encoding = encoding; 618 blobInfo.filename = filename; 619 blobInfo.length = length; 620 blobInfo.digest = digest; 621 try { 622 Blob blob = bp.readBlob(blobInfo); 623 blobs.add(blob); 624 } catch (IOException e) { 625 // ignore, the blob was removed from the blob provider 626 // maybe by a concurrent GC from this transient store 627 // or from the blob provider itself (if it's incorrectly shared) 628 log.debug("Failed to read blob: " + digest + " in blob provider: " + blobProviderId 629 + " for transient store: " + name); 630 } 631 } 632 return blobs; 633 } 634 635 // used by GC 636 protected List<String> getBlobKeys(String key) { 637 KeyValueStore kvs = getKeyValueStore(); 638 String info = kvs.getString(key + DOT_BLOBINFO); 639 if (info == null) { 640 return Collections.emptyList(); 641 } 642 Map<String, String> blobInfoMap = jsonToMap(info); 643 String countStr = blobInfoMap.get(COUNT); 644 if (countStr == null) { 645 return Collections.emptyList(); 646 } 647 int count = Integer.parseInt(countStr); 648 List<String> blobKeys = new ArrayList<>(count); 649 for (int i = 0; i < count; i++) { 650 String blobMapJson = kvs.getString(key + DOT_BLOB_DOT + i); 651 if (blobMapJson == null) { 652 // corrupted entry, bail out 653 break; 654 } 655 Map<String, String> blobMap = jsonToMap(blobMapJson); 656 String blobKey = blobMap.get(KEY); 657 if (blobKey == null) { 658 // corrupted entry, bail out 659 break; 660 } 661 blobKeys.add(blobKey); 662 } 663 return blobKeys; 664 } 665 666 protected void acquireBlobLockOrThrow(String key, KeyValueStore kvs) { 667 if (tryAcquireBlobLock(key, kvs)) { 668 return; 669 } 670 throw new NuxeoException("Failed to acquire blob lock for: " + key); 671 } 672 673 protected boolean tryAcquireBlobLock(String key, KeyValueStore kvs) { 674 return acquireLock(() -> tryAcquireOnceBlobLock(key, kvs)); 675 } 676 677 protected boolean tryAcquireOnceBlobLock(String key, KeyValueStore kvs) { 678 return kvs.compareAndSet(key + DOT_BLOBLOCK, null, "true", BLOB_LOCK_TTL); 679 } 680 681 protected void releaseBlobLock(String key, KeyValueStore kvs) { 682 kvs.put(key + DOT_BLOBLOCK, (String) null); 683 } 684 685 protected boolean acquireLock(BooleanSupplier tryAcquireOnce) { 686 long start = System.nanoTime(); 687 long sleep = 1; // ms 688 long elapsed; 689 while ((elapsed = System.nanoTime() - start) < LOCK_ACQUIRE_TIME_NANOS) { 690 if (tryAcquireOnce.getAsBoolean()) { 691 return true; 692 } 693 try { 694 Thread.sleep(sleep); 695 if (elapsed > LOCK_EXPONENTIAL_BACKOFF_AFTER_NANOS) { 696 sleep *= 2; 697 } 698 } catch (InterruptedException e) { 699 Thread.currentThread().interrupt(); 700 throw new RuntimeException(e); 701 } 702 } 703 return false; 704 } 705 706 @Override 707 public long getSize(String key) { 708 KeyValueStore kvs = getKeyValueStore(); 709 String json = kvs.getString(key + DOT_BLOBINFO); 710 Map<String, String> map = jsonToMap(json); 711 String size; 712 if (map == null || (size = map.get(SIZE)) == null) { 713 return -1; 714 } 715 return Long.parseLong(size); 716 } 717 718 @Override 719 public boolean isCompleted(String key) { 720 KeyValueStore kvs = getKeyValueStore(); 721 String completed = kvs.getString(key + DOT_COMPLETED); 722 return Boolean.parseBoolean(completed); 723 } 724 725 @Override 726 public void setCompleted(String key, boolean completed) { 727 KeyValueStore kvs = getKeyValueStore(); 728 kvs.put(key + DOT_COMPLETED, String.valueOf(completed), ttl); 729 } 730 731 /** @deprecated since 11.1 */ 732 @Deprecated 733 protected void removeCompleted(String key) { 734 KeyValueStore kvs = getKeyValueStore(); 735 removeCompleted(key, kvs); 736 } 737 738 protected void removeCompleted(String key, KeyValueStore kvs) { 739 kvs.put(key + DOT_COMPLETED, (String) null); 740 } 741 742 @Override 743 public void release(String key) { 744 if (targetMaxSize > 0 && getStorageSize() > targetMaxSize) { 745 // do the costly computation of the exact storage size if needed 746 doGC(); 747 if (getStorageSize() > targetMaxSize) { 748 remove(key); 749 return; 750 } 751 } 752 setReleaseTTL(key); 753 } 754 755 // set TTL on all keys for this entry 756 protected void setReleaseTTL(String key) { 757 KeyValueStore kvs = getKeyValueStore(); 758 kvs.setTTL(key + DOT_COMPLETED, releaseTTL); 759 String json = kvs.getString(key + DOT_PARAMINFO); 760 List<String> parameters = jsonToList(json); 761 if (parameters != null) { 762 parameters.stream().forEach(parameter -> { 763 String k = key + DOT_PARAM_DOT + parameter; 764 kvs.setTTL(k, releaseTTL); 765 kvs.setTTL(k + FORMAT, releaseTTL); 766 }); 767 } 768 kvs.setTTL(key + DOT_PARAMINFO, releaseTTL); 769 json = kvs.getString(key + DOT_BLOBINFO); 770 Map<String, String> map = jsonToMap(json); 771 if (map != null) { 772 String countStr = map.get(COUNT); 773 int count = countStr == null ? 0 : Integer.parseInt(countStr); 774 for (int i = 0; i < count; i++) { 775 kvs.setTTL(key + DOT_BLOB_DOT + i, releaseTTL); 776 } 777 } 778 kvs.setTTL(key + DOT_BLOBINFO, releaseTTL); 779 } 780 781 @Override 782 public void remove(String key) { 783 KeyValueStore kvs = getKeyValueStore(); 784 removeBlobs(key, kvs); 785 removeParameters(key, kvs); 786 removeCompleted(key, kvs); 787 } 788 789}