/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.computation.log;

import static java.lang.Math.min;
import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgrapht.experimental.dag.DirectedAcyclicGraph;
import org.jgrapht.graph.DefaultEdge;
import org.nuxeo.lib.stream.StreamRuntimeException;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Settings;
import org.nuxeo.lib.stream.computation.StreamProcessor;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.log.Latency;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.Name;
import org.nuxeo.lib.stream.log.kafka.KafkaUtils;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

/**
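 * Stream processor implementation based on a {@link LogManager}: each computation of the topology runs in its own
 * {@link ComputationPool} of threads, consuming its input streams and producing to its output streams.
 * <p>
 * A minimal usage sketch (the {@code streamManager}, {@code topology} and {@code settings} are assumed to be built
 * by the caller):
 *
 * <pre>{@code
 * StreamProcessor processor = new LogStreamProcessor(streamManager);
 * processor.init(topology, settings).start();
 * processor.drainAndStop(Duration.ofMinutes(1));
 * }</pre>
 *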
 * @since 9.3
 */
public class LogStreamProcessor implements StreamProcessor {
    private static final Log log = LogFactory.getLog(LogStreamProcessor.class);

    protected final LogManager manager;

    protected Topology topology;

    protected Settings settings;

    protected List<ComputationPool> pools;

    protected LogStreamManager streamManager;

    protected final boolean needRegister;

    protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

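    /**
     * @deprecated use {@link #LogStreamProcessor(LogStreamManager)} instead: this constructor wraps the
     *             {@link LogManager} and registers the topology itself, kept for backward compatibility.
     */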
    @Deprecated
    public LogStreamProcessor(LogManager manager) {
        needRegister = true;
        this.manager = manager;
        this.streamManager = new LogStreamManager(manager);
    }

    public LogStreamProcessor(LogStreamManager streamManager) {
        needRegister = false;
        this.streamManager = streamManager;
        this.manager = streamManager.getLogManager();
    }

    @Override
    public StreamProcessor init(Topology topology, Settings settings) {
        log.debug("Initializing ...");
        this.topology = topology;
        this.settings = settings;
        if (needRegister) {
            // backward compatibility when the processor is built from a LogManager instead of a StreamManager
            streamManager.register("_", topology, settings);
        }
        return this;
    }

    @Override
    public void start() {
        log.debug("Starting ...");
        this.pools = initPools();
        Objects.requireNonNull(pools);
        pools.forEach(ComputationPool::start);
    }

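    /**
     * Waits until every computation pool has been assigned its partitions, returning {@code false} as soon as one
     * pool is not assigned within the timeout.
     */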
    @Override
    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
        for (ComputationPool pool : pools) {
            // TODO: deduct the time already elapsed so the timeout applies globally rather than per pool
            if (!pool.waitForAssignments(timeout)) {
                return false;
            }
        }
        return true;
    }

    @Override
    public boolean isTerminated() {
        return pools.stream().allMatch(ComputationPool::isTerminated);
    }

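    /**
     * Dumps a JSON representation of the processor: the caller-provided metadata, the streams with their partitions
     * and codec, the computations with their policy settings, and the topology as a list of
     * {@code [source, target]} edges.
     */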
    @Override
    public String toJson(Map<String, String> meta) {
        try {
            ObjectNode ret = OBJECT_MAPPER.createObjectNode();
            ObjectNode metaNode = OBJECT_MAPPER.createObjectNode();
            meta.forEach(metaNode::put);
            ret.set("metadata", metaNode);
            // list streams with settings
            ArrayNode streamsNode = OBJECT_MAPPER.createArrayNode();
            topology.streamsSet().forEach(stream -> {
                ObjectNode item = OBJECT_MAPPER.createObjectNode();
                item.put("name", stream);
                item.put("partitions", settings.getPartitions(stream));
                item.put("codec", settings.getCodec(stream).getName());
                streamsNode.add(item);
            });
            ret.set("streams", streamsNode);
            // list computations with settings
            ArrayNode computationsNode = OBJECT_MAPPER.createArrayNode();
            topology.metadataList().forEach(comp -> {
                ObjectNode item = OBJECT_MAPPER.createObjectNode();
                item.put("name", comp.name());
                item.put("threads", settings.getConcurrency(comp.name()));
                item.put("continueOnFailure", settings.getPolicy(comp.name()).continueOnFailure());
                item.put("batchCapacity", settings.getPolicy(comp.name()).getBatchCapacity());
                item.put("batchThresholdMs", settings.getPolicy(comp.name()).getBatchThreshold().toMillis());
                item.put("maxRetries", settings.getPolicy(comp.name()).getRetryPolicy().getMaxRetries());
                item.put("retryDelayMs", settings.getPolicy(comp.name()).getRetryPolicy().getDelay().toMillis());
                computationsNode.add(item);
            });
            ret.set("computations", computationsNode);
            // list DAG edges
            ArrayNode topologyNode = OBJECT_MAPPER.createArrayNode();
            DirectedAcyclicGraph<Topology.Vertex, DefaultEdge> dag = topology.getDag();
            for (DefaultEdge edge : dag.edgeSet()) {
                ArrayNode edgeNode = OBJECT_MAPPER.createArrayNode();
                edgeNode.add(getEdgeName(dag.getEdgeSource(edge)));
                edgeNode.add(getEdgeName(dag.getEdgeTarget(edge)));
                topologyNode.add(edgeNode);
            }
            ret.set("topology", topologyNode);
            String json = OBJECT_MAPPER.writer().writeValueAsString(ret);
            if (log.isDebugEnabled()) {
                log.debug("Starting processor: " + json);
            }
            return json;
        } catch (JsonProcessingException e) {
            throw new StreamRuntimeException("Failed to dump the processor as JSON", e);
        }
    }

    protected String getEdgeName(Topology.Vertex vertex) {
        return (vertex.getType().equals(Topology.VertexType.COMPUTATION) ? "computation:" : "stream:")
                + vertex.getName();
    }

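    /**
     * Stops the computation pools in parallel, returning {@code false} when at least one pool did not stop within
     * the timeout.
     */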
    @Override
    public boolean stop(Duration timeout) {
        log.debug("Stopping ...");
        if (pools == null) {
            return true;
        }
        long failures = pools.parallelStream().filter(comp -> !comp.stop(timeout)).count();
        log.debug(String.format("Stopped with %d failure(s)", failures));
        return failures == 0;
    }

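    /**
     * Drains then stops the computation pools one by one; unlike {@link #stop}, the pools are processed
     * sequentially because the draining order matters. Returns {@code false} when at least one pool did not
     * terminate within the timeout.
     */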
    @Override
    public boolean drainAndStop(Duration timeout) {
        // the order matters here, pools must be drained sequentially
        log.debug("Drain and stop");
        if (pools == null) {
            return true;
        }
        long failures = pools.stream().filter(comp -> !comp.drainAndStop(timeout)).count();
        log.debug(String.format("Drained and stopped with %d failure(s)", failures));
        return failures == 0;
    }

    @Override
    public void shutdown() {
        log.debug("Shutdown ...");
        if (pools == null) {
            return;
        }
        pools.parallelStream().forEach(ComputationPool::shutdown);
        log.debug("Shutdown done");
    }

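    /**
     * Returns the low watermark of the processor: the minimum of the low watermarks of all computation trees,
     * ignoring trees that have not produced a valid watermark yet.
     */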
    @Override
    public long getLowWatermark() {
        Map<String, Long> watermarks = new HashMap<>(pools.size());
        Set<String> roots = topology.getRoots();
        Map<String, Long> watermarkTrees = new HashMap<>(roots.size());
        // compute the low watermark of each tree of computations
        pools.forEach(pool -> watermarks.put(pool.getComputationName(), pool.getLowWatermark()));
        for (String root : roots) {
            watermarkTrees.put(root,
                    topology.getDescendantComputationNames(root).stream().mapToLong(watermarks::get).min().orElse(0));
        }
        // return the minimum watermark across trees, skipping trees without a valid watermark (values <= 1)
        long ret = watermarkTrees.values().stream().filter(wm -> wm > 1).mapToLong(Long::longValue).min().orElse(0);
        if (log.isTraceEnabled()) {
            log.trace("lowWatermark: " + ret);
            watermarkTrees.forEach((k, v) -> log.trace("tree " + k + ": " + v));
        }
        return ret;
    }

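    /**
     * Returns the latency of a computation aggregated with the latencies of all its ancestor computations, measured
     * on their input streams.
     */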
    @Override
    public Latency getLatency(String computationName) {
        Set<String> ancestorsComputations = topology.getAncestorComputationNames(computationName);
        ancestorsComputations.add(computationName);
        List<Latency> latencies = new ArrayList<>();
        ancestorsComputations.forEach(
                comp -> topology.getMetadata(comp)
                                .inputStreams()
                                .forEach(stream -> latencies.add(
                                        manager.getLatency(Name.ofUrn(stream), Name.ofUrn(comp),
                                                settings.getCodec(comp),
                                                (rec -> Watermark.ofValue(rec.getWatermark()).getTimestamp()),
                                                (Record::getKey)))));
        return Latency.of(latencies);
    }

    @Override
    public long getLowWatermark(String computationName) {
        Objects.requireNonNull(computationName);
        // the low watermark of a computation is the minimum watermark of the computation and all its ancestors
        Map<String, Long> watermarks = new HashMap<>(pools.size());
        pools.forEach(pool -> watermarks.put(pool.getComputationName(), pool.getLowWatermark()));
        long ret = topology.getAncestorComputationNames(computationName)
                           .stream()
                           .mapToLong(watermarks::get)
                           .min()
                           .orElse(0);
        ret = min(ret, watermarks.get(computationName));
        return ret;
    }

    @Override
    public boolean isDone(long timestamp) {
        return Watermark.ofValue(getLowWatermark()).isDone(timestamp);
    }

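    /**
     * Creates a {@link ComputationPool} per computation of the topology, with round-robin partition assignments
     * sized by the computation concurrency.
     */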
    protected List<ComputationPool> initPools() {
        log.debug("Initializing pools");
        return topology.metadataList()
                       .stream()
                       .map(meta -> new ComputationPool(topology.getSupplier(meta.name()), meta,
                               getDefaultAssignments(meta), streamManager,
                               settings.getPolicy(meta.name())))
                       .collect(Collectors.toList());
    }

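    /**
     * Returns the codec shared by all the given streams, falling back to {@code NO_CODEC} when none is defined; all
     * the streams of a computation must use the same codec.
     */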
    @SuppressWarnings("unchecked")
    protected Codec<Record> getCodecForStreams(String name, Set<String> streams) {
        Codec<Record> codec = null;
        Set<String> codecNames = new HashSet<>();
        for (String stream : streams) {
            codec = settings.getCodec(stream);
            codecNames.add(codec == null ? "none" : codec.getName());
        }
        if (codecNames.size() > 1) {
            throw new IllegalArgumentException(String.format("Different codecs for computation %s: %s", name,
                    Arrays.toString(codecNames.toArray())));
        }
        if (codec == null) {
            codec = NO_CODEC;
        }
        return codec;
    }

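    /**
     * Builds the default partition assignments of a computation: the partitions of its input streams are
     * distributed round-robin over the computation threads.
     */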
    protected List<List<LogPartition>> getDefaultAssignments(ComputationMetadataMapping meta) {
        int threads = settings.getConcurrency(meta.name());
        if (threads == 0) {
            return Collections.emptyList();
        }
        Map<String, Integer> streams = new HashMap<>();
        meta.inputStreams().forEach(streamName -> streams.put(streamName, settings.getPartitions(streamName)));
        return KafkaUtils.roundRobinAssignments(threads, streams);
    }
}