/*
 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the GNU Lesser General Public License
 * (LGPL) version 2.1 which accompanies this distribution, and is available at
 * http://www.gnu.org/licenses/lgpl-2.1.html
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * Contributors:
 *     Thierry Delprat <tdelprat@nuxeo.com>
 *     Antoine Taillefer <ataillefer@nuxeo.com>
 */

package org.nuxeo.ecm.core.redis.contribs;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.redis.RedisAdmin;
import org.nuxeo.ecm.core.redis.RedisCallable;
import org.nuxeo.ecm.core.redis.RedisExecutor;
import org.nuxeo.ecm.core.transientstore.AbstractTransientStore;
import org.nuxeo.ecm.core.transientstore.api.TransientStore;
import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig;
import org.nuxeo.runtime.api.Framework;

/**
 * Redis implementation (i.e. cluster aware) of the {@link TransientStore}.
 * <p>
 * Since hashes cannot be nested, a storage entry is flattened as follows:
 *
 * <pre>
 *  - Entry summary:
 *
 *    transientStore:transientStoreName:entryKey {
 *      "blobCount":    number of blobs associated with the entry
 *      "size":         storage size of the blobs associated with the entry
 *      "completed":    entry status
 *    }
 *
 * - Entry parameters:
 *
 *   transientStore:transientStoreName:entryKey:params {
 *      "param1": value1
 *      "param2": value2
 *   }
 *
 * - Entry blobs:
 *
 *   transientStore:transientStoreName:entryKey:blobs:0 {
 *      "file"
 *      "filename"
 *      "encoding"
 *      "mimetype"
 *      "digest"
 *   }
 *
 *   transientStore:transientStoreName:entryKey:blobs:1 {
 *      ...
 *   }
 *
 *   ...
 * </pre>
 *
 * @since 7.2
 */
public class RedisTransientStore extends AbstractTransientStore {

    /** Executor through which every Redis command of this store is run. */
    protected RedisExecutor redisExecutor;

    /** Key prefix of this store, obtained from {@link RedisAdmin#namespace}. */
    protected String namespace;

    /** Redis key holding the global storage size of this store. */
    protected String sizeKey;

    protected RedisAdmin redisAdmin;

    /** First level TTL, in seconds. */
    protected int firstLevelTTL;

    /** Second level TTL, in seconds. */
    protected int secondLevelTTL;

    protected Log log = LogFactory.getLog(RedisTransientStore.class);

    /**
     * Looks up the {@link RedisExecutor} and {@link RedisAdmin} services from the {@link Framework}.
     */
    public RedisTransientStore() {
        redisExecutor = Framework.getService(RedisExecutor.class);
        redisAdmin = Framework.getService(RedisAdmin.class);
    }

    /**
     * Initializes the store: computes the Redis namespace and size key from the store name and converts the
     * configured TTLs to seconds.
     *
     * @param config the store configuration
     */
    @Override
    public void init(TransientStoreConfig config) {
        log.debug("Initializing RedisTransientStore: " + config.getName());
        super.init(config);

        namespace = redisAdmin.namespace("transientStore", config.getName());
        sizeKey = namespace + "size";

        // Use seconds for Redis EXPIRE command
        // (the configured values are multiplied by 60, i.e. they are presumably expressed in minutes)
        firstLevelTTL = config.getFirstLevelTTL() * 60;
        secondLevelTTL = config.getSecondLevelTTL() * 60;
    }

    /**
     * Only logs the shutdown; nothing is released here.
     */
    @Override
    public void shutdown() {
        log.debug("Shutting down RedisTransientStore: " + config.getName());
        // Nothing to do here.
    }

    /**
     * Tells whether an entry exists, i.e. whether it has a summary hash or a parameters hash.
     *
     * @param key the entry key
     * @return {@code true} if a summary or parameters are stored for the key
     */
    @Override
    public boolean exists(String key) {
        // Jedis#exists(String key) doesn't seem to work for a key created with hset or hmset
        return getSummary(key) != null || getParameters(key) != null;
    }

