001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.mongodb.kv;
020
021import static com.mongodb.client.model.Filters.and;
022import static com.mongodb.client.model.Filters.eq;
023import static com.mongodb.client.model.Filters.in;
024import static com.mongodb.client.model.Projections.include;
025import static com.mongodb.client.model.Updates.set;
026import static com.mongodb.client.model.Updates.unset;
027import static java.nio.charset.StandardCharsets.UTF_8;
028
029import java.nio.charset.CharacterCodingException;
030import java.util.Collection;
031import java.util.Date;
032import java.util.HashMap;
033import java.util.Map;
034import java.util.concurrent.TimeUnit;
035import java.util.stream.Stream;
036import java.util.stream.StreamSupport;
037
038import com.mongodb.Block;
039import org.apache.commons.lang3.StringUtils;
040import org.apache.commons.logging.Log;
041import org.apache.commons.logging.LogFactory;
042import org.bson.Document;
043import org.bson.conversions.Bson;
044import org.bson.types.Binary;
045import org.nuxeo.runtime.api.Framework;
046import org.nuxeo.runtime.kv.AbstractKeyValueStoreProvider;
047import org.nuxeo.runtime.kv.KeyValueStoreDescriptor;
048import org.nuxeo.runtime.mongodb.MongoDBConnectionService;
049
050import com.mongodb.ErrorCategory;
051import com.mongodb.MongoWriteException;
052import com.mongodb.client.MongoCollection;
053import com.mongodb.client.MongoDatabase;
054import com.mongodb.client.model.IndexOptions;
055import com.mongodb.client.model.UpdateOptions;
056import com.mongodb.client.result.DeleteResult;
057import com.mongodb.client.result.UpdateResult;
058
059/**
060 * MongoDB implementation of a Key/Value Store Provider.
061 * <p>
062 * The following configuration properties are available:
063 * <ul>
064 * <li>collection: the MongoDB collection prefix to use, the default is "kv". This will be followed by the Store name.
065 * </ul>
066 *
067 * @since 9.3
068 */
069public class MongoDBKeyValueStore extends AbstractKeyValueStoreProvider {
070
071    private static final Log log = LogFactory.getLog(MongoDBKeyValueStore.class);
072
073    public static final String KEYVALUE_CONNECTION_ID = "keyvalue";
074
075    public static final String COLLECTION_PROP = "collection";
076
077    public static final String COLLECTION_DEFAULT = "kv";
078
079    public static final String ID_KEY = "_id";
080
081    public static final String VALUE_KEY = "v";
082
083    public static final String TTL_KEY = "ttl";
084
085    public static final Double ONE = Double.valueOf(1);
086
087    protected String name;
088
089    protected MongoCollection<Document> coll;
090
091    @Override
092    public void initialize(KeyValueStoreDescriptor descriptor) {
093        name = descriptor.name;
094        Map<String, String> properties = descriptor.getProperties();
095        // find which collection prefix to use
096        String collectionName = properties.get(COLLECTION_PROP);
097        if (StringUtils.isBlank(collectionName)) {
098            collectionName = COLLECTION_DEFAULT;
099        }
100        collectionName += "." + name;
101        // get a connection to MongoDB
102        MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class);
103        MongoDatabase database = mongoService.getDatabase(KEYVALUE_CONNECTION_ID);
104        coll = database.getCollection(collectionName);
105        // make sure TTL works by creating the appropriate index
106        IndexOptions indexOptions = new IndexOptions().expireAfter(Long.valueOf(0), TimeUnit.SECONDS);
107        coll.createIndex(new Document(TTL_KEY, ONE), indexOptions);
108    }
109
110    @Override
111    public Stream<String> keyStream() {
112        return StreamSupport.stream(coll.find().projection(include(ID_KEY)).spliterator(), false)
113                            .map(doc -> doc.getString(ID_KEY));
114    }
115
116    @Override
117    public void close() {
118        if (coll != null) {
119            coll = null;
120        }
121    }
122
123    @Override
124    public void clear() {
125        if (log.isTraceEnabled()) {
126            log.trace("MongoDB: CLEAR");
127        }
128        coll.deleteMany(new Document());
129    }
130
131    // if possible, store the bytes as a UTF-8 string
132    protected static Object toStorage(byte[] bytes) {
133        try {
134            return bytesToString(bytes);
135        } catch (CharacterCodingException e) {
136            // could not decode as UTF-8, use a binary
137            return new Binary(bytes);
138        }
139    }
140
141    @Override
142    public byte[] get(String key) {
143        Object value = getObject(key);
144        if (value == null) {
145            return null;
146        } else if (value instanceof String) {
147            return ((String) value).getBytes(UTF_8);
148        } else if (value instanceof Binary) {
149            return ((Binary) value).getData();
150        }
151        throw new UnsupportedOperationException(value.getClass().getName());
152    }
153
154    @Override
155    public String getString(String key) {
156        Object value = getObject(key);
157        if (value == null) {
158            return null;
159        } else if (value instanceof String) {
160            return (String) value;
161        } else if (value instanceof Binary) {
162            byte[] bytes = ((Binary) value).getData();
163            try {
164                return bytesToString(bytes);
165            } catch (CharacterCodingException e) {
166                // fall through to throw
167            }
168        }
169        throw new IllegalArgumentException("Value is not a String for key: " + key);
170    }
171
172    protected Object getObject(String key) {
173        Bson filter = eq(ID_KEY, key);
174        Document doc = coll.find(filter).first();
175        if (doc == null) {
176            if (log.isTraceEnabled()) {
177                log.trace("MongoDB: GET " + key + " = null");
178            }
179            return null;
180        }
181        Object value = doc.get(VALUE_KEY);
182        if (log.isTraceEnabled()) {
183            log.trace("MongoDB: GET " + key + " = " + value);
184        }
185        return value;
186    }
187
188    @Override
189    public Map<String, byte[]> get(Collection<String> keys) {
190        Map<String, byte[]> map = new HashMap<>(keys.size());
191        Block<Document> block = doc -> {
192            String key = doc.getString(ID_KEY);
193            Object value = doc.get(VALUE_KEY);
194            if (value != null) {
195                byte[] bytesValue = null;
196                if (value instanceof String) {
197                    bytesValue = ((String) value).getBytes(UTF_8);
198                } else if (value instanceof Binary) {
199                    bytesValue = ((Binary) value).getData();
200                } else {
201                    throw new UnsupportedOperationException(String.format(
202                            "Value of class %s is not supported for key: %s", value.getClass().getName(), key));
203                }
204                map.put(key, bytesValue);
205            }
206        };
207        findByKeys(keys, block);
208        return map;
209    }
210
211    @Override
212    public Map<String, String> getStrings(Collection<String> keys) {
213        Map<String, String> map = new HashMap<>(keys.size());
214        Block<Document> block = doc -> {
215            String key = doc.getString(ID_KEY);
216            Object value = doc.get(VALUE_KEY);
217            if (value != null) {
218                String strValue = null;
219                if (value instanceof String) {
220                    strValue = (String) value;
221                } else if (value instanceof Binary) {
222                    byte[] bytes = ((Binary) value).getData();
223                    try {
224                        strValue = bytesToString(bytes);
225                    } catch (CharacterCodingException e) {
226                        // fall through to throw
227                    }
228                }
229                if (strValue == null) {
230                    throw new IllegalArgumentException("Value is not a String for key: " + key);
231                }
232                map.put(key, strValue);
233            }
234        };
235        findByKeys(keys, block);
236        return map;
237    }
238
239    /**
240     * @since 9.10
241     */
242    protected void findByKeys(Collection<String> keys, Block<Document> block) {
243        coll.find(in(ID_KEY, keys)).projection(include(ID_KEY, VALUE_KEY)).forEach(block);
244    }
245
246    protected Date getDateFromTTL(long ttl) {
247        return new Date(System.currentTimeMillis() + ttl * 1000);
248    }
249
250    @Override
251    public void put(String key, byte[] bytes, long ttl) {
252        put(key, toStorage(bytes), ttl);
253    }
254
255    @Override
256    public void put(String key, String string) {
257        put(key, (Object) string, 0);
258    }
259
260    @Override
261    public void put(String key, String string, long ttl) {
262        put(key, (Object) string, ttl);
263    }
264
265    protected void put(String key, Object value, long ttl) {
266        Bson filter = eq(ID_KEY, key);
267        if (value == null) {
268            if (log.isTraceEnabled()) {
269                log.trace("MongoDB: DEL " + key);
270            }
271            coll.deleteOne(filter);
272        } else {
273            Document doc = new Document(VALUE_KEY, value);
274            addTTL(doc, ttl);
275            if (log.isTraceEnabled()) {
276                log.trace("MongoDB: PUT " + key + " = " + value + (ttl == 0 ? "" : " (TTL " + ttl + ")"));
277            }
278            coll.replaceOne(filter, doc, new UpdateOptions().upsert(true));
279        }
280    }
281
282    protected void addTTL(Document doc, long ttl) {
283        if (ttl != 0) {
284            doc.append(TTL_KEY, getDateFromTTL(ttl));
285        }
286    }
287
288    @Override
289    public boolean setTTL(String key, long ttl) {
290        Bson filter = eq(ID_KEY, key);
291        Bson update;
292        if (ttl == 0) {
293            update = unset(TTL_KEY);
294        } else {
295            update = set(TTL_KEY, getDateFromTTL(ttl));
296        }
297        if (log.isTraceEnabled()) {
298            log.trace("MongoDB: SETTTL " + key + " = " + ttl);
299        }
300        UpdateResult res = coll.updateOne(filter, update);
301        return res.getModifiedCount() == 1;
302    }
303
304    @Override
305    public boolean compareAndSet(String key, byte[] expected, byte[] value, long ttl) {
306        return compareAndSet(key, toStorage(expected), toStorage(value), ttl);
307    }
308
309    @Override
310    public boolean compareAndSet(String key, String expected, String value, long ttl) {
311        return compareAndSet(key, (Object) expected, (Object) value, ttl);
312    }
313
314    protected boolean compareAndSet(String key, Object expected, Object value, long ttl) {
315        Bson filter = eq(ID_KEY, key);
316        if (expected == null && value == null) {
317            // check that document doesn't exist
318            Document doc = coll.find(filter).first();
319            boolean set = doc == null;
320            if (log.isTraceEnabled()) {
321                if (set) {
322                    log.trace("MongoDB: TEST " + key + " = null ? NOP");
323                } else {
324                    log.trace("MongoDB: TEST " + key + " = null ? FAILED");
325                }
326            }
327            return set;
328        } else if (expected == null) {
329            // set value if no document already exists: regular insert
330            Document doc = new Document(ID_KEY, key).append(VALUE_KEY, value);
331            addTTL(doc, ttl);
332            boolean set;
333            try {
334                coll.insertOne(doc);
335                set = true;
336            } catch (MongoWriteException e) {
337                if (ErrorCategory.fromErrorCode(e.getCode()) != ErrorCategory.DUPLICATE_KEY) {
338                    throw e;
339                }
340                set = false;
341            }
342            if (log.isTraceEnabled()) {
343                if (set) {
344                    log.trace("MongoDB: TEST " + key + " = null ? SET " + value);
345                } else {
346                    log.trace("MongoDB: TEST " + key + " = null ? FAILED");
347                }
348            }
349            return set;
350        } else if (value == null) {
351            // delete if previous value exists
352            filter = and(filter, eq(VALUE_KEY, expected));
353            DeleteResult res = coll.deleteOne(filter);
354            boolean set = res.getDeletedCount() == 1;
355            if (log.isTraceEnabled()) {
356                if (set) {
357                    log.trace("MongoDB: TEST " + key + " = " + expected + " ? DEL");
358                } else {
359                    log.trace("MongoDB: TEST " + key + " = " + expected + " ? FAILED");
360                }
361            }
362            return set;
363        } else {
364            // replace if previous value exists
365            filter = and(filter, eq(VALUE_KEY, expected));
366            Document doc = new Document(VALUE_KEY, value);
367            addTTL(doc, ttl);
368            UpdateResult res = coll.replaceOne(filter, doc);
369            boolean set = res.getModifiedCount() == 1;
370            if (log.isTraceEnabled()) {
371                if (set) {
372                    log.trace("MongoDB: TEST " + key + " = " + expected + " ? SET " + value);
373                } else {
374                    log.trace("MongoDB: TEST " + key + " = " + expected + " ? FAILED");
375                }
376            }
377            return set;
378        }
379    }
380
381    @Override
382    public String toString() {
383        return getClass().getSimpleName() + "(" + name + ")";
384    }
385
386}