/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.computation.log;

import static java.lang.Math.min;
import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgrapht.experimental.dag.DirectedAcyclicGraph;
import org.jgrapht.graph.DefaultEdge;
import org.nuxeo.lib.stream.StreamRuntimeException;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Settings;
import org.nuxeo.lib.stream.computation.StreamProcessor;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.log.Latency;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.Name;
import org.nuxeo.lib.stream.log.kafka.KafkaUtils;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

/**
 * Runs a {@link Topology} of computations on top of a {@link LogManager}: one {@link ComputationPool} is created per
 * computation, and input-stream partitions are assigned round-robin between the pool threads.
 *
 * @since 9.3
 */
public class LogStreamProcessor implements StreamProcessor {
    private static final Log log = LogFactory.getLog(LogStreamProcessor.class);

    protected final LogManager manager;

    protected Topology topology;

    protected Settings settings;

    // one pool per computation, created on start()
    protected List<ComputationPool> pools;

    protected LogStreamManager streamManager;

    // true when built from the deprecated LogManager constructor, in which case the topology
    // must be registered here (init) instead of by the caller
    protected final boolean needRegister;

    protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * @deprecated use {@link #LogStreamProcessor(LogStreamManager)} instead.
     */
    @Deprecated
    public LogStreamProcessor(LogManager manager) {
        needRegister = true;
        this.manager = manager;
        this.streamManager = new LogStreamManager(manager);
    }

    public LogStreamProcessor(LogStreamManager streamManager) {
        needRegister = false;
        this.streamManager = streamManager;
        this.manager = streamManager.getLogManager();
    }

    /**
     * Records the topology and settings to run; does not start any thread.
     */
    @Override
    public StreamProcessor init(Topology topology, Settings settings) {
        log.debug("Initializing ...");
        this.topology = topology;
        this.settings = settings;
        if (needRegister) {
            // backward compat when using a LogManager instead of StreamManager
            streamManager.register("_", topology, settings);
        }
        return this;
    }

    /**
     * Creates and starts one {@link ComputationPool} per computation of the topology.
     */
    @Override
    public void start() {
        log.debug("Starting ...");
        this.pools = initPools();
        Objects.requireNonNull(pools);
        pools.forEach(ComputationPool::start);
    }

    /**
     * Waits for all pools to get their partition assignments.
     *
     * @return {@code false} as soon as one pool fails to be assigned within the timeout. Note that the timeout applies
     *         to each pool in turn, not to the overall wait.
     */
    @Override
    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
        for (ComputationPool pool : pools) {
            // TODO: consider decreasing timeout
            if (!pool.waitForAssignments(timeout)) {
                return false;
            }
        }
        return true;
    }

    @Override
    public boolean isTerminated() {
        return pools.stream().allMatch(ComputationPool::isTerminated);
    }

    /**
     * Dumps the processor (streams, computations and DAG edges along with their settings) as a JSON string.
     *
     * @param meta extra key/value pairs rendered under the {@code metadata} node
     * @throws StreamRuntimeException if the JSON serialization fails
     */
    @Override
    public String toJson(Map<String, String> meta) {
        try {
            ObjectNode ret = OBJECT_MAPPER.createObjectNode();
            ret.set("metadata", buildMetadataNode(meta));
            ret.set("streams", buildStreamsNode());
            ret.set("computations", buildComputationsNode());
            ret.set("topology", buildTopologyNode());
            String json = OBJECT_MAPPER.writer().writeValueAsString(ret);
            if (log.isDebugEnabled()) {
                log.debug("Starting processor: " + json);
            }
            return json;
        } catch (JsonProcessingException e) {
            throw new StreamRuntimeException("Fail to dump processor as JSON", e);
        }
    }

    /** Builds the {@code metadata} node from the caller-provided key/value pairs. */
    protected ObjectNode buildMetadataNode(Map<String, String> meta) {
        ObjectNode metaNode = OBJECT_MAPPER.createObjectNode();
        meta.forEach(metaNode::put);
        return metaNode;
    }

