001/* 002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.runtime.kv; 020 021import java.lang.reflect.Field; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.List; 025import java.util.Objects; 026import java.util.concurrent.TimeUnit; 027import java.util.concurrent.locks.Lock; 028import java.util.stream.Stream; 029 030import net.jodah.expiringmap.ExpiringMap; 031 032/** 033 * Memory-based implementation of a Key/Value store. 034 * 035 * @since 9.1 036 */ 037public class MemKeyValueStore extends AbstractKeyValueStoreProvider { 038 039 protected final ExpiringMap<String, byte[]> map; 040 041 protected final Lock writeLock; 042 043 public MemKeyValueStore() { 044 map = ExpiringMap.builder().expiration(Integer.MAX_VALUE, TimeUnit.DAYS).variableExpiration().build(); 045 try { 046 Field field = map.getClass().getDeclaredField("writeLock"); 047 field.setAccessible(true); 048 writeLock = (Lock) field.get(map); 049 } catch (ReflectiveOperationException | SecurityException e) { 050 throw new RuntimeException(e); 051 } 052 } 053 054 @Override 055 public Stream<String> keyStream() { 056 // don't return a keySet stream directly as it may 057 // throw ConcurrentModificationException if there are concurrent writes 058 List<String> keys = new ArrayList<>(map.keySet()); 059 return keys.stream(); 060 } 061 062 @Override 063 public Stream<String> keyStream(String prefix) { 064 // don't return a keySet stream directly as it may 065 // throw ConcurrentModificationException if there are concurrent writes 066 List<String> keys = new ArrayList<>(); 067 map.keySet().stream().filter(key -> key.startsWith(prefix)).forEach(keys::add); 068 return keys.stream(); 069 } 070 071 @Override 072 public void close() { 073 } 074 075 @Override 076 public void clear() { 077 map.clear(); 078 } 079 080 protected static byte[] clone(byte[] value) { 081 return value == null ? null : value.clone(); 082 } 083 084 @Override 085 public void put(String key, byte[] value, long ttl) { 086 Objects.requireNonNull(key); 087 value = clone(value); 088 if (value == null) { 089 map.remove(key); 090 } else if (ttl == 0) { 091 map.put(key, value); 092 } else { 093 map.put(key, value, ttl, TimeUnit.SECONDS); 094 } 095 } 096 097 @Override 098 public byte[] get(String key) { 099 Objects.requireNonNull(key); 100 byte[] value = map.get(key); 101 return clone(value); 102 } 103 104 @Override 105 public boolean setTTL(String key, long ttl) { 106 Objects.requireNonNull(key); 107 byte[] value = map.get(key); 108 if (value == null) { 109 return false; 110 } 111 doSetTTL(key, ttl); 112 return true; 113 } 114 115 protected void doSetTTL(String key, long ttl) { 116 if (ttl == 0) { 117 map.setExpiration(key, Integer.MAX_VALUE, TimeUnit.DAYS); 118 } else { 119 map.setExpiration(key, ttl, TimeUnit.SECONDS); 120 } 121 } 122 123 @Override 124 public boolean compareAndSet(String key, byte[] expected, byte[] value, long ttl) { 125 Objects.requireNonNull(key); 126 // clone is not needed if the comparison fails 127 // but we are optimistic and prefer to do the clone outside the lock 128 value = clone(value); 129 // we don't use ExpiringMap.replace because it deals with null differently 130 writeLock.lock(); 131 try { 132 byte[] current = map.get(key); 133 boolean equal = Arrays.equals(expected, current); 134 if (equal) { 135 if (value == null) { 136 map.remove(key); 137 } else { 138 map.put(key, value); 139 doSetTTL(key, ttl); 140 } 141 } 142 return equal; 143 } finally { 144 writeLock.unlock(); 145 } 146 } 147 148}