/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.computation.log;

import static java.lang.Math.min;
import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Settings;
import org.nuxeo.lib.stream.computation.StreamProcessor;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.log.Latency;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.kafka.KafkaUtils;

/**
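 * A {@link StreamProcessor} that runs a {@link Topology} of computations on top of a {@link LogManager}.
 * <p>
 * A minimal usage sketch, assuming {@code manager}, {@code topology} and {@code settings} are already built:
 *
 * <pre>{@code
 * StreamProcessor processor = new LogStreamProcessor(manager).init(topology, settings);
 * processor.start();
 * processor.waitForAssignments(Duration.ofSeconds(30));
 * // ... computations are now processing records ...
 * processor.drainAndStop(Duration.ofMinutes(1));
 * processor.shutdown();
 * }</pre>
 *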
 * @since 9.3
 */
public class LogStreamProcessor implements StreamProcessor {
    private static final Log log = LogFactory.getLog(LogStreamProcessor.class);

    protected final LogManager manager;

    protected Topology topology;

    protected Settings settings;

    protected List<ComputationPool> pools;

    public LogStreamProcessor(LogManager manager) {
        this.manager = manager;
    }

    @Override
    public StreamProcessor init(Topology topology, Settings settings) {
        log.debug("Initializing ...");
        this.topology = topology;
        this.settings = settings;
        initStreams();
        initSourceAppenders();
        return this;
    }

    @Override
    public void start() {
        log.debug("Starting ...");
        this.pools = initPools();
        Objects.requireNonNull(pools);
        pools.forEach(ComputationPool::start);
    }

    @Override
    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
        for (ComputationPool pool : pools) {
            // TODO: consider decreasing timeout
            if (!pool.waitForAssignments(timeout)) {
                return false;
            }
        }
        return true;
    }

    @Override
    public boolean isTerminated() {
        return pools.stream().allMatch(ComputationPool::isTerminated);
    }

    @Override
    public boolean stop(Duration timeout) {
        log.debug("Stopping ...");
        if (pools == null) {
            return true;
        }
        long failures = pools.parallelStream().filter(comp -> !comp.stop(timeout)).count();
        log.debug(String.format("Stopped with %d failures", failures));
        return failures == 0;
    }

    @Override
    public boolean drainAndStop(Duration timeout) {
        // here the order matters, this must be done sequentially
        log.debug("Drain and stop");
        if (pools == null) {
            return true;
        }
        long failures = pools.stream().filter(comp -> !comp.drainAndStop(timeout)).count();
        log.debug(String.format("Drained and stopped with %d failures", failures));
        return failures == 0;
    }

    @Override
    public void shutdown() {
        log.debug("Shutdown ...");
        if (pools == null) {
            return;
        }
        pools.parallelStream().forEach(ComputationPool::shutdown);
        log.debug("Shutdown done");
    }

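    // The global low watermark is the minimum of the per-tree low watermarks, where a tree is rooted at a source
    // computation and spans all of its descendants.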
    @Override
    public long getLowWatermark() {
        Map<String, Long> watermarks = new HashMap<>(pools.size());
        Set<String> roots = topology.getRoots();
        Map<String, Long> watermarkTrees = new HashMap<>(roots.size());
        // compute low watermark for each tree of computation
        pools.forEach(pool -> watermarks.put(pool.getComputationName(), pool.getLowWatermark()));
        for (String root : roots) {
            watermarkTrees.put(root,
                    topology.getDescendantComputationNames(root).stream().mapToLong(watermarks::get).min().orElse(0));
        }
        // return the minimum wm for all trees that are not 0
        long ret = watermarkTrees.values().stream().filter(wm -> wm > 1).mapToLong(Long::longValue).min().orElse(0);
        if (log.isTraceEnabled()) {
            log.trace("lowWatermark: " + ret);
            watermarkTrees.forEach((k, v) -> log.trace("tree " + k + ": " + v));
        }
        return ret;
    }

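    // The latency of a computation is aggregated over the input streams of the computation and of all its ancestors.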
    @Override
    public Latency getLatency(String computationName) {
        Set<String> ancestorsComputations = topology.getAncestorComputationNames(computationName);
        ancestorsComputations.add(computationName);
        List<Latency> latencies = new ArrayList<>();
        ancestorsComputations.forEach(comp -> topology.getMetadata(comp).inputStreams().forEach(
                stream -> latencies.add(manager.getLatency(stream, comp, settings.getCodec(comp),
                        (rec -> Watermark.ofValue(rec.getWatermark()).getTimestamp()), (Record::getKey)))));
        return Latency.of(latencies);
    }

    @Override
    public long getLowWatermark(String computationName) {
        Objects.requireNonNull(computationName);
        // the low wm for a computation is the minimum watermark among the computation itself and all its ancestors
        Map<String, Long> watermarks = new HashMap<>(pools.size());
        pools.forEach(pool -> watermarks.put(pool.getComputationName(), pool.getLowWatermark()));
        long ret = topology.getAncestorComputationNames(computationName)
                           .stream()
                           .mapToLong(watermarks::get)
                           .min()
                           .orElse(0);
        ret = min(ret, watermarks.get(computationName));
        return ret;
    }

    @Override
    public boolean isDone(long timestamp) {
        return Watermark.ofValue(getLowWatermark()).isDone(timestamp);
    }

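    // Build one ComputationPool per computation, each with its partition assignments and input/output codecs.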
    protected List<ComputationPool> initPools() {
        log.debug("Initializing pools");
        return topology.metadataList()
                       .stream()
                       .map(meta -> new ComputationPool(topology.getSupplier(meta.name()), meta,
                               getDefaultAssignments(meta), manager,
                               getCodecForStreams(meta.name(), meta.inputStreams()),
                               getCodecForStreams(meta.name(), meta.outputStreams())))
                       .collect(Collectors.toList());
    }

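    // A computation must use a single codec across a given set of streams; fall back to NO_CODEC when none is
    // configured.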
    @SuppressWarnings("unchecked")
    protected Codec<Record> getCodecForStreams(String name, Set<String> streams) {
        Codec<Record> codec = null;
        Set<String> codecNames = new HashSet<>();
        for (String stream : streams) {
            codec = settings.getCodec(stream);
            codecNames.add(codec == null ? "none" : codec.getName());
        }
        if (codecNames.size() > 1) {
            throw new IllegalArgumentException(String.format("Different codecs for computation %s: %s", name,
                    Arrays.toString(codecNames.toArray())));
        }
        if (codec == null) {
            codec = NO_CODEC;
        }
        return codec;
    }

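    // Distribute the partitions of the input streams across the configured number of threads, round-robin.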
    protected List<List<LogPartition>> getDefaultAssignments(ComputationMetadataMapping meta) {
        int threads = settings.getConcurrency(meta.name());
        Map<String, Integer> streams = new HashMap<>();
        meta.inputStreams().forEach(streamName -> streams.put(streamName, settings.getPartitions(streamName)));
        return KafkaUtils.roundRobinAssignments(threads, streams);
    }

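    // Create each stream with its configured number of partitions if it does not exist yet.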
    protected void initStreams() {
        log.debug("Initializing streams");
        topology.streamsSet()
                .forEach(streamName -> manager.createIfNotExists(streamName, settings.getPartitions(streamName)));
    }

    protected void initSourceAppenders() {
        log.debug("Initializing source appenders to ensure they use the codec defined in the processor");
        topology.streamsSet().stream().filter(topology::isSource).forEach(
                sourceStream -> manager.getAppender(sourceStream, settings.getCodec(sourceStream)));
    }

}