001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat <tdelprat@nuxeo.com>
018 */
019package org.nuxeo.elasticsearch.seqgen;
020
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.List;
024import java.util.NoSuchElementException;
025
026import com.google.common.collect.Lists;
027import org.elasticsearch.action.index.IndexRequest;
028import org.elasticsearch.action.index.IndexResponse;
029import org.elasticsearch.common.xcontent.XContentType;
030import org.elasticsearch.index.VersionType;
031import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
032import org.nuxeo.ecm.core.api.NuxeoException;
033import org.nuxeo.ecm.core.uidgen.AbstractUIDSequencer;
034import org.nuxeo.ecm.core.uidgen.UIDSequencer;
035import org.nuxeo.elasticsearch.ElasticSearchConstants;
036import org.nuxeo.elasticsearch.api.ESClient;
037import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
038import org.nuxeo.runtime.api.Framework;
039
040/**
041 * Elasticsearch implementation of {@link UIDSequencer}.
042 * <p>
043 * Since elasticsearch does not seem to support a notion of native sequence, the implementation uses the auto-increment
044 * of the version attribute as described in the <a href=
045 * "http://blogs.perl.org/users/clinton_gormley/2011/10/elasticsearchsequence---a-blazing-fast-ticket-server.html"
046 * >ElasticSearch::Sequence - a blazing fast ticket server</a> blog post.
047 *
048 * @since 7.3
049 */
050public class ESUIDSequencer extends AbstractUIDSequencer {
051
052    protected static final int MAX_RETRY = 3;
053
054    protected ESClient esClient = null;
055
056    protected String indexName;
057
058    @Override
059    public void init() {
060        if (esClient != null) {
061            return;
062        }
063        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
064        esClient = esa.getClient();
065        indexName = esa.getIndexNameForType(ElasticSearchConstants.SEQ_ID_TYPE);
066        try {
067            boolean indexExists = esClient.indexExists(indexName);
068            if (!indexExists) {
069                throw new NuxeoException(
070                        String.format("Sequencer %s needs an elasticSearchIndex contribution with type %s", getName(),
071                                ElasticSearchConstants.SEQ_ID_TYPE));
072            }
073        } catch (NoSuchElementException | NuxeoException e) {
074            dispose();
075            throw e;
076        }
077    }
078
079    @Override
080    public void dispose() {
081        if (esClient == null) {
082            return;
083        }
084        esClient = null;
085        indexName = null;
086    }
087
088    @Override
089    public void initSequence(String key, long id) {
090        String source = "{ \"ts\" : " + System.currentTimeMillis() + "}";
091        IndexResponse res = esClient.index(
092                new IndexRequest(indexName, ElasticSearchConstants.SEQ_ID_TYPE, key).versionType(VersionType.EXTERNAL)
093                        .version(id)
094                        .source(source, XContentType.JSON));
095    }
096
097    @Override
098    public long getNextLong(String sequenceName) {
099        String source = "{ \"ts\" : " + System.currentTimeMillis() + "}";
100        IndexResponse res = esClient.index(
101                new IndexRequest(indexName, ElasticSearchConstants.SEQ_ID_TYPE, sequenceName).source(source,
102                        XContentType.JSON));
103        return res.getVersion();
104    }
105
106    @Override
107    public List<Long> getNextBlock(String key, int blockSize) {
108        if (blockSize == 1) {
109            return Collections.singletonList(getNextLong(key));
110        }
111        List<Long> ret = new ArrayList<>(blockSize);
112        long first = getNextBlockWithRetry(key, blockSize);
113        for (long i = 0; i < blockSize; i++) {
114            ret.add(first + i);
115        }
116        return ret;
117    }
118
119    protected long getNextBlockWithRetry(String key, int blockSize) {
120        long ret;
121        for (int i = 0; i < MAX_RETRY; i++) {
122            ret = getNextLong(key);
123            try {
124                initSequence(key, ret + blockSize - 1);
125                return ret;
126            } catch (ConcurrentUpdateException e) {
127                if (i == MAX_RETRY - 1) {
128                    throw e;
129                }
130            }
131        }
132        throw new NuxeoException("Unable to get a block of sequence");
133    }
134}