001/* 002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.mongodb.kv; 020 021import static com.mongodb.client.model.Filters.and; 022import static com.mongodb.client.model.Filters.eq; 023import static com.mongodb.client.model.Filters.in; 024import static com.mongodb.client.model.Projections.include; 025import static com.mongodb.client.model.Updates.inc; 026import static com.mongodb.client.model.Updates.set; 027import static com.mongodb.client.model.Updates.unset; 028import static java.nio.charset.StandardCharsets.UTF_8; 029 030import java.nio.charset.CharacterCodingException; 031import java.util.Collection; 032import java.util.Date; 033import java.util.HashMap; 034import java.util.Map; 035import java.util.concurrent.TimeUnit; 036import java.util.function.Consumer; 037import java.util.regex.Pattern; 038import java.util.stream.Stream; 039import java.util.stream.StreamSupport; 040 041import org.apache.commons.lang3.StringUtils; 042import org.apache.commons.logging.Log; 043import org.apache.commons.logging.LogFactory; 044import org.bson.Document; 045import org.bson.conversions.Bson; 046import org.bson.types.Binary; 047import org.nuxeo.ecm.core.api.NuxeoException; 048import org.nuxeo.runtime.api.Framework; 049import org.nuxeo.runtime.kv.AbstractKeyValueStoreProvider; 050import org.nuxeo.runtime.kv.KeyValueStoreDescriptor; 051import org.nuxeo.runtime.mongodb.MongoDBConnectionService; 052 053import com.mongodb.ErrorCategory; 054import com.mongodb.MongoCommandException; 055import com.mongodb.MongoWriteException; 056import com.mongodb.client.MongoCollection; 057import com.mongodb.client.MongoDatabase; 058import com.mongodb.client.model.FindOneAndUpdateOptions; 059import com.mongodb.client.model.IndexOptions; 060import com.mongodb.client.model.ReplaceOptions; 061import com.mongodb.client.model.ReturnDocument; 062import com.mongodb.client.result.DeleteResult; 063import com.mongodb.client.result.UpdateResult; 064 065/** 066 * MongoDB implementation of a Key/Value Store Provider. 067 * <p> 068 * The following configuration properties are available: 069 * <ul> 070 * <li>collection: the MongoDB collection prefix to use, the default is "kv". This will be followed by the Store name. 071 * </ul> 072 * 073 * @since 9.3 074 */ 075public class MongoDBKeyValueStore extends AbstractKeyValueStoreProvider { 076 077 private static final Log log = LogFactory.getLog(MongoDBKeyValueStore.class); 078 079 public static final String KEYVALUE_CONNECTION_ID = "keyvalue"; 080 081 public static final String COLLECTION_PROP = "collection"; 082 083 public static final String COLLECTION_DEFAULT = "kv"; 084 085 public static final String ID_KEY = "_id"; 086 087 public static final String VALUE_KEY = "v"; 088 089 public static final String TTL_KEY = "ttl"; 090 091 public static final Double ONE = Double.valueOf(1); 092 093 protected MongoCollection<Document> coll; 094 095 @Override 096 public void initialize(KeyValueStoreDescriptor descriptor) { 097 super.initialize(descriptor); 098 Map<String, String> properties = descriptor.properties; 099 // find which collection prefix to use 100 String collectionName = properties.get(COLLECTION_PROP); 101 if (StringUtils.isBlank(collectionName)) { 102 collectionName = COLLECTION_DEFAULT; 103 } 104 collectionName += "." + name; 105 // get a connection to MongoDB 106 MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class); 107 MongoDatabase database = mongoService.getDatabase(KEYVALUE_CONNECTION_ID); 108 coll = database.getCollection(collectionName); 109 // make sure TTL works by creating the appropriate index 110 IndexOptions indexOptions = new IndexOptions().expireAfter(Long.valueOf(0), TimeUnit.SECONDS); 111 coll.createIndex(new Document(TTL_KEY, ONE), indexOptions); 112 } 113 114 @Override 115 public Stream<String> keyStream() { 116 return StreamSupport.stream(coll.find().projection(include(ID_KEY)).spliterator(), false) 117 .map(doc -> doc.getString(ID_KEY)); 118 } 119 120 @Override 121 public Stream<String> keyStream(String prefix) { 122 Document filter = new Document(ID_KEY, Pattern.compile('^' + Pattern.quote(prefix))); 123 return StreamSupport.stream(coll.find(filter).projection(include(ID_KEY)).spliterator(), false) 124 .map(doc -> doc.getString(ID_KEY)); 125 } 126 127 @Override 128 public void close() { 129 if (coll != null) { 130 coll = null; 131 } 132 } 133 134 @Override 135 public void clear() { 136 if (log.isTraceEnabled()) { 137 log.trace("MongoDB: CLEAR"); 138 } 139 coll.deleteMany(new Document()); 140 } 141 142 // if possible, store the bytes as a UTF-8 string 143 protected static Object toStorage(byte[] bytes) { 144 try { 145 return bytesToString(bytes); 146 } catch (CharacterCodingException e) { 147 // could not decode as UTF-8, use a binary 148 return new Binary(bytes); 149 } 150 } 151 152 protected byte[] toBytes(Object value) { 153 if (value instanceof String) { 154 return ((String) value).getBytes(UTF_8); 155 } else if (value instanceof Long) { 156 return ((Long) value).toString().getBytes(UTF_8); 157 } else if (value instanceof Binary) { 158 return ((Binary) value).getData(); 159 } 160 return null; 161 } 162 163 protected String toString(Object value) { 164 if (value instanceof String) { 165 return (String) value; 166 } else if (value instanceof Long) { 167 return ((Long) value).toString(); 168 } else if (value instanceof Binary) { 169 byte[] bytes = ((Binary) value).getData(); 170 try { 171 return bytesToString(bytes); 172 } catch (CharacterCodingException e) { 173 return null; 174 } 175 } 176 return null; 177 } 178 179 protected Long toLong(Object value) throws NumberFormatException { // NOSONAR 180 if (value instanceof Long) { 181 return (Long) value; 182 } else if (value instanceof String) { 183 return Long.valueOf((String) value); 184 } else if (value instanceof Binary) { 185 byte[] bytes = ((Binary) value).getData(); 186 return bytesToLong(bytes); 187 } 188 return null; 189 } 190 191 @Override 192 public byte[] get(String key) { 193 Object value = getObject(key); 194 if (value == null) { 195 return null; 196 } 197 byte[] bytes = toBytes(value); 198 if (bytes != null) { 199 return bytes; 200 } 201 throw new UnsupportedOperationException(value.getClass().getName()); 202 } 203 204 @Override 205 public String getString(String key) { 206 Object value = getObject(key); 207 if (value == null) { 208 return null; 209 } 210 String stringValue = toString(value); 211 if (stringValue != null) { 212 return stringValue; 213 } 214 throw new IllegalArgumentException("Value is not a String for key: " + key); 215 } 216 217 @Override 218 public Long getLong(String key) throws NumberFormatException { // NOSONAR 219 Object value = getObject(key); 220 if (value == null) { 221 return null; 222 } 223 Long longValue = toLong(value); 224 if (longValue != null) { 225 return longValue; 226 } 227 throw new NumberFormatException("Value is not a Long for key: " + key); 228 } 229 230 protected Object getObject(String key) { 231 Bson filter = eq(ID_KEY, key); 232 Document doc = coll.find(filter).first(); 233 if (doc == null) { 234 if (log.isTraceEnabled()) { 235 log.trace("MongoDB: GET " + key + " = null"); 236 } 237 return null; 238 } 239 Object value = doc.get(VALUE_KEY); 240 if (log.isTraceEnabled()) { 241 log.trace("MongoDB: GET " + key + " = " + value); 242 } 243 return value; 244 } 245 246 @Override 247 public Map<String, byte[]> get(Collection<String> keys) { 248 Map<String, byte[]> map = new HashMap<>(keys.size()); 249 Consumer<Document> block = doc -> { 250 String key = doc.getString(ID_KEY); 251 Object value = doc.get(VALUE_KEY); 252 if (value != null) { 253 byte[] bytes = toBytes(value); 254 if (bytes == null) { 255 throw new UnsupportedOperationException(String.format( 256 "Value of class %s is not supported for key: %s", value.getClass().getName(), key)); 257 } 258 map.put(key, bytes); 259 } 260 }; 261 findByKeys(keys, block); 262 return map; 263 } 264 265 @Override 266 public Map<String, String> getStrings(Collection<String> keys) { 267 Map<String, String> map = new HashMap<>(keys.size()); 268 Consumer<Document> block = doc -> { 269 String key = doc.getString(ID_KEY); 270 Object value = doc.get(VALUE_KEY); 271 if (value != null) { 272 String strValue = toString(value); 273 if (strValue == null) { 274 throw new IllegalArgumentException("Value is not a String for key: " + key); 275 } 276 map.put(key, strValue); 277 } 278 }; 279 findByKeys(keys, block); 280 return map; 281 } 282 283 @Override 284 public Map<String, Long> getLongs(Collection<String> keys) throws NumberFormatException { // NOSONAR 285 Map<String, Long> map = new HashMap<>(keys.size()); 286 Consumer<Document> block = doc -> { 287 String key = doc.getString(ID_KEY); 288 Object value = doc.get(VALUE_KEY); 289 if (value != null) { 290 Long longValue = toLong(value); 291 if (longValue == null) { 292 throw new IllegalArgumentException("Value is not a Long for key: " + key); 293 } 294 map.put(key, longValue); 295 } 296 }; 297 findByKeys(keys, block); 298 return map; 299 } 300 301 /** 302 * @since 9.10 303 */ 304 protected void findByKeys(Collection<String> keys, Consumer<Document> block) { 305 coll.find(in(ID_KEY, keys)).projection(include(ID_KEY, VALUE_KEY)).forEach(block); 306 } 307 308 protected Date getDateFromTTL(long ttl) { 309 return new Date(System.currentTimeMillis() + ttl * 1000); 310 } 311 312 @Override 313 public void put(String key, byte[] bytes, long ttl) { 314 put(key, toStorage(bytes), ttl); 315 } 316 317 @Override 318 public void put(String key, String string) { 319 put(key, (Object) string, 0); 320 } 321 322 @Override 323 public void put(String key, String string, long ttl) { 324 put(key, (Object) string, ttl); 325 } 326 327 @Override 328 public void put(String key, Long value) { 329 put(key, (Object) value, 0); 330 } 331 332 @Override 333 public void put(String key, Long value, long ttl) { 334 put(key, (Object) value, ttl); 335 } 336 337 protected void put(String key, Object value, long ttl) { 338 Bson filter = eq(ID_KEY, key); 339 if (value == null) { 340 if (log.isTraceEnabled()) { 341 log.trace("MongoDB: DEL " + key); 342 } 343 coll.deleteOne(filter); 344 } else { 345 Document doc = new Document(VALUE_KEY, value); 346 addTTL(doc, ttl); 347 if (log.isTraceEnabled()) { 348 log.trace("MongoDB: PUT " + key + " = " + value + (ttl == 0 ? "" : " (TTL " + ttl + ")")); 349 } 350 try { 351 coll.replaceOne(filter, doc, new ReplaceOptions().upsert(true)); 352 } catch (MongoWriteException e) { 353 if (ErrorCategory.fromErrorCode(e.getCode()) != ErrorCategory.DUPLICATE_KEY) { 354 throw e; 355 } 356 // retry once, as not all server versions do server-side retries on upsert 357 coll.replaceOne(filter, doc, new ReplaceOptions().upsert(true)); 358 } 359 } 360 } 361 362 protected void addTTL(Document doc, long ttl) { 363 if (ttl != 0) { 364 doc.append(TTL_KEY, getDateFromTTL(ttl)); 365 } 366 } 367 368 @Override 369 public boolean setTTL(String key, long ttl) { 370 Bson filter = eq(ID_KEY, key); 371 Bson update; 372 if (ttl == 0) { 373 update = unset(TTL_KEY); 374 } else { 375 update = set(TTL_KEY, getDateFromTTL(ttl)); 376 } 377 if (log.isTraceEnabled()) { 378 log.trace("MongoDB: SETTTL " + key + " = " + ttl); 379 } 380 UpdateResult res = coll.updateOne(filter, update); 381 return res.getModifiedCount() == 1; 382 } 383 384 @Override 385 public boolean compareAndSet(String key, byte[] expected, byte[] value, long ttl) { 386 return compareAndSet(key, toStorage(expected), toStorage(value), ttl); 387 } 388 389 @Override 390 public boolean compareAndSet(String key, String expected, String value, long ttl) { 391 return compareAndSet(key, (Object) expected, (Object) value, ttl); 392 } 393 394 protected boolean compareAndSet(String key, Object expected, Object value, long ttl) { 395 Bson filter = eq(ID_KEY, key); 396 if (expected == null && value == null) { 397 // check that document doesn't exist 398 Document doc = coll.find(filter).first(); 399 boolean set = doc == null; 400 if (log.isTraceEnabled()) { 401 if (set) { 402 log.trace("MongoDB: TEST " + key + " = null ? NOP"); 403 } else { 404 log.trace("MongoDB: TEST " + key + " = null ? FAILED"); 405 } 406 } 407 return set; 408 } else if (expected == null) { 409 // set value if no document already exists: regular insert 410 Document doc = new Document(ID_KEY, key).append(VALUE_KEY, value); 411 addTTL(doc, ttl); 412 boolean set; 413 try { 414 coll.insertOne(doc); 415 set = true; 416 } catch (MongoWriteException e) { 417 if (ErrorCategory.fromErrorCode(e.getCode()) != ErrorCategory.DUPLICATE_KEY) { 418 throw e; 419 } 420 set = false; 421 } 422 if (log.isTraceEnabled()) { 423 if (set) { 424 log.trace("MongoDB: TEST " + key + " = null ? SET " + value); 425 } else { 426 log.trace("MongoDB: TEST " + key + " = null ? FAILED"); 427 } 428 } 429 return set; 430 } else if (value == null) { 431 // delete if previous value exists 432 filter = and(filter, eq(VALUE_KEY, expected)); 433 DeleteResult res = coll.deleteOne(filter); 434 boolean set = res.getDeletedCount() == 1; 435 if (log.isTraceEnabled()) { 436 if (set) { 437 log.trace("MongoDB: TEST " + key + " = " + expected + " ? DEL"); 438 } else { 439 log.trace("MongoDB: TEST " + key + " = " + expected + " ? FAILED"); 440 } 441 } 442 return set; 443 } else { 444 // replace if previous value exists 445 filter = and(filter, eq(VALUE_KEY, expected)); 446 Document doc = new Document(VALUE_KEY, value); 447 addTTL(doc, ttl); 448 UpdateResult res = coll.replaceOne(filter, doc); 449 boolean set = res.getModifiedCount() == 1; 450 if (log.isTraceEnabled()) { 451 if (set) { 452 log.trace("MongoDB: TEST " + key + " = " + expected + " ? SET " + value); 453 } else { 454 log.trace("MongoDB: TEST " + key + " = " + expected + " ? FAILED"); 455 } 456 } 457 return set; 458 } 459 } 460 461 @Override 462 public long addAndGet(String key, long delta) throws NumberFormatException { // NOSONAR 463 Bson filter = eq(ID_KEY, key); 464 Bson update = inc(VALUE_KEY, Long.valueOf(delta)); 465 Document result; 466 try { 467 result = coll.findOneAndUpdate(filter, update, 468 new FindOneAndUpdateOptions().upsert(true).returnDocument(ReturnDocument.AFTER)); 469 } catch (MongoCommandException e) { 470 // Cannot apply $inc to a value of non-numeric type; code: 16837 471 if (!e.getMessage().contains("Cannot apply $inc")) { 472 throw new NuxeoException(e); 473 } 474 // for compatibility with other backends that don't have datatypes, 475 // try to interpret the value as the string representation of an integer 476 // (this keeps the underlying format as a String though) 477 return addAndGetGeneric(key, delta); 478 } 479 if (result == null) { 480 throw new NuxeoException("Unexpected null result, upsert failed for key: " + key); 481 } 482 return ((Long) result.get(VALUE_KEY)).longValue(); 483 } 484 485 // works on any representation that can be converted to a Long 486 protected long addAndGetGeneric(String key, long delta) throws NumberFormatException { // NOSONAR 487 for (;;) { 488 Object value = getObject(key); 489 long result; 490 if (value == null) { 491 result = delta; 492 } else { 493 Long base = toLong(value); 494 if (base == null) { 495 throw new NumberFormatException("Value is not a Long for key: " + key); 496 } 497 result = base.longValue() + delta; 498 } 499 Object newValue = Long.valueOf(result); 500 if (compareAndSet(key, value, newValue, 0)) { 501 return result; 502 } 503 // else loop to try again 504 } 505 } 506 507}