001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.computation.mqueue;
020
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023import org.nuxeo.ecm.platform.importer.mqueues.computation.ComputationManager;
024import org.nuxeo.ecm.platform.importer.mqueues.computation.ComputationMetadataMapping;
025import org.nuxeo.ecm.platform.importer.mqueues.computation.Record;
026import org.nuxeo.ecm.platform.importer.mqueues.computation.Settings;
027import org.nuxeo.ecm.platform.importer.mqueues.computation.Topology;
028import org.nuxeo.ecm.platform.importer.mqueues.computation.Watermark;
029import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQManager;
030import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;
031import org.nuxeo.ecm.platform.importer.mqueues.mqueues.kafka.KafkaUtils;
032
033import java.time.Duration;
034import java.util.Comparator;
035import java.util.HashMap;
036import java.util.List;
037import java.util.Map;
038import java.util.Objects;
039import java.util.Set;
040import java.util.stream.Collectors;
041
042/**
043 * @since 9.2
044 */
045public class MQComputationManager implements ComputationManager {
046    private static final Log log = LogFactory.getLog(MQComputationManager.class);
047    private final MQManager<Record> manager;
048    private final Topology topology;
049    private final Settings settings;
050    private final List<MQComputationPool> pools;
051
052    public MQComputationManager(MQManager<Record> manager, Topology topology, Settings settings) {
053        this.manager = manager;
054        this.topology = topology;
055        this.settings = settings;
056        initStreams();
057        this.pools = initPools();
058        Objects.requireNonNull(pools);
059    }
060
061    @Override
062    public void start() {
063        log.debug("Starting ...");
064        pools.forEach(MQComputationPool::start);
065    }
066
067    @Override
068    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
069        for (MQComputationPool pool : pools) {
070            if (!pool.waitForAssignments(timeout)) {
071                return false;
072            }
073        }
074        return true;
075    }
076
077    @Override
078    public boolean stop(Duration timeout) {
079        log.debug("Starting ...");
080        long failures = pools.parallelStream().filter(comp -> !comp.stop(timeout)).count();
081        log.debug(String.format("Stopped %d failure", failures));
082        return failures == 0L;
083    }
084
085    @Override
086    public boolean stop() {
087        return stop(Duration.ofSeconds(1));
088    }
089
090    @Override
091    public boolean drainAndStop(Duration timeout) {
092        // here the order matters, this must be done sequentially
093        log.debug("Drain and stop");
094        long failures = pools.stream().filter(comp -> !comp.drainAndStop(timeout)).count();
095        log.debug(String.format("Drained and stopped %d failure", failures));
096        return failures == 0L;
097    }
098
099    @Override
100    public void shutdown() {
101        log.debug("Shutdown ...");
102        pools.parallelStream().forEach(MQComputationPool::shutdown);
103        log.debug("Shutdown done");
104    }
105
106
107    @Override
108    public long getLowWatermark() {
109        Map<String, Long> watermarks = new HashMap<>(pools.size());
110        Set<String> roots = topology.getRoots();
111        Map<String, Long> watermarkTrees = new HashMap<>(roots.size());
112        // compute low watermark for each tree of computation
113        pools.forEach(pool -> watermarks.put(pool.getComputationName(), pool.getLowWatermark()));
114        for (String root : roots) {
115            watermarkTrees.put(root, topology.getDescendantComputationNames(root).stream().map(watermarks::get)
116                    .min(Comparator.naturalOrder()).orElse(0L));
117        }
118        // return the minimum wm for all trees that are not 0
119        long ret = watermarkTrees.values().stream()
120                .filter(wm -> wm > 1).min(Comparator.naturalOrder()).orElse(0L);
121        if (log.isTraceEnabled()) {
122            log.trace("lowWatermark: " + ret);
123            watermarkTrees.forEach((k, v) -> log.trace("tree " + k + ": " + v));
124            // topology.metadataList().forEach(meta -> System.out.println("  low " + meta.name + " : \t" + getLowWatermark(meta.name)));
125        }
126        return ret;
127    }
128
129    @Override
130    public long getLowWatermark(String computationName) {
131        Objects.nonNull(computationName);
132        // the low wm for a computation is the minimum watermark for all its ancestors
133        Map<String, Long> watermarks = new HashMap<>(pools.size());
134        pools.forEach(pool -> watermarks.put(pool.getComputationName(), pool.getLowWatermark()));
135        long ret = topology.getAncestorComputationNames(computationName).stream().map(watermarks::get)
136                .min(Comparator.naturalOrder()).orElse(0L);
137        ret = Math.min(ret, watermarks.get(computationName));
138        return ret;
139    }
140
141    @Override
142    public boolean isDone(long timestamp) {
143        return Watermark.ofValue(getLowWatermark()).isDone(timestamp);
144    }
145
146    private List<MQComputationPool> initPools() {
147        log.debug("Initializing pools");
148        return topology.metadataList().stream()
149                .map(meta -> new MQComputationPool(topology.getSupplier(meta.name()), meta,
150                        getDefaultAssignments(meta), manager))
151                .collect(Collectors.toList());
152    }
153
154    private List<List<MQPartition>> getDefaultAssignments(ComputationMetadataMapping meta) {
155        int threads = settings.getConcurrency(meta.name());
156        Map<String, Integer> streams = new HashMap<>();
157        meta.inputStreams().forEach(streamName -> streams.put(streamName, settings.getPartitions(streamName)));
158        return KafkaUtils.roundRobinAssignments(threads, streams);
159    }
160
161    private void initStreams() {
162        log.debug("Initializing streams");
163        topology.streamsSet().forEach(streamName -> manager.createIfNotExists(streamName, settings.getPartitions(streamName)));
164    }
165
166}