001/* 002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.redis.contribs; 020 021import static java.nio.charset.StandardCharsets.UTF_8; 022 023import java.io.IOException; 024import java.nio.charset.CharacterCodingException; 025import java.util.Arrays; 026import java.util.Collection; 027import java.util.Collections; 028import java.util.HashMap; 029import java.util.List; 030import java.util.Map; 031import java.util.Set; 032import java.util.stream.Stream; 033 034import org.apache.commons.logging.Log; 035import org.apache.commons.logging.LogFactory; 036import org.nuxeo.ecm.core.api.NuxeoException; 037import org.nuxeo.ecm.core.redis.RedisAdmin; 038import org.nuxeo.ecm.core.redis.RedisExecutor; 039import org.nuxeo.runtime.api.Framework; 040import org.nuxeo.runtime.kv.AbstractKeyValueStoreProvider; 041import org.nuxeo.runtime.kv.KeyValueStoreDescriptor; 042 043/** 044 * Redis implementation of a Key/Value Store Provider. 045 * <p> 046 * The following configuration properties are available: 047 * <ul> 048 * <li>namespace: the Redis namespace to use for keys (in addition to the global Redis namespace configured in the Redis 049 * service). 050 * </ul> 051 * 052 * @since 9.1 053 */ 054public class RedisKeyValueStore extends AbstractKeyValueStoreProvider { 055 056 private static final Log log = LogFactory.getLog(RedisKeyValueStore.class); 057 058 public static final String NAMESPACE_PROP = "namespace"; 059 060 protected static final Long ONE = Long.valueOf(1); 061 062 protected String name; 063 064 protected String namespace; 065 066 protected byte[] compareAndSetSHA; 067 068 protected byte[] compareAndDelSHA; 069 070 protected byte[] compareNullAndSetSHA; 071 072 protected static byte[] getBytes(String key) { 073 return key.getBytes(UTF_8); 074 } 075 076 @Override 077 public void initialize(KeyValueStoreDescriptor descriptor) { 078 log.debug("Initializing"); 079 name = descriptor.name; 080 Map<String, String> properties = descriptor.getProperties(); 081 String name = properties.get(NAMESPACE_PROP); 082 RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class); 083 namespace = redisAdmin.namespace(name == null ? new String[0] : new String[] { name }); 084 try { 085 compareAndSetSHA = getBytes(redisAdmin.load("org.nuxeo.ecm.core.redis", "compare-and-set")); 086 compareAndDelSHA = getBytes(redisAdmin.load("org.nuxeo.ecm.core.redis", "compare-and-del")); 087 compareNullAndSetSHA = getBytes(redisAdmin.load("org.nuxeo.ecm.core.redis", "compare-null-and-set")); 088 } catch (IOException e) { 089 throw new NuxeoException("Cannot load Redis script", e); 090 } 091 } 092 093 @Override 094 public Stream<String> keyStream() { 095 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 096 int namespaceLength = namespace.length(); 097 Set<String> keys = redisExecutor.execute(jedis -> jedis.keys(namespace + "*")); 098 return keys.stream().map(key -> key.substring(namespaceLength)); 099 } 100 101 @Override 102 public void close() { 103 log.debug("Closed"); 104 } 105 106 @Override 107 public void clear() { 108 RedisAdmin redisAdmin = Framework.getService(RedisAdmin.class); 109 redisAdmin.clear(namespace + "*"); 110 } 111 112 @Override 113 public void put(String key, byte[] value, long ttl) { 114 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 115 redisExecutor.execute(jedis -> { 116 byte[] keyb = getBytes(namespace + key); 117 if (value == null) { 118 jedis.del(keyb); 119 } else if (ttl == 0) { 120 jedis.set(keyb, value); 121 } else { 122 jedis.setex(keyb, (int) ttl, value); 123 } 124 return null; 125 }); 126 } 127 128 @Override 129 public byte[] get(String key) { 130 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 131 return redisExecutor.execute(jedis -> jedis.get(getBytes(namespace + key))); 132 } 133 134 @Override 135 public Map<String, byte[]> get(Collection<String> keys) { 136 Map<String, byte[]> map = new HashMap<>(keys.size()); 137 List<byte[]> values = getValuesForKeys(keys); 138 int i = 0; 139 for (String key : keys) { 140 byte[] value = values.get(i++); 141 if (value != null) { 142 map.put(key, value); 143 } 144 } 145 return map; 146 } 147 148 @Override 149 public Map<String, String> getStrings(Collection<String> keys) { 150 Map<String, String> map = new HashMap<>(keys.size()); 151 List<byte[]> values = getValuesForKeys(keys); 152 int i = 0; 153 for (String key : keys) { 154 byte[] value = values.get(i++); 155 if (value != null) { 156 try { 157 map.put(key, bytesToString(value)); 158 } catch (CharacterCodingException e) { 159 throw new IllegalArgumentException("Value is not a String for key: " + key); 160 } 161 } 162 } 163 return map; 164 } 165 166 /** 167 * @since 9.10 168 */ 169 protected List<byte[]> getValuesForKeys(Collection<String> keys) { 170 byte[][] byteKeys = new byte[keys.size()][]; 171 int i = 0; 172 for (String key : keys) { 173 byteKeys[i++] = getBytes(namespace + key); 174 } 175 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 176 return redisExecutor.execute(jedis -> jedis.mget(byteKeys)); 177 } 178 179 @Override 180 public boolean setTTL(String key, long ttl) { 181 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 182 Long result = redisExecutor.execute(jedis -> { 183 byte[] keyb = getBytes(namespace + key); 184 if (ttl == 0) { 185 return jedis.persist(keyb); 186 } else { 187 return jedis.expire(keyb, (int) ttl); 188 } 189 }); 190 return ONE.equals(result); 191 } 192 193 @Override 194 public boolean compareAndSet(String key, byte[] expected, byte[] value, long ttl) { 195 if (expected == null && value == null) { 196 return get(key) == null; 197 } else { 198 byte[] sha; 199 List<byte[]> keys = Collections.singletonList(getBytes(namespace + key)); 200 List<byte[]> args; 201 if (expected == null) { 202 sha = compareNullAndSetSHA; 203 args = Collections.singletonList(value); 204 } else if (value == null) { 205 sha = compareAndDelSHA; 206 args = Collections.singletonList(expected); 207 } else { 208 sha = compareAndSetSHA; 209 args = Arrays.asList(expected, value); 210 } 211 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 212 Object result = redisExecutor.evalsha(sha, keys, args); 213 boolean set = ONE.equals(result); 214 if (set && value != null && ttl != 0) { 215 // no need to be atomic and to a SETEX, so just do the EXPIRE now 216 setTTL(key, ttl); 217 } 218 return set; 219 } 220 } 221 222 @Override 223 public String toString() { 224 return getClass().getSimpleName() + "(" + name + ")"; 225 } 226 227}