001/* 002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.redis.contribs; 020 021import static java.nio.charset.StandardCharsets.UTF_8; 022import static org.apache.commons.lang3.StringUtils.isBlank; 023 024import java.io.IOException; 025import java.nio.charset.CharacterCodingException; 026import java.util.Arrays; 027import java.util.Collection; 028import java.util.Collections; 029import java.util.HashMap; 030import java.util.List; 031import java.util.Map; 032import java.util.Set; 033import java.util.stream.Stream; 034 035import org.apache.commons.logging.Log; 036import org.apache.commons.logging.LogFactory; 037import org.nuxeo.ecm.core.api.NuxeoException; 038import org.nuxeo.ecm.core.redis.RedisAdmin; 039import org.nuxeo.ecm.core.redis.RedisExecutor; 040import org.nuxeo.runtime.api.Framework; 041import org.nuxeo.runtime.kv.AbstractKeyValueStoreProvider; 042import org.nuxeo.runtime.kv.KeyValueStoreDescriptor; 043 044import redis.clients.jedis.exceptions.JedisDataException; 045 046/** 047 * Redis implementation of a Key/Value Store Provider. 048 * <p> 049 * The following configuration properties are available: 050 * <ul> 051 * <li>namespace: the Redis namespace to use for keys (in addition to the global Redis namespace configured in the Redis 052 * service). DEPRECATED since 10.10, use the descriptor's {@code <namespace>} element instead. 053 * </ul> 054 * 055 * @since 9.1 056 */ 057public class RedisKeyValueStore extends AbstractKeyValueStoreProvider { 058 059 private static final Log log = LogFactory.getLog(RedisKeyValueStore.class); 060 061 /** 062 * @deprecated since 10.10 063 */ 064 @Deprecated 065 public static final String NAMESPACE_PROP = "namespace"; 066 067 protected static final Long ONE = Long.valueOf(1); 068 069 protected String namespace; 070 071 protected byte[] compareAndSetSHA; 072 073 protected byte[] compareAndDelSHA; 074 075 protected byte[] compareNullAndSetSHA; 076 077 protected static byte[] getBytes(String key) { 078 return key.getBytes(UTF_8); 079 } 080 081 @Override 082 public void initialize(KeyValueStoreDescriptor descriptor) { 083 super.initialize(descriptor); 084 log.debug("Initializing"); 085 RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class); 086 namespace = redisAdmin.namespace(getNamespace(descriptor)); 087 try { 088 compareAndSetSHA = getBytes(redisAdmin.load("org.nuxeo.ecm.core.redis", "compare-and-set")); 089 compareAndDelSHA = getBytes(redisAdmin.load("org.nuxeo.ecm.core.redis", "compare-and-del")); 090 compareNullAndSetSHA = getBytes(redisAdmin.load("org.nuxeo.ecm.core.redis", "compare-null-and-set")); 091 } catch (IOException e) { 092 throw new NuxeoException("Cannot load Redis script", e); 093 } 094 } 095 096 protected String[] getNamespace(KeyValueStoreDescriptor descriptor) { 097 String ns = descriptor.namespace; 098 if (isBlank(ns)) { 099 ns = descriptor.properties.get(NAMESPACE_PROP); 100 if (isBlank(ns)) { 101 return new String[0]; 102 } 103 } 104 return new String[] { ns.trim() }; 105 } 106 107 @Override 108 public Stream<String> keyStream() { 109 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 110 int namespaceLength = namespace.length(); 111 Set<String> keys = redisExecutor.execute(jedis -> jedis.keys(namespace + "*")); 112 return keys.stream().map(key -> key.substring(namespaceLength)); 113 } 114 115 @Override 116 public Stream<String> keyStream(String prefix) { 117 final String prefixf = ecapeGlob(prefix); 118 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 119 int namespaceLength = namespace.length(); 120 Set<String> keys = redisExecutor.execute(jedis -> jedis.keys(namespace + prefixf + "*")); 121 return keys.stream().map(key -> key.substring(namespaceLength)); 122 } 123 124 /** Escape glob-like wildcards and [] char ranges with a backslash. */ 125 public static String ecapeGlob(String prefix) { 126 if (prefix.indexOf('\\') >= 0 || prefix.indexOf('?') >= 0 || prefix.indexOf('*') >= 0 127 || prefix.indexOf('[') >= 0) { 128 prefix = prefix.replace("\\", "\\\\").replace("?", "\\?").replace("*", "\\*").replace("[", "\\["); 129 } 130 return prefix; 131 } 132 133 @Override 134 public void close() { 135 log.debug("Closed"); 136 } 137 138 @Override 139 public void clear() { 140 RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class); 141 redisAdmin.clear(namespace + "*"); 142 } 143 144 @Override 145 public void put(String key, byte[] value, long ttl) { 146 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 147 redisExecutor.execute(jedis -> { 148 byte[] keyb = getBytes(namespace + key); 149 if (value == null) { 150 jedis.del(keyb); 151 } else if (ttl == 0) { 152 jedis.set(keyb, value); 153 } else { 154 jedis.setex(keyb, (int) ttl, value); 155 } 156 return null; 157 }); 158 } 159 160 @Override 161 public byte[] get(String key) { 162 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 163 return redisExecutor.execute(jedis -> jedis.get(getBytes(namespace + key))); 164 } 165 166 @Override 167 public Map<String, byte[]> get(Collection<String> keys) { 168 Map<String, byte[]> map = new HashMap<>(keys.size()); 169 List<byte[]> values = getValuesForKeys(keys); 170 int i = 0; 171 for (String key : keys) { 172 byte[] value = values.get(i++); 173 if (value != null) { 174 map.put(key, value); 175 } 176 } 177 return map; 178 } 179 180 @Override 181 public Map<String, String> getStrings(Collection<String> keys) { 182 Map<String, String> map = new HashMap<>(keys.size()); 183 List<byte[]> values = getValuesForKeys(keys); 184 int i = 0; 185 for (String key : keys) { 186 byte[] value = values.get(i++); 187 if (value != null) { 188 try { 189 map.put(key, bytesToString(value)); 190 } catch (CharacterCodingException e) { 191 throw new IllegalArgumentException("Value is not a String for key: " + key); 192 } 193 } 194 } 195 return map; 196 } 197 198 @Override 199 public Map<String, Long> getLongs(Collection<String> keys) { 200 Map<String, Long> map = new HashMap<>(keys.size()); 201 List<byte[]> values = getValuesForKeys(keys); 202 int i = 0; 203 for (String key : keys) { 204 byte[] value = values.get(i++); 205 if (value != null) { 206 map.put(key, bytesToLong(value)); 207 } 208 } 209 return map; 210 } 211 212 /** 213 * @since 9.10 214 */ 215 protected List<byte[]> getValuesForKeys(Collection<String> keys) { 216 byte[][] byteKeys = new byte[keys.size()][]; 217 int i = 0; 218 for (String key : keys) { 219 byteKeys[i++] = getBytes(namespace + key); 220 } 221 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 222 return redisExecutor.execute(jedis -> jedis.mget(byteKeys)); 223 } 224 225 @Override 226 public boolean setTTL(String key, long ttl) { 227 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 228 Long result = redisExecutor.execute(jedis -> { 229 byte[] keyb = getBytes(namespace + key); 230 if (ttl == 0) { 231 return jedis.persist(keyb); 232 } else { 233 return jedis.expire(keyb, (int) ttl); 234 } 235 }); 236 return ONE.equals(result); 237 } 238 239 @Override 240 public boolean compareAndSet(String key, byte[] expected, byte[] value, long ttl) { 241 if (expected == null && value == null) { 242 return get(key) == null; 243 } else { 244 byte[] sha; 245 List<byte[]> keys = Collections.singletonList(getBytes(namespace + key)); 246 List<byte[]> args; 247 if (expected == null) { 248 sha = compareNullAndSetSHA; 249 args = Collections.singletonList(value); 250 } else if (value == null) { 251 sha = compareAndDelSHA; 252 args = Collections.singletonList(expected); 253 } else { 254 sha = compareAndSetSHA; 255 args = Arrays.asList(expected, value); 256 } 257 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 258 Object result = redisExecutor.evalsha(sha, keys, args); 259 boolean set = ONE.equals(result); 260 if (set && value != null && ttl != 0) { 261 // no need to be atomic and to a SETEX, so just do the EXPIRE now 262 setTTL(key, ttl); 263 } 264 return set; 265 } 266 } 267 268 @Override 269 public long addAndGet(String key, long delta) throws NumberFormatException { // NOSONAR 270 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 271 Long result = redisExecutor.execute(jedis -> { 272 byte[] keyb = getBytes(namespace + key); 273 try { 274 return jedis.incrBy(keyb, delta); 275 } catch (JedisDataException e) { 276 throw new NumberFormatException(); 277 } 278 }); 279 return result.longValue(); 280 } 281 282}