001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Thierry Delprat <tdelprat@nuxeo.com>
016 *     Antoine Taillefer <ataillefer@nuxeo.com>
017 */
018
019package org.nuxeo.ecm.core.redis.contribs;
020
021import java.io.ByteArrayInputStream;
022import java.io.ByteArrayOutputStream;
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.ObjectInputStream;
026import java.io.ObjectOutputStream;
027import java.io.Serializable;
028import java.nio.charset.StandardCharsets;
029import java.util.ArrayList;
030import java.util.HashMap;
031import java.util.List;
032import java.util.Map;
033import java.util.Set;
034
035import org.apache.commons.lang.StringUtils;
036import org.apache.commons.logging.Log;
037import org.apache.commons.logging.LogFactory;
038import org.nuxeo.ecm.core.api.Blob;
039import org.nuxeo.ecm.core.api.NuxeoException;
040import org.nuxeo.ecm.core.redis.RedisAdmin;
041import org.nuxeo.ecm.core.redis.RedisCallable;
042import org.nuxeo.ecm.core.redis.RedisExecutor;
043import org.nuxeo.ecm.core.transientstore.AbstractTransientStore;
044import org.nuxeo.ecm.core.transientstore.api.TransientStore;
045import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig;
046import org.nuxeo.runtime.api.Framework;
047
048/**
049 * Redis implementation (i.e. cluster aware) of the {@link TransientStore}.
050 * <p>
051 * Since hashes cannot be nested, a storage entry is flattened as follows:
052 *
053 * <pre>
054 *  - Entry summary:
055 * 
056 *    transientStore:transientStoreName:entryKey {
057 *      "blobCount":    number of blobs associated with the entry
058 *      "size":         storage size of the blobs associated with the entry
059 *      "completed":    entry status
060 *    }
061 * 
062 * - Entry parameters:
063 * 
064 *   transientStore:transientStoreName:entryKey:params {
065 *      "param1": value1
066 *      "param2": value2
067 *   }
068 * 
069 * - Entry blobs:
070 * 
071 *   transientStore:transientStoreName:entryKey:blobs:0 {
072 *      "file"
073 *      "filename"
074 *      "encoding"
075 *      "mimetype"
076 *      "digest"
077 *   }
078 * 
079 *   transientStore:transientStoreName:entryKey:blobs:1 {
080 *      ...
081 *   }
082 * 
083 *   ...
084 * </pre>
085 *
086 * @since 7.2
087 */
088public class RedisTransientStore extends AbstractTransientStore {
089
090    protected RedisExecutor redisExecutor;
091
092    protected String namespace;
093
094    protected String sizeKey;
095
096    protected RedisAdmin redisAdmin;
097
098    protected int firstLevelTTL;
099
100    protected int secondLevelTTL;
101
102    protected Log log = LogFactory.getLog(RedisTransientStore.class);
103
104    public RedisTransientStore() {
105        redisExecutor = Framework.getService(RedisExecutor.class);
106        redisAdmin = Framework.getService(RedisAdmin.class);
107    }
108
109    @Override
110    public void init(TransientStoreConfig config) {
111        log.debug("Initializing RedisTransientStore: " + config.getName());
112        super.init(config);
113
114        namespace = redisAdmin.namespace("transientStore", config.getName());
115        sizeKey = namespace + "size";
116
117        // Use seconds for Redis EXPIRE command
118        firstLevelTTL = config.getFirstLevelTTL() * 60;
119        secondLevelTTL = config.getSecondLevelTTL() * 60;
120    }
121
122    @Override
123    public void shutdown() {
124        log.debug("Shutting down RedisTransientStore: " + config.getName());
125        // Nothing to do here.
126    }
127
128    @Override
129    public boolean exists(String key) {
130        // Jedis#exists(String key) doesn't to work for a key created with hset or hmset
131        return getSummary(key) != null || getParameters(key) != null;
132    }
133
134    @Override
135    public void putParameter(String key, String parameter, Serializable value) {
136        redisExecutor.execute((RedisCallable<Void>) jedis -> {
137            String paramsKey = namespace + join(key, "params");
138            if (log.isDebugEnabled()) {
139                log.debug(String.format("Setting field %s to value %s in Redis hash stored at key %s", parameter,
140                        value, paramsKey));
141            }
142            jedis.hset(getBytes(paramsKey), getBytes(parameter), serialize(value));
143            return null;
144        });
145        setTTL(key, firstLevelTTL);
146    }
147
148    @Override
149    public Serializable getParameter(String key, String parameter) {
150        return redisExecutor.execute((RedisCallable<Serializable>) jedis -> {
151            String paramsKey = namespace + join(key, "params");
152            byte[] paramBytes = jedis.hget(getBytes(paramsKey), getBytes(parameter));
153            if (paramBytes == null) {
154                return null;
155            }
156            Serializable res = deserialize(paramBytes);
157            if (log.isDebugEnabled()) {
158                log.debug(String.format("Fetched field %s from Redis hash stored at key %s -> %s", parameter,
159                        paramsKey, res));
160            }
161            return res;
162        });
163    }
164
165    @Override
166    public void putParameters(String key, Map<String, Serializable> parameters) {
167        redisExecutor.execute((RedisCallable<Void>) jedis -> {
168            String paramsKey = namespace + join(key, "params");
169            if (log.isDebugEnabled()) {
170                log.debug(String.format("Setting fields %s in Redis hash stored at key %s", parameters, paramsKey));
171            }
172            jedis.hmset(getBytes(paramsKey), serialize(parameters));
173            return null;
174        });
175        setTTL(key, firstLevelTTL);
176    }
177
178    @Override
179    public Map<String, Serializable> getParameters(String key) {
180        // TODO NXP-18236: use a transaction?
181        String paramsKey = namespace + join(key, "params");
182        Map<byte[], byte[]> paramBytes = redisExecutor.execute((RedisCallable<Map<byte[], byte[]>>) jedis -> {
183            return jedis.hgetAll(getBytes(paramsKey));
184        });
185        if (paramBytes.isEmpty()) {
186            if (getSummary(key) == null) {
187                return null;
188            } else {
189                return new HashMap<>();
190            }
191        }
192        Map<String, Serializable> res = deserialize(paramBytes);
193        if (log.isDebugEnabled()) {
194            log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", paramsKey, res));
195        }
196        return res;
197    }
198
199    @Override
200    public List<Blob> getBlobs(String key) {
201        // TODO NXP-18236: use a transaction?
202
203        // Get blob count
204        String blobCount = redisExecutor.execute((RedisCallable<String>) jedis -> {
205            return jedis.hget(namespace + key, "blobCount");
206        });
207        if (log.isDebugEnabled()) {
208            log.debug(String.format("Fetched field \"blobCount\" from Redis hash stored at key %s -> %s", namespace
209                    + key, blobCount));
210        }
211        if (blobCount == null) {
212            // Check for existing parameters
213            Map<String, Serializable> parameters = getParameters(key);
214            if (parameters == null) {
215                return null;
216            } else {
217                return new ArrayList<Blob>();
218            }
219        }
220
221        // Get blobs
222        int entryBlobCount = Integer.parseInt(blobCount);
223        if (entryBlobCount <= 0) {
224            return new ArrayList<>();
225        }
226        List<Map<String, String>> blobInfos = new ArrayList<>();
227        for (int i = 0; i < entryBlobCount; i++) {
228            String blobInfoIndex = String.valueOf(i);
229            Map<String, String> entryBlobInfo = redisExecutor.execute((RedisCallable<Map<String, String>>) jedis -> {
230                String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex);
231                Map<String, String> blobInfo = jedis.hgetAll(blobInfoKey);
232                if (blobInfo.isEmpty()) {
233                    throw new NuxeoException(String.format(
234                            "Entry with key %s is inconsistent: blobCount = %d but key %s doesn't exist", key,
235                            entryBlobCount, blobInfoKey));
236                }
237                if (log.isDebugEnabled()) {
238                    log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", blobInfoKey,
239                            blobInfo));
240                }
241                return blobInfo;
242            });
243            blobInfos.add(entryBlobInfo);
244        }
245
246        // Load blobs from the file system
247        return loadBlobs(blobInfos);
248    }
249
250    @Override
251    public long getSize(String key) {
252        return redisExecutor.execute((RedisCallable<Long>) jedis -> {
253            String size = jedis.hget(namespace + key, "size");
254            if (size == null) {
255                return -1L;
256            }
257            if (log.isDebugEnabled()) {
258                log.debug(String.format("Fetched field \"size\" from Redis hash stored at key %s -> %s", namespace
259                        + key, size));
260            }
261            return Long.parseLong(size);
262        });
263    }
264
265    @Override
266    public boolean isCompleted(String key) {
267        return redisExecutor.execute((RedisCallable<Boolean>) jedis -> {
268            String completed = jedis.hget(namespace + key, "completed");
269            if (log.isDebugEnabled()) {
270                log.debug(String.format("Fetched field \"completed\" from Redis hash stored at key %s -> %s", namespace
271                        + key, completed));
272            }
273            return Boolean.parseBoolean(completed);
274        });
275    }
276
277    @Override
278    public void setCompleted(String key, boolean completed) {
279        redisExecutor.execute((RedisCallable<Void>) jedis -> {
280            if (log.isDebugEnabled()) {
281                log.debug(String.format("Setting field \"completed\" to value %s in Redis hash stored at key %s",
282                        completed, namespace + key));
283            }
284            jedis.hset(namespace + key, "completed", String.valueOf(completed));
285            return null;
286        });
287        setTTL(key, firstLevelTTL);
288    }
289
290    @Override
291    public void remove(String key) {
292        // TODO NXP-18236: use a transaction?
293
294        Map<String, String> summary = getSummary(key);
295        if (summary != null) {
296            // Remove blobs
297            String blobCount = summary.get("blobCount");
298            deleteBlobInfos(key, blobCount);
299
300            // Remove summary
301            redisExecutor.execute((RedisCallable<Long>) jedis -> {
302                Long deleted = jedis.del(namespace + key);
303                if (log.isDebugEnabled()) {
304                    log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, namespace + key));
305                }
306                return deleted;
307            });
308
309            // Decrement storage size
310            String size = summary.get("size");
311            if (size != null) {
312                long entrySize = Integer.parseInt(size);
313                if (entrySize > 0) {
314                    decrementStorageSize(entrySize);
315                }
316            }
317        }
318
319        // Remove parameters
320        redisExecutor.execute((RedisCallable<Long>) jedis -> {
321            String paramsKey = namespace + join(key, "params");
322            Long deleted = jedis.del(getBytes(paramsKey));
323            if (log.isDebugEnabled()) {
324                log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, paramsKey));
325            }
326            return deleted;
327        });
328    }
329
330    @Override
331    public void release(String key) {
332        if (getStorageSize() <= config.getTargetMaxSizeMB() * (1024 * 1024) || config.getTargetMaxSizeMB() < 0) {
333            setTTL(key, secondLevelTTL);
334        } else {
335            remove(key);
336        }
337    }
338
339    @Override
340    protected void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos) {
341        // TODO NXP-18236: use a transaction?
342
343        Map<String, String> oldSummary = getSummary(key);
344
345        // Update storage size
346        long entrySize = -1;
347        if (oldSummary != null) {
348            String size = oldSummary.get("size");
349            if (size != null) {
350                entrySize = Long.parseLong(size);
351            }
352        }
353        if (entrySize > 0) {
354            incrementStorageSize(sizeOfBlobs - entrySize);
355        } else {
356            if (sizeOfBlobs > 0) {
357                incrementStorageSize(sizeOfBlobs);
358            }
359        }
360
361        // Delete old blobs
362        if (oldSummary != null) {
363            String oldBlobCount = oldSummary.get("blobCount");
364            deleteBlobInfos(key, oldBlobCount);
365        }
366
367        // Update entry size and blob count
368        final Map<String, String> entrySummary = new HashMap<>();
369        int blobCount = 0;
370        if (blobInfos != null) {
371            blobCount = blobInfos.size();
372        }
373        entrySummary.put("blobCount", String.valueOf(blobCount));
374        entrySummary.put("size", String.valueOf(sizeOfBlobs));
375        redisExecutor.execute((RedisCallable<Void>) jedis -> {
376            if (log.isDebugEnabled()) {
377                log.debug(String.format("Setting fields %s in Redis hash stored at key %s", entrySummary, namespace
378                        + key));
379            }
380            jedis.hmset(namespace + key, entrySummary);
381            jedis.expire(namespace + key, firstLevelTTL);
382            return null;
383        });
384
385        // Set new blobs
386        if (blobInfos != null) {
387            int blobsTimeout = firstLevelTTL + 60;
388            for (int i = 0; i < blobInfos.size(); i++) {
389                String blobInfoIndex = String.valueOf(i);
390                Map<String, String> blobInfo = blobInfos.get(i);
391                redisExecutor.execute((RedisCallable<Void>) jedis -> {
392                    String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex);
393                    if (log.isDebugEnabled()) {
394                        log.debug(String.format("Setting fields %s in Redis hash stored at key %s", blobInfo,
395                                blobInfoKey));
396                    }
397                    jedis.hmset(blobInfoKey, blobInfo);
398                    jedis.expire(blobInfoKey, blobsTimeout);
399                    return null;
400                });
401            }
402        }
403
404        // Set params TTL
405        redisExecutor.execute((RedisCallable<Void>) jedis -> {
406            String paramsKey = namespace + join(key, "params");
407            jedis.expire(getBytes(paramsKey), firstLevelTTL + 60);
408            return null;
409        });
410    }
411
412    @Override
413    public long getStorageSize() {
414        return redisExecutor.execute((RedisCallable<Long>) jedis -> {
415            String value = jedis.get(sizeKey);
416            if (value == null) {
417                return 0L;
418            }
419            if (log.isDebugEnabled()) {
420                log.debug(String.format("Fetched value of Redis key %s -> %s", sizeKey, value));
421            }
422            return Long.parseLong(value);
423        });
424    }
425
426    @Override
427    protected void setStorageSize(final long newSize) {
428        redisExecutor.execute((RedisCallable<Void>) jedis -> {
429            if (log.isDebugEnabled()) {
430                log.debug(String.format("Setting Redis key %s to value %s", sizeKey, newSize));
431            }
432            jedis.set(sizeKey, "" + newSize);
433            return null;
434        });
435    }
436
437    @Override
438    protected long incrementStorageSize(final long size) {
439        return redisExecutor.execute((RedisCallable<Long>) jedis -> {
440            Long incremented = jedis.incrBy(sizeKey, size);
441            if (log.isDebugEnabled()) {
442                log.debug(String.format("Incremented Redis key %s to %d", sizeKey, incremented));
443            }
444            return incremented;
445        });
446    }
447
448    @Override
449    protected long decrementStorageSize(final long size) {
450        return redisExecutor.execute((RedisCallable<Long>) jedis -> {
451            Long decremented = jedis.decrBy(sizeKey, size);
452            if (log.isDebugEnabled()) {
453                log.debug(String.format("Decremented Redis key %s to %d", sizeKey, decremented));
454            }
455            return decremented;
456        });
457    }
458
459    @Override
460    protected void removeAllEntries() {
461        // TODO NXP-18236: use a transaction?
462        Set<String> keys = redisExecutor.execute((RedisCallable<Set<String>>) jedis -> {
463            return jedis.keys(namespace + "*");
464        });
465        for (String key : keys) {
466            redisExecutor.execute((RedisCallable<Void>) jedis -> {
467                jedis.del(key);
468                return null;
469            });
470        }
471    }
472
473    public long getTTL(String key) {
474        long summaryTTL = redisExecutor.execute((RedisCallable<Long>) jedis -> {
475            return jedis.ttl(namespace + key);
476        });
477        if (summaryTTL >= 0) {
478            return summaryTTL;
479        } else {
480            return redisExecutor.execute((RedisCallable<Long>) jedis -> {
481                String paramsKey = namespace + join(key, "params");
482                return jedis.ttl(getBytes(paramsKey));
483            });
484        }
485    }
486
487    protected Map<String, String> getSummary(String key) {
488        return redisExecutor.execute((RedisCallable<Map<String, String>>) jedis -> {
489            Map<String, String> summary = jedis.hgetAll(namespace + key);
490            if (summary.isEmpty()) {
491                return null;
492            }
493            if (log.isDebugEnabled()) {
494                log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", namespace + key,
495                        summary));
496            }
497            return summary;
498        });
499    }
500
501    protected void deleteBlobInfos(String key, String blobCountStr) {
502        if (blobCountStr != null) {
503            int blobCount = Integer.parseInt(blobCountStr);
504            if (blobCount > 0) {
505                for (int i = 0; i < blobCount; i++) {
506                    String blobInfoIndex = String.valueOf(i);
507                    redisExecutor.execute((RedisCallable<Long>) jedis -> {
508                        String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex);
509                        Long deleted = jedis.del(blobInfoKey);
510                        if (log.isDebugEnabled()) {
511                            log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, blobInfoKey));
512                        }
513                        return deleted;
514                    });
515                }
516            }
517        }
518    }
519
520    protected String join(String... fragments) {
521        return StringUtils.join(fragments, ":");
522    }
523
524    protected byte[] getBytes(String key) {
525        return key.getBytes(StandardCharsets.UTF_8);
526    }
527
528    protected String getString(byte[] bytes) {
529        return new String(bytes, StandardCharsets.UTF_8);
530    }
531
532    protected byte[] serialize(Serializable value) {
533        try {
534            ByteArrayOutputStream baos = new ByteArrayOutputStream();
535            ObjectOutputStream out = new ObjectOutputStream(baos);
536            out.writeObject(value);
537            out.flush();
538            out.close();
539            return baos.toByteArray();
540        } catch (IOException e) {
541            throw new NuxeoException(e);
542        }
543    }
544
545    protected Serializable deserialize(byte[] bytes) {
546        try {
547            InputStream bain = new ByteArrayInputStream(bytes);
548            ObjectInputStream in = new ObjectInputStream(bain);
549            return (Serializable) in.readObject();
550        } catch (IOException | ClassNotFoundException e) {
551            throw new NuxeoException(e);
552        }
553    }
554
555    protected Map<byte[], byte[]> serialize(Map<String, Serializable> map) {
556        Map<byte[], byte[]> serializedMap = new HashMap<>();
557        for (String key : map.keySet()) {
558            serializedMap.put(getBytes(key), serialize(map.get(key)));
559        }
560        return serializedMap;
561    }
562
563    protected Map<String, Serializable> deserialize(Map<byte[], byte[]> byteMap) {
564        Map<String, Serializable> map = new HashMap<>();
565        for (byte[] key : byteMap.keySet()) {
566            map.put(getString(key), deserialize(byteMap.get(key)));
567        }
568        return map;
569    }
570
571    protected void setTTL(String key, int seconds) {
572
573        Map<String, String> summary = getSummary(key);
574        if (summary != null) {
575            // Summary
576            redisExecutor.execute((RedisCallable<Void>) jedis -> {
577                jedis.expire(namespace + key, seconds);
578                return null;
579            });
580            // Blobs
581            String blobCountStr = summary.get("blobCount");
582            if (blobCountStr != null) {
583                int blobCount = Integer.parseInt(blobCountStr);
584                if (blobCount > 0) {
585                    final int blobsTimeout = seconds + 60;
586                    for (int i = 0; i < blobCount; i++) {
587                        String blobInfoIndex = String.valueOf(i);
588                        redisExecutor.execute((RedisCallable<Void>) jedis -> {
589                            String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex);
590                            jedis.expire(blobInfoKey, blobsTimeout);
591                            return null;
592                        });
593                    }
594                }
595            }
596        }
597        // Parameters
598        final int paramsTimeout;
599        if (summary == null) {
600            paramsTimeout = seconds;
601        } else {
602            paramsTimeout = seconds + 60;
603        }
604        redisExecutor.execute((RedisCallable<Void>) jedis -> {
605            String paramsKey = namespace + join(key, "params");
606            jedis.expire(getBytes(paramsKey), paramsTimeout);
607            return null;
608        });
609    }
610
611}