001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.mongodb.kv;
020
021import static com.mongodb.client.model.Filters.and;
022import static com.mongodb.client.model.Filters.eq;
023import static com.mongodb.client.model.Filters.in;
024import static com.mongodb.client.model.Projections.include;
025import static com.mongodb.client.model.Updates.inc;
026import static com.mongodb.client.model.Updates.set;
027import static com.mongodb.client.model.Updates.unset;
028import static java.nio.charset.StandardCharsets.UTF_8;
029
030import java.nio.charset.CharacterCodingException;
031import java.util.Collection;
032import java.util.Date;
033import java.util.HashMap;
034import java.util.Map;
035import java.util.concurrent.TimeUnit;
036import java.util.function.Consumer;
037import java.util.regex.Pattern;
038import java.util.stream.Stream;
039import java.util.stream.StreamSupport;
040
041import org.apache.commons.lang3.StringUtils;
042import org.apache.commons.logging.Log;
043import org.apache.commons.logging.LogFactory;
044import org.bson.Document;
045import org.bson.conversions.Bson;
046import org.bson.types.Binary;
047import org.nuxeo.ecm.core.api.NuxeoException;
048import org.nuxeo.runtime.api.Framework;
049import org.nuxeo.runtime.kv.AbstractKeyValueStoreProvider;
050import org.nuxeo.runtime.kv.KeyValueStoreDescriptor;
051import org.nuxeo.runtime.mongodb.MongoDBConnectionService;
052
053import com.mongodb.ErrorCategory;
054import com.mongodb.MongoCommandException;
055import com.mongodb.MongoWriteException;
056import com.mongodb.client.MongoCollection;
057import com.mongodb.client.MongoDatabase;
058import com.mongodb.client.model.FindOneAndUpdateOptions;
059import com.mongodb.client.model.IndexOptions;
060import com.mongodb.client.model.ReplaceOptions;
061import com.mongodb.client.model.ReturnDocument;
062import com.mongodb.client.result.DeleteResult;
063import com.mongodb.client.result.UpdateResult;
064
065/**
066 * MongoDB implementation of a Key/Value Store Provider.
067 * <p>
068 * The following configuration properties are available:
069 * <ul>
070 * <li>collection: the MongoDB collection prefix to use, the default is "kv". This will be followed by the Store name.
071 * </ul>
072 *
073 * @since 9.3
074 */
075public class MongoDBKeyValueStore extends AbstractKeyValueStoreProvider {
076
077    private static final Log log = LogFactory.getLog(MongoDBKeyValueStore.class);
078
079    public static final String KEYVALUE_CONNECTION_ID = "keyvalue";
080
081    public static final String COLLECTION_PROP = "collection";
082
083    public static final String COLLECTION_DEFAULT = "kv";
084
085    public static final String ID_KEY = "_id";
086
087    public static final String VALUE_KEY = "v";
088
089    public static final String TTL_KEY = "ttl";
090
091    public static final Double ONE = Double.valueOf(1);
092
093    protected MongoCollection<Document> coll;
094
095    @Override
096    public void initialize(KeyValueStoreDescriptor descriptor) {
097        super.initialize(descriptor);
098        Map<String, String> properties = descriptor.properties;
099        // find which collection prefix to use
100        String collectionName = properties.get(COLLECTION_PROP);
101        if (StringUtils.isBlank(collectionName)) {
102            collectionName = COLLECTION_DEFAULT;
103        }
104        collectionName += "." + name;
105        // get a connection to MongoDB
106        MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class);
107        MongoDatabase database = mongoService.getDatabase(KEYVALUE_CONNECTION_ID);
108        coll = database.getCollection(collectionName);
109        // make sure TTL works by creating the appropriate index
110        IndexOptions indexOptions = new IndexOptions().expireAfter(Long.valueOf(0), TimeUnit.SECONDS);
111        coll.createIndex(new Document(TTL_KEY, ONE), indexOptions);
112    }
113
114    @Override
115    public Stream<String> keyStream() {
116        return StreamSupport.stream(coll.find().projection(include(ID_KEY)).spliterator(), false)
117                            .map(doc -> doc.getString(ID_KEY));
118    }
119
120    @Override
121    public Stream<String> keyStream(String prefix) {
122        Document filter = new Document(ID_KEY, Pattern.compile('^' + Pattern.quote(prefix)));
123        return StreamSupport.stream(coll.find(filter).projection(include(ID_KEY)).spliterator(), false)
124                            .map(doc -> doc.getString(ID_KEY));
125    }
126
127    @Override
128    public void close() {
129        if (coll != null) {
130            coll = null;
131        }
132    }
133
134    @Override
135    public void clear() {
136        if (log.isTraceEnabled()) {
137            log.trace("MongoDB: CLEAR");
138        }
139        coll.deleteMany(new Document());
140    }
141
142    // if possible, store the bytes as a UTF-8 string
143    protected static Object toStorage(byte[] bytes) {
144        try {
145            return bytesToString(bytes);
146        } catch (CharacterCodingException e) {
147            // could not decode as UTF-8, use a binary
148            return new Binary(bytes);
149        }
150    }
151
152    protected byte[] toBytes(Object value) {
153        if (value instanceof String) {
154            return ((String) value).getBytes(UTF_8);
155        } else if (value instanceof Long) {
156            return ((Long) value).toString().getBytes(UTF_8);
157        } else if (value instanceof Binary) {
158            return ((Binary) value).getData();
159        }
160        return null;
161    }
162
163    protected String toString(Object value) {
164        if (value instanceof String) {
165            return (String) value;
166        } else if (value instanceof Long) {
167            return ((Long) value).toString();
168        } else if (value instanceof Binary) {
169            byte[] bytes = ((Binary) value).getData();
170            try {
171                return bytesToString(bytes);
172            } catch (CharacterCodingException e) {
173                return null;
174            }
175        }
176        return null;
177    }
178
179    protected Long toLong(Object value) throws NumberFormatException { // NOSONAR
180        if (value instanceof Long) {
181            return (Long) value;
182        } else if (value instanceof String) {
183            return Long.valueOf((String) value);
184        } else if (value instanceof Binary) {
185            byte[] bytes = ((Binary) value).getData();
186            return bytesToLong(bytes);
187        }
188        return null;
189    }
190
191    @Override
192    public byte[] get(String key) {
193        Object value = getObject(key);
194        if (value == null) {
195            return null;
196        }
197        byte[] bytes = toBytes(value);
198        if (bytes != null) {
199            return bytes;
200        }
201        throw new UnsupportedOperationException(value.getClass().getName());
202    }
203
204    @Override
205    public String getString(String key) {
206        Object value = getObject(key);
207        if (value == null) {
208            return null;
209        }
210        String stringValue = toString(value);
211        if (stringValue != null) {
212            return stringValue;
213        }
214        throw new IllegalArgumentException("Value is not a String for key: " + key);
215    }
216
217    @Override
218    public Long getLong(String key) throws NumberFormatException { // NOSONAR
219        Object value = getObject(key);
220        if (value == null) {
221            return null;
222        }
223        Long longValue = toLong(value);
224        if (longValue != null) {
225            return longValue;
226        }
227        throw new NumberFormatException("Value is not a Long for key: " + key);
228    }
229
230    protected Object getObject(String key) {
231        Bson filter = eq(ID_KEY, key);
232        Document doc = coll.find(filter).first();
233        if (doc == null) {
234            if (log.isTraceEnabled()) {
235                log.trace("MongoDB: GET " + key + " = null");
236            }
237            return null;
238        }
239        Object value = doc.get(VALUE_KEY);
240        if (log.isTraceEnabled()) {
241            log.trace("MongoDB: GET " + key + " = " + value);
242        }
243        return value;
244    }
245
246    @Override
247    public Map<String, byte[]> get(Collection<String> keys) {
248        Map<String, byte[]> map = new HashMap<>(keys.size());
249        Consumer<Document> block = doc -> {
250            String key = doc.getString(ID_KEY);
251            Object value = doc.get(VALUE_KEY);
252            if (value != null) {
253                byte[] bytes = toBytes(value);
254                if (bytes == null) {
255                    throw new UnsupportedOperationException(String.format(
256                            "Value of class %s is not supported for key: %s", value.getClass().getName(), key));
257                }
258                map.put(key, bytes);
259            }
260        };
261        findByKeys(keys, block);
262        return map;
263    }
264
265    @Override
266    public Map<String, String> getStrings(Collection<String> keys) {
267        Map<String, String> map = new HashMap<>(keys.size());
268        Consumer<Document> block = doc -> {
269            String key = doc.getString(ID_KEY);
270            Object value = doc.get(VALUE_KEY);
271            if (value != null) {
272                String strValue = toString(value);
273                if (strValue == null) {
274                    throw new IllegalArgumentException("Value is not a String for key: " + key);
275                }
276                map.put(key, strValue);
277            }
278        };
279        findByKeys(keys, block);
280        return map;
281    }
282
283    @Override
284    public Map<String, Long> getLongs(Collection<String> keys) throws NumberFormatException { // NOSONAR
285        Map<String, Long> map = new HashMap<>(keys.size());
286        Consumer<Document> block = doc -> {
287            String key = doc.getString(ID_KEY);
288            Object value = doc.get(VALUE_KEY);
289            if (value != null) {
290                Long longValue = toLong(value);
291                if (longValue == null) {
292                    throw new IllegalArgumentException("Value is not a Long for key: " + key);
293                }
294                map.put(key, longValue);
295            }
296        };
297        findByKeys(keys, block);
298        return map;
299    }
300
301    /**
302     * @since 9.10
303     */
304    protected void findByKeys(Collection<String> keys, Consumer<Document> block) {
305        coll.find(in(ID_KEY, keys)).projection(include(ID_KEY, VALUE_KEY)).forEach(block);
306    }
307
308    protected Date getDateFromTTL(long ttl) {
309        return new Date(System.currentTimeMillis() + ttl * 1000);
310    }
311
312    @Override
313    public void put(String key, byte[] bytes, long ttl) {
314        put(key, toStorage(bytes), ttl);
315    }
316
317    @Override
318    public void put(String key, String string) {
319        put(key, (Object) string, 0);
320    }
321
322    @Override
323    public void put(String key, String string, long ttl) {
324        put(key, (Object) string, ttl);
325    }
326
327    @Override
328    public void put(String key, Long value) {
329        put(key, (Object) value, 0);
330    }
331
332    @Override
333    public void put(String key, Long value, long ttl) {
334        put(key, (Object) value, ttl);
335    }
336
337    protected void put(String key, Object value, long ttl) {
338        Bson filter = eq(ID_KEY, key);
339        if (value == null) {
340            if (log.isTraceEnabled()) {
341                log.trace("MongoDB: DEL " + key);
342            }
343            coll.deleteOne(filter);
344        } else {
345            Document doc = new Document(VALUE_KEY, value);
346            addTTL(doc, ttl);
347            if (log.isTraceEnabled()) {
348                log.trace("MongoDB: PUT " + key + " = " + value + (ttl == 0 ? "" : " (TTL " + ttl + ")"));
349            }
350            try {
351                coll.replaceOne(filter, doc, new ReplaceOptions().upsert(true));
352            } catch (MongoWriteException e) {
353                if (ErrorCategory.fromErrorCode(e.getCode()) != ErrorCategory.DUPLICATE_KEY) {
354                    throw e;
355                }
356                // retry once, as not all server versions do server-side retries on upsert
357                coll.replaceOne(filter, doc, new ReplaceOptions().upsert(true));
358            }
359        }
360    }
361
362    protected void addTTL(Document doc, long ttl) {
363        if (ttl != 0) {
364            doc.append(TTL_KEY, getDateFromTTL(ttl));
365        }
366    }
367
368    @Override
369    public boolean setTTL(String key, long ttl) {
370        Bson filter = eq(ID_KEY, key);
371        Bson update;
372        if (ttl == 0) {
373            update = unset(TTL_KEY);
374        } else {
375            update = set(TTL_KEY, getDateFromTTL(ttl));
376        }
377        if (log.isTraceEnabled()) {
378            log.trace("MongoDB: SETTTL " + key + " = " + ttl);
379        }
380        UpdateResult res = coll.updateOne(filter, update);
381        return res.getModifiedCount() == 1;
382    }
383
384    @Override
385    public boolean compareAndSet(String key, byte[] expected, byte[] value, long ttl) {
386        return compareAndSet(key, toStorage(expected), toStorage(value), ttl);
387    }
388
389    @Override
390    public boolean compareAndSet(String key, String expected, String value, long ttl) {
391        return compareAndSet(key, (Object) expected, (Object) value, ttl);
392    }
393
394    protected boolean compareAndSet(String key, Object expected, Object value, long ttl) {
395        Bson filter = eq(ID_KEY, key);
396        if (expected == null && value == null) {
397            // check that document doesn't exist
398            Document doc = coll.find(filter).first();
399            boolean set = doc == null;
400            if (log.isTraceEnabled()) {
401                if (set) {
402                    log.trace("MongoDB: TEST " + key + " = null ? NOP");
403                } else {
404                    log.trace("MongoDB: TEST " + key + " = null ? FAILED");
405                }
406            }
407            return set;
408        } else if (expected == null) {
409            // set value if no document already exists: regular insert
410            Document doc = new Document(ID_KEY, key).append(VALUE_KEY, value);
411            addTTL(doc, ttl);
412            boolean set;
413            try {
414                coll.insertOne(doc);
415                set = true;
416            } catch (MongoWriteException e) {
417                if (ErrorCategory.fromErrorCode(e.getCode()) != ErrorCategory.DUPLICATE_KEY) {
418                    throw e;
419                }
420                set = false;
421            }
422            if (log.isTraceEnabled()) {
423                if (set) {
424                    log.trace("MongoDB: TEST " + key + " = null ? SET " + value);
425                } else {
426                    log.trace("MongoDB: TEST " + key + " = null ? FAILED");
427                }
428            }
429            return set;
430        } else if (value == null) {
431            // delete if previous value exists
432            filter = and(filter, eq(VALUE_KEY, expected));
433            DeleteResult res = coll.deleteOne(filter);
434            boolean set = res.getDeletedCount() == 1;
435            if (log.isTraceEnabled()) {
436                if (set) {
437                    log.trace("MongoDB: TEST " + key + " = " + expected + " ? DEL");
438                } else {
439                    log.trace("MongoDB: TEST " + key + " = " + expected + " ? FAILED");
440                }
441            }
442            return set;
443        } else {
444            // replace if previous value exists
445            filter = and(filter, eq(VALUE_KEY, expected));
446            Document doc = new Document(VALUE_KEY, value);
447            addTTL(doc, ttl);
448            UpdateResult res = coll.replaceOne(filter, doc);
449            boolean set = res.getModifiedCount() == 1;
450            if (log.isTraceEnabled()) {
451                if (set) {
452                    log.trace("MongoDB: TEST " + key + " = " + expected + " ? SET " + value);
453                } else {
454                    log.trace("MongoDB: TEST " + key + " = " + expected + " ? FAILED");
455                }
456            }
457            return set;
458        }
459    }
460
461    @Override
462    public long addAndGet(String key, long delta) throws NumberFormatException { // NOSONAR
463        Bson filter = eq(ID_KEY, key);
464        Bson update = inc(VALUE_KEY, Long.valueOf(delta));
465        Document result;
466        try {
467            result = coll.findOneAndUpdate(filter, update,
468                    new FindOneAndUpdateOptions().upsert(true).returnDocument(ReturnDocument.AFTER));
469        } catch (MongoCommandException e) {
470            // Cannot apply $inc to a value of non-numeric type; code: 16837
471            if (!e.getMessage().contains("Cannot apply $inc")) {
472                throw new NuxeoException(e);
473            }
474            // for compatibility with other backends that don't have datatypes,
475            // try to interpret the value as the string representation of an integer
476            // (this keeps the underlying format as a String though)
477            return addAndGetGeneric(key, delta);
478        }
479        if (result == null) {
480            throw new NuxeoException("Unexpected null result, upsert failed for key: " + key);
481        }
482        return ((Long) result.get(VALUE_KEY)).longValue();
483    }
484
485    // works on any representation that can be converted to a Long
486    protected long addAndGetGeneric(String key, long delta) throws NumberFormatException { // NOSONAR
487        for (;;) {
488            Object value = getObject(key);
489            long result;
490            if (value == null) {
491                result = delta;
492            } else {
493                Long base = toLong(value);
494                if (base == null) {
495                    throw new NumberFormatException("Value is not a Long for key: " + key);
496                }
497                result = base.longValue() + delta;
498            }
499            Object newValue = Long.valueOf(result);
500            if (compareAndSet(key, value, newValue, 0)) {
501                return result;
502            }
503            // else loop to try again
504        }
505    }
506
507}