001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.runtime.kv;
020
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.List;
024import java.util.Objects;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.locks.Lock;
027import java.util.stream.Stream;
028
029import org.apache.commons.lang3.reflect.FieldUtils;
030
031import net.jodah.expiringmap.ExpiringMap;
032
033/**
034 * Memory-based implementation of a Key/Value store.
035 *
036 * @since 9.1
037 */
038public class MemKeyValueStore extends AbstractKeyValueStoreProvider {
039
040    protected final ExpiringMap<String, byte[]> map;
041
042    protected final Lock writeLock;
043
044    public MemKeyValueStore() {
045        map = ExpiringMap.builder().expiration(Integer.MAX_VALUE, TimeUnit.DAYS).variableExpiration().build();
046        try {
047            writeLock = (Lock) FieldUtils.readField(map, "writeLock", true);
048        } catch (ReflectiveOperationException e) {
049            throw new RuntimeException(e);
050        }
051    }
052
053    @Override
054    public Stream<String> keyStream() {
055        // don't return a keySet stream directly as it may
056        // throw ConcurrentModificationException if there are concurrent writes
057        List<String> keys = new ArrayList<>(map.keySet());
058        return keys.stream();
059    }
060
061    @Override
062    public Stream<String> keyStream(String prefix) {
063        // don't return a keySet stream directly as it may
064        // throw ConcurrentModificationException if there are concurrent writes
065        List<String> keys = new ArrayList<>();
066        map.keySet().stream().filter(key -> key.startsWith(prefix)).forEach(keys::add);
067        return keys.stream();
068    }
069
070    @Override
071    public void close() {
072    }
073
074    @Override
075    public void clear() {
076        map.clear();
077    }
078
079    protected static byte[] clone(byte[] value) {
080        return value == null ? null : value.clone();
081    }
082
083    @Override
084    public void put(String key, byte[] value, long ttl) {
085        Objects.requireNonNull(key);
086        value = clone(value);
087        if (value == null) {
088            map.remove(key);
089        } else if (ttl == 0) {
090            map.put(key, value);
091        } else {
092            map.put(key, value, ttl, TimeUnit.SECONDS);
093        }
094    }
095
096    @Override
097    public byte[] get(String key) {
098        Objects.requireNonNull(key);
099        byte[] value = map.get(key);
100        return clone(value);
101    }
102
103    @Override
104    public boolean setTTL(String key, long ttl) {
105        Objects.requireNonNull(key);
106        byte[] value = map.get(key);
107        if (value == null) {
108            return false;
109        }
110        doSetTTL(key, ttl);
111        return true;
112    }
113
114    protected void doSetTTL(String key, long ttl) {
115        if (ttl == 0) {
116            map.setExpiration(key, Integer.MAX_VALUE, TimeUnit.DAYS);
117        } else {
118            map.setExpiration(key, ttl, TimeUnit.SECONDS);
119        }
120    }
121
122    @Override
123    public boolean compareAndSet(String key, byte[] expected, byte[] value, long ttl) {
124        Objects.requireNonNull(key);
125        // clone is not needed if the comparison fails
126        // but we are optimistic and prefer to do the clone outside the lock
127        value = clone(value);
128        // we don't use ExpiringMap.replace because it deals with null differently
129        writeLock.lock();
130        try {
131            byte[] current = map.get(key);
132            boolean equal = Arrays.equals(expected, current);
133            if (equal) {
134                if (value == null) {
135                    map.remove(key);
136                } else {
137                    map.put(key, value);
138                    doSetTTL(key, ttl);
139                }
140            }
141            return equal;
142        } finally {
143            writeLock.unlock();
144        }
145    }
146
147}