001/* 002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.transientstore.keyvalueblob; 020 021import static java.util.function.Function.identity; 022 023import java.io.IOException; 024import java.io.Serializable; 025import java.util.ArrayList; 026import java.util.Collection; 027import java.util.Collections; 028import java.util.HashMap; 029import java.util.List; 030import java.util.Map; 031import java.util.function.Function; 032import java.util.stream.Collectors; 033import java.util.stream.Stream; 034 035import org.apache.commons.lang3.SerializationUtils; 036import org.apache.commons.logging.Log; 037import org.apache.commons.logging.LogFactory; 038import org.nuxeo.ecm.core.api.Blob; 039import org.nuxeo.ecm.core.api.NuxeoException; 040import org.nuxeo.ecm.core.blob.BlobInfo; 041import org.nuxeo.ecm.core.blob.BlobManager; 042import org.nuxeo.ecm.core.blob.BlobProvider; 043import org.nuxeo.ecm.core.blob.ManagedBlob; 044import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector; 045import org.nuxeo.ecm.core.transientstore.api.MaximumTransientSpaceExceeded; 046import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig; 047import org.nuxeo.ecm.core.transientstore.api.TransientStoreProvider; 048import org.nuxeo.runtime.api.Framework; 049import org.nuxeo.runtime.kv.KeyValueService; 050import org.nuxeo.runtime.kv.KeyValueStore; 051import org.nuxeo.runtime.kv.KeyValueStoreProvider; 052 053import com.fasterxml.jackson.core.type.TypeReference; 054import com.fasterxml.jackson.databind.ObjectMapper; 055 056/** 057 * Transient Store storing properties in a Key/Value store, and storing blobs using a Blob Provider. 058 * <p> 059 * The key/value store used is the one with the same name as the transient store itself. 060 * <p> 061 * The blob provider used is the one with the same name as the transient store itself. 062 * <p> 063 * The storage format is the following: 064 * 065 * <pre> 066 * __blobsize__: storage size; because entries may expire without us being notified due to their TTL, 067 * this may be higher than the actual storage size 068 * 069 * entryKey.completed: "true" if completed, "false" if not; presence of this key marks entry existence 070 * 071 * entryKey.paraminfo: ["foo", "bar"] 072 * 073 * entryKey.param.foo: value for param foo 074 * entryKey.param.foo__format: "java" for java serializable format, otherwise string 075 * 076 * entryKey.param.bar: value for param bar 077 * etc. 078 * 079 * entryKey.blobinfo: {"count": number of blobs, 080 * "size": storage size of the blobs} 081 * entryKey.blob.0: {"key": key in blob provider for first blob, 082 * "mimetype": MIME Type, 083 * "encoding": encoding, 084 * "filename": filename, 085 * "digest": digest} 086 * entryKey.blob.1: {...} same for second blob 087 * etc. 088 * </pre> 089 * 090 * @since 9.3 091 */ 092public class KeyValueBlobTransientStore implements TransientStoreProvider { 093 094 private static final Log log = LogFactory.getLog(KeyValueBlobTransientStore.class); 095 096 public static final String SEP = "."; 097 098 public static final String STORAGE_SIZE = "__blobsize__"; 099 100 public static final String DOT_COMPLETED = SEP + "completed"; 101 102 public static final String DOT_PARAMINFO = SEP + "paraminfo"; 103 104 public static final String DOT_PARAM_DOT = SEP + "param" + SEP; 105 106 public static final String FORMAT = "__format"; 107 108 public static final String FORMAT_JAVA = "java"; 109 110 public static final String DOT_BLOBINFO = SEP + "blobinfo"; 111 112 public static final String COUNT = "count"; 113 114 public static final String SIZE = "size"; 115 116 public static final String DOT_BLOB_DOT = SEP + "blob" + SEP; 117 118 public static final String KEY = "key"; 119 120 public static final String MIMETYPE = "mimetype"; 121 122 public static final String ENCODING = "encoding"; 123 124 public static final String FILENAME = "filename"; 125 126 public static final String LENGTH = "length"; 127 128 public static final String DIGEST = "digest"; 129 130 protected String name; 131 132 /** Basic TTL for all entries. */ 133 protected int ttl; 134 135 /** TTL used to keep objects around a bit longer if there's space for them, for caching. */ 136 protected int releaseTTL; 137 138 protected long targetMaxSize; 139 140 protected long absoluteMaxSize; 141 142 protected ObjectMapper mapper; 143 144 // ---------- TransientStoreProvider ---------- 145 146 @Override 147 public void init(TransientStoreConfig config) { 148 name = config.getName(); 149 mapper = new ObjectMapper(); 150 ttl = config.getFirstLevelTTL() * 60; 151 releaseTTL = config.getSecondLevelTTL() * 60; 152 targetMaxSize = config.getTargetMaxSizeMB() * 1024 * 1024; 153 absoluteMaxSize = config.getAbsoluteMaxSizeMB() * 1024 * 1024; 154 } 155 156 protected KeyValueStore getKeyValueStore() { 157 return Framework.getService(KeyValueService.class).getKeyValueStore(name); 158 } 159 160 protected BlobProvider getBlobProvider() { 161 return Framework.getService(BlobManager.class).getBlobProvider(name); 162 } 163 164 @Override 165 public void shutdown() { 166 } 167 168 @Override 169 public Stream<String> keyStream() { 170 KeyValueStoreProvider kvs = (KeyValueStoreProvider) getKeyValueStore(); 171 int len = DOT_COMPLETED.length(); 172 return kvs.keyStream() // 173 .filter(key -> key.endsWith(DOT_COMPLETED)) 174 .map(key -> key.substring(0, key.length() - len)); 175 } 176 177 @Override 178 public long getStorageSize() { 179 KeyValueStore kvs = getKeyValueStore(); 180 String sizeStr = kvs.getString(STORAGE_SIZE); 181 return sizeStr == null ? 0 : Long.parseLong(sizeStr); 182 } 183 184 protected void addStorageSize(long delta) { 185 atomicUpdate(STORAGE_SIZE, size -> { 186 long s = size == null ? 0 : Long.parseLong(size); 187 return String.valueOf(s + delta); 188 }, 0); 189 } 190 191 /** 192 * Computes an exact value for the current storage size (sum of all blobs size). THIS METHOD IS COSTLY. 193 * <p> 194 * Does not take into account blob de-duplication that may be done by the blob provider. 195 * <p> 196 * Does not take into account blobs that still exist in the blob provider but are not referenced anymore (due to TTL 197 * expiration or GC not having been done). 198 */ 199 protected void computeStorageSize() { 200 KeyValueStore kvs = getKeyValueStore(); 201 long size = keyStream().map(this::getBlobs) // 202 .flatMap(Collection::stream) 203 .mapToLong(Blob::getLength) 204 .sum(); 205 kvs.put(STORAGE_SIZE, String.valueOf(size)); 206 } 207 208 // also recomputes the exact storage size 209 @Override 210 public void doGC() { 211 BlobProvider bp = getBlobProvider(); 212 BinaryGarbageCollector gc = bp.getBinaryManager().getGarbageCollector(); 213 gc.start(); 214 keyStream().map(this::getBlobs) // 215 .flatMap(Collection::stream) 216 .map(ManagedBlob.class::cast) 217 .map(ManagedBlob::getKey) 218 .forEach(gc::mark); 219 gc.stop(true); // delete 220 computeStorageSize(); 221 } 222 223 @Override 224 public void removeAll() { 225 KeyValueStoreProvider kvs = (KeyValueStoreProvider) getKeyValueStore(); 226 kvs.clear(); 227 doGC(); 228 } 229 230 // ---------- TransientStore ---------- 231 232 protected static final TypeReference<List<String>> LIST_STRING = new TypeReference<List<String>>() { 233 }; 234 235 protected static final TypeReference<Map<String, String>> MAP_STRING_STRING = new TypeReference<Map<String, String>>() { 236 }; 237 238 protected List<String> jsonToList(String json) { 239 if (json == null) { 240 return null; 241 } 242 try { 243 return mapper.readValue(json, LIST_STRING); 244 } catch (IOException e) { 245 log.error("Invalid JSON array: " + json); 246 return null; 247 } 248 } 249 250 protected Map<String, String> jsonToMap(String json) { 251 if (json == null) { 252 return null; 253 } 254 try { 255 return mapper.readValue(json, MAP_STRING_STRING); 256 } catch (IOException e) { 257 log.error("Invalid JSON object: " + json); 258 return null; 259 } 260 } 261 262 protected String toJson(Object object) { 263 try { 264 return mapper.writeValueAsString(object); 265 } catch (IOException e) { 266 throw new NuxeoException(e); 267 } 268 } 269 270 public void atomicUpdate(String key, Function<String, String> updateFunction, long ttl) { 271 KeyValueStore kvs = getKeyValueStore(); 272 for (;;) { 273 String oldValue = kvs.getString(key); 274 String newValue = updateFunction.apply(oldValue); 275 if (kvs.compareAndSet(key, oldValue, newValue, ttl)) { 276 break; 277 } 278 } 279 } 280 281 @Override 282 public boolean exists(String key) { 283 KeyValueStore kvs = getKeyValueStore(); 284 return kvs.getString(key + DOT_COMPLETED) != null; 285 } 286 287 protected void markEntryExists(String key) { 288 KeyValueStore kvs = getKeyValueStore(); 289 kvs.compareAndSet(key + DOT_COMPLETED, null, "false", ttl); 290 } 291 292 @Override 293 public void putParameter(String key, String parameter, Serializable value) { 294 KeyValueStore kvs = getKeyValueStore(); 295 String k = key + DOT_PARAM_DOT + parameter; 296 if (value instanceof String) { 297 kvs.put(k, (String) value, ttl); 298 kvs.put(k + FORMAT, (String) null); 299 } else { 300 byte[] bytes = SerializationUtils.serialize(value); 301 kvs.put(k, bytes, ttl); 302 kvs.put(k + FORMAT, FORMAT_JAVA, ttl); 303 } 304 // atomically add key to param info 305 atomicUpdate(key + DOT_PARAMINFO, json -> { 306 List<String> parameters = jsonToList(json); 307 if (parameters == null) { 308 parameters = new ArrayList<>(); 309 } 310 if (!parameters.contains(parameter)) { 311 parameters.add(parameter); 312 } 313 return toJson(parameters); 314 }, ttl); 315 markEntryExists(key); 316 } 317 318 @Override 319 public Serializable getParameter(String key, String parameter) { 320 KeyValueStore kvs = getKeyValueStore(); 321 String k = key + DOT_PARAM_DOT + parameter; 322 String format = kvs.getString(k + FORMAT); 323 if (format == null) { 324 return kvs.getString(k); 325 } else { 326 byte[] bytes = kvs.get(k); 327 return SerializationUtils.deserialize(bytes); 328 } 329 } 330 331 @Override 332 public void putParameters(String key, Map<String, Serializable> parameters) { 333 parameters.forEach((param, value) -> putParameter(key, param, value)); 334 } 335 336 @Override 337 public Map<String, Serializable> getParameters(String key) { 338 KeyValueStore kvs = getKeyValueStore(); 339 // get the list of keys 340 String json = kvs.getString(key + DOT_PARAMINFO); 341 List<String> parameters = jsonToList(json); 342 if (parameters == null) { 343 // if the entry doesn't exist at all return null, otherwise empty 344 if (kvs.getString(key + DOT_COMPLETED) == null) { 345 return null; 346 } else { 347 return Collections.emptyMap(); 348 } 349 } 350 // get values 351 return parameters.stream().collect(Collectors.toMap(identity(), p -> getParameter(key, p))); 352 } 353 354 protected void removeParameters(String key) { 355 KeyValueStore kvs = getKeyValueStore(); 356 String json = kvs.getString(key + DOT_PARAMINFO); 357 List<String> parameters = jsonToList(json); 358 if (parameters != null) { 359 for (String parameter : parameters) { 360 String k = key + DOT_PARAM_DOT + parameter; 361 kvs.put(k, (String) null); 362 kvs.put(k + FORMAT, (String) null); 363 } 364 } 365 kvs.put(key + DOT_PARAMINFO, (String) null); 366 } 367 368 @Override 369 public void putBlobs(String key, List<Blob> blobs) { 370 if (absoluteMaxSize > 0 && getStorageSize() > absoluteMaxSize) { 371 // do the costly computation of the exact storage size if needed 372 doGC(); 373 if (getStorageSize() > absoluteMaxSize) { 374 throw new MaximumTransientSpaceExceeded(); 375 } 376 } 377 378 // remove previous blobs 379 removeBlobs(key); 380 381 KeyValueStore kvs = getKeyValueStore(); 382 BlobProvider bp = getBlobProvider(); 383 long totalSize = 0; 384 int i = 0; 385 for (Blob blob : blobs) { 386 long size = blob.getLength(); 387 if (size >= 0) { 388 totalSize += size; 389 } 390 // store blob 391 String blobKey; 392 try { 393 blobKey = bp.writeBlob(blob); 394 } catch (IOException e) { 395 throw new NuxeoException(e); 396 } 397 // write blob data 398 Map<String, String> blobMap = new HashMap<>(); 399 blobMap.put(KEY, blobKey); 400 blobMap.put(MIMETYPE, blob.getMimeType()); 401 blobMap.put(ENCODING, blob.getEncoding()); 402 blobMap.put(FILENAME, blob.getFilename()); 403 blobMap.put(LENGTH, String.valueOf(size)); 404 blobMap.put(DIGEST, blob.getDigest()); 405 kvs.put(key + DOT_BLOB_DOT + i, toJson(blobMap), ttl); 406 i++; 407 } 408 Map<String, String> blobInfoMap = new HashMap<>(); 409 blobInfoMap.put(COUNT, String.valueOf(blobs.size())); 410 blobInfoMap.put(SIZE, String.valueOf(totalSize)); 411 kvs.put(key + DOT_BLOBINFO, toJson(blobInfoMap), ttl); 412 addStorageSize(totalSize); 413 markEntryExists(key); 414 } 415 416 protected void removeBlobs(String key) { 417 KeyValueStore kvs = getKeyValueStore(); 418 String json = kvs.getString(key + DOT_BLOBINFO); 419 Map<String, String> map = jsonToMap(json); 420 if (map == null) { 421 return; 422 } 423 String countStr = map.get(COUNT); 424 int count = countStr == null ? 0 : Integer.parseInt(countStr); 425 String sizeStr = map.get(SIZE); 426 long size = sizeStr == null ? 0 : Long.parseLong(sizeStr); 427 428 // remove blobs 429 for (int i = 0; i < count; i++) { 430 kvs.put(key + DOT_BLOB_DOT + i, (String) null); 431 } 432 kvs.put(key + DOT_BLOBINFO, (String) null); 433 // fix storage size 434 addStorageSize(-size); 435 } 436 437 @Override 438 public List<Blob> getBlobs(String key) { 439 KeyValueStore kvs = getKeyValueStore(); 440 BlobProvider bp = getBlobProvider(); 441 String info = kvs.getString(key + DOT_BLOBINFO); 442 if (info == null) { 443 // if the entry doesn't exist at all return null, otherwise empty 444 if (kvs.getString(key + DOT_COMPLETED) == null) { 445 return null; 446 } else { 447 return Collections.emptyList(); 448 } 449 } 450 Map<String, String> blobInfoMap = jsonToMap(info); 451 String countStr = blobInfoMap.get(COUNT); 452 if (countStr == null) { 453 return Collections.emptyList(); 454 } 455 int count = Integer.parseInt(countStr); 456 List<Blob> blobs = new ArrayList<>(); 457 for (int i = 0; i < count; i++) { 458 String blobMapJson = kvs.getString(key + DOT_BLOB_DOT + i); 459 if (blobMapJson == null) { 460 // corrupted entry, bail out 461 break; 462 } 463 Map<String, String> blobMap = jsonToMap(blobMapJson); 464 String blobKey = blobMap.get(KEY); 465 if (blobKey == null) { 466 // corrupted entry, bail out 467 break; 468 } 469 String mimeType = blobMap.get(MIMETYPE); 470 String encoding = blobMap.get(ENCODING); 471 String filename = blobMap.get(FILENAME); 472 String lengthStr = blobMap.get(LENGTH); 473 Long length = lengthStr == null ? null : Long.valueOf(lengthStr); 474 String digest = blobMap.get(DIGEST); 475 BlobInfo blobInfo = new BlobInfo(); 476 blobInfo.key = blobKey; 477 blobInfo.mimeType = mimeType; 478 blobInfo.encoding = encoding; 479 blobInfo.filename = filename; 480 blobInfo.length = length; 481 blobInfo.digest = digest; 482 try { 483 Blob blob = bp.readBlob(blobInfo); 484 blobs.add(blob); 485 } catch (IOException e) { 486 throw new NuxeoException(e); 487 } 488 } 489 return blobs; 490 } 491 492 @Override 493 public long getSize(String key) { 494 KeyValueStore kvs = getKeyValueStore(); 495 String json = kvs.getString(key + DOT_BLOBINFO); 496 Map<String, String> map = jsonToMap(json); 497 String size; 498 if (map == null || (size = map.get(SIZE)) == null) { 499 return -1; 500 } 501 return Long.parseLong(size); 502 } 503 504 @Override 505 public boolean isCompleted(String key) { 506 KeyValueStore kvs = getKeyValueStore(); 507 String completed = kvs.getString(key + DOT_COMPLETED); 508 return Boolean.parseBoolean(completed); 509 } 510 511 @Override 512 public void setCompleted(String key, boolean completed) { 513 KeyValueStore kvs = getKeyValueStore(); 514 kvs.put(key + DOT_COMPLETED, String.valueOf(completed), ttl); 515 } 516 517 protected void removeCompleted(String key) { 518 KeyValueStore kvs = getKeyValueStore(); 519 kvs.put(key + DOT_COMPLETED, (String) null); 520 } 521 522 @Override 523 public void release(String key) { 524 if (targetMaxSize > 0 && getStorageSize() > targetMaxSize) { 525 // do the costly computation of the exact storage size if needed 526 doGC(); 527 if (getStorageSize() > targetMaxSize) { 528 remove(key); 529 return; 530 } 531 } 532 setReleaseTTL(key); 533 } 534 535 // set TTL on all keys for this entry 536 protected void setReleaseTTL(String key) { 537 KeyValueStore kvs = getKeyValueStore(); 538 kvs.setTTL(key + DOT_COMPLETED, releaseTTL); 539 String json = kvs.getString(key + DOT_PARAMINFO); 540 List<String> parameters = jsonToList(json); 541 if (parameters != null) { 542 parameters.stream().forEach(parameter -> { 543 String k = key + DOT_PARAM_DOT + parameter; 544 kvs.setTTL(k, releaseTTL); 545 kvs.setTTL(k + FORMAT, releaseTTL); 546 }); 547 } 548 kvs.setTTL(key + DOT_PARAMINFO, releaseTTL); 549 json = kvs.getString(key + DOT_BLOBINFO); 550 Map<String, String> map = jsonToMap(json); 551 if (map != null) { 552 String countStr = map.get(COUNT); 553 int count = countStr == null ? 0 : Integer.parseInt(countStr); 554 for (int i = 0; i < count; i++) { 555 kvs.setTTL(key + DOT_BLOB_DOT + i, releaseTTL); 556 } 557 } 558 kvs.setTTL(key + DOT_BLOBINFO, releaseTTL); 559 } 560 561 @Override 562 public void remove(String key) { 563 removeBlobs(key); 564 removeParameters(key); 565 removeCompleted(key); 566 } 567 568}