001/*
002 * (C) Copyright 2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Maxime Hilaire
018 */
019
020package org.nuxeo.ecm.core.redis.contribs;
021
022import java.io.ByteArrayInputStream;
023import java.io.ByteArrayOutputStream;
024import java.io.IOException;
025import java.io.InputStream;
026import java.io.ObjectInputStream;
027import java.io.ObjectOutputStream;
028import java.io.Serializable;
029import java.util.Set;
030import java.util.stream.Collectors;
031
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.nuxeo.ecm.core.api.NuxeoException;
035import org.nuxeo.ecm.core.cache.AbstractCache;
036import org.nuxeo.ecm.core.cache.CacheDescriptor;
037import org.nuxeo.ecm.core.redis.RedisAdmin;
038import org.nuxeo.ecm.core.redis.RedisCallable;
039import org.nuxeo.ecm.core.redis.RedisExecutor;
040import org.nuxeo.runtime.api.Framework;
041
042import redis.clients.jedis.Jedis;
043
044/**
045 * Cache implementation on top of Redis
046 *
047 * @since 6.0
048 */
049public class RedisCache extends AbstractCache {
050
051    protected static final String UTF_8 = "UTF-8";
052
053    protected static final Log log = LogFactory.getLog(RedisCache.class);
054
055    protected final RedisExecutor executor;
056
057    protected final String namespace;
058
059    public RedisCache(CacheDescriptor desc) {
060        super(desc);
061        executor = Framework.getService(RedisExecutor.class);
062        namespace = Framework.getService(RedisAdmin.class).namespace("cache", name);
063    }
064
065    protected String formatKey(String key) {
066        return namespace.concat(key);
067    }
068
069    protected Serializable deserializeValue(byte[] workBytes) throws IOException {
070        if (workBytes == null) {
071            return null;
072        }
073
074        InputStream bain = new ByteArrayInputStream(workBytes);
075        ObjectInputStream in = new ObjectInputStream(bain);
076        try {
077            return (Serializable) in.readObject();
078        } catch (ClassNotFoundException e) {
079            throw new NuxeoException(e);
080        }
081    }
082
083    protected static byte[] bytes(String string) {
084        try {
085            return string.getBytes(UTF_8);
086        } catch (IOException e) {
087            // cannot happen for UTF-8
088            throw new NuxeoException(e);
089        }
090    }
091
092    @Override
093    public Serializable get(final String key) {
094        return executor.execute(new RedisCallable<Serializable>() {
095            @Override
096            public Serializable call(Jedis jedis) {
097                try {
098                    return deserializeValue(jedis.get(bytes(formatKey(key))));
099                } catch (IOException e) {
100                    log.error(e);
101                    return null;
102                }
103            }
104        });
105
106    }
107
108    @Override
109    public Set<String> keySet() {
110        return executor.execute(new RedisCallable<Set<String>>() {
111            @Override
112            public Set<String> call(Jedis jedis) {
113                int offset = namespace.length();
114                return jedis.keys(formatKey("*"))
115                            .stream()
116                            .map(key -> key.substring(offset))
117                            .collect(Collectors.toSet());
118            }
119        });
120    }
121
122    protected byte[] serializeValue(Serializable value) throws IOException {
123        ByteArrayOutputStream baout = new ByteArrayOutputStream();
124        ObjectOutputStream out = new ObjectOutputStream(baout);
125        out.writeObject(value);
126        out.flush();
127        out.close();
128        return baout.toByteArray();
129    }
130
131    @Override
132    public void invalidate(final String key) {
133        executor.execute(new RedisCallable<Void>() {
134            @Override
135            public Void call(Jedis jedis) {
136                jedis.del(new String[] { formatKey(key) });
137                return null;
138            }
139        });
140    }
141
142    @Override
143    public void invalidateAll() {
144        Framework.getService(RedisAdmin.class).clear(formatKey("*"));
145    }
146
147    @Override
148    public void put(final String key, final Serializable value) {
149        executor.execute(new RedisCallable<Void>() {
150            @Override
151            public Void call(Jedis jedis) {
152                try {
153                    byte[] bkey = bytes(formatKey(key));
154                    jedis.set(bkey, serializeValue(value));
155                    // Redis set in second ttl but descriptor set as mn
156                    int ttlKey = ttl * 60;
157                    jedis.expire(bkey, ttlKey);
158                    return null;
159                } catch (IOException e) {
160                    throw new NuxeoException(e);
161                }
162            }
163        });
164    }
165
166    @Override
167    public boolean hasEntry(final String key) {
168        return (Boolean) executor.execute(new RedisCallable<Serializable>() {
169            @Override
170            public Serializable call(Jedis jedis) {
171                return jedis.exists(bytes(formatKey(key)));
172            }
173        });
174    }
175
176}