/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.computation;

import java.time.Duration;
import java.util.Map;

import org.nuxeo.lib.stream.log.Latency;

026/**
027 * Run a topology of computations according to some settings.
028 *
029 * @since 9.3
030 */
031public interface StreamProcessor {
032
033    /**
034     * Initialize streams, but don't run the computations
035     */
036    StreamProcessor init(Topology topology, Settings settings);
037
038    /**
039     * Run the initialized computations.
040     */
041    void start();
042
043    /**
044     * Try to stop computations gracefully after processing a record or a timer within the timeout duration. If this can
045     * not be done within the timeout, shutdown and returns false.
046     */
047    boolean stop(Duration timeout);
048
049    /**
050     * Stop computations when input streams are empty. The timeout is applied for each computation, the total duration
051     * can be up to nb computations * timeout
052     * <p>
053     * Returns {@code true} if computations are stopped during the timeout delay.
054     */
055    boolean drainAndStop(Duration timeout);
056
057    /**
058     * Shutdown immediately.
059     */
060    void shutdown();
061
062    /**
063     * Returns the low watermark for the computation. Any message with an offset below the low watermark has been
064     * processed by this computation and its ancestors. The returned watermark is local to this processing node, if the
065     * computation is distributed the global low watermark is the minimum of all nodes low watermark.
066     */
067    long getLowWatermark(String computationName);
068
069    /**
070     * Returns the low watermark for all the computations of the topology. Any message with an offset below the low
071     * watermark has been processed. The returned watermark is local to this processing node.
072     */
073    long getLowWatermark();
074
075    /**
076     * Returns the latency for a computation. This works also for distributed computations.
077     *
078     * @since 10.1
079     */
080    Latency getLatency(String computationName);
081
082    /**
083     * Returns true if all messages with a lower timestamp has been processed by the topology.
084     */
085    boolean isDone(long timestamp);
086
087    /**
088     * Wait for the computations to have assigned partitions ready to process records. The processor must be started.
089     * This is useful for writing unit test.
090     * <p>
091     * Returns {@code true} if all computations have assigned partitions during the timeout delay.
092     */
093    boolean waitForAssignments(Duration timeout) throws InterruptedException;
094
095    /**
096     * True if there is no active processing threads.
097     *
098     * @since 10.1
099     */
100    boolean isTerminated();
101
102    /**
103     * Returns a JSON representation of the processor, this includes the list of streams and computations with their
104     * settings along with the topology.
105     *
106     * @since 11.5
107     */
108    String toJson(Map<String, String> meta);
109}