/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.computation.log;

import static java.lang.Math.min;
import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Settings;
import org.nuxeo.lib.stream.computation.StreamProcessor;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.log.Latency;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.kafka.KafkaUtils;

/**
 * {@link StreamProcessor} that creates one {@link ComputationPool} per computation of a {@link Topology}, all of them
 * sharing the same {@link LogManager}.
 * <p>
 * Expected call sequence: {@link #init(Topology, Settings)}, then {@link #start()}. The pools only exist once
 * {@link #start()} has been called; {@link #stop(Duration)}, {@link #drainAndStop(Duration)} and {@link #shutdown()}
 * tolerate being called before that, the other pool-based methods do not check for it.
 *
 * @since 9.3
 */
public class LogStreamProcessor implements StreamProcessor {
    private static final Log log = LogFactory.getLog(LogStreamProcessor.class);

    /** Log manager handed to every pool and used to create streams, get appenders and read latencies. */
    protected final LogManager manager;

    /** Topology to run, set by {@link #init(Topology, Settings)}. */
    protected Topology topology;

    /** Settings (partitions, concurrency, codec, policy), set by {@link #init(Topology, Settings)}. */
    protected Settings settings;

    /** One pool per computation, created by {@link #start()}; {@code null} until then. */
    protected List<ComputationPool> pools;

    /**
     * @param manager the log manager used by this processor and its pools
     */
    public LogStreamProcessor(LogManager manager) {
        this.manager = manager;
    }

    /**
     * Stores the topology and settings, then initializes the streams and requests an appender for each of them.
     *
     * @param topology the topology to run
     * @param settings the settings to apply; its partition counts may be updated to match existing streams
     * @return this processor
     */
    @Override
    public StreamProcessor init(Topology topology, Settings settings) {
        log.debug("Initializing ...");
        this.topology = topology;
        this.settings = settings;
        initStreams();
        initSourceAppenders();
        return this;
    }

    /**
     * Creates the computation pools and starts each of them.
     */
    @Override
    public void start() {
        log.debug("Starting ...");
        this.pools = initPools();
        Objects.requireNonNull(pools);
        pools.forEach(ComputationPool::start);
    }

