001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.runtime.kv;
020
021import java.lang.reflect.Field;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.List;
025import java.util.Objects;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.locks.Lock;
028import java.util.stream.Stream;
029
030import net.jodah.expiringmap.ExpiringMap;
031
032/**
033 * Memory-based implementation of a Key/Value store.
034 *
035 * @since 9.1
036 */
037public class MemKeyValueStore extends AbstractKeyValueStoreProvider {
038
039    protected final ExpiringMap<String, byte[]> map;
040
041    protected final Lock writeLock;
042
043    public MemKeyValueStore() {
044        map = ExpiringMap.builder().expiration(Integer.MAX_VALUE, TimeUnit.DAYS).variableExpiration().build();
045        try {
046            Field field = map.getClass().getDeclaredField("writeLock");
047            field.setAccessible(true);
048            writeLock = (Lock) field.get(map);
049        } catch (ReflectiveOperationException | SecurityException e) {
050            throw new RuntimeException(e);
051        }
052    }
053
054    @Override
055    public Stream<String> keyStream() {
056        // don't return a keySet stream directly as it may
057        // throw ConcurrentModificationException if there are concurrent writes
058        List<String> keys = new ArrayList<>(map.keySet());
059        return keys.stream();
060    }
061
062    @Override
063    public Stream<String> keyStream(String prefix) {
064        // don't return a keySet stream directly as it may
065        // throw ConcurrentModificationException if there are concurrent writes
066        List<String> keys = new ArrayList<>();
067        map.keySet().stream().filter(key -> key.startsWith(prefix)).forEach(keys::add);
068        return keys.stream();
069    }
070
071    @Override
072    public void close() {
073    }
074
075    @Override
076    public void clear() {
077        map.clear();
078    }
079
080    protected static byte[] clone(byte[] value) {
081        return value == null ? null : value.clone();
082    }
083
084    @Override
085    public void put(String key, byte[] value, long ttl) {
086        Objects.requireNonNull(key);
087        value = clone(value);
088        if (value == null) {
089            map.remove(key);
090        } else if (ttl == 0) {
091            map.put(key, value);
092        } else {
093            map.put(key, value, ttl, TimeUnit.SECONDS);
094        }
095    }
096
097    @Override
098    public byte[] get(String key) {
099        Objects.requireNonNull(key);
100        byte[] value = map.get(key);
101        return clone(value);
102    }
103
104    @Override
105    public boolean setTTL(String key, long ttl) {
106        Objects.requireNonNull(key);
107        byte[] value = map.get(key);
108        if (value == null) {
109            return false;
110        }
111        doSetTTL(key, ttl);
112        return true;
113    }
114
115    protected void doSetTTL(String key, long ttl) {
116        if (ttl == 0) {
117            map.setExpiration(key, Integer.MAX_VALUE, TimeUnit.DAYS);
118        } else {
119            map.setExpiration(key, ttl, TimeUnit.SECONDS);
120        }
121    }
122
123    @Override
124    public boolean compareAndSet(String key, byte[] expected, byte[] value, long ttl) {
125        Objects.requireNonNull(key);
126        // clone is not needed if the comparison fails
127        // but we are optimistic and prefer to do the clone outside the lock
128        value = clone(value);
129        // we don't use ExpiringMap.replace because it deals with null differently
130        writeLock.lock();
131        try {
132            byte[] current = map.get(key);
133            boolean equal = Arrays.equals(expected, current);
134            if (equal) {
135                if (value == null) {
136                    map.remove(key);
137                } else {
138                    map.put(key, value);
139                    doSetTTL(key, ttl);
140                }
141            }
142            return equal;
143        } finally {
144            writeLock.unlock();
145        }
146    }
147
148}