001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.ecm.core.mongodb.seqgen;
020
021import static com.mongodb.client.model.Filters.and;
022import static com.mongodb.client.model.Filters.eq;
023import static com.mongodb.client.model.Filters.gte;
024import static com.mongodb.client.model.Filters.not;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.bson.Document;
029import org.bson.conversions.Bson;
030import org.nuxeo.ecm.core.api.NuxeoException;
031import org.nuxeo.ecm.core.uidgen.AbstractUIDSequencer;
032import org.nuxeo.ecm.core.uidgen.UIDSequencer;
033import org.nuxeo.runtime.api.Framework;
034import org.nuxeo.runtime.mongodb.MongoDBConnectionService;
035import org.nuxeo.runtime.mongodb.MongoDBSerializationHelper;
036import org.nuxeo.runtime.services.config.ConfigurationService;
037
038import com.mongodb.MongoWriteException;
039import com.mongodb.client.MongoCollection;
040import com.mongodb.client.MongoDatabase;
041import com.mongodb.client.model.FindOneAndUpdateOptions;
042import com.mongodb.client.model.ReturnDocument;
043import com.mongodb.client.model.UpdateOptions;
044import com.mongodb.client.model.Updates;
045
046import java.util.ArrayList;
047import java.util.List;
048
049/**
050 * MongoDB implementation of {@link UIDSequencer}.
051 * <p>
052 * We use MongoDB upsert feature to provide a sequencer.
053 *
054 * @since 9.1
055 */
056public class MongoDBUIDSequencer extends AbstractUIDSequencer {
057
058    private static final Log log = LogFactory.getLog(MongoDBUIDSequencer.class);
059
060    public static final String SEQUENCE_DATABASE_ID = "sequence";
061
062    public static final String COLLECTION_NAME_PROPERTY = "nuxeo.mongodb.seqgen.collection.name";
063
064    public static final String DEFAULT_COLLECTION_NAME = "sequence";
065
066    public static final String SEQUENCE_VALUE_FIELD = "sequence";
067
068    protected MongoCollection<Document> coll;
069
070    @Override
071    public void init() {
072        getSequencerCollection();
073    }
074
075    @Override
076    public void initSequence(String key, long id) {
077        try {
078            Bson filter = and(eq(MongoDBSerializationHelper.MONGODB_ID, key), not(gte(SEQUENCE_VALUE_FIELD, id)));
079            Document sequence = new Document();
080            sequence.put(MongoDBSerializationHelper.MONGODB_ID, key);
081            sequence.put(SEQUENCE_VALUE_FIELD, id);
082            getSequencerCollection().replaceOne(filter, sequence, new UpdateOptions().upsert(true));
083        } catch (MongoWriteException e) {
084            throw new NuxeoException("Failed to update the sequence '" + key + "' with value " + id, e);
085        }
086    }
087
088    public MongoCollection<Document> getSequencerCollection() {
089        if (coll == null) {
090            // Get collection name
091            ConfigurationService configurationService = Framework.getService(ConfigurationService.class);
092            String collName = configurationService.getProperty(COLLECTION_NAME_PROPERTY, DEFAULT_COLLECTION_NAME);
093            // Get a connection to MongoDB
094            MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class);
095            // Get database
096            MongoDatabase database = mongoService.getDatabase(SEQUENCE_DATABASE_ID);
097            // Get collection
098            coll = database.getCollection(collName);
099        }
100        return coll;
101    }
102
103    @Override
104    public long getNextLong(String key) {
105        return incrementBy(key, 1);
106    }
107
108    @Override
109    public List<Long> getNextBlock(String key, int blockSize) {
110        List<Long> ret = new ArrayList<>(blockSize);
111        long last = incrementBy(key, blockSize);
112        for (int i = blockSize - 1; i >= 0; i--) {
113            ret.add(last - i);
114        }
115        return ret;
116    }
117
118    protected long incrementBy(String key, int value) {
119        FindOneAndUpdateOptions options = new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER);
120        Bson filter = eq(MongoDBSerializationHelper.MONGODB_ID, key);
121        Bson update = Updates.inc(SEQUENCE_VALUE_FIELD, Long.valueOf(value));
122        Document sequence = getSequencerCollection().findOneAndUpdate(filter, update, options);
123        // If sequence is null, we need to create it
124        if (sequence == null) {
125            try {
126                sequence = new Document();
127                sequence.put(MongoDBSerializationHelper.MONGODB_ID, key);
128                sequence.put(SEQUENCE_VALUE_FIELD, Long.valueOf(value));
129                getSequencerCollection().insertOne(sequence);
130            } catch (MongoWriteException e) {
131                // There was a race condition - just re-run getNextLong
132                if (log.isTraceEnabled()) {
133                    log.trace("There was a race condition during '" + key + "' sequence insertion", e);
134                }
135                return getNextLong(key);
136            }
137        }
138        return ((Long) MongoDBSerializationHelper.bsonToFieldMap(sequence).get(SEQUENCE_VALUE_FIELD)).longValue();
139    }
140
141    @Override
142    public void dispose() {
143        if (coll != null) {
144            coll = null;
145        }
146    }
147
148}