001/* 002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.mongodb.kv; 020 021import static com.mongodb.client.model.Filters.and; 022import static com.mongodb.client.model.Filters.eq; 023import static com.mongodb.client.model.Filters.in; 024import static com.mongodb.client.model.Projections.include; 025import static com.mongodb.client.model.Updates.inc; 026import static com.mongodb.client.model.Updates.set; 027import static com.mongodb.client.model.Updates.unset; 028import static java.nio.charset.StandardCharsets.UTF_8; 029 030import java.nio.charset.CharacterCodingException; 031import java.util.Collection; 032import java.util.Date; 033import java.util.HashMap; 034import java.util.Map; 035import java.util.concurrent.TimeUnit; 036import java.util.stream.Stream; 037import java.util.stream.StreamSupport; 038 039import com.mongodb.Block; 040import org.apache.commons.lang3.StringUtils; 041import org.apache.commons.logging.Log; 042import org.apache.commons.logging.LogFactory; 043import org.bson.Document; 044import org.bson.conversions.Bson; 045import org.bson.types.Binary; 046import org.nuxeo.ecm.core.api.NuxeoException; 047import org.nuxeo.runtime.api.Framework; 048import org.nuxeo.runtime.kv.AbstractKeyValueStoreProvider; 049import org.nuxeo.runtime.kv.KeyValueStoreDescriptor; 050import org.nuxeo.runtime.mongodb.MongoDBConnectionService; 051 052import com.mongodb.ErrorCategory; 053import com.mongodb.MongoCommandException; 054import com.mongodb.MongoWriteException; 055import com.mongodb.client.MongoCollection; 056import com.mongodb.client.MongoDatabase; 057import com.mongodb.client.model.FindOneAndUpdateOptions; 058import com.mongodb.client.model.IndexOptions; 059import com.mongodb.client.model.ReturnDocument; 060import com.mongodb.client.model.UpdateOptions; 061import com.mongodb.client.result.DeleteResult; 062import com.mongodb.client.result.UpdateResult; 063 064/** 065 * MongoDB implementation of a Key/Value Store Provider. 066 * <p> 067 * The following configuration properties are available: 068 * <ul> 069 * <li>collection: the MongoDB collection prefix to use, the default is "kv". This will be followed by the Store name. 070 * </ul> 071 * 072 * @since 9.3 073 */ 074public class MongoDBKeyValueStore extends AbstractKeyValueStoreProvider { 075 076 private static final Log log = LogFactory.getLog(MongoDBKeyValueStore.class); 077 078 public static final String KEYVALUE_CONNECTION_ID = "keyvalue"; 079 080 public static final String COLLECTION_PROP = "collection"; 081 082 public static final String COLLECTION_DEFAULT = "kv"; 083 084 public static final String ID_KEY = "_id"; 085 086 public static final String VALUE_KEY = "v"; 087 088 public static final String TTL_KEY = "ttl"; 089 090 public static final Double ONE = Double.valueOf(1); 091 092 protected String name; 093 094 protected MongoCollection<Document> coll; 095 096 @Override 097 public void initialize(KeyValueStoreDescriptor descriptor) { 098 name = descriptor.name; 099 Map<String, String> properties = descriptor.getProperties(); 100 // find which collection prefix to use 101 String collectionName = properties.get(COLLECTION_PROP); 102 if (StringUtils.isBlank(collectionName)) { 103 collectionName = COLLECTION_DEFAULT; 104 } 105 collectionName += "." + name; 106 // get a connection to MongoDB 107 MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class); 108 MongoDatabase database = mongoService.getDatabase(KEYVALUE_CONNECTION_ID); 109 coll = database.getCollection(collectionName); 110 // make sure TTL works by creating the appropriate index 111 IndexOptions indexOptions = new IndexOptions().expireAfter(Long.valueOf(0), TimeUnit.SECONDS); 112 coll.createIndex(new Document(TTL_KEY, ONE), indexOptions); 113 } 114 115 @Override 116 public Stream<String> keyStream() { 117 return StreamSupport.stream(coll.find().projection(include(ID_KEY)).spliterator(), false) 118 .map(doc -> doc.getString(ID_KEY)); 119 } 120 121 @Override 122 public void close() { 123 if (coll != null) { 124 coll = null; 125 } 126 } 127 128 @Override 129 public void clear() { 130 if (log.isTraceEnabled()) { 131 log.trace("MongoDB: CLEAR"); 132 } 133 coll.deleteMany(new Document()); 134 } 135 136 // if possible, store the bytes as a UTF-8 string 137 protected static Object toStorage(byte[] bytes) { 138 try { 139 return bytesToString(bytes); 140 } catch (CharacterCodingException e) { 141 // could not decode as UTF-8, use a binary 142 return new Binary(bytes); 143 } 144 } 145 146 protected byte[] toBytes(Object value) { 147 if (value instanceof String) { 148 return ((String) value).getBytes(UTF_8); 149 } else if (value instanceof Long) { 150 return ((Long) value).toString().getBytes(UTF_8); 151 } else if (value instanceof Binary) { 152 return ((Binary) value).getData(); 153 } 154 return null; 155 } 156 157 protected String toString(Object value) { 158 if (value instanceof String) { 159 return (String) value; 160 } else if (value instanceof Long) { 161 return ((Long) value).toString(); 162 } else if (value instanceof Binary) { 163 byte[] bytes = ((Binary) value).getData(); 164 try { 165 return bytesToString(bytes); 166 } catch (CharacterCodingException e) { 167 return null; 168 } 169 } 170 return null; 171 } 172 173 protected Long toLong(Object value) throws NumberFormatException { // NOSONAR 174 if (value instanceof Long) { 175 return (Long) value; 176 } else if (value instanceof String) { 177 return Long.valueOf((String) value); 178 } else if (value instanceof Binary) { 179 byte[] bytes = ((Binary) value).getData(); 180 return bytesToLong(bytes); 181 } 182 return null; 183 } 184 185 @Override 186 public byte[] get(String key) { 187 Object value = getObject(key); 188 if (value == null) { 189 return null; 190 } 191 byte[] bytes = toBytes(value); 192 if (bytes != null) { 193 return bytes; 194 } 195 throw new UnsupportedOperationException(value.getClass().getName()); 196 } 197 198 @Override 199 public String getString(String key) { 200 Object value = getObject(key); 201 if (value == null) { 202 return null; 203 } 204 String stringValue = toString(value); 205 if (stringValue != null) { 206 return stringValue; 207 } 208 throw new IllegalArgumentException("Value is not a String for key: " + key); 209 } 210 211 @Override 212 public Long getLong(String key) throws NumberFormatException { // NOSONAR 213 Object value = getObject(key); 214 if (value == null) { 215 return null; 216 } 217 Long longValue = toLong(value); 218 if (longValue != null) { 219 return longValue; 220 } 221 throw new NumberFormatException("Value is not a Long for key: " + key); 222 } 223 224 protected Object getObject(String key) { 225 Bson filter = eq(ID_KEY, key); 226 Document doc = coll.find(filter).first(); 227 if (doc == null) { 228 if (log.isTraceEnabled()) { 229 log.trace("MongoDB: GET " + key + " = null"); 230 } 231 return null; 232 } 233 Object value = doc.get(VALUE_KEY); 234 if (log.isTraceEnabled()) { 235 log.trace("MongoDB: GET " + key + " = " + value); 236 } 237 return value; 238 } 239 240 @Override 241 public Map<String, byte[]> get(Collection<String> keys) { 242 Map<String, byte[]> map = new HashMap<>(keys.size()); 243 Block<Document> block = doc -> { 244 String key = doc.getString(ID_KEY); 245 Object value = doc.get(VALUE_KEY); 246 if (value != null) { 247 byte[] bytes = toBytes(value); 248 if (bytes == null) { 249 throw new UnsupportedOperationException(String.format( 250 "Value of class %s is not supported for key: %s", value.getClass().getName(), key)); 251 } 252 map.put(key, bytes); 253 } 254 }; 255 findByKeys(keys, block); 256 return map; 257 } 258 259 @Override 260 public Map<String, String> getStrings(Collection<String> keys) { 261 Map<String, String> map = new HashMap<>(keys.size()); 262 Block<Document> block = doc -> { 263 String key = doc.getString(ID_KEY); 264 Object value = doc.get(VALUE_KEY); 265 if (value != null) { 266 String strValue = toString(value); 267 if (strValue == null) { 268 throw new IllegalArgumentException("Value is not a String for key: " + key); 269 } 270 map.put(key, strValue); 271 } 272 }; 273 findByKeys(keys, block); 274 return map; 275 } 276 277 @Override 278 public Map<String, Long> getLongs(Collection<String> keys) throws NumberFormatException { // NOSONAR 279 Map<String, Long> map = new HashMap<>(keys.size()); 280 Block<Document> block = doc -> { 281 String key = doc.getString(ID_KEY); 282 Object value = doc.get(VALUE_KEY); 283 if (value != null) { 284 Long longValue = toLong(value); 285 if (longValue == null) { 286 throw new IllegalArgumentException("Value is not a Long for key: " + key); 287 } 288 map.put(key, longValue); 289 } 290 }; 291 findByKeys(keys, block); 292 return map; 293 } 294 295 /** 296 * @since 9.10 297 */ 298 protected void findByKeys(Collection<String> keys, Block<Document> block) { 299 coll.find(in(ID_KEY, keys)).projection(include(ID_KEY, VALUE_KEY)).forEach(block); 300 } 301 302 protected Date getDateFromTTL(long ttl) { 303 return new Date(System.currentTimeMillis() + ttl * 1000); 304 } 305 306 @Override 307 public void put(String key, byte[] bytes, long ttl) { 308 put(key, toStorage(bytes), ttl); 309 } 310 311 @Override 312 public void put(String key, String string) { 313 put(key, (Object) string, 0); 314 } 315 316 @Override 317 public void put(String key, String string, long ttl) { 318 put(key, (Object) string, ttl); 319 } 320 321 @Override 322 public void put(String key, Long value) { 323 put(key, (Object) value, 0); 324 } 325 326 @Override 327 public void put(String key, Long value, long ttl) { 328 put(key, (Object) value, ttl); 329 } 330 331 protected void put(String key, Object value, long ttl) { 332 Bson filter = eq(ID_KEY, key); 333 if (value == null) { 334 if (log.isTraceEnabled()) { 335 log.trace("MongoDB: DEL " + key); 336 } 337 coll.deleteOne(filter); 338 } else { 339 Document doc = new Document(VALUE_KEY, value); 340 addTTL(doc, ttl); 341 if (log.isTraceEnabled()) { 342 log.trace("MongoDB: PUT " + key + " = " + value + (ttl == 0 ? "" : " (TTL " + ttl + ")")); 343 } 344 coll.replaceOne(filter, doc, new UpdateOptions().upsert(true)); 345 } 346 } 347 348 protected void addTTL(Document doc, long ttl) { 349 if (ttl != 0) { 350 doc.append(TTL_KEY, getDateFromTTL(ttl)); 351 } 352 } 353 354 @Override 355 public boolean setTTL(String key, long ttl) { 356 Bson filter = eq(ID_KEY, key); 357 Bson update; 358 if (ttl == 0) { 359 update = unset(TTL_KEY); 360 } else { 361 update = set(TTL_KEY, getDateFromTTL(ttl)); 362 } 363 if (log.isTraceEnabled()) { 364 log.trace("MongoDB: SETTTL " + key + " = " + ttl); 365 } 366 UpdateResult res = coll.updateOne(filter, update); 367 return res.getModifiedCount() == 1; 368 } 369 370 @Override 371 public boolean compareAndSet(String key, byte[] expected, byte[] value, long ttl) { 372 return compareAndSet(key, toStorage(expected), toStorage(value), ttl); 373 } 374 375 @Override 376 public boolean compareAndSet(String key, String expected, String value, long ttl) { 377 return compareAndSet(key, (Object) expected, (Object) value, ttl); 378 } 379 380 protected boolean compareAndSet(String key, Object expected, Object value, long ttl) { 381 Bson filter = eq(ID_KEY, key); 382 if (expected == null && value == null) { 383 // check that document doesn't exist 384 Document doc = coll.find(filter).first(); 385 boolean set = doc == null; 386 if (log.isTraceEnabled()) { 387 if (set) { 388 log.trace("MongoDB: TEST " + key + " = null ? NOP"); 389 } else { 390 log.trace("MongoDB: TEST " + key + " = null ? FAILED"); 391 } 392 } 393 return set; 394 } else if (expected == null) { 395 // set value if no document already exists: regular insert 396 Document doc = new Document(ID_KEY, key).append(VALUE_KEY, value); 397 addTTL(doc, ttl); 398 boolean set; 399 try { 400 coll.insertOne(doc); 401 set = true; 402 } catch (MongoWriteException e) { 403 if (ErrorCategory.fromErrorCode(e.getCode()) != ErrorCategory.DUPLICATE_KEY) { 404 throw e; 405 } 406 set = false; 407 } 408 if (log.isTraceEnabled()) { 409 if (set) { 410 log.trace("MongoDB: TEST " + key + " = null ? SET " + value); 411 } else { 412 log.trace("MongoDB: TEST " + key + " = null ? FAILED"); 413 } 414 } 415 return set; 416 } else if (value == null) { 417 // delete if previous value exists 418 filter = and(filter, eq(VALUE_KEY, expected)); 419 DeleteResult res = coll.deleteOne(filter); 420 boolean set = res.getDeletedCount() == 1; 421 if (log.isTraceEnabled()) { 422 if (set) { 423 log.trace("MongoDB: TEST " + key + " = " + expected + " ? DEL"); 424 } else { 425 log.trace("MongoDB: TEST " + key + " = " + expected + " ? FAILED"); 426 } 427 } 428 return set; 429 } else { 430 // replace if previous value exists 431 filter = and(filter, eq(VALUE_KEY, expected)); 432 Document doc = new Document(VALUE_KEY, value); 433 addTTL(doc, ttl); 434 UpdateResult res = coll.replaceOne(filter, doc); 435 boolean set = res.getModifiedCount() == 1; 436 if (log.isTraceEnabled()) { 437 if (set) { 438 log.trace("MongoDB: TEST " + key + " = " + expected + " ? SET " + value); 439 } else { 440 log.trace("MongoDB: TEST " + key + " = " + expected + " ? FAILED"); 441 } 442 } 443 return set; 444 } 445 } 446 447 @Override 448 public long addAndGet(String key, long delta) throws NumberFormatException { // NOSONAR 449 Bson filter = eq(ID_KEY, key); 450 Bson update = inc(VALUE_KEY, Long.valueOf(delta)); 451 Document result; 452 try { 453 result = coll.findOneAndUpdate(filter, update, 454 new FindOneAndUpdateOptions().upsert(true).returnDocument(ReturnDocument.AFTER)); 455 } catch (MongoCommandException e) { 456 // Cannot apply $inc to a value of non-numeric type; code: 16837 457 if (!e.getMessage().contains("Cannot apply $inc")) { 458 throw new NuxeoException(e); 459 } 460 // for compatibility with other backends that don't have datatypes, 461 // try to interpret the value as the string representation of an integer 462 // (this keeps the underlying format as a String though) 463 return addAndGetGeneric(key, delta); 464 } 465 if (result == null) { 466 throw new NuxeoException("Unexpected null result, upsert failed for key: " + key); 467 } 468 return ((Long) result.get(VALUE_KEY)).longValue(); 469 } 470 471 // works on any representation that can be converted to a Long 472 protected long addAndGetGeneric(String key, long delta) throws NumberFormatException { // NOSONAR 473 for (;;) { 474 Object value = getObject(key); 475 long result; 476 if (value == null) { 477 result = delta; 478 } else { 479 Long base = toLong(value); 480 if (base == null) { 481 throw new NumberFormatException("Value is not a Long for key: " + key); 482 } 483 result = base.longValue() + delta; 484 } 485 Object newValue = Long.valueOf(result); 486 if (compareAndSet(key, value, newValue, 0)) { 487 return result; 488 } 489 // else loop to try again 490 } 491 } 492 493 @Override 494 public String toString() { 495 return getClass().getSimpleName() + "(" + name + ")"; 496 } 497 498}