/*
 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.transientstore.keyvalueblob;

import static org.apache.commons.lang3.StringUtils.defaultIfBlank;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Stream;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.blob.BlobInfo;
import org.nuxeo.ecm.core.blob.BlobManager;
import org.nuxeo.ecm.core.blob.BlobManagerComponent;
import org.nuxeo.ecm.core.blob.BlobProvider;
import org.nuxeo.ecm.core.blob.ManagedBlob;
import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
import org.nuxeo.ecm.core.transientstore.api.MaximumTransientSpaceExceeded;
import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig;
import org.nuxeo.ecm.core.transientstore.api.TransientStoreProvider;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.kv.KeyValueService;
import org.nuxeo.runtime.kv.KeyValueStore;
import org.nuxeo.runtime.kv.KeyValueStoreProvider;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Transient Store storing properties in a Key/Value store, and storing blobs using a Blob Provider.
 * <p>
 * By default the key/value store and the blob provider used are the ones named after the transient store itself
 * (prefixed with the transient id prefix if needed); both can be overridden through the {@code keyValueStore} and
 * {@code blobProvider} configuration properties.
 * <p>
 * The storage format is the following:
 *
 * <pre>
 *   __blobsize__:          storage size; because entries may expire without us being notified due to their TTL,
 *                          this may be higher than the actual storage size
 *
 *   entryKey.completed:    "true" if completed, "false" if not; presence of this key marks entry existence
 *
 *   entryKey.paraminfo:    ["foo", "bar"]
 *
 *   entryKey.param.foo:          value for param foo
 *   entryKey.param.foo__format:  "java" for java serializable format, otherwise string
 *
 *   entryKey.param.bar:          value for param bar
 *   etc.
 *
 *   entryKey.blobinfo:     {"count": number of blobs,
 *                           "size": storage size of the blobs}
 *   entryKey.blob.0:       {"key": key in blob provider for first blob,
 *                           "mimetype": MIME Type,
 *                           "encoding": encoding,
 *                           "filename": filename,
 *                           "length": length,
 *                           "digest": digest}
 *   entryKey.blob.1:       {...} same for second blob
 *   etc.
 * </pre>
 *
 * @since 9.3
 */
public class KeyValueBlobTransientStore implements TransientStoreProvider {

    private static final Log log = LogFactory.getLog(KeyValueBlobTransientStore.class);

    /** Separator between an entry key and the suffix of each of its sub-keys. */
    public static final String SEP = ".";

    /** Key/value store key holding the (approximate) total blob storage size. */
    public static final String STORAGE_SIZE = "__blobsize__";

    /** Suffix of the key holding the completion flag; its presence marks entry existence. */
    public static final String DOT_COMPLETED = SEP + "completed";

    /** Suffix of the key holding the JSON list of parameter names of an entry. */
    public static final String DOT_PARAMINFO = SEP + "paraminfo";

    /** Infix between an entry key and a parameter name. */
    public static final String DOT_PARAM_DOT = SEP + "param" + SEP;

    /** Suffix appended to a parameter key to get the key holding its storage format. */
    public static final String FORMAT = "__format";

    /** Format value for parameters stored using Java serialization. */
    public static final String FORMAT_JAVA = "java";

    /** Suffix of the key holding the JSON blob summary (count and size) of an entry. */
    public static final String DOT_BLOBINFO = SEP + "blobinfo";

    public static final String COUNT = "count";

    public static final String SIZE = "size";

    /** Infix between an entry key and a blob index. */
    public static final String DOT_BLOB_DOT = SEP + "blob" + SEP;

    public static final String KEY = "key";

    public static final String MIMETYPE = "mimetype";

    public static final String ENCODING = "encoding";

    public static final String FILENAME = "filename";

    public static final String LENGTH = "length";

    public static final String DIGEST = "digest";

    /** Configuration property overriding the name of the key/value store to use. */
    public static final String CONFIG_KEY_VALUE_STORE = "keyValueStore";

    /** Configuration property overriding the id of the blob provider to use. */
    public static final String CONFIG_BLOB_PROVIDER = "blobProvider";

    protected String keyValueStoreName;

    protected String blobProviderId;

    /** Basic TTL for all entries. */
    protected int ttl;

    /** TTL used to keep objects around a bit longer if there's space for them, for caching. */
    protected int releaseTTL;

