001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat <tdelprat@nuxeo.com>
018 */
019package org.nuxeo.elasticsearch.seqgen;
020
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.List;
024import java.util.NoSuchElementException;
025
026import org.elasticsearch.action.index.IndexRequest;
027import org.elasticsearch.action.index.IndexResponse;
028import org.elasticsearch.common.xcontent.XContentType;
029import org.elasticsearch.index.VersionType;
030import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
031import org.nuxeo.ecm.core.api.NuxeoException;
032import org.nuxeo.ecm.core.uidgen.AbstractUIDSequencer;
033import org.nuxeo.ecm.core.uidgen.UIDSequencer;
034import org.nuxeo.elasticsearch.ElasticSearchConstants;
035import org.nuxeo.elasticsearch.api.ESClient;
036import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
037import org.nuxeo.runtime.api.Framework;
038
039/**
040 * Elasticsearch implementation of {@link UIDSequencer}.
041 * <p>
042 * Since elasticsearch does not seem to support a notion of native sequence, the implementation uses the auto-increment
043 * of the version attribute as described in the <a href=
044 * "http://blogs.perl.org/users/clinton_gormley/2011/10/elasticsearchsequence---a-blazing-fast-ticket-server.html"
045 * >ElasticSearch::Sequence - a blazing fast ticket server</a> blog post.
046 *
047 * @since 7.3
048 */
049public class ESUIDSequencer extends AbstractUIDSequencer {
050
051    protected static final int MAX_RETRY = 3;
052
053    protected ESClient esClient = null;
054
055    protected String indexName;
056
057    @Override
058    public void init() {
059        if (esClient != null) {
060            return;
061        }
062        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
063        esClient = esa.getClient();
064        indexName = esa.getIndexNameForType(ElasticSearchConstants.SEQ_ID_TYPE);
065        try {
066            boolean indexExists = esClient.indexExists(indexName);
067            if (!indexExists) {
068                throw new NuxeoException(
069                        String.format("Sequencer %s needs an elasticSearchIndex contribution with type %s", getName(),
070                                ElasticSearchConstants.SEQ_ID_TYPE));
071            }
072        } catch (NoSuchElementException | NuxeoException e) {
073            dispose();
074            throw e;
075        }
076    }
077
078    @Override
079    public void dispose() {
080        if (esClient == null) {
081            return;
082        }
083        esClient = null;
084        indexName = null;
085    }
086
087    @Override
088    public void initSequence(String key, long id) {
089        String source = "{ \"ts\" : " + System.currentTimeMillis() + "}";
090        esClient.index(
091                new IndexRequest(indexName).id(key)
092                                           .versionType(VersionType.EXTERNAL)
093                                           .version(id)
094                                           .source(source, XContentType.JSON));
095    }
096
097    @Override
098    public long getNextLong(String sequenceName) {
099        String source = "{ \"ts\" : " + System.currentTimeMillis() + "}";
100        IndexResponse res = esClient.index(
101                new IndexRequest(indexName).id(sequenceName).source(source,
102                        XContentType.JSON));
103        return res.getVersion();
104    }
105
106    @Override
107    public List<Long> getNextBlock(String key, int blockSize) {
108        if (blockSize == 1) {
109            return Collections.singletonList(getNextLong(key));
110        }
111        List<Long> ret = new ArrayList<>(blockSize);
112        long first = getNextBlockWithRetry(key, blockSize);
113        for (long i = 0; i < blockSize; i++) {
114            ret.add(first + i);
115        }
116        return ret;
117    }
118
119    protected long getNextBlockWithRetry(String key, int blockSize) {
120        long ret;
121        for (int i = 0; i < MAX_RETRY; i++) {
122            ret = getNextLong(key);
123            try {
124                initSequence(key, ret + blockSize - 1);
125                return ret;
126            } catch (ConcurrentUpdateException e) {
127                if (i == MAX_RETRY - 1) {
128                    throw e;
129                }
130            }
131        }
132        throw new NuxeoException("Unable to get a block of sequence");
133    }
134}