001/*
002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.elasticsearch.bulk;
020
021import java.io.IOException;
022import java.nio.ByteBuffer;
023import java.util.Arrays;
024
025import org.apache.commons.lang3.mutable.MutableBoolean;
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.elasticsearch.ElasticsearchException;
029import org.elasticsearch.action.DocWriteRequest;
030import org.elasticsearch.action.bulk.BackoffPolicy;
031import org.elasticsearch.action.bulk.BulkItemResponse;
032import org.elasticsearch.action.bulk.BulkProcessor;
033import org.elasticsearch.action.bulk.BulkRequest;
034import org.elasticsearch.action.bulk.BulkResponse;
035import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
036import org.elasticsearch.common.io.stream.StreamInput;
037import org.elasticsearch.common.unit.ByteSizeUnit;
038import org.elasticsearch.common.unit.ByteSizeValue;
039import org.elasticsearch.common.unit.TimeValue;
040import org.elasticsearch.rest.RestStatus;
041import org.nuxeo.ecm.core.api.NuxeoException;
042import org.nuxeo.ecm.core.bulk.BulkCodecs;
043import org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation;
044import org.nuxeo.ecm.core.bulk.message.BulkStatus;
045import org.nuxeo.ecm.core.bulk.message.DataBucket;
046import org.nuxeo.elasticsearch.api.ESClient;
047import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
048import org.nuxeo.lib.stream.codec.Codec;
049import org.nuxeo.lib.stream.computation.AbstractComputation;
050import org.nuxeo.lib.stream.computation.ComputationContext;
051import org.nuxeo.lib.stream.computation.Record;
052import org.nuxeo.runtime.api.Framework;
053
054/**
055 * A computation that submits elasticsearch requests using the bulk API.
056 * <p>
057 * Note that the retry policy is handled by the elasticsearch bulk processor.
058 *
059 * @since 10.3
060 */
061public class BulkIndexComputation extends AbstractComputation implements BulkProcessor.Listener {
062    private static final Log log = LogFactory.getLog(BulkIndexComputation.class);
063
064    public static final String NAME = "bulk/bulkIndex";
065
066    protected final int esBulkSize;
067
068    protected final int esBulkActions;
069
070    protected final int flushIntervalMs;
071
072    protected BulkProcessor bulkProcessor;
073
074    protected Codec<DataBucket> codec;
075
076    protected boolean updates;
077
078    protected boolean continueOnFailure;
079
080    protected volatile boolean abort;
081
082    public BulkIndexComputation(int esBulkSize, int esBulkActions, int flushInterval) {
083        super(NAME, 1, 1);
084        this.esBulkSize = esBulkSize;
085        this.esBulkActions = esBulkActions;
086        this.flushIntervalMs = flushInterval * 1000;
087    }
088
089    @Override
090    public void init(ComputationContext context) {
091        super.init(context);
092        // note that we don't use setFlushInterval because this is done by our timer
093        continueOnFailure = context.getPolicy().continueOnFailure();
094        long backoffDelayMs = context.getPolicy().getRetryPolicy().getDelay().toMillis();
095        int retries = context.getPolicy().getRetryPolicy().getMaxRetries();
096
097        bulkProcessor = getESClient().bulkProcessorBuilder(this)
098                                     .setConcurrentRequests(0)
099                                     .setBulkSize(new ByteSizeValue(esBulkSize, ByteSizeUnit.BYTES))
100                                     .setBulkActions(esBulkActions)
101                                     .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
102                                             TimeValue.timeValueMillis(backoffDelayMs), retries))
103                                     .build();
104        codec = BulkCodecs.getDataBucketCodec();
105        context.setTimer("flush", System.currentTimeMillis() + flushIntervalMs);
106    }
107
108    @Override
109    public void processTimer(ComputationContext context, String key, long timestamp) {
110        if (abort) {
111            throw new NuxeoException("Terminate computation due to previous error");
112        }
113        if (updates) {
114            // flush is sync because bulkProcessor is initialized with setConcurrentRequests(0)
115            bulkProcessor.flush();
116            context.askForCheckpoint();
117            updates = false;
118        }
119        context.setTimer("flush", System.currentTimeMillis() + flushIntervalMs);
120    }
121
122    @Override
123    public void processRecord(ComputationContext context, String inputStream, Record record) {
124        if (abort) {
125            return;
126        }
127        DataBucket in = codec.decode(record.getData());
128        if (in.getCount() > 0) {
129            BulkRequest bulkRequest = decodeRequest(in);
130            for (DocWriteRequest<?> request : bulkRequest.requests()) {
131                bulkProcessor.add(request);
132            }
133            BulkStatus delta = BulkStatus.deltaOf(in.getCommandId());
134            delta.setProcessed(in.getCount());
135            AbstractBulkComputation.updateStatus(context, delta);
136        }
137        updates = true;
138    }
139
140    @Override
141    public void destroy() {
142        if (bulkProcessor != null) {
143            bulkProcessor.close();
144            bulkProcessor = null;
145        }
146    }
147
148    protected ESClient getESClient() {
149        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
150        return esa.getClient();
151    }
152
153    protected BulkRequest decodeRequest(DataBucket bucket) {
154        ByteBuffer buffer = ByteBuffer.wrap(bucket.getData());
155        try (StreamInput in = new ByteBufferStreamInput(buffer)) {
156            return new BulkRequest(in);
157        } catch (IOException e) {
158            throw new NuxeoException("Cannot load elastic bulk request from: " + bucket);
159        }
160    }
161
162    // -------------------------------------------------------------------------------------
163    // Elasticsearch bulk processor listener
164    // the following methods are called from a different thread than the computation
165    @Override
166    public void beforeBulk(long executionId, BulkRequest request) {
167        if (log.isDebugEnabled()) {
168            log.debug(String.format("Creating elasticsearch bulk %s with %d action", executionId,
169                    request.numberOfActions()));
170        }
171    }
172
173    @Override
174    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
175        if (log.isDebugEnabled()) {
176            log.debug(String.format("After bulk: %s, actions: %d, status: %s", executionId, request.numberOfActions(),
177                    response.status()));
178        }
179        if (!response.hasFailures()) {
180            return;
181        }
182        MutableBoolean inError = new MutableBoolean(false);
183        Arrays.stream(response.getItems()).filter(BulkItemResponse::isFailed).forEach(item -> {
184            if (item.getFailure().getStatus() != RestStatus.CONFLICT) {
185                log.warn("Failure in bulk indexing: " + item.getFailureMessage());
186                inError.setTrue();
187            } else if (log.isDebugEnabled()) {
188                log.debug("Skipping version conflict: " + item.getFailureMessage());
189            }
190        });
191        if (inError.isTrue()) {
192            log.error(String.format("Elasticsearch bulk %s returns with failures: %s", executionId,
193                    response.buildFailureMessage()));
194            if (!continueOnFailure) {
195                abort = true;
196            }
197        }
198    }
199
200    @Override
201    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
202        log.error(String.format("Elasticsearch bulk %s fails, contains %d actions", executionId,
203                request.numberOfActions()), failure);
204        if (!continueOnFailure) {
205            abort = true;
206        }
207    }
208}