001/* 002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.transientstore.keyvalueblob; 020 021import static java.util.function.Function.identity; 022import static org.apache.commons.lang3.StringUtils.defaultIfBlank; 023 024import java.io.ByteArrayInputStream; 025import java.io.IOException; 026import java.io.ObjectInput; 027import java.io.ObjectInputStream; 028import java.io.Serializable; 029import java.util.ArrayList; 030import java.util.Collection; 031import java.util.Collections; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035import java.util.function.Function; 036import java.util.stream.Collectors; 037import java.util.stream.Stream; 038 039import org.apache.commons.lang3.SerializationUtils; 040import org.apache.commons.logging.Log; 041import org.apache.commons.logging.LogFactory; 042import org.nuxeo.ecm.core.api.Blob; 043import org.nuxeo.ecm.core.api.NuxeoException; 044import org.nuxeo.ecm.core.blob.BlobInfo; 045import org.nuxeo.ecm.core.blob.BlobManager; 046import org.nuxeo.ecm.core.blob.BlobProvider; 047import org.nuxeo.ecm.core.blob.ManagedBlob; 048import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector; 049import org.nuxeo.ecm.core.transientstore.api.MaximumTransientSpaceExceeded; 050import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig; 051import org.nuxeo.ecm.core.transientstore.api.TransientStoreProvider; 052import org.nuxeo.runtime.api.Framework; 053import org.nuxeo.runtime.kv.KeyValueService; 054import org.nuxeo.runtime.kv.KeyValueStore; 055import org.nuxeo.runtime.kv.KeyValueStoreProvider; 056 057import com.fasterxml.jackson.core.type.TypeReference; 058import com.fasterxml.jackson.databind.ObjectMapper; 059 060/** 061 * Transient Store storing properties in a Key/Value store, and storing blobs using a Blob Provider. 062 * <p> 063 * The key/value store used is the one with the same name as the transient store itself. 064 * <p> 065 * The blob provider used is the one with the same name as the transient store itself. 066 * <p> 067 * The storage format is the following: 068 * 069 * <pre> 070 * __blobsize__: storage size; because entries may expire without us being notified due to their TTL, 071 * this may be higher than the actual storage size 072 * 073 * entryKey.completed: "true" if completed, "false" if not; presence of this key marks entry existence 074 * 075 * entryKey.paraminfo: ["foo", "bar"] 076 * 077 * entryKey.param.foo: value for param foo 078 * entryKey.param.foo__format: "java" for java serializable format, otherwise string 079 * 080 * entryKey.param.bar: value for param bar 081 * etc. 082 * 083 * entryKey.blobinfo: {"count": number of blobs, 084 * "size": storage size of the blobs} 085 * entryKey.blob.0: {"key": key in blob provider for first blob, 086 * "mimetype": MIME Type, 087 * "encoding": encoding, 088 * "filename": filename, 089 * "digest": digest} 090 * entryKey.blob.1: {...} same for second blob 091 * etc. 092 * </pre> 093 * 094 * @since 9.3 095 */ 096public class KeyValueBlobTransientStore implements TransientStoreProvider { 097 098 private static final Log log = LogFactory.getLog(KeyValueBlobTransientStore.class); 099 100 public static final String SEP = "."; 101 102 public static final String STORAGE_SIZE = "__blobsize__"; 103 104 public static final String DOT_COMPLETED = SEP + "completed"; 105 106 public static final String DOT_PARAMINFO = SEP + "paraminfo"; 107 108 public static final String DOT_PARAM_DOT = SEP + "param" + SEP; 109 110 public static final String FORMAT = "__format"; 111 112 public static final String FORMAT_JAVA = "java"; 113 114 public static final String DOT_BLOBINFO = SEP + "blobinfo"; 115 116 public static final String COUNT = "count"; 117 118 public static final String SIZE = "size"; 119 120 public static final String DOT_BLOB_DOT = SEP + "blob" + SEP; 121 122 public static final String KEY = "key"; 123 124 public static final String MIMETYPE = "mimetype"; 125 126 public static final String ENCODING = "encoding"; 127 128 public static final String FILENAME = "filename"; 129 130 public static final String LENGTH = "length"; 131 132 public static final String DIGEST = "digest"; 133 134 public static final String CONFIG_KEY_VALUE_STORE = "keyValueStore"; 135 136 public static final String CONFIG_BLOB_PROVIDER = "blobProvider"; 137 138 protected String keyValueStoreName; 139 140 protected String blobProviderId; 141 142 /** Basic TTL for all entries. */ 143 protected int ttl; 144 145 /** TTL used to keep objects around a bit longer if there's space for them, for caching. */ 146 protected int releaseTTL; 147 148 protected long targetMaxSize; 149 150 protected long absoluteMaxSize; 151 152 protected ObjectMapper mapper; 153 154 // ---------- TransientStoreProvider ---------- 155 156 @Override 157 public void init(TransientStoreConfig config) { 158 String name = config.getName(); 159 Map<String, String> properties = config.getProperties(); 160 if (properties == null) { 161 properties = Collections.emptyMap(); 162 } 163 keyValueStoreName = defaultIfBlank(properties.get(CONFIG_KEY_VALUE_STORE), name); 164 blobProviderId = defaultIfBlank(properties.get(CONFIG_BLOB_PROVIDER), name); 165 mapper = new ObjectMapper(); 166 ttl = config.getFirstLevelTTL() * 60; 167 releaseTTL = config.getSecondLevelTTL() * 60; 168 targetMaxSize = config.getTargetMaxSizeMB() * 1024L * 1024; 169 absoluteMaxSize = config.getAbsoluteMaxSizeMB() * 1024L * 1024; 170 } 171 172 protected KeyValueStore getKeyValueStore() { 173 return Framework.getService(KeyValueService.class).getKeyValueStore(keyValueStoreName); 174 } 175 176 protected BlobProvider getBlobProvider() { 177 BlobProvider blobProvider = Framework.getService(BlobManager.class).getBlobProvider(blobProviderId); 178 if (blobProvider == null) { 179 throw new NuxeoException("No blob provider with id: " + blobProviderId); 180 } 181 return blobProvider; 182 } 183 184 @Override 185 public void shutdown() { 186 // nothing to do 187 } 188 189 @Override 190 public Stream<String> keyStream() { 191 KeyValueStoreProvider kvs = (KeyValueStoreProvider) getKeyValueStore(); 192 int len = DOT_COMPLETED.length(); 193 return kvs.keyStream() // 194 .filter(key -> key.endsWith(DOT_COMPLETED)) 195 .map(key -> key.substring(0, key.length() - len)); 196 } 197 198 @Override 199 public long getStorageSize() { 200 KeyValueStore kvs = getKeyValueStore(); 201 String sizeStr = kvs.getString(STORAGE_SIZE); 202 return sizeStr == null ? 0 : Long.parseLong(sizeStr); 203 } 204 205 protected void addStorageSize(long delta) { 206 atomicUpdate(STORAGE_SIZE, size -> { 207 long s = size == null ? 0 : Long.parseLong(size); 208 return String.valueOf(s + delta); 209 }, 0); 210 } 211 212 /** 213 * Computes an exact value for the current storage size (sum of all blobs size). THIS METHOD IS COSTLY. 214 * <p> 215 * Does not take into account blob de-duplication that may be done by the blob provider. 216 * <p> 217 * Does not take into account blobs that still exist in the blob provider but are not referenced anymore (due to TTL 218 * expiration or GC not having been done). 219 */ 220 protected void computeStorageSize() { 221 KeyValueStore kvs = getKeyValueStore(); 222 long size = keyStream().map(this::getBlobs) // 223 .flatMap(Collection::stream) 224 .mapToLong(Blob::getLength) 225 .sum(); 226 kvs.put(STORAGE_SIZE, String.valueOf(size)); 227 } 228 229 // also recomputes the exact storage size 230 @Override 231 public void doGC() { 232 BlobProvider bp = getBlobProvider(); 233 BinaryGarbageCollector gc = bp.getBinaryManager().getGarbageCollector(); 234 gc.start(); 235 keyStream().map(this::getBlobs) // 236 .flatMap(Collection::stream) 237 .map(ManagedBlob.class::cast) 238 .map(ManagedBlob::getKey) 239 .forEach(gc::mark); 240 gc.stop(true); // delete 241 computeStorageSize(); 242 } 243 244 @Override 245 public void removeAll() { 246 KeyValueStoreProvider kvs = (KeyValueStoreProvider) getKeyValueStore(); 247 kvs.clear(); 248 doGC(); 249 } 250 251 // ---------- TransientStore ---------- 252 253 protected static final TypeReference<List<String>> LIST_STRING = new TypeReference<List<String>>() { 254 }; 255 256 protected static final TypeReference<Map<String, String>> MAP_STRING_STRING = new TypeReference<Map<String, String>>() { 257 }; 258 259 protected List<String> jsonToList(String json) { 260 if (json == null) { 261 return null; 262 } 263 try { 264 return mapper.readValue(json, LIST_STRING); 265 } catch (IOException e) { 266 log.error("Invalid JSON array: " + json); 267 return null; 268 } 269 } 270 271 protected Map<String, String> jsonToMap(String json) { 272 if (json == null) { 273 return null; 274 } 275 try { 276 return mapper.readValue(json, MAP_STRING_STRING); 277 } catch (IOException e) { 278 log.error("Invalid JSON object: " + json); 279 return null; 280 } 281 } 282 283 protected String toJson(Object object) { 284 try { 285 return mapper.writeValueAsString(object); 286 } catch (IOException e) { 287 throw new NuxeoException(e); 288 } 289 } 290 291 public void atomicUpdate(String key, Function<String, String> updateFunction, long ttl) { 292 KeyValueStore kvs = getKeyValueStore(); 293 for (;;) { 294 String oldValue = kvs.getString(key); 295 String newValue = updateFunction.apply(oldValue); 296 if (kvs.compareAndSet(key, oldValue, newValue, ttl)) { 297 break; 298 } 299 } 300 } 301 302 @Override 303 public boolean exists(String key) { 304 KeyValueStore kvs = getKeyValueStore(); 305 return kvs.getString(key + DOT_COMPLETED) != null; 306 } 307 308 protected void markEntryExists(String key) { 309 KeyValueStore kvs = getKeyValueStore(); 310 kvs.compareAndSet(key + DOT_COMPLETED, null, "false", ttl); 311 } 312 313 @Override 314 public void putParameter(String key, String parameter, Serializable value) { 315 KeyValueStore kvs = getKeyValueStore(); 316 String k = key + DOT_PARAM_DOT + parameter; 317 if (value instanceof String) { 318 kvs.put(k, (String) value, ttl); 319 kvs.put(k + FORMAT, (String) null); 320 } else { 321 byte[] bytes = SerializationUtils.serialize(value); 322 kvs.put(k, bytes, ttl); 323 kvs.put(k + FORMAT, FORMAT_JAVA, ttl); 324 } 325 // atomically add key to param info 326 atomicUpdate(key + DOT_PARAMINFO, json -> { 327 List<String> parameters = jsonToList(json); 328 if (parameters == null) { 329 parameters = new ArrayList<>(); 330 } 331 if (!parameters.contains(parameter)) { 332 parameters.add(parameter); 333 } 334 return toJson(parameters); 335 }, ttl); 336 markEntryExists(key); 337 } 338 339 @Override 340 public Serializable getParameter(String key, String parameter) { 341 KeyValueStore kvs = getKeyValueStore(); 342 String k = key + DOT_PARAM_DOT + parameter; 343 String format = kvs.getString(k + FORMAT); 344 if (format == null) { 345 return kvs.getString(k); 346 } else { 347 byte[] bytes = kvs.get(k); 348 try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes); 349 ObjectInput in = new ObjectInputStream(bis)) { 350 return (Serializable) in.readObject(); 351 } catch (IOException | ClassNotFoundException e) { 352 throw new NuxeoException(e); 353 } 354 } 355 } 356 357 @Override 358 public void putParameters(String key, Map<String, Serializable> parameters) { 359 parameters.forEach((param, value) -> putParameter(key, param, value)); 360 } 361 362 @Override 363 public Map<String, Serializable> getParameters(String key) { 364 KeyValueStore kvs = getKeyValueStore(); 365 // get the list of keys 366 String json = kvs.getString(key + DOT_PARAMINFO); 367 List<String> parameters = jsonToList(json); 368 if (parameters == null) { 369 // if the entry doesn't exist at all return null, otherwise empty 370 if (kvs.getString(key + DOT_COMPLETED) == null) { 371 return null; 372 } else { 373 return Collections.emptyMap(); 374 } 375 } 376 // get values 377 return parameters.stream().collect(Collectors.toMap(identity(), p -> getParameter(key, p))); 378 } 379 380 protected void removeParameters(String key) { 381 KeyValueStore kvs = getKeyValueStore(); 382 String json = kvs.getString(key + DOT_PARAMINFO); 383 List<String> parameters = jsonToList(json); 384 if (parameters != null) { 385 for (String parameter : parameters) { 386 String k = key + DOT_PARAM_DOT + parameter; 387 kvs.put(k, (String) null); 388 kvs.put(k + FORMAT, (String) null); 389 } 390 } 391 kvs.put(key + DOT_PARAMINFO, (String) null); 392 } 393 394 @Override 395 public void putBlobs(String key, List<Blob> blobs) { 396 if (absoluteMaxSize > 0 && getStorageSize() > absoluteMaxSize) { 397 // do the costly computation of the exact storage size if needed 398 doGC(); 399 if (getStorageSize() > absoluteMaxSize) { 400 throw new MaximumTransientSpaceExceeded(); 401 } 402 } 403 404 // remove previous blobs 405 removeBlobs(key); 406 407 KeyValueStore kvs = getKeyValueStore(); 408 BlobProvider bp = getBlobProvider(); 409 long totalSize = 0; 410 int i = 0; 411 for (Blob blob : blobs) { 412 long size = blob.getLength(); 413 if (size >= 0) { 414 totalSize += size; 415 } 416 // store blob 417 String blobKey; 418 try { 419 blobKey = bp.writeBlob(blob); 420 } catch (IOException e) { 421 throw new NuxeoException(e); 422 } 423 // write blob data 424 Map<String, String> blobMap = new HashMap<>(); 425 blobMap.put(KEY, blobKey); 426 blobMap.put(MIMETYPE, blob.getMimeType()); 427 blobMap.put(ENCODING, blob.getEncoding()); 428 blobMap.put(FILENAME, blob.getFilename()); 429 blobMap.put(LENGTH, String.valueOf(size)); 430 blobMap.put(DIGEST, blob.getDigest()); 431 kvs.put(key + DOT_BLOB_DOT + i, toJson(blobMap), ttl); 432 i++; 433 } 434 Map<String, String> blobInfoMap = new HashMap<>(); 435 blobInfoMap.put(COUNT, String.valueOf(blobs.size())); 436 blobInfoMap.put(SIZE, String.valueOf(totalSize)); 437 kvs.put(key + DOT_BLOBINFO, toJson(blobInfoMap), ttl); 438 addStorageSize(totalSize); 439 markEntryExists(key); 440 } 441 442 protected void removeBlobs(String key) { 443 KeyValueStore kvs = getKeyValueStore(); 444 String json = kvs.getString(key + DOT_BLOBINFO); 445 Map<String, String> map = jsonToMap(json); 446 if (map == null) { 447 return; 448 } 449 String countStr = map.get(COUNT); 450 int count = countStr == null ? 0 : Integer.parseInt(countStr); 451 String sizeStr = map.get(SIZE); 452 long size = sizeStr == null ? 0 : Long.parseLong(sizeStr); 453 454 // remove blobs 455 for (int i = 0; i < count; i++) { 456 kvs.put(key + DOT_BLOB_DOT + i, (String) null); 457 } 458 kvs.put(key + DOT_BLOBINFO, (String) null); 459 // fix storage size 460 addStorageSize(-size); 461 } 462 463 @Override 464 public List<Blob> getBlobs(String key) { 465 KeyValueStore kvs = getKeyValueStore(); 466 BlobProvider bp = getBlobProvider(); 467 String info = kvs.getString(key + DOT_BLOBINFO); 468 if (info == null) { 469 // if the entry doesn't exist at all return null, otherwise empty 470 if (kvs.getString(key + DOT_COMPLETED) == null) { 471 return null; 472 } else { 473 return Collections.emptyList(); 474 } 475 } 476 Map<String, String> blobInfoMap = jsonToMap(info); 477 String countStr = blobInfoMap.get(COUNT); 478 if (countStr == null) { 479 return Collections.emptyList(); 480 } 481 int count = Integer.parseInt(countStr); 482 List<Blob> blobs = new ArrayList<>(); 483 for (int i = 0; i < count; i++) { 484 String blobMapJson = kvs.getString(key + DOT_BLOB_DOT + i); 485 if (blobMapJson == null) { 486 // corrupted entry, bail out 487 break; 488 } 489 Map<String, String> blobMap = jsonToMap(blobMapJson); 490 String blobKey = blobMap.get(KEY); 491 if (blobKey == null) { 492 // corrupted entry, bail out 493 break; 494 } 495 String mimeType = blobMap.get(MIMETYPE); 496 String encoding = blobMap.get(ENCODING); 497 String filename = blobMap.get(FILENAME); 498 String lengthStr = blobMap.get(LENGTH); 499 Long length = lengthStr == null ? null : Long.valueOf(lengthStr); 500 String digest = blobMap.get(DIGEST); 501 BlobInfo blobInfo = new BlobInfo(); 502 blobInfo.key = blobKey; 503 blobInfo.mimeType = mimeType; 504 blobInfo.encoding = encoding; 505 blobInfo.filename = filename; 506 blobInfo.length = length; 507 blobInfo.digest = digest; 508 try { 509 Blob blob = bp.readBlob(blobInfo); 510 blobs.add(blob); 511 } catch (IOException e) { 512 throw new NuxeoException(e); 513 } 514 } 515 return blobs; 516 } 517 518 @Override 519 public long getSize(String key) { 520 KeyValueStore kvs = getKeyValueStore(); 521 String json = kvs.getString(key + DOT_BLOBINFO); 522 Map<String, String> map = jsonToMap(json); 523 String size; 524 if (map == null || (size = map.get(SIZE)) == null) { 525 return -1; 526 } 527 return Long.parseLong(size); 528 } 529 530 @Override 531 public boolean isCompleted(String key) { 532 KeyValueStore kvs = getKeyValueStore(); 533 String completed = kvs.getString(key + DOT_COMPLETED); 534 return Boolean.parseBoolean(completed); 535 } 536 537 @Override 538 public void setCompleted(String key, boolean completed) { 539 KeyValueStore kvs = getKeyValueStore(); 540 kvs.put(key + DOT_COMPLETED, String.valueOf(completed), ttl); 541 } 542 543 protected void removeCompleted(String key) { 544 KeyValueStore kvs = getKeyValueStore(); 545 kvs.put(key + DOT_COMPLETED, (String) null); 546 } 547 548 @Override 549 public void release(String key) { 550 if (targetMaxSize > 0 && getStorageSize() > targetMaxSize) { 551 // do the costly computation of the exact storage size if needed 552 doGC(); 553 if (getStorageSize() > targetMaxSize) { 554 remove(key); 555 return; 556 } 557 } 558 setReleaseTTL(key); 559 } 560 561 // set TTL on all keys for this entry 562 protected void setReleaseTTL(String key) { 563 KeyValueStore kvs = getKeyValueStore(); 564 kvs.setTTL(key + DOT_COMPLETED, releaseTTL); 565 String json = kvs.getString(key + DOT_PARAMINFO); 566 List<String> parameters = jsonToList(json); 567 if (parameters != null) { 568 parameters.stream().forEach(parameter -> { 569 String k = key + DOT_PARAM_DOT + parameter; 570 kvs.setTTL(k, releaseTTL); 571 kvs.setTTL(k + FORMAT, releaseTTL); 572 }); 573 } 574 kvs.setTTL(key + DOT_PARAMINFO, releaseTTL); 575 json = kvs.getString(key + DOT_BLOBINFO); 576 Map<String, String> map = jsonToMap(json); 577 if (map != null) { 578 String countStr = map.get(COUNT); 579 int count = countStr == null ? 0 : Integer.parseInt(countStr); 580 for (int i = 0; i < count; i++) { 581 kvs.setTTL(key + DOT_BLOB_DOT + i, releaseTTL); 582 } 583 } 584 kvs.setTTL(key + DOT_BLOBINFO, releaseTTL); 585 } 586 587 @Override 588 public void remove(String key) { 589 removeBlobs(key); 590 removeParameters(key); 591 removeCompleted(key); 592 } 593 594}