    /** Storage size (in bytes) above which a released entry is removed instead of being kept; disabled if <= 0. */
    protected long targetMaxSize;

    /** Storage size (in bytes) above which new blobs are refused; disabled if <= 0. */
    protected long absoluteMaxSize;

    protected ObjectMapper mapper;

    // ---------- TransientStoreProvider ----------

    /**
     * Initializes this store from its configuration.
     * <p>
     * The default name for both the key/value store and the blob provider is the configured store name, prefixed with
     * the transient id prefix and an underscore when it does not already start with that prefix.
     *
     * @param config the transient store configuration
     */
    @Override
    public void init(TransientStoreConfig config) {
        String defaultName = config.getName();
        if (!defaultName.startsWith(BlobManagerComponent.TRANSIENT_ID_PREFIX)) {
            defaultName = BlobManagerComponent.TRANSIENT_ID_PREFIX + "_" + defaultName;
        }
        Map<String, String> properties = config.getProperties();
        if (properties == null) {
            properties = Collections.emptyMap();
        }
        keyValueStoreName = defaultIfBlank(properties.get(CONFIG_KEY_VALUE_STORE), defaultName);
        blobProviderId = defaultIfBlank(properties.get(CONFIG_BLOB_PROVIDER), defaultName);
        mapper = new ObjectMapper();
        // NOTE(review): the * 60 presumably converts configured minutes to seconds -- confirm against the config API
        ttl = config.getFirstLevelTTL() * 60;
        releaseTTL = config.getSecondLevelTTL() * 60;
        // configured sizes are in MB, we keep bytes
        targetMaxSize = config.getTargetMaxSizeMB() * 1024L * 1024;
        absoluteMaxSize = config.getAbsoluteMaxSizeMB() * 1024L * 1024;
    }

    /** Gets the key/value store backing this transient store. */
    protected KeyValueStore getKeyValueStore() {
        return Framework.getService(KeyValueService.class).getKeyValueStore(keyValueStoreName);
    }

    /**
     * Gets the blob provider backing this transient store.
     *
     * @throws NuxeoException if the blob provider does not exist or is not configured as transient
     */
    protected BlobProvider getBlobProvider() {
        BlobProvider blobProvider = Framework.getService(BlobManager.class)
                                             .getBlobProviderWithNamespace(blobProviderId);
        if (blobProvider == null) {
            throw new NuxeoException("No blob provider with id: " + blobProviderId);
        }
        if (!blobProvider.isTransient()) {
            throw new NuxeoException("Blob provider: " + blobProviderId + " used for Key/Value store: "
                    + keyValueStoreName + " must be configured as transient");
        }
        return blobProvider;
    }

    @Override
    public void shutdown() {
        // nothing to do
    }

    /**
     * Streams the keys of all existing entries, found by looking for the {@code .completed} marker keys in the
     * key/value store and stripping that suffix.
     */
    @Override
    public Stream<String> keyStream() {
        KeyValueStoreProvider kvs = (KeyValueStoreProvider) getKeyValueStore();
        int len = DOT_COMPLETED.length();
        return kvs.keyStream() //
                  .filter(key -> key.endsWith(DOT_COMPLETED))
                  .map(key -> key.substring(0, key.length() - len));
    }

    /**
     * Gets the recorded storage size, which may be higher than the actual size (see class documentation).
     *
     * @return the recorded size in bytes, or 0 if none is recorded
     */
    @Override
    public long getStorageSize() {
        KeyValueStore kvs = getKeyValueStore();
        String sizeStr = kvs.getString(STORAGE_SIZE);
        return sizeStr == null ? 0 : Long.parseLong(sizeStr);
    }

    /**
     * Atomically adds a delta to the recorded storage size.
     *
     * @param delta the number of bytes to add (may be negative)
     */
    protected void addStorageSize(long delta) {
        atomicUpdate(STORAGE_SIZE, size -> {
            long s = size == null ? 0 : Long.parseLong(size);
            return String.valueOf(s + delta);
        }, 0);
    }

