001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat <tdelprat@nuxeo.com> 018 * Antoine Taillefer <ataillefer@nuxeo.com> 019 */ 020 021package org.nuxeo.ecm.core.redis.contribs; 022 023import java.io.ByteArrayInputStream; 024import java.io.ByteArrayOutputStream; 025import java.io.IOException; 026import java.io.InputStream; 027import java.io.ObjectInputStream; 028import java.io.ObjectOutputStream; 029import java.io.Serializable; 030import java.nio.charset.StandardCharsets; 031import java.util.ArrayList; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035import java.util.Set; 036import java.util.function.Function; 037import java.util.regex.Matcher; 038import java.util.regex.Pattern; 039import java.util.stream.Collectors; 040 041import org.apache.commons.lang.StringUtils; 042import org.apache.commons.logging.Log; 043import org.apache.commons.logging.LogFactory; 044import org.nuxeo.ecm.core.api.Blob; 045import org.nuxeo.ecm.core.api.NuxeoException; 046import org.nuxeo.ecm.core.redis.RedisAdmin; 047import org.nuxeo.ecm.core.redis.RedisCallable; 048import org.nuxeo.ecm.core.redis.RedisExecutor; 049import org.nuxeo.ecm.core.transientstore.AbstractTransientStore; 050import org.nuxeo.ecm.core.transientstore.api.TransientStore; 051import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig; 052import org.nuxeo.runtime.api.Framework; 053 054/** 055 * Redis implementation (i.e. cluster aware) of the {@link TransientStore}. 056 * <p> 057 * Since hashes cannot be nested, a storage entry is flattened as follows: 058 * 059 * <pre> 060 * - Entry summary: 061 * 062 * transientStore:transientStoreName:entryKey { 063 * "blobCount": number of blobs associated with the entry 064 * "size": storage size of the blobs associated with the entry 065 * "completed": entry status 066 * } 067 * 068 * - Entry parameters: 069 * 070 * transientStore:transientStoreName:entryKey:params { 071 * "param1": value1 072 * "param2": value2 073 * } 074 * 075 * - Entry blobs: 076 * 077 * transientStore:transientStoreName:entryKey:blobs:0 { 078 * "file" 079 * "filename" 080 * "encoding" 081 * "mimetype" 082 * "digest" 083 * } 084 * 085 * transientStore:transientStoreName:entryKey:blobs:1 { 086 * ... 087 * } 088 * 089 * ... 090 * </pre> 091 * 092 * @since 7.2 093 */ 094public class RedisTransientStore extends AbstractTransientStore { 095 096 protected static final String SIZE_KEY = "size"; 097 098 protected RedisExecutor redisExecutor; 099 100 protected String namespace; 101 102 protected String sizeKey; 103 104 protected KeyMatcher keyMatcher; 105 106 protected RedisAdmin redisAdmin; 107 108 protected int firstLevelTTL; 109 110 protected int secondLevelTTL; 111 112 protected Log log = LogFactory.getLog(RedisTransientStore.class); 113 114 public RedisTransientStore() { 115 redisExecutor = Framework.getService(RedisExecutor.class); 116 redisAdmin = Framework.getService(RedisAdmin.class); 117 } 118 119 @Override 120 public void init(TransientStoreConfig config) { 121 log.debug("Initializing RedisTransientStore: " + config.getName()); 122 super.init(config); 123 124 namespace = redisAdmin.namespace("transientStore", config.getName()); 125 sizeKey = namespace + SIZE_KEY; 126 keyMatcher = new KeyMatcher(); 127 128 // Use seconds for Redis EXPIRE command 129 firstLevelTTL = config.getFirstLevelTTL() * 60; 130 secondLevelTTL = config.getSecondLevelTTL() * 60; 131 } 132 133 @Override 134 public void shutdown() { 135 log.debug("Shutting down RedisTransientStore: " + config.getName()); 136 // Nothing to do here. 137 } 138 139 @Override 140 public boolean exists(String key) { 141 // Jedis#exists(String key) doesn't to work for a key created with hset or hmset 142 return getSummary(key) != null || getParameters(key) != null; 143 } 144 145 @Override 146 public Set<String> keySet() { 147 return redisExecutor.execute((RedisCallable<Set<String>>) jedis -> { 148 return jedis.keys(namespace + "*") 149 .stream() 150 .map(keyMatcher) 151 .filter(key -> !SIZE_KEY.equals(key)) 152 .collect(Collectors.toSet()); 153 }); 154 } 155 156 protected class KeyMatcher implements Function<String, String> { 157 158 protected final Pattern KEY_PATTERN = Pattern.compile("(.*?)(:(params|blobs:[0-9]+))?"); 159 160 protected final int offset = namespace.length(); 161 162 @Override 163 public String apply(String t) { 164 final Matcher m = KEY_PATTERN.matcher(t.substring(offset)); 165 m.matches(); 166 return m.group(1); 167 } 168 169 } 170 171 @Override 172 public void putParameter(String key, String parameter, Serializable value) { 173 redisExecutor.execute((RedisCallable<Void>) jedis -> { 174 String paramsKey = namespace + join(key, "params"); 175 if (log.isDebugEnabled()) { 176 log.debug(String.format("Setting field %s to value %s in Redis hash stored at key %s", parameter, 177 value, paramsKey)); 178 } 179 jedis.hset(getBytes(paramsKey), getBytes(parameter), serialize(value)); 180 return null; 181 }); 182 setTTL(key, firstLevelTTL); 183 } 184 185 @Override 186 public Serializable getParameter(String key, String parameter) { 187 return redisExecutor.execute((RedisCallable<Serializable>) jedis -> { 188 String paramsKey = namespace + join(key, "params"); 189 byte[] paramBytes = jedis.hget(getBytes(paramsKey), getBytes(parameter)); 190 if (paramBytes == null) { 191 return null; 192 } 193 Serializable res = deserialize(paramBytes); 194 if (log.isDebugEnabled()) { 195 log.debug(String.format("Fetched field %s from Redis hash stored at key %s -> %s", parameter, 196 paramsKey, res)); 197 } 198 return res; 199 }); 200 } 201 202 @Override 203 public void putParameters(String key, Map<String, Serializable> parameters) { 204 redisExecutor.execute((RedisCallable<Void>) jedis -> { 205 String paramsKey = namespace + join(key, "params"); 206 if (log.isDebugEnabled()) { 207 log.debug(String.format("Setting fields %s in Redis hash stored at key %s", parameters, paramsKey)); 208 } 209 jedis.hmset(getBytes(paramsKey), serialize(parameters)); 210 return null; 211 }); 212 setTTL(key, firstLevelTTL); 213 } 214 215 @Override 216 public Map<String, Serializable> getParameters(String key) { 217 // TODO NXP-18236: use a transaction? 218 String paramsKey = namespace + join(key, "params"); 219 Map<byte[], byte[]> paramBytes = redisExecutor.execute((RedisCallable<Map<byte[], byte[]>>) jedis -> { 220 return jedis.hgetAll(getBytes(paramsKey)); 221 }); 222 if (paramBytes.isEmpty()) { 223 if (getSummary(key) == null) { 224 return null; 225 } else { 226 return new HashMap<>(); 227 } 228 } 229 Map<String, Serializable> res = deserialize(paramBytes); 230 if (log.isDebugEnabled()) { 231 log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", paramsKey, res)); 232 } 233 return res; 234 } 235 236 @Override 237 public List<Blob> getBlobs(String key) { 238 // TODO NXP-18236: use a transaction? 239 240 // Get blob count 241 String blobCount = redisExecutor.execute((RedisCallable<String>) jedis -> { 242 return jedis.hget(namespace + key, "blobCount"); 243 }); 244 if (log.isDebugEnabled()) { 245 log.debug(String.format("Fetched field \"blobCount\" from Redis hash stored at key %s -> %s", namespace 246 + key, blobCount)); 247 } 248 if (blobCount == null) { 249 // Check for existing parameters 250 Map<String, Serializable> parameters = getParameters(key); 251 if (parameters == null) { 252 return null; 253 } else { 254 return new ArrayList<Blob>(); 255 } 256 } 257 258 // Get blobs 259 int entryBlobCount = Integer.parseInt(blobCount); 260 if (entryBlobCount <= 0) { 261 return new ArrayList<>(); 262 } 263 List<Map<String, String>> blobInfos = new ArrayList<>(); 264 for (int i = 0; i < entryBlobCount; i++) { 265 String blobInfoIndex = String.valueOf(i); 266 Map<String, String> entryBlobInfo = redisExecutor.execute((RedisCallable<Map<String, String>>) jedis -> { 267 String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex); 268 Map<String, String> blobInfo = jedis.hgetAll(blobInfoKey); 269 if (blobInfo.isEmpty()) { 270 throw new NuxeoException(String.format( 271 "Entry with key %s is inconsistent: blobCount = %d but key %s doesn't exist", key, 272 entryBlobCount, blobInfoKey)); 273 } 274 if (log.isDebugEnabled()) { 275 log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", blobInfoKey, 276 blobInfo)); 277 } 278 return blobInfo; 279 }); 280 blobInfos.add(entryBlobInfo); 281 } 282 283 // Load blobs from the file system 284 return loadBlobs(blobInfos); 285 } 286 287 @Override 288 public long getSize(String key) { 289 return redisExecutor.execute((RedisCallable<Long>) jedis -> { 290 String size = jedis.hget(namespace + key, SIZE_KEY); 291 if (size == null) { 292 return -1L; 293 } 294 if (log.isDebugEnabled()) { 295 log.debug(String.format("Fetched field \"%s\" from Redis hash stored at key %s -> %s", SIZE_KEY, 296 namespace + key, size)); 297 } 298 return Long.parseLong(size); 299 }); 300 } 301 302 @Override 303 public boolean isCompleted(String key) { 304 return redisExecutor.execute((RedisCallable<Boolean>) jedis -> { 305 String completed = jedis.hget(namespace + key, "completed"); 306 if (log.isDebugEnabled()) { 307 log.debug(String.format("Fetched field \"completed\" from Redis hash stored at key %s -> %s", namespace 308 + key, completed)); 309 } 310 return Boolean.parseBoolean(completed); 311 }); 312 } 313 314 @Override 315 public void setCompleted(String key, boolean completed) { 316 redisExecutor.execute((RedisCallable<Void>) jedis -> { 317 if (log.isDebugEnabled()) { 318 log.debug(String.format("Setting field \"completed\" to value %s in Redis hash stored at key %s", 319 completed, namespace + key)); 320 } 321 jedis.hset(namespace + key, "completed", String.valueOf(completed)); 322 return null; 323 }); 324 setTTL(key, firstLevelTTL); 325 } 326 327 @Override 328 public void remove(String key) { 329 // TODO NXP-18236: use a transaction? 330 331 Map<String, String> summary = getSummary(key); 332 if (summary != null) { 333 // Remove blobs 334 String blobCount = summary.get("blobCount"); 335 deleteBlobInfos(key, blobCount); 336 337 // Remove summary 338 redisExecutor.execute((RedisCallable<Long>) jedis -> { 339 Long deleted = jedis.del(namespace + key); 340 if (log.isDebugEnabled()) { 341 log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, namespace + key)); 342 } 343 return deleted; 344 }); 345 346 // Decrement storage size 347 String size = summary.get(SIZE_KEY); 348 if (size != null) { 349 long entrySize = Integer.parseInt(size); 350 if (entrySize > 0) { 351 decrementStorageSize(entrySize); 352 } 353 } 354 } 355 356 // Remove parameters 357 redisExecutor.execute((RedisCallable<Long>) jedis -> { 358 String paramsKey = namespace + join(key, "params"); 359 Long deleted = jedis.del(getBytes(paramsKey)); 360 if (log.isDebugEnabled()) { 361 log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, paramsKey)); 362 } 363 return deleted; 364 }); 365 } 366 367 @Override 368 public void release(String key) { 369 if (getStorageSize() <= config.getTargetMaxSizeMB() * (1024 * 1024) || config.getTargetMaxSizeMB() < 0) { 370 setTTL(key, secondLevelTTL); 371 } else { 372 remove(key); 373 } 374 } 375 376 @Override 377 protected void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos) { 378 // TODO NXP-18236: use a transaction? 379 380 Map<String, String> oldSummary = getSummary(key); 381 382 // Update storage size 383 long entrySize = -1; 384 if (oldSummary != null) { 385 String size = oldSummary.get(SIZE_KEY); 386 if (size != null) { 387 entrySize = Long.parseLong(size); 388 } 389 } 390 if (entrySize > 0) { 391 incrementStorageSize(sizeOfBlobs - entrySize); 392 } else { 393 if (sizeOfBlobs > 0) { 394 incrementStorageSize(sizeOfBlobs); 395 } 396 } 397 398 // Delete old blobs 399 if (oldSummary != null) { 400 String oldBlobCount = oldSummary.get("blobCount"); 401 deleteBlobInfos(key, oldBlobCount); 402 } 403 404 // Update entry size and blob count 405 final Map<String, String> entrySummary = new HashMap<>(); 406 int blobCount = 0; 407 if (blobInfos != null) { 408 blobCount = blobInfos.size(); 409 } 410 entrySummary.put("blobCount", String.valueOf(blobCount)); 411 entrySummary.put(SIZE_KEY, String.valueOf(sizeOfBlobs)); 412 redisExecutor.execute((RedisCallable<Void>) jedis -> { 413 if (log.isDebugEnabled()) { 414 log.debug(String.format("Setting fields %s in Redis hash stored at key %s", entrySummary, namespace 415 + key)); 416 } 417 jedis.hmset(namespace + key, entrySummary); 418 jedis.expire(namespace + key, firstLevelTTL); 419 return null; 420 }); 421 422 // Set new blobs 423 if (blobInfos != null) { 424 int blobsTimeout = firstLevelTTL + 60; 425 for (int i = 0; i < blobInfos.size(); i++) { 426 String blobInfoIndex = String.valueOf(i); 427 Map<String, String> blobInfo = blobInfos.get(i); 428 redisExecutor.execute((RedisCallable<Void>) jedis -> { 429 String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex); 430 if (log.isDebugEnabled()) { 431 log.debug(String.format("Setting fields %s in Redis hash stored at key %s", blobInfo, 432 blobInfoKey)); 433 } 434 jedis.hmset(blobInfoKey, blobInfo); 435 jedis.expire(blobInfoKey, blobsTimeout); 436 return null; 437 }); 438 } 439 } 440 441 // Set params TTL 442 redisExecutor.execute((RedisCallable<Void>) jedis -> { 443 String paramsKey = namespace + join(key, "params"); 444 jedis.expire(getBytes(paramsKey), firstLevelTTL + 60); 445 return null; 446 }); 447 } 448 449 @Override 450 public long getStorageSize() { 451 return redisExecutor.execute((RedisCallable<Long>) jedis -> { 452 String value = jedis.get(sizeKey); 453 if (value == null) { 454 return 0L; 455 } 456 if (log.isDebugEnabled()) { 457 log.debug(String.format("Fetched value of Redis key %s -> %s", sizeKey, value)); 458 } 459 return Long.parseLong(value); 460 }); 461 } 462 463 @Override 464 protected void setStorageSize(final long newSize) { 465 redisExecutor.execute((RedisCallable<Void>) jedis -> { 466 if (log.isDebugEnabled()) { 467 log.debug(String.format("Setting Redis key %s to value %s", sizeKey, newSize)); 468 } 469 jedis.set(sizeKey, "" + newSize); 470 return null; 471 }); 472 } 473 474 @Override 475 protected long incrementStorageSize(final long size) { 476 return redisExecutor.execute((RedisCallable<Long>) jedis -> { 477 Long incremented = jedis.incrBy(sizeKey, size); 478 if (log.isDebugEnabled()) { 479 log.debug(String.format("Incremented Redis key %s to %d", sizeKey, incremented)); 480 } 481 return incremented; 482 }); 483 } 484 485 @Override 486 protected long decrementStorageSize(final long size) { 487 return redisExecutor.execute((RedisCallable<Long>) jedis -> { 488 Long decremented = jedis.decrBy(sizeKey, size); 489 if (log.isDebugEnabled()) { 490 log.debug(String.format("Decremented Redis key %s to %d", sizeKey, decremented)); 491 } 492 return decremented; 493 }); 494 } 495 496 @Override 497 protected void removeAllEntries() { 498 // TODO NXP-18236: use a transaction? 499 Set<String> keys = redisExecutor.execute((RedisCallable<Set<String>>) jedis -> { 500 return jedis.keys(namespace + "*"); 501 }); 502 for (String key : keys) { 503 redisExecutor.execute((RedisCallable<Void>) jedis -> { 504 jedis.del(key); 505 return null; 506 }); 507 } 508 } 509 510 public long getTTL(String key) { 511 long summaryTTL = redisExecutor.execute((RedisCallable<Long>) jedis -> { 512 return jedis.ttl(namespace + key); 513 }); 514 if (summaryTTL >= 0) { 515 return summaryTTL; 516 } else { 517 return redisExecutor.execute((RedisCallable<Long>) jedis -> { 518 String paramsKey = namespace + join(key, "params"); 519 return jedis.ttl(getBytes(paramsKey)); 520 }); 521 } 522 } 523 524 protected Map<String, String> getSummary(String key) { 525 return redisExecutor.execute((RedisCallable<Map<String, String>>) jedis -> { 526 Map<String, String> summary = jedis.hgetAll(namespace + key); 527 if (summary.isEmpty()) { 528 return null; 529 } 530 if (log.isDebugEnabled()) { 531 log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", namespace + key, 532 summary)); 533 } 534 return summary; 535 }); 536 } 537 538 protected void deleteBlobInfos(String key, String blobCountStr) { 539 if (blobCountStr != null) { 540 int blobCount = Integer.parseInt(blobCountStr); 541 if (blobCount > 0) { 542 for (int i = 0; i < blobCount; i++) { 543 String blobInfoIndex = String.valueOf(i); 544 redisExecutor.execute((RedisCallable<Long>) jedis -> { 545 String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex); 546 Long deleted = jedis.del(blobInfoKey); 547 if (log.isDebugEnabled()) { 548 log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, blobInfoKey)); 549 } 550 return deleted; 551 }); 552 } 553 } 554 } 555 } 556 557 protected String join(String... fragments) { 558 return StringUtils.join(fragments, ":"); 559 } 560 561 protected byte[] getBytes(String key) { 562 return key.getBytes(StandardCharsets.UTF_8); 563 } 564 565 protected String getString(byte[] bytes) { 566 return new String(bytes, StandardCharsets.UTF_8); 567 } 568 569 protected byte[] serialize(Serializable value) { 570 try { 571 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 572 ObjectOutputStream out = new ObjectOutputStream(baos); 573 out.writeObject(value); 574 out.flush(); 575 out.close(); 576 return baos.toByteArray(); 577 } catch (IOException e) { 578 throw new NuxeoException(e); 579 } 580 } 581 582 protected Serializable deserialize(byte[] bytes) { 583 try { 584 InputStream bain = new ByteArrayInputStream(bytes); 585 ObjectInputStream in = new ObjectInputStream(bain); 586 return (Serializable) in.readObject(); 587 } catch (IOException | ClassNotFoundException e) { 588 throw new NuxeoException(e); 589 } 590 } 591 592 protected Map<byte[], byte[]> serialize(Map<String, Serializable> map) { 593 Map<byte[], byte[]> serializedMap = new HashMap<>(); 594 for (String key : map.keySet()) { 595 serializedMap.put(getBytes(key), serialize(map.get(key))); 596 } 597 return serializedMap; 598 } 599 600 protected Map<String, Serializable> deserialize(Map<byte[], byte[]> byteMap) { 601 Map<String, Serializable> map = new HashMap<>(); 602 for (byte[] key : byteMap.keySet()) { 603 map.put(getString(key), deserialize(byteMap.get(key))); 604 } 605 return map; 606 } 607 608 protected void setTTL(String key, int seconds) { 609 610 Map<String, String> summary = getSummary(key); 611 if (summary != null) { 612 // Summary 613 redisExecutor.execute((RedisCallable<Void>) jedis -> { 614 jedis.expire(namespace + key, seconds); 615 return null; 616 }); 617 // Blobs 618 String blobCountStr = summary.get("blobCount"); 619 if (blobCountStr != null) { 620 int blobCount = Integer.parseInt(blobCountStr); 621 if (blobCount > 0) { 622 final int blobsTimeout = seconds + 60; 623 for (int i = 0; i < blobCount; i++) { 624 String blobInfoIndex = String.valueOf(i); 625 redisExecutor.execute((RedisCallable<Void>) jedis -> { 626 String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex); 627 jedis.expire(blobInfoKey, blobsTimeout); 628 return null; 629 }); 630 } 631 } 632 } 633 } 634 // Parameters 635 final int paramsTimeout; 636 if (summary == null) { 637 paramsTimeout = seconds; 638 } else { 639 paramsTimeout = seconds + 60; 640 } 641 redisExecutor.execute((RedisCallable<Void>) jedis -> { 642 String paramsKey = namespace + join(key, "params"); 643 jedis.expire(getBytes(paramsKey), paramsTimeout); 644 return null; 645 }); 646 } 647 648}