001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Kevin Leturc
018 */
019package org.nuxeo.ecm.core.mongodb.seqgen;
020
021import static com.mongodb.client.model.Filters.and;
022import static com.mongodb.client.model.Filters.eq;
023import static com.mongodb.client.model.Filters.gte;
024import static com.mongodb.client.model.Filters.not;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.bson.Document;
029import org.bson.conversions.Bson;
030import org.nuxeo.ecm.core.api.NuxeoException;
031import org.nuxeo.ecm.core.uidgen.AbstractUIDSequencer;
032import org.nuxeo.ecm.core.uidgen.UIDSequencer;
033import org.nuxeo.runtime.api.Framework;
034import org.nuxeo.runtime.mongodb.MongoDBConnectionService;
035import org.nuxeo.runtime.mongodb.MongoDBSerializationHelper;
036import org.nuxeo.runtime.services.config.ConfigurationService;
037
038import com.mongodb.ErrorCategory;
039import com.mongodb.MongoWriteException;
040import com.mongodb.client.MongoCollection;
041import com.mongodb.client.MongoDatabase;
042import com.mongodb.client.model.FindOneAndUpdateOptions;
043import com.mongodb.client.model.ReplaceOptions;
044import com.mongodb.client.model.ReturnDocument;
045import com.mongodb.client.model.Updates;
046
047import java.util.ArrayList;
048import java.util.List;
049
050/**
051 * MongoDB implementation of {@link UIDSequencer}.
052 * <p>
053 * We use MongoDB upsert feature to provide a sequencer.
054 *
055 * @since 9.1
056 */
057public class MongoDBUIDSequencer extends AbstractUIDSequencer {
058
059    private static final Log log = LogFactory.getLog(MongoDBUIDSequencer.class);
060
061    public static final String SEQUENCE_DATABASE_ID = "sequence";
062
063    public static final String COLLECTION_NAME_PROPERTY = "nuxeo.mongodb.seqgen.collection.name";
064
065    public static final String DEFAULT_COLLECTION_NAME = "sequence";
066
067    public static final String SEQUENCE_VALUE_FIELD = "sequence";
068
069    protected MongoCollection<Document> coll;
070
071    @Override
072    public void init() {
073        getSequencerCollection();
074    }
075
076    @Override
077    public void initSequence(String key, long id) {
078        Bson filter = and(eq(MongoDBSerializationHelper.MONGODB_ID, key), not(gte(SEQUENCE_VALUE_FIELD, id)));
079        Document sequence = new Document();
080        sequence.put(MongoDBSerializationHelper.MONGODB_ID, key);
081        sequence.put(SEQUENCE_VALUE_FIELD, id);
082        try {
083            try {
084                getSequencerCollection().replaceOne(filter, sequence, new ReplaceOptions().upsert(true));
085            } catch (MongoWriteException e) {
086                if (ErrorCategory.fromErrorCode(e.getCode()) != ErrorCategory.DUPLICATE_KEY) {
087                    throw e;
088                }
089                // retry once, as not all server versions do server-side retries on upsert
090                getSequencerCollection().replaceOne(filter, sequence, new ReplaceOptions().upsert(true));
091            }
092        } catch (MongoWriteException e) {
093            throw new NuxeoException("Failed to update the sequence '" + key + "' with value " + id, e);
094        }
095    }
096
097    public MongoCollection<Document> getSequencerCollection() {
098        if (coll == null) {
099            // Get collection name
100            ConfigurationService configurationService = Framework.getService(ConfigurationService.class);
101            String collName = configurationService.getString(COLLECTION_NAME_PROPERTY, DEFAULT_COLLECTION_NAME);
102            // Get a connection to MongoDB
103            MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class);
104            // Get database
105            MongoDatabase database = mongoService.getDatabase(SEQUENCE_DATABASE_ID);
106            // Get collection
107            coll = database.getCollection(collName);
108        }
109        return coll;
110    }
111
112    @Override
113    public long getNextLong(String key) {
114        return incrementBy(key, 1);
115    }
116
117    @Override
118    public List<Long> getNextBlock(String key, int blockSize) {
119        List<Long> ret = new ArrayList<>(blockSize);
120        long last = incrementBy(key, blockSize);
121        for (int i = blockSize - 1; i >= 0; i--) {
122            ret.add(last - i);
123        }
124        return ret;
125    }
126
127    protected long incrementBy(String key, int value) {
128        FindOneAndUpdateOptions options = new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER);
129        Bson filter = eq(MongoDBSerializationHelper.MONGODB_ID, key);
130        Bson update = Updates.inc(SEQUENCE_VALUE_FIELD, Long.valueOf(value));
131        Document sequence = getSequencerCollection().findOneAndUpdate(filter, update, options);
132        // If sequence is null, we need to create it
133        if (sequence == null) {
134            try {
135                sequence = new Document();
136                sequence.put(MongoDBSerializationHelper.MONGODB_ID, key);
137                sequence.put(SEQUENCE_VALUE_FIELD, Long.valueOf(value));
138                getSequencerCollection().insertOne(sequence);
139            } catch (MongoWriteException e) {
140                // There was a race condition - just re-run getNextLong
141                if (log.isTraceEnabled()) {
142                    log.trace("There was a race condition during '" + key + "' sequence insertion", e);
143                }
144                return getNextLong(key);
145            }
146        }
147        return ((Long) MongoDBSerializationHelper.bsonToFieldMap(sequence).get(SEQUENCE_VALUE_FIELD)).longValue();
148    }
149
150    @Override
151    public void dispose() {
152        if (coll != null) {
153            coll = null;
154        }
155    }
156
157}