001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat <tdelprat@nuxeo.com> 018 * Antoine Taillefer <ataillefer@nuxeo.com> 019 */ 020 021package org.nuxeo.ecm.core.redis.contribs; 022 023import java.io.ByteArrayInputStream; 024import java.io.ByteArrayOutputStream; 025import java.io.IOException; 026import java.io.InputStream; 027import java.io.ObjectInputStream; 028import java.io.ObjectOutputStream; 029import java.io.Serializable; 030import java.nio.charset.StandardCharsets; 031import java.util.ArrayList; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035import java.util.Set; 036import java.util.function.Function; 037import java.util.regex.Matcher; 038import java.util.regex.Pattern; 039import java.util.stream.Stream; 040 041import org.apache.commons.lang.StringUtils; 042import org.apache.commons.logging.Log; 043import org.apache.commons.logging.LogFactory; 044import org.nuxeo.ecm.core.api.Blob; 045import org.nuxeo.ecm.core.api.NuxeoException; 046import org.nuxeo.ecm.core.redis.RedisAdmin; 047import org.nuxeo.ecm.core.redis.RedisCallable; 048import org.nuxeo.ecm.core.redis.RedisExecutor; 049import org.nuxeo.ecm.core.transientstore.AbstractTransientStore; 050import org.nuxeo.ecm.core.transientstore.api.TransientStore; 051import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig; 052import org.nuxeo.runtime.api.Framework; 053 054/** 055 * Redis implementation (i.e. cluster aware) of the {@link TransientStore}. 056 * <p> 057 * Since hashes cannot be nested, a storage entry is flattened as follows: 058 * 059 * <pre> 060 * - Entry summary: 061 * 062 * transientStore:transientStoreName:entryKey { 063 * "blobCount": number of blobs associated with the entry 064 * "size": storage size of the blobs associated with the entry 065 * "completed": entry status 066 * } 067 * 068 * - Entry parameters: 069 * 070 * transientStore:transientStoreName:entryKey:params { 071 * "param1": value1 072 * "param2": value2 073 * } 074 * 075 * - Entry blobs: 076 * 077 * transientStore:transientStoreName:entryKey:blobs:0 { 078 * "file" 079 * "filename" 080 * "encoding" 081 * "mimetype" 082 * "digest" 083 * } 084 * 085 * transientStore:transientStoreName:entryKey:blobs:1 { 086 * ... 087 * } 088 * 089 * ... 090 * </pre> 091 * 092 * @since 7.2 093 */ 094public class RedisTransientStore extends AbstractTransientStore { 095 096 protected static final String SIZE_KEY = "size"; 097 098 protected RedisExecutor redisExecutor; 099 100 protected String namespace; 101 102 protected String sizeKey; 103 104 protected KeyMatcher keyMatcher; 105 106 protected RedisAdmin redisAdmin; 107 108 protected int firstLevelTTL; 109 110 protected int secondLevelTTL; 111 112 protected Log log = LogFactory.getLog(RedisTransientStore.class); 113 114 public RedisTransientStore() { 115 redisExecutor = Framework.getService(RedisExecutor.class); 116 redisAdmin = Framework.getService(RedisAdmin.class); 117 } 118 119 @Override 120 public void init(TransientStoreConfig config) { 121 log.debug("Initializing RedisTransientStore: " + config.getName()); 122 super.init(config); 123 124 namespace = redisAdmin.namespace("transientStore", config.getName()); 125 sizeKey = namespace + SIZE_KEY; 126 keyMatcher = new KeyMatcher(); 127 128 // Use seconds for Redis EXPIRE command 129 firstLevelTTL = config.getFirstLevelTTL() * 60; 130 secondLevelTTL = config.getSecondLevelTTL() * 60; 131 } 132 133 @Override 134 public void shutdown() { 135 log.debug("Shutting down RedisTransientStore: " + config.getName()); 136 // Nothing to do here. 137 } 138 139 @Override 140 public boolean exists(String key) { 141 // Jedis#exists(String key) doesn't to work for a key created with hset or hmset 142 return getSummary(key) != null || getParameters(key) != null; 143 } 144 145 @Override 146 public Stream<String> keyStream() { 147 return redisExecutor.execute((RedisCallable<Stream<String>>) jedis -> { 148 return jedis.keys(namespace + "*") // 149 .stream() 150 .map(keyMatcher) 151 .filter(key -> !SIZE_KEY.equals(key)); 152 }); 153 } 154 155 protected class KeyMatcher implements Function<String, String> { 156 157 protected final Pattern KEY_PATTERN = Pattern.compile("(.*?)(:(params|blobs:[0-9]+))?"); 158 159 protected final int offset = namespace.length(); 160 161 @Override 162 public String apply(String t) { 163 final Matcher m = KEY_PATTERN.matcher(t.substring(offset)); 164 m.matches(); 165 return m.group(1); 166 } 167 168 } 169 170 @Override 171 public void putParameter(String key, String parameter, Serializable value) { 172 redisExecutor.execute((RedisCallable<Void>) jedis -> { 173 String paramsKey = namespace + join(key, "params"); 174 if (log.isDebugEnabled()) { 175 log.debug(String.format("Setting field %s to value %s in Redis hash stored at key %s", parameter, value, 176 paramsKey)); 177 } 178 jedis.hset(getBytes(paramsKey), getBytes(parameter), serialize(value)); 179 return null; 180 }); 181 setTTL(key, firstLevelTTL); 182 } 183 184 @Override 185 public Serializable getParameter(String key, String parameter) { 186 return redisExecutor.execute((RedisCallable<Serializable>) jedis -> { 187 String paramsKey = namespace + join(key, "params"); 188 byte[] paramBytes = jedis.hget(getBytes(paramsKey), getBytes(parameter)); 189 if (paramBytes == null) { 190 return null; 191 } 192 Serializable res = deserialize(paramBytes); 193 if (log.isDebugEnabled()) { 194 log.debug(String.format("Fetched field %s from Redis hash stored at key %s -> %s", parameter, paramsKey, 195 res)); 196 } 197 return res; 198 }); 199 } 200 201 @Override 202 public void putParameters(String key, Map<String, Serializable> parameters) { 203 redisExecutor.execute((RedisCallable<Void>) jedis -> { 204 String paramsKey = namespace + join(key, "params"); 205 if (log.isDebugEnabled()) { 206 log.debug(String.format("Setting fields %s in Redis hash stored at key %s", parameters, paramsKey)); 207 } 208 jedis.hmset(getBytes(paramsKey), serialize(parameters)); 209 return null; 210 }); 211 setTTL(key, firstLevelTTL); 212 } 213 214 @Override 215 public Map<String, Serializable> getParameters(String key) { 216 // TODO NXP-18236: use a transaction? 217 String paramsKey = namespace + join(key, "params"); 218 Map<byte[], byte[]> paramBytes = redisExecutor.execute((RedisCallable<Map<byte[], byte[]>>) jedis -> { 219 return jedis.hgetAll(getBytes(paramsKey)); 220 }); 221 if (paramBytes.isEmpty()) { 222 if (getSummary(key) == null) { 223 return null; 224 } else { 225 return new HashMap<>(); 226 } 227 } 228 Map<String, Serializable> res = deserialize(paramBytes); 229 if (log.isDebugEnabled()) { 230 log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", paramsKey, res)); 231 } 232 return res; 233 } 234 235 @Override 236 public List<Blob> getBlobs(String key) { 237 // TODO NXP-18236: use a transaction? 238 239 // Get blob count 240 String blobCount = redisExecutor.execute((RedisCallable<String>) jedis -> { 241 return jedis.hget(namespace + key, "blobCount"); 242 }); 243 if (log.isDebugEnabled()) { 244 log.debug(String.format("Fetched field \"blobCount\" from Redis hash stored at key %s -> %s", 245 namespace + key, blobCount)); 246 } 247 if (blobCount == null) { 248 // Check for existing parameters 249 Map<String, Serializable> parameters = getParameters(key); 250 if (parameters == null) { 251 return null; 252 } else { 253 return new ArrayList<Blob>(); 254 } 255 } 256 257 // Get blobs 258 int entryBlobCount = Integer.parseInt(blobCount); 259 if (entryBlobCount <= 0) { 260 return new ArrayList<>(); 261 } 262 List<Map<String, String>> blobInfos = new ArrayList<>(); 263 for (int i = 0; i < entryBlobCount; i++) { 264 String blobInfoIndex = String.valueOf(i); 265 Map<String, String> entryBlobInfo = redisExecutor.execute((RedisCallable<Map<String, String>>) jedis -> { 266 String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex); 267 Map<String, String> blobInfo = jedis.hgetAll(blobInfoKey); 268 if (blobInfo.isEmpty()) { 269 throw new NuxeoException( 270 String.format("Entry with key %s is inconsistent: blobCount = %d but key %s doesn't exist", 271 key, entryBlobCount, blobInfoKey)); 272 } 273 if (log.isDebugEnabled()) { 274 log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", blobInfoKey, 275 blobInfo)); 276 } 277 return blobInfo; 278 }); 279 blobInfos.add(entryBlobInfo); 280 } 281 282 // Load blobs from the file system 283 return loadBlobs(blobInfos); 284 } 285 286 @Override 287 public long getSize(String key) { 288 return redisExecutor.execute((RedisCallable<Long>) jedis -> { 289 String size = jedis.hget(namespace + key, SIZE_KEY); 290 if (size == null) { 291 return -1L; 292 } 293 if (log.isDebugEnabled()) { 294 log.debug(String.format("Fetched field \"%s\" from Redis hash stored at key %s -> %s", SIZE_KEY, 295 namespace + key, size)); 296 } 297 return Long.parseLong(size); 298 }); 299 } 300 301 @Override 302 public boolean isCompleted(String key) { 303 return redisExecutor.execute((RedisCallable<Boolean>) jedis -> { 304 String completed = jedis.hget(namespace + key, "completed"); 305 if (log.isDebugEnabled()) { 306 log.debug(String.format("Fetched field \"completed\" from Redis hash stored at key %s -> %s", 307 namespace + key, completed)); 308 } 309 return Boolean.parseBoolean(completed); 310 }); 311 } 312 313 @Override 314 public void setCompleted(String key, boolean completed) { 315 redisExecutor.execute((RedisCallable<Void>) jedis -> { 316 if (log.isDebugEnabled()) { 317 log.debug(String.format("Setting field \"completed\" to value %s in Redis hash stored at key %s", 318 completed, namespace + key)); 319 } 320 jedis.hset(namespace + key, "completed", String.valueOf(completed)); 321 return null; 322 }); 323 setTTL(key, firstLevelTTL); 324 } 325 326 @Override 327 public void release(String key) { 328 if (getStorageSize() <= config.getTargetMaxSizeMB() * (1024 * 1024) || config.getTargetMaxSizeMB() < 0) { 329 setTTL(key, secondLevelTTL); 330 } else { 331 remove(key); 332 } 333 } 334 335 @Override 336 protected void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos) { 337 // TODO NXP-18236: use a transaction? 338 339 Map<String, String> oldSummary = getSummary(key); 340 341 // Update storage size 342 long entrySize = -1; 343 if (oldSummary != null) { 344 String size = oldSummary.get(SIZE_KEY); 345 if (size != null) { 346 entrySize = Long.parseLong(size); 347 } 348 } 349 if (entrySize > 0) { 350 incrementStorageSize(sizeOfBlobs - entrySize); 351 } else { 352 if (sizeOfBlobs > 0) { 353 incrementStorageSize(sizeOfBlobs); 354 } 355 } 356 357 // Delete old blobs 358 if (oldSummary != null) { 359 String oldBlobCount = oldSummary.get("blobCount"); 360 deleteBlobInfos(key, oldBlobCount); 361 } 362 363 // Update entry size and blob count 364 final Map<String, String> entrySummary = new HashMap<>(); 365 int blobCount = 0; 366 if (blobInfos != null) { 367 blobCount = blobInfos.size(); 368 } 369 entrySummary.put("blobCount", String.valueOf(blobCount)); 370 entrySummary.put(SIZE_KEY, String.valueOf(sizeOfBlobs)); 371 redisExecutor.execute((RedisCallable<Void>) jedis -> { 372 if (log.isDebugEnabled()) { 373 log.debug(String.format("Setting fields %s in Redis hash stored at key %s", entrySummary, 374 namespace + key)); 375 } 376 jedis.hmset(namespace + key, entrySummary); 377 jedis.expire(namespace + key, firstLevelTTL); 378 return null; 379 }); 380 381 // Set new blobs 382 if (blobInfos != null) { 383 int blobsTimeout = firstLevelTTL + 60; 384 for (int i = 0; i < blobInfos.size(); i++) { 385 String blobInfoIndex = String.valueOf(i); 386 Map<String, String> blobInfo = blobInfos.get(i); 387 redisExecutor.execute((RedisCallable<Void>) jedis -> { 388 String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex); 389 if (log.isDebugEnabled()) { 390 log.debug(String.format("Setting fields %s in Redis hash stored at key %s", blobInfo, 391 blobInfoKey)); 392 } 393 jedis.hmset(blobInfoKey, blobInfo); 394 jedis.expire(blobInfoKey, blobsTimeout); 395 return null; 396 }); 397 } 398 } 399 400 // Set params TTL 401 redisExecutor.execute((RedisCallable<Void>) jedis -> { 402 String paramsKey = namespace + join(key, "params"); 403 jedis.expire(getBytes(paramsKey), firstLevelTTL + 60); 404 return null; 405 }); 406 } 407 408 @Override 409 public long getStorageSize() { 410 return redisExecutor.execute((RedisCallable<Long>) jedis -> { 411 String value = jedis.get(sizeKey); 412 if (value == null) { 413 return 0L; 414 } 415 if (log.isDebugEnabled()) { 416 log.debug(String.format("Fetched value of Redis key %s -> %s", sizeKey, value)); 417 } 418 return Long.parseLong(value); 419 }); 420 } 421 422 @Override 423 protected void setStorageSize(final long newSize) { 424 redisExecutor.execute((RedisCallable<Void>) jedis -> { 425 if (log.isDebugEnabled()) { 426 log.debug(String.format("Setting Redis key %s to value %s", sizeKey, newSize)); 427 } 428 jedis.set(sizeKey, "" + newSize); 429 return null; 430 }); 431 } 432 433 @Override 434 protected long incrementStorageSize(final long size) { 435 return redisExecutor.execute((RedisCallable<Long>) jedis -> { 436 Long incremented = jedis.incrBy(sizeKey, size); 437 if (log.isDebugEnabled()) { 438 log.debug(String.format("Incremented Redis key %s to %d", sizeKey, incremented)); 439 } 440 return incremented; 441 }); 442 } 443 444 @Override 445 protected long decrementStorageSize(final long size) { 446 return redisExecutor.execute((RedisCallable<Long>) jedis -> { 447 Long decremented = jedis.decrBy(sizeKey, size); 448 if (log.isDebugEnabled()) { 449 log.debug(String.format("Decremented Redis key %s to %d", sizeKey, decremented)); 450 } 451 return decremented; 452 }); 453 } 454 455 @Override 456 protected void removeEntry(String key) { 457 // TODO NXP-18236: use a transaction? 458 459 Map<String, String> summary = getSummary(key); 460 if (summary != null) { 461 // Remove blobs 462 String blobCount = summary.get("blobCount"); 463 deleteBlobInfos(key, blobCount); 464 465 // Remove summary 466 redisExecutor.execute((RedisCallable<Long>) jedis -> { 467 Long deleted = jedis.del(namespace + key); 468 if (log.isDebugEnabled()) { 469 log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, namespace + key)); 470 } 471 return deleted; 472 }); 473 474 // Decrement storage size 475 String size = summary.get(SIZE_KEY); 476 if (size != null) { 477 long entrySize = Integer.parseInt(size); 478 if (entrySize > 0) { 479 decrementStorageSize(entrySize); 480 } 481 } 482 } 483 484 // Remove parameters 485 redisExecutor.execute((RedisCallable<Long>) jedis -> { 486 String paramsKey = namespace + join(key, "params"); 487 Long deleted = jedis.del(getBytes(paramsKey)); 488 if (log.isDebugEnabled()) { 489 log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, paramsKey)); 490 } 491 return deleted; 492 }); 493 } 494 495 @Override 496 protected void removeAllEntries() { 497 // TODO NXP-18236: use a transaction? 498 Set<String> keys = redisExecutor.execute((RedisCallable<Set<String>>) jedis -> { 499 return jedis.keys(namespace + "*"); 500 }); 501 for (String key : keys) { 502 redisExecutor.execute((RedisCallable<Void>) jedis -> { 503 jedis.del(key); 504 return null; 505 }); 506 } 507 } 508 509 public long getTTL(String key) { 510 long summaryTTL = redisExecutor.execute((RedisCallable<Long>) jedis -> { 511 return jedis.ttl(namespace + key); 512 }); 513 if (summaryTTL >= 0) { 514 return summaryTTL; 515 } else { 516 return redisExecutor.execute((RedisCallable<Long>) jedis -> { 517 String paramsKey = namespace + join(key, "params"); 518 return jedis.ttl(getBytes(paramsKey)); 519 }); 520 } 521 } 522 523 protected Map<String, String> getSummary(String key) { 524 return redisExecutor.execute((RedisCallable<Map<String, String>>) jedis -> { 525 Map<String, String> summary = jedis.hgetAll(namespace + key); 526 if (summary.isEmpty()) { 527 return null; 528 } 529 if (log.isDebugEnabled()) { 530 log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", namespace + key, 531 summary)); 532 } 533 return summary; 534 }); 535 } 536 537 protected void deleteBlobInfos(String key, String blobCountStr) { 538 if (blobCountStr != null) { 539 int blobCount = Integer.parseInt(blobCountStr); 540 if (blobCount > 0) { 541 for (int i = 0; i < blobCount; i++) { 542 String blobInfoIndex = String.valueOf(i); 543 redisExecutor.execute((RedisCallable<Long>) jedis -> { 544 String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex); 545 Long deleted = jedis.del(blobInfoKey); 546 if (log.isDebugEnabled()) { 547 log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, blobInfoKey)); 548 } 549 return deleted; 550 }); 551 } 552 } 553 } 554 } 555 556 protected String join(String... fragments) { 557 return StringUtils.join(fragments, ":"); 558 } 559 560 protected byte[] getBytes(String key) { 561 return key.getBytes(StandardCharsets.UTF_8); 562 } 563 564 protected String getString(byte[] bytes) { 565 return new String(bytes, StandardCharsets.UTF_8); 566 } 567 568 protected byte[] serialize(Serializable value) { 569 try { 570 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 571 ObjectOutputStream out = new ObjectOutputStream(baos); 572 out.writeObject(value); 573 out.flush(); 574 out.close(); 575 return baos.toByteArray(); 576 } catch (IOException e) { 577 throw new NuxeoException(e); 578 } 579 } 580 581 protected Serializable deserialize(byte[] bytes) { 582 try { 583 InputStream bain = new ByteArrayInputStream(bytes); 584 ObjectInputStream in = new ObjectInputStream(bain); 585 return (Serializable) in.readObject(); 586 } catch (IOException | ClassNotFoundException e) { 587 throw new NuxeoException(e); 588 } 589 } 590 591 protected Map<byte[], byte[]> serialize(Map<String, Serializable> map) { 592 Map<byte[], byte[]> serializedMap = new HashMap<>(); 593 for (String key : map.keySet()) { 594 serializedMap.put(getBytes(key), serialize(map.get(key))); 595 } 596 return serializedMap; 597 } 598 599 protected Map<String, Serializable> deserialize(Map<byte[], byte[]> byteMap) { 600 Map<String, Serializable> map = new HashMap<>(); 601 for (byte[] key : byteMap.keySet()) { 602 map.put(getString(key), deserialize(byteMap.get(key))); 603 } 604 return map; 605 } 606 607 protected void setTTL(String key, int seconds) { 608 609 Map<String, String> summary = getSummary(key); 610 if (summary != null) { 611 // Summary 612 redisExecutor.execute((RedisCallable<Void>) jedis -> { 613 jedis.expire(namespace + key, seconds); 614 return null; 615 }); 616 // Blobs 617 String blobCountStr = summary.get("blobCount"); 618 if (blobCountStr != null) { 619 int blobCount = Integer.parseInt(blobCountStr); 620 if (blobCount > 0) { 621 final int blobsTimeout = seconds + 60; 622 for (int i = 0; i < blobCount; i++) { 623 String blobInfoIndex = String.valueOf(i); 624 redisExecutor.execute((RedisCallable<Void>) jedis -> { 625 String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex); 626 jedis.expire(blobInfoKey, blobsTimeout); 627 return null; 628 }); 629 } 630 } 631 } 632 } 633 // Parameters 634 final int paramsTimeout; 635 if (summary == null) { 636 paramsTimeout = seconds; 637 } else { 638 paramsTimeout = seconds + 60; 639 } 640 redisExecutor.execute((RedisCallable<Void>) jedis -> { 641 String paramsKey = namespace + join(key, "params"); 642 jedis.expire(getBytes(paramsKey), paramsTimeout); 643 return null; 644 }); 645 } 646 647}