    /**
     * Asks each pool, one after the other, to wait for its assignments.
     * <p>
     * The same {@code timeout} is passed to every pool, it is not reduced by the time already spent.
     *
     * @param timeout the timeout given to each pool
     * @return {@code false} as soon as one pool returns {@code false}, {@code true} otherwise
     * @throws InterruptedException if a pool's wait is interrupted
     */
    @Override
    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
        for (ComputationPool pool : pools) {
            // TODO: consider decreasing timeout
            if (!pool.waitForAssignments(timeout)) {
                return false;
            }
        }
        return true;
    }

    /**
     * @return {@code true} when every pool reports being terminated
     */
    @Override
    public boolean isTerminated() {
        return pools.stream().allMatch(ComputationPool::isTerminated);
    }

    /**
     * Stops all pools, using a parallel stream.
     *
     * @param timeout the timeout passed to each pool
     * @return {@code true} when no pool exists yet or when every pool returned {@code true} from its stop
     */
    @Override
    public boolean stop(Duration timeout) {
        log.debug("Stopping ...");
        if (pools == null) {
            return true;
        }
        long failures = pools.parallelStream().filter(comp -> !comp.stop(timeout)).count();
        log.debug(String.format("Stopped %d failure", failures));
        return failures == 0;
    }

    /**
     * Drains and stops all pools, one after the other in list order.
     *
     * @param timeout the timeout passed to each pool
     * @return {@code true} when no pool exists yet or when every pool returned {@code true} from its drain and stop
     */
    @Override
    public boolean drainAndStop(Duration timeout) {
        // here the order matters, this must be done sequentially
        log.debug("Drain and stop");
        if (pools == null) {
            return true;
        }
        long failures = pools.stream().filter(comp -> !comp.drainAndStop(timeout)).count();
        log.debug(String.format("Drained and stopped %d failure", failures));
        return failures == 0;
    }

    /**
     * Shuts down all pools, using a parallel stream. Does nothing when no pool exists yet.
     */
    @Override
    public void shutdown() {
        log.debug("Shutdown ...");
        if (pools == null) {
            return;
        }
        pools.parallelStream().forEach(ComputationPool::shutdown);
        log.debug("Shutdown done");
    }

    /**
     * Computes the low watermark of the whole topology.
     * <p>
     * For each root of the topology, the watermark of the tree is the minimum low watermark of the root's descendant
     * computations (0 when there is none). The result is the minimum of the tree watermarks that are greater than 1.
     *
     * @return the low watermark, or 0 when no tree has a watermark greater than 1
     */
    @Override
    public long getLowWatermark() {
        Map<String, Long> watermarks = new HashMap<>(pools.size());
        Set<String> roots = topology.getRoots();
        Map<String, Long> watermarkTrees = new HashMap<>(roots.size());
        // compute low watermark for each tree of computation
        pools.forEach(pool -> watermarks.put(pool.getComputationName(), pool.getLowWatermark()));
        for (String root : roots) {
            long treeWatermark = topology.getDescendantComputationNames(root)
                                         .stream()
                                         .mapToLong(watermarks::get)
                                         .min()
                                         .orElse(0);
            watermarkTrees.put(root, treeWatermark);
        }
        // return the minimum wm among the trees whose wm is greater than 1
        long ret = watermarkTrees.values()
                                 .stream()
                                 .filter(wm -> wm > 1)
                                 .mapToLong(Long::longValue)
                                 .min()
                                 .orElse(0);
        if (log.isTraceEnabled()) {
            log.trace("lowWatermark: " + ret);
            watermarkTrees.forEach((k, v) -> log.trace("tree " + k + ": " + v));
        }
        return ret;
    }

    /**
     * Aggregates the latencies of the input streams of a computation and of all its ancestor computations.
     *
     * @param computationName the name of the computation
     * @return the latency built by {@link Latency#of} from the latency of each (computation, input stream) pair
     */
    @Override
    public Latency getLatency(String computationName) {
        // work on a copy: the set returned by the topology is not owned by this method
        Set<String> computations = new HashSet<>(topology.getAncestorComputationNames(computationName));
        computations.add(computationName);
        List<Latency> latencies = new ArrayList<>();
        for (String comp : computations) {
            for (String stream : topology.getMetadata(comp).inputStreams()) {
                // NOTE(review): the codec is looked up with the computation name here, whereas the other methods of
                // this class look it up with the stream name - confirm this is intended
                latencies.add(manager.getLatency(stream, comp, settings.getCodec(comp),
                        (rec -> Watermark.ofValue(rec.getWatermark()).getTimestamp()), (Record::getKey)));
            }
        }
        return Latency.of(latencies);
    }

    /**
     * Computes the low watermark of a computation: the minimum between its own low watermark and the low watermark
     * of each of its ancestor computations.
     *
     * @param computationName the name of the computation, must not be {@code null}
     * @return the minimum low watermark; the computation's own low watermark when it has no ancestor computation
     * @throws NullPointerException if {@code computationName} is {@code null} or if no pool exists for the
     *             computation or one of its ancestors
     */
    @Override
    public long getLowWatermark(String computationName) {
        Objects.requireNonNull(computationName);
        // the low wm for a computation is the minimum watermark for all its ancestors
        Map<String, Long> watermarks = new HashMap<>(pools.size());
        pools.forEach(pool -> watermarks.put(pool.getComputationName(), pool.getLowWatermark()));
        // Long.MAX_VALUE is neutral for min(): without ancestor the result is the computation's own watermark
        long ancestorsMin = topology.getAncestorComputationNames(computationName)
                                    .stream()
                                    .mapToLong(watermarks::get)
                                    .min()
                                    .orElse(Long.MAX_VALUE);
        return min(ancestorsMin, watermarks.get(computationName));
    }

    /**
     * @param timestamp the timestamp to test
     * @return the result of {@code isDone(timestamp)} on the {@link Watermark} built from {@link #getLowWatermark()}
     */
    @Override
    public boolean isDone(long timestamp) {
        return Watermark.ofValue(getLowWatermark()).isDone(timestamp);
    }

    /**
     * Builds one {@link ComputationPool} for each metadata entry of the topology.
     *
     * @return the list of pools, not started
     */
    protected List<ComputationPool> initPools() {
        log.debug("Initializing pools");
        return topology.metadataList()
                       .stream()
                       .map(meta -> new ComputationPool(topology.getSupplier(meta.name()), meta,
                               getDefaultAssignments(meta), manager,
                               getCodecForStreams(meta.name(), meta.inputStreams()),
                               getCodecForStreams(meta.name(), meta.outputStreams()),
                               settings.getPolicy(meta.name())))
                       .collect(Collectors.toList());
    }

    /**
     * Resolves the single codec shared by a set of streams.
     *
     * @param name the computation name, only used in the error message
     * @param streams the stream names whose codec is looked up in the settings
     * @return the codec of the streams, or {@code NO_CODEC} when there is no stream or when the codec is {@code null}
     * @throws IllegalArgumentException if the streams do not all resolve to the same codec name, a {@code null} codec
     *             counting as the name "none"
     */
    @SuppressWarnings("unchecked")
    protected Codec<Record> getCodecForStreams(String name, Set<String> streams) {
        Codec<Record> codec = null;
        Set<String> codecNames = new HashSet<>();
        for (String stream : streams) {
            codec = settings.getCodec(stream);
            codecNames.add(codec == null ? "none" : codec.getName());
        }
        if (codecNames.size() > 1) {
            throw new IllegalArgumentException(String.format("Different codecs for computation %s: %s", name,
                    Arrays.toString(codecNames.toArray())));
        }
        if (codec == null) {
            codec = NO_CODEC;
        }
        return codec;
    }

    /**
     * Computes the partition assignments of a computation with {@link KafkaUtils#roundRobinAssignments}, from the
     * concurrency configured for the computation and the number of partitions configured for each input stream.
     *
     * @param meta the metadata of the computation
     * @return the assignments returned by {@link KafkaUtils#roundRobinAssignments}
     */
    protected List<List<LogPartition>> getDefaultAssignments(ComputationMetadataMapping meta) {
        int threads = settings.getConcurrency(meta.name());
        Map<String, Integer> streams = new HashMap<>();
        meta.inputStreams().forEach(streamName -> streams.put(streamName, settings.getPartitions(streamName)));
        return KafkaUtils.roundRobinAssignments(threads, streams);
    }

    /**
     * Makes sure every stream of the topology exists.
     * <p>
     * A missing stream is created with the number of partitions defined in the settings. For an existing stream
     * whose size differs from the settings, the settings are updated with the actual size.
     */
    protected void initStreams() {
        log.debug("Initializing streams");
        topology.streamsSet().forEach(streamName -> {
            if (manager.exists(streamName)) {
                int size = manager.size(streamName);
                if (settings.getPartitions(streamName) != size) {
                    log.debug(String.format(
                            "Update settings for stream: %s defined with %d partitions but exists with %d partitions",
                            streamName, settings.getPartitions(streamName), size));
                    settings.setPartitions(streamName, size);
                }
            } else {
                manager.createIfNotExists(streamName, settings.getPartitions(streamName));
            }
        });
    }

    /**
     * Requests from the manager an appender for every stream of the topology, passing the codec defined in the
     * settings for that stream.
     */
    protected void initSourceAppenders() {
        log.debug("Initializing source appenders so we ensure they use codec defined in the processor");
        topology.streamsSet()
                .forEach(sourceStream -> manager.getAppender(sourceStream, settings.getCodec(sourceStream)));
    }

}