001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
 *     Adapted from https://github.com/concord/concord-jvm
018 *     bdelbosc
019 */
020package org.nuxeo.lib.stream.computation;
021
022import org.nuxeo.lib.stream.log.LogOffset;
023
024/**
025 * @since 9.3
026 */
027public interface ComputationContext {
028
029    /**
030     * Register a timer callback for some point in the future
031     *
032     * @param key Name of the timer callback.
033     * @param time The (ms since epoch) at which the callback should be fired
034     */
035    void setTimer(String key, long time);
036
037    /**
038     * Emit a record downstream. Records are send effectively on checkpoint using {@link #askForCheckpoint()}.
039     *
040     * @param streamName The name of the stream on which the record should be emitted.
041     * @param key The key associated with the record. Only relevant when routing method is `GROUP_BY`.
042     * @param data: The binary blob to send downstream.
043     */
044    default void produceRecord(String streamName, String key, byte[] data) {
045        produceRecord(streamName, Record.of(key, data));
046    }
047
048    void produceRecord(String streamName, Record record);
049
050    /**
051     * Set the low watermark for a source computation.
052     */
053    void setSourceLowWatermark(long watermark);
054
055    /**
056     * Ask for checkpoint in order to send records, save input stream offset positions.
057     */
058    void askForCheckpoint();
059
060    /**
061     * Finally cancel the request to checkpoint the positions.
062     */
063    void cancelAskForCheckpoint();
064
065    /**
066     * Ask to terminate this computation.
067     *
068     * @since 10.1
069     */
070    void askForTermination();
071
072    /**
073     * @return the LogOffset of the last record read.
074     * @since 10.3
075     */
076    LogOffset getLastOffset();
077
078    /**
079     * Gets the policy used to run the computation.
080     *
081     * @since 10.3
082     */
083    ComputationPolicy getPolicy();
084
085}
086