001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.mongodb.kv;
020
021import static com.mongodb.client.model.Filters.and;
022import static com.mongodb.client.model.Filters.eq;
023import static com.mongodb.client.model.Filters.in;
024import static com.mongodb.client.model.Projections.include;
025import static com.mongodb.client.model.Updates.inc;
026import static com.mongodb.client.model.Updates.set;
027import static com.mongodb.client.model.Updates.unset;
028import static java.nio.charset.StandardCharsets.UTF_8;
029
030import java.nio.charset.CharacterCodingException;
031import java.util.Collection;
032import java.util.Date;
033import java.util.HashMap;
034import java.util.Map;
035import java.util.concurrent.TimeUnit;
036import java.util.stream.Stream;
037import java.util.stream.StreamSupport;
038
039import com.mongodb.Block;
040import org.apache.commons.lang3.StringUtils;
041import org.apache.commons.logging.Log;
042import org.apache.commons.logging.LogFactory;
043import org.bson.Document;
044import org.bson.conversions.Bson;
045import org.bson.types.Binary;
046import org.nuxeo.ecm.core.api.NuxeoException;
047import org.nuxeo.runtime.api.Framework;
048import org.nuxeo.runtime.kv.AbstractKeyValueStoreProvider;
049import org.nuxeo.runtime.kv.KeyValueStoreDescriptor;
050import org.nuxeo.runtime.mongodb.MongoDBConnectionService;
051
052import com.mongodb.ErrorCategory;
053import com.mongodb.MongoCommandException;
054import com.mongodb.MongoWriteException;
055import com.mongodb.client.MongoCollection;
056import com.mongodb.client.MongoDatabase;
057import com.mongodb.client.model.FindOneAndUpdateOptions;
058import com.mongodb.client.model.IndexOptions;
059import com.mongodb.client.model.ReturnDocument;
060import com.mongodb.client.model.UpdateOptions;
061import com.mongodb.client.result.DeleteResult;
062import com.mongodb.client.result.UpdateResult;
063
064/**
065 * MongoDB implementation of a Key/Value Store Provider.
066 * <p>
067 * The following configuration properties are available:
068 * <ul>
069 * <li>collection: the MongoDB collection prefix to use, the default is "kv". This will be followed by the Store name.
070 * </ul>
071 *
072 * @since 9.3
073 */
074public class MongoDBKeyValueStore extends AbstractKeyValueStoreProvider {
075
076    private static final Log log = LogFactory.getLog(MongoDBKeyValueStore.class);
077
078    public static final String KEYVALUE_CONNECTION_ID = "keyvalue";
079
080    public static final String COLLECTION_PROP = "collection";
081
082    public static final String COLLECTION_DEFAULT = "kv";
083
084    public static final String ID_KEY = "_id";
085
086    public static final String VALUE_KEY = "v";
087
088    public static final String TTL_KEY = "ttl";
089
090    public static final Double ONE = Double.valueOf(1);
091
092    protected String name;
093
094    protected MongoCollection<Document> coll;
095
096    @Override
097    public void initialize(KeyValueStoreDescriptor descriptor) {
098        name = descriptor.name;
099        Map<String, String> properties = descriptor.getProperties();
100        // find which collection prefix to use
101        String collectionName = properties.get(COLLECTION_PROP);
102        if (StringUtils.isBlank(collectionName)) {
103            collectionName = COLLECTION_DEFAULT;
104        }
105        collectionName += "." + name;
106        // get a connection to MongoDB
107        MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class);
108        MongoDatabase database = mongoService.getDatabase(KEYVALUE_CONNECTION_ID);
109        coll = database.getCollection(collectionName);
110        // make sure TTL works by creating the appropriate index
111        IndexOptions indexOptions = new IndexOptions().expireAfter(Long.valueOf(0), TimeUnit.SECONDS);
112        coll.createIndex(new Document(TTL_KEY, ONE), indexOptions);
113    }
114
115    @Override
116    public Stream<String> keyStream() {
117        return StreamSupport.stream(coll.find().projection(include(ID_KEY)).spliterator(), false)
118                            .map(doc -> doc.getString(ID_KEY));
119    }
120
121    @Override
122    public void close() {
123        if (coll != null) {
124            coll = null;
125        }
126    }
127
128    @Override
129    public void clear() {
130        if (log.isTraceEnabled()) {
131            log.trace("MongoDB: CLEAR");
132        }
133        coll.deleteMany(new Document());
134    }
135
136    // if possible, store the bytes as a UTF-8 string
137    protected static Object toStorage(byte[] bytes) {
138        try {
139            return bytesToString(bytes);
140        } catch (CharacterCodingException e) {
141            // could not decode as UTF-8, use a binary
142            return new Binary(bytes);
143        }
144    }
145
146    protected byte[] toBytes(Object value) {
147        if (value instanceof String) {
148            return ((String) value).getBytes(UTF_8);
149        } else if (value instanceof Long) {
150            return ((Long) value).toString().getBytes(UTF_8);
151        } else if (value instanceof Binary) {
152            return ((Binary) value).getData();
153        }
154        return null;
155    }
156
157    protected String toString(Object value) {
158        if (value instanceof String) {
159            return (String) value;
160        } else if (value instanceof Long) {
161            return ((Long) value).toString();
162        } else if (value instanceof Binary) {
163            byte[] bytes = ((Binary) value).getData();
164            try {
165                return bytesToString(bytes);
166            } catch (CharacterCodingException e) {
167                return null;
168            }
169        }
170        return null;
171    }
172
173    protected Long toLong(Object value) throws NumberFormatException { // NOSONAR
174        if (value instanceof Long) {
175            return (Long) value;
176        } else if (value instanceof String) {
177            return Long.valueOf((String) value);
178        } else if (value instanceof Binary) {
179            byte[] bytes = ((Binary) value).getData();
180            return bytesToLong(bytes);
181        }
182        return null;
183    }
184
185    @Override
186    public byte[] get(String key) {
187        Object value = getObject(key);
188        if (value == null) {
189            return null;
190        }
191        byte[] bytes = toBytes(value);
192        if (bytes != null) {
193            return bytes;
194        }
195        throw new UnsupportedOperationException(value.getClass().getName());
196    }
197
198    @Override
199    public String getString(String key) {
200        Object value = getObject(key);
201        if (value == null) {
202            return null;
203        }
204        String stringValue = toString(value);
205        if (stringValue != null) {
206            return stringValue;
207        }
208        throw new IllegalArgumentException("Value is not a String for key: " + key);
209    }
210
211    @Override
212    public Long getLong(String key) throws NumberFormatException { // NOSONAR
213        Object value = getObject(key);
214        if (value == null) {
215            return null;
216        }
217        Long longValue = toLong(value);
218        if (longValue != null) {
219            return longValue;
220        }
221        throw new NumberFormatException("Value is not a Long for key: " + key);
222    }
223
224    protected Object getObject(String key) {
225        Bson filter = eq(ID_KEY, key);
226        Document doc = coll.find(filter).first();
227        if (doc == null) {
228            if (log.isTraceEnabled()) {
229                log.trace("MongoDB: GET " + key + " = null");
230            }
231            return null;
232        }
233        Object value = doc.get(VALUE_KEY);
234        if (log.isTraceEnabled()) {
235            log.trace("MongoDB: GET " + key + " = " + value);
236        }
237        return value;
238    }
239
240    @Override
241    public Map<String, byte[]> get(Collection<String> keys) {
242        Map<String, byte[]> map = new HashMap<>(keys.size());
243        Block<Document> block = doc -> {
244            String key = doc.getString(ID_KEY);
245            Object value = doc.get(VALUE_KEY);
246            if (value != null) {
247                byte[] bytes = toBytes(value);
248                if (bytes == null) {
249                    throw new UnsupportedOperationException(String.format(
250                            "Value of class %s is not supported for key: %s", value.getClass().getName(), key));
251                }
252                map.put(key, bytes);
253            }
254        };
255        findByKeys(keys, block);
256        return map;
257    }
258
259    @Override
260    public Map<String, String> getStrings(Collection<String> keys) {
261        Map<String, String> map = new HashMap<>(keys.size());
262        Block<Document> block = doc -> {
263            String key = doc.getString(ID_KEY);
264            Object value = doc.get(VALUE_KEY);
265            if (value != null) {
266                String strValue = toString(value);
267                if (strValue == null) {
268                    throw new IllegalArgumentException("Value is not a String for key: " + key);
269                }
270                map.put(key, strValue);
271            }
272        };
273        findByKeys(keys, block);
274        return map;
275    }
276
277    @Override
278    public Map<String, Long> getLongs(Collection<String> keys) throws NumberFormatException { // NOSONAR
279        Map<String, Long> map = new HashMap<>(keys.size());
280        Block<Document> block = doc -> {
281            String key = doc.getString(ID_KEY);
282            Object value = doc.get(VALUE_KEY);
283            if (value != null) {
284                Long longValue = toLong(value);
285                if (longValue == null) {
286                    throw new IllegalArgumentException("Value is not a Long for key: " + key);
287                }
288                map.put(key, longValue);
289            }
290        };
291        findByKeys(keys, block);
292        return map;
293    }
294
295    /**
296     * @since 9.10
297     */
298    protected void findByKeys(Collection<String> keys, Block<Document> block) {
299        coll.find(in(ID_KEY, keys)).projection(include(ID_KEY, VALUE_KEY)).forEach(block);
300    }
301
302    protected Date getDateFromTTL(long ttl) {
303        return new Date(System.currentTimeMillis() + ttl * 1000);
304    }
305
306    @Override
307    public void put(String key, byte[] bytes, long ttl) {
308        put(key, toStorage(bytes), ttl);
309    }
310
311    @Override
312    public void put(String key, String string) {
313        put(key, (Object) string, 0);
314    }
315
316    @Override
317    public void put(String key, String string, long ttl) {
318        put(key, (Object) string, ttl);
319    }
320
321    @Override
322    public void put(String key, Long value) {
323        put(key, (Object) value, 0);
324    }
325
326    @Override
327    public void put(String key, Long value, long ttl) {
328        put(key, (Object) value, ttl);
329    }
330
331    protected void put(String key, Object value, long ttl) {
332        Bson filter = eq(ID_KEY, key);
333        if (value == null) {
334            if (log.isTraceEnabled()) {
335                log.trace("MongoDB: DEL " + key);
336            }
337            coll.deleteOne(filter);
338        } else {
339            Document doc = new Document(VALUE_KEY, value);
340            addTTL(doc, ttl);
341            if (log.isTraceEnabled()) {
342                log.trace("MongoDB: PUT " + key + " = " + value + (ttl == 0 ? "" : " (TTL " + ttl + ")"));
343            }
344            coll.replaceOne(filter, doc, new UpdateOptions().upsert(true));
345        }
346    }
347
348    protected void addTTL(Document doc, long ttl) {
349        if (ttl != 0) {
350            doc.append(TTL_KEY, getDateFromTTL(ttl));
351        }
352    }
353
354    @Override
355    public boolean setTTL(String key, long ttl) {
356        Bson filter = eq(ID_KEY, key);
357        Bson update;
358        if (ttl == 0) {
359            update = unset(TTL_KEY);
360        } else {
361            update = set(TTL_KEY, getDateFromTTL(ttl));
362        }
363        if (log.isTraceEnabled()) {
364            log.trace("MongoDB: SETTTL " + key + " = " + ttl);
365        }
366        UpdateResult res = coll.updateOne(filter, update);
367        return res.getModifiedCount() == 1;
368    }
369
370    @Override
371    public boolean compareAndSet(String key, byte[] expected, byte[] value, long ttl) {
372        return compareAndSet(key, toStorage(expected), toStorage(value), ttl);
373    }
374
375    @Override
376    public boolean compareAndSet(String key, String expected, String value, long ttl) {
377        return compareAndSet(key, (Object) expected, (Object) value, ttl);
378    }
379
380    protected boolean compareAndSet(String key, Object expected, Object value, long ttl) {
381        Bson filter = eq(ID_KEY, key);
382        if (expected == null && value == null) {
383            // check that document doesn't exist
384            Document doc = coll.find(filter).first();
385            boolean set = doc == null;
386            if (log.isTraceEnabled()) {
387                if (set) {
388                    log.trace("MongoDB: TEST " + key + " = null ? NOP");
389                } else {
390                    log.trace("MongoDB: TEST " + key + " = null ? FAILED");
391                }
392            }
393            return set;
394        } else if (expected == null) {
395            // set value if no document already exists: regular insert
396            Document doc = new Document(ID_KEY, key).append(VALUE_KEY, value);
397            addTTL(doc, ttl);
398            boolean set;
399            try {
400                coll.insertOne(doc);
401                set = true;
402            } catch (MongoWriteException e) {
403                if (ErrorCategory.fromErrorCode(e.getCode()) != ErrorCategory.DUPLICATE_KEY) {
404                    throw e;
405                }
406                set = false;
407            }
408            if (log.isTraceEnabled()) {
409                if (set) {
410                    log.trace("MongoDB: TEST " + key + " = null ? SET " + value);
411                } else {
412                    log.trace("MongoDB: TEST " + key + " = null ? FAILED");
413                }
414            }
415            return set;
416        } else if (value == null) {
417            // delete if previous value exists
418            filter = and(filter, eq(VALUE_KEY, expected));
419            DeleteResult res = coll.deleteOne(filter);
420            boolean set = res.getDeletedCount() == 1;
421            if (log.isTraceEnabled()) {
422                if (set) {
423                    log.trace("MongoDB: TEST " + key + " = " + expected + " ? DEL");
424                } else {
425                    log.trace("MongoDB: TEST " + key + " = " + expected + " ? FAILED");
426                }
427            }
428            return set;
429        } else {
430            // replace if previous value exists
431            filter = and(filter, eq(VALUE_KEY, expected));
432            Document doc = new Document(VALUE_KEY, value);
433            addTTL(doc, ttl);
434            UpdateResult res = coll.replaceOne(filter, doc);
435            boolean set = res.getModifiedCount() == 1;
436            if (log.isTraceEnabled()) {
437                if (set) {
438                    log.trace("MongoDB: TEST " + key + " = " + expected + " ? SET " + value);
439                } else {
440                    log.trace("MongoDB: TEST " + key + " = " + expected + " ? FAILED");
441                }
442            }
443            return set;
444        }
445    }
446
447    @Override
448    public long addAndGet(String key, long delta) throws NumberFormatException { // NOSONAR
449        Bson filter = eq(ID_KEY, key);
450        Bson update = inc(VALUE_KEY, Long.valueOf(delta));
451        Document result;
452        try {
453            result = coll.findOneAndUpdate(filter, update,
454                    new FindOneAndUpdateOptions().upsert(true).returnDocument(ReturnDocument.AFTER));
455        } catch (MongoCommandException e) {
456            // Cannot apply $inc to a value of non-numeric type; code: 16837
457            if (!e.getMessage().contains("Cannot apply $inc")) {
458                throw new NuxeoException(e);
459            }
460            // for compatibility with other backends that don't have datatypes,
461            // try to interpret the value as the string representation of an integer
462            // (this keeps the underlying format as a String though)
463            return addAndGetGeneric(key, delta);
464        }
465        if (result == null) {
466            throw new NuxeoException("Unexpected null result, upsert failed for key: " + key);
467        }
468        return ((Long) result.get(VALUE_KEY)).longValue();
469    }
470
471    // works on any representation that can be converted to a Long
472    protected long addAndGetGeneric(String key, long delta) throws NumberFormatException { // NOSONAR
473        for (;;) {
474            Object value = getObject(key);
475            long result;
476            if (value == null) {
477                result = delta;
478            } else {
479                Long base = toLong(value);
480                if (base == null) {
481                    throw new NumberFormatException("Value is not a Long for key: " + key);
482                }
483                result = base.longValue() + delta;
484            }
485            Object newValue = Long.valueOf(result);
486            if (compareAndSet(key, value, newValue, 0)) {
487                return result;
488            }
489            // else loop to try again
490        }
491    }
492
493    @Override
494    public String toString() {
495        return getClass().getSimpleName() + "(" + name + ")";
496    }
497
498}