    /** Lists the streams of the topology with their partitioning and codec settings. */
    protected ArrayNode buildStreamsNode() {
        ArrayNode streamsNode = OBJECT_MAPPER.createArrayNode();
        topology.streamsSet().forEach(stream -> {
            ObjectNode item = OBJECT_MAPPER.createObjectNode();
            item.put("name", stream);
            item.put("partitions", settings.getPartitions(stream));
            item.put("codec", settings.getCodec(stream).getName());
            streamsNode.add(item);
        });
        return streamsNode;
    }

    /** Lists the computations of the topology with their concurrency and retry policy settings. */
    protected ArrayNode buildComputationsNode() {
        ArrayNode computationsNode = OBJECT_MAPPER.createArrayNode();
        topology.metadataList().forEach(comp -> {
            String name = comp.name();
            ObjectNode item = OBJECT_MAPPER.createObjectNode();
            item.put("name", name);
            item.put("threads", settings.getConcurrency(name));
            item.put("continueOnFailure", settings.getPolicy(name).continueOnFailure());
            item.put("batchCapacity", settings.getPolicy(name).getBatchCapacity());
            item.put("batchThresholdMs", settings.getPolicy(name).getBatchThreshold().toMillis());
            item.put("maxRetries", settings.getPolicy(name).getRetryPolicy().getMaxRetries());
            item.put("retryDelayMs", settings.getPolicy(name).getRetryPolicy().getDelay().toMillis());
            computationsNode.add(item);
        });
        return computationsNode;
    }

    /** Lists the DAG edges as [source, target] pairs. */
    protected ArrayNode buildTopologyNode() {
        ArrayNode topologyNode = OBJECT_MAPPER.createArrayNode();
        DirectedAcyclicGraph<Topology.Vertex, DefaultEdge> dag = topology.getDag();
        for (DefaultEdge edge : dag.edgeSet()) {
            ArrayNode edgeNode = OBJECT_MAPPER.createArrayNode();
            edgeNode.add(getEdgeName(dag.getEdgeSource(edge)));
            edgeNode.add(getEdgeName(dag.getEdgeTarget(edge)));
            topologyNode.add(edgeNode);
        }
        return topologyNode;
    }

    /** Renders a DAG vertex as "computation:name" or "stream:name". */
    protected String getEdgeName(Topology.Vertex edge) {
        return (edge.getType() == Topology.VertexType.COMPUTATION ? "computation:" : "stream:") + edge.getName();
    }

    /**
     * Stops all pools in parallel.
     *
     * @return {@code true} when every pool stopped within the timeout
     */
    @Override
    public boolean stop(Duration timeout) {
        log.debug("Stopping ...");
        if (pools == null) {
            return true;
        }
        long failures = pools.parallelStream().filter(comp -> !comp.stop(timeout)).count();
        log.debug(String.format("Stopped %d failure", failures));
        return failures == 0;
    }

    /**
     * Drains then stops the pools one by one.
     *
     * @return {@code true} when every pool drained and stopped within the timeout
     */
    @Override
    public boolean drainAndStop(Duration timeout) {
        // here the order matters, this must be done sequentially
        log.debug("Drain and stop");
        if (pools == null) {
            return true;
        }
        long failures = pools.stream().filter(comp -> !comp.drainAndStop(timeout)).count();
        log.debug(String.format("Drained and stopped %d failure", failures));
        return failures == 0;
    }

    @Override
    public void shutdown() {
        log.debug("Shutdown ...");
        if (pools == null) {
            return;
        }
        pools.parallelStream().forEach(ComputationPool::shutdown);
        log.debug("Shutdown done");
    }

    /**
     * Returns the low watermark of the processor: the minimum over all computation trees of the per-tree low
     * watermark, ignoring trees whose watermark is still at its initial value.
     */
    @Override
    public long getLowWatermark() {
        Map<String, Long> watermarks = new HashMap<>(pools.size());
        Set<String> roots = topology.getRoots();
        Map<String, Long> watermarkTrees = new HashMap<>(roots.size());
        // compute low watermark for each tree of computation
        pools.forEach(pool -> watermarks.put(pool.getComputationName(), pool.getLowWatermark()));
        for (String root : roots) {
            watermarkTrees.put(root,
                    topology.getDescendantComputationNames(root).stream().mapToLong(watermarks::get).min().orElse(0));
        }
        // return the minimum wm over all trees, skipping trees whose wm is still <= 1 (not yet initialized)
        long ret = watermarkTrees.values().stream().filter(wm -> wm > 1).mapToLong(Long::valueOf).min().orElse(0);
        if (log.isTraceEnabled()) {
            log.trace("lowWatermark: " + ret);
            watermarkTrees.forEach((k, v) -> log.trace("tree " + k + ": " + v));
        }
        return ret;
    }

