001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.redis.contribs;
020
021import static java.nio.charset.StandardCharsets.UTF_8;
022
023import java.io.IOException;
024import java.nio.charset.CharacterCodingException;
025import java.util.Arrays;
026import java.util.Collection;
027import java.util.Collections;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032import java.util.stream.Stream;
033
034import org.apache.commons.logging.Log;
035import org.apache.commons.logging.LogFactory;
036import org.nuxeo.ecm.core.api.NuxeoException;
037import org.nuxeo.ecm.core.redis.RedisAdmin;
038import org.nuxeo.ecm.core.redis.RedisExecutor;
039import org.nuxeo.runtime.api.Framework;
040import org.nuxeo.runtime.kv.AbstractKeyValueStoreProvider;
041import org.nuxeo.runtime.kv.KeyValueStoreDescriptor;
042
043import redis.clients.jedis.exceptions.JedisDataException;
044
045/**
046 * Redis implementation of a Key/Value Store Provider.
047 * <p>
048 * The following configuration properties are available:
049 * <ul>
050 * <li>namespace: the Redis namespace to use for keys (in addition to the global Redis namespace configured in the Redis
051 * service).
052 * </ul>
053 *
054 * @since 9.1
055 */
056public class RedisKeyValueStore extends AbstractKeyValueStoreProvider {
057
058    private static final Log log = LogFactory.getLog(RedisKeyValueStore.class);
059
060    public static final String NAMESPACE_PROP = "namespace";
061
062    protected static final Long ONE = Long.valueOf(1);
063
064    protected String name;
065
066    protected String namespace;
067
068    protected byte[] compareAndSetSHA;
069
070    protected byte[] compareAndDelSHA;
071
072    protected byte[] compareNullAndSetSHA;
073
074    protected static byte[] getBytes(String key) {
075        return key.getBytes(UTF_8);
076    }
077
078    @Override
079    public void initialize(KeyValueStoreDescriptor descriptor) {
080        log.debug("Initializing");
081        name = descriptor.name;
082        Map<String, String> properties = descriptor.getProperties();
083        String name = properties.get(NAMESPACE_PROP);
084        RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class);
085        namespace = redisAdmin.namespace(name == null ? new String[0] : new String[] { name });
086        try {
087            compareAndSetSHA = getBytes(redisAdmin.load("org.nuxeo.ecm.core.redis", "compare-and-set"));
088            compareAndDelSHA = getBytes(redisAdmin.load("org.nuxeo.ecm.core.redis", "compare-and-del"));
089            compareNullAndSetSHA = getBytes(redisAdmin.load("org.nuxeo.ecm.core.redis", "compare-null-and-set"));
090        } catch (IOException e) {
091            throw new NuxeoException("Cannot load Redis script", e);
092        }
093    }
094
095    @Override
096    public Stream<String> keyStream() {
097        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
098        int namespaceLength = namespace.length();
099        Set<String> keys = redisExecutor.execute(jedis -> jedis.keys(namespace + "*"));
100        return keys.stream().map(key -> key.substring(namespaceLength));
101    }
102
103    @Override
104    public void close() {
105        log.debug("Closed");
106    }
107
108    @Override
109    public void clear() {
110        RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class);
111        redisAdmin.clear(namespace + "*");
112    }
113
114    @Override
115    public void put(String key, byte[] value, long ttl) {
116        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
117        redisExecutor.execute(jedis -> {
118            byte[] keyb = getBytes(namespace + key);
119            if (value == null) {
120                jedis.del(keyb);
121            } else if (ttl == 0) {
122                jedis.set(keyb, value);
123            } else {
124                jedis.setex(keyb, (int) ttl, value);
125            }
126            return null;
127        });
128    }
129
130    @Override
131    public byte[] get(String key) {
132        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
133        return redisExecutor.execute(jedis -> jedis.get(getBytes(namespace + key)));
134    }
135
136    @Override
137    public Map<String, byte[]> get(Collection<String> keys) {
138        Map<String, byte[]> map = new HashMap<>(keys.size());
139        List<byte[]> values = getValuesForKeys(keys);
140        int i = 0;
141        for (String key : keys) {
142            byte[] value = values.get(i++);
143            if (value != null) {
144                map.put(key, value);
145            }
146        }
147        return map;
148    }
149
150    @Override
151    public Map<String, String> getStrings(Collection<String> keys) {
152        Map<String, String> map = new HashMap<>(keys.size());
153        List<byte[]> values = getValuesForKeys(keys);
154        int i = 0;
155        for (String key : keys) {
156            byte[] value = values.get(i++);
157            if (value != null) {
158                try {
159                    map.put(key, bytesToString(value));
160                } catch (CharacterCodingException e) {
161                    throw new IllegalArgumentException("Value is not a String for key: " + key);
162                }
163            }
164        }
165        return map;
166    }
167
168    @Override
169    public Map<String, Long> getLongs(Collection<String> keys) {
170        Map<String, Long> map = new HashMap<>(keys.size());
171        List<byte[]> values = getValuesForKeys(keys);
172        int i = 0;
173        for (String key : keys) {
174            byte[] value = values.get(i++);
175            if (value != null) {
176                map.put(key, bytesToLong(value));
177            }
178        }
179        return map;
180    }
181
182    /**
183     * @since 9.10
184     */
185    protected List<byte[]> getValuesForKeys(Collection<String> keys) {
186        byte[][] byteKeys = new byte[keys.size()][];
187        int i = 0;
188        for (String key : keys) {
189            byteKeys[i++] = getBytes(namespace + key);
190        }
191        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
192        return redisExecutor.execute(jedis -> jedis.mget(byteKeys));
193    }
194
195    @Override
196    public boolean setTTL(String key, long ttl) {
197        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
198        Long result = redisExecutor.execute(jedis -> {
199            byte[] keyb = getBytes(namespace + key);
200            if (ttl == 0) {
201                return jedis.persist(keyb);
202            } else {
203                return jedis.expire(keyb, (int) ttl);
204            }
205        });
206        return ONE.equals(result);
207    }
208
209    @Override
210    public boolean compareAndSet(String key, byte[] expected, byte[] value, long ttl) {
211        if (expected == null && value == null) {
212            return get(key) == null;
213        } else {
214            byte[] sha;
215            List<byte[]> keys = Collections.singletonList(getBytes(namespace + key));
216            List<byte[]> args;
217            if (expected == null) {
218                sha = compareNullAndSetSHA;
219                args = Collections.singletonList(value);
220            } else if (value == null) {
221                sha = compareAndDelSHA;
222                args = Collections.singletonList(expected);
223            } else {
224                sha = compareAndSetSHA;
225                args = Arrays.asList(expected, value);
226            }
227            RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
228            Object result = redisExecutor.evalsha(sha, keys, args);
229            boolean set = ONE.equals(result);
230            if (set && value != null && ttl != 0) {
231                // no need to be atomic and to a SETEX, so just do the EXPIRE now
232                setTTL(key, ttl);
233            }
234            return set;
235        }
236    }
237
238    @Override
239    public long addAndGet(String key, long delta) throws NumberFormatException { // NOSONAR
240        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
241        Long result = redisExecutor.execute(jedis -> {
242            byte[] keyb = getBytes(namespace + key);
243            try {
244                return jedis.incrBy(keyb, delta);
245            } catch (JedisDataException e) {
246                throw new NumberFormatException();
247            }
248        });
249        return result.longValue();
250    }
251
252    @Override
253    public String toString() {
254        return getClass().getSimpleName() + "(" + name + ")";
255    }
256
257}