001/*
002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.elasticsearch.bulk;
020
021import java.io.IOException;
022import java.nio.ByteBuffer;
023import java.util.Arrays;
024
025import org.apache.commons.lang3.mutable.MutableBoolean;
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.elasticsearch.action.DocWriteRequest;
029import org.elasticsearch.action.bulk.BackoffPolicy;
030import org.elasticsearch.action.bulk.BulkItemResponse;
031import org.elasticsearch.action.bulk.BulkProcessor;
032import org.elasticsearch.action.bulk.BulkRequest;
033import org.elasticsearch.action.bulk.BulkResponse;
034import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
035import org.elasticsearch.common.io.stream.StreamInput;
036import org.elasticsearch.common.unit.ByteSizeUnit;
037import org.elasticsearch.common.unit.ByteSizeValue;
038import org.elasticsearch.common.unit.TimeValue;
039import org.elasticsearch.rest.RestStatus;
040import org.nuxeo.ecm.core.api.NuxeoException;
041import org.nuxeo.ecm.core.bulk.BulkCodecs;
042import org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation;
043import org.nuxeo.ecm.core.bulk.message.BulkStatus;
044import org.nuxeo.ecm.core.bulk.message.DataBucket;
045import org.nuxeo.elasticsearch.api.ESClient;
046import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
047import org.nuxeo.lib.stream.codec.Codec;
048import org.nuxeo.lib.stream.computation.AbstractComputation;
049import org.nuxeo.lib.stream.computation.ComputationContext;
050import org.nuxeo.lib.stream.computation.Record;
051import org.nuxeo.runtime.api.Framework;
052
053/**
054 * A computation that submits elasticsearch requests using the bulk API.
055 * <p>
056 * Note that the retry policy is handled by the elasticsearch bulk processor.
057 *
058 * @since 10.3
059 */
060public class BulkIndexComputation extends AbstractComputation implements BulkProcessor.Listener {
061    private static final Log log = LogFactory.getLog(BulkIndexComputation.class);
062
063    public static final String NAME = "bulkIndex";
064
065    protected final int esBulkSize;
066
067    protected final int esBulkActions;
068
069    protected final int flushIntervalMs;
070
071    protected BulkProcessor bulkProcessor;
072
073    protected Codec<DataBucket> codec;
074
075    protected boolean updates;
076
077    protected boolean continueOnFailure;
078
079    protected volatile boolean abort;
080
081    public BulkIndexComputation(int esBulkSize, int esBulkActions, int flushInterval) {
082        super(NAME, 1, 1);
083        this.esBulkSize = esBulkSize;
084        this.esBulkActions = esBulkActions;
085        this.flushIntervalMs = flushInterval * 1000;
086    }
087
088    @Override
089    public void init(ComputationContext context) {
090        super.init(context);
091        // note that we don't use setFlushInterval because this is done by our timer
092        continueOnFailure = context.getPolicy().continueOnFailure();
093        long backoffDelayMs = context.getPolicy().getRetryPolicy().getDelay().toMillis();
094        int retries = context.getPolicy().getRetryPolicy().getMaxRetries();
095
096        bulkProcessor = getESClient().bulkProcessorBuilder(this)
097                                     .setConcurrentRequests(0)
098                                     .setBulkSize(new ByteSizeValue(esBulkSize, ByteSizeUnit.BYTES))
099                                     .setBulkActions(esBulkActions)
100                                     .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
101                                             TimeValue.timeValueMillis(backoffDelayMs), retries))
102                                     .build();
103        codec = BulkCodecs.getDataBucketCodec();
104        context.setTimer("flush", System.currentTimeMillis() + flushIntervalMs);
105    }
106
107    @Override
108    public void processTimer(ComputationContext context, String key, long timestamp) {
109        if (abort) {
110            context.askForTermination();
111            log.error("Terminate computation due to previous error");
112            return;
113        }
114        if (updates) {
115            // flush is sync because bulkProcessor is initialized with setConcurrentRequests(0)
116            bulkProcessor.flush();
117            context.askForCheckpoint();
118            updates = false;
119        }
120        context.setTimer("flush", System.currentTimeMillis() + flushIntervalMs);
121    }
122
123    @Override
124    public void processRecord(ComputationContext context, String inputStream, Record record) {
125        DataBucket in = codec.decode(record.getData());
126        if (in.getCount() > 0) {
127            BulkRequest bulkRequest = decodeRequest(in);
128            for (DocWriteRequest request : bulkRequest.requests()) {
129                bulkProcessor.add(request);
130            }
131            BulkStatus delta = BulkStatus.deltaOf(in.getCommandId());
132            delta.setProcessed(in.getCount());
133            AbstractBulkComputation.updateStatus(context, delta);
134        }
135        updates = true;
136    }
137
138    @Override
139    public void destroy() {
140        if (bulkProcessor != null) {
141            bulkProcessor.close();
142            bulkProcessor = null;
143        }
144    }
145
146    protected ESClient getESClient() {
147        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
148        return esa.getClient();
149    }
150
151    protected BulkRequest decodeRequest(DataBucket bucket) {
152        BulkRequest ret = new BulkRequest();
153        ByteBuffer buffer = ByteBuffer.wrap(bucket.getData());
154        StreamInput in = new ByteBufferStreamInput(buffer);
155        try {
156            ret.readFrom(in);
157            return ret;
158        } catch (IOException e) {
159            throw new NuxeoException("Cannot load elastic bulk request from: " + bucket);
160        }
161    }
162
163    // -------------------------------------------------------------------------------------
164    // Elasticsearch bulk processor listener
165    // the following methods are called from a different thread than the computation
166    @Override
167    public void beforeBulk(long executionId, BulkRequest request) {
168        if (log.isDebugEnabled()) {
169            log.debug(String.format("Creating elasticsearch bulk %s with %d action", executionId,
170                    request.numberOfActions()));
171        }
172    }
173
174    @Override
175    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
176        if (log.isDebugEnabled()) {
177            log.debug(String.format("After bulk: %s, actions: %d, status: %s", executionId, request.numberOfActions(),
178                    response.status()));
179        }
180        if (!response.hasFailures()) {
181            return;
182        }
183        MutableBoolean inError = new MutableBoolean(false);
184        Arrays.stream(response.getItems()).filter(BulkItemResponse::isFailed).forEach(item -> {
185            if (item.getFailure().getStatus() != RestStatus.CONFLICT) {
186                log.warn("Failure in bulk indexing: " + item.getFailureMessage());
187                inError.setTrue();
188            } else if (log.isDebugEnabled()) {
189                log.debug("Skipping version conflict: " + item.getFailureMessage());
190            }
191        });
192        if (inError.isTrue()) {
193            log.error(String.format("Elasticsearch bulk %s returns with failures: %s", executionId,
194                    response.buildFailureMessage()));
195            if (!continueOnFailure) {
196                abort = true;
197            }
198        }
199    }
200
201    @Override
202    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
203        log.error(String.format("Elasticsearch bulk %s fails, contains %d actions", executionId,
204                request.numberOfActions()), failure);
205        if (!continueOnFailure) {
206            abort = true;
207        }
208    }
209}