001/* 002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.elasticsearch.bulk; 020 021import java.io.IOException; 022import java.nio.ByteBuffer; 023import java.util.Arrays; 024 025import org.apache.commons.lang3.mutable.MutableBoolean; 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.elasticsearch.action.DocWriteRequest; 029import org.elasticsearch.action.bulk.BackoffPolicy; 030import org.elasticsearch.action.bulk.BulkItemResponse; 031import org.elasticsearch.action.bulk.BulkProcessor; 032import org.elasticsearch.action.bulk.BulkRequest; 033import org.elasticsearch.action.bulk.BulkResponse; 034import org.elasticsearch.common.io.stream.ByteBufferStreamInput; 035import org.elasticsearch.common.io.stream.StreamInput; 036import org.elasticsearch.common.unit.ByteSizeUnit; 037import org.elasticsearch.common.unit.ByteSizeValue; 038import org.elasticsearch.common.unit.TimeValue; 039import org.elasticsearch.rest.RestStatus; 040import org.nuxeo.ecm.core.api.NuxeoException; 041import org.nuxeo.ecm.core.bulk.BulkCodecs; 042import org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation; 043import org.nuxeo.ecm.core.bulk.message.BulkStatus; 044import org.nuxeo.ecm.core.bulk.message.DataBucket; 045import org.nuxeo.elasticsearch.api.ESClient; 046import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 047import org.nuxeo.lib.stream.codec.Codec; 048import org.nuxeo.lib.stream.computation.AbstractComputation; 049import org.nuxeo.lib.stream.computation.ComputationContext; 050import org.nuxeo.lib.stream.computation.Record; 051import org.nuxeo.runtime.api.Framework; 052 053/** 054 * A computation that submits elasticsearch requests using the bulk API. 055 * <p> 056 * Note that the retry policy is handled by the elasticsearch bulk processor. 057 * 058 * @since 10.3 059 */ 060public class BulkIndexComputation extends AbstractComputation implements BulkProcessor.Listener { 061 private static final Log log = LogFactory.getLog(BulkIndexComputation.class); 062 063 public static final String NAME = "bulkIndex"; 064 065 protected final int esBulkSize; 066 067 protected final int esBulkActions; 068 069 protected final int flushIntervalMs; 070 071 protected BulkProcessor bulkProcessor; 072 073 protected Codec<DataBucket> codec; 074 075 protected boolean updates; 076 077 protected boolean continueOnFailure; 078 079 protected volatile boolean abort; 080 081 public BulkIndexComputation(int esBulkSize, int esBulkActions, int flushInterval) { 082 super(NAME, 1, 1); 083 this.esBulkSize = esBulkSize; 084 this.esBulkActions = esBulkActions; 085 this.flushIntervalMs = flushInterval * 1000; 086 } 087 088 @Override 089 public void init(ComputationContext context) { 090 super.init(context); 091 // note that we don't use setFlushInterval because this is done by our timer 092 continueOnFailure = context.getPolicy().continueOnFailure(); 093 long backoffDelayMs = context.getPolicy().getRetryPolicy().getDelay().toMillis(); 094 int retries = context.getPolicy().getRetryPolicy().getMaxRetries(); 095 096 bulkProcessor = getESClient().bulkProcessorBuilder(this) 097 .setConcurrentRequests(0) 098 .setBulkSize(new ByteSizeValue(esBulkSize, ByteSizeUnit.BYTES)) 099 .setBulkActions(esBulkActions) 100 .setBackoffPolicy(BackoffPolicy.exponentialBackoff( 101 TimeValue.timeValueMillis(backoffDelayMs), retries)) 102 .build(); 103 codec = BulkCodecs.getDataBucketCodec(); 104 context.setTimer("flush", System.currentTimeMillis() + flushIntervalMs); 105 } 106 107 @Override 108 public void processTimer(ComputationContext context, String key, long timestamp) { 109 if (abort) { 110 context.askForTermination(); 111 log.error("Terminate computation due to previous error"); 112 return; 113 } 114 if (updates) { 115 // flush is sync because bulkProcessor is initialized with setConcurrentRequests(0) 116 bulkProcessor.flush(); 117 context.askForCheckpoint(); 118 updates = false; 119 } 120 context.setTimer("flush", System.currentTimeMillis() + flushIntervalMs); 121 } 122 123 @Override 124 public void processRecord(ComputationContext context, String inputStream, Record record) { 125 DataBucket in = codec.decode(record.getData()); 126 if (in.getCount() > 0) { 127 BulkRequest bulkRequest = decodeRequest(in); 128 for (DocWriteRequest request : bulkRequest.requests()) { 129 bulkProcessor.add(request); 130 } 131 BulkStatus delta = BulkStatus.deltaOf(in.getCommandId()); 132 delta.setProcessed(in.getCount()); 133 AbstractBulkComputation.updateStatus(context, delta); 134 } 135 updates = true; 136 } 137 138 @Override 139 public void destroy() { 140 if (bulkProcessor != null) { 141 bulkProcessor.close(); 142 bulkProcessor = null; 143 } 144 } 145 146 protected ESClient getESClient() { 147 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 148 return esa.getClient(); 149 } 150 151 protected BulkRequest decodeRequest(DataBucket bucket) { 152 BulkRequest ret = new BulkRequest(); 153 ByteBuffer buffer = ByteBuffer.wrap(bucket.getData()); 154 StreamInput in = new ByteBufferStreamInput(buffer); 155 try { 156 ret.readFrom(in); 157 return ret; 158 } catch (IOException e) { 159 throw new NuxeoException("Cannot load elastic bulk request from: " + bucket); 160 } 161 } 162 163 // ------------------------------------------------------------------------------------- 164 // Elasticsearch bulk processor listener 165 // the following methods are called from a different thread than the computation 166 @Override 167 public void beforeBulk(long executionId, BulkRequest request) { 168 if (log.isDebugEnabled()) { 169 log.debug(String.format("Creating elasticsearch bulk %s with %d action", executionId, 170 request.numberOfActions())); 171 } 172 } 173 174 @Override 175 public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { 176 if (log.isDebugEnabled()) { 177 log.debug(String.format("After bulk: %s, actions: %d, status: %s", executionId, request.numberOfActions(), 178 response.status())); 179 } 180 if (!response.hasFailures()) { 181 return; 182 } 183 MutableBoolean inError = new MutableBoolean(false); 184 Arrays.stream(response.getItems()).filter(BulkItemResponse::isFailed).forEach(item -> { 185 if (item.getFailure().getStatus() != RestStatus.CONFLICT) { 186 log.warn("Failure in bulk indexing: " + item.getFailureMessage()); 187 inError.setTrue(); 188 } else if (log.isDebugEnabled()) { 189 log.debug("Skipping version conflict: " + item.getFailureMessage()); 190 } 191 }); 192 if (inError.isTrue()) { 193 log.error(String.format("Elasticsearch bulk %s returns with failures: %s", executionId, 194 response.buildFailureMessage())); 195 if (!continueOnFailure) { 196 abort = true; 197 } 198 } 199 } 200 201 @Override 202 public void afterBulk(long executionId, BulkRequest request, Throwable failure) { 203 log.error(String.format("Elasticsearch bulk %s fails, contains %d actions", executionId, 204 request.numberOfActions()), failure); 205 if (!continueOnFailure) { 206 abort = true; 207 } 208 } 209}