001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.mongodb.kv;
020
021import static com.mongodb.client.model.Filters.and;
022import static com.mongodb.client.model.Filters.eq;
023import static com.mongodb.client.model.Filters.in;
024import static com.mongodb.client.model.Projections.include;
025import static com.mongodb.client.model.Updates.inc;
026import static com.mongodb.client.model.Updates.set;
027import static com.mongodb.client.model.Updates.unset;
028import static java.nio.charset.StandardCharsets.UTF_8;
029
030import java.nio.charset.CharacterCodingException;
031import java.util.Collection;
032import java.util.Date;
033import java.util.HashMap;
034import java.util.Map;
035import java.util.concurrent.TimeUnit;
036import java.util.regex.Pattern;
037import java.util.stream.Stream;
038import java.util.stream.StreamSupport;
039
040import org.apache.commons.lang3.StringUtils;
041import org.apache.commons.logging.Log;
042import org.apache.commons.logging.LogFactory;
043import org.bson.Document;
044import org.bson.conversions.Bson;
045import org.bson.types.Binary;
046import org.nuxeo.ecm.core.api.NuxeoException;
047import org.nuxeo.runtime.api.Framework;
048import org.nuxeo.runtime.kv.AbstractKeyValueStoreProvider;
049import org.nuxeo.runtime.kv.KeyValueStoreDescriptor;
050import org.nuxeo.runtime.mongodb.MongoDBConnectionService;
051
052import com.mongodb.Block;
053import com.mongodb.ErrorCategory;
054import com.mongodb.MongoCommandException;
055import com.mongodb.MongoWriteException;
056import com.mongodb.client.MongoCollection;
057import com.mongodb.client.MongoDatabase;
058import com.mongodb.client.model.FindOneAndUpdateOptions;
059import com.mongodb.client.model.IndexOptions;
060import com.mongodb.client.model.ReturnDocument;
061import com.mongodb.client.model.UpdateOptions;
062import com.mongodb.client.result.DeleteResult;
063import com.mongodb.client.result.UpdateResult;
064
065/**
066 * MongoDB implementation of a Key/Value Store Provider.
067 * <p>
068 * The following configuration properties are available:
069 * <ul>
070 * <li>collection: the MongoDB collection prefix to use, the default is "kv". This will be followed by the Store name.
071 * </ul>
072 *
073 * @since 9.3
074 */
075public class MongoDBKeyValueStore extends AbstractKeyValueStoreProvider {
076
077    private static final Log log = LogFactory.getLog(MongoDBKeyValueStore.class);
078
079    public static final String KEYVALUE_CONNECTION_ID = "keyvalue";
080
081    public static final String COLLECTION_PROP = "collection";
082
083    public static final String COLLECTION_DEFAULT = "kv";
084
085    public static final String ID_KEY = "_id";
086
087    public static final String VALUE_KEY = "v";
088
089    public static final String TTL_KEY = "ttl";
090
091    public static final Double ONE = Double.valueOf(1);
092
093    protected MongoCollection<Document> coll;
094
095    @Override
096    public void initialize(KeyValueStoreDescriptor descriptor) {
097        super.initialize(descriptor);
098        Map<String, String> properties = descriptor.properties;
099        // find which collection prefix to use
100        String collectionName = properties.get(COLLECTION_PROP);
101        if (StringUtils.isBlank(collectionName)) {
102            collectionName = COLLECTION_DEFAULT;
103        }
104        collectionName += "." + name;
105        // get a connection to MongoDB
106        MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class);
107        MongoDatabase database = mongoService.getDatabase(KEYVALUE_CONNECTION_ID);
108        coll = database.getCollection(collectionName);
109        // make sure TTL works by creating the appropriate index
110        IndexOptions indexOptions = new IndexOptions().expireAfter(Long.valueOf(0), TimeUnit.SECONDS);
111        coll.createIndex(new Document(TTL_KEY, ONE), indexOptions);
112    }
113
114    @Override
115    public Stream<String> keyStream() {
116        return StreamSupport.stream(coll.find().projection(include(ID_KEY)).spliterator(), false)
117                            .map(doc -> doc.getString(ID_KEY));
118    }
119
120    @Override
121    public Stream<String> keyStream(String prefix) {
122        Document filter = new Document(ID_KEY, Pattern.compile('^' + Pattern.quote(prefix)));
123        return StreamSupport.stream(coll.find(filter).projection(include(ID_KEY)).spliterator(), false)
124                            .map(doc -> doc.getString(ID_KEY));
125    }
126
127    @Override
128    public void close() {
129        if (coll != null) {
130            coll = null;
131        }
132    }
133
134    @Override
135    public void clear() {
136        if (log.isTraceEnabled()) {
137            log.trace("MongoDB: CLEAR");
138        }
139        coll.deleteMany(new Document());
140    }
141
142    // if possible, store the bytes as a UTF-8 string
143    protected static Object toStorage(byte[] bytes) {
144        try {
145            return bytesToString(bytes);
146        } catch (CharacterCodingException e) {
147            // could not decode as UTF-8, use a binary
148            return new Binary(bytes);
149        }
150    }
151
152    protected byte[] toBytes(Object value) {
153        if (value instanceof String) {
154            return ((String) value).getBytes(UTF_8);
155        } else if (value instanceof Long) {
156            return ((Long) value).toString().getBytes(UTF_8);
157        } else if (value instanceof Binary) {
158            return ((Binary) value).getData();
159        }
160        return null;
161    }
162
163    protected String toString(Object value) {
164        if (value instanceof String) {
165            return (String) value;
166        } else if (value instanceof Long) {
167            return ((Long) value).toString();
168        } else if (value instanceof Binary) {
169            byte[] bytes = ((Binary) value).getData();
170            try {
171                return bytesToString(bytes);
172            } catch (CharacterCodingException e) {
173                return null;
174            }
175        }
176        return null;
177    }
178
179    protected Long toLong(Object value) throws NumberFormatException { // NOSONAR
180        if (value instanceof Long) {
181            return (Long) value;
182        } else if (value instanceof String) {
183            return Long.valueOf((String) value);
184        } else if (value instanceof Binary) {
185            byte[] bytes = ((Binary) value).getData();
186            return bytesToLong(bytes);
187        }
188        return null;
189    }
190
191    @Override
192    public byte[] get(String key) {
193        Object value = getObject(key);
194        if (value == null) {
195            return null;
196        }
197        byte[] bytes = toBytes(value);
198        if (bytes != null) {
199            return bytes;
200        }
201        throw new UnsupportedOperationException(value.getClass().getName());
202    }
203
204    @Override
205    public String getString(String key) {
206        Object value = getObject(key);
207        if (value == null) {
208            return null;
209        }
210        String stringValue = toString(value);
211        if (stringValue != null) {
212            return stringValue;
213        }
214        throw new IllegalArgumentException("Value is not a String for key: " + key);
215    }
216
217    @Override
218    public Long getLong(String key) throws NumberFormatException { // NOSONAR
219        Object value = getObject(key);
220        if (value == null) {
221            return null;
222        }
223        Long longValue = toLong(value);
224        if (longValue != null) {
225            return longValue;
226        }
227        throw new NumberFormatException("Value is not a Long for key: " + key);
228    }
229
230    protected Object getObject(String key) {
231        Bson filter = eq(ID_KEY, key);
232        Document doc = coll.find(filter).first();
233        if (doc == null) {
234            if (log.isTraceEnabled()) {
235                log.trace("MongoDB: GET " + key + " = null");
236            }
237            return null;
238        }
239        Object value = doc.get(VALUE_KEY);
240        if (log.isTraceEnabled()) {
241            log.trace("MongoDB: GET " + key + " = " + value);
242        }
243        return value;
244    }
245
246    @Override
247    public Map<String, byte[]> get(Collection<String> keys) {
248        Map<String, byte[]> map = new HashMap<>(keys.size());
249        Block<Document> block = doc -> {
250            String key = doc.getString(ID_KEY);
251            Object value = doc.get(VALUE_KEY);
252            if (value != null) {
253                byte[] bytes = toBytes(value);
254                if (bytes == null) {
255                    throw new UnsupportedOperationException(String.format(
256                            "Value of class %s is not supported for key: %s", value.getClass().getName(), key));
257                }
258                map.put(key, bytes);
259            }
260        };
261        findByKeys(keys, block);
262        return map;
263    }
264
265    @Override
266    public Map<String, String> getStrings(Collection<String> keys) {
267        Map<String, String> map = new HashMap<>(keys.size());
268        Block<Document> block = doc -> {
269            String key = doc.getString(ID_KEY);
270            Object value = doc.get(VALUE_KEY);
271            if (value != null) {
272                String strValue = toString(value);
273                if (strValue == null) {
274                    throw new IllegalArgumentException("Value is not a String for key: " + key);
275                }
276                map.put(key, strValue);
277            }
278        };
279        findByKeys(keys, block);
280        return map;
281    }
282
283    @Override
284    public Map<String, Long> getLongs(Collection<String> keys) throws NumberFormatException { // NOSONAR
285        Map<String, Long> map = new HashMap<>(keys.size());
286        Block<Document> block = doc -> {
287            String key = doc.getString(ID_KEY);
288            Object value = doc.get(VALUE_KEY);
289            if (value != null) {
290                Long longValue = toLong(value);
291                if (longValue == null) {
292                    throw new IllegalArgumentException("Value is not a Long for key: " + key);
293                }
294                map.put(key, longValue);
295            }
296        };
297        findByKeys(keys, block);
298        return map;
299    }
300
301    /**
302     * @since 9.10
303     */
304    protected void findByKeys(Collection<String> keys, Block<Document> block) {
305        coll.find(in(ID_KEY, keys)).projection(include(ID_KEY, VALUE_KEY)).forEach(block);
306    }
307
308    protected Date getDateFromTTL(long ttl) {
309        return new Date(System.currentTimeMillis() + ttl * 1000);
310    }
311
312    @Override
313    public void put(String key, byte[] bytes, long ttl) {
314        put(key, toStorage(bytes), ttl);
315    }
316
317    @Override
318    public void put(String key, String string) {
319        put(key, (Object) string, 0);
320    }
321
322    @Override
323    public void put(String key, String string, long ttl) {
324        put(key, (Object) string, ttl);
325    }
326
327    @Override
328    public void put(String key, Long value) {
329        put(key, (Object) value, 0);
330    }
331
332    @Override
333    public void put(String key, Long value, long ttl) {
334        put(key, (Object) value, ttl);
335    }
336
337    protected void put(String key, Object value, long ttl) {
338        Bson filter = eq(ID_KEY, key);
339        if (value == null) {
340            if (log.isTraceEnabled()) {
341                log.trace("MongoDB: DEL " + key);
342            }
343            coll.deleteOne(filter);
344        } else {
345            Document doc = new Document(VALUE_KEY, value);
346            addTTL(doc, ttl);
347            if (log.isTraceEnabled()) {
348                log.trace("MongoDB: PUT " + key + " = " + value + (ttl == 0 ? "" : " (TTL " + ttl + ")"));
349            }
350            coll.replaceOne(filter, doc, new UpdateOptions().upsert(true));
351        }
352    }
353
354    protected void addTTL(Document doc, long ttl) {
355        if (ttl != 0) {
356            doc.append(TTL_KEY, getDateFromTTL(ttl));
357        }
358    }
359
360    @Override
361    public boolean setTTL(String key, long ttl) {
362        Bson filter = eq(ID_KEY, key);
363        Bson update;
364        if (ttl == 0) {
365            update = unset(TTL_KEY);
366        } else {
367            update = set(TTL_KEY, getDateFromTTL(ttl));
368        }
369        if (log.isTraceEnabled()) {
370            log.trace("MongoDB: SETTTL " + key + " = " + ttl);
371        }
372        UpdateResult res = coll.updateOne(filter, update);
373        return res.getModifiedCount() == 1;
374    }
375
376    @Override
377    public boolean compareAndSet(String key, byte[] expected, byte[] value, long ttl) {
378        return compareAndSet(key, toStorage(expected), toStorage(value), ttl);
379    }
380
381    @Override
382    public boolean compareAndSet(String key, String expected, String value, long ttl) {
383        return compareAndSet(key, (Object) expected, (Object) value, ttl);
384    }
385
386    protected boolean compareAndSet(String key, Object expected, Object value, long ttl) {
387        Bson filter = eq(ID_KEY, key);
388        if (expected == null && value == null) {
389            // check that document doesn't exist
390            Document doc = coll.find(filter).first();
391            boolean set = doc == null;
392            if (log.isTraceEnabled()) {
393                if (set) {
394                    log.trace("MongoDB: TEST " + key + " = null ? NOP");
395                } else {
396                    log.trace("MongoDB: TEST " + key + " = null ? FAILED");
397                }
398            }
399            return set;
400        } else if (expected == null) {
401            // set value if no document already exists: regular insert
402            Document doc = new Document(ID_KEY, key).append(VALUE_KEY, value);
403            addTTL(doc, ttl);
404            boolean set;
405            try {
406                coll.insertOne(doc);
407                set = true;
408            } catch (MongoWriteException e) {
409                if (ErrorCategory.fromErrorCode(e.getCode()) != ErrorCategory.DUPLICATE_KEY) {
410                    throw e;
411                }
412                set = false;
413            }
414            if (log.isTraceEnabled()) {
415                if (set) {
416                    log.trace("MongoDB: TEST " + key + " = null ? SET " + value);
417                } else {
418                    log.trace("MongoDB: TEST " + key + " = null ? FAILED");
419                }
420            }
421            return set;
422        } else if (value == null) {
423            // delete if previous value exists
424            filter = and(filter, eq(VALUE_KEY, expected));
425            DeleteResult res = coll.deleteOne(filter);
426            boolean set = res.getDeletedCount() == 1;
427            if (log.isTraceEnabled()) {
428                if (set) {
429                    log.trace("MongoDB: TEST " + key + " = " + expected + " ? DEL");
430                } else {
431                    log.trace("MongoDB: TEST " + key + " = " + expected + " ? FAILED");
432                }
433            }
434            return set;
435        } else {
436            // replace if previous value exists
437            filter = and(filter, eq(VALUE_KEY, expected));
438            Document doc = new Document(VALUE_KEY, value);
439            addTTL(doc, ttl);
440            UpdateResult res = coll.replaceOne(filter, doc);
441            boolean set = res.getModifiedCount() == 1;
442            if (log.isTraceEnabled()) {
443                if (set) {
444                    log.trace("MongoDB: TEST " + key + " = " + expected + " ? SET " + value);
445                } else {
446                    log.trace("MongoDB: TEST " + key + " = " + expected + " ? FAILED");
447                }
448            }
449            return set;
450        }
451    }
452
453    @Override
454    public long addAndGet(String key, long delta) throws NumberFormatException { // NOSONAR
455        Bson filter = eq(ID_KEY, key);
456        Bson update = inc(VALUE_KEY, Long.valueOf(delta));
457        Document result;
458        try {
459            result = coll.findOneAndUpdate(filter, update,
460                    new FindOneAndUpdateOptions().upsert(true).returnDocument(ReturnDocument.AFTER));
461        } catch (MongoCommandException e) {
462            // Cannot apply $inc to a value of non-numeric type; code: 16837
463            if (!e.getMessage().contains("Cannot apply $inc")) {
464                throw new NuxeoException(e);
465            }
466            // for compatibility with other backends that don't have datatypes,
467            // try to interpret the value as the string representation of an integer
468            // (this keeps the underlying format as a String though)
469            return addAndGetGeneric(key, delta);
470        }
471        if (result == null) {
472            throw new NuxeoException("Unexpected null result, upsert failed for key: " + key);
473        }
474        return ((Long) result.get(VALUE_KEY)).longValue();
475    }
476
477    // works on any representation that can be converted to a Long
478    protected long addAndGetGeneric(String key, long delta) throws NumberFormatException { // NOSONAR
479        for (;;) {
480            Object value = getObject(key);
481            long result;
482            if (value == null) {
483                result = delta;
484            } else {
485                Long base = toLong(value);
486                if (base == null) {
487                    throw new NumberFormatException("Value is not a Long for key: " + key);
488                }
489                result = base.longValue() + delta;
490            }
491            Object newValue = Long.valueOf(result);
492            if (compareAndSet(key, value, newValue, 0)) {
493                return result;
494            }
495            // else loop to try again
496        }
497    }
498
499}