001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Adapted from from https://github.com/concord/concord-jvm 018 * bdelbosc 019 */ 020package org.nuxeo.lib.stream.computation; 021 022import org.nuxeo.lib.stream.log.LogOffset; 023 024/** 025 * @since 9.3 026 */ 027public interface ComputationContext { 028 029 /** 030 * Register a timer callback for some point in the future 031 * 032 * @param key Name of the timer callback. 033 * @param time The (ms since epoch) at which the callback should be fired 034 */ 035 void setTimer(String key, long time); 036 037 /** 038 * Emit a record downstream. Records are send effectively on checkpoint using {@link #askForCheckpoint()}. 039 * 040 * @param streamName The name of the stream on which the record should be emitted. 041 * @param key The key associated with the record. Only relevant when routing method is `GROUP_BY`. 042 * @param data: The binary blob to send downstream. 043 */ 044 default void produceRecord(String streamName, String key, byte[] data) { 045 produceRecord(streamName, Record.of(key, data)); 046 } 047 048 void produceRecord(String streamName, Record record); 049 050 /** 051 * Set the low watermark for a source computation. 052 */ 053 void setSourceLowWatermark(long watermark); 054 055 /** 056 * Ask for checkpoint in order to send records, save input stream offset positions. 057 */ 058 void askForCheckpoint(); 059 060 /** 061 * Finally cancel the request to checkpoint the positions. 062 */ 063 void cancelAskForCheckpoint(); 064 065 /** 066 * Ask to terminate this computation. 067 * 068 * @since 10.1 069 */ 070 void askForTermination(); 071 072 /** 073 * @return the LogOffset of the last record read. 074 * @since 10.3 075 */ 076 LogOffset getLastOffset(); 077 078 /** 079 * Gets the policy used to run the computation. 080 * 081 * @since 10.3 082 */ 083 ComputationPolicy getPolicy(); 084 085} 086