001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat <tdelprat@nuxeo.com>
018 *     Antoine Taillefer <ataillefer@nuxeo.com>
019 */
020
021package org.nuxeo.ecm.core.redis.contribs;
022
023import java.io.ByteArrayInputStream;
024import java.io.ByteArrayOutputStream;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.ObjectInputStream;
028import java.io.ObjectOutputStream;
029import java.io.Serializable;
030import java.nio.charset.StandardCharsets;
031import java.util.ArrayList;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.Set;
036
037import org.apache.commons.lang.StringUtils;
038import org.apache.commons.logging.Log;
039import org.apache.commons.logging.LogFactory;
040import org.nuxeo.ecm.core.api.Blob;
041import org.nuxeo.ecm.core.api.NuxeoException;
042import org.nuxeo.ecm.core.redis.RedisAdmin;
043import org.nuxeo.ecm.core.redis.RedisCallable;
044import org.nuxeo.ecm.core.redis.RedisExecutor;
045import org.nuxeo.ecm.core.transientstore.AbstractTransientStore;
046import org.nuxeo.ecm.core.transientstore.api.TransientStore;
047import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig;
048import org.nuxeo.runtime.api.Framework;
049
050/**
051 * Redis implementation (i.e. cluster aware) of the {@link TransientStore}.
052 * <p>
053 * Since hashes cannot be nested, a storage entry is flattened as follows:
054 *
055 * <pre>
056 *  - Entry summary:
057 * 
058 *    transientStore:transientStoreName:entryKey {
059 *      "blobCount":    number of blobs associated with the entry
060 *      "size":         storage size of the blobs associated with the entry
061 *      "completed":    entry status
062 *    }
063 * 
064 * - Entry parameters:
065 * 
066 *   transientStore:transientStoreName:entryKey:params {
067 *      "param1": value1
068 *      "param2": value2
069 *   }
070 * 
071 * - Entry blobs:
072 * 
073 *   transientStore:transientStoreName:entryKey:blobs:0 {
074 *      "file"
075 *      "filename"
076 *      "encoding"
077 *      "mimetype"
078 *      "digest"
079 *   }
080 * 
081 *   transientStore:transientStoreName:entryKey:blobs:1 {
082 *      ...
083 *   }
084 * 
085 *   ...
086 * </pre>
087 *
088 * @since 7.2
089 */
090public class RedisTransientStore extends AbstractTransientStore {
091
    /** Executor through which every Redis command of this store is run; looked up in the constructor. */
    protected RedisExecutor redisExecutor;

    /** Key prefix of this store, computed by {@link RedisAdmin} in {@link #init(TransientStoreConfig)}. */
    protected String namespace;

    /** Redis key holding the global storage size counter of this store: {@code namespace + "size"}. */
    protected String sizeKey;

    /** Service used to compute the Redis namespace of this store; looked up in the constructor. */
    protected RedisAdmin redisAdmin;

    /** First level TTL in seconds: the configured value multiplied by 60. */
    protected int firstLevelTTL;

    /** Second level TTL in seconds: the configured value multiplied by 60. */
    protected int secondLevelTTL;

    /** Logger used for the debug traces of every Redis access. */
    protected Log log = LogFactory.getLog(RedisTransientStore.class);
105
106    public RedisTransientStore() {
107        redisExecutor = Framework.getService(RedisExecutor.class);
108        redisAdmin = Framework.getService(RedisAdmin.class);
109    }
110
111    @Override
112    public void init(TransientStoreConfig config) {
113        log.debug("Initializing RedisTransientStore: " + config.getName());
114        super.init(config);
115
116        namespace = redisAdmin.namespace("transientStore", config.getName());
117        sizeKey = namespace + "size";
118
119        // Use seconds for Redis EXPIRE command
120        firstLevelTTL = config.getFirstLevelTTL() * 60;
121        secondLevelTTL = config.getSecondLevelTTL() * 60;
122    }
123
124    @Override
125    public void shutdown() {
126        log.debug("Shutting down RedisTransientStore: " + config.getName());
127        // Nothing to do here.
128    }
129
130    @Override
131    public boolean exists(String key) {
132        // Jedis#exists(String key) doesn't to work for a key created with hset or hmset
133        return getSummary(key) != null || getParameters(key) != null;
134    }
135
136    @Override
137    public void putParameter(String key, String parameter, Serializable value) {
138        redisExecutor.execute((RedisCallable<Void>) jedis -> {
139            String paramsKey = namespace + join(key, "params");
140            if (log.isDebugEnabled()) {
141                log.debug(String.format("Setting field %s to value %s in Redis hash stored at key %s", parameter,
142                        value, paramsKey));
143            }
144            jedis.hset(getBytes(paramsKey), getBytes(parameter), serialize(value));
145            return null;
146        });
147        setTTL(key, firstLevelTTL);
148    }
149
150    @Override
151    public Serializable getParameter(String key, String parameter) {
152        return redisExecutor.execute((RedisCallable<Serializable>) jedis -> {
153            String paramsKey = namespace + join(key, "params");
154            byte[] paramBytes = jedis.hget(getBytes(paramsKey), getBytes(parameter));
155            if (paramBytes == null) {
156                return null;
157            }
158            Serializable res = deserialize(paramBytes);
159            if (log.isDebugEnabled()) {
160                log.debug(String.format("Fetched field %s from Redis hash stored at key %s -> %s", parameter,
161                        paramsKey, res));
162            }
163            return res;
164        });
165    }
166
167    @Override
168    public void putParameters(String key, Map<String, Serializable> parameters) {
169        redisExecutor.execute((RedisCallable<Void>) jedis -> {
170            String paramsKey = namespace + join(key, "params");
171            if (log.isDebugEnabled()) {
172                log.debug(String.format("Setting fields %s in Redis hash stored at key %s", parameters, paramsKey));
173            }
174            jedis.hmset(getBytes(paramsKey), serialize(parameters));
175            return null;
176        });
177        setTTL(key, firstLevelTTL);
178    }
179
180    @Override
181    public Map<String, Serializable> getParameters(String key) {
182        // TODO NXP-18236: use a transaction?
183        String paramsKey = namespace + join(key, "params");
184        Map<byte[], byte[]> paramBytes = redisExecutor.execute((RedisCallable<Map<byte[], byte[]>>) jedis -> {
185            return jedis.hgetAll(getBytes(paramsKey));
186        });
187        if (paramBytes.isEmpty()) {
188            if (getSummary(key) == null) {
189                return null;
190            } else {
191                return new HashMap<>();
192            }
193        }
194        Map<String, Serializable> res = deserialize(paramBytes);
195        if (log.isDebugEnabled()) {
196            log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", paramsKey, res));
197        }
198        return res;
199    }
200
201    @Override
202    public List<Blob> getBlobs(String key) {
203        // TODO NXP-18236: use a transaction?
204
205        // Get blob count
206        String blobCount = redisExecutor.execute((RedisCallable<String>) jedis -> {
207            return jedis.hget(namespace + key, "blobCount");
208        });
209        if (log.isDebugEnabled()) {
210            log.debug(String.format("Fetched field \"blobCount\" from Redis hash stored at key %s -> %s", namespace
211                    + key, blobCount));
212        }
213        if (blobCount == null) {
214            // Check for existing parameters
215            Map<String, Serializable> parameters = getParameters(key);
216            if (parameters == null) {
217                return null;
218            } else {
219                return new ArrayList<Blob>();
220            }
221        }
222
223        // Get blobs
224        int entryBlobCount = Integer.parseInt(blobCount);
225        if (entryBlobCount <= 0) {
226            return new ArrayList<>();
227        }
228        List<Map<String, String>> blobInfos = new ArrayList<>();
229        for (int i = 0; i < entryBlobCount; i++) {
230            String blobInfoIndex = String.valueOf(i);
231            Map<String, String> entryBlobInfo = redisExecutor.execute((RedisCallable<Map<String, String>>) jedis -> {
232                String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex);
233                Map<String, String> blobInfo = jedis.hgetAll(blobInfoKey);
234                if (blobInfo.isEmpty()) {
235                    throw new NuxeoException(String.format(
236                            "Entry with key %s is inconsistent: blobCount = %d but key %s doesn't exist", key,
237                            entryBlobCount, blobInfoKey));
238                }
239                if (log.isDebugEnabled()) {
240                    log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", blobInfoKey,
241                            blobInfo));
242                }
243                return blobInfo;
244            });
245            blobInfos.add(entryBlobInfo);
246        }
247
248        // Load blobs from the file system
249        return loadBlobs(blobInfos);
250    }
251
252    @Override
253    public long getSize(String key) {
254        return redisExecutor.execute((RedisCallable<Long>) jedis -> {
255            String size = jedis.hget(namespace + key, "size");
256            if (size == null) {
257                return -1L;
258            }
259            if (log.isDebugEnabled()) {
260                log.debug(String.format("Fetched field \"size\" from Redis hash stored at key %s -> %s", namespace
261                        + key, size));
262            }
263            return Long.parseLong(size);
264        });
265    }
266
267    @Override
268    public boolean isCompleted(String key) {
269        return redisExecutor.execute((RedisCallable<Boolean>) jedis -> {
270            String completed = jedis.hget(namespace + key, "completed");
271            if (log.isDebugEnabled()) {
272                log.debug(String.format("Fetched field \"completed\" from Redis hash stored at key %s -> %s", namespace
273                        + key, completed));
274            }
275            return Boolean.parseBoolean(completed);
276        });
277    }
278
279    @Override
280    public void setCompleted(String key, boolean completed) {
281        redisExecutor.execute((RedisCallable<Void>) jedis -> {
282            if (log.isDebugEnabled()) {
283                log.debug(String.format("Setting field \"completed\" to value %s in Redis hash stored at key %s",
284                        completed, namespace + key));
285            }
286            jedis.hset(namespace + key, "completed", String.valueOf(completed));
287            return null;
288        });
289        setTTL(key, firstLevelTTL);
290    }
291
292    @Override
293    public void remove(String key) {
294        // TODO NXP-18236: use a transaction?
295
296        Map<String, String> summary = getSummary(key);
297        if (summary != null) {
298            // Remove blobs
299            String blobCount = summary.get("blobCount");
300            deleteBlobInfos(key, blobCount);
301
302            // Remove summary
303            redisExecutor.execute((RedisCallable<Long>) jedis -> {
304                Long deleted = jedis.del(namespace + key);
305                if (log.isDebugEnabled()) {
306                    log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, namespace + key));
307                }
308                return deleted;
309            });
310
311            // Decrement storage size
312            String size = summary.get("size");
313            if (size != null) {
314                long entrySize = Integer.parseInt(size);
315                if (entrySize > 0) {
316                    decrementStorageSize(entrySize);
317                }
318            }
319        }
320
321        // Remove parameters
322        redisExecutor.execute((RedisCallable<Long>) jedis -> {
323            String paramsKey = namespace + join(key, "params");
324            Long deleted = jedis.del(getBytes(paramsKey));
325            if (log.isDebugEnabled()) {
326                log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, paramsKey));
327            }
328            return deleted;
329        });
330    }
331
332    @Override
333    public void release(String key) {
334        if (getStorageSize() <= config.getTargetMaxSizeMB() * (1024 * 1024) || config.getTargetMaxSizeMB() < 0) {
335            setTTL(key, secondLevelTTL);
336        } else {
337            remove(key);
338        }
339    }
340
341    @Override
342    protected void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos) {
343        // TODO NXP-18236: use a transaction?
344
345        Map<String, String> oldSummary = getSummary(key);
346
347        // Update storage size
348        long entrySize = -1;
349        if (oldSummary != null) {
350            String size = oldSummary.get("size");
351            if (size != null) {
352                entrySize = Long.parseLong(size);
353            }
354        }
355        if (entrySize > 0) {
356            incrementStorageSize(sizeOfBlobs - entrySize);
357        } else {
358            if (sizeOfBlobs > 0) {
359                incrementStorageSize(sizeOfBlobs);
360            }
361        }
362
363        // Delete old blobs
364        if (oldSummary != null) {
365            String oldBlobCount = oldSummary.get("blobCount");
366            deleteBlobInfos(key, oldBlobCount);
367        }
368
369        // Update entry size and blob count
370        final Map<String, String> entrySummary = new HashMap<>();
371        int blobCount = 0;
372        if (blobInfos != null) {
373            blobCount = blobInfos.size();
374        }
375        entrySummary.put("blobCount", String.valueOf(blobCount));
376        entrySummary.put("size", String.valueOf(sizeOfBlobs));
377        redisExecutor.execute((RedisCallable<Void>) jedis -> {
378            if (log.isDebugEnabled()) {
379                log.debug(String.format("Setting fields %s in Redis hash stored at key %s", entrySummary, namespace
380                        + key));
381            }
382            jedis.hmset(namespace + key, entrySummary);
383            jedis.expire(namespace + key, firstLevelTTL);
384            return null;
385        });
386
387        // Set new blobs
388        if (blobInfos != null) {
389            int blobsTimeout = firstLevelTTL + 60;
390            for (int i = 0; i < blobInfos.size(); i++) {
391                String blobInfoIndex = String.valueOf(i);
392                Map<String, String> blobInfo = blobInfos.get(i);
393                redisExecutor.execute((RedisCallable<Void>) jedis -> {
394                    String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex);
395                    if (log.isDebugEnabled()) {
396                        log.debug(String.format("Setting fields %s in Redis hash stored at key %s", blobInfo,
397                                blobInfoKey));
398                    }
399                    jedis.hmset(blobInfoKey, blobInfo);
400                    jedis.expire(blobInfoKey, blobsTimeout);
401                    return null;
402                });
403            }
404        }
405
406        // Set params TTL
407        redisExecutor.execute((RedisCallable<Void>) jedis -> {
408            String paramsKey = namespace + join(key, "params");
409            jedis.expire(getBytes(paramsKey), firstLevelTTL + 60);
410            return null;
411        });
412    }
413
414    @Override
415    public long getStorageSize() {
416        return redisExecutor.execute((RedisCallable<Long>) jedis -> {
417            String value = jedis.get(sizeKey);
418            if (value == null) {
419                return 0L;
420            }
421            if (log.isDebugEnabled()) {
422                log.debug(String.format("Fetched value of Redis key %s -> %s", sizeKey, value));
423            }
424            return Long.parseLong(value);
425        });
426    }
427
428    @Override
429    protected void setStorageSize(final long newSize) {
430        redisExecutor.execute((RedisCallable<Void>) jedis -> {
431            if (log.isDebugEnabled()) {
432                log.debug(String.format("Setting Redis key %s to value %s", sizeKey, newSize));
433            }
434            jedis.set(sizeKey, "" + newSize);
435            return null;
436        });
437    }
438
439    @Override
440    protected long incrementStorageSize(final long size) {
441        return redisExecutor.execute((RedisCallable<Long>) jedis -> {
442            Long incremented = jedis.incrBy(sizeKey, size);
443            if (log.isDebugEnabled()) {
444                log.debug(String.format("Incremented Redis key %s to %d", sizeKey, incremented));
445            }
446            return incremented;
447        });
448    }
449
450    @Override
451    protected long decrementStorageSize(final long size) {
452        return redisExecutor.execute((RedisCallable<Long>) jedis -> {
453            Long decremented = jedis.decrBy(sizeKey, size);
454            if (log.isDebugEnabled()) {
455                log.debug(String.format("Decremented Redis key %s to %d", sizeKey, decremented));
456            }
457            return decremented;
458        });
459    }
460
461    @Override
462    protected void removeAllEntries() {
463        // TODO NXP-18236: use a transaction?
464        Set<String> keys = redisExecutor.execute((RedisCallable<Set<String>>) jedis -> {
465            return jedis.keys(namespace + "*");
466        });
467        for (String key : keys) {
468            redisExecutor.execute((RedisCallable<Void>) jedis -> {
469                jedis.del(key);
470                return null;
471            });
472        }
473    }
474
475    public long getTTL(String key) {
476        long summaryTTL = redisExecutor.execute((RedisCallable<Long>) jedis -> {
477            return jedis.ttl(namespace + key);
478        });
479        if (summaryTTL >= 0) {
480            return summaryTTL;
481        } else {
482            return redisExecutor.execute((RedisCallable<Long>) jedis -> {
483                String paramsKey = namespace + join(key, "params");
484                return jedis.ttl(getBytes(paramsKey));
485            });
486        }
487    }
488
489    protected Map<String, String> getSummary(String key) {
490        return redisExecutor.execute((RedisCallable<Map<String, String>>) jedis -> {
491            Map<String, String> summary = jedis.hgetAll(namespace + key);
492            if (summary.isEmpty()) {
493                return null;
494            }
495            if (log.isDebugEnabled()) {
496                log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", namespace + key,
497                        summary));
498            }
499            return summary;
500        });
501    }
502
503    protected void deleteBlobInfos(String key, String blobCountStr) {
504        if (blobCountStr != null) {
505            int blobCount = Integer.parseInt(blobCountStr);
506            if (blobCount > 0) {
507                for (int i = 0; i < blobCount; i++) {
508                    String blobInfoIndex = String.valueOf(i);
509                    redisExecutor.execute((RedisCallable<Long>) jedis -> {
510                        String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex);
511                        Long deleted = jedis.del(blobInfoKey);
512                        if (log.isDebugEnabled()) {
513                            log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, blobInfoKey));
514                        }
515                        return deleted;
516                    });
517                }
518            }
519        }
520    }
521
522    protected String join(String... fragments) {
523        return StringUtils.join(fragments, ":");
524    }
525
526    protected byte[] getBytes(String key) {
527        return key.getBytes(StandardCharsets.UTF_8);
528    }
529
530    protected String getString(byte[] bytes) {
531        return new String(bytes, StandardCharsets.UTF_8);
532    }
533
534    protected byte[] serialize(Serializable value) {
535        try {
536            ByteArrayOutputStream baos = new ByteArrayOutputStream();
537            ObjectOutputStream out = new ObjectOutputStream(baos);
538            out.writeObject(value);
539            out.flush();
540            out.close();
541            return baos.toByteArray();
542        } catch (IOException e) {
543            throw new NuxeoException(e);
544        }
545    }
546
547    protected Serializable deserialize(byte[] bytes) {
548        try {
549            InputStream bain = new ByteArrayInputStream(bytes);
550            ObjectInputStream in = new ObjectInputStream(bain);
551            return (Serializable) in.readObject();
552        } catch (IOException | ClassNotFoundException e) {
553            throw new NuxeoException(e);
554        }
555    }
556
557    protected Map<byte[], byte[]> serialize(Map<String, Serializable> map) {
558        Map<byte[], byte[]> serializedMap = new HashMap<>();
559        for (String key : map.keySet()) {
560            serializedMap.put(getBytes(key), serialize(map.get(key)));
561        }
562        return serializedMap;
563    }
564
565    protected Map<String, Serializable> deserialize(Map<byte[], byte[]> byteMap) {
566        Map<String, Serializable> map = new HashMap<>();
567        for (byte[] key : byteMap.keySet()) {
568            map.put(getString(key), deserialize(byteMap.get(key)));
569        }
570        return map;
571    }
572
573    protected void setTTL(String key, int seconds) {
574
575        Map<String, String> summary = getSummary(key);
576        if (summary != null) {
577            // Summary
578            redisExecutor.execute((RedisCallable<Void>) jedis -> {
579                jedis.expire(namespace + key, seconds);
580                return null;
581            });
582            // Blobs
583            String blobCountStr = summary.get("blobCount");
584            if (blobCountStr != null) {
585                int blobCount = Integer.parseInt(blobCountStr);
586                if (blobCount > 0) {
587                    final int blobsTimeout = seconds + 60;
588                    for (int i = 0; i < blobCount; i++) {
589                        String blobInfoIndex = String.valueOf(i);
590                        redisExecutor.execute((RedisCallable<Void>) jedis -> {
591                            String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex);
592                            jedis.expire(blobInfoKey, blobsTimeout);
593                            return null;
594                        });
595                    }
596                }
597            }
598        }
599        // Parameters
600        final int paramsTimeout;
601        if (summary == null) {
602            paramsTimeout = seconds;
603        } else {
604            paramsTimeout = seconds + 60;
605        }
606        redisExecutor.execute((RedisCallable<Void>) jedis -> {
607            String paramsKey = namespace + join(key, "params");
608            jedis.expire(getBytes(paramsKey), paramsTimeout);
609            return null;
610        });
611    }
612
613}