/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.computation.log;

import static java.lang.Math.min;
import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Settings;
import org.nuxeo.lib.stream.computation.StreamProcessor;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.log.Latency;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.kafka.KafkaUtils;

/**
 * A {@link StreamProcessor} that runs the computations of a {@link Topology} on top of a {@link LogManager}, using one
 * {@link ComputationPool} per computation.
 * <p>
 * {@link #init(Topology, Settings)} must be called before {@link #start()}. The pools only exist once {@link #start()}
 * has been called: until then {@link #stop(Duration)}, {@link #drainAndStop(Duration)} and {@link #shutdown()} are
 * no-ops, while the other methods that read the pools fail with a {@link NullPointerException}.
 *
 * @since 9.3
 */
public class LogStreamProcessor implements StreamProcessor {
    private static final Log log = LogFactory.getLog(LogStreamProcessor.class);

    protected final LogManager manager;

    protected Topology topology;

    protected Settings settings;

    /** One pool per computation, {@code null} until {@link #start()} is called. */
    protected List<ComputationPool> pools;

    /**
     * Creates a processor bound to the given log manager.
     *
     * @param manager the manager used to create streams, get appenders and compute latencies
     */
    public LogStreamProcessor(LogManager manager) {
        this.manager = manager;
    }

    /**
     * Records the topology and settings, creates the missing streams and initializes the appenders of the source
     * streams.
     *
     * @return this processor
     */
    @Override
    public StreamProcessor init(Topology topology, Settings settings) {
        log.debug("Initializing ...");
        this.topology = topology;
        this.settings = settings;
        initStreams();
        initSourceAppenders();
        return this;
    }

    /**
     * Builds the computation pools from the topology and starts each of them.
     */
    @Override
    public void start() {
        log.debug("Starting ...");
        this.pools = initPools();
        Objects.requireNonNull(pools);
        pools.forEach(ComputationPool::start);
    }

