001/*
002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.redis.contribs;
020
021import static java.nio.charset.StandardCharsets.UTF_8;
022import static org.apache.commons.lang3.StringUtils.isBlank;
023
024import java.io.IOException;
025import java.nio.charset.CharacterCodingException;
026import java.util.Arrays;
027import java.util.Collection;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Map;
032import java.util.Set;
033import java.util.stream.Stream;
034
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.nuxeo.ecm.core.api.NuxeoException;
038import org.nuxeo.ecm.core.redis.RedisAdmin;
039import org.nuxeo.ecm.core.redis.RedisExecutor;
040import org.nuxeo.runtime.api.Framework;
041import org.nuxeo.runtime.kv.AbstractKeyValueStoreProvider;
042import org.nuxeo.runtime.kv.KeyValueStoreDescriptor;
043
044import redis.clients.jedis.exceptions.JedisDataException;
045
046/**
047 * Redis implementation of a Key/Value Store Provider.
048 * <p>
049 * The following configuration properties are available:
050 * <ul>
051 * <li>namespace: the Redis namespace to use for keys (in addition to the global Redis namespace configured in the Redis
052 * service). DEPRECATED since 10.10, use the descriptor's {@code <namespace>} element instead.
053 * </ul>
054 *
055 * @since 9.1
056 */
057public class RedisKeyValueStore extends AbstractKeyValueStoreProvider {
058
059    private static final Log log = LogFactory.getLog(RedisKeyValueStore.class);
060
061    /**
062     * @deprecated since 10.10
063     */
064    @Deprecated
065    public static final String NAMESPACE_PROP = "namespace";
066
067    protected static final Long ONE = Long.valueOf(1);
068
069    protected String namespace;
070
071    protected byte[] compareAndSetSHA;
072
073    protected byte[] compareAndDelSHA;
074
075    protected byte[] compareNullAndSetSHA;
076
077    protected static byte[] getBytes(String key) {
078        return key.getBytes(UTF_8);
079    }
080
081    @Override
082    public void initialize(KeyValueStoreDescriptor descriptor) {
083        super.initialize(descriptor);
084        log.debug("Initializing");
085        RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class);
086        namespace = redisAdmin.namespace(getNamespace(descriptor));
087        try {
088            compareAndSetSHA = getBytes(redisAdmin.load("org.nuxeo.ecm.core.redis", "compare-and-set"));
089            compareAndDelSHA = getBytes(redisAdmin.load("org.nuxeo.ecm.core.redis", "compare-and-del"));
090            compareNullAndSetSHA = getBytes(redisAdmin.load("org.nuxeo.ecm.core.redis", "compare-null-and-set"));
091        } catch (IOException e) {
092            throw new NuxeoException("Cannot load Redis script", e);
093        }
094    }
095
096    protected String[] getNamespace(KeyValueStoreDescriptor descriptor) {
097        String ns = descriptor.namespace;
098        if (isBlank(ns)) {
099            ns = descriptor.properties.get(NAMESPACE_PROP);
100            if (isBlank(ns)) {
101                return new String[0];
102            }
103        }
104        return new String[] { ns.trim() };
105    }
106
107    @Override
108    public Stream<String> keyStream() {
109        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
110        int namespaceLength = namespace.length();
111        Set<String> keys = redisExecutor.execute(jedis -> jedis.keys(namespace + "*"));
112        return keys.stream().map(key -> key.substring(namespaceLength));
113    }
114
115    @Override
116    public Stream<String> keyStream(String prefix) {
117        final String prefixf = ecapeGlob(prefix);
118        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
119        int namespaceLength = namespace.length();
120        Set<String> keys = redisExecutor.execute(jedis -> jedis.keys(namespace + prefixf + "*"));
121        return keys.stream().map(key -> key.substring(namespaceLength));
122    }
123
124    /** Escape glob-like wildcards and [] char ranges with a backslash. */
125    public static String ecapeGlob(String prefix) {
126        if (prefix.indexOf('\\') >= 0 || prefix.indexOf('?') >= 0 || prefix.indexOf('*') >= 0
127                || prefix.indexOf('[') >= 0) {
128            prefix = prefix.replace("\\", "\\\\").replace("?", "\\?").replace("*", "\\*").replace("[", "\\[");
129        }
130        return prefix;
131    }
132
133    @Override
134    public void close() {
135        log.debug("Closed");
136    }
137
138    @Override
139    public void clear() {
140        RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class);
141        redisAdmin.clear(namespace + "*");
142    }
143
144    @Override
145    public void put(String key, byte[] value, long ttl) {
146        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
147        redisExecutor.execute(jedis -> {
148            byte[] keyb = getBytes(namespace + key);
149            if (value == null) {
150                jedis.del(keyb);
151            } else if (ttl == 0) {
152                jedis.set(keyb, value);
153            } else {
154                jedis.setex(keyb, (int) ttl, value);
155            }
156            return null;
157        });
158    }
159
160    @Override
161    public byte[] get(String key) {
162        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
163        return redisExecutor.execute(jedis -> jedis.get(getBytes(namespace + key)));
164    }
165
166    @Override
167    public Map<String, byte[]> get(Collection<String> keys) {
168        Map<String, byte[]> map = new HashMap<>(keys.size());
169        List<byte[]> values = getValuesForKeys(keys);
170        int i = 0;
171        for (String key : keys) {
172            byte[] value = values.get(i++);
173            if (value != null) {
174                map.put(key, value);
175            }
176        }
177        return map;
178    }
179
180    @Override
181    public Map<String, String> getStrings(Collection<String> keys) {
182        Map<String, String> map = new HashMap<>(keys.size());
183        List<byte[]> values = getValuesForKeys(keys);
184        int i = 0;
185        for (String key : keys) {
186            byte[] value = values.get(i++);
187            if (value != null) {
188                try {
189                    map.put(key, bytesToString(value));
190                } catch (CharacterCodingException e) {
191                    throw new IllegalArgumentException("Value is not a String for key: " + key);
192                }
193            }
194        }
195        return map;
196    }
197
198    @Override
199    public Map<String, Long> getLongs(Collection<String> keys) {
200        Map<String, Long> map = new HashMap<>(keys.size());
201        List<byte[]> values = getValuesForKeys(keys);
202        int i = 0;
203        for (String key : keys) {
204            byte[] value = values.get(i++);
205            if (value != null) {
206                map.put(key, bytesToLong(value));
207            }
208        }
209        return map;
210    }
211
212    /**
213     * @since 9.10
214     */
215    protected List<byte[]> getValuesForKeys(Collection<String> keys) {
216        byte[][] byteKeys = new byte[keys.size()][];
217        int i = 0;
218        for (String key : keys) {
219            byteKeys[i++] = getBytes(namespace + key);
220        }
221        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
222        return redisExecutor.execute(jedis -> jedis.mget(byteKeys));
223    }
224
225    @Override
226    public boolean setTTL(String key, long ttl) {
227        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
228        Long result = redisExecutor.execute(jedis -> {
229            byte[] keyb = getBytes(namespace + key);
230            if (ttl == 0) {
231                return jedis.persist(keyb);
232            } else {
233                return jedis.expire(keyb, (int) ttl);
234            }
235        });
236        return ONE.equals(result);
237    }
238
239    @Override
240    public boolean compareAndSet(String key, byte[] expected, byte[] value, long ttl) {
241        if (expected == null && value == null) {
242            return get(key) == null;
243        } else {
244            byte[] sha;
245            List<byte[]> keys = Collections.singletonList(getBytes(namespace + key));
246            List<byte[]> args;
247            if (expected == null) {
248                sha = compareNullAndSetSHA;
249                args = Collections.singletonList(value);
250            } else if (value == null) {
251                sha = compareAndDelSHA;
252                args = Collections.singletonList(expected);
253            } else {
254                sha = compareAndSetSHA;
255                args = Arrays.asList(expected, value);
256            }
257            RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
258            Object result = redisExecutor.evalsha(sha, keys, args);
259            boolean set = ONE.equals(result);
260            if (set && value != null && ttl != 0) {
261                // no need to be atomic and to a SETEX, so just do the EXPIRE now
262                setTTL(key, ttl);
263            }
264            return set;
265        }
266    }
267
268    @Override
269    public long addAndGet(String key, long delta) throws NumberFormatException { // NOSONAR
270        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
271        Long result = redisExecutor.execute(jedis -> {
272            byte[] keyb = getBytes(namespace + key);
273            try {
274                return jedis.incrBy(keyb, delta);
275            } catch (JedisDataException e) {
276                throw new NumberFormatException();
277            }
278        });
279        return result.longValue();
280    }
281
282}