/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.computation.log;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
import org.nuxeo.lib.stream.computation.Settings;
import org.nuxeo.lib.stream.computation.StreamProcessor;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.kafka.KafkaUtils;

/**
 * A {@link StreamProcessor} that runs a computation {@link Topology} on top of a {@link LogManager}.
 * <p>
 * Each computation of the topology is executed by a {@link ComputationPool}; streams are materialized as logs
 * created on demand during {@link #init(Topology, Settings)}.
 *
 * @since 9.3
 */
public class LogStreamProcessor implements StreamProcessor {
    private static final Log log = LogFactory.getLog(LogStreamProcessor.class);

    protected final LogManager manager;

    protected Topology topology;

    protected Settings settings;

    // one pool per computation in the topology; null until start() is called
    protected List<ComputationPool> pools;

    public LogStreamProcessor(LogManager manager) {
        this.manager = manager;
    }

    /**
     * Registers the topology and settings, and creates the underlying log streams if needed.
     *
     * @return this processor, for chaining
     */
    @Override
    public StreamProcessor init(Topology topology, Settings settings) {
        log.debug("Initializing ...");
        this.topology = topology;
        this.settings = settings;
        initStreams();
        return this;
    }

    /**
     * Creates and starts one {@link ComputationPool} per computation. Must be called after
     * {@link #init(Topology, Settings)}.
     */
    @Override
    public void start() {
        log.debug("Starting ...");
        this.pools = initPools();
        Objects.requireNonNull(pools);
        pools.forEach(ComputationPool::start);
    }

    /**
     * Waits until every pool has received its partition assignments.
     *
     * @return {@code false} if any pool did not get its assignments within the timeout
     */
    @Override
    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
        for (ComputationPool pool : pools) {
            // TODO: consider decreasing timeout
            if (!pool.waitForAssignments(timeout)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Stops all pools concurrently.
     *
     * @return {@code true} if all pools stopped within the timeout (or if never started)
     */
    @Override
    public boolean stop(Duration timeout) {
        log.debug("Stopping ...");
        if (pools == null) {
            // never started, nothing to stop
            return true;
        }
        long failures = pools.parallelStream().filter(comp -> !comp.stop(timeout)).count();
        log.debug(String.format("Stopped %d failure", failures));
        return failures == 0;
    }

    /**
     * Drains each pool then stops it, one pool at a time.
     *
     * @return {@code true} if all pools drained and stopped within the timeout (or if never started)
     */
    @Override
    public boolean drainAndStop(Duration timeout) {
        // here the order matters, this must be done sequentially
        log.debug("Drain and stop");
        if (pools == null) {
            return true;
        }
        long failures = pools.stream().filter(comp -> !comp.drainAndStop(timeout)).count();
        log.debug(String.format("Drained and stopped %d failure", failures));
        return failures == 0;
    }

    /**
     * Shuts down all pools without waiting; safe to call when never started.
     */
    @Override
    public void shutdown() {
        log.debug("Shutdown ...");
        if (pools == null) {
            return;
        }
        pools.parallelStream().forEach(ComputationPool::shutdown);
        log.debug("Shutdown done");
    }

    /**
     * Returns the low watermark of the whole topology: the minimum, over each tree of computations rooted at a
     * topology root, of that tree's low watermark, ignoring trees whose watermark is not yet meaningful.
     * Requires {@link #start()} to have been called.
     */
    @Override
    public long getLowWatermark() {
        Map<String, Long> watermarks = new HashMap<>(pools.size());
        Set<String> roots = topology.getRoots();
        Map<String, Long> watermarkTrees = new HashMap<>(roots.size());
        // compute low watermark for each tree of computation
        pools.forEach(pool -> watermarks.put(pool.getComputationName(), pool.getLowWatermark()));
        for (String root : roots) {
            watermarkTrees.put(root,
                    topology.getDescendantComputationNames(root).stream().mapToLong(watermarks::get).min().orElse(0));
        }
        // return the minimum wm for all trees that are not 0
        // NOTE(review): the filter uses wm > 1 while the comment says "not 0" — presumably value 1 is also a
        // sentinel in the Watermark encoding; confirm against Watermark semantics before changing.
        long ret = watermarkTrees.values().stream().filter(wm -> wm > 1).mapToLong(Long::longValue).min().orElse(0);
        if (log.isTraceEnabled()) {
            log.trace("lowWatermark: " + ret);
            watermarkTrees.forEach((k, v) -> log.trace("tree " + k + ": " + v));
        }
        return ret;
    }

    /**
     * Returns the low watermark for a single computation: the minimum watermark over the computation itself and all
     * of its ancestors in the topology.
     *
     * @param computationName the computation name, must not be {@code null}
     */
    @Override
    public long getLowWatermark(String computationName) {
        Objects.requireNonNull(computationName);
        // the low wm for a computation is the minimum watermark for all its ancestors
        Map<String, Long> watermarks = new HashMap<>(pools.size());
        pools.forEach(pool -> watermarks.put(pool.getComputationName(), pool.getLowWatermark()));
        long ret = topology.getAncestorComputationNames(computationName)
                           .stream()
                           .mapToLong(watermarks::get)
                           .min()
                           .orElse(0);
        ret = Math.min(ret, watermarks.get(computationName));
        return ret;
    }

    /**
     * @return {@code true} if every record up to {@code timestamp} has been fully processed by the topology
     */
    @Override
    public boolean isDone(long timestamp) {
        return Watermark.ofValue(getLowWatermark()).isDone(timestamp);
    }

    /**
     * Builds one {@link ComputationPool} per computation declared in the topology.
     */
    protected List<ComputationPool> initPools() {
        log.debug("Initializing pools");
        return topology.metadataList()
                       .stream()
                       .map(meta -> new ComputationPool(topology.getSupplier(meta.name()), meta,
                               getDefaultAssignments(meta), manager))
                       .collect(Collectors.toList());
    }

    /**
     * Computes the partition assignments for a computation: its input stream partitions spread round-robin over the
     * configured number of threads.
     */
    protected List<List<LogPartition>> getDefaultAssignments(ComputationMetadataMapping meta) {
        int threads = settings.getConcurrency(meta.name());
        Map<String, Integer> streams = new HashMap<>();
        meta.inputStreams().forEach(streamName -> streams.put(streamName, settings.getPartitions(streamName)));
        return KafkaUtils.roundRobinAssignments(threads, streams);
    }

    /**
     * Creates the logs backing each stream of the topology when they do not exist yet.
     */
    protected void initStreams() {
        log.debug("Initializing streams");
        topology.streamsSet()
                .forEach(streamName -> manager.createIfNotExists(streamName, settings.getPartitions(streamName)));
    }

}