001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Adapted from https://github.com/concord/concord-jvm 018 * bdelbosc 019 */ 020package org.nuxeo.lib.stream.computation; 021 022/** 023 * Computation receives records from input streams one at a time, it can produce record on its output streams. A timer 024 * processing can be used for windowing computation. 025 * 026 * @since 9.3 027 */ 028public interface Computation { 029 030 /** 031 * Called when the framework has registered the computation successfully. Gives users a first opportunity to 032 * schedule timer callbacks and produce records. This method can be called multiple times. 033 * 034 * @param context The computation context object provided by the system. 035 */ 036 void init(ComputationContext context); 037 038 /** 039 * Called when the framework is ready to shutdown the computation. Gives users a chance to perform some cleanup 040 * before the process is killed. 041 */ 042 @SuppressWarnings("EmptyMethod") 043 default void destroy() { 044 045 } 046 047 /** 048 * Process an incoming record on one of the computation's input streams. 049 * 050 * @param context The computation context object provided by the system. 051 * @param inputStreamName Name of the input stream that provides the record. 052 * @param record The record. 053 */ 054 void processRecord(ComputationContext context, String inputStreamName, Record record); 055 056 /** 057 * Process a timer callback previously set via {@link ComputationContext#setTimer(String, long)}. 058 * 059 * @param context The computation context object provided by the system. 060 * @param key The name of the timer. 061 * @param timestamp The timestamp (in ms) for which the callback was scheduled. 062 */ 063 void processTimer(ComputationContext context, String key, long timestamp); 064 065 /** 066 * Identify the computation. 067 * 068 * @return computation's metadata. 069 */ 070 ComputationMetadata metadata(); 071 072 /** 073 * A hook to inform that computation will be soon destroyed. It gives a way for long processing to cooperate to a 074 * quick shutdown. <br> 075 * This method is not invoked from the computation thread, it should only set some volatile flag and returns 076 * immediately. 077 * 078 * @since 10.2 079 */ 080 @SuppressWarnings("EmptyMethod") 081 default void signalStop() { 082 } 083 084 /** 085 * Called after a failure in {@link #processRecord} or {@link #processTimer} before retrying. 086 * 087 * @since 10.3 088 */ 089 void processRetry(ComputationContext context, Throwable failure); 090 091 /** 092 * Called when {@link #processRecord} or {@link #processTimer} fails and cannot be retried. 093 * 094 * @since 10.3 095 */ 096 void processFailure(ComputationContext context, Throwable failure); 097 098}