    /**
     * Computes an exact value for the current storage size (sum of all blobs size). THIS METHOD IS COSTLY.
     * <p>
     * Does not take into account blob de-duplication that may be done by the blob provider.
     * <p>
     * Does not take into account blobs that still exist in the blob provider but are not referenced anymore (due to TTL
     * expiration or GC not having been done).
     */
    protected void computeStorageSize() {
        KeyValueStore kvs = getKeyValueStore();
        long size = keyStream().map(this::getBlobs) //
                               // an entry may have disappeared since its key was listed
                               .filter(blobs -> blobs != null)
                               .flatMap(Collection::stream)
                               .mapToLong(Blob::getLength)
                               // ignore unknown (negative) lengths, consistently with putBlobs
                               .filter(length -> length >= 0)
                               .sum();
        kvs.put(STORAGE_SIZE, String.valueOf(size));
    }

    /**
     * Garbage collects the blobs of the blob provider that are not referenced by any entry anymore, then recomputes
     * the exact storage size.
     * <p>
     * If marking the referenced blobs fails, the garbage collector is stopped without deleting anything and the
     * failure is propagated.
     */
    @Override
    public void doGC() {
        BlobProvider bp = getBlobProvider();
        BinaryGarbageCollector gc = bp.getBinaryManager().getGarbageCollector();
        boolean delete = false;
        gc.start();
        try {
            keyStream().map(this::getBlobs) //
                       // an entry may have disappeared since its key was listed
                       .filter(blobs -> blobs != null)
                       .flatMap(Collection::stream)
                       .map(ManagedBlob.class::cast)
                       .map(ManagedBlob::getKey)
                       .forEach(gc::mark);
            delete = true;
        } finally {
            // always stop the GC; only delete unmarked blobs if the mark phase fully completed
            gc.stop(delete);
        }
        computeStorageSize();
    }

    /** Removes all entries from the key/value store, then garbage collects the now unreferenced blobs. */
    @Override
    public void removeAll() {
        KeyValueStoreProvider kvs = (KeyValueStoreProvider) getKeyValueStore();
        kvs.clear();
        doGC();
    }

    // ---------- TransientStore ----------

    protected static final TypeReference<List<String>> LIST_STRING = new TypeReference<List<String>>() {
    };

    protected static final TypeReference<Map<String, String>> MAP_STRING_STRING = new TypeReference<Map<String, String>>() {
    };

    /**
     * Parses a JSON array of strings.
     *
     * @param json the JSON, may be {@code null}
     * @return the list, or {@code null} if the JSON is {@code null} or invalid (the latter is logged)
     */
    protected List<String> jsonToList(String json) {
        if (json == null) {
            return null;
        }
        try {
            return mapper.readValue(json, LIST_STRING);
        } catch (IOException e) {
            log.error("Invalid JSON array: " + json, e);
            return null;
        }
    }

    /**
     * Parses a JSON object whose values are strings.
     *
     * @param json the JSON, may be {@code null}
     * @return the map, or {@code null} if the JSON is {@code null} or invalid (the latter is logged)
     */
    protected Map<String, String> jsonToMap(String json) {
        if (json == null) {
            return null;
        }
        try {
            return mapper.readValue(json, MAP_STRING_STRING);
        } catch (IOException e) {
            log.error("Invalid JSON object: " + json, e);
            return null;
        }
    }

