001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.computation; 020 021import java.time.Duration; 022 023/** 024 * Run a topology of computations according to some settings. 025 * 026 * @since 9.3 027 */ 028public interface StreamProcessor { 029 030 /** 031 * Initialize streams, but don't run the computations 032 */ 033 StreamProcessor init(Topology topology, Settings settings); 034 035 /** 036 * Run the initialized computations. 037 */ 038 void start(); 039 040 /** 041 * Try to stop computations gracefully after processing a record or a timer within the timeout duration. If this can 042 * not be done within the timeout, shutdown and returns false. 043 */ 044 boolean stop(Duration timeout); 045 046 /** 047 * Stop computations when input streams are empty. The timeout is applied for each computation, the total duration 048 * can be up to nb computations * timeout 049 * <p/> 050 * Returns {@code true} if computations are stopped during the timeout delay. 051 */ 052 boolean drainAndStop(Duration timeout); 053 054 /** 055 * Shutdown immediately. 056 */ 057 void shutdown(); 058 059 /** 060 * Return the low watermark for the computation. Any message with an offset below the low watermark has been 061 * processed by this computation and its ancestors.. 062 */ 063 long getLowWatermark(String computationName); 064 065 /** 066 * Return the low watermark for all the computations of the topology. Any message with an offset below the low 067 * watermark has been processed. 068 */ 069 long getLowWatermark(); 070 071 /** 072 * Returns true if all messages with a lower timestamp has been processed by the topology. 073 */ 074 boolean isDone(long timestamp); 075 076 /** 077 * Wait for the computations to have assigned partitions ready to process records. The processor must be started. 078 * This is useful for writing unit test. 079 * <p/> 080 * Returns {@code true} if all computations have assigned partitions during the timeout delay. 081 */ 082 boolean waitForAssignments(Duration timeout) throws InterruptedException; 083 084}