    /**
     * Stores a serialized parameter in the parameters hash of the entry, then resets the entry TTL to the first
     * level TTL.
     *
     * @param key the entry key
     * @param parameter the parameter name, used as hash field
     * @param value the parameter value, stored with Java serialization
     */
    @Override
    public void putParameter(String key, String parameter, Serializable value) {
        redisExecutor.execute((RedisCallable<Void>) jedis -> {
            String paramsKey = namespace + join(key, "params");
            if (log.isDebugEnabled()) {
                log.debug(String.format("Setting field %s to value %s in Redis hash stored at key %s", parameter,
                        value, paramsKey));
            }
            jedis.hset(getBytes(paramsKey), getBytes(parameter), serialize(value));
            return null;
        });
        setTTL(key, firstLevelTTL);
    }

    /**
     * Reads and deserializes a single parameter of the entry.
     *
     * @param key the entry key
     * @param parameter the parameter name
     * @return the parameter value, or {@code null} if the field is not set
     */
    @Override
    public Serializable getParameter(String key, String parameter) {
        return redisExecutor.execute((RedisCallable<Serializable>) jedis -> {
            String paramsKey = namespace + join(key, "params");
            byte[] paramBytes = jedis.hget(getBytes(paramsKey), getBytes(parameter));
            if (paramBytes == null) {
                return null;
            }
            Serializable res = deserialize(paramBytes);
            if (log.isDebugEnabled()) {
                log.debug(String.format("Fetched field %s from Redis hash stored at key %s -> %s", parameter,
                        paramsKey, res));
            }
            return res;
        });
    }

    /**
     * Stores all the given parameters in the parameters hash of the entry, then resets the entry TTL to the first
     * level TTL.
     *
     * @param key the entry key
     * @param parameters the parameters to store, each value being stored with Java serialization
     */
    @Override
    public void putParameters(String key, Map<String, Serializable> parameters) {
        redisExecutor.execute((RedisCallable<Void>) jedis -> {
            String paramsKey = namespace + join(key, "params");
            if (log.isDebugEnabled()) {
                log.debug(String.format("Setting fields %s in Redis hash stored at key %s", parameters, paramsKey));
            }
            jedis.hmset(getBytes(paramsKey), serialize(parameters));
            return null;
        });
        setTTL(key, firstLevelTTL);
    }

    /**
     * Reads and deserializes all the parameters of the entry.
     *
     * @param key the entry key
     * @return the parameters; an empty map if the entry has a summary but no parameters; {@code null} if the entry
     *         has neither
     */
    @Override
    public Map<String, Serializable> getParameters(String key) {
        // TODO NXP-18236: use a transaction?
        String paramsKey = namespace + join(key, "params");
        Map<byte[], byte[]> paramBytes = redisExecutor.execute((RedisCallable<Map<byte[], byte[]>>) jedis -> {
            return jedis.hgetAll(getBytes(paramsKey));
        });
        if (paramBytes.isEmpty()) {
            if (getSummary(key) == null) {
                return null;
            } else {
                return new HashMap<>();
            }
        }
        Map<String, Serializable> res = deserialize(paramBytes);
        if (log.isDebugEnabled()) {
            log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", paramsKey, res));
        }
        return res;
    }

