001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Adapted from https://github.com/concord/concord-jvm
018 *     bdelbosc
019 */
020package org.nuxeo.lib.stream.computation;
021
022/**
023 * Computation receives records from input streams one at a time, it can produce record on its output streams. A timer
024 * processing can be used for windowing computation.
025 *
026 * @since 9.3
027 */
028public interface Computation {
029
030    /**
031     * Called when the framework has registered the computation successfully. Gives users a first opportunity to
032     * schedule timer callbacks and produce records. This method can be called multiple times.
033     *
034     * @param context The computation context object provided by the system.
035     */
036    void init(ComputationContext context);
037
038    /**
039     * Called when the framework is ready to shutdown the computation. Gives users a chance to perform some cleanup
040     * before the process is killed.
041     */
042    @SuppressWarnings("EmptyMethod")
043    default void destroy() {
044
045    }
046
047    /**
048     * Process an incoming record on one of the computation's input streams.
049     *
050     * @param context The computation context object provided by the system.
051     * @param inputStreamName Name of the input stream that provides the record.
052     * @param record The record.
053     */
054    void processRecord(ComputationContext context, String inputStreamName, Record record);
055
056    /**
057     * Process a timer callback previously set via {@link ComputationContext#setTimer(String, long)}.
058     *
059     * @param context The computation context object provided by the system.
060     * @param key The name of the timer.
061     * @param timestamp The timestamp (in ms) for which the callback was scheduled.
062     */
063    void processTimer(ComputationContext context, String key, long timestamp);
064
065    /**
066     * Identify the computation.
067     *
068     * @return computation's metadata.
069     */
070    ComputationMetadata metadata();
071
072    /**
073     * A hook to inform that computation will be soon destroyed. It gives a way for long processing to cooperate to a
074     * quick shutdown. <br>
075     * This method is not invoked from the computation thread, it should only set some volatile flag and returns
076     * immediately.
077     *
078     * @since 10.2
079     */
080    @SuppressWarnings("EmptyMethod")
081    default void signalStop() {
082    }
083
084    /**
085     * Called after a failure in {@link #processRecord} or {@link #processTimer} before retrying.
086     *
087     * @since 10.3
088     */
089    void processRetry(ComputationContext context, Throwable failure);
090
091    /**
092     * Called when {@link #processRecord} or {@link #processTimer} fails and cannot be retried.
093     *
094     * @since 10.3
095     */
096    void processFailure(ComputationContext context, Throwable failure);
097
098}