    /**
     * Serializes an object to JSON.
     *
     * @throws NuxeoException if the object cannot be serialized
     */
    protected String toJson(Object object) {
        try {
            return mapper.writeValueAsString(object);
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Updates the value of a key by applying a function to its current value, retrying with a compare-and-set until
     * no concurrent modification interferes.
     *
     * @param key the key/value store key
     * @param updateFunction computes the new value from the old one (which is {@code null} if absent); may be called
     *            several times
     * @param ttl the TTL passed to the compare-and-set for the new value
     */
    public void atomicUpdate(String key, Function<String, String> updateFunction, long ttl) {
        KeyValueStore kvs = getKeyValueStore();
        for (;;) {
            String oldValue = kvs.getString(key);
            String newValue = updateFunction.apply(oldValue);
            if (kvs.compareAndSet(key, oldValue, newValue, ttl)) {
                break;
            }
        }
    }

    /** Checks whether an entry exists, i.e., whether its {@code .completed} marker key is present. */
    @Override
    public boolean exists(String key) {
        KeyValueStore kvs = getKeyValueStore();
        return kvs.getString(key + DOT_COMPLETED) != null;
    }

    /** Marks an entry as existing (and not completed), unless it already exists. */
    protected void markEntryExists(String key) {
        KeyValueStore kvs = getKeyValueStore();
        kvs.compareAndSet(key + DOT_COMPLETED, null, "false", ttl);
    }

    /**
     * Stores a parameter for an entry, and registers its name in the entry's parameter list.
     * <p>
     * Strings are stored as is; other values are stored using Java serialization and flagged with a format key.
     *
     * @param key the entry key
     * @param parameter the parameter name
     * @param value the parameter value
     */
    @Override
    public void putParameter(String key, String parameter, Serializable value) {
        KeyValueStore kvs = getKeyValueStore();
        String k = key + DOT_PARAM_DOT + parameter;
        if (value instanceof String) {
            kvs.put(k, (String) value, ttl);
            // no format key means string format
            kvs.put(k + FORMAT, (String) null);
        } else {
            byte[] bytes = SerializationUtils.serialize(value);
            kvs.put(k, bytes, ttl);
            kvs.put(k + FORMAT, FORMAT_JAVA, ttl);
        }
        // atomically add key to param info
        atomicUpdate(key + DOT_PARAMINFO, json -> {
            List<String> parameters = jsonToList(json);
            if (parameters == null) {
                parameters = new ArrayList<>();
            }
            if (!parameters.contains(parameter)) {
                parameters.add(parameter);
            }
            return toJson(parameters);
        }, ttl);
        markEntryExists(key);
    }

    /**
     * Gets a parameter of an entry.
     *
     * @param key the entry key
     * @param parameter the parameter name
     * @return the value, or {@code null} if there is no stored value
     * @throws NuxeoException if a Java-serialized value cannot be deserialized
     */
    @Override
    public Serializable getParameter(String key, String parameter) {
        KeyValueStore kvs = getKeyValueStore();
        String k = key + DOT_PARAM_DOT + parameter;
        String format = kvs.getString(k + FORMAT);
        if (format == null) {
            return kvs.getString(k);
        }
        byte[] bytes = kvs.get(k);
        if (bytes == null) {
            // the value is gone (expired or removed) even though its format key is still there
            return null;
        }
        // NOTE(review): Java native deserialization, the content of the key/value store must be trusted
        try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
                ObjectInput in = new ObjectInputStream(bis)) {
            return (Serializable) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new NuxeoException(e);
        }
    }

    /** Stores several parameters for an entry, one by one. */
    @Override
    public void putParameters(String key, Map<String, Serializable> parameters) {
        parameters.forEach((param, value) -> putParameter(key, param, value));
    }

    /**
     * Gets all the parameters of an entry.
     *
     * @param key the entry key
     * @return the parameters having a non-null value, an empty map if the entry exists but has no parameter list, or
     *         {@code null} if the entry does not exist
     */
    @Override
    public Map<String, Serializable> getParameters(String key) {
        KeyValueStore kvs = getKeyValueStore();
        // get the list of keys
        String json = kvs.getString(key + DOT_PARAMINFO);
        List<String> parameters = jsonToList(json);
        if (parameters == null) {
            // if the entry doesn't exist at all return null, otherwise empty
            if (kvs.getString(key + DOT_COMPLETED) == null) {
                return null;
            } else {
                return Collections.emptyMap();
            }
        }
        // get values
        Map<String, Serializable> map = new HashMap<>();
        for (String p : parameters) {
            Serializable value = getParameter(key, p);
            if (value != null) {
                map.put(p, value);
            }
        }
        return map;
    }

    /** Removes all the parameters of an entry (values, format keys and parameter list). */
    protected void removeParameters(String key) {
        KeyValueStore kvs = getKeyValueStore();
        String json = kvs.getString(key + DOT_PARAMINFO);
        List<String> parameters = jsonToList(json);
        if (parameters != null) {
            for (String parameter : parameters) {
                String k = key + DOT_PARAM_DOT + parameter;
                kvs.put(k, (String) null);
                kvs.put(k + FORMAT, (String) null);
            }
        }
        kvs.put(key + DOT_PARAMINFO, (String) null);
    }

    /**
     * Stores the blobs of an entry, replacing its previous blobs if any.
     * <p>
     * The blobs are written to the blob provider and their metadata to the key/value store. The recorded storage size
     * is updated with the sum of the known (non-negative) blob lengths.
     *
     * @param key the entry key
     * @param blobs the blobs to store
     * @throws MaximumTransientSpaceExceeded if the storage size is still above the absolute maximum after a GC
     * @throws NuxeoException if a blob cannot be written to the blob provider
     */
    @Override
    public void putBlobs(String key, List<Blob> blobs) {
        if (absoluteMaxSize > 0 && getStorageSize() > absoluteMaxSize) {
            // do the costly computation of the exact storage size if needed
            doGC();
            if (getStorageSize() > absoluteMaxSize) {
                throw new MaximumTransientSpaceExceeded();
            }
        }

        // remove previous blobs
        removeBlobs(key);

        KeyValueStore kvs = getKeyValueStore();
        BlobProvider bp = getBlobProvider();
        long totalSize = 0;
        int i = 0;
        for (Blob blob : blobs) {
            long size = blob.getLength();
            if (size >= 0) {
                totalSize += size;
            }
            // store blob
            String blobKey;
            try {
                blobKey = bp.writeBlob(blob);
            } catch (IOException e) {
                throw new NuxeoException(e);
            }
            // write blob data
            Map<String, String> blobMap = new HashMap<>();
            blobMap.put(KEY, blobKey);
            blobMap.put(MIMETYPE, blob.getMimeType());
            blobMap.put(ENCODING, blob.getEncoding());
            blobMap.put(FILENAME, blob.getFilename());
            blobMap.put(LENGTH, String.valueOf(size));
            blobMap.put(DIGEST, blob.getDigest());
            kvs.put(key + DOT_BLOB_DOT + i, toJson(blobMap), ttl);
            i++;
        }
        Map<String, String> blobInfoMap = new HashMap<>();
        blobInfoMap.put(COUNT, String.valueOf(blobs.size()));
        blobInfoMap.put(SIZE, String.valueOf(totalSize));
        kvs.put(key + DOT_BLOBINFO, toJson(blobInfoMap), ttl);
        addStorageSize(totalSize);
        markEntryExists(key);
    }

    /**
     * Removes the blob metadata of an entry from the key/value store and subtracts its size from the recorded storage
     * size. The blobs themselves are not deleted from the blob provider here.
     */
    protected void removeBlobs(String key) {
        KeyValueStore kvs = getKeyValueStore();
        String json = kvs.getString(key + DOT_BLOBINFO);
        Map<String, String> map = jsonToMap(json);
        if (map == null) {
            return;
        }
        String countStr = map.get(COUNT);
        int count = countStr == null ? 0 : Integer.parseInt(countStr);
        String sizeStr = map.get(SIZE);
        long size = sizeStr == null ? 0 : Long.parseLong(sizeStr);

        // remove blobs
        for (int i = 0; i < count; i++) {
            kvs.put(key + DOT_BLOB_DOT + i, (String) null);
        }
        kvs.put(key + DOT_BLOBINFO, (String) null);
        // fix storage size
        addStorageSize(-size);
    }

    /**
     * Gets the blobs of an entry.
     *
     * @param key the entry key
     * @return the blobs, an empty list if the entry exists but has no (valid) blob info, or {@code null} if the entry
     *         does not exist; if the metadata of a blob is missing or corrupted, only the blobs before it are returned
     * @throws NuxeoException if a blob cannot be read from the blob provider
     */
    @Override
    public List<Blob> getBlobs(String key) {
        KeyValueStore kvs = getKeyValueStore();
        BlobProvider bp = getBlobProvider();
        String info = kvs.getString(key + DOT_BLOBINFO);
        if (info == null) {
            // if the entry doesn't exist at all return null, otherwise empty
            if (kvs.getString(key + DOT_COMPLETED) == null) {
                return null;
            } else {
                return Collections.emptyList();
            }
        }
        Map<String, String> blobInfoMap = jsonToMap(info);
        if (blobInfoMap == null) {
            // corrupted blob info (invalid JSON, already logged)
            return Collections.emptyList();
        }
        String countStr = blobInfoMap.get(COUNT);
        if (countStr == null) {
            return Collections.emptyList();
        }
        int count = Integer.parseInt(countStr);
        List<Blob> blobs = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            String blobMapJson = kvs.getString(key + DOT_BLOB_DOT + i);
            if (blobMapJson == null) {
                // corrupted entry, bail out
                break;
            }
            Map<String, String> blobMap = jsonToMap(blobMapJson);
            if (blobMap == null) {
                // corrupted entry (invalid JSON, already logged), bail out
                break;
            }
            String blobKey = blobMap.get(KEY);
            if (blobKey == null) {
                // corrupted entry, bail out
                break;
            }
            String lengthStr = blobMap.get(LENGTH);
            BlobInfo blobInfo = new BlobInfo();
            blobInfo.key = blobKey;
            blobInfo.mimeType = blobMap.get(MIMETYPE);
            blobInfo.encoding = blobMap.get(ENCODING);
            blobInfo.filename = blobMap.get(FILENAME);
            blobInfo.length = lengthStr == null ? null : Long.valueOf(lengthStr);
            blobInfo.digest = blobMap.get(DIGEST);
            try {
                blobs.add(bp.readBlob(blobInfo));
            } catch (IOException e) {
                throw new NuxeoException(e);
            }
        }
        return blobs;
    }

