001/* 002 * (C) Copyright 2015-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat <tdelprat@nuxeo.com> 018 * Antoine Taillefer <ataillefer@nuxeo.com> 019 */ 020 021package org.nuxeo.ecm.core.redis.contribs; 022 023import static java.nio.charset.StandardCharsets.UTF_8; 024 025import java.io.ByteArrayInputStream; 026import java.io.ByteArrayOutputStream; 027import java.io.IOException; 028import java.io.InputStream; 029import java.io.ObjectInputStream; 030import java.io.ObjectOutputStream; 031import java.io.Serializable; 032import java.util.ArrayList; 033import java.util.HashMap; 034import java.util.List; 035import java.util.Map; 036import java.util.Set; 037import java.util.function.Function; 038import java.util.regex.Matcher; 039import java.util.regex.Pattern; 040import java.util.stream.Stream; 041 042import org.apache.commons.lang3.StringUtils; 043import org.apache.commons.logging.Log; 044import org.apache.commons.logging.LogFactory; 045import org.nuxeo.ecm.core.api.Blob; 046import org.nuxeo.ecm.core.api.NuxeoException; 047import org.nuxeo.ecm.core.redis.RedisAdmin; 048import org.nuxeo.ecm.core.redis.RedisCallable; 049import org.nuxeo.ecm.core.redis.RedisExecutor; 050import org.nuxeo.ecm.core.transientstore.AbstractTransientStore; 051import org.nuxeo.ecm.core.transientstore.api.TransientStore; 052import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig; 053import org.nuxeo.runtime.api.Framework; 054 055/** 056 * Redis implementation (i.e. cluster aware) of the {@link TransientStore}. 057 * <p> 058 * Since hashes cannot be nested, a storage entry is flattened as follows: 059 * 060 * <pre> 061 * - Entry summary: 062 * 063 * transientStore:transientStoreName:entryKey { 064 * "blobCount": number of blobs associated with the entry 065 * "size": storage size of the blobs associated with the entry 066 * "completed": entry status 067 * } 068 * 069 * - Entry parameters: 070 * 071 * transientStore:transientStoreName:entryKey:params { 072 * "param1": value1 073 * "param2": value2 074 * } 075 * 076 * - Entry blobs: 077 * 078 * transientStore:transientStoreName:entryKey:blobs:0 { 079 * "file" 080 * "filename" 081 * "encoding" 082 * "mimetype" 083 * "digest" 084 * } 085 * 086 * transientStore:transientStoreName:entryKey:blobs:1 { 087 * ... 088 * } 089 * 090 * ... 091 * </pre> 092 * 093 * @since 7.2 094 */ 095public class RedisTransientStore extends AbstractTransientStore { 096 097 protected static final String SIZE_KEY = "size"; 098 099 protected RedisExecutor redisExecutor; 100 101 protected String namespace; 102 103 protected String sizeKey; 104 105 protected KeyMatcher keyMatcher; 106 107 protected RedisAdmin redisAdmin; 108 109 protected int firstLevelTTL; 110 111 protected int secondLevelTTL; 112 113 protected Log log = LogFactory.getLog(RedisTransientStore.class); 114 115 public RedisTransientStore() { 116 redisExecutor = Framework.getService(RedisExecutor.class); 117 redisAdmin = Framework.getService(RedisAdmin.class); 118 } 119 120 @Override 121 public void init(TransientStoreConfig config) { 122 log.debug("Initializing RedisTransientStore: " + config.getName()); 123 super.init(config); 124 125 namespace = redisAdmin.namespace("transientStore", config.getName()); 126 sizeKey = namespace + SIZE_KEY; 127 keyMatcher = new KeyMatcher(); 128 129 // Use seconds for Redis EXPIRE command 130 firstLevelTTL = config.getFirstLevelTTL() * 60; 131 secondLevelTTL = config.getSecondLevelTTL() * 60; 132 } 133 134 @Override 135 public void shutdown() { 136 log.debug("Shutting down RedisTransientStore: " + config.getName()); 137 // Nothing to do here. 138 } 139 140 @Override 141 public boolean exists(String key) { 142 // Jedis#exists(String key) doesn't to work for a key created with hset or hmset 143 return getSummary(key) != null || getParameters(key) != null; 144 } 145 146 @Override 147 public Stream<String> keyStream() { 148 return redisExecutor.execute((RedisCallable<Stream<String>>) jedis -> { 149 return jedis.keys(namespace + "*") // 150 .stream() 151 .map(keyMatcher) 152 .filter(key -> !SIZE_KEY.equals(key)); 153 }); 154 } 155 156 protected class KeyMatcher implements Function<String, String> { 157 158 protected final Pattern KEY_PATTERN = Pattern.compile("(.*?)(:(params|blobs:[0-9]+))?"); 159 160 protected final int offset = namespace.length(); 161 162 @Override 163 public String apply(String t) { 164 final Matcher m = KEY_PATTERN.matcher(t.substring(offset)); 165 m.matches(); 166 return m.group(1); 167 } 168 169 } 170 171 @Override 172 public void putParameter(String key, String parameter, Serializable value) { 173 redisExecutor.execute((RedisCallable<Void>) jedis -> { 174 String paramsKey = namespace + join(key, "params"); 175 if (log.isDebugEnabled()) { 176 log.debug(String.format("Setting field %s to value %s in Redis hash stored at key %s", parameter, value, 177 paramsKey)); 178 } 179 jedis.hset(getBytes(paramsKey), getBytes(parameter), serialize(value)); 180 return null; 181 }); 182 setTTL(key, firstLevelTTL); 183 } 184 185 @Override 186 public Serializable getParameter(String key, String parameter) { 187 return redisExecutor.execute((RedisCallable<Serializable>) jedis -> { 188 String paramsKey = namespace + join(key, "params"); 189 byte[] paramBytes = jedis.hget(getBytes(paramsKey), getBytes(parameter)); 190 if (paramBytes == null) { 191 return null; 192 } 193 Serializable res = deserialize(paramBytes); 194 if (log.isDebugEnabled()) { 195 log.debug(String.format("Fetched field %s from Redis hash stored at key %s -> %s", parameter, paramsKey, 196 res)); 197 } 198 return res; 199 }); 200 } 201 202 @Override 203 public void putParameters(String key, Map<String, Serializable> parameters) { 204 redisExecutor.execute((RedisCallable<Void>) jedis -> { 205 String paramsKey = namespace + join(key, "params"); 206 if (log.isDebugEnabled()) { 207 log.debug(String.format("Setting fields %s in Redis hash stored at key %s", parameters, paramsKey)); 208 } 209 jedis.hmset(getBytes(paramsKey), serialize(parameters)); 210 return null; 211 }); 212 setTTL(key, firstLevelTTL); 213 } 214 215 @Override 216 public Map<String, Serializable> getParameters(String key) { 217 // TODO NXP-18236: use a transaction? 218 String paramsKey = namespace + join(key, "params"); 219 Map<byte[], byte[]> paramBytes = redisExecutor.execute((RedisCallable<Map<byte[], byte[]>>) jedis -> { 220 return jedis.hgetAll(getBytes(paramsKey)); 221 }); 222 if (paramBytes.isEmpty()) { 223 if (getSummary(key) == null) { 224 return null; 225 } else { 226 return new HashMap<>(); 227 } 228 } 229 Map<String, Serializable> res = deserialize(paramBytes); 230 if (log.isDebugEnabled()) { 231 log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", paramsKey, res)); 232 } 233 return res; 234 } 235 236 @Override 237 public List<Blob> getBlobs(String key) { 238 // TODO NXP-18236: use a transaction? 239 240 // Get blob count 241 String blobCount = redisExecutor.execute((RedisCallable<String>) jedis -> { 242 return jedis.hget(namespace + key, "blobCount"); 243 }); 244 if (log.isDebugEnabled()) { 245 log.debug(String.format("Fetched field \"blobCount\" from Redis hash stored at key %s -> %s", 246 namespace + key, blobCount)); 247 } 248 if (blobCount == null) { 249 // Check for existing parameters 250 Map<String, Serializable> parameters = getParameters(key); 251 if (parameters == null) { 252 return null; 253 } else { 254 return new ArrayList<>(); 255 } 256 } 257 258 // Get blobs 259 int entryBlobCount = Integer.parseInt(blobCount); 260 if (entryBlobCount <= 0) { 261 return new ArrayList<>(); 262 } 263 List<Map<String, String>> blobInfos = new ArrayList<>(); 264 for (int i = 0; i < entryBlobCount; i++) { 265 String blobInfoIndex = String.valueOf(i); 266 Map<String, String> entryBlobInfo = redisExecutor.execute((RedisCallable<Map<String, String>>) jedis -> { 267 String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex); 268 Map<String, String> blobInfo = jedis.hgetAll(blobInfoKey); 269 if (blobInfo.isEmpty()) { 270 throw new NuxeoException( 271 String.format("Entry with key %s is inconsistent: blobCount = %d but key %s doesn't exist", 272 key, entryBlobCount, blobInfoKey)); 273 } 274 if (log.isDebugEnabled()) { 275 log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", blobInfoKey, 276 blobInfo)); 277 } 278 return blobInfo; 279 }); 280 blobInfos.add(entryBlobInfo); 281 } 282 283 // Load blobs from the file system 284 return loadBlobs(blobInfos); 285 } 286 287 @Override 288 public long getSize(String key) { 289 return redisExecutor.execute((RedisCallable<Long>) jedis -> { 290 String size = jedis.hget(namespace + key, SIZE_KEY); 291 if (size == null) { 292 return -1L; 293 } 294 if (log.isDebugEnabled()) { 295 log.debug(String.format("Fetched field \"%s\" from Redis hash stored at key %s -> %s", SIZE_KEY, 296 namespace + key, size)); 297 } 298 return Long.parseLong(size); 299 }); 300 } 301 302 @Override 303 public boolean isCompleted(String key) { 304 return redisExecutor.execute((RedisCallable<Boolean>) jedis -> { 305 String completed = jedis.hget(namespace + key, "completed"); 306 if (log.isDebugEnabled()) { 307 log.debug(String.format("Fetched field \"completed\" from Redis hash stored at key %s -> %s", 308 namespace + key, completed)); 309 } 310 return Boolean.parseBoolean(completed); 311 }); 312 } 313 314 @Override 315 public void setCompleted(String key, boolean completed) { 316 redisExecutor.execute((RedisCallable<Void>) jedis -> { 317 if (log.isDebugEnabled()) { 318 log.debug(String.format("Setting field \"completed\" to value %s in Redis hash stored at key %s", 319 completed, namespace + key)); 320 } 321 jedis.hset(namespace + key, "completed", String.valueOf(completed)); 322 return null; 323 }); 324 setTTL(key, firstLevelTTL); 325 } 326 327 @Override 328 public void release(String key) { 329 if (getStorageSize() <= config.getTargetMaxSizeMB() * (1024 * 1024) || config.getTargetMaxSizeMB() < 0) { 330 setTTL(key, secondLevelTTL); 331 } else { 332 remove(key); 333 } 334 } 335 336 @Override 337 protected void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos) { 338 // TODO NXP-18236: use a transaction? 339 340 Map<String, String> oldSummary = getSummary(key); 341 342 // Update storage size 343 long entrySize = -1; 344 if (oldSummary != null) { 345 String size = oldSummary.get(SIZE_KEY); 346 if (size != null) { 347 entrySize = Long.parseLong(size); 348 } 349 } 350 if (entrySize > 0) { 351 incrementStorageSize(sizeOfBlobs - entrySize); 352 } else { 353 if (sizeOfBlobs > 0) { 354 incrementStorageSize(sizeOfBlobs); 355 } 356 } 357 358 // Delete old blobs 359 if (oldSummary != null) { 360 String oldBlobCount = oldSummary.get("blobCount"); 361 deleteBlobInfos(key, oldBlobCount); 362 } 363 364 // Update entry size and blob count 365 final Map<String, String> entrySummary = new HashMap<>(); 366 int blobCount = 0; 367 if (blobInfos != null) { 368 blobCount = blobInfos.size(); 369 } 370 entrySummary.put("blobCount", String.valueOf(blobCount)); 371 entrySummary.put(SIZE_KEY, String.valueOf(sizeOfBlobs)); 372 redisExecutor.execute((RedisCallable<Void>) jedis -> { 373 if (log.isDebugEnabled()) { 374 log.debug(String.format("Setting fields %s in Redis hash stored at key %s", entrySummary, 375 namespace + key)); 376 } 377 jedis.hmset(namespace + key, entrySummary); 378 jedis.expire(namespace + key, firstLevelTTL); 379 return null; 380 }); 381 382 // Set new blobs 383 if (blobInfos != null) { 384 int blobsTimeout = firstLevelTTL + 60; 385 for (int i = 0; i < blobInfos.size(); i++) { 386 String blobInfoIndex = String.valueOf(i); 387 Map<String, String> blobInfo = blobInfos.get(i); 388 redisExecutor.execute((RedisCallable<Void>) jedis -> { 389 String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex); 390 if (log.isDebugEnabled()) { 391 log.debug(String.format("Setting fields %s in Redis hash stored at key %s", blobInfo, 392 blobInfoKey)); 393 } 394 jedis.hmset(blobInfoKey, blobInfo); 395 jedis.expire(blobInfoKey, blobsTimeout); 396 return null; 397 }); 398 } 399 } 400 401 // Set params TTL 402 redisExecutor.execute((RedisCallable<Void>) jedis -> { 403 String paramsKey = namespace + join(key, "params"); 404 jedis.expire(getBytes(paramsKey), firstLevelTTL + 60); 405 return null; 406 }); 407 } 408 409 @Override 410 public long getStorageSize() { 411 return redisExecutor.execute((RedisCallable<Long>) jedis -> { 412 String value = jedis.get(sizeKey); 413 if (value == null) { 414 return 0L; 415 } 416 if (log.isDebugEnabled()) { 417 log.debug(String.format("Fetched value of Redis key %s -> %s", sizeKey, value)); 418 } 419 return Long.parseLong(value); 420 }); 421 } 422 423 @Override 424 protected void setStorageSize(final long newSize) { 425 redisExecutor.execute((RedisCallable<Void>) jedis -> { 426 if (log.isDebugEnabled()) { 427 log.debug(String.format("Setting Redis key %s to value %s", sizeKey, newSize)); 428 } 429 jedis.set(sizeKey, "" + newSize); 430 return null; 431 }); 432 } 433 434 @Override 435 protected long incrementStorageSize(final long size) { 436 return redisExecutor.execute((RedisCallable<Long>) jedis -> { 437 Long incremented = jedis.incrBy(sizeKey, size); 438 if (log.isDebugEnabled()) { 439 log.debug(String.format("Incremented Redis key %s to %d", sizeKey, incremented)); 440 } 441 return incremented; 442 }); 443 } 444 445 @Override 446 protected long decrementStorageSize(final long size) { 447 return redisExecutor.execute((RedisCallable<Long>) jedis -> { 448 Long decremented = jedis.decrBy(sizeKey, size); 449 if (log.isDebugEnabled()) { 450 log.debug(String.format("Decremented Redis key %s to %d", sizeKey, decremented)); 451 } 452 return decremented; 453 }); 454 } 455 456 @Override 457 protected void removeEntry(String key) { 458 // TODO NXP-18236: use a transaction? 459 460 Map<String, String> summary = getSummary(key); 461 if (summary != null) { 462 // Remove blobs 463 String blobCount = summary.get("blobCount"); 464 deleteBlobInfos(key, blobCount); 465 466 // Remove summary 467 redisExecutor.execute((RedisCallable<Long>) jedis -> { 468 Long deleted = jedis.del(namespace + key); 469 if (log.isDebugEnabled()) { 470 log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, namespace + key)); 471 } 472 return deleted; 473 }); 474 475 // Decrement storage size 476 String size = summary.get(SIZE_KEY); 477 if (size != null) { 478 long entrySize = Integer.parseInt(size); 479 if (entrySize > 0) { 480 decrementStorageSize(entrySize); 481 } 482 } 483 } 484 485 // Remove parameters 486 redisExecutor.execute((RedisCallable<Long>) jedis -> { 487 String paramsKey = namespace + join(key, "params"); 488 Long deleted = jedis.del(getBytes(paramsKey)); 489 if (log.isDebugEnabled()) { 490 log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, paramsKey)); 491 } 492 return deleted; 493 }); 494 } 495 496 @Override 497 protected void removeAllEntries() { 498 // TODO NXP-18236: use a transaction? 499 Set<String> keys = redisExecutor.execute((RedisCallable<Set<String>>) jedis -> { 500 return jedis.keys(namespace + "*"); 501 }); 502 for (String key : keys) { 503 redisExecutor.execute((RedisCallable<Void>) jedis -> { 504 jedis.del(key); 505 return null; 506 }); 507 } 508 } 509 510 public long getTTL(String key) { 511 long summaryTTL = redisExecutor.execute((RedisCallable<Long>) jedis -> { 512 return jedis.ttl(namespace + key); 513 }); 514 if (summaryTTL >= 0) { 515 return summaryTTL; 516 } else { 517 return redisExecutor.execute((RedisCallable<Long>) jedis -> { 518 String paramsKey = namespace + join(key, "params"); 519 return jedis.ttl(getBytes(paramsKey)); 520 }); 521 } 522 } 523 524 protected Map<String, String> getSummary(String key) { 525 return redisExecutor.execute((RedisCallable<Map<String, String>>) jedis -> { 526 Map<String, String> summary = jedis.hgetAll(namespace + key); 527 if (summary.isEmpty()) { 528 return null; 529 } 530 if (log.isDebugEnabled()) { 531 log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", namespace + key, 532 summary)); 533 } 534 return summary; 535 }); 536 } 537 538 protected void deleteBlobInfos(String key, String blobCountStr) { 539 if (blobCountStr != null) { 540 int blobCount = Integer.parseInt(blobCountStr); 541 if (blobCount > 0) { 542 for (int i = 0; i < blobCount; i++) { 543 String blobInfoIndex = String.valueOf(i); 544 redisExecutor.execute((RedisCallable<Long>) jedis -> { 545 String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex); 546 Long deleted = jedis.del(blobInfoKey); 547 if (log.isDebugEnabled()) { 548 log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, blobInfoKey)); 549 } 550 return deleted; 551 }); 552 } 553 } 554 } 555 } 556 557 protected String join(String... fragments) { 558 return StringUtils.join(fragments, ":"); 559 } 560 561 protected byte[] getBytes(String key) { 562 return key.getBytes(UTF_8); 563 } 564 565 protected String getString(byte[] bytes) { 566 return new String(bytes, UTF_8); 567 } 568 569 protected byte[] serialize(Serializable value) { 570 try { 571 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 572 ObjectOutputStream out = new ObjectOutputStream(baos); 573 out.writeObject(value); 574 out.flush(); 575 out.close(); 576 return baos.toByteArray(); 577 } catch (IOException e) { 578 throw new NuxeoException(e); 579 } 580 } 581 582 protected Serializable deserialize(byte[] bytes) { 583 try { 584 InputStream bain = new ByteArrayInputStream(bytes); 585 ObjectInputStream in = new ObjectInputStream(bain); 586 return (Serializable) in.readObject(); 587 } catch (IOException | ClassNotFoundException e) { 588 throw new NuxeoException(e); 589 } 590 } 591 592 protected Map<byte[], byte[]> serialize(Map<String, Serializable> map) { 593 Map<byte[], byte[]> serializedMap = new HashMap<>(); 594 for (String key : map.keySet()) { 595 serializedMap.put(getBytes(key), serialize(map.get(key))); 596 } 597 return serializedMap; 598 } 599 600 protected Map<String, Serializable> deserialize(Map<byte[], byte[]> byteMap) { 601 Map<String, Serializable> map = new HashMap<>(); 602 for (byte[] key : byteMap.keySet()) { 603 map.put(getString(key), deserialize(byteMap.get(key))); 604 } 605 return map; 606 } 607 608 protected void setTTL(String key, int seconds) { 609 610 Map<String, String> summary = getSummary(key); 611 if (summary != null) { 612 // Summary 613 redisExecutor.execute((RedisCallable<Void>) jedis -> { 614 jedis.expire(namespace + key, seconds); 615 return null; 616 }); 617 // Blobs 618 String blobCountStr = summary.get("blobCount"); 619 if (blobCountStr != null) { 620 int blobCount = Integer.parseInt(blobCountStr); 621 if (blobCount > 0) { 622 final int blobsTimeout = seconds + 60; 623 for (int i = 0; i < blobCount; i++) { 624 String blobInfoIndex = String.valueOf(i); 625 redisExecutor.execute((RedisCallable<Void>) jedis -> { 626 String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex); 627 jedis.expire(blobInfoKey, blobsTimeout); 628 return null; 629 }); 630 } 631 } 632 } 633 } 634 // Parameters 635 final int paramsTimeout; 636 if (summary == null) { 637 paramsTimeout = seconds; 638 } else { 639 paramsTimeout = seconds + 60; 640 } 641 redisExecutor.execute((RedisCallable<Void>) jedis -> { 642 String paramsKey = namespace + join(key, "params"); 643 jedis.expire(getBytes(paramsKey), paramsTimeout); 644 return null; 645 }); 646 } 647 648}