/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.computation.log;

import static java.lang.Math.min;
import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Settings;
import org.nuxeo.lib.stream.computation.StreamProcessor;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.log.Latency;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.kafka.KafkaUtils;
/**
 * {@link StreamProcessor} implementation backed by a {@link LogManager}: each computation of the {@link Topology} is
 * run by a dedicated {@link ComputationPool} of threads.
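 * <p>
 * A typical lifecycle looks like this (a sketch; the {@code topology}, {@code settings} and {@code manager} instances
 * are assumed to be built by the caller):
 *
 * <pre>{@code
 * StreamProcessor processor = new LogStreamProcessor(manager);
 * processor.init(topology, settings).start();
 * // ... let the computations process records ...
 * processor.drainAndStop(Duration.ofMinutes(1));
 * processor.shutdown();
 * }</pre>
 *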
 * @since 9.3
 */
public class LogStreamProcessor implements StreamProcessor {
    private static final Log log = LogFactory.getLog(LogStreamProcessor.class);

    protected final LogManager manager;

    protected Topology topology;

    protected Settings settings;

    protected List<ComputationPool> pools;

    public LogStreamProcessor(LogManager manager) {
        this.manager = manager;
    }

    @Override
    public StreamProcessor init(Topology topology, Settings settings) {
        log.debug("Initializing ...");
        this.topology = topology;
        this.settings = settings;
        initStreams();
        initSourceAppenders();
        return this;
    }

    @Override
    public void start() {
        log.debug("Starting ...");
        this.pools = initPools();
        Objects.requireNonNull(pools);
        pools.forEach(ComputationPool::start);
    }

    @Override
    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
        for (ComputationPool pool : pools) {
            // TODO: deduct the time already spent waiting, so the total wait does not exceed the timeout
            if (!pool.waitForAssignments(timeout)) {
                return false;
            }
        }
        return true;
    }

    @Override
    public boolean isTerminated() {
        return pools.stream().allMatch(ComputationPool::isTerminated);
    }

    @Override
    public boolean stop(Duration timeout) {
        log.debug("Stopping ...");
        if (pools == null) {
            return true;
        }
        long failures = pools.parallelStream().filter(comp -> !comp.stop(timeout)).count();
        log.debug(String.format("Stopped with %d failures", failures));
        return failures == 0;
    }

    @Override
    public boolean drainAndStop(Duration timeout) {
        // here the order matters, pools must be drained and stopped sequentially
        log.debug("Drain and stop");
        if (pools == null) {
            return true;
        }
        long failures = pools.stream().filter(comp -> !comp.drainAndStop(timeout)).count();
        log.debug(String.format("Drained and stopped with %d failures", failures));
        return failures == 0;
    }

    @Override
    public void shutdown() {
        log.debug("Shutdown ...");
        if (pools == null) {
            return;
        }
        pools.parallelStream().forEach(ComputationPool::shutdown);
        log.debug("Shutdown done");
    }

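    // The low watermark of the processor is the minimum of the low watermarks of each computation tree,
    // a tree being a root computation together with all of its descendants; trees that have not yet
    // produced a meaningful watermark are ignored.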
    @Override
    public long getLowWatermark() {
        Map<String, Long> watermarks = new HashMap<>(pools.size());
        Set<String> roots = topology.getRoots();
        Map<String, Long> watermarkTrees = new HashMap<>(roots.size());
        // compute the low watermark for each tree of computations
        pools.forEach(pool -> watermarks.put(pool.getComputationName(), pool.getLowWatermark()));
        for (String root : roots) {
            watermarkTrees.put(root,
                    topology.getDescendantComputationNames(root).stream().mapToLong(watermarks::get).min().orElse(0));
        }
        // return the minimum watermark across all trees, ignoring trees without a meaningful watermark
        long ret = watermarkTrees.values().stream().filter(wm -> wm > 1).mapToLong(Long::longValue).min().orElse(0);
        if (log.isTraceEnabled()) {
            log.trace("lowWatermark: " + ret);
            watermarkTrees.forEach((k, v) -> log.trace("tree " + k + ": " + v));
        }
        return ret;
    }

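    // The latency of a computation aggregates the latencies measured on the input streams of the computation
    // and of all of its ancestors, so it reflects the end-to-end lag from the root sources.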
    @Override
    public Latency getLatency(String computationName) {
        Set<String> ancestorsComputations = topology.getAncestorComputationNames(computationName);
        ancestorsComputations.add(computationName);
        List<Latency> latencies = new ArrayList<>();
        ancestorsComputations.forEach(
                comp -> topology.getMetadata(comp)
                                .inputStreams()
                                .forEach(stream -> latencies.add(
                                        manager.getLatency(stream, comp, settings.getCodec(comp),
                                                (rec -> Watermark.ofValue(rec.getWatermark()).getTimestamp()),
                                                (Record::getKey)))));
        return Latency.of(latencies);
    }

    @Override
    public long getLowWatermark(String computationName) {
        Objects.requireNonNull(computationName);
        // the low watermark for a computation is the minimum watermark of the computation and all of its ancestors
        Map<String, Long> watermarks = new HashMap<>(pools.size());
        pools.forEach(pool -> watermarks.put(pool.getComputationName(), pool.getLowWatermark()));
        long ret = topology.getAncestorComputationNames(computationName)
                           .stream()
                           .mapToLong(watermarks::get)
                           .min()
                           .orElse(0);
        ret = min(ret, watermarks.get(computationName));
        return ret;
    }

    @Override
    public boolean isDone(long timestamp) {
        return Watermark.ofValue(getLowWatermark()).isDone(timestamp);
    }

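    // One pool per computation: each pool owns the threads that run the computation instances, along with
    // their partition assignments, codecs and retry policy.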
    protected List<ComputationPool> initPools() {
        log.debug("Initializing pools");
        return topology.metadataList()
                       .stream()
                       .map(meta -> new ComputationPool(topology.getSupplier(meta.name()), meta,
                               getDefaultAssignments(meta), manager,
                               getCodecForStreams(meta.name(), meta.inputStreams()),
                               getCodecForStreams(meta.name(), meta.outputStreams()), settings.getPolicy(meta.name())))
                       .collect(Collectors.toList());
    }

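    // A computation must use a single codec for all of its input streams (and likewise for its output
    // streams), otherwise records could not be decoded consistently; falls back on NO_CODEC when no codec
    // is defined in the settings.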
    @SuppressWarnings("unchecked")
    protected Codec<Record> getCodecForStreams(String name, Set<String> streams) {
        Codec<Record> codec = null;
        Set<String> codecNames = new HashSet<>();
        for (String stream : streams) {
            codec = settings.getCodec(stream);
            codecNames.add(codec == null ? "none" : codec.getName());
        }
        if (codecNames.size() > 1) {
            throw new IllegalArgumentException(String.format("Different codecs for computation %s: %s", name,
                    Arrays.toString(codecNames.toArray())));
        }
        if (codec == null) {
            codec = NO_CODEC;
        }
        return codec;
    }

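    // Partitions of the input streams are spread over the computation threads using a round-robin
    // assignment, similar to the static assignment strategy of a Kafka consumer group.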
    protected List<List<LogPartition>> getDefaultAssignments(ComputationMetadataMapping meta) {
        int threads = settings.getConcurrency(meta.name());
        Map<String, Integer> streams = new HashMap<>();
        meta.inputStreams().forEach(streamName -> streams.put(streamName, settings.getPartitions(streamName)));
        return KafkaUtils.roundRobinAssignments(threads, streams);
    }

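    // Streams are created with the partition count from the settings; when a stream already exists, its
    // actual partition count takes precedence and the settings are updated to match.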
    protected void initStreams() {
        log.debug("Initializing streams");
        topology.streamsSet().forEach(streamName -> {
            if (manager.exists(streamName)) {
                int size = manager.size(streamName);
                if (settings.getPartitions(streamName) != size) {
                    log.debug(String.format(
                            "Updating settings for stream: %s, defined with %d partitions but existing with %d partitions",
                            streamName, settings.getPartitions(streamName), size));
                    settings.setPartitions(streamName, size);
                }
            } else {
                manager.createIfNotExists(streamName, settings.getPartitions(streamName));
            }
        });
    }

    protected void initSourceAppenders() {
        log.debug("Initializing source appenders to ensure they use the codec defined in the processor");
        topology.streamsSet()
                .forEach(sourceStream -> manager.getAppender(sourceStream, settings.getCodec(sourceStream)));
    }

}