/*
 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.mongodb.kv;

import static com.mongodb.client.model.Filters.and;
import static com.mongodb.client.model.Filters.eq;
import static com.mongodb.client.model.Filters.in;
import static com.mongodb.client.model.Projections.include;
import static com.mongodb.client.model.Updates.inc;
import static com.mongodb.client.model.Updates.set;
import static com.mongodb.client.model.Updates.unset;
import static java.nio.charset.StandardCharsets.UTF_8;

import java.nio.charset.CharacterCodingException;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.Binary;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.kv.AbstractKeyValueStoreProvider;
import org.nuxeo.runtime.kv.KeyValueStoreDescriptor;
import org.nuxeo.runtime.mongodb.MongoDBConnectionService;

import com.mongodb.Block;
import com.mongodb.ErrorCategory;
import com.mongodb.MongoCommandException;
import com.mongodb.MongoWriteException;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.ReturnDocument;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;

/**
 * MongoDB implementation of a Key/Value Store Provider.
 * <p>
 * Each entry is stored as one document whose {@code _id} is the key, whose {@code v} field holds the value (a String,
 * a Long or a Binary) and whose optional {@code ttl} field holds the expiration date.
 * <p>
 * The following configuration properties are available:
 * <ul>
 * <li>collection: the MongoDB collection prefix to use, the default is "kv". This will be followed by the Store name.
 * </ul>
 *
 * @since 9.3
 */
public class MongoDBKeyValueStore extends AbstractKeyValueStoreProvider {

    private static final Log log = LogFactory.getLog(MongoDBKeyValueStore.class);

    /** Connection id passed to the {@link MongoDBConnectionService} to get the database. */
    public static final String KEYVALUE_CONNECTION_ID = "keyvalue";

    /** Name of the descriptor property holding the collection prefix. */
    public static final String COLLECTION_PROP = "collection";

    /** Collection prefix used when the property is absent or blank. */
    public static final String COLLECTION_DEFAULT = "kv";

    /** Document field holding the key. */
    public static final String ID_KEY = "_id";

    /** Document field holding the value. */
    public static final String VALUE_KEY = "v";

    /** Document field holding the expiration date. */
    public static final String TTL_KEY = "ttl";

    /** Index direction used when creating the TTL index. */
    public static final Double ONE = Double.valueOf(1);

    /** The backing collection; set by {@link #initialize} and cleared by {@link #close}. */
    protected MongoCollection<Document> coll;

