001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat <tdelprat@nuxeo.com> 018 * Antoine Taillefer <ataillefer@nuxeo.com> 019 */ 020 021package org.nuxeo.ecm.core.redis.contribs; 022 023import java.io.ByteArrayInputStream; 024import java.io.ByteArrayOutputStream; 025import java.io.IOException; 026import java.io.InputStream; 027import java.io.ObjectInputStream; 028import java.io.ObjectOutputStream; 029import java.io.Serializable; 030import java.nio.charset.StandardCharsets; 031import java.util.ArrayList; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035import java.util.Set; 036 037import org.apache.commons.lang.StringUtils; 038import org.apache.commons.logging.Log; 039import org.apache.commons.logging.LogFactory; 040import org.nuxeo.ecm.core.api.Blob; 041import org.nuxeo.ecm.core.api.NuxeoException; 042import org.nuxeo.ecm.core.redis.RedisAdmin; 043import org.nuxeo.ecm.core.redis.RedisCallable; 044import org.nuxeo.ecm.core.redis.RedisExecutor; 045import org.nuxeo.ecm.core.transientstore.AbstractTransientStore; 046import org.nuxeo.ecm.core.transientstore.api.TransientStore; 047import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig; 048import org.nuxeo.runtime.api.Framework; 049 050/** 051 * Redis implementation (i.e. cluster aware) of the {@link TransientStore}. 052 * <p> 053 * Since hashes cannot be nested, a storage entry is flattened as follows: 054 * 055 * <pre> 056 * - Entry summary: 057 * 058 * transientStore:transientStoreName:entryKey { 059 * "blobCount": number of blobs associated with the entry 060 * "size": storage size of the blobs associated with the entry 061 * "completed": entry status 062 * } 063 * 064 * - Entry parameters: 065 * 066 * transientStore:transientStoreName:entryKey:params { 067 * "param1": value1 068 * "param2": value2 069 * } 070 * 071 * - Entry blobs: 072 * 073 * transientStore:transientStoreName:entryKey:blobs:0 { 074 * "file" 075 * "filename" 076 * "encoding" 077 * "mimetype" 078 * "digest" 079 * } 080 * 081 * transientStore:transientStoreName:entryKey:blobs:1 { 082 * ... 083 * } 084 * 085 * ... 086 * </pre> 087 * 088 * @since 7.2 089 */ 090public class RedisTransientStore extends AbstractTransientStore { 091 092 protected RedisExecutor redisExecutor; 093 094 protected String namespace; 095 096 protected String sizeKey; 097 098 protected RedisAdmin redisAdmin; 099 100 protected int firstLevelTTL; 101 102 protected int secondLevelTTL; 103 104 protected Log log = LogFactory.getLog(RedisTransientStore.class); 105 106 public RedisTransientStore() { 107 redisExecutor = Framework.getService(RedisExecutor.class); 108 redisAdmin = Framework.getService(RedisAdmin.class); 109 } 110 111 @Override 112 public void init(TransientStoreConfig config) { 113 log.debug("Initializing RedisTransientStore: " + config.getName()); 114 super.init(config); 115 116 namespace = redisAdmin.namespace("transientStore", config.getName()); 117 sizeKey = namespace + "size"; 118 119 // Use seconds for Redis EXPIRE command 120 firstLevelTTL = config.getFirstLevelTTL() * 60; 121 secondLevelTTL = config.getSecondLevelTTL() * 60; 122 } 123 124 @Override 125 public void shutdown() { 126 log.debug("Shutting down RedisTransientStore: " + config.getName()); 127 // Nothing to do here. 128 } 129 130 @Override 131 public boolean exists(String key) { 132 // Jedis#exists(String key) doesn't to work for a key created with hset or hmset 133 return getSummary(key) != null || getParameters(key) != null; 134 } 135 136 @Override 137 public void putParameter(String key, String parameter, Serializable value) { 138 redisExecutor.execute((RedisCallable<Void>) jedis -> { 139 String paramsKey = namespace + join(key, "params"); 140 if (log.isDebugEnabled()) { 141 log.debug(String.format("Setting field %s to value %s in Redis hash stored at key %s", parameter, 142 value, paramsKey)); 143 } 144 jedis.hset(getBytes(paramsKey), getBytes(parameter), serialize(value)); 145 return null; 146 }); 147 setTTL(key, firstLevelTTL); 148 } 149 150 @Override 151 public Serializable getParameter(String key, String parameter) { 152 return redisExecutor.execute((RedisCallable<Serializable>) jedis -> { 153 String paramsKey = namespace + join(key, "params"); 154 byte[] paramBytes = jedis.hget(getBytes(paramsKey), getBytes(parameter)); 155 if (paramBytes == null) { 156 return null; 157 } 158 Serializable res = deserialize(paramBytes); 159 if (log.isDebugEnabled()) { 160 log.debug(String.format("Fetched field %s from Redis hash stored at key %s -> %s", parameter, 161 paramsKey, res)); 162 } 163 return res; 164 }); 165 } 166 167 @Override 168 public void putParameters(String key, Map<String, Serializable> parameters) { 169 redisExecutor.execute((RedisCallable<Void>) jedis -> { 170 String paramsKey = namespace + join(key, "params"); 171 if (log.isDebugEnabled()) { 172 log.debug(String.format("Setting fields %s in Redis hash stored at key %s", parameters, paramsKey)); 173 } 174 jedis.hmset(getBytes(paramsKey), serialize(parameters)); 175 return null; 176 }); 177 setTTL(key, firstLevelTTL); 178 } 179 180 @Override 181 public Map<String, Serializable> getParameters(String key) { 182 // TODO NXP-18236: use a transaction? 183 String paramsKey = namespace + join(key, "params"); 184 Map<byte[], byte[]> paramBytes = redisExecutor.execute((RedisCallable<Map<byte[], byte[]>>) jedis -> { 185 return jedis.hgetAll(getBytes(paramsKey)); 186 }); 187 if (paramBytes.isEmpty()) { 188 if (getSummary(key) == null) { 189 return null; 190 } else { 191 return new HashMap<>(); 192 } 193 } 194 Map<String, Serializable> res = deserialize(paramBytes); 195 if (log.isDebugEnabled()) { 196 log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", paramsKey, res)); 197 } 198 return res; 199 } 200 201 @Override 202 public List<Blob> getBlobs(String key) { 203 // TODO NXP-18236: use a transaction? 204 205 // Get blob count 206 String blobCount = redisExecutor.execute((RedisCallable<String>) jedis -> { 207 return jedis.hget(namespace + key, "blobCount"); 208 }); 209 if (log.isDebugEnabled()) { 210 log.debug(String.format("Fetched field \"blobCount\" from Redis hash stored at key %s -> %s", namespace 211 + key, blobCount)); 212 } 213 if (blobCount == null) { 214 // Check for existing parameters 215 Map<String, Serializable> parameters = getParameters(key); 216 if (parameters == null) { 217 return null; 218 } else { 219 return new ArrayList<Blob>(); 220 } 221 } 222 223 // Get blobs 224 int entryBlobCount = Integer.parseInt(blobCount); 225 if (entryBlobCount <= 0) { 226 return new ArrayList<>(); 227 } 228 List<Map<String, String>> blobInfos = new ArrayList<>(); 229 for (int i = 0; i < entryBlobCount; i++) { 230 String blobInfoIndex = String.valueOf(i); 231 Map<String, String> entryBlobInfo = redisExecutor.execute((RedisCallable<Map<String, String>>) jedis -> { 232 String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex); 233 Map<String, String> blobInfo = jedis.hgetAll(blobInfoKey); 234 if (blobInfo.isEmpty()) { 235 throw new NuxeoException(String.format( 236 "Entry with key %s is inconsistent: blobCount = %d but key %s doesn't exist", key, 237 entryBlobCount, blobInfoKey)); 238 } 239 if (log.isDebugEnabled()) { 240 log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", blobInfoKey, 241 blobInfo)); 242 } 243 return blobInfo; 244 }); 245 blobInfos.add(entryBlobInfo); 246 } 247 248 // Load blobs from the file system 249 return loadBlobs(blobInfos); 250 } 251 252 @Override 253 public long getSize(String key) { 254 return redisExecutor.execute((RedisCallable<Long>) jedis -> { 255 String size = jedis.hget(namespace + key, "size"); 256 if (size == null) { 257 return -1L; 258 } 259 if (log.isDebugEnabled()) { 260 log.debug(String.format("Fetched field \"size\" from Redis hash stored at key %s -> %s", namespace 261 + key, size)); 262 } 263 return Long.parseLong(size); 264 }); 265 } 266 267 @Override 268 public boolean isCompleted(String key) { 269 return redisExecutor.execute((RedisCallable<Boolean>) jedis -> { 270 String completed = jedis.hget(namespace + key, "completed"); 271 if (log.isDebugEnabled()) { 272 log.debug(String.format("Fetched field \"completed\" from Redis hash stored at key %s -> %s", namespace 273 + key, completed)); 274 } 275 return Boolean.parseBoolean(completed); 276 }); 277 } 278 279 @Override 280 public void setCompleted(String key, boolean completed) { 281 redisExecutor.execute((RedisCallable<Void>) jedis -> { 282 if (log.isDebugEnabled()) { 283 log.debug(String.format("Setting field \"completed\" to value %s in Redis hash stored at key %s", 284 completed, namespace + key)); 285 } 286 jedis.hset(namespace + key, "completed", String.valueOf(completed)); 287 return null; 288 }); 289 setTTL(key, firstLevelTTL); 290 } 291 292 @Override 293 public void remove(String key) { 294 // TODO NXP-18236: use a transaction? 295 296 Map<String, String> summary = getSummary(key); 297 if (summary != null) { 298 // Remove blobs 299 String blobCount = summary.get("blobCount"); 300 deleteBlobInfos(key, blobCount); 301 302 // Remove summary 303 redisExecutor.execute((RedisCallable<Long>) jedis -> { 304 Long deleted = jedis.del(namespace + key); 305 if (log.isDebugEnabled()) { 306 log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, namespace + key)); 307 } 308 return deleted; 309 }); 310 311 // Decrement storage size 312 String size = summary.get("size"); 313 if (size != null) { 314 long entrySize = Integer.parseInt(size); 315 if (entrySize > 0) { 316 decrementStorageSize(entrySize); 317 } 318 } 319 } 320 321 // Remove parameters 322 redisExecutor.execute((RedisCallable<Long>) jedis -> { 323 String paramsKey = namespace + join(key, "params"); 324 Long deleted = jedis.del(getBytes(paramsKey)); 325 if (log.isDebugEnabled()) { 326 log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, paramsKey)); 327 } 328 return deleted; 329 }); 330 } 331 332 @Override 333 public void release(String key) { 334 if (getStorageSize() <= config.getTargetMaxSizeMB() * (1024 * 1024) || config.getTargetMaxSizeMB() < 0) { 335 setTTL(key, secondLevelTTL); 336 } else { 337 remove(key); 338 } 339 } 340 341 @Override 342 protected void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos) { 343 // TODO NXP-18236: use a transaction? 344 345 Map<String, String> oldSummary = getSummary(key); 346 347 // Update storage size 348 long entrySize = -1; 349 if (oldSummary != null) { 350 String size = oldSummary.get("size"); 351 if (size != null) { 352 entrySize = Long.parseLong(size); 353 } 354 } 355 if (entrySize > 0) { 356 incrementStorageSize(sizeOfBlobs - entrySize); 357 } else { 358 if (sizeOfBlobs > 0) { 359 incrementStorageSize(sizeOfBlobs); 360 } 361 } 362 363 // Delete old blobs 364 if (oldSummary != null) { 365 String oldBlobCount = oldSummary.get("blobCount"); 366 deleteBlobInfos(key, oldBlobCount); 367 } 368 369 // Update entry size and blob count 370 final Map<String, String> entrySummary = new HashMap<>(); 371 int blobCount = 0; 372 if (blobInfos != null) { 373 blobCount = blobInfos.size(); 374 } 375 entrySummary.put("blobCount", String.valueOf(blobCount)); 376 entrySummary.put("size", String.valueOf(sizeOfBlobs)); 377 redisExecutor.execute((RedisCallable<Void>) jedis -> { 378 if (log.isDebugEnabled()) { 379 log.debug(String.format("Setting fields %s in Redis hash stored at key %s", entrySummary, namespace 380 + key)); 381 } 382 jedis.hmset(namespace + key, entrySummary); 383 jedis.expire(namespace + key, firstLevelTTL); 384 return null; 385 }); 386 387 // Set new blobs 388 if (blobInfos != null) { 389 int blobsTimeout = firstLevelTTL + 60; 390 for (int i = 0; i < blobInfos.size(); i++) { 391 String blobInfoIndex = String.valueOf(i); 392 Map<String, String> blobInfo = blobInfos.get(i); 393 redisExecutor.execute((RedisCallable<Void>) jedis -> { 394 String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex); 395 if (log.isDebugEnabled()) { 396 log.debug(String.format("Setting fields %s in Redis hash stored at key %s", blobInfo, 397 blobInfoKey)); 398 } 399 jedis.hmset(blobInfoKey, blobInfo); 400 jedis.expire(blobInfoKey, blobsTimeout); 401 return null; 402 }); 403 } 404 } 405 406 // Set params TTL 407 redisExecutor.execute((RedisCallable<Void>) jedis -> { 408 String paramsKey = namespace + join(key, "params"); 409 jedis.expire(getBytes(paramsKey), firstLevelTTL + 60); 410 return null; 411 }); 412 } 413 414 @Override 415 public long getStorageSize() { 416 return redisExecutor.execute((RedisCallable<Long>) jedis -> { 417 String value = jedis.get(sizeKey); 418 if (value == null) { 419 return 0L; 420 } 421 if (log.isDebugEnabled()) { 422 log.debug(String.format("Fetched value of Redis key %s -> %s", sizeKey, value)); 423 } 424 return Long.parseLong(value); 425 }); 426 } 427 428 @Override 429 protected void setStorageSize(final long newSize) { 430 redisExecutor.execute((RedisCallable<Void>) jedis -> { 431 if (log.isDebugEnabled()) { 432 log.debug(String.format("Setting Redis key %s to value %s", sizeKey, newSize)); 433 } 434 jedis.set(sizeKey, "" + newSize); 435 return null; 436 }); 437 } 438 439 @Override 440 protected long incrementStorageSize(final long size) { 441 return redisExecutor.execute((RedisCallable<Long>) jedis -> { 442 Long incremented = jedis.incrBy(sizeKey, size); 443 if (log.isDebugEnabled()) { 444 log.debug(String.format("Incremented Redis key %s to %d", sizeKey, incremented)); 445 } 446 return incremented; 447 }); 448 } 449 450 @Override 451 protected long decrementStorageSize(final long size) { 452 return redisExecutor.execute((RedisCallable<Long>) jedis -> { 453 Long decremented = jedis.decrBy(sizeKey, size); 454 if (log.isDebugEnabled()) { 455 log.debug(String.format("Decremented Redis key %s to %d", sizeKey, decremented)); 456 } 457 return decremented; 458 }); 459 } 460 461 @Override 462 protected void removeAllEntries() { 463 // TODO NXP-18236: use a transaction? 464 Set<String> keys = redisExecutor.execute((RedisCallable<Set<String>>) jedis -> { 465 return jedis.keys(namespace + "*"); 466 }); 467 for (String key : keys) { 468 redisExecutor.execute((RedisCallable<Void>) jedis -> { 469 jedis.del(key); 470 return null; 471 }); 472 } 473 } 474 475 public long getTTL(String key) { 476 long summaryTTL = redisExecutor.execute((RedisCallable<Long>) jedis -> { 477 return jedis.ttl(namespace + key); 478 }); 479 if (summaryTTL >= 0) { 480 return summaryTTL; 481 } else { 482 return redisExecutor.execute((RedisCallable<Long>) jedis -> { 483 String paramsKey = namespace + join(key, "params"); 484 return jedis.ttl(getBytes(paramsKey)); 485 }); 486 } 487 } 488 489 protected Map<String, String> getSummary(String key) { 490 return redisExecutor.execute((RedisCallable<Map<String, String>>) jedis -> { 491 Map<String, String> summary = jedis.hgetAll(namespace + key); 492 if (summary.isEmpty()) { 493 return null; 494 } 495 if (log.isDebugEnabled()) { 496 log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", namespace + key, 497 summary)); 498 } 499 return summary; 500 }); 501 } 502 503 protected void deleteBlobInfos(String key, String blobCountStr) { 504 if (blobCountStr != null) { 505 int blobCount = Integer.parseInt(blobCountStr); 506 if (blobCount > 0) { 507 for (int i = 0; i < blobCount; i++) { 508 String blobInfoIndex = String.valueOf(i); 509 redisExecutor.execute((RedisCallable<Long>) jedis -> { 510 String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex); 511 Long deleted = jedis.del(blobInfoKey); 512 if (log.isDebugEnabled()) { 513 log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, blobInfoKey)); 514 } 515 return deleted; 516 }); 517 } 518 } 519 } 520 } 521 522 protected String join(String... fragments) { 523 return StringUtils.join(fragments, ":"); 524 } 525 526 protected byte[] getBytes(String key) { 527 return key.getBytes(StandardCharsets.UTF_8); 528 } 529 530 protected String getString(byte[] bytes) { 531 return new String(bytes, StandardCharsets.UTF_8); 532 } 533 534 protected byte[] serialize(Serializable value) { 535 try { 536 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 537 ObjectOutputStream out = new ObjectOutputStream(baos); 538 out.writeObject(value); 539 out.flush(); 540 out.close(); 541 return baos.toByteArray(); 542 } catch (IOException e) { 543 throw new NuxeoException(e); 544 } 545 } 546 547 protected Serializable deserialize(byte[] bytes) { 548 try { 549 InputStream bain = new ByteArrayInputStream(bytes); 550 ObjectInputStream in = new ObjectInputStream(bain); 551 return (Serializable) in.readObject(); 552 } catch (IOException | ClassNotFoundException e) { 553 throw new NuxeoException(e); 554 } 555 } 556 557 protected Map<byte[], byte[]> serialize(Map<String, Serializable> map) { 558 Map<byte[], byte[]> serializedMap = new HashMap<>(); 559 for (String key : map.keySet()) { 560 serializedMap.put(getBytes(key), serialize(map.get(key))); 561 } 562 return serializedMap; 563 } 564 565 protected Map<String, Serializable> deserialize(Map<byte[], byte[]> byteMap) { 566 Map<String, Serializable> map = new HashMap<>(); 567 for (byte[] key : byteMap.keySet()) { 568 map.put(getString(key), deserialize(byteMap.get(key))); 569 } 570 return map; 571 } 572 573 protected void setTTL(String key, int seconds) { 574 575 Map<String, String> summary = getSummary(key); 576 if (summary != null) { 577 // Summary 578 redisExecutor.execute((RedisCallable<Void>) jedis -> { 579 jedis.expire(namespace + key, seconds); 580 return null; 581 }); 582 // Blobs 583 String blobCountStr = summary.get("blobCount"); 584 if (blobCountStr != null) { 585 int blobCount = Integer.parseInt(blobCountStr); 586 if (blobCount > 0) { 587 final int blobsTimeout = seconds + 60; 588 for (int i = 0; i < blobCount; i++) { 589 String blobInfoIndex = String.valueOf(i); 590 redisExecutor.execute((RedisCallable<Void>) jedis -> { 591 String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex); 592 jedis.expire(blobInfoKey, blobsTimeout); 593 return null; 594 }); 595 } 596 } 597 } 598 } 599 // Parameters 600 final int paramsTimeout; 601 if (summary == null) { 602 paramsTimeout = seconds; 603 } else { 604 paramsTimeout = seconds + 60; 605 } 606 redisExecutor.execute((RedisCallable<Void>) jedis -> { 607 String paramsKey = namespace + join(key, "params"); 608 jedis.expire(getBytes(paramsKey), paramsTimeout); 609 return null; 610 }); 611 } 612 613}