001/*
002 * (C) Copyright 2015-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat <tdelprat@nuxeo.com>
018 *     Antoine Taillefer <ataillefer@nuxeo.com>
019 */
020
021package org.nuxeo.ecm.core.redis.contribs;
022
023import static java.nio.charset.StandardCharsets.UTF_8;
024
025import java.io.ByteArrayInputStream;
026import java.io.ByteArrayOutputStream;
027import java.io.IOException;
028import java.io.InputStream;
029import java.io.ObjectInputStream;
030import java.io.ObjectOutputStream;
031import java.io.Serializable;
032import java.util.ArrayList;
033import java.util.HashMap;
034import java.util.List;
035import java.util.Map;
036import java.util.Set;
037import java.util.function.Function;
038import java.util.regex.Matcher;
039import java.util.regex.Pattern;
040import java.util.stream.Stream;
041
042import org.apache.commons.lang3.StringUtils;
043import org.apache.commons.logging.Log;
044import org.apache.commons.logging.LogFactory;
045import org.nuxeo.ecm.core.api.Blob;
046import org.nuxeo.ecm.core.api.NuxeoException;
047import org.nuxeo.ecm.core.redis.RedisAdmin;
048import org.nuxeo.ecm.core.redis.RedisCallable;
049import org.nuxeo.ecm.core.redis.RedisExecutor;
050import org.nuxeo.ecm.core.transientstore.AbstractTransientStore;
051import org.nuxeo.ecm.core.transientstore.api.TransientStore;
052import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig;
053import org.nuxeo.runtime.api.Framework;
054
055/**
056 * Redis implementation (i.e. cluster aware) of the {@link TransientStore}.
057 * <p>
058 * Since hashes cannot be nested, a storage entry is flattened as follows:
059 *
060 * <pre>
061 *  - Entry summary:
062 *
063 *    transientStore:transientStoreName:entryKey {
064 *      "blobCount":    number of blobs associated with the entry
065 *      "size":         storage size of the blobs associated with the entry
066 *      "completed":    entry status
067 *    }
068 *
069 * - Entry parameters:
070 *
071 *   transientStore:transientStoreName:entryKey:params {
072 *      "param1": value1
073 *      "param2": value2
074 *   }
075 *
076 * - Entry blobs:
077 *
078 *   transientStore:transientStoreName:entryKey:blobs:0 {
079 *      "file"
080 *      "filename"
081 *      "encoding"
082 *      "mimetype"
083 *      "digest"
084 *   }
085 *
086 *   transientStore:transientStoreName:entryKey:blobs:1 {
087 *      ...
088 *   }
089 *
090 *   ...
091 * </pre>
092 *
093 * @since 7.2
094 */
095public class RedisTransientStore extends AbstractTransientStore {
096
097    protected static final String SIZE_KEY = "size";
098
099    protected RedisExecutor redisExecutor;
100
101    protected String namespace;
102
103    protected String sizeKey;
104
105    protected KeyMatcher keyMatcher;
106
107    protected RedisAdmin redisAdmin;
108
109    protected int firstLevelTTL;
110
111    protected int secondLevelTTL;
112
113    protected Log log = LogFactory.getLog(RedisTransientStore.class);
114
115    public RedisTransientStore() {
116        redisExecutor = Framework.getService(RedisExecutor.class);
117        redisAdmin = Framework.getService(RedisAdmin.class);
118    }
119
120    @Override
121    public void init(TransientStoreConfig config) {
122        log.debug("Initializing RedisTransientStore: " + config.getName());
123        super.init(config);
124
125        namespace = redisAdmin.namespace("transientStore", config.getName());
126        sizeKey = namespace + SIZE_KEY;
127        keyMatcher = new KeyMatcher();
128
129        // Use seconds for Redis EXPIRE command
130        firstLevelTTL = config.getFirstLevelTTL() * 60;
131        secondLevelTTL = config.getSecondLevelTTL() * 60;
132    }
133
134    @Override
135    public void shutdown() {
136        log.debug("Shutting down RedisTransientStore: " + config.getName());
137        // Nothing to do here.
138    }
139
140    @Override
141    public boolean exists(String key) {
142        // Jedis#exists(String key) doesn't to work for a key created with hset or hmset
143        return getSummary(key) != null || getParameters(key) != null;
144    }
145
146    @Override
147    public Stream<String> keyStream() {
148        return redisExecutor.execute((RedisCallable<Stream<String>>) jedis -> {
149            return jedis.keys(namespace + "*") //
150                        .stream()
151                        .map(keyMatcher)
152                        .filter(key -> !SIZE_KEY.equals(key));
153        });
154    }
155
156    protected class KeyMatcher implements Function<String, String> {
157
158        protected final Pattern KEY_PATTERN = Pattern.compile("(.*?)(:(params|blobs:[0-9]+))?");
159
160        protected final int offset = namespace.length();
161
162        @Override
163        public String apply(String t) {
164            final Matcher m = KEY_PATTERN.matcher(t.substring(offset));
165            m.matches();
166            return m.group(1);
167        }
168
169    }
170
171    @Override
172    public void putParameter(String key, String parameter, Serializable value) {
173        redisExecutor.execute((RedisCallable<Void>) jedis -> {
174            String paramsKey = namespace + join(key, "params");
175            if (log.isDebugEnabled()) {
176                log.debug(String.format("Setting field %s to value %s in Redis hash stored at key %s", parameter, value,
177                        paramsKey));
178            }
179            jedis.hset(getBytes(paramsKey), getBytes(parameter), serialize(value));
180            return null;
181        });
182        setTTL(key, firstLevelTTL);
183    }
184
185    @Override
186    public Serializable getParameter(String key, String parameter) {
187        return redisExecutor.execute((RedisCallable<Serializable>) jedis -> {
188            String paramsKey = namespace + join(key, "params");
189            byte[] paramBytes = jedis.hget(getBytes(paramsKey), getBytes(parameter));
190            if (paramBytes == null) {
191                return null;
192            }
193            Serializable res = deserialize(paramBytes);
194            if (log.isDebugEnabled()) {
195                log.debug(String.format("Fetched field %s from Redis hash stored at key %s -> %s", parameter, paramsKey,
196                        res));
197            }
198            return res;
199        });
200    }
201
202    @Override
203    public void putParameters(String key, Map<String, Serializable> parameters) {
204        redisExecutor.execute((RedisCallable<Void>) jedis -> {
205            String paramsKey = namespace + join(key, "params");
206            if (log.isDebugEnabled()) {
207                log.debug(String.format("Setting fields %s in Redis hash stored at key %s", parameters, paramsKey));
208            }
209            jedis.hmset(getBytes(paramsKey), serialize(parameters));
210            return null;
211        });
212        setTTL(key, firstLevelTTL);
213    }
214
215    @Override
216    public Map<String, Serializable> getParameters(String key) {
217        // TODO NXP-18236: use a transaction?
218        String paramsKey = namespace + join(key, "params");
219        Map<byte[], byte[]> paramBytes = redisExecutor.execute((RedisCallable<Map<byte[], byte[]>>) jedis -> {
220            return jedis.hgetAll(getBytes(paramsKey));
221        });
222        if (paramBytes.isEmpty()) {
223            if (getSummary(key) == null) {
224                return null;
225            } else {
226                return new HashMap<>();
227            }
228        }
229        Map<String, Serializable> res = deserialize(paramBytes);
230        if (log.isDebugEnabled()) {
231            log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", paramsKey, res));
232        }
233        return res;
234    }
235
236    @Override
237    public List<Blob> getBlobs(String key) {
238        // TODO NXP-18236: use a transaction?
239
240        // Get blob count
241        String blobCount = redisExecutor.execute((RedisCallable<String>) jedis -> {
242            return jedis.hget(namespace + key, "blobCount");
243        });
244        if (log.isDebugEnabled()) {
245            log.debug(String.format("Fetched field \"blobCount\" from Redis hash stored at key %s -> %s",
246                    namespace + key, blobCount));
247        }
248        if (blobCount == null) {
249            // Check for existing parameters
250            Map<String, Serializable> parameters = getParameters(key);
251            if (parameters == null) {
252                return null;
253            } else {
254                return new ArrayList<>();
255            }
256        }
257
258        // Get blobs
259        int entryBlobCount = Integer.parseInt(blobCount);
260        if (entryBlobCount <= 0) {
261            return new ArrayList<>();
262        }
263        List<Map<String, String>> blobInfos = new ArrayList<>();
264        for (int i = 0; i < entryBlobCount; i++) {
265            String blobInfoIndex = String.valueOf(i);
266            Map<String, String> entryBlobInfo = redisExecutor.execute((RedisCallable<Map<String, String>>) jedis -> {
267                String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex);
268                Map<String, String> blobInfo = jedis.hgetAll(blobInfoKey);
269                if (blobInfo.isEmpty()) {
270                    throw new NuxeoException(
271                            String.format("Entry with key %s is inconsistent: blobCount = %d but key %s doesn't exist",
272                                    key, entryBlobCount, blobInfoKey));
273                }
274                if (log.isDebugEnabled()) {
275                    log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", blobInfoKey,
276                            blobInfo));
277                }
278                return blobInfo;
279            });
280            blobInfos.add(entryBlobInfo);
281        }
282
283        // Load blobs from the file system
284        return loadBlobs(blobInfos);
285    }
286
287    @Override
288    public long getSize(String key) {
289        return redisExecutor.execute((RedisCallable<Long>) jedis -> {
290            String size = jedis.hget(namespace + key, SIZE_KEY);
291            if (size == null) {
292                return -1L;
293            }
294            if (log.isDebugEnabled()) {
295                log.debug(String.format("Fetched field \"%s\" from Redis hash stored at key %s -> %s", SIZE_KEY,
296                        namespace + key, size));
297            }
298            return Long.parseLong(size);
299        });
300    }
301
302    @Override
303    public boolean isCompleted(String key) {
304        return redisExecutor.execute((RedisCallable<Boolean>) jedis -> {
305            String completed = jedis.hget(namespace + key, "completed");
306            if (log.isDebugEnabled()) {
307                log.debug(String.format("Fetched field \"completed\" from Redis hash stored at key %s -> %s",
308                        namespace + key, completed));
309            }
310            return Boolean.parseBoolean(completed);
311        });
312    }
313
314    @Override
315    public void setCompleted(String key, boolean completed) {
316        redisExecutor.execute((RedisCallable<Void>) jedis -> {
317            if (log.isDebugEnabled()) {
318                log.debug(String.format("Setting field \"completed\" to value %s in Redis hash stored at key %s",
319                        completed, namespace + key));
320            }
321            jedis.hset(namespace + key, "completed", String.valueOf(completed));
322            return null;
323        });
324        setTTL(key, firstLevelTTL);
325    }
326
327    @Override
328    public void release(String key) {
329        if (getStorageSize() <= config.getTargetMaxSizeMB() * (1024 * 1024) || config.getTargetMaxSizeMB() < 0) {
330            setTTL(key, secondLevelTTL);
331        } else {
332            remove(key);
333        }
334    }
335
336    @Override
337    protected void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos) {
338        // TODO NXP-18236: use a transaction?
339
340        Map<String, String> oldSummary = getSummary(key);
341
342        // Update storage size
343        long entrySize = -1;
344        if (oldSummary != null) {
345            String size = oldSummary.get(SIZE_KEY);
346            if (size != null) {
347                entrySize = Long.parseLong(size);
348            }
349        }
350        if (entrySize > 0) {
351            incrementStorageSize(sizeOfBlobs - entrySize);
352        } else {
353            if (sizeOfBlobs > 0) {
354                incrementStorageSize(sizeOfBlobs);
355            }
356        }
357
358        // Delete old blobs
359        if (oldSummary != null) {
360            String oldBlobCount = oldSummary.get("blobCount");
361            deleteBlobInfos(key, oldBlobCount);
362        }
363
364        // Update entry size and blob count
365        final Map<String, String> entrySummary = new HashMap<>();
366        int blobCount = 0;
367        if (blobInfos != null) {
368            blobCount = blobInfos.size();
369        }
370        entrySummary.put("blobCount", String.valueOf(blobCount));
371        entrySummary.put(SIZE_KEY, String.valueOf(sizeOfBlobs));
372        redisExecutor.execute((RedisCallable<Void>) jedis -> {
373            if (log.isDebugEnabled()) {
374                log.debug(String.format("Setting fields %s in Redis hash stored at key %s", entrySummary,
375                        namespace + key));
376            }
377            jedis.hmset(namespace + key, entrySummary);
378            jedis.expire(namespace + key, firstLevelTTL);
379            return null;
380        });
381
382        // Set new blobs
383        if (blobInfos != null) {
384            int blobsTimeout = firstLevelTTL + 60;
385            for (int i = 0; i < blobInfos.size(); i++) {
386                String blobInfoIndex = String.valueOf(i);
387                Map<String, String> blobInfo = blobInfos.get(i);
388                redisExecutor.execute((RedisCallable<Void>) jedis -> {
389                    String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex);
390                    if (log.isDebugEnabled()) {
391                        log.debug(String.format("Setting fields %s in Redis hash stored at key %s", blobInfo,
392                                blobInfoKey));
393                    }
394                    jedis.hmset(blobInfoKey, blobInfo);
395                    jedis.expire(blobInfoKey, blobsTimeout);
396                    return null;
397                });
398            }
399        }
400
401        // Set params TTL
402        redisExecutor.execute((RedisCallable<Void>) jedis -> {
403            String paramsKey = namespace + join(key, "params");
404            jedis.expire(getBytes(paramsKey), firstLevelTTL + 60);
405            return null;
406        });
407    }
408
409    @Override
410    public long getStorageSize() {
411        return redisExecutor.execute((RedisCallable<Long>) jedis -> {
412            String value = jedis.get(sizeKey);
413            if (value == null) {
414                return 0L;
415            }
416            if (log.isDebugEnabled()) {
417                log.debug(String.format("Fetched value of Redis key %s -> %s", sizeKey, value));
418            }
419            return Long.parseLong(value);
420        });
421    }
422
423    @Override
424    protected void setStorageSize(final long newSize) {
425        redisExecutor.execute((RedisCallable<Void>) jedis -> {
426            if (log.isDebugEnabled()) {
427                log.debug(String.format("Setting Redis key %s to value %s", sizeKey, newSize));
428            }
429            jedis.set(sizeKey, "" + newSize);
430            return null;
431        });
432    }
433
434    @Override
435    protected long incrementStorageSize(final long size) {
436        return redisExecutor.execute((RedisCallable<Long>) jedis -> {
437            Long incremented = jedis.incrBy(sizeKey, size);
438            if (log.isDebugEnabled()) {
439                log.debug(String.format("Incremented Redis key %s to %d", sizeKey, incremented));
440            }
441            return incremented;
442        });
443    }
444
445    @Override
446    protected long decrementStorageSize(final long size) {
447        return redisExecutor.execute((RedisCallable<Long>) jedis -> {
448            Long decremented = jedis.decrBy(sizeKey, size);
449            if (log.isDebugEnabled()) {
450                log.debug(String.format("Decremented Redis key %s to %d", sizeKey, decremented));
451            }
452            return decremented;
453        });
454    }
455
456    @Override
457    protected void removeEntry(String key) {
458        // TODO NXP-18236: use a transaction?
459
460        Map<String, String> summary = getSummary(key);
461        if (summary != null) {
462            // Remove blobs
463            String blobCount = summary.get("blobCount");
464            deleteBlobInfos(key, blobCount);
465
466            // Remove summary
467            redisExecutor.execute((RedisCallable<Long>) jedis -> {
468                Long deleted = jedis.del(namespace + key);
469                if (log.isDebugEnabled()) {
470                    log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, namespace + key));
471                }
472                return deleted;
473            });
474
475            // Decrement storage size
476            String size = summary.get(SIZE_KEY);
477            if (size != null) {
478                long entrySize = Integer.parseInt(size);
479                if (entrySize > 0) {
480                    decrementStorageSize(entrySize);
481                }
482            }
483        }
484
485        // Remove parameters
486        redisExecutor.execute((RedisCallable<Long>) jedis -> {
487            String paramsKey = namespace + join(key, "params");
488            Long deleted = jedis.del(getBytes(paramsKey));
489            if (log.isDebugEnabled()) {
490                log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, paramsKey));
491            }
492            return deleted;
493        });
494    }
495
496    @Override
497    protected void removeAllEntries() {
498        // TODO NXP-18236: use a transaction?
499        Set<String> keys = redisExecutor.execute((RedisCallable<Set<String>>) jedis -> {
500            return jedis.keys(namespace + "*");
501        });
502        for (String key : keys) {
503            redisExecutor.execute((RedisCallable<Void>) jedis -> {
504                jedis.del(key);
505                return null;
506            });
507        }
508    }
509
510    public long getTTL(String key) {
511        long summaryTTL = redisExecutor.execute((RedisCallable<Long>) jedis -> {
512            return jedis.ttl(namespace + key);
513        });
514        if (summaryTTL >= 0) {
515            return summaryTTL;
516        } else {
517            return redisExecutor.execute((RedisCallable<Long>) jedis -> {
518                String paramsKey = namespace + join(key, "params");
519                return jedis.ttl(getBytes(paramsKey));
520            });
521        }
522    }
523
524    protected Map<String, String> getSummary(String key) {
525        return redisExecutor.execute((RedisCallable<Map<String, String>>) jedis -> {
526            Map<String, String> summary = jedis.hgetAll(namespace + key);
527            if (summary.isEmpty()) {
528                return null;
529            }
530            if (log.isDebugEnabled()) {
531                log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", namespace + key,
532                        summary));
533            }
534            return summary;
535        });
536    }
537
538    protected void deleteBlobInfos(String key, String blobCountStr) {
539        if (blobCountStr != null) {
540            int blobCount = Integer.parseInt(blobCountStr);
541            if (blobCount > 0) {
542                for (int i = 0; i < blobCount; i++) {
543                    String blobInfoIndex = String.valueOf(i);
544                    redisExecutor.execute((RedisCallable<Long>) jedis -> {
545                        String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex);
546                        Long deleted = jedis.del(blobInfoKey);
547                        if (log.isDebugEnabled()) {
548                            log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, blobInfoKey));
549                        }
550                        return deleted;
551                    });
552                }
553            }
554        }
555    }
556
557    protected String join(String... fragments) {
558        return StringUtils.join(fragments, ":");
559    }
560
561    protected byte[] getBytes(String key) {
562        return key.getBytes(UTF_8);
563    }
564
565    protected String getString(byte[] bytes) {
566        return new String(bytes, UTF_8);
567    }
568
569    protected byte[] serialize(Serializable value) {
570        try {
571            ByteArrayOutputStream baos = new ByteArrayOutputStream();
572            ObjectOutputStream out = new ObjectOutputStream(baos);
573            out.writeObject(value);
574            out.flush();
575            out.close();
576            return baos.toByteArray();
577        } catch (IOException e) {
578            throw new NuxeoException(e);
579        }
580    }
581
582    protected Serializable deserialize(byte[] bytes) {
583        try {
584            InputStream bain = new ByteArrayInputStream(bytes);
585            ObjectInputStream in = new ObjectInputStream(bain);
586            return (Serializable) in.readObject();
587        } catch (IOException | ClassNotFoundException e) {
588            throw new NuxeoException(e);
589        }
590    }
591
592    protected Map<byte[], byte[]> serialize(Map<String, Serializable> map) {
593        Map<byte[], byte[]> serializedMap = new HashMap<>();
594        for (String key : map.keySet()) {
595            serializedMap.put(getBytes(key), serialize(map.get(key)));
596        }
597        return serializedMap;
598    }
599
600    protected Map<String, Serializable> deserialize(Map<byte[], byte[]> byteMap) {
601        Map<String, Serializable> map = new HashMap<>();
602        for (byte[] key : byteMap.keySet()) {
603            map.put(getString(key), deserialize(byteMap.get(key)));
604        }
605        return map;
606    }
607
608    protected void setTTL(String key, int seconds) {
609
610        Map<String, String> summary = getSummary(key);
611        if (summary != null) {
612            // Summary
613            redisExecutor.execute((RedisCallable<Void>) jedis -> {
614                jedis.expire(namespace + key, seconds);
615                return null;
616            });
617            // Blobs
618            String blobCountStr = summary.get("blobCount");
619            if (blobCountStr != null) {
620                int blobCount = Integer.parseInt(blobCountStr);
621                if (blobCount > 0) {
622                    final int blobsTimeout = seconds + 60;
623                    for (int i = 0; i < blobCount; i++) {
624                        String blobInfoIndex = String.valueOf(i);
625                        redisExecutor.execute((RedisCallable<Void>) jedis -> {
626                            String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex);
627                            jedis.expire(blobInfoKey, blobsTimeout);
628                            return null;
629                        });
630                    }
631                }
632            }
633        }
634        // Parameters
635        final int paramsTimeout;
636        if (summary == null) {
637            paramsTimeout = seconds;
638        } else {
639            paramsTimeout = seconds + 60;
640        }
641        redisExecutor.execute((RedisCallable<Void>) jedis -> {
642            String paramsKey = namespace + join(key, "params");
643            jedis.expire(getBytes(paramsKey), paramsTimeout);
644            return null;
645        });
646    }
647
648}