001/* 002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.mongodb.kv; 020 021import static com.mongodb.client.model.Filters.and; 022import static com.mongodb.client.model.Filters.eq; 023import static com.mongodb.client.model.Filters.in; 024import static com.mongodb.client.model.Projections.include; 025import static com.mongodb.client.model.Updates.set; 026import static com.mongodb.client.model.Updates.unset; 027import static java.nio.charset.StandardCharsets.UTF_8; 028 029import java.nio.charset.CharacterCodingException; 030import java.util.Collection; 031import java.util.Date; 032import java.util.HashMap; 033import java.util.Map; 034import java.util.concurrent.TimeUnit; 035import java.util.stream.Stream; 036import java.util.stream.StreamSupport; 037 038import com.mongodb.Block; 039import org.apache.commons.lang3.StringUtils; 040import org.apache.commons.logging.Log; 041import org.apache.commons.logging.LogFactory; 042import org.bson.Document; 043import org.bson.conversions.Bson; 044import org.bson.types.Binary; 045import org.nuxeo.runtime.api.Framework; 046import org.nuxeo.runtime.kv.AbstractKeyValueStoreProvider; 047import org.nuxeo.runtime.kv.KeyValueStoreDescriptor; 048import org.nuxeo.runtime.mongodb.MongoDBConnectionService; 049 050import com.mongodb.ErrorCategory; 051import com.mongodb.MongoWriteException; 052import com.mongodb.client.MongoCollection; 053import com.mongodb.client.MongoDatabase; 054import com.mongodb.client.model.IndexOptions; 055import com.mongodb.client.model.UpdateOptions; 056import com.mongodb.client.result.DeleteResult; 057import com.mongodb.client.result.UpdateResult; 058 059/** 060 * MongoDB implementation of a Key/Value Store Provider. 061 * <p> 062 * The following configuration properties are available: 063 * <ul> 064 * <li>collection: the MongoDB collection prefix to use, the default is "kv". This will be followed by the Store name. 065 * </ul> 066 * 067 * @since 9.3 068 */ 069public class MongoDBKeyValueStore extends AbstractKeyValueStoreProvider { 070 071 private static final Log log = LogFactory.getLog(MongoDBKeyValueStore.class); 072 073 public static final String KEYVALUE_CONNECTION_ID = "keyvalue"; 074 075 public static final String COLLECTION_PROP = "collection"; 076 077 public static final String COLLECTION_DEFAULT = "kv"; 078 079 public static final String ID_KEY = "_id"; 080 081 public static final String VALUE_KEY = "v"; 082 083 public static final String TTL_KEY = "ttl"; 084 085 public static final Double ONE = Double.valueOf(1); 086 087 protected String name; 088 089 protected MongoCollection<Document> coll; 090 091 @Override 092 public void initialize(KeyValueStoreDescriptor descriptor) { 093 name = descriptor.name; 094 Map<String, String> properties = descriptor.getProperties(); 095 // find which collection prefix to use 096 String collectionName = properties.get(COLLECTION_PROP); 097 if (StringUtils.isBlank(collectionName)) { 098 collectionName = COLLECTION_DEFAULT; 099 } 100 collectionName += "." + name; 101 // get a connection to MongoDB 102 MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class); 103 MongoDatabase database = mongoService.getDatabase(KEYVALUE_CONNECTION_ID); 104 coll = database.getCollection(collectionName); 105 // make sure TTL works by creating the appropriate index 106 IndexOptions indexOptions = new IndexOptions().expireAfter(Long.valueOf(0), TimeUnit.SECONDS); 107 coll.createIndex(new Document(TTL_KEY, ONE), indexOptions); 108 } 109 110 @Override 111 public Stream<String> keyStream() { 112 return StreamSupport.stream(coll.find().projection(include(ID_KEY)).spliterator(), false) 113 .map(doc -> doc.getString(ID_KEY)); 114 } 115 116 @Override 117 public void close() { 118 if (coll != null) { 119 coll = null; 120 } 121 } 122 123 @Override 124 public void clear() { 125 if (log.isTraceEnabled()) { 126 log.trace("MongoDB: CLEAR"); 127 } 128 coll.deleteMany(new Document()); 129 } 130 131 // if possible, store the bytes as a UTF-8 string 132 protected static Object toStorage(byte[] bytes) { 133 try { 134 return bytesToString(bytes); 135 } catch (CharacterCodingException e) { 136 // could not decode as UTF-8, use a binary 137 return new Binary(bytes); 138 } 139 } 140 141 @Override 142 public byte[] get(String key) { 143 Object value = getObject(key); 144 if (value == null) { 145 return null; 146 } else if (value instanceof String) { 147 return ((String) value).getBytes(UTF_8); 148 } else if (value instanceof Binary) { 149 return ((Binary) value).getData(); 150 } 151 throw new UnsupportedOperationException(value.getClass().getName()); 152 } 153 154 @Override 155 public String getString(String key) { 156 Object value = getObject(key); 157 if (value == null) { 158 return null; 159 } else if (value instanceof String) { 160 return (String) value; 161 } else if (value instanceof Binary) { 162 byte[] bytes = ((Binary) value).getData(); 163 try { 164 return bytesToString(bytes); 165 } catch (CharacterCodingException e) { 166 // fall through to throw 167 } 168 } 169 throw new IllegalArgumentException("Value is not a String for key: " + key); 170 } 171 172 protected Object getObject(String key) { 173 Bson filter = eq(ID_KEY, key); 174 Document doc = coll.find(filter).first(); 175 if (doc == null) { 176 if (log.isTraceEnabled()) { 177 log.trace("MongoDB: GET " + key + " = null"); 178 } 179 return null; 180 } 181 Object value = doc.get(VALUE_KEY); 182 if (log.isTraceEnabled()) { 183 log.trace("MongoDB: GET " + key + " = " + value); 184 } 185 return value; 186 } 187 188 @Override 189 public Map<String, byte[]> get(Collection<String> keys) { 190 Map<String, byte[]> map = new HashMap<>(keys.size()); 191 Block<Document> block = doc -> { 192 String key = doc.getString(ID_KEY); 193 Object value = doc.get(VALUE_KEY); 194 if (value != null) { 195 byte[] bytesValue = null; 196 if (value instanceof String) { 197 bytesValue = ((String) value).getBytes(UTF_8); 198 } else if (value instanceof Binary) { 199 bytesValue = ((Binary) value).getData(); 200 } else { 201 throw new UnsupportedOperationException(String.format( 202 "Value of class %s is not supported for key: %s", value.getClass().getName(), key)); 203 } 204 map.put(key, bytesValue); 205 } 206 }; 207 findByKeys(keys, block); 208 return map; 209 } 210 211 @Override 212 public Map<String, String> getStrings(Collection<String> keys) { 213 Map<String, String> map = new HashMap<>(keys.size()); 214 Block<Document> block = doc -> { 215 String key = doc.getString(ID_KEY); 216 Object value = doc.get(VALUE_KEY); 217 if (value != null) { 218 String strValue = null; 219 if (value instanceof String) { 220 strValue = (String) value; 221 } else if (value instanceof Binary) { 222 byte[] bytes = ((Binary) value).getData(); 223 try { 224 strValue = bytesToString(bytes); 225 } catch (CharacterCodingException e) { 226 // fall through to throw 227 } 228 } 229 if (strValue == null) { 230 throw new IllegalArgumentException("Value is not a String for key: " + key); 231 } 232 map.put(key, strValue); 233 } 234 }; 235 findByKeys(keys, block); 236 return map; 237 } 238 239 /** 240 * @since 9.10 241 */ 242 protected void findByKeys(Collection<String> keys, Block<Document> block) { 243 coll.find(in(ID_KEY, keys)).projection(include(ID_KEY, VALUE_KEY)).forEach(block); 244 } 245 246 protected Date getDateFromTTL(long ttl) { 247 return new Date(System.currentTimeMillis() + ttl * 1000); 248 } 249 250 @Override 251 public void put(String key, byte[] bytes, long ttl) { 252 put(key, toStorage(bytes), ttl); 253 } 254 255 @Override 256 public void put(String key, String string) { 257 put(key, (Object) string, 0); 258 } 259 260 @Override 261 public void put(String key, String string, long ttl) { 262 put(key, (Object) string, ttl); 263 } 264 265 protected void put(String key, Object value, long ttl) { 266 Bson filter = eq(ID_KEY, key); 267 if (value == null) { 268 if (log.isTraceEnabled()) { 269 log.trace("MongoDB: DEL " + key); 270 } 271 coll.deleteOne(filter); 272 } else { 273 Document doc = new Document(VALUE_KEY, value); 274 addTTL(doc, ttl); 275 if (log.isTraceEnabled()) { 276 log.trace("MongoDB: PUT " + key + " = " + value + (ttl == 0 ? "" : " (TTL " + ttl + ")")); 277 } 278 coll.replaceOne(filter, doc, new UpdateOptions().upsert(true)); 279 } 280 } 281 282 protected void addTTL(Document doc, long ttl) { 283 if (ttl != 0) { 284 doc.append(TTL_KEY, getDateFromTTL(ttl)); 285 } 286 } 287 288 @Override 289 public boolean setTTL(String key, long ttl) { 290 Bson filter = eq(ID_KEY, key); 291 Bson update; 292 if (ttl == 0) { 293 update = unset(TTL_KEY); 294 } else { 295 update = set(TTL_KEY, getDateFromTTL(ttl)); 296 } 297 if (log.isTraceEnabled()) { 298 log.trace("MongoDB: SETTTL " + key + " = " + ttl); 299 } 300 UpdateResult res = coll.updateOne(filter, update); 301 return res.getModifiedCount() == 1; 302 } 303 304 @Override 305 public boolean compareAndSet(String key, byte[] expected, byte[] value, long ttl) { 306 return compareAndSet(key, toStorage(expected), toStorage(value), ttl); 307 } 308 309 @Override 310 public boolean compareAndSet(String key, String expected, String value, long ttl) { 311 return compareAndSet(key, (Object) expected, (Object) value, ttl); 312 } 313 314 protected boolean compareAndSet(String key, Object expected, Object value, long ttl) { 315 Bson filter = eq(ID_KEY, key); 316 if (expected == null && value == null) { 317 // check that document doesn't exist 318 Document doc = coll.find(filter).first(); 319 boolean set = doc == null; 320 if (log.isTraceEnabled()) { 321 if (set) { 322 log.trace("MongoDB: TEST " + key + " = null ? NOP"); 323 } else { 324 log.trace("MongoDB: TEST " + key + " = null ? FAILED"); 325 } 326 } 327 return set; 328 } else if (expected == null) { 329 // set value if no document already exists: regular insert 330 Document doc = new Document(ID_KEY, key).append(VALUE_KEY, value); 331 addTTL(doc, ttl); 332 boolean set; 333 try { 334 coll.insertOne(doc); 335 set = true; 336 } catch (MongoWriteException e) { 337 if (ErrorCategory.fromErrorCode(e.getCode()) != ErrorCategory.DUPLICATE_KEY) { 338 throw e; 339 } 340 set = false; 341 } 342 if (log.isTraceEnabled()) { 343 if (set) { 344 log.trace("MongoDB: TEST " + key + " = null ? SET " + value); 345 } else { 346 log.trace("MongoDB: TEST " + key + " = null ? FAILED"); 347 } 348 } 349 return set; 350 } else if (value == null) { 351 // delete if previous value exists 352 filter = and(filter, eq(VALUE_KEY, expected)); 353 DeleteResult res = coll.deleteOne(filter); 354 boolean set = res.getDeletedCount() == 1; 355 if (log.isTraceEnabled()) { 356 if (set) { 357 log.trace("MongoDB: TEST " + key + " = " + expected + " ? DEL"); 358 } else { 359 log.trace("MongoDB: TEST " + key + " = " + expected + " ? FAILED"); 360 } 361 } 362 return set; 363 } else { 364 // replace if previous value exists 365 filter = and(filter, eq(VALUE_KEY, expected)); 366 Document doc = new Document(VALUE_KEY, value); 367 addTTL(doc, ttl); 368 UpdateResult res = coll.replaceOne(filter, doc); 369 boolean set = res.getModifiedCount() == 1; 370 if (log.isTraceEnabled()) { 371 if (set) { 372 log.trace("MongoDB: TEST " + key + " = " + expected + " ? SET " + value); 373 } else { 374 log.trace("MongoDB: TEST " + key + " = " + expected + " ? FAILED"); 375 } 376 } 377 return set; 378 } 379 } 380 381 @Override 382 public String toString() { 383 return getClass().getSimpleName() + "(" + name + ")"; 384 } 385 386}