001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat <tdelprat@nuxeo.com>
018 *     Antoine Taillefer <ataillefer@nuxeo.com>
019 */
020
021package org.nuxeo.ecm.core.redis.contribs;
022
023import java.io.ByteArrayInputStream;
024import java.io.ByteArrayOutputStream;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.ObjectInputStream;
028import java.io.ObjectOutputStream;
029import java.io.Serializable;
030import java.nio.charset.StandardCharsets;
031import java.util.ArrayList;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.Set;
036import java.util.function.Function;
037import java.util.regex.Matcher;
038import java.util.regex.Pattern;
039import java.util.stream.Collectors;
040
041import org.apache.commons.lang.StringUtils;
042import org.apache.commons.logging.Log;
043import org.apache.commons.logging.LogFactory;
044import org.nuxeo.ecm.core.api.Blob;
045import org.nuxeo.ecm.core.api.NuxeoException;
046import org.nuxeo.ecm.core.redis.RedisAdmin;
047import org.nuxeo.ecm.core.redis.RedisCallable;
048import org.nuxeo.ecm.core.redis.RedisExecutor;
049import org.nuxeo.ecm.core.transientstore.AbstractTransientStore;
050import org.nuxeo.ecm.core.transientstore.api.TransientStore;
051import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig;
052import org.nuxeo.runtime.api.Framework;
053
054/**
055 * Redis implementation (i.e. cluster aware) of the {@link TransientStore}.
056 * <p>
057 * Since hashes cannot be nested, a storage entry is flattened as follows:
058 *
059 * <pre>
060 *  - Entry summary:
061 *
062 *    transientStore:transientStoreName:entryKey {
063 *      "blobCount":    number of blobs associated with the entry
064 *      "size":         storage size of the blobs associated with the entry
065 *      "completed":    entry status
066 *    }
067 *
068 * - Entry parameters:
069 *
070 *   transientStore:transientStoreName:entryKey:params {
071 *      "param1": value1
072 *      "param2": value2
073 *   }
074 *
075 * - Entry blobs:
076 *
077 *   transientStore:transientStoreName:entryKey:blobs:0 {
078 *      "file"
079 *      "filename"
080 *      "encoding"
081 *      "mimetype"
082 *      "digest"
083 *   }
084 *
085 *   transientStore:transientStoreName:entryKey:blobs:1 {
086 *      ...
087 *   }
088 *
089 *   ...
090 * </pre>
091 *
092 * @since 7.2
093 */
094public class RedisTransientStore extends AbstractTransientStore {
095
096    protected static final String SIZE_KEY = "size";
097
098    protected RedisExecutor redisExecutor;
099
100    protected String namespace;
101
102    protected String sizeKey;
103
104    protected KeyMatcher keyMatcher;
105
106    protected RedisAdmin redisAdmin;
107
108    protected int firstLevelTTL;
109
110    protected int secondLevelTTL;
111
112    protected Log log = LogFactory.getLog(RedisTransientStore.class);
113
114    public RedisTransientStore() {
115        redisExecutor = Framework.getService(RedisExecutor.class);
116        redisAdmin = Framework.getService(RedisAdmin.class);
117    }
118
119    @Override
120    public void init(TransientStoreConfig config) {
121        log.debug("Initializing RedisTransientStore: " + config.getName());
122        super.init(config);
123
124        namespace = redisAdmin.namespace("transientStore", config.getName());
125        sizeKey = namespace + SIZE_KEY;
126        keyMatcher = new KeyMatcher();
127
128        // Use seconds for Redis EXPIRE command
129        firstLevelTTL = config.getFirstLevelTTL() * 60;
130        secondLevelTTL = config.getSecondLevelTTL() * 60;
131    }
132
133    @Override
134    public void shutdown() {
135        log.debug("Shutting down RedisTransientStore: " + config.getName());
136        // Nothing to do here.
137    }
138
139    @Override
140    public boolean exists(String key) {
141        // Jedis#exists(String key) doesn't to work for a key created with hset or hmset
142        return getSummary(key) != null || getParameters(key) != null;
143    }
144
145    @Override
146    public Set<String> keySet() {
147        return redisExecutor.execute((RedisCallable<Set<String>>) jedis -> {
148            return jedis.keys(namespace + "*")
149                        .stream()
150                        .map(keyMatcher)
151                        .filter(key -> !SIZE_KEY.equals(key))
152                        .collect(Collectors.toSet());
153        });
154    }
155
156    protected class KeyMatcher implements Function<String, String> {
157
158        protected final Pattern KEY_PATTERN = Pattern.compile("(.*?)(:(params|blobs:[0-9]+))?");
159
160        protected final int offset = namespace.length();
161
162        @Override
163        public String apply(String t) {
164            final Matcher m = KEY_PATTERN.matcher(t.substring(offset));
165            m.matches();
166            return m.group(1);
167        }
168
169    }
170
171    @Override
172    public void putParameter(String key, String parameter, Serializable value) {
173        redisExecutor.execute((RedisCallable<Void>) jedis -> {
174            String paramsKey = namespace + join(key, "params");
175            if (log.isDebugEnabled()) {
176                log.debug(String.format("Setting field %s to value %s in Redis hash stored at key %s", parameter,
177                        value, paramsKey));
178            }
179            jedis.hset(getBytes(paramsKey), getBytes(parameter), serialize(value));
180            return null;
181        });
182        setTTL(key, firstLevelTTL);
183    }
184
185    @Override
186    public Serializable getParameter(String key, String parameter) {
187        return redisExecutor.execute((RedisCallable<Serializable>) jedis -> {
188            String paramsKey = namespace + join(key, "params");
189            byte[] paramBytes = jedis.hget(getBytes(paramsKey), getBytes(parameter));
190            if (paramBytes == null) {
191                return null;
192            }
193            Serializable res = deserialize(paramBytes);
194            if (log.isDebugEnabled()) {
195                log.debug(String.format("Fetched field %s from Redis hash stored at key %s -> %s", parameter,
196                        paramsKey, res));
197            }
198            return res;
199        });
200    }
201
202    @Override
203    public void putParameters(String key, Map<String, Serializable> parameters) {
204        redisExecutor.execute((RedisCallable<Void>) jedis -> {
205            String paramsKey = namespace + join(key, "params");
206            if (log.isDebugEnabled()) {
207                log.debug(String.format("Setting fields %s in Redis hash stored at key %s", parameters, paramsKey));
208            }
209            jedis.hmset(getBytes(paramsKey), serialize(parameters));
210            return null;
211        });
212        setTTL(key, firstLevelTTL);
213    }
214
215    @Override
216    public Map<String, Serializable> getParameters(String key) {
217        // TODO NXP-18236: use a transaction?
218        String paramsKey = namespace + join(key, "params");
219        Map<byte[], byte[]> paramBytes = redisExecutor.execute((RedisCallable<Map<byte[], byte[]>>) jedis -> {
220            return jedis.hgetAll(getBytes(paramsKey));
221        });
222        if (paramBytes.isEmpty()) {
223            if (getSummary(key) == null) {
224                return null;
225            } else {
226                return new HashMap<>();
227            }
228        }
229        Map<String, Serializable> res = deserialize(paramBytes);
230        if (log.isDebugEnabled()) {
231            log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", paramsKey, res));
232        }
233        return res;
234    }
235
236    @Override
237    public List<Blob> getBlobs(String key) {
238        // TODO NXP-18236: use a transaction?
239
240        // Get blob count
241        String blobCount = redisExecutor.execute((RedisCallable<String>) jedis -> {
242            return jedis.hget(namespace + key, "blobCount");
243        });
244        if (log.isDebugEnabled()) {
245            log.debug(String.format("Fetched field \"blobCount\" from Redis hash stored at key %s -> %s", namespace
246                    + key, blobCount));
247        }
248        if (blobCount == null) {
249            // Check for existing parameters
250            Map<String, Serializable> parameters = getParameters(key);
251            if (parameters == null) {
252                return null;
253            } else {
254                return new ArrayList<Blob>();
255            }
256        }
257
258        // Get blobs
259        int entryBlobCount = Integer.parseInt(blobCount);
260        if (entryBlobCount <= 0) {
261            return new ArrayList<>();
262        }
263        List<Map<String, String>> blobInfos = new ArrayList<>();
264        for (int i = 0; i < entryBlobCount; i++) {
265            String blobInfoIndex = String.valueOf(i);
266            Map<String, String> entryBlobInfo = redisExecutor.execute((RedisCallable<Map<String, String>>) jedis -> {
267                String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex);
268                Map<String, String> blobInfo = jedis.hgetAll(blobInfoKey);
269                if (blobInfo.isEmpty()) {
270                    throw new NuxeoException(String.format(
271                            "Entry with key %s is inconsistent: blobCount = %d but key %s doesn't exist", key,
272                            entryBlobCount, blobInfoKey));
273                }
274                if (log.isDebugEnabled()) {
275                    log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", blobInfoKey,
276                            blobInfo));
277                }
278                return blobInfo;
279            });
280            blobInfos.add(entryBlobInfo);
281        }
282
283        // Load blobs from the file system
284        return loadBlobs(blobInfos);
285    }
286
287    @Override
288    public long getSize(String key) {
289        return redisExecutor.execute((RedisCallable<Long>) jedis -> {
290            String size = jedis.hget(namespace + key, SIZE_KEY);
291            if (size == null) {
292                return -1L;
293            }
294            if (log.isDebugEnabled()) {
295                log.debug(String.format("Fetched field \"%s\" from Redis hash stored at key %s -> %s", SIZE_KEY,
296                        namespace + key, size));
297            }
298            return Long.parseLong(size);
299        });
300    }
301
302    @Override
303    public boolean isCompleted(String key) {
304        return redisExecutor.execute((RedisCallable<Boolean>) jedis -> {
305            String completed = jedis.hget(namespace + key, "completed");
306            if (log.isDebugEnabled()) {
307                log.debug(String.format("Fetched field \"completed\" from Redis hash stored at key %s -> %s", namespace
308                        + key, completed));
309            }
310            return Boolean.parseBoolean(completed);
311        });
312    }
313
314    @Override
315    public void setCompleted(String key, boolean completed) {
316        redisExecutor.execute((RedisCallable<Void>) jedis -> {
317            if (log.isDebugEnabled()) {
318                log.debug(String.format("Setting field \"completed\" to value %s in Redis hash stored at key %s",
319                        completed, namespace + key));
320            }
321            jedis.hset(namespace + key, "completed", String.valueOf(completed));
322            return null;
323        });
324        setTTL(key, firstLevelTTL);
325    }
326
327    @Override
328    public void remove(String key) {
329        // TODO NXP-18236: use a transaction?
330
331        Map<String, String> summary = getSummary(key);
332        if (summary != null) {
333            // Remove blobs
334            String blobCount = summary.get("blobCount");
335            deleteBlobInfos(key, blobCount);
336
337            // Remove summary
338            redisExecutor.execute((RedisCallable<Long>) jedis -> {
339                Long deleted = jedis.del(namespace + key);
340                if (log.isDebugEnabled()) {
341                    log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, namespace + key));
342                }
343                return deleted;
344            });
345
346            // Decrement storage size
347            String size = summary.get(SIZE_KEY);
348            if (size != null) {
349                long entrySize = Integer.parseInt(size);
350                if (entrySize > 0) {
351                    decrementStorageSize(entrySize);
352                }
353            }
354        }
355
356        // Remove parameters
357        redisExecutor.execute((RedisCallable<Long>) jedis -> {
358            String paramsKey = namespace + join(key, "params");
359            Long deleted = jedis.del(getBytes(paramsKey));
360            if (log.isDebugEnabled()) {
361                log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, paramsKey));
362            }
363            return deleted;
364        });
365    }
366
367    @Override
368    public void release(String key) {
369        if (getStorageSize() <= config.getTargetMaxSizeMB() * (1024 * 1024) || config.getTargetMaxSizeMB() < 0) {
370            setTTL(key, secondLevelTTL);
371        } else {
372            remove(key);
373        }
374    }
375
376    @Override
377    protected void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos) {
378        // TODO NXP-18236: use a transaction?
379
380        Map<String, String> oldSummary = getSummary(key);
381
382        // Update storage size
383        long entrySize = -1;
384        if (oldSummary != null) {
385            String size = oldSummary.get(SIZE_KEY);
386            if (size != null) {
387                entrySize = Long.parseLong(size);
388            }
389        }
390        if (entrySize > 0) {
391            incrementStorageSize(sizeOfBlobs - entrySize);
392        } else {
393            if (sizeOfBlobs > 0) {
394                incrementStorageSize(sizeOfBlobs);
395            }
396        }
397
398        // Delete old blobs
399        if (oldSummary != null) {
400            String oldBlobCount = oldSummary.get("blobCount");
401            deleteBlobInfos(key, oldBlobCount);
402        }
403
404        // Update entry size and blob count
405        final Map<String, String> entrySummary = new HashMap<>();
406        int blobCount = 0;
407        if (blobInfos != null) {
408            blobCount = blobInfos.size();
409        }
410        entrySummary.put("blobCount", String.valueOf(blobCount));
411        entrySummary.put(SIZE_KEY, String.valueOf(sizeOfBlobs));
412        redisExecutor.execute((RedisCallable<Void>) jedis -> {
413            if (log.isDebugEnabled()) {
414                log.debug(String.format("Setting fields %s in Redis hash stored at key %s", entrySummary, namespace
415                        + key));
416            }
417            jedis.hmset(namespace + key, entrySummary);
418            jedis.expire(namespace + key, firstLevelTTL);
419            return null;
420        });
421
422        // Set new blobs
423        if (blobInfos != null) {
424            int blobsTimeout = firstLevelTTL + 60;
425            for (int i = 0; i < blobInfos.size(); i++) {
426                String blobInfoIndex = String.valueOf(i);
427                Map<String, String> blobInfo = blobInfos.get(i);
428                redisExecutor.execute((RedisCallable<Void>) jedis -> {
429                    String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex);
430                    if (log.isDebugEnabled()) {
431                        log.debug(String.format("Setting fields %s in Redis hash stored at key %s", blobInfo,
432                                blobInfoKey));
433                    }
434                    jedis.hmset(blobInfoKey, blobInfo);
435                    jedis.expire(blobInfoKey, blobsTimeout);
436                    return null;
437                });
438            }
439        }
440
441        // Set params TTL
442        redisExecutor.execute((RedisCallable<Void>) jedis -> {
443            String paramsKey = namespace + join(key, "params");
444            jedis.expire(getBytes(paramsKey), firstLevelTTL + 60);
445            return null;
446        });
447    }
448
449    @Override
450    public long getStorageSize() {
451        return redisExecutor.execute((RedisCallable<Long>) jedis -> {
452            String value = jedis.get(sizeKey);
453            if (value == null) {
454                return 0L;
455            }
456            if (log.isDebugEnabled()) {
457                log.debug(String.format("Fetched value of Redis key %s -> %s", sizeKey, value));
458            }
459            return Long.parseLong(value);
460        });
461    }
462
463    @Override
464    protected void setStorageSize(final long newSize) {
465        redisExecutor.execute((RedisCallable<Void>) jedis -> {
466            if (log.isDebugEnabled()) {
467                log.debug(String.format("Setting Redis key %s to value %s", sizeKey, newSize));
468            }
469            jedis.set(sizeKey, "" + newSize);
470            return null;
471        });
472    }
473
474    @Override
475    protected long incrementStorageSize(final long size) {
476        return redisExecutor.execute((RedisCallable<Long>) jedis -> {
477            Long incremented = jedis.incrBy(sizeKey, size);
478            if (log.isDebugEnabled()) {
479                log.debug(String.format("Incremented Redis key %s to %d", sizeKey, incremented));
480            }
481            return incremented;
482        });
483    }
484
485    @Override
486    protected long decrementStorageSize(final long size) {
487        return redisExecutor.execute((RedisCallable<Long>) jedis -> {
488            Long decremented = jedis.decrBy(sizeKey, size);
489            if (log.isDebugEnabled()) {
490                log.debug(String.format("Decremented Redis key %s to %d", sizeKey, decremented));
491            }
492            return decremented;
493        });
494    }
495
496    @Override
497    protected void removeAllEntries() {
498        // TODO NXP-18236: use a transaction?
499        Set<String> keys = redisExecutor.execute((RedisCallable<Set<String>>) jedis -> {
500            return jedis.keys(namespace + "*");
501        });
502        for (String key : keys) {
503            redisExecutor.execute((RedisCallable<Void>) jedis -> {
504                jedis.del(key);
505                return null;
506            });
507        }
508    }
509
510    public long getTTL(String key) {
511        long summaryTTL = redisExecutor.execute((RedisCallable<Long>) jedis -> {
512            return jedis.ttl(namespace + key);
513        });
514        if (summaryTTL >= 0) {
515            return summaryTTL;
516        } else {
517            return redisExecutor.execute((RedisCallable<Long>) jedis -> {
518                String paramsKey = namespace + join(key, "params");
519                return jedis.ttl(getBytes(paramsKey));
520            });
521        }
522    }
523
524    protected Map<String, String> getSummary(String key) {
525        return redisExecutor.execute((RedisCallable<Map<String, String>>) jedis -> {
526            Map<String, String> summary = jedis.hgetAll(namespace + key);
527            if (summary.isEmpty()) {
528                return null;
529            }
530            if (log.isDebugEnabled()) {
531                log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", namespace + key,
532                        summary));
533            }
534            return summary;
535        });
536    }
537
538    protected void deleteBlobInfos(String key, String blobCountStr) {
539        if (blobCountStr != null) {
540            int blobCount = Integer.parseInt(blobCountStr);
541            if (blobCount > 0) {
542                for (int i = 0; i < blobCount; i++) {
543                    String blobInfoIndex = String.valueOf(i);
544                    redisExecutor.execute((RedisCallable<Long>) jedis -> {
545                        String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex);
546                        Long deleted = jedis.del(blobInfoKey);
547                        if (log.isDebugEnabled()) {
548                            log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, blobInfoKey));
549                        }
550                        return deleted;
551                    });
552                }
553            }
554        }
555    }
556
557    protected String join(String... fragments) {
558        return StringUtils.join(fragments, ":");
559    }
560
561    protected byte[] getBytes(String key) {
562        return key.getBytes(StandardCharsets.UTF_8);
563    }
564
565    protected String getString(byte[] bytes) {
566        return new String(bytes, StandardCharsets.UTF_8);
567    }
568
569    protected byte[] serialize(Serializable value) {
570        try {
571            ByteArrayOutputStream baos = new ByteArrayOutputStream();
572            ObjectOutputStream out = new ObjectOutputStream(baos);
573            out.writeObject(value);
574            out.flush();
575            out.close();
576            return baos.toByteArray();
577        } catch (IOException e) {
578            throw new NuxeoException(e);
579        }
580    }
581
582    protected Serializable deserialize(byte[] bytes) {
583        try {
584            InputStream bain = new ByteArrayInputStream(bytes);
585            ObjectInputStream in = new ObjectInputStream(bain);
586            return (Serializable) in.readObject();
587        } catch (IOException | ClassNotFoundException e) {
588            throw new NuxeoException(e);
589        }
590    }
591
592    protected Map<byte[], byte[]> serialize(Map<String, Serializable> map) {
593        Map<byte[], byte[]> serializedMap = new HashMap<>();
594        for (String key : map.keySet()) {
595            serializedMap.put(getBytes(key), serialize(map.get(key)));
596        }
597        return serializedMap;
598    }
599
600    protected Map<String, Serializable> deserialize(Map<byte[], byte[]> byteMap) {
601        Map<String, Serializable> map = new HashMap<>();
602        for (byte[] key : byteMap.keySet()) {
603            map.put(getString(key), deserialize(byteMap.get(key)));
604        }
605        return map;
606    }
607
608    protected void setTTL(String key, int seconds) {
609
610        Map<String, String> summary = getSummary(key);
611        if (summary != null) {
612            // Summary
613            redisExecutor.execute((RedisCallable<Void>) jedis -> {
614                jedis.expire(namespace + key, seconds);
615                return null;
616            });
617            // Blobs
618            String blobCountStr = summary.get("blobCount");
619            if (blobCountStr != null) {
620                int blobCount = Integer.parseInt(blobCountStr);
621                if (blobCount > 0) {
622                    final int blobsTimeout = seconds + 60;
623                    for (int i = 0; i < blobCount; i++) {
624                        String blobInfoIndex = String.valueOf(i);
625                        redisExecutor.execute((RedisCallable<Void>) jedis -> {
626                            String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex);
627                            jedis.expire(blobInfoKey, blobsTimeout);
628                            return null;
629                        });
630                    }
631                }
632            }
633        }
634        // Parameters
635        final int paramsTimeout;
636        if (summary == null) {
637            paramsTimeout = seconds;
638        } else {
639            paramsTimeout = seconds + 60;
640        }
641        redisExecutor.execute((RedisCallable<Void>) jedis -> {
642            String paramsKey = namespace + join(key, "params");
643            jedis.expire(getBytes(paramsKey), paramsTimeout);
644            return null;
645        });
646    }
647
648}