    /**
     * Resolves the collection ({@code <prefix>.<store name>}) and creates the index on the {@code ttl} field that lets
     * MongoDB expire entries.
     */
    @Override
    public void initialize(KeyValueStoreDescriptor descriptor) {
        super.initialize(descriptor);
        Map<String, String> properties = descriptor.properties;
        // find which collection prefix to use
        String collectionName = properties.get(COLLECTION_PROP);
        if (StringUtils.isBlank(collectionName)) {
            collectionName = COLLECTION_DEFAULT;
        }
        collectionName += "." + name;
        // get a connection to MongoDB
        MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class);
        MongoDatabase database = mongoService.getDatabase(KEYVALUE_CONNECTION_ID);
        coll = database.getCollection(collectionName);
        // make sure TTL works by creating the appropriate index
        // (expireAfter 0 means a document expires at the date stored in its ttl field)
        IndexOptions indexOptions = new IndexOptions().expireAfter(Long.valueOf(0), TimeUnit.SECONDS);
        coll.createIndex(new Document(TTL_KEY, ONE), indexOptions);
    }

    /**
     * Returns a stream of all the keys in the collection.
     */
    @Override
    public Stream<String> keyStream() {
        return StreamSupport.stream(coll.find().projection(include(ID_KEY)).spliterator(), false)
                            .map(doc -> doc.getString(ID_KEY));
    }

    /**
     * Returns a stream of the keys starting with the given prefix.
     */
    @Override
    public Stream<String> keyStream(String prefix) {
        // the prefix is quoted so that it is matched literally by the anchored regex
        Document filter = new Document(ID_KEY, Pattern.compile('^' + Pattern.quote(prefix)));
        return StreamSupport.stream(coll.find(filter).projection(include(ID_KEY)).spliterator(), false)
                            .map(doc -> doc.getString(ID_KEY));
    }

    /**
     * Drops the reference to the collection.
     */
    @Override
    public void close() {
        coll = null;
    }

    /**
     * Deletes all the documents of the collection.
     */
    @Override
    public void clear() {
        if (log.isTraceEnabled()) {
            log.trace("MongoDB: CLEAR");
        }
        coll.deleteMany(new Document());
    }

    /**
     * Converts bytes to the object stored in MongoDB: a String if the bytes can be decoded by {@code bytesToString},
     * otherwise a {@link Binary}.
     */
    // if possible, store the bytes as a UTF-8 string
    protected static Object toStorage(byte[] bytes) {
        try {
            return bytesToString(bytes);
        } catch (CharacterCodingException e) {
            // could not decode as UTF-8, use a binary
            return new Binary(bytes);
        }
    }

    /**
     * Converts a stored value to bytes.
     *
     * @return the bytes, or {@code null} if the value is not a String, a Long or a Binary
     */
    protected byte[] toBytes(Object value) {
        if (value instanceof String) {
            return ((String) value).getBytes(UTF_8);
        } else if (value instanceof Long) {
            return ((Long) value).toString().getBytes(UTF_8);
        } else if (value instanceof Binary) {
            return ((Binary) value).getData();
        }
        return null;
    }

    /**
     * Converts a stored value to a String.
     *
     * @return the string, or {@code null} if the value is of an unsupported type or is a Binary that cannot be decoded
     */
    protected String toString(Object value) {
        if (value instanceof String) {
            return (String) value;
        } else if (value instanceof Long) {
            return ((Long) value).toString();
        } else if (value instanceof Binary) {
            byte[] bytes = ((Binary) value).getData();
            try {
                return bytesToString(bytes);
            } catch (CharacterCodingException e) {
                return null;
            }
        }
        return null;
    }

    /**
     * Converts a stored value to a Long.
     *
     * @return the long, or {@code null} if the value is not a Long, a String or a Binary
     * @throws NumberFormatException if a String value cannot be parsed as a long
     */
    protected Long toLong(Object value) throws NumberFormatException { // NOSONAR
        if (value instanceof Long) {
            return (Long) value;
        } else if (value instanceof String) {
            return Long.valueOf((String) value);
        } else if (value instanceof Binary) {
            byte[] bytes = ((Binary) value).getData();
            return bytesToLong(bytes);
        }
        return null;
    }

    /**
     * Gets the value for a key as bytes.
     *
     * @return the bytes, or {@code null} if there is no value for the key
     * @throws UnsupportedOperationException if the stored value is of an unsupported type
     */
    @Override
    public byte[] get(String key) {
        Object value = getObject(key);
        if (value == null) {
            return null;
        }
        byte[] bytes = toBytes(value);
        if (bytes != null) {
            return bytes;
        }
        throw new UnsupportedOperationException(value.getClass().getName());
    }

    /**
     * Gets the value for a key as a String.
     *
     * @return the string, or {@code null} if there is no value for the key
     * @throws IllegalArgumentException if the stored value cannot be converted to a String
     */
    @Override
    public String getString(String key) {
        Object value = getObject(key);
        if (value == null) {
            return null;
        }
        String stringValue = toString(value);
        if (stringValue != null) {
            return stringValue;
        }
        throw new IllegalArgumentException("Value is not a String for key: " + key);
    }

    /**
     * Gets the value for a key as a Long.
     *
     * @return the long, or {@code null} if there is no value for the key
     * @throws NumberFormatException if the stored value cannot be converted to a Long
     */
    @Override
    public Long getLong(String key) throws NumberFormatException { // NOSONAR
        Object value = getObject(key);
        if (value == null) {
            return null;
        }
        Long longValue = toLong(value);
        if (longValue != null) {
            return longValue;
        }
        throw new NumberFormatException("Value is not a Long for key: " + key);
    }

    /**
     * Reads the raw stored value for a key.
     *
     * @return the content of the value field, or {@code null} if no document exists for the key
     */
    protected Object getObject(String key) {
        Bson filter = eq(ID_KEY, key);
        Document doc = coll.find(filter).first();
        if (doc == null) {
            if (log.isTraceEnabled()) {
                log.trace("MongoDB: GET " + key + " = null");
            }
            return null;
        }
        Object value = doc.get(VALUE_KEY);
        if (log.isTraceEnabled()) {
            log.trace("MongoDB: GET " + key + " = " + value);
        }
        return value;
    }

    /**
     * Gets the values for several keys as bytes. Keys without a value are absent from the result.
     *
     * @throws UnsupportedOperationException if a stored value is of an unsupported type
     */
    @Override
    public Map<String, byte[]> get(Collection<String> keys) {
        Map<String, byte[]> map = new HashMap<>(keys.size());
        Block<Document> block = doc -> {
            String key = doc.getString(ID_KEY);
            Object value = doc.get(VALUE_KEY);
            if (value != null) {
                byte[] bytes = toBytes(value);
                if (bytes == null) {
                    throw new UnsupportedOperationException(String.format(
                            "Value of class %s is not supported for key: %s", value.getClass().getName(), key));
                }
                map.put(key, bytes);
            }
        };
        findByKeys(keys, block);
        return map;
    }

    /**
     * Gets the values for several keys as Strings. Keys without a value are absent from the result.
     *
     * @throws IllegalArgumentException if a stored value cannot be converted to a String
     */
    @Override
    public Map<String, String> getStrings(Collection<String> keys) {
        Map<String, String> map = new HashMap<>(keys.size());
        Block<Document> block = doc -> {
            String key = doc.getString(ID_KEY);
            Object value = doc.get(VALUE_KEY);
            if (value != null) {
                String strValue = toString(value);
                if (strValue == null) {
                    throw new IllegalArgumentException("Value is not a String for key: " + key);
                }
                map.put(key, strValue);
            }
        };
        findByKeys(keys, block);
        return map;
    }

    /**
     * Gets the values for several keys as Longs. Keys without a value are absent from the result.
     *
     * @throws NumberFormatException if a stored value cannot be converted to a Long
     */
    @Override
    public Map<String, Long> getLongs(Collection<String> keys) throws NumberFormatException { // NOSONAR
        Map<String, Long> map = new HashMap<>(keys.size());
        Block<Document> block = doc -> {
            String key = doc.getString(ID_KEY);
            Object value = doc.get(VALUE_KEY);
            if (value != null) {
                Long longValue = toLong(value);
                if (longValue == null) {
                    // same exception type as getLong and as declared by this method;
                    // it is a subclass of IllegalArgumentException so existing catch clauses still apply
                    throw new NumberFormatException("Value is not a Long for key: " + key);
                }
                map.put(key, longValue);
            }
        };
        findByKeys(keys, block);
        return map;
    }

    /**
     * Applies the block to each document whose key is in the given collection; only the key and value fields are
     * fetched.
     *
     * @since 9.10
     */
    protected void findByKeys(Collection<String> keys, Block<Document> block) {
        coll.find(in(ID_KEY, keys)).projection(include(ID_KEY, VALUE_KEY)).forEach(block);
    }

    /**
     * Computes the expiration date for a TTL.
     *
     * @param ttl the TTL, in seconds
     * @return the current time plus the TTL
     */
    protected Date getDateFromTTL(long ttl) {
        return new Date(System.currentTimeMillis() + ttl * 1000);
    }

    @Override
    public void put(String key, byte[] bytes, long ttl) {
        put(key, toStorage(bytes), ttl);
    }

    @Override
    public void put(String key, String string) {
        put(key, (Object) string, 0);
    }

    @Override
    public void put(String key, String string, long ttl) {
        put(key, (Object) string, ttl);
    }

    @Override
    public void put(String key, Long value) {
        put(key, (Object) value, 0);
    }

    @Override
    public void put(String key, Long value, long ttl) {
        put(key, (Object) value, ttl);
    }

    /**
     * Stores a value for a key, or deletes the key if the value is {@code null}.
     *
     * @param key the key
     * @param value the value to store (as returned by {@link #toStorage}, or a String or Long), or {@code null}
     * @param ttl the TTL in seconds, or {@code 0} for no expiration
     */
    protected void put(String key, Object value, long ttl) {
        Bson filter = eq(ID_KEY, key);
        if (value == null) {
            if (log.isTraceEnabled()) {
                log.trace("MongoDB: DEL " + key);
            }
            coll.deleteOne(filter);
        } else {
            Document doc = new Document(VALUE_KEY, value);
            addTTL(doc, ttl);
            if (log.isTraceEnabled()) {
                log.trace("MongoDB: PUT " + key + " = " + value + (ttl == 0 ? "" : " (TTL " + ttl + ")"));
            }
            // full replacement, so a previous ttl field is dropped when ttl is 0
            coll.replaceOne(filter, doc, new UpdateOptions().upsert(true));
        }
    }

    /**
     * Adds the expiration date to the document if the TTL is not {@code 0}.
     */
    protected void addTTL(Document doc, long ttl) {
        if (ttl != 0) {
            doc.append(TTL_KEY, getDateFromTTL(ttl));
        }
    }

    /**
     * Sets the TTL of an existing key; a TTL of {@code 0} removes the expiration.
     *
     * @return {@code true} if a document exists for the key, {@code false} otherwise
     */
    @Override
    public boolean setTTL(String key, long ttl) {
        Bson filter = eq(ID_KEY, key);
        Bson update;
        if (ttl == 0) {
            update = unset(TTL_KEY);
        } else {
            update = set(TTL_KEY, getDateFromTTL(ttl));
        }
        if (log.isTraceEnabled()) {
            log.trace("MongoDB: SETTTL " + key + " = " + ttl);
        }
        UpdateResult res = coll.updateOne(filter, update);
        // use the matched count, not the modified count: unsetting the TTL of an existing key
        // that has none is a no-op for MongoDB but the key does exist
        return res.getMatchedCount() == 1;
    }

    @Override
    public boolean compareAndSet(String key, byte[] expected, byte[] value, long ttl) {
        return compareAndSet(key, toStorage(expected), toStorage(value), ttl);
    }

    @Override
    public boolean compareAndSet(String key, String expected, String value, long ttl) {
        return compareAndSet(key, (Object) expected, (Object) value, ttl);
    }

    /**
     * Sets the value for a key only if its current stored value equals the expected one.
     * <p>
     * A {@code null} expected value means that no document must exist for the key; a {@code null} new value means
     * that the key is deleted.
     *
     * @param key the key
     * @param expected the expected stored value, or {@code null}
     * @param value the new value, or {@code null}
     * @param ttl the TTL in seconds for the new value, or {@code 0} for no expiration
     * @return {@code true} if the expected value was found (and the change applied), {@code false} otherwise
     */
    protected boolean compareAndSet(String key, Object expected, Object value, long ttl) {
        Bson filter = eq(ID_KEY, key);
        if (expected == null && value == null) {
            // check that document doesn't exist
            Document doc = coll.find(filter).first();
            boolean set = doc == null;
            if (log.isTraceEnabled()) {
                if (set) {
                    log.trace("MongoDB: TEST " + key + " = null ? NOP");
                } else {
                    log.trace("MongoDB: TEST " + key + " = null ? FAILED");
                }
            }
            return set;
        } else if (expected == null) {
            // set value if no document already exists: regular insert
            Document doc = new Document(ID_KEY, key).append(VALUE_KEY, value);
            addTTL(doc, ttl);
            boolean set;
            try {
                coll.insertOne(doc);
                set = true;
            } catch (MongoWriteException e) {
                if (ErrorCategory.fromErrorCode(e.getCode()) != ErrorCategory.DUPLICATE_KEY) {
                    throw e;
                }
                // a document already exists for the key
                set = false;
            }
            if (log.isTraceEnabled()) {
                if (set) {
                    log.trace("MongoDB: TEST " + key + " = null ? SET " + value);
                } else {
                    log.trace("MongoDB: TEST " + key + " = null ? FAILED");
                }
            }
            return set;
        } else if (value == null) {
            // delete if previous value exists
            filter = and(filter, eq(VALUE_KEY, expected));
            DeleteResult res = coll.deleteOne(filter);
            boolean set = res.getDeletedCount() == 1;
            if (log.isTraceEnabled()) {
                if (set) {
                    log.trace("MongoDB: TEST " + key + " = " + expected + " ? DEL");
                } else {
                    log.trace("MongoDB: TEST " + key + " = " + expected + " ? FAILED");
                }
            }
            return set;
        } else {
            // replace if previous value exists
            filter = and(filter, eq(VALUE_KEY, expected));
            Document doc = new Document(VALUE_KEY, value);
            addTTL(doc, ttl);
            UpdateResult res = coll.replaceOne(filter, doc);
            // use the matched count, not the modified count: replacing a document by an identical one
            // (expected equals value, no TTL) is a no-op for MongoDB but the comparison did succeed
            boolean set = res.getMatchedCount() == 1;
            if (log.isTraceEnabled()) {
                if (set) {
                    log.trace("MongoDB: TEST " + key + " = " + expected + " ? SET " + value);
                } else {
                    log.trace("MongoDB: TEST " + key + " = " + expected + " ? FAILED");
                }
            }
            return set;
        }
    }

    /**
     * Atomically adds the delta to the value for a key, creating it if needed, and returns the new value.
     * <p>
     * If MongoDB refuses the increment because the stored value is not numeric, falls back to
     * {@link #addAndGetGeneric}.
     *
     * @throws NumberFormatException if the stored value cannot be converted to a Long
     */
    @Override
    public long addAndGet(String key, long delta) throws NumberFormatException { // NOSONAR
        Bson filter = eq(ID_KEY, key);
        Bson update = inc(VALUE_KEY, Long.valueOf(delta));
        Document result;
        try {
            result = coll.findOneAndUpdate(filter, update,
                    new FindOneAndUpdateOptions().upsert(true).returnDocument(ReturnDocument.AFTER));
        } catch (MongoCommandException e) {
            // Cannot apply $inc to a value of non-numeric type; code: 16837
            if (!e.getMessage().contains("Cannot apply $inc")) {
                throw new NuxeoException(e);
            }
            // for compatibility with other backends that don't have datatypes,
            // try to interpret the value as the string representation of an integer
            // (this keeps the underlying format as a String though)
            return addAndGetGeneric(key, delta);
        }
        if (result == null) {
            throw new NuxeoException("Unexpected null result, upsert failed for key: " + key);
        }
        return ((Long) result.get(VALUE_KEY)).longValue();
    }

    /**
     * Adds the delta to the value for a key using a read / compare-and-set loop, retrying until the compare-and-set
     * succeeds.
     *
     * @throws NumberFormatException if the stored value cannot be converted to a Long
     */
    // works on any representation that can be converted to a Long
    protected long addAndGetGeneric(String key, long delta) throws NumberFormatException { // NOSONAR
        for (;;) {
            Object value = getObject(key);
            long result;
            if (value == null) {
                result = delta;
            } else {
                Long base = toLong(value);
                if (base == null) {
                    throw new NumberFormatException("Value is not a Long for key: " + key);
                }
                result = base.longValue() + delta;
            }
            Object newValue = Long.valueOf(result);
            if (compareAndSet(key, value, newValue, 0)) {
                return result;
            }
            // else loop to try again
        }
    }

}