001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.redis.contribs;
020
021import static java.nio.charset.StandardCharsets.UTF_8;
022
023import java.io.IOException;
024import java.nio.charset.CharacterCodingException;
025import java.util.Arrays;
026import java.util.Collection;
027import java.util.Collections;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032import java.util.stream.Stream;
033
034import org.apache.commons.logging.Log;
035import org.apache.commons.logging.LogFactory;
036import org.nuxeo.ecm.core.api.NuxeoException;
037import org.nuxeo.ecm.core.redis.RedisAdmin;
038import org.nuxeo.ecm.core.redis.RedisExecutor;
039import org.nuxeo.runtime.api.Framework;
040import org.nuxeo.runtime.kv.AbstractKeyValueStoreProvider;
041import org.nuxeo.runtime.kv.KeyValueStoreDescriptor;
042
043/**
044 * Redis implementation of a Key/Value Store Provider.
045 * <p>
046 * The following configuration properties are available:
047 * <ul>
048 * <li>namespace: the Redis namespace to use for keys (in addition to the global Redis namespace configured in the Redis
049 * service).
050 * </ul>
051 *
052 * @since 9.1
053 */
054public class RedisKeyValueStore extends AbstractKeyValueStoreProvider {
055
056    private static final Log log = LogFactory.getLog(RedisKeyValueStore.class);
057
058    public static final String NAMESPACE_PROP = "namespace";
059
060    protected static final Long ONE = Long.valueOf(1);
061
062    protected String name;
063
064    protected String namespace;
065
066    protected byte[] compareAndSetSHA;
067
068    protected byte[] compareAndDelSHA;
069
070    protected byte[] compareNullAndSetSHA;
071
072    protected static byte[] getBytes(String key) {
073        return key.getBytes(UTF_8);
074    }
075
076    @Override
077    public void initialize(KeyValueStoreDescriptor descriptor) {
078        log.debug("Initializing");
079        name = descriptor.name;
080        Map<String, String> properties = descriptor.getProperties();
081        String name = properties.get(NAMESPACE_PROP);
082        RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class);
083        namespace = redisAdmin.namespace(name == null ? new String[0] : new String[] { name });
084        try {
085            compareAndSetSHA = getBytes(redisAdmin.load("org.nuxeo.ecm.core.redis", "compare-and-set"));
086            compareAndDelSHA = getBytes(redisAdmin.load("org.nuxeo.ecm.core.redis", "compare-and-del"));
087            compareNullAndSetSHA = getBytes(redisAdmin.load("org.nuxeo.ecm.core.redis", "compare-null-and-set"));
088        } catch (IOException e) {
089            throw new NuxeoException("Cannot load Redis script", e);
090        }
091    }
092
093    @Override
094    public Stream<String> keyStream() {
095        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
096        int namespaceLength = namespace.length();
097        Set<String> keys = redisExecutor.execute(jedis -> jedis.keys(namespace + "*"));
098        return keys.stream().map(key -> key.substring(namespaceLength));
099    }
100
101    @Override
102    public void close() {
103        log.debug("Closed");
104    }
105
106    @Override
107    public void clear() {
108        RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class);
109        redisAdmin.clear(namespace + "*");
110    }
111
112    @Override
113    public void put(String key, byte[] value, long ttl) {
114        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
115        redisExecutor.execute(jedis -> {
116            byte[] keyb = getBytes(namespace + key);
117            if (value == null) {
118                jedis.del(keyb);
119            } else if (ttl == 0) {
120                jedis.set(keyb, value);
121            } else {
122                jedis.setex(keyb, (int) ttl, value);
123            }
124            return null;
125        });
126    }
127
128    @Override
129    public byte[] get(String key) {
130        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
131        return redisExecutor.execute(jedis -> jedis.get(getBytes(namespace + key)));
132    }
133
134    @Override
135    public Map<String, byte[]> get(Collection<String> keys) {
136        Map<String, byte[]> map = new HashMap<>(keys.size());
137        List<byte[]> values = getValuesForKeys(keys);
138        int i = 0;
139        for (String key : keys) {
140            byte[] value = values.get(i++);
141            if (value != null) {
142                map.put(key, value);
143            }
144        }
145        return map;
146    }
147
148    @Override
149    public Map<String, String> getStrings(Collection<String> keys) {
150        Map<String, String> map = new HashMap<>(keys.size());
151        List<byte[]> values = getValuesForKeys(keys);
152        int i = 0;
153        for (String key : keys) {
154            byte[] value = values.get(i++);
155            if (value != null) {
156                try {
157                    map.put(key, bytesToString(value));
158                } catch (CharacterCodingException e) {
159                    throw new IllegalArgumentException("Value is not a String for key: " + key);
160                }
161            }
162        }
163        return map;
164    }
165
166    /**
167     * @since 9.10
168     */
169    protected List<byte[]> getValuesForKeys(Collection<String> keys) {
170        byte[][] byteKeys = new byte[keys.size()][];
171        int i = 0;
172        for (String key : keys) {
173            byteKeys[i++] = getBytes(namespace + key);
174        }
175        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
176        return redisExecutor.execute(jedis -> jedis.mget(byteKeys));
177    }
178
179    @Override
180    public boolean setTTL(String key, long ttl) {
181        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
182        Long result = redisExecutor.execute(jedis -> {
183            byte[] keyb = getBytes(namespace + key);
184            if (ttl == 0) {
185                return jedis.persist(keyb);
186            } else {
187                return jedis.expire(keyb, (int) ttl);
188            }
189        });
190        return ONE.equals(result);
191    }
192
193    @Override
194    public boolean compareAndSet(String key, byte[] expected, byte[] value, long ttl) {
195        if (expected == null && value == null) {
196            return get(key) == null;
197        } else {
198            byte[] sha;
199            List<byte[]> keys = Collections.singletonList(getBytes(namespace + key));
200            List<byte[]> args;
201            if (expected == null) {
202                sha = compareNullAndSetSHA;
203                args = Collections.singletonList(value);
204            } else if (value == null) {
205                sha = compareAndDelSHA;
206                args = Collections.singletonList(expected);
207            } else {
208                sha = compareAndSetSHA;
209                args = Arrays.asList(expected, value);
210            }
211            RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
212            Object result = redisExecutor.evalsha(sha, keys, args);
213            boolean set = ONE.equals(result);
214            if (set && value != null && ttl != 0) {
215                // no need to be atomic and to a SETEX, so just do the EXPIRE now
216                setTTL(key, ttl);
217            }
218            return set;
219        }
220    }
221
222    @Override
223    public String toString() {
224        return getClass().getSimpleName() + "(" + name + ")";
225    }
226
227}