    /**
     * Waits for the assignments of each pool, one pool after the other.
     * <p>
     * The full {@code timeout} is handed to every pool, so the total wait can exceed {@code timeout}.
     *
     * @param timeout the timeout given to each pool
     * @return {@code false} as soon as one pool reports a failure, {@code true} otherwise
     */
    @Override
    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
        for (ComputationPool pool : pools) {
            // TODO: consider decreasing timeout
            if (!pool.waitForAssignments(timeout)) {
                return false;
            }
        }
        return true;
    }

    /**
     * @return {@code true} when every pool reports being terminated
     */
    @Override
    public boolean isTerminated() {
        return pools.stream().allMatch(ComputationPool::isTerminated);
    }

    /**
     * Stops all the pools using a parallel stream.
     *
     * @param timeout the timeout given to each pool
     * @return {@code true} if no pool failed to stop or if the processor was never started
     */
    @Override
    public boolean stop(Duration timeout) {
        log.debug("Stopping ...");
        if (pools == null) {
            return true;
        }
        long failures = pools.parallelStream().filter(comp -> !comp.stop(timeout)).count();
        log.debug(String.format("Stopped %d failure", failures));
        return failures == 0;
    }

    /**
     * Drains and stops all the pools, one after the other in the order of the pool list.
     *
     * @param timeout the timeout given to each pool
     * @return {@code true} if no pool failed to drain and stop or if the processor was never started
     */
    @Override
    public boolean drainAndStop(Duration timeout) {
        // here the order matters, this must be done sequentially
        log.debug("Drain and stop");
        if (pools == null) {
            return true;
        }
        long failures = pools.stream().filter(comp -> !comp.drainAndStop(timeout)).count();
        log.debug(String.format("Drained and stopped %d failure", failures));
        return failures == 0;
    }

    /**
     * Shuts down all the pools using a parallel stream, does nothing if the processor was never started.
     */
    @Override
    public void shutdown() {
        log.debug("Shutdown ...");
        if (pools == null) {
            return;
        }
        pools.parallelStream().forEach(ComputationPool::shutdown);
        log.debug("Shutdown done");
    }

    /**
     * Computes a low watermark for the whole topology.
     * <p>
     * For each root of the topology the minimum low watermark of its descendant computations is taken, then the
     * minimum of these per-tree values is returned, ignoring the values that do not pass the filter below.
     *
     * @return the selected watermark, or {@code 0} when no tree has a value passing the filter
     */
    @Override
    public long getLowWatermark() {
        Map<String, Long> watermarks = new HashMap<>(pools.size());
        Set<String> roots = topology.getRoots();
        Map<String, Long> watermarkTrees = new HashMap<>(roots.size());
        // compute low watermark for each tree of computation
        pools.forEach(pool -> watermarks.put(pool.getComputationName(), pool.getLowWatermark()));
        for (String root : roots) {
            watermarkTrees.put(root,
                    topology.getDescendantComputationNames(root).stream().mapToLong(watermarks::get).min().orElse(0));
        }
        // return the minimum wm for all trees that are not 0
        // NOTE(review): the filter also drops a watermark equal to 1, confirm this is intended
        // Long::longValue unboxes without going through the deprecated Long(long) constructor
        long ret = watermarkTrees.values().stream().filter(wm -> wm > 1).mapToLong(Long::longValue).min().orElse(0);
        if (log.isTraceEnabled()) {
            log.trace("lowWatermark: " + ret);
            watermarkTrees.forEach((k, v) -> log.trace("tree " + k + ": " + v));
        }
        return ret;
    }

    /**
     * Aggregates the latencies of all the input streams of a computation and of its ancestor computations.
     *
     * @param computationName the name of the computation
     * @return the latency built by {@link Latency#of} from the latency of each (input stream, computation) pair
     */
    @Override
    public Latency getLatency(String computationName) {
        // work on a copy so the set handed out by the topology is never modified
        Set<String> computations = new HashSet<>(topology.getAncestorComputationNames(computationName));
        computations.add(computationName);
        List<Latency> latencies = new ArrayList<>();
        // NOTE(review): the codec is looked up with the computation name here whereas the rest of this class looks
        // it up with a stream name, confirm this is intended
        computations.forEach(comp -> topology.getMetadata(comp).inputStreams().forEach(
                stream -> latencies.add(manager.getLatency(stream, comp, settings.getCodec(comp),
                        (rec -> Watermark.ofValue(rec.getWatermark()).getTimestamp()), (Record::getKey)))));
        return Latency.of(latencies);
    }

    /**
     * Computes the low watermark of a computation: the minimum between its own low watermark and the low watermarks
     * of all its ancestor computations.
     * <p>
     * The computation and its ancestors are expected to have a pool, a name without pool results in a
     * {@link NullPointerException} when its missing watermark is unboxed.
     *
     * @param computationName the name of the computation, must not be {@code null}
     * @return the low watermark, {@code 0} if the computation has no ancestor
     */
    @Override
    public long getLowWatermark(String computationName) {
        Objects.requireNonNull(computationName);
        // the low wm for a computation is the minimum watermark for all its ancestors
        Map<String, Long> watermarks = new HashMap<>(pools.size());
        pools.forEach(pool -> watermarks.put(pool.getComputationName(), pool.getLowWatermark()));
        long ret = topology.getAncestorComputationNames(computationName)
                           .stream()
                           .mapToLong(watermarks::get)
                           .min()
                           .orElse(0);
        ret = min(ret, watermarks.get(computationName));
        return ret;
    }

    /**
     * @return the result of {@link Watermark#isDone(long)} applied to the global low watermark
     */
    @Override
    public boolean isDone(long timestamp) {
        return Watermark.ofValue(getLowWatermark()).isDone(timestamp);
    }

    /**
     * Creates one {@link ComputationPool} per computation metadata of the topology, with its default partition
     * assignments and the codecs of its input and output streams.
     */
    protected List<ComputationPool> initPools() {
        log.debug("Initializing pools");
        return topology.metadataList()
                       .stream()
                       .map(meta -> new ComputationPool(topology.getSupplier(meta.name()), meta,
                               getDefaultAssignments(meta), manager,
                               getCodecForStreams(meta.name(), meta.inputStreams()),
                               getCodecForStreams(meta.name(), meta.outputStreams())))
                       .collect(Collectors.toList());
    }

    /**
     * Returns the codec shared by the given streams.
     *
     * @param name the computation name, only used in the error message
     * @param streams the stream names to inspect
     * @return the codec configured for the streams, or {@code NO_CODEC} if none is configured or if there is no stream
     * @throws IllegalArgumentException if the streams resolve to more than one distinct codec name, a stream without
     *             codec counting as the name "none"
     */
    @SuppressWarnings("unchecked")
    protected Codec<Record> getCodecForStreams(String name, Set<String> streams) {
        Codec<Record> codec = null;
        Set<String> codecNames = new HashSet<>();
        for (String stream : streams) {
            codec = settings.getCodec(stream);
            codecNames.add(codec == null ? "none" : codec.getName());
        }
        if (codecNames.size() > 1) {
            throw new IllegalArgumentException(String.format("Different codecs for computation %s: %s", name,
                    Arrays.toString(codecNames.toArray())));
        }
        // at most one distinct codec name at this point, so the last codec seen stands for all the streams
        if (codec == null) {
            codec = NO_CODEC;
        }
        return codec;
    }

    /**
     * Builds the round robin assignments of the input stream partitions of a computation, using the concurrency of
     * the computation as number of threads and the number of partitions configured for each input stream.
     */
    protected List<List<LogPartition>> getDefaultAssignments(ComputationMetadataMapping meta) {
        int threads = settings.getConcurrency(meta.name());
        Map<String, Integer> streams = new HashMap<>();
        meta.inputStreams().forEach(streamName -> streams.put(streamName, settings.getPartitions(streamName)));
        return KafkaUtils.roundRobinAssignments(threads, streams);
    }

    /**
     * Creates every stream of the topology that does not exist yet, with the number of partitions from the settings.
     */
    protected void initStreams() {
        log.debug("Initializing streams");
        topology.streamsSet()
                .forEach(streamName -> manager.createIfNotExists(streamName, settings.getPartitions(streamName)));
    }

    /**
     * Requests an appender for each source stream of the topology with the codec configured in the settings.
     */
    protected void initSourceAppenders() {
        log.debug("Initializing source appenders so we ensure they use codec defined in the processor");
        topology.streamsSet().stream().filter(topology::isSource).forEach(
                sourceStream -> manager.getAppender(sourceStream, settings.getCodec(sourceStream)));
    }

}