001/*
002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation;
020
021import java.util.ArrayList;
022import java.util.List;
023
024import org.apache.commons.logging.Log;
025import org.apache.commons.logging.LogFactory;
026
027/**
028 * An abstract {@link Computation} that processes records by batch.
029 * <p>
030 * The batch capacity and threshold are defined in the computation policy.
031 *
032 * @since 10.3
033 */
034public abstract class AbstractBatchComputation extends AbstractComputation {
035
036    private static final Log log = LogFactory.getLog(AbstractBatchComputation.class);
037
038    public static final String TIMER_BATCH = "batch";
039
040    protected List<Record> batchRecords;
041
042    protected String currentInputStream;
043
044    protected boolean newBatch = true;
045
046    protected long thresholdMillis;
047
048    protected boolean removeLastRecordOnRetry;
049
050    /**
051     * Constructor
052     *
053     * @param name the name of the computation
054     * @param nbInputStreams the number of input streams
055     * @param nbOutputStreams the number of output streams
056     */
057    public AbstractBatchComputation(String name, int nbInputStreams, int nbOutputStreams) {
058        super(name, nbInputStreams, nbOutputStreams);
059    }
060
061    /**
062     * Called when:
063     * <ul>
064     * <li>the batch capacity is reached</li>
065     * <li>the time threshold is reached</li>
066     * <li>the inputStreamName has changed</li>
067     * </ul>
068     * If this method raises an exception the retry policy is applied.
069     *
070     * @param context used to send records to output streams, note that the checkpoint is managed automatically.
071     * @param inputStreamName the input streams where the records are coming from
072     * @param records the batch of records
073     */
074    protected abstract void batchProcess(ComputationContext context, String inputStreamName, List<Record> records);
075
076    /**
077     * Called when the retry policy has failed.
078     */
079    public abstract void batchFailure(ComputationContext context, String inputStreamName, List<Record> records);
080
081    @Override
082    public void init(ComputationContext context) {
083        thresholdMillis = context.getPolicy().getBatchThreshold().toMillis();
084        context.setTimer(TIMER_BATCH, System.currentTimeMillis() + thresholdMillis);
085        batchRecords = new ArrayList<>(context.getPolicy().batchCapacity);
086    }
087
088    @Override
089    public void processTimer(ComputationContext context, String key, long timestamp) {
090        if (!TIMER_BATCH.equals(key)) {
091            return;
092        }
093        if (!batchRecords.isEmpty()) {
094            batchProcess(context);
095        }
096        context.setTimer(TIMER_BATCH, System.currentTimeMillis() + thresholdMillis);
097    }
098
099    @Override
100    public void processRecord(ComputationContext context, String inputStreamName, Record record) {
101        if (!inputStreamName.equals(currentInputStream) && !batchRecords.isEmpty()) {
102            batchProcess(context);
103        }
104        if (newBatch) {
105            currentInputStream = inputStreamName;
106            newBatch = false;
107        }
108        batchRecords.add(record);
109        if (batchRecords.size() >= context.getPolicy().getBatchCapacity()) {
110            removeLastRecordOnRetry = true;
111            batchProcess(context);
112            removeLastRecordOnRetry = false;
113        }
114    }
115
116    private void batchProcess(ComputationContext context) {
117        batchProcess(context, currentInputStream, batchRecords);
118        checkpointBatch(context);
119    }
120
121    protected void checkpointBatch(ComputationContext context) {
122        context.askForCheckpoint();
123        batchRecords.clear();
124        newBatch = true;
125    }
126
127    @Override
128    public void processRetry(ComputationContext context, Throwable failure) {
129        if (removeLastRecordOnRetry) {
130            // the batchProcess has failed, processRecord will be retried with the same record
131            // but first we have to remove the record from the batch
132            batchRecords.remove(batchRecords.size() -1);
133            removeLastRecordOnRetry = false;
134        }
135        log.warn(String.format("Computation: %s fails to process batch of %d records, last record: %s, retrying ...",
136                metadata.name(), batchRecords.size(), context.getLastOffset()), failure);
137    }
138
139    @Override
140    public void processFailure(ComputationContext context, Throwable failure) {
141        log.error(String.format(
142                "Computation: %s fails to process batch of %d records after retries, last record: %s, policy: %s",
143                metadata.name(), batchRecords.size(), context.getLastOffset(), context.getPolicy()), failure);
144        batchFailure(context, currentInputStream, batchRecords);
145        batchRecords.clear();
146        newBatch = true;
147    }
148
149}