001/* 002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.runtime.kv; 020 021import java.util.ArrayList; 022import java.util.Arrays; 023import java.util.List; 024import java.util.Objects; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.locks.Lock; 027import java.util.stream.Stream; 028 029import org.apache.commons.lang3.reflect.FieldUtils; 030 031import net.jodah.expiringmap.ExpiringMap; 032 033/** 034 * Memory-based implementation of a Key/Value store. 035 * 036 * @since 9.1 037 */ 038public class MemKeyValueStore extends AbstractKeyValueStoreProvider { 039 040 protected final ExpiringMap<String, byte[]> map; 041 042 protected final Lock writeLock; 043 044 public MemKeyValueStore() { 045 map = ExpiringMap.builder().expiration(Integer.MAX_VALUE, TimeUnit.DAYS).variableExpiration().build(); 046 try { 047 writeLock = (Lock) FieldUtils.readField(map, "writeLock", true); 048 } catch (ReflectiveOperationException e) { 049 throw new RuntimeException(e); 050 } 051 } 052 053 @Override 054 public Stream<String> keyStream() { 055 // don't return a keySet stream directly as it may 056 // throw ConcurrentModificationException if there are concurrent writes 057 List<String> keys = new ArrayList<>(map.keySet()); 058 return keys.stream(); 059 } 060 061 @Override 062 public Stream<String> keyStream(String prefix) { 063 // don't return a keySet stream directly as it may 064 // throw ConcurrentModificationException if there are concurrent writes 065 List<String> keys = new ArrayList<>(); 066 map.keySet().stream().filter(key -> key.startsWith(prefix)).forEach(keys::add); 067 return keys.stream(); 068 } 069 070 @Override 071 public void close() { 072 } 073 074 @Override 075 public void clear() { 076 map.clear(); 077 } 078 079 protected static byte[] clone(byte[] value) { 080 return value == null ? null : value.clone(); 081 } 082 083 @Override 084 public void put(String key, byte[] value, long ttl) { 085 Objects.requireNonNull(key); 086 value = clone(value); 087 if (value == null) { 088 map.remove(key); 089 } else if (ttl == 0) { 090 map.put(key, value); 091 } else { 092 map.put(key, value, ttl, TimeUnit.SECONDS); 093 } 094 } 095 096 @Override 097 public byte[] get(String key) { 098 Objects.requireNonNull(key); 099 byte[] value = map.get(key); 100 return clone(value); 101 } 102 103 @Override 104 public boolean setTTL(String key, long ttl) { 105 Objects.requireNonNull(key); 106 byte[] value = map.get(key); 107 if (value == null) { 108 return false; 109 } 110 doSetTTL(key, ttl); 111 return true; 112 } 113 114 protected void doSetTTL(String key, long ttl) { 115 if (ttl == 0) { 116 map.setExpiration(key, Integer.MAX_VALUE, TimeUnit.DAYS); 117 } else { 118 map.setExpiration(key, ttl, TimeUnit.SECONDS); 119 } 120 } 121 122 @Override 123 public boolean compareAndSet(String key, byte[] expected, byte[] value, long ttl) { 124 Objects.requireNonNull(key); 125 // clone is not needed if the comparison fails 126 // but we are optimistic and prefer to do the clone outside the lock 127 value = clone(value); 128 // we don't use ExpiringMap.replace because it deals with null differently 129 writeLock.lock(); 130 try { 131 byte[] current = map.get(key); 132 boolean equal = Arrays.equals(expected, current); 133 if (equal) { 134 if (value == null) { 135 map.remove(key); 136 } else { 137 map.put(key, value); 138 doSetTTL(key, ttl); 139 } 140 } 141 return equal; 142 } finally { 143 writeLock.unlock(); 144 } 145 } 146 147}