/*
 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.elasticsearch.bulk;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;

import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.bulk.BulkCodecs;
import org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation;
import org.nuxeo.ecm.core.bulk.message.BulkStatus;
import org.nuxeo.ecm.core.bulk.message.DataBucket;
import org.nuxeo.elasticsearch.api.ESClient;
import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.runtime.api.Framework;

/**
 * A computation that submits elasticsearch requests using the bulk API.
 * <p>
 * Note that the retry policy is handled by the elasticsearch bulk processor.
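 * <p>
 * A minimal sketch of how this computation could be wired into a stream topology, given that it declares one
 * input and one output stream; the stream names and sizing values below are illustrative, not the actual Nuxeo
 * wiring:
 *
 * <pre>{@code
 * Topology topology = Topology.builder()
 *         .addComputation(() -> new BulkIndexComputation(5_242_880, 1_000, 10),
 *                 Arrays.asList("i1:bulkIndex", "o1:indexCompletion"))
 *         .build();
 * }</pre>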
 *
 * @since 10.3
 */
public class BulkIndexComputation extends AbstractComputation implements BulkProcessor.Listener {
    private static final Log log = LogFactory.getLog(BulkIndexComputation.class);

    public static final String NAME = "bulk/bulkIndex";

    protected final int esBulkSize;

    protected final int esBulkActions;

    protected final int flushIntervalMs;

    protected BulkProcessor bulkProcessor;

    protected Codec<DataBucket> codec;

    protected boolean updates;

    protected boolean continueOnFailure;

    protected volatile boolean abort;

    public BulkIndexComputation(int esBulkSize, int esBulkActions, int flushInterval) {
        super(NAME, 1, 1);
        this.esBulkSize = esBulkSize;
        this.esBulkActions = esBulkActions;
        this.flushIntervalMs = flushInterval * 1000;
    }

    @Override
    public void init(ComputationContext context) {
        super.init(context);
        // note that we don't use setFlushInterval because the flush is driven by our timer
        continueOnFailure = context.getPolicy().continueOnFailure();
        long backoffDelayMs = context.getPolicy().getRetryPolicy().getDelay().toMillis();
        int retries = context.getPolicy().getRetryPolicy().getMaxRetries();

        bulkProcessor = getESClient().bulkProcessorBuilder(this)
                                     .setConcurrentRequests(0)
                                     .setBulkSize(new ByteSizeValue(esBulkSize, ByteSizeUnit.BYTES))
                                     .setBulkActions(esBulkActions)
                                     .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
                                             TimeValue.timeValueMillis(backoffDelayMs), retries))
                                     .build();
        codec = BulkCodecs.getDataBucketCodec();
        context.setTimer("flush", System.currentTimeMillis() + flushIntervalMs);
    }

    @Override
    public void processTimer(ComputationContext context, String key, long timestamp) {
        if (abort) {
            throw new NuxeoException("Terminating computation due to previous error");
        }
        if (updates) {
            // the flush is synchronous because the bulkProcessor is initialized with setConcurrentRequests(0)
            bulkProcessor.flush();
            context.askForCheckpoint();
            updates = false;
        }
        context.setTimer("flush", System.currentTimeMillis() + flushIntervalMs);
    }

    @Override
    public void processRecord(ComputationContext context, String inputStream, Record record) {
        if (abort) {
            return;
        }
        DataBucket in = codec.decode(record.getData());
        if (in.getCount() > 0) {
            BulkRequest bulkRequest = decodeRequest(in);
            for (DocWriteRequest<?> request : bulkRequest.requests()) {
                bulkProcessor.add(request);
            }
            BulkStatus delta = BulkStatus.deltaOf(in.getCommandId());
            delta.setProcessed(in.getCount());
            AbstractBulkComputation.updateStatus(context, delta);
        }
        // mark the record as processed so its position is checkpointed on the next flush
        updates = true;
    }

    @Override
    public void destroy() {
        if (bulkProcessor != null) {
            bulkProcessor.close();
            bulkProcessor = null;
        }
    }

    protected ESClient getESClient() {
        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
        return esa.getClient();
    }

    protected BulkRequest decodeRequest(DataBucket bucket) {
        ByteBuffer buffer = ByteBuffer.wrap(bucket.getData());
        try (StreamInput in = new ByteBufferStreamInput(buffer)) {
            return new BulkRequest(in);
        } catch (IOException e) {
            throw new NuxeoException("Cannot load elastic bulk request from: " + bucket, e);
        }
    }
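
    // For reference, the producer side is expected to serialize the BulkRequest with the matching
    // Writeable API before wrapping the bytes in a DataBucket. A minimal sketch of that encoding,
    // assuming an already built bulkRequest (this is not the exact Nuxeo producer code):
    //
    //     try (BytesStreamOutput out = new BytesStreamOutput()) {
    //         bulkRequest.writeTo(out);
    //         byte[] data = BytesReference.toBytes(out.bytes());
    //         // data is what decodeRequest above reads back through ByteBufferStreamInput
    //     }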

    // -------------------------------------------------------------------------------------
    // Elasticsearch bulk processor listener
    // The following methods are called from a different thread than the computation
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("Creating elasticsearch bulk %s with %d actions", executionId,
                    request.numberOfActions()));
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("After bulk: %s, actions: %d, status: %s", executionId, request.numberOfActions(),
                    response.status()));
        }
        if (!response.hasFailures()) {
            return;
        }
        MutableBoolean inError = new MutableBoolean(false);
        Arrays.stream(response.getItems()).filter(BulkItemResponse::isFailed).forEach(item -> {
            if (item.getFailure().getStatus() != RestStatus.CONFLICT) {
                log.warn("Failure in bulk indexing: " + item.getFailureMessage());
                inError.setTrue();
            } else if (log.isDebugEnabled()) {
                log.debug("Skipping version conflict: " + item.getFailureMessage());
            }
        });
        if (inError.isTrue()) {
            log.error(String.format("Elasticsearch bulk %s returned failures: %s", executionId,
                    response.buildFailureMessage()));
            if (!continueOnFailure) {
                abort = true;
            }
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        log.error(String.format("Elasticsearch bulk %s failed, it contains %d actions", executionId,
                request.numberOfActions()), failure);
        if (!continueOnFailure) {
            abort = true;
        }
    }
}