001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat <tdelprat@nuxeo.com>
018 *     Antoine Taillefer <ataillefer@nuxeo.com>
019 */
020
021package org.nuxeo.ecm.core.redis.contribs;
022
023import java.io.ByteArrayInputStream;
024import java.io.ByteArrayOutputStream;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.ObjectInputStream;
028import java.io.ObjectOutputStream;
029import java.io.Serializable;
030import java.nio.charset.StandardCharsets;
031import java.util.ArrayList;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.Set;
036import java.util.function.Function;
037import java.util.regex.Matcher;
038import java.util.regex.Pattern;
039import java.util.stream.Stream;
040
041import org.apache.commons.lang.StringUtils;
042import org.apache.commons.logging.Log;
043import org.apache.commons.logging.LogFactory;
044import org.nuxeo.ecm.core.api.Blob;
045import org.nuxeo.ecm.core.api.NuxeoException;
046import org.nuxeo.ecm.core.redis.RedisAdmin;
047import org.nuxeo.ecm.core.redis.RedisCallable;
048import org.nuxeo.ecm.core.redis.RedisExecutor;
049import org.nuxeo.ecm.core.transientstore.AbstractTransientStore;
050import org.nuxeo.ecm.core.transientstore.api.TransientStore;
051import org.nuxeo.ecm.core.transientstore.api.TransientStoreConfig;
052import org.nuxeo.runtime.api.Framework;
053
054/**
055 * Redis implementation (i.e. cluster aware) of the {@link TransientStore}.
056 * <p>
057 * Since hashes cannot be nested, a storage entry is flattened as follows:
058 *
059 * <pre>
060 *  - Entry summary:
061 *
062 *    transientStore:transientStoreName:entryKey {
063 *      "blobCount":    number of blobs associated with the entry
064 *      "size":         storage size of the blobs associated with the entry
065 *      "completed":    entry status
066 *    }
067 *
068 * - Entry parameters:
069 *
070 *   transientStore:transientStoreName:entryKey:params {
071 *      "param1": value1
072 *      "param2": value2
073 *   }
074 *
075 * - Entry blobs:
076 *
077 *   transientStore:transientStoreName:entryKey:blobs:0 {
078 *      "file"
079 *      "filename"
080 *      "encoding"
081 *      "mimetype"
082 *      "digest"
083 *   }
084 *
085 *   transientStore:transientStoreName:entryKey:blobs:1 {
086 *      ...
087 *   }
088 *
089 *   ...
090 * </pre>
091 *
092 * @since 7.2
093 */
094public class RedisTransientStore extends AbstractTransientStore {
095
096    protected static final String SIZE_KEY = "size";
097
098    protected RedisExecutor redisExecutor;
099
100    protected String namespace;
101
102    protected String sizeKey;
103
104    protected KeyMatcher keyMatcher;
105
106    protected RedisAdmin redisAdmin;
107
108    protected int firstLevelTTL;
109
110    protected int secondLevelTTL;
111
112    protected Log log = LogFactory.getLog(RedisTransientStore.class);
113
114    public RedisTransientStore() {
115        redisExecutor = Framework.getService(RedisExecutor.class);
116        redisAdmin = Framework.getService(RedisAdmin.class);
117    }
118
119    @Override
120    public void init(TransientStoreConfig config) {
121        log.debug("Initializing RedisTransientStore: " + config.getName());
122        super.init(config);
123
124        namespace = redisAdmin.namespace("transientStore", config.getName());
125        sizeKey = namespace + SIZE_KEY;
126        keyMatcher = new KeyMatcher();
127
128        // Use seconds for Redis EXPIRE command
129        firstLevelTTL = config.getFirstLevelTTL() * 60;
130        secondLevelTTL = config.getSecondLevelTTL() * 60;
131    }
132
133    @Override
134    public void shutdown() {
135        log.debug("Shutting down RedisTransientStore: " + config.getName());
136        // Nothing to do here.
137    }
138
139    @Override
140    public boolean exists(String key) {
141        // Jedis#exists(String key) doesn't to work for a key created with hset or hmset
142        return getSummary(key) != null || getParameters(key) != null;
143    }
144
145    @Override
146    public Stream<String> keyStream() {
147        return redisExecutor.execute((RedisCallable<Stream<String>>) jedis -> {
148            return jedis.keys(namespace + "*") //
149                        .stream()
150                        .map(keyMatcher)
151                        .filter(key -> !SIZE_KEY.equals(key));
152        });
153    }
154
155    protected class KeyMatcher implements Function<String, String> {
156
157        protected final Pattern KEY_PATTERN = Pattern.compile("(.*?)(:(params|blobs:[0-9]+))?");
158
159        protected final int offset = namespace.length();
160
161        @Override
162        public String apply(String t) {
163            final Matcher m = KEY_PATTERN.matcher(t.substring(offset));
164            m.matches();
165            return m.group(1);
166        }
167
168    }
169
170    @Override
171    public void putParameter(String key, String parameter, Serializable value) {
172        redisExecutor.execute((RedisCallable<Void>) jedis -> {
173            String paramsKey = namespace + join(key, "params");
174            if (log.isDebugEnabled()) {
175                log.debug(String.format("Setting field %s to value %s in Redis hash stored at key %s", parameter, value,
176                        paramsKey));
177            }
178            jedis.hset(getBytes(paramsKey), getBytes(parameter), serialize(value));
179            return null;
180        });
181        setTTL(key, firstLevelTTL);
182    }
183
184    @Override
185    public Serializable getParameter(String key, String parameter) {
186        return redisExecutor.execute((RedisCallable<Serializable>) jedis -> {
187            String paramsKey = namespace + join(key, "params");
188            byte[] paramBytes = jedis.hget(getBytes(paramsKey), getBytes(parameter));
189            if (paramBytes == null) {
190                return null;
191            }
192            Serializable res = deserialize(paramBytes);
193            if (log.isDebugEnabled()) {
194                log.debug(String.format("Fetched field %s from Redis hash stored at key %s -> %s", parameter, paramsKey,
195                        res));
196            }
197            return res;
198        });
199    }
200
201    @Override
202    public void putParameters(String key, Map<String, Serializable> parameters) {
203        redisExecutor.execute((RedisCallable<Void>) jedis -> {
204            String paramsKey = namespace + join(key, "params");
205            if (log.isDebugEnabled()) {
206                log.debug(String.format("Setting fields %s in Redis hash stored at key %s", parameters, paramsKey));
207            }
208            jedis.hmset(getBytes(paramsKey), serialize(parameters));
209            return null;
210        });
211        setTTL(key, firstLevelTTL);
212    }
213
214    @Override
215    public Map<String, Serializable> getParameters(String key) {
216        // TODO NXP-18236: use a transaction?
217        String paramsKey = namespace + join(key, "params");
218        Map<byte[], byte[]> paramBytes = redisExecutor.execute((RedisCallable<Map<byte[], byte[]>>) jedis -> {
219            return jedis.hgetAll(getBytes(paramsKey));
220        });
221        if (paramBytes.isEmpty()) {
222            if (getSummary(key) == null) {
223                return null;
224            } else {
225                return new HashMap<>();
226            }
227        }
228        Map<String, Serializable> res = deserialize(paramBytes);
229        if (log.isDebugEnabled()) {
230            log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", paramsKey, res));
231        }
232        return res;
233    }
234
235    @Override
236    public List<Blob> getBlobs(String key) {
237        // TODO NXP-18236: use a transaction?
238
239        // Get blob count
240        String blobCount = redisExecutor.execute((RedisCallable<String>) jedis -> {
241            return jedis.hget(namespace + key, "blobCount");
242        });
243        if (log.isDebugEnabled()) {
244            log.debug(String.format("Fetched field \"blobCount\" from Redis hash stored at key %s -> %s",
245                    namespace + key, blobCount));
246        }
247        if (blobCount == null) {
248            // Check for existing parameters
249            Map<String, Serializable> parameters = getParameters(key);
250            if (parameters == null) {
251                return null;
252            } else {
253                return new ArrayList<Blob>();
254            }
255        }
256
257        // Get blobs
258        int entryBlobCount = Integer.parseInt(blobCount);
259        if (entryBlobCount <= 0) {
260            return new ArrayList<>();
261        }
262        List<Map<String, String>> blobInfos = new ArrayList<>();
263        for (int i = 0; i < entryBlobCount; i++) {
264            String blobInfoIndex = String.valueOf(i);
265            Map<String, String> entryBlobInfo = redisExecutor.execute((RedisCallable<Map<String, String>>) jedis -> {
266                String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex);
267                Map<String, String> blobInfo = jedis.hgetAll(blobInfoKey);
268                if (blobInfo.isEmpty()) {
269                    throw new NuxeoException(
270                            String.format("Entry with key %s is inconsistent: blobCount = %d but key %s doesn't exist",
271                                    key, entryBlobCount, blobInfoKey));
272                }
273                if (log.isDebugEnabled()) {
274                    log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", blobInfoKey,
275                            blobInfo));
276                }
277                return blobInfo;
278            });
279            blobInfos.add(entryBlobInfo);
280        }
281
282        // Load blobs from the file system
283        return loadBlobs(blobInfos);
284    }
285
286    @Override
287    public long getSize(String key) {
288        return redisExecutor.execute((RedisCallable<Long>) jedis -> {
289            String size = jedis.hget(namespace + key, SIZE_KEY);
290            if (size == null) {
291                return -1L;
292            }
293            if (log.isDebugEnabled()) {
294                log.debug(String.format("Fetched field \"%s\" from Redis hash stored at key %s -> %s", SIZE_KEY,
295                        namespace + key, size));
296            }
297            return Long.parseLong(size);
298        });
299    }
300
301    @Override
302    public boolean isCompleted(String key) {
303        return redisExecutor.execute((RedisCallable<Boolean>) jedis -> {
304            String completed = jedis.hget(namespace + key, "completed");
305            if (log.isDebugEnabled()) {
306                log.debug(String.format("Fetched field \"completed\" from Redis hash stored at key %s -> %s",
307                        namespace + key, completed));
308            }
309            return Boolean.parseBoolean(completed);
310        });
311    }
312
313    @Override
314    public void setCompleted(String key, boolean completed) {
315        redisExecutor.execute((RedisCallable<Void>) jedis -> {
316            if (log.isDebugEnabled()) {
317                log.debug(String.format("Setting field \"completed\" to value %s in Redis hash stored at key %s",
318                        completed, namespace + key));
319            }
320            jedis.hset(namespace + key, "completed", String.valueOf(completed));
321            return null;
322        });
323        setTTL(key, firstLevelTTL);
324    }
325
326    @Override
327    public void release(String key) {
328        if (getStorageSize() <= config.getTargetMaxSizeMB() * (1024 * 1024) || config.getTargetMaxSizeMB() < 0) {
329            setTTL(key, secondLevelTTL);
330        } else {
331            remove(key);
332        }
333    }
334
335    @Override
336    protected void persistBlobs(String key, long sizeOfBlobs, List<Map<String, String>> blobInfos) {
337        // TODO NXP-18236: use a transaction?
338
339        Map<String, String> oldSummary = getSummary(key);
340
341        // Update storage size
342        long entrySize = -1;
343        if (oldSummary != null) {
344            String size = oldSummary.get(SIZE_KEY);
345            if (size != null) {
346                entrySize = Long.parseLong(size);
347            }
348        }
349        if (entrySize > 0) {
350            incrementStorageSize(sizeOfBlobs - entrySize);
351        } else {
352            if (sizeOfBlobs > 0) {
353                incrementStorageSize(sizeOfBlobs);
354            }
355        }
356
357        // Delete old blobs
358        if (oldSummary != null) {
359            String oldBlobCount = oldSummary.get("blobCount");
360            deleteBlobInfos(key, oldBlobCount);
361        }
362
363        // Update entry size and blob count
364        final Map<String, String> entrySummary = new HashMap<>();
365        int blobCount = 0;
366        if (blobInfos != null) {
367            blobCount = blobInfos.size();
368        }
369        entrySummary.put("blobCount", String.valueOf(blobCount));
370        entrySummary.put(SIZE_KEY, String.valueOf(sizeOfBlobs));
371        redisExecutor.execute((RedisCallable<Void>) jedis -> {
372            if (log.isDebugEnabled()) {
373                log.debug(String.format("Setting fields %s in Redis hash stored at key %s", entrySummary,
374                        namespace + key));
375            }
376            jedis.hmset(namespace + key, entrySummary);
377            jedis.expire(namespace + key, firstLevelTTL);
378            return null;
379        });
380
381        // Set new blobs
382        if (blobInfos != null) {
383            int blobsTimeout = firstLevelTTL + 60;
384            for (int i = 0; i < blobInfos.size(); i++) {
385                String blobInfoIndex = String.valueOf(i);
386                Map<String, String> blobInfo = blobInfos.get(i);
387                redisExecutor.execute((RedisCallable<Void>) jedis -> {
388                    String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex);
389                    if (log.isDebugEnabled()) {
390                        log.debug(String.format("Setting fields %s in Redis hash stored at key %s", blobInfo,
391                                blobInfoKey));
392                    }
393                    jedis.hmset(blobInfoKey, blobInfo);
394                    jedis.expire(blobInfoKey, blobsTimeout);
395                    return null;
396                });
397            }
398        }
399
400        // Set params TTL
401        redisExecutor.execute((RedisCallable<Void>) jedis -> {
402            String paramsKey = namespace + join(key, "params");
403            jedis.expire(getBytes(paramsKey), firstLevelTTL + 60);
404            return null;
405        });
406    }
407
408    @Override
409    public long getStorageSize() {
410        return redisExecutor.execute((RedisCallable<Long>) jedis -> {
411            String value = jedis.get(sizeKey);
412            if (value == null) {
413                return 0L;
414            }
415            if (log.isDebugEnabled()) {
416                log.debug(String.format("Fetched value of Redis key %s -> %s", sizeKey, value));
417            }
418            return Long.parseLong(value);
419        });
420    }
421
422    @Override
423    protected void setStorageSize(final long newSize) {
424        redisExecutor.execute((RedisCallable<Void>) jedis -> {
425            if (log.isDebugEnabled()) {
426                log.debug(String.format("Setting Redis key %s to value %s", sizeKey, newSize));
427            }
428            jedis.set(sizeKey, "" + newSize);
429            return null;
430        });
431    }
432
433    @Override
434    protected long incrementStorageSize(final long size) {
435        return redisExecutor.execute((RedisCallable<Long>) jedis -> {
436            Long incremented = jedis.incrBy(sizeKey, size);
437            if (log.isDebugEnabled()) {
438                log.debug(String.format("Incremented Redis key %s to %d", sizeKey, incremented));
439            }
440            return incremented;
441        });
442    }
443
444    @Override
445    protected long decrementStorageSize(final long size) {
446        return redisExecutor.execute((RedisCallable<Long>) jedis -> {
447            Long decremented = jedis.decrBy(sizeKey, size);
448            if (log.isDebugEnabled()) {
449                log.debug(String.format("Decremented Redis key %s to %d", sizeKey, decremented));
450            }
451            return decremented;
452        });
453    }
454
455    @Override
456    protected void removeEntry(String key) {
457        // TODO NXP-18236: use a transaction?
458
459        Map<String, String> summary = getSummary(key);
460        if (summary != null) {
461            // Remove blobs
462            String blobCount = summary.get("blobCount");
463            deleteBlobInfos(key, blobCount);
464
465            // Remove summary
466            redisExecutor.execute((RedisCallable<Long>) jedis -> {
467                Long deleted = jedis.del(namespace + key);
468                if (log.isDebugEnabled()) {
469                    log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, namespace + key));
470                }
471                return deleted;
472            });
473
474            // Decrement storage size
475            String size = summary.get(SIZE_KEY);
476            if (size != null) {
477                long entrySize = Integer.parseInt(size);
478                if (entrySize > 0) {
479                    decrementStorageSize(entrySize);
480                }
481            }
482        }
483
484        // Remove parameters
485        redisExecutor.execute((RedisCallable<Long>) jedis -> {
486            String paramsKey = namespace + join(key, "params");
487            Long deleted = jedis.del(getBytes(paramsKey));
488            if (log.isDebugEnabled()) {
489                log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, paramsKey));
490            }
491            return deleted;
492        });
493    }
494
495    @Override
496    protected void removeAllEntries() {
497        // TODO NXP-18236: use a transaction?
498        Set<String> keys = redisExecutor.execute((RedisCallable<Set<String>>) jedis -> {
499            return jedis.keys(namespace + "*");
500        });
501        for (String key : keys) {
502            redisExecutor.execute((RedisCallable<Void>) jedis -> {
503                jedis.del(key);
504                return null;
505            });
506        }
507    }
508
509    public long getTTL(String key) {
510        long summaryTTL = redisExecutor.execute((RedisCallable<Long>) jedis -> {
511            return jedis.ttl(namespace + key);
512        });
513        if (summaryTTL >= 0) {
514            return summaryTTL;
515        } else {
516            return redisExecutor.execute((RedisCallable<Long>) jedis -> {
517                String paramsKey = namespace + join(key, "params");
518                return jedis.ttl(getBytes(paramsKey));
519            });
520        }
521    }
522
523    protected Map<String, String> getSummary(String key) {
524        return redisExecutor.execute((RedisCallable<Map<String, String>>) jedis -> {
525            Map<String, String> summary = jedis.hgetAll(namespace + key);
526            if (summary.isEmpty()) {
527                return null;
528            }
529            if (log.isDebugEnabled()) {
530                log.debug(String.format("Fetched fields from Redis hash stored at key %s -> %s", namespace + key,
531                        summary));
532            }
533            return summary;
534        });
535    }
536
537    protected void deleteBlobInfos(String key, String blobCountStr) {
538        if (blobCountStr != null) {
539            int blobCount = Integer.parseInt(blobCountStr);
540            if (blobCount > 0) {
541                for (int i = 0; i < blobCount; i++) {
542                    String blobInfoIndex = String.valueOf(i);
543                    redisExecutor.execute((RedisCallable<Long>) jedis -> {
544                        String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex);
545                        Long deleted = jedis.del(blobInfoKey);
546                        if (log.isDebugEnabled()) {
547                            log.debug(String.format("Deleted %d Redis hash stored at key %s", deleted, blobInfoKey));
548                        }
549                        return deleted;
550                    });
551                }
552            }
553        }
554    }
555
556    protected String join(String... fragments) {
557        return StringUtils.join(fragments, ":");
558    }
559
560    protected byte[] getBytes(String key) {
561        return key.getBytes(StandardCharsets.UTF_8);
562    }
563
564    protected String getString(byte[] bytes) {
565        return new String(bytes, StandardCharsets.UTF_8);
566    }
567
568    protected byte[] serialize(Serializable value) {
569        try {
570            ByteArrayOutputStream baos = new ByteArrayOutputStream();
571            ObjectOutputStream out = new ObjectOutputStream(baos);
572            out.writeObject(value);
573            out.flush();
574            out.close();
575            return baos.toByteArray();
576        } catch (IOException e) {
577            throw new NuxeoException(e);
578        }
579    }
580
581    protected Serializable deserialize(byte[] bytes) {
582        try {
583            InputStream bain = new ByteArrayInputStream(bytes);
584            ObjectInputStream in = new ObjectInputStream(bain);
585            return (Serializable) in.readObject();
586        } catch (IOException | ClassNotFoundException e) {
587            throw new NuxeoException(e);
588        }
589    }
590
591    protected Map<byte[], byte[]> serialize(Map<String, Serializable> map) {
592        Map<byte[], byte[]> serializedMap = new HashMap<>();
593        for (String key : map.keySet()) {
594            serializedMap.put(getBytes(key), serialize(map.get(key)));
595        }
596        return serializedMap;
597    }
598
599    protected Map<String, Serializable> deserialize(Map<byte[], byte[]> byteMap) {
600        Map<String, Serializable> map = new HashMap<>();
601        for (byte[] key : byteMap.keySet()) {
602            map.put(getString(key), deserialize(byteMap.get(key)));
603        }
604        return map;
605    }
606
607    protected void setTTL(String key, int seconds) {
608
609        Map<String, String> summary = getSummary(key);
610        if (summary != null) {
611            // Summary
612            redisExecutor.execute((RedisCallable<Void>) jedis -> {
613                jedis.expire(namespace + key, seconds);
614                return null;
615            });
616            // Blobs
617            String blobCountStr = summary.get("blobCount");
618            if (blobCountStr != null) {
619                int blobCount = Integer.parseInt(blobCountStr);
620                if (blobCount > 0) {
621                    final int blobsTimeout = seconds + 60;
622                    for (int i = 0; i < blobCount; i++) {
623                        String blobInfoIndex = String.valueOf(i);
624                        redisExecutor.execute((RedisCallable<Void>) jedis -> {
625                            String blobInfoKey = namespace + join(key, "blobs", blobInfoIndex);
626                            jedis.expire(blobInfoKey, blobsTimeout);
627                            return null;
628                        });
629                    }
630                }
631            }
632        }
633        // Parameters
634        final int paramsTimeout;
635        if (summary == null) {
636            paramsTimeout = seconds;
637        } else {
638            paramsTimeout = seconds + 60;
639        }
640        redisExecutor.execute((RedisCallable<Void>) jedis -> {
641            String paramsKey = namespace + join(key, "params");
642            jedis.expire(getBytes(paramsKey), paramsTimeout);
643            return null;
644        });
645    }
646
647}