    /**
     * Aggregates the latency of a computation and all its ancestors over their input streams.
     */
    @Override
    public Latency getLatency(String computationName) {
        Set<String> ancestorsComputations = topology.getAncestorComputationNames(computationName);
        ancestorsComputations.add(computationName);
        List<Latency> latencies = new ArrayList<>();
        // NOTE(review): the codec is looked up with the computation name while every other call site keys codecs by
        // stream name (see buildStreamsNode/getCodecForStreams) — confirm Settings.getCodec handles this as intended
        ancestorsComputations.forEach(
                comp -> topology.getMetadata(comp)
                                .inputStreams()
                                .forEach(stream -> latencies.add(
                                        manager.getLatency(Name.ofUrn(stream), Name.ofUrn(comp),
                                                settings.getCodec(comp),
                                                (rec -> Watermark.ofValue(rec.getWatermark()).getTimestamp()),
                                                (Record::getKey)))));
        return Latency.of(latencies);
    }

    /**
     * Returns the low watermark of a single computation: the minimum over the computation and all its ancestors.
     */
    @Override
    public long getLowWatermark(String computationName) {
        Objects.requireNonNull(computationName);
        // the low wm for a computation is the minimum watermark for all its ancestors
        Map<String, Long> watermarks = new HashMap<>(pools.size());
        pools.forEach(pool -> watermarks.put(pool.getComputationName(), pool.getLowWatermark()));
        long ret = topology.getAncestorComputationNames(computationName)
                           .stream()
                           .mapToLong(watermarks::get)
                           .min()
                           .orElse(0);
        ret = min(ret, watermarks.get(computationName));
        return ret;
    }

    /**
     * Returns {@code true} when every record produced before {@code timestamp} has been processed.
     */
    @Override
    public boolean isDone(long timestamp) {
        return Watermark.ofValue(getLowWatermark()).isDone(timestamp);
    }

    /** Creates one pool per computation of the topology, with its partition assignments and policy. */
    protected List<ComputationPool> initPools() {
        log.debug("Initializing pools");
        return topology.metadataList()
                       .stream()
                       .map(meta -> new ComputationPool(topology.getSupplier(meta.name()), meta,
                               getDefaultAssignments(meta), streamManager,
                               settings.getPolicy(meta.name())))
                       .collect(Collectors.toList());
    }

    /**
     * Returns the single codec shared by the given streams, {@link org.nuxeo.lib.stream.codec.NoCodec#NO_CODEC} when
     * none is configured.
     *
     * @throws IllegalArgumentException when the streams are configured with different codecs
     */
    @SuppressWarnings("unchecked")
    protected Codec<Record> getCodecForStreams(String name, Set<String> streams) {
        Codec<Record> codec = null;
        Set<String> codecNames = new HashSet<>();
        for (String stream : streams) {
            codec = settings.getCodec(stream);
            codecNames.add(codec == null ? "none" : codec.getName());
        }
        if (codecNames.size() > 1) {
            throw new IllegalArgumentException(String.format("Different codecs for computation %s: %s", name,
                    Arrays.toString(codecNames.toArray())));
        }
        if (codec == null) {
            codec = NO_CODEC;
        }
        return codec;
    }

    /**
     * Assigns the partitions of the computation input streams round-robin between its threads; empty when the
     * computation has no thread configured.
     */
    protected List<List<LogPartition>> getDefaultAssignments(ComputationMetadataMapping meta) {
        int threads = settings.getConcurrency(meta.name());
        if (threads == 0) {
            return Collections.emptyList();
        }
        Map<String, Integer> streams = new HashMap<>();
        meta.inputStreams().forEach(streamName -> streams.put(streamName, settings.getPartitions(streamName)));
        return KafkaUtils.roundRobinAssignments(threads, streams);
    }
}