001/* 002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.redis.contribs; 020 021import static java.nio.charset.StandardCharsets.UTF_8; 022 023import java.io.IOException; 024import java.nio.charset.CharacterCodingException; 025import java.util.Arrays; 026import java.util.Collection; 027import java.util.Collections; 028import java.util.HashMap; 029import java.util.List; 030import java.util.Map; 031import java.util.Set; 032import java.util.stream.Stream; 033 034import org.apache.commons.logging.Log; 035import org.apache.commons.logging.LogFactory; 036import org.nuxeo.ecm.core.api.NuxeoException; 037import org.nuxeo.ecm.core.redis.RedisAdmin; 038import org.nuxeo.ecm.core.redis.RedisExecutor; 039import org.nuxeo.runtime.api.Framework; 040import org.nuxeo.runtime.kv.AbstractKeyValueStoreProvider; 041import org.nuxeo.runtime.kv.KeyValueStoreDescriptor; 042 043import redis.clients.jedis.exceptions.JedisDataException; 044 045/** 046 * Redis implementation of a Key/Value Store Provider. 047 * <p> 048 * The following configuration properties are available: 049 * <ul> 050 * <li>namespace: the Redis namespace to use for keys (in addition to the global Redis namespace configured in the Redis 051 * service). 052 * </ul> 053 * 054 * @since 9.1 055 */ 056public class RedisKeyValueStore extends AbstractKeyValueStoreProvider { 057 058 private static final Log log = LogFactory.getLog(RedisKeyValueStore.class); 059 060 public static final String NAMESPACE_PROP = "namespace"; 061 062 protected static final Long ONE = Long.valueOf(1); 063 064 protected String name; 065 066 protected String namespace; 067 068 protected byte[] compareAndSetSHA; 069 070 protected byte[] compareAndDelSHA; 071 072 protected byte[] compareNullAndSetSHA; 073 074 protected static byte[] getBytes(String key) { 075 return key.getBytes(UTF_8); 076 } 077 078 @Override 079 public void initialize(KeyValueStoreDescriptor descriptor) { 080 log.debug("Initializing"); 081 name = descriptor.name; 082 Map<String, String> properties = descriptor.getProperties(); 083 String name = properties.get(NAMESPACE_PROP); 084 RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class); 085 namespace = redisAdmin.namespace(name == null ? new String[0] : new String[] { name }); 086 try { 087 compareAndSetSHA = getBytes(redisAdmin.load("org.nuxeo.ecm.core.redis", "compare-and-set")); 088 compareAndDelSHA = getBytes(redisAdmin.load("org.nuxeo.ecm.core.redis", "compare-and-del")); 089 compareNullAndSetSHA = getBytes(redisAdmin.load("org.nuxeo.ecm.core.redis", "compare-null-and-set")); 090 } catch (IOException e) { 091 throw new NuxeoException("Cannot load Redis script", e); 092 } 093 } 094 095 @Override 096 public Stream<String> keyStream() { 097 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 098 int namespaceLength = namespace.length(); 099 Set<String> keys = redisExecutor.execute(jedis -> jedis.keys(namespace + "*")); 100 return keys.stream().map(key -> key.substring(namespaceLength)); 101 } 102 103 @Override 104 public void close() { 105 log.debug("Closed"); 106 } 107 108 @Override 109 public void clear() { 110 RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class); 111 redisAdmin.clear(namespace + "*"); 112 } 113 114 @Override 115 public void put(String key, byte[] value, long ttl) { 116 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 117 redisExecutor.execute(jedis -> { 118 byte[] keyb = getBytes(namespace + key); 119 if (value == null) { 120 jedis.del(keyb); 121 } else if (ttl == 0) { 122 jedis.set(keyb, value); 123 } else { 124 jedis.setex(keyb, (int) ttl, value); 125 } 126 return null; 127 }); 128 } 129 130 @Override 131 public byte[] get(String key) { 132 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 133 return redisExecutor.execute(jedis -> jedis.get(getBytes(namespace + key))); 134 } 135 136 @Override 137 public Map<String, byte[]> get(Collection<String> keys) { 138 Map<String, byte[]> map = new HashMap<>(keys.size()); 139 List<byte[]> values = getValuesForKeys(keys); 140 int i = 0; 141 for (String key : keys) { 142 byte[] value = values.get(i++); 143 if (value != null) { 144 map.put(key, value); 145 } 146 } 147 return map; 148 } 149 150 @Override 151 public Map<String, String> getStrings(Collection<String> keys) { 152 Map<String, String> map = new HashMap<>(keys.size()); 153 List<byte[]> values = getValuesForKeys(keys); 154 int i = 0; 155 for (String key : keys) { 156 byte[] value = values.get(i++); 157 if (value != null) { 158 try { 159 map.put(key, bytesToString(value)); 160 } catch (CharacterCodingException e) { 161 throw new IllegalArgumentException("Value is not a String for key: " + key); 162 } 163 } 164 } 165 return map; 166 } 167 168 @Override 169 public Map<String, Long> getLongs(Collection<String> keys) { 170 Map<String, Long> map = new HashMap<>(keys.size()); 171 List<byte[]> values = getValuesForKeys(keys); 172 int i = 0; 173 for (String key : keys) { 174 byte[] value = values.get(i++); 175 if (value != null) { 176 map.put(key, bytesToLong(value)); 177 } 178 } 179 return map; 180 } 181 182 /** 183 * @since 9.10 184 */ 185 protected List<byte[]> getValuesForKeys(Collection<String> keys) { 186 byte[][] byteKeys = new byte[keys.size()][]; 187 int i = 0; 188 for (String key : keys) { 189 byteKeys[i++] = getBytes(namespace + key); 190 } 191 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 192 return redisExecutor.execute(jedis -> jedis.mget(byteKeys)); 193 } 194 195 @Override 196 public boolean setTTL(String key, long ttl) { 197 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 198 Long result = redisExecutor.execute(jedis -> { 199 byte[] keyb = getBytes(namespace + key); 200 if (ttl == 0) { 201 return jedis.persist(keyb); 202 } else { 203 return jedis.expire(keyb, (int) ttl); 204 } 205 }); 206 return ONE.equals(result); 207 } 208 209 @Override 210 public boolean compareAndSet(String key, byte[] expected, byte[] value, long ttl) { 211 if (expected == null && value == null) { 212 return get(key) == null; 213 } else { 214 byte[] sha; 215 List<byte[]> keys = Collections.singletonList(getBytes(namespace + key)); 216 List<byte[]> args; 217 if (expected == null) { 218 sha = compareNullAndSetSHA; 219 args = Collections.singletonList(value); 220 } else if (value == null) { 221 sha = compareAndDelSHA; 222 args = Collections.singletonList(expected); 223 } else { 224 sha = compareAndSetSHA; 225 args = Arrays.asList(expected, value); 226 } 227 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 228 Object result = redisExecutor.evalsha(sha, keys, args); 229 boolean set = ONE.equals(result); 230 if (set && value != null && ttl != 0) { 231 // no need to be atomic and to a SETEX, so just do the EXPIRE now 232 setTTL(key, ttl); 233 } 234 return set; 235 } 236 } 237 238 @Override 239 public long addAndGet(String key, long delta) throws NumberFormatException { // NOSONAR 240 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 241 Long result = redisExecutor.execute(jedis -> { 242 byte[] keyb = getBytes(namespace + key); 243 try { 244 return jedis.incrBy(keyb, delta); 245 } catch (JedisDataException e) { 246 throw new NumberFormatException(); 247 } 248 }); 249 return result.longValue(); 250 } 251 252 @Override 253 public String toString() { 254 return getClass().getSimpleName() + "(" + name + ")"; 255 } 256 257}