/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.computation.log;

import static java.lang.Math.min;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Settings;
import org.nuxeo.lib.stream.computation.StreamProcessor;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.log.Latency;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.kafka.KafkaUtils;

/**
 * Runs a computation {@link Topology} on top of a {@link LogManager}: each computation of the topology gets its own
 * {@link ComputationPool} of threads, and computations exchange records through log streams.
 *
 * @since 9.3
 */
public class LogStreamProcessor implements StreamProcessor {
    private static final Log log = LogFactory.getLog(LogStreamProcessor.class);

    protected final LogManager manager;

    protected Topology topology;

    protected Settings settings;

    // One pool per computation in the topology; null until start() is called.
    protected List<ComputationPool> pools;

    public LogStreamProcessor(LogManager manager) {
        this.manager = manager;
    }

    /**
     * Registers the topology and settings and creates the underlying streams; does not start any thread.
     */
    @Override
    public StreamProcessor init(Topology topology, Settings settings) {
        log.debug("Initializing ...");
        this.topology = topology;
        this.settings = settings;
        initStreams();
        return this;
    }

    /**
     * Creates and starts one {@link ComputationPool} per computation.
     */
    @Override
    public void start() {
        log.debug("Starting ...");
        this.pools = initPools();
        Objects.requireNonNull(pools);
        pools.forEach(ComputationPool::start);
    }

    /**
     * Waits until every pool has received its partition assignments.
     *
     * @return {@code false} if any pool did not get its assignments within the timeout
     */
    @Override
    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
        for (ComputationPool pool : pools) {
            // TODO: consider decreasing timeout
            // NOTE: the full timeout is granted to each pool in turn, so the total wait can exceed timeout.
            if (!pool.waitForAssignments(timeout)) {
                return false;
            }
        }
        return true;
    }

    @Override
    public boolean isTerminated() {
        return pools.stream().allMatch(ComputationPool::isTerminated);
    }

    /**
     * Stops all pools concurrently.
     *
     * @return {@code true} if every pool stopped within the timeout (or nothing was started)
     */
    @Override
    public boolean stop(Duration timeout) {
        log.debug("Stopping ...");
        if (pools == null) {
            return true;
        }
        long failures = pools.parallelStream().filter(comp -> !comp.stop(timeout)).count();
        log.debug(String.format("Stopped %d failure", failures));
        return failures == 0;
    }

    /**
     * Drains then stops the pools one by one, in topology order.
     *
     * @return {@code true} if every pool drained and stopped within the timeout (or nothing was started)
     */
    @Override
    public boolean drainAndStop(Duration timeout) {
        // here the order matters, this must be done sequentially
        log.debug("Drain and stop");
        if (pools == null) {
            return true;
        }
        long failures = pools.stream().filter(comp -> !comp.drainAndStop(timeout)).count();
        log.debug(String.format("Drained and stopped %d failure", failures));
        return failures == 0;
    }

    @Override
    public void shutdown() {
        log.debug("Shutdown ...");
        if (pools == null) {
            return;
        }
        pools.parallelStream().forEach(ComputationPool::shutdown);
        log.debug("Shutdown done");
    }

    /**
     * Returns the lowest watermark across the whole processor: for each root of the topology the minimum watermark of
     * its descendant computations is taken, then the minimum of those per-tree values (ignoring trees still at the
     * initial watermark).
     */
    @Override
    public long getLowWatermark() {
        Map<String, Long> watermarks = new HashMap<>(pools.size());
        Set<String> roots = topology.getRoots();
        Map<String, Long> watermarkTrees = new HashMap<>(roots.size());
        // compute low watermark for each tree of computation
        pools.forEach(pool -> watermarks.put(pool.getComputationName(), pool.getLowWatermark()));
        for (String root : roots) {
            watermarkTrees.put(root,
                    topology.getDescendantComputationNames(root).stream().mapToLong(watermarks::get).min().orElse(0));
        }
        // return the minimum wm for all trees that are not 0
        // NOTE(review): the filter keeps wm > 1, so a tree at exactly 1 is also skipped — confirm this is intentional.
        // Long::longValue avoids the boxing round-trip of the deprecated Long(long) constructor.
        long ret = watermarkTrees.values().stream().filter(wm -> wm > 1).mapToLong(Long::longValue).min().orElse(0);
        if (log.isTraceEnabled()) {
            log.trace("lowWatermark: " + ret);
            watermarkTrees.forEach((k, v) -> log.trace("tree " + k + ": " + v));
        }
        return ret;
    }

    /**
     * Aggregates the latency of a computation and all of its ancestors, using the record watermark as the record
     * timestamp.
     */
    @Override
    public Latency getLatency(String computationName) {
        Set<String> ancestorsComputations = topology.getAncestorComputationNames(computationName);
        ancestorsComputations.add(computationName);
        long now = System.currentTimeMillis();
        List<Latency> latencies = new ArrayList<>();
        ancestorsComputations.forEach(comp -> topology.getMetadata(comp).inputStreams().forEach(stream -> {
            latencies.add(manager.<Record> getLatency(stream, comp,
                    (rec -> Watermark.ofValue(rec.watermark).getTimestamp()),
                    (rec -> rec.key)));
        }));
        return Latency.of(latencies);
    }

    /**
     * Returns the low watermark for a single computation, taking its ancestors into account.
     */
    @Override
    public long getLowWatermark(String computationName) {
        Objects.requireNonNull(computationName);
        // the low wm for a computation is the minimum watermark for all its ancestors
        Map<String, Long> watermarks = new HashMap<>(pools.size());
        pools.forEach(pool -> watermarks.put(pool.getComputationName(), pool.getLowWatermark()));
        long ret = topology.getAncestorComputationNames(computationName)
                           .stream()
                           .mapToLong(watermarks::get)
                           .min()
                           .orElse(0);
        ret = min(ret, watermarks.get(computationName));
        return ret;
    }

    @Override
    public boolean isDone(long timestamp) {
        return Watermark.ofValue(getLowWatermark()).isDone(timestamp);
    }

    /**
     * Builds one {@link ComputationPool} per computation declared in the topology.
     */
    protected List<ComputationPool> initPools() {
        log.debug("Initializing pools");
        return topology.metadataList()
                       .stream()
                       .map(meta -> new ComputationPool(topology.getSupplier(meta.name()), meta,
                               getDefaultAssignments(meta), manager))
                       .collect(Collectors.toList());
    }

    /**
     * Distributes the partitions of every input stream of the computation over its configured number of threads,
     * round-robin.
     */
    protected List<List<LogPartition>> getDefaultAssignments(ComputationMetadataMapping meta) {
        int threads = settings.getConcurrency(meta.name());
        Map<String, Integer> streams = new HashMap<>();
        meta.inputStreams().forEach(streamName -> streams.put(streamName, settings.getPartitions(streamName)));
        return KafkaUtils.roundRobinAssignments(threads, streams);
    }

    /**
     * Creates every stream of the topology with its configured partition count, if it does not already exist.
     */
    protected void initStreams() {
        log.debug("Initializing streams");
        topology.streamsSet()
                .forEach(streamName -> manager.createIfNotExists(streamName, settings.getPartitions(streamName)));
    }

}