001/* 002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.computation; 020 021import java.util.ArrayList; 022import java.util.List; 023 024import org.apache.commons.logging.Log; 025import org.apache.commons.logging.LogFactory; 026 027/** 028 * An abstract {@link Computation} that processes records by batch. 029 * <p> 030 * The batch capacity and threshold are defined in the computation policy. 031 * 032 * @since 10.3 033 */ 034public abstract class AbstractBatchComputation extends AbstractComputation { 035 036 private static final Log log = LogFactory.getLog(AbstractBatchComputation.class); 037 038 public static final String TIMER_BATCH = "batch"; 039 040 protected List<Record> batchRecords; 041 042 protected String currentInputStream; 043 044 protected boolean newBatch = true; 045 046 protected long thresholdMillis; 047 048 protected boolean removeLastRecordOnRetry; 049 050 /** 051 * Constructor 052 * 053 * @param name the name of the computation 054 * @param nbInputStreams the number of input streams 055 * @param nbOutputStreams the number of output streams 056 */ 057 public AbstractBatchComputation(String name, int nbInputStreams, int nbOutputStreams) { 058 super(name, nbInputStreams, nbOutputStreams); 059 } 060 061 /** 062 * Called when: 063 * <ul> 064 * <li>the batch capacity is reached</li> 065 * <li>the time threshold is reached</li> 066 * <li>the inputStreamName has changed</li> 067 * </ul> 068 * If this method raises an exception the retry policy is applied. 069 * 070 * @param context used to send records to output streams, note that the checkpoint is managed automatically. 071 * @param inputStreamName the input streams where the records are coming from 072 * @param records the batch of records 073 */ 074 protected abstract void batchProcess(ComputationContext context, String inputStreamName, List<Record> records); 075 076 /** 077 * Called when the retry policy has failed. 078 */ 079 public abstract void batchFailure(ComputationContext context, String inputStreamName, List<Record> records); 080 081 @Override 082 public void init(ComputationContext context) { 083 thresholdMillis = context.getPolicy().getBatchThreshold().toMillis(); 084 context.setTimer(TIMER_BATCH, System.currentTimeMillis() + thresholdMillis); 085 batchRecords = new ArrayList<>(context.getPolicy().batchCapacity); 086 } 087 088 @Override 089 public void processTimer(ComputationContext context, String key, long timestamp) { 090 if (!TIMER_BATCH.equals(key)) { 091 return; 092 } 093 if (!batchRecords.isEmpty()) { 094 batchProcess(context); 095 } 096 context.setTimer(TIMER_BATCH, System.currentTimeMillis() + thresholdMillis); 097 } 098 099 @Override 100 public void processRecord(ComputationContext context, String inputStreamName, Record record) { 101 if (!inputStreamName.equals(currentInputStream) && !batchRecords.isEmpty()) { 102 batchProcess(context); 103 } 104 if (newBatch) { 105 currentInputStream = inputStreamName; 106 newBatch = false; 107 } 108 batchRecords.add(record); 109 if (batchRecords.size() >= context.getPolicy().getBatchCapacity()) { 110 removeLastRecordOnRetry = true; 111 batchProcess(context); 112 removeLastRecordOnRetry = false; 113 } 114 } 115 116 private void batchProcess(ComputationContext context) { 117 batchProcess(context, currentInputStream, batchRecords); 118 checkpointBatch(context); 119 } 120 121 protected void checkpointBatch(ComputationContext context) { 122 context.askForCheckpoint(); 123 batchRecords.clear(); 124 newBatch = true; 125 } 126 127 @Override 128 public void processRetry(ComputationContext context, Throwable failure) { 129 if (removeLastRecordOnRetry) { 130 // the batchProcess has failed, processRecord will be retried with the same record 131 // but first we have to remove the record from the batch 132 batchRecords.remove(batchRecords.size() -1); 133 removeLastRecordOnRetry = false; 134 } 135 log.warn(String.format("Computation: %s fails to process batch of %d records, last record: %s, retrying ...", 136 metadata.name(), batchRecords.size(), context.getLastOffset()), failure); 137 } 138 139 @Override 140 public void processFailure(ComputationContext context, Throwable failure) { 141 log.error(String.format( 142 "Computation: %s fails to process batch of %d records after retries, last record: %s, policy: %s", 143 metadata.name(), batchRecords.size(), context.getLastOffset(), context.getPolicy()), failure); 144 batchFailure(context, currentInputStream, batchRecords); 145 batchRecords.clear(); 146 newBatch = true; 147 } 148 149}