    /**
     * Gets the recorded size of the blobs of an entry.
     *
     * @param key the entry key
     * @return the size in bytes, or -1 if the entry has no (valid) blob info or no recorded size
     */
    @Override
    public long getSize(String key) {
        KeyValueStore kvs = getKeyValueStore();
        String json = kvs.getString(key + DOT_BLOBINFO);
        Map<String, String> map = jsonToMap(json);
        String size;
        if (map == null || (size = map.get(SIZE)) == null) {
            return -1;
        }
        return Long.parseLong(size);
    }

    /** Checks whether an entry is flagged as completed; a missing entry is not completed. */
    @Override
    public boolean isCompleted(String key) {
        KeyValueStore kvs = getKeyValueStore();
        String completed = kvs.getString(key + DOT_COMPLETED);
        return Boolean.parseBoolean(completed);
    }

    /** Sets the completion flag of an entry, which also marks the entry as existing. */
    @Override
    public void setCompleted(String key, boolean completed) {
        KeyValueStore kvs = getKeyValueStore();
        kvs.put(key + DOT_COMPLETED, String.valueOf(completed), ttl);
    }

    /** Removes the completion flag of an entry, which makes the entry non-existent. */
    protected void removeCompleted(String key) {
        KeyValueStore kvs = getKeyValueStore();
        kvs.put(key + DOT_COMPLETED, (String) null);
    }