    /**
     * Loads the blobs of the entry from the blob information hashes stored in Redis.
     *
     * @param key the entry key
     * @return the blobs; an empty list if the entry exists without blobs; {@code null} if the entry has neither a
     *         blob count nor parameters
     * @throws NuxeoException if the blob count of the summary references a blob information hash that doesn't exist
     */
    @Override
    public List<Blob> getBlobs(String key) {
        // TODO NXP-18236: use a transaction?

        // Get blob count
        String blobCount = redisExecutor.execute((RedisCallable<String>) jedis -> {
            return jedis.hget(namespace + key, "blobCount");
        });
        if (log.isDebugEnabled()) {
            log.debug(String.format("Fetched field \"blobCount\" from Redis hash stored at key %s -> %s", namespace
                    + key, blobCount));
        }
        if (blobCount == null) {
            // Check for existing parameters
            Map<String, Serializable> parameters = getParameters(key);
            if (parameters == null) {
                return null;
            } else {
                return new ArrayList<>();
            }
        }

        // Get blobs
        int entryBlobCount = Integer.parseInt(blobCount);
        if (entryBlobCount <= 0) {
            return new ArrayList<>();
        }
        List<Map<String, String>> blobInfos = new ArrayList<>();
        for (int i = 0; i < entryBlobCount; i++) {
            String blobInfoIndex = String.valueOf(i);
            Map<String, String> entryBlobInfo = redisExecutor.execute((RedisCallable<Map<String, String>>) jedis -> {
                String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex);
                Map<String, String> blobInfo = jedis.hgetAll(blobInfoKey);
                if (blobInfo.isEmpty()) {
                    throw new NuxeoException(String.format(
                            "Entry with key %s is inconsistent: blobCount = %d but key %s doesn't exist", key,
                            entryBlobCount, blobInfoKey));
                }
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", blobInfoKey,
                            blobInfo));
                }
                return blobInfo;
            });
            blobInfos.add(entryBlobInfo);
        }

        // Load blobs from the file system
        return loadBlobs(blobInfos);
    }

    /**
     * Reads the {@code size} field of the entry summary.
     *
     * @param key the entry key
     * @return the stored size, or {@code -1} if the field is not set
     */
    @Override
    public long getSize(String key) {
        return redisExecutor.execute((RedisCallable<Long>) jedis -> {
            String size = jedis.hget(namespace + key, "size");
            if (size == null) {
                return -1L;
            }
            if (log.isDebugEnabled()) {
                log.debug(String.format("Fetched field \"size\" from Redis hash stored at key %s -> %s", namespace
                        + key, size));
            }
            return Long.parseLong(size);
        });
    }

    /**
     * Reads the {@code completed} field of the entry summary.
     *
     * @param key the entry key
     * @return {@code true} only if the field is set to {@code "true"} (case insensitive)
     */
    @Override
    public boolean isCompleted(String key) {
        return redisExecutor.execute((RedisCallable<Boolean>) jedis -> {
            String completed = jedis.hget(namespace + key, "completed");
            if (log.isDebugEnabled()) {
                log.debug(String.format("Fetched field \"completed\" from Redis hash stored at key %s -> %s", namespace
                        + key, completed));
            }
            return Boolean.parseBoolean(completed);
        });
    }

    /**
     * Writes the {@code completed} field of the entry summary, then resets the entry TTL to the first level TTL.
     *
     * @param key the entry key
     * @param completed the new status
     */
    @Override
    public void setCompleted(String key, boolean completed) {
        redisExecutor.execute((RedisCallable<Void>) jedis -> {
            if (log.isDebugEnabled()) {
                log.debug(String.format("Setting field \"completed\" to value %s in Redis hash stored at key %s",
                        completed, namespace + key));
            }
            jedis.hset(namespace + key, "completed", String.valueOf(completed));
            return null;
        });
        setTTL(key, firstLevelTTL);
    }

    /**
     * Removes the blob information hashes, the summary and the parameters of the entry, and decrements the global
     * storage size by the entry size.
     *
     * @param key the entry key
     */
    @Override
    public void remove(String key) {
        // TODO NXP-18236: use a transaction?

        Map<String, String> summary = getSummary(key);
        if (summary != null) {
            // Remove blobs
            String blobCount = summary.get("blobCount");
            deleteBlobInfos(key, blobCount);

            // Remove summary
            redisExecutor.execute((RedisCallable<Long>) jedis -> {
                Long deleted = jedis.del(namespace + key);
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, namespace + key));
                }
                return deleted;
            });

            // Decrement storage size
            String size = summary.get("size");
            if (size != null) {
                // The size is written from a long (see persistBlobs), so it must be parsed as a long:
                // Integer.parseInt would fail on entries larger than Integer.MAX_VALUE bytes
                long entrySize = Long.parseLong(size);
                if (entrySize > 0) {
                    decrementStorageSize(entrySize);
                }
            }
        }

        // Remove parameters
        redisExecutor.execute((RedisCallable<Long>) jedis -> {
            String paramsKey = namespace + join(key, "params");
            Long deleted = jedis.del(getBytes(paramsKey));
            if (log.isDebugEnabled()) {
                log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, paramsKey));
            }
            return deleted;
        });
    }

    /**
     * Releases the entry: it is kept with the second level TTL if the target max size is negative or not exceeded
     * by the current storage size, otherwise it is removed.
     *
     * @param key the entry key
     */
    @Override
    public void release(String key) {
        long targetMaxSizeMB = config.getTargetMaxSizeMB();
        // Compute the byte threshold with long arithmetic to avoid an int overflow for targets >= 2048 MB
        if (targetMaxSizeMB < 0 || getStorageSize() <= targetMaxSizeMB * 1024L * 1024L) {
            setTTL(key, secondLevelTTL);
        } else {
            remove(key);
        }
    }

    /**
     * Replaces the blobs of the entry: updates the global storage size, deletes the previous blob information
     * hashes, writes the new summary fields and blob information hashes and sets their TTL.
     *
     * @param key the entry key
     * @param sizeOfBlobs the total size of the new blobs
     * @param blobInfos the blob information maps to store, one hash per blob; may be {@code null}
     */
    @Override
    protected void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos) {
        // TODO NXP-18236: use a transaction?

        Map<String, String> oldSummary = getSummary(key);

        // Update storage size
        long entrySize = -1;
        if (oldSummary != null) {
            String size = oldSummary.get("size");
            if (size != null) {
                entrySize = Long.parseLong(size);
            }
        }
        if (entrySize > 0) {
            incrementStorageSize(sizeOfBlobs - entrySize);
        } else {
            if (sizeOfBlobs > 0) {
                incrementStorageSize(sizeOfBlobs);
            }
        }

        // Delete old blobs
        if (oldSummary != null) {
            String oldBlobCount = oldSummary.get("blobCount");
            deleteBlobInfos(key, oldBlobCount);
        }

        // Update entry size and blob count
        final Map<String, String> entrySummary = new HashMap<>();
        int blobCount = 0;
        if (blobInfos != null) {
            blobCount = blobInfos.size();
        }
        entrySummary.put("blobCount", String.valueOf(blobCount));
        entrySummary.put("size", String.valueOf(sizeOfBlobs));
        redisExecutor.execute((RedisCallable<Void>) jedis -> {
            if (log.isDebugEnabled()) {
                log.debug(String.format("Setting fields %s in Redis hash stored at key %s", entrySummary, namespace
                        + key));
            }
            jedis.hmset(namespace + key, entrySummary);
            jedis.expire(namespace + key, firstLevelTTL);
            return null;
        });

        // Set new blobs
        if (blobInfos != null) {
            // Blob information expires 60 seconds after the summary
            int blobsTimeout = firstLevelTTL + 60;
            for (int i = 0; i < blobInfos.size(); i++) {
                String blobInfoIndex = String.valueOf(i);
                Map<String, String> blobInfo = blobInfos.get(i);
                redisExecutor.execute((RedisCallable<Void>) jedis -> {
                    String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex);
                    if (log.isDebugEnabled()) {
                        log.debug(String.format("Setting fields %s in Redis hash stored at key %s", blobInfo,
                                blobInfoKey));
                    }
                    jedis.hmset(blobInfoKey, blobInfo);
                    jedis.expire(blobInfoKey, blobsTimeout);
                    return null;
                });
            }
        }

        // Set params TTL
        redisExecutor.execute((RedisCallable<Void>) jedis -> {
            String paramsKey = namespace + join(key, "params");
            jedis.expire(getBytes(paramsKey), firstLevelTTL + 60);
            return null;
        });
    }

    /**
     * Reads the global storage size of this store.
     *
     * @return the value stored at the size key, or {@code 0} if the key is not set
     */
    @Override
    public long getStorageSize() {
        return redisExecutor.execute((RedisCallable<Long>) jedis -> {
            String value = jedis.get(sizeKey);
            if (value == null) {
                return 0L;
            }
            if (log.isDebugEnabled()) {
                log.debug(String.format("Fetched value of Redis key %s -> %s", sizeKey, value));
            }
            return Long.parseLong(value);
        });
    }

    /**
     * Overwrites the global storage size of this store.
     *
     * @param newSize the new size
     */
    @Override
    protected void setStorageSize(final long newSize) {
        redisExecutor.execute((RedisCallable<Void>) jedis -> {
            if (log.isDebugEnabled()) {
                log.debug(String.format("Setting Redis key %s to value %s", sizeKey, newSize));
            }
            jedis.set(sizeKey, "" + newSize);
            return null;
        });
    }

    /**
     * Increments the global storage size with the Redis {@code INCRBY} command.
     *
     * @param size the increment
     * @return the value returned by Redis, i.e. the size after the increment
     */
    @Override
    protected long incrementStorageSize(final long size) {
        return redisExecutor.execute((RedisCallable<Long>) jedis -> {
            Long incremented = jedis.incrBy(sizeKey, size);
            if (log.isDebugEnabled()) {
                log.debug(String.format("Incremented Redis key %s to %d", sizeKey, incremented));
            }
            return incremented;
        });
    }

    /**
     * Decrements the global storage size with the Redis {@code DECRBY} command.
     *
     * @param size the decrement
     * @return the value returned by Redis, i.e. the size after the decrement
     */
    @Override
    protected long decrementStorageSize(final long size) {
        return redisExecutor.execute((RedisCallable<Long>) jedis -> {
            Long decremented = jedis.decrBy(sizeKey, size);
            if (log.isDebugEnabled()) {
                log.debug(String.format("Decremented Redis key %s to %d", sizeKey, decremented));
            }
            return decremented;
        });
    }

    /**
     * Deletes every Redis key starting with the namespace of this store. Since the size key is built from the
     * same namespace, it matches the pattern and is deleted as well.
     */
    @Override
    protected void removeAllEntries() {
        // TODO NXP-18236: use a transaction?
        Set<String> keys = redisExecutor.execute((RedisCallable<Set<String>>) jedis -> {
            return jedis.keys(namespace + "*");
        });
        for (String key : keys) {
            redisExecutor.execute((RedisCallable<Void>) jedis -> {
                jedis.del(key);
                return null;
            });
        }
    }

    /**
     * Gets the remaining time to live of the entry.
     *
     * @param key the entry key
     * @return the TTL reported by Redis for the summary hash if it is not negative, otherwise the TTL reported for
     *         the parameters hash
     */
    public long getTTL(String key) {
        long summaryTTL = redisExecutor.execute((RedisCallable<Long>) jedis -> {
            return jedis.ttl(namespace + key);
        });
        if (summaryTTL >= 0) {
            return summaryTTL;
        } else {
            return redisExecutor.execute((RedisCallable<Long>) jedis -> {
                String paramsKey = namespace + join(key, "params");
                return jedis.ttl(getBytes(paramsKey));
            });
        }
    }

    /**
     * Reads the summary hash of the entry.
     *
     * @param key the entry key
     * @return the summary fields, or {@code null} if the hash is empty
     */
    protected Map<String, String> getSummary(String key) {
        return redisExecutor.execute((RedisCallable<Map<String, String>>) jedis -> {
            Map<String, String> summary = jedis.hgetAll(namespace + key);
            if (summary.isEmpty()) {
                return null;
            }
            if (log.isDebugEnabled()) {
                log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", namespace + key,
                        summary));
            }
            return summary;
        });
    }

    /**
     * Deletes the blob information hashes of the entry, from index 0 to the given count (exclusive).
     *
     * @param key the entry key
     * @param blobCountStr the blob count as stored in the summary; nothing is done if {@code null}
     */
    protected void deleteBlobInfos(String key, String blobCountStr) {
        if (blobCountStr == null) {
            return;
        }
        int blobCount = Integer.parseInt(blobCountStr);
        for (int i = 0; i < blobCount; i++) {
            String blobInfoIndex = String.valueOf(i);
            redisExecutor.execute((RedisCallable<Long>) jedis -> {
                String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex);
                Long deleted = jedis.del(blobInfoKey);
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, blobInfoKey));
                }
                return deleted;
            });
        }
    }

    /**
     * Joins the given fragments with the {@code :} separator.
     */
    protected String join(String... fragments) {
        return StringUtils.join(fragments, ":");
    }

    /**
     * Encodes the given string as UTF-8 bytes.
     */
    protected byte[] getBytes(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Decodes the given UTF-8 bytes as a string.
     */
    protected String getString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /**
     * Serializes a value with Java serialization.
     *
     * @param value the value to serialize
     * @return the serialized bytes
     * @throws NuxeoException if the serialization fails
     */
    protected byte[] serialize(Serializable value) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // Closing the object stream flushes it, so the byte array is only read once the try block is left
        try (ObjectOutputStream out = new ObjectOutputStream(baos)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new NuxeoException(e);
        }
        return baos.toByteArray();
    }

    /**
     * Deserializes a value written by {@link #serialize(Serializable)}.
     *
     * @param bytes the serialized bytes
     * @return the deserialized value
     * @throws NuxeoException if the bytes cannot be read or reference an unknown class
     */
    protected Serializable deserialize(byte[] bytes) {
        InputStream bain = new ByteArrayInputStream(bytes);
        try (ObjectInputStream in = new ObjectInputStream(bain)) {
            return (Serializable) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new NuxeoException(e);
        }
    }

    /**
     * Serializes a parameter map: keys are UTF-8 encoded, values are serialized with Java serialization.
     *
     * @param map the map to serialize
     * @return the serialized map
     */
    protected Map<byte[], byte[]> serialize(Map<String, Serializable> map) {
        Map<byte[], byte[]> serializedMap = new HashMap<>();
        for (Map.Entry<String, Serializable> entry : map.entrySet()) {
            serializedMap.put(getBytes(entry.getKey()), serialize(entry.getValue()));
        }
        return serializedMap;
    }

    /**
     * Deserializes a map written by {@link #serialize(Map)}.
     *
     * @param byteMap the serialized map
     * @return the deserialized map
     */
    protected Map<String, Serializable> deserialize(Map<byte[], byte[]> byteMap) {
        Map<String, Serializable> map = new HashMap<>();
        for (Map.Entry<byte[], byte[]> entry : byteMap.entrySet()) {
            map.put(getString(entry.getKey()), deserialize(entry.getValue()));
        }
        return map;
    }

    /**
     * Sets the TTL of every Redis key of the entry. When the entry has a summary, the summary expires after the
     * given number of seconds and the blob information and parameters 60 seconds later; otherwise the parameters
     * expire after the given number of seconds.
     *
     * @param key the entry key
     * @param seconds the TTL in seconds
     */
    protected void setTTL(String key, int seconds) {

        Map<String, String> summary = getSummary(key);
        if (summary != null) {
            // Summary
            redisExecutor.execute((RedisCallable<Void>) jedis -> {
                jedis.expire(namespace + key, seconds);
                return null;
            });
            // Blobs
            String blobCountStr = summary.get("blobCount");
            if (blobCountStr != null) {
                int blobCount = Integer.parseInt(blobCountStr);
                final int blobsTimeout = seconds + 60;
                for (int i = 0; i < blobCount; i++) {
                    String blobInfoIndex = String.valueOf(i);
                    redisExecutor.execute((RedisCallable<Void>) jedis -> {
                        String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex);
                        jedis.expire(blobInfoKey, blobsTimeout);
                        return null;
                    });
                }
            }
        }
        // Parameters
        final int paramsTimeout;
        if (summary == null) {
            paramsTimeout = seconds;
        } else {
            paramsTimeout = seconds + 60;
        }
        redisExecutor.execute((RedisCallable<Void>) jedis -> {
            String paramsKey = namespace + join(key, "params");
            jedis.expire(getBytes(paramsKey), paramsTimeout);
            return null;
        });
    }

}