001/*
002 * (C) Copyright 2019 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation;
020
021import java.util.List;
022
023import org.nuxeo.lib.stream.log.LogOffset;
024
025/**
026 * Gives access to StreamProcessor and appender for source provider.
027 *
028 * @since 11.1
029 */
030public interface StreamManager {
031    /**
032     * Registers a processor and initializes the underlying streams, this is needed before creating a processor or
033     * appending record in source streams.
034     */
035    void register(String processorName, Topology topology, Settings settings);
036
037    /**
038     * Creates a registered processor without starting it.
039     */
040    StreamProcessor createStreamProcessor(String processorName);
041
042    /**
043     * Registers and creates a processor without starting it.
044     */
045    default StreamProcessor registerAndCreateProcessor(String processorName, Topology topology, Settings settings) {
046        register(processorName, topology, settings);
047        return createStreamProcessor(processorName);
048    }
049
050    /**
051     * Registers some source Streams without any processors.
052     *
053     * @since 11.4
054     */
055    void register(List<String> streams, Settings settings);
056
057    /**
058     * Appends a record to a processor's source stream.
059     */
060    LogOffset append(String stream, Record record);
061}