    /**
     * Releases an entry: if the storage size is above the target maximum even after a GC then the entry is removed,
     * otherwise the entry is kept around with the release TTL.
     *
     * @param key the entry key
     */
    @Override
    public void release(String key) {
        if (targetMaxSize > 0 && getStorageSize() > targetMaxSize) {
            // do the costly computation of the exact storage size if needed
            doGC();
            if (getStorageSize() > targetMaxSize) {
                remove(key);
                return;
            }
        }
        setReleaseTTL(key);
    }

    /** Sets the release TTL on all the key/value store keys of an entry. */
    protected void setReleaseTTL(String key) {
        KeyValueStore kvs = getKeyValueStore();
        kvs.setTTL(key + DOT_COMPLETED, releaseTTL);
        // parameters
        List<String> parameters = jsonToList(kvs.getString(key + DOT_PARAMINFO));
        if (parameters != null) {
            for (String parameter : parameters) {
                String k = key + DOT_PARAM_DOT + parameter;
                kvs.setTTL(k, releaseTTL);
                kvs.setTTL(k + FORMAT, releaseTTL);
            }
        }
        kvs.setTTL(key + DOT_PARAMINFO, releaseTTL);
        // blobs
        Map<String, String> map = jsonToMap(kvs.getString(key + DOT_BLOBINFO));
        if (map != null) {
            String countStr = map.get(COUNT);
            int count = countStr == null ? 0 : Integer.parseInt(countStr);
            for (int i = 0; i < count; i++) {
                kvs.setTTL(key + DOT_BLOB_DOT + i, releaseTTL);
            }
        }
        kvs.setTTL(key + DOT_BLOBINFO, releaseTTL);
    }

    /** Removes an entry: its blob metadata, its parameters and its completion flag. */
    @Override
    public void remove(String key) {
        removeBlobs(key);
        removeParameters(key);
        removeCompleted(key);
    }

}