public abstract class AbstractBatchComputation extends AbstractComputation
An abstract Computation that processes records by batch.
The batch capacity and threshold are defined in the computation policy.
Modifier and Type | Field and Description |
---|---|
protected List<Record> | batchRecords |
protected String | currentInputStream |
protected boolean | newBatch |
protected boolean | removeLastRecordOnRetry |
protected long | thresholdMillis |
static String | TIMER_BATCH |
Constructor and Description |
---|
AbstractBatchComputation(String name, int nbInputStreams, int nbOutputStreams): Constructor. |
Modifier and Type | Method and Description |
---|---|
abstract void | batchFailure(ComputationContext context, String inputStreamName, List<Record> records): Called when the retry policy has failed. |
protected abstract void | batchProcess(ComputationContext context, String inputStreamName, List<Record> records): Called when the batch capacity is reached, the time threshold is reached, or the inputStreamName has changed. If this method raises an exception, the retry policy is applied. |
protected void | checkpointBatch(ComputationContext context) |
void | init(ComputationContext context): Called when the framework has registered the computation successfully. |
void | processFailure(ComputationContext context, Throwable failure) |
void | processRecord(ComputationContext context, String inputStreamName, Record record): Process an incoming record on one of the computation's input streams. |
void | processRetry(ComputationContext context, Throwable failure) |
void | processTimer(ComputationContext context, String key, long timestamp): Process a timer callback previously set via ComputationContext.setTimer(String, long). |
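Methods inherited from class AbstractComputation: metadata
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface Computation: destroy, signalStop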
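The three conditions under which batchProcess is called (capacity reached, time threshold reached, input stream changed) can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions: BatchBufferSketch and its methods are hypothetical stand-ins written for this example, records are plain strings, and time is passed in explicitly. It is not the Nuxeo implementation and does not use the real ComputationContext or Record types.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in that models the three flush conditions; not the Nuxeo class. */
class BatchBufferSketch {
    private final int capacity;          // assumed to come from the computation policy
    private final long thresholdMillis;  // assumed to come from the computation policy
    private final List<String> batchRecords = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();
    private String currentInputStream;
    private long batchStartMillis;

    BatchBufferSketch(int capacity, long thresholdMillis) {
        this.capacity = capacity;
        this.thresholdMillis = thresholdMillis;
    }

    /** Buffers a record, flushing before it on a stream change and after it at capacity. */
    void processRecord(String inputStreamName, String record, long nowMillis) {
        if (!batchRecords.isEmpty() && !inputStreamName.equals(currentInputStream)) {
            flush(); // the input stream name has changed
        }
        if (batchRecords.isEmpty()) {
            // Start a new batch and remember when and where it started.
            currentInputStream = inputStreamName;
            batchStartMillis = nowMillis;
        }
        batchRecords.add(record);
        if (batchRecords.size() >= capacity) {
            flush(); // the batch capacity is reached
        }
    }

    /** Timer callback: flushes a non-empty batch once it is at least thresholdMillis old. */
    void processTimer(long nowMillis) {
        if (!batchRecords.isEmpty() && nowMillis - batchStartMillis >= thresholdMillis) {
            flush(); // the time threshold is reached
        }
    }

    /** Stands in for the call to batchProcess: records a copy of the batch, then clears it. */
    private void flush() {
        flushed.add(new ArrayList<>(batchRecords));
        batchRecords.clear();
    }

    List<List<String>> flushedBatches() {
        return flushed;
    }
}
```

With a capacity of 2 and a threshold of 100 ms, feeding two records from one stream flushes them together, a record followed by one from a different stream is flushed alone, and a pending record is flushed by the first timer call at least 100 ms after its batch started.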
public static final String TIMER_BATCH
protected List<Record> batchRecords
protected String currentInputStream
protected boolean newBatch
protected long thresholdMillis
protected boolean removeLastRecordOnRetry
public AbstractBatchComputation(String name, int nbInputStreams, int nbOutputStreams)
Constructor.
Parameters:
name - the name of the computation
nbInputStreams - the number of input streams
nbOutputStreams - the number of output streams
protected abstract void batchProcess(ComputationContext context, String inputStreamName, List<Record> records)
Called when the batch capacity is reached, the time threshold is reached, or the inputStreamName has changed. If this method raises an exception, the retry policy is applied.
Parameters:
context - used to send records to output streams; note that the checkpoint is managed automatically
inputStreamName - the input stream the records come from
records - the batch of records
public abstract void batchFailure(ComputationContext context, String inputStreamName, List<Record> records)
Called when the retry policy has failed.
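The relationship between batchProcess and batchFailure can be sketched as follows: a batch is attempted, retried when it throws, and handed to batchFailure once retries are exhausted. Everything here is an assumption made for illustration. BatchCallbacksSketch, runBatch, maxAttempts and FlakyBatchSketch are hypothetical names, the retry policy is reduced to a simple attempt count, and records are plain strings. The real retry policy is configured in the computation policy and may behave differently.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in mirroring the two abstract callbacks; not the Nuxeo class. */
abstract class BatchCallbacksSketch {
    protected abstract void batchProcess(String inputStreamName, List<String> records);

    public abstract void batchFailure(String inputStreamName, List<String> records);

    /** Assumed retry policy: try batchProcess up to maxAttempts times, then give up. */
    final void runBatch(String inputStreamName, List<String> records, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                batchProcess(inputStreamName, records);
                return; // success, nothing more to do
            } catch (RuntimeException e) {
                // Swallow the error and retry while attempts remain.
            }
        }
        batchFailure(inputStreamName, records); // the retry policy has failed
    }
}

/** Example subclass that fails a configurable number of times before succeeding. */
class FlakyBatchSketch extends BatchCallbacksSketch {
    final List<String> processed = new ArrayList<>();
    final List<String> failed = new ArrayList<>();
    private int failuresLeft;

    FlakyBatchSketch(int failuresBeforeSuccess) {
        this.failuresLeft = failuresBeforeSuccess;
    }

    @Override
    protected void batchProcess(String inputStreamName, List<String> records) {
        if (failuresLeft > 0) {
            failuresLeft--;
            throw new IllegalStateException("simulated processing error");
        }
        processed.addAll(records);
    }

    @Override
    public void batchFailure(String inputStreamName, List<String> records) {
        failed.addAll(records); // keep track of the records that could not be processed
    }
}
```

In this model, a FlakyBatchSketch that fails twice still processes its batch when three attempts are allowed, while one that fails five times ends up in batchFailure with the same records.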
public void init(ComputationContext context)
Description copied from interface: Computation
Called when the framework has registered the computation successfully.
Specified by: init in interface Computation
Overrides: init in class AbstractComputation
Parameters:
context - The computation context object provided by the system.
public void processTimer(ComputationContext context, String key, long timestamp)
Description copied from interface: Computation
Process a timer callback previously set via ComputationContext.setTimer(String, long).
Specified by: processTimer in interface Computation
Overrides: processTimer in class AbstractComputation
Parameters:
context - The computation context object provided by the system.
key - The name of the timer.
timestamp - The timestamp (in ms) for which the callback was scheduled.
public void processRecord(ComputationContext context, String inputStreamName, Record record)
Description copied from interface: Computation
Process an incoming record on one of the computation's input streams.
Parameters:
context - The computation context object provided by the system.
inputStreamName - Name of the input stream that provides the record.
record - The record.
protected void checkpointBatch(ComputationContext context)
public void processRetry(ComputationContext context, Throwable failure)
Description copied from interface: Computation
Called after a failure in Computation.processRecord(org.nuxeo.lib.stream.computation.ComputationContext, java.lang.String, org.nuxeo.lib.stream.computation.Record) or Computation.processTimer(org.nuxeo.lib.stream.computation.ComputationContext, java.lang.String, long) before retrying.
Specified by: processRetry in interface Computation
Overrides: processRetry in class AbstractComputation
public void processFailure(ComputationContext context, Throwable failure)
Description copied from interface: Computation
Called when Computation.processRecord(org.nuxeo.lib.stream.computation.ComputationContext, java.lang.String, org.nuxeo.lib.stream.computation.Record) or Computation.processTimer(org.nuxeo.lib.stream.computation.ComputationContext, java.lang.String, long) fails and cannot be retried.
Specified by: processFailure in interface Computation
Overrides: processFailure in class AbstractComputation
Copyright © 2019 Nuxeo. All rights reserved.