/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Kevin Leturc
 */
package org.nuxeo.ecm.core.mongodb.seqgen;

import static com.mongodb.client.model.Filters.and;
import static com.mongodb.client.model.Filters.eq;
import static com.mongodb.client.model.Filters.gte;
import static com.mongodb.client.model.Filters.not;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.uidgen.AbstractUIDSequencer;
import org.nuxeo.ecm.core.uidgen.UIDSequencer;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.mongodb.MongoDBConnectionService;
import org.nuxeo.runtime.mongodb.MongoDBSerializationHelper;
import org.nuxeo.runtime.services.config.ConfigurationService;

import com.mongodb.ErrorCategory;
import com.mongodb.MongoWriteException;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.ReturnDocument;
import com.mongodb.client.model.Updates;

import java.util.ArrayList;
import java.util.List;

/**
 * MongoDB implementation of {@link UIDSequencer}.
 * <p>
 * We use MongoDB upsert feature to provide a sequencer.
 *
 * @since 9.1
 */
public class MongoDBUIDSequencer extends AbstractUIDSequencer {

    private static final Log log = LogFactory.getLog(MongoDBUIDSequencer.class);

    public static final String SEQUENCE_DATABASE_ID = "sequence";

    public static final String COLLECTION_NAME_PROPERTY = "nuxeo.mongodb.seqgen.collection.name";

    public static final String DEFAULT_COLLECTION_NAME = "sequence";

    public static final String SEQUENCE_VALUE_FIELD = "sequence";

    /** Lazily resolved sequence collection; reset to {@code null} by {@link #dispose()}. */
    protected MongoCollection<Document> coll;

    /**
     * Resolves the sequence collection up front.
     */
    @Override
    public void init() {
        getSequencerCollection();
    }

    /**
     * Sets the sequence identified by {@code key} to {@code id} using a replace with upsert.
     * <p>
     * The replace filter only matches a document for {@code key} whose stored value is not already greater than or
     * equal to {@code id}.
     *
     * @param key the sequence key, used as the document id
     * @param id the value to store in the sequence
     * @throws NuxeoException if the write still fails after one retry on a duplicate key error, or fails with any
     *             other write error
     */
    @Override
    public void initSequence(String key, long id) {
        Bson filter = and(eq(MongoDBSerializationHelper.MONGODB_ID, key), not(gte(SEQUENCE_VALUE_FIELD, id)));
        Document sequence = new Document();
        sequence.put(MongoDBSerializationHelper.MONGODB_ID, key);
        sequence.put(SEQUENCE_VALUE_FIELD, id);
        try {
            try {
                getSequencerCollection().replaceOne(filter, sequence, new ReplaceOptions().upsert(true));
            } catch (MongoWriteException e) {
                if (ErrorCategory.fromErrorCode(e.getCode()) != ErrorCategory.DUPLICATE_KEY) {
                    throw e;
                }
                // retry once, as not all server versions do server-side retries on upsert
                getSequencerCollection().replaceOne(filter, sequence, new ReplaceOptions().upsert(true));
            }
        } catch (MongoWriteException e) {
            throw new NuxeoException("Failed to update the sequence '" + key + "' with value " + id, e);
        }
    }

    /**
     * Returns the collection holding the sequences, resolving it on first use.
     * <p>
     * The collection name is read from the {@link #COLLECTION_NAME_PROPERTY} configuration property, falling back to
     * {@link #DEFAULT_COLLECTION_NAME}; the database is the one registered under {@link #SEQUENCE_DATABASE_ID}.
     *
     * @return the sequence collection
     */
    public MongoCollection<Document> getSequencerCollection() {
        if (coll == null) {
            // Get collection name
            ConfigurationService configurationService = Framework.getService(ConfigurationService.class);
            String collName = configurationService.getString(COLLECTION_NAME_PROPERTY, DEFAULT_COLLECTION_NAME);
            // Get a connection to MongoDB
            MongoDBConnectionService mongoService = Framework.getService(MongoDBConnectionService.class);
            // Get database
            MongoDatabase database = mongoService.getDatabase(SEQUENCE_DATABASE_ID);
            // Get collection
            coll = database.getCollection(collName);
        }
        return coll;
    }

    /**
     * Increments the sequence identified by {@code key} by one and returns the new value.
     *
     * @param key the sequence key
     * @return the value of the sequence after the increment
     */
    @Override
    public long getNextLong(String key) {
        return incrementBy(key, 1);
    }

    /**
     * Reserves {@code blockSize} consecutive values of the sequence identified by {@code key}.
     *
     * @param key the sequence key
     * @param blockSize the number of values to reserve
     * @return the reserved values in ascending order, the last one being the new value of the sequence
     */
    @Override
    public List<Long> getNextBlock(String key, int blockSize) {
        List<Long> ret = new ArrayList<>(blockSize);
        long last = incrementBy(key, blockSize);
        // the sequence advanced by blockSize, so the reserved values are the blockSize values ending at 'last'
        for (int i = blockSize - 1; i >= 0; i--) {
            ret.add(last - i);
        }
        return ret;
    }

    /**
     * Increments the sequence identified by {@code key} by {@code value} and returns the new value, creating the
     * sequence document when it does not exist yet.
     *
     * @param key the sequence key
     * @param value the amount to add to the sequence
     * @return the value of the sequence after the increment
     * @throws NuxeoException if creating the sequence document fails for a reason other than a duplicate key
     */
    protected long incrementBy(String key, int value) {
        FindOneAndUpdateOptions options = new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER);
        Bson filter = eq(MongoDBSerializationHelper.MONGODB_ID, key);
        Bson update = Updates.inc(SEQUENCE_VALUE_FIELD, Long.valueOf(value));
        Document sequence = getSequencerCollection().findOneAndUpdate(filter, update, options);
        // If sequence is null, we need to create it
        if (sequence == null) {
            try {
                sequence = new Document();
                sequence.put(MongoDBSerializationHelper.MONGODB_ID, key);
                sequence.put(SEQUENCE_VALUE_FIELD, Long.valueOf(value));
                getSequencerCollection().insertOne(sequence);
            } catch (MongoWriteException e) {
                if (ErrorCategory.fromErrorCode(e.getCode()) != ErrorCategory.DUPLICATE_KEY) {
                    // not a concurrent creation: retrying would fail the same way, so report it
                    throw new NuxeoException("Failed to create the sequence '" + key + "'", e);
                }
                // There was a race condition: the document was created concurrently. Re-run the increment with the
                // same amount so that a whole block is still reserved for the caller.
                if (log.isTraceEnabled()) {
                    log.trace("There was a race condition during '" + key + "' sequence insertion", e);
                }
                return incrementBy(key, value);
            }
        }
        return ((Long) MongoDBSerializationHelper.bsonToFieldMap(sequence).get(SEQUENCE_VALUE_FIELD)).longValue();
    }

    /**
     * Drops the cached collection reference; it is resolved again on the next use.
     */
    @Override
    public void dispose() {
        coll = null;
    }

}