/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.ecm.platform.importer.mqueues.computation.mqueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.platform.importer.mqueues.computation.ComputationManager;
import org.nuxeo.ecm.platform.importer.mqueues.computation.ComputationMetadataMapping;
import org.nuxeo.ecm.platform.importer.mqueues.computation.Record;
import org.nuxeo.ecm.platform.importer.mqueues.computation.Settings;
import org.nuxeo.ecm.platform.importer.mqueues.computation.Topology;
import org.nuxeo.ecm.platform.importer.mqueues.computation.Watermark;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQManager;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.kafka.KafkaUtils;

import java.time.Duration;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * {@link ComputationManager} that runs every computation of a {@link Topology} in its own
 * {@link MQComputationPool}, using an {@link MQManager} for the underlying streams.
 *
 * @since 9.2
 */
public class MQComputationManager implements ComputationManager {
    private static final Log log = LogFactory.getLog(MQComputationManager.class);

    private final MQManager<Record> manager;

    private final Topology topology;

    private final Settings settings;

    private final List<MQComputationPool> pools;

    /**
     * Creates the streams declared by the topology (when they do not exist yet) and builds one pool per
     * computation. Pools are not started until {@link #start()} is called.
     *
     * @param manager the manager used to create streams and handed to every pool
     * @param topology the computations and streams to run
     * @param settings provides the concurrency of each computation and the partition count of each stream
     */
    public MQComputationManager(MQManager<Record> manager, Topology topology, Settings settings) {
        this.manager = manager;
        this.topology = topology;
        this.settings = settings;
        // streams must exist before the pools are built on top of them
        initStreams();
        this.pools = initPools();
        Objects.requireNonNull(pools);
    }

    /**
     * Starts every computation pool.
     */
    @Override
    public void start() {
        log.debug("Starting ...");
        pools.forEach(MQComputationPool::start);
    }

    /**
     * Waits, pool after pool, for the assignments to be done.
     *
     * @param timeout the timeout given to each pool in turn
     * @return {@code false} as soon as one pool reports a failure, {@code true} otherwise
     * @throws InterruptedException if a pool is interrupted while waiting
     */
    @Override
    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
        for (MQComputationPool pool : pools) {
            if (!pool.waitForAssignments(timeout)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Stops all pools using a parallel stream.
     *
     * @param timeout the timeout handed to each pool
     * @return {@code true} when every pool reported a successful stop
     */
    @Override
    public boolean stop(Duration timeout) {
        log.debug("Stopping ...");
        long failures = pools.parallelStream().filter(comp -> !comp.stop(timeout)).count();
        log.debug(String.format("Stopped %d failure", failures));
        return failures == 0L;
    }

    /**
     * Stops all pools with a timeout of one second.
     *
     * @return {@code true} when every pool reported a successful stop
     */
    @Override
    public boolean stop() {
        return stop(Duration.ofSeconds(1));
    }

    /**
     * Drains and stops the pools one after the other, in the order of the pool list.
     *
     * @param timeout the timeout handed to each pool
     * @return {@code true} when every pool reported a successful drain and stop
     */
    @Override
    public boolean drainAndStop(Duration timeout) {
        // here the order matters, this must be done sequentially
        log.debug("Drain and stop");
        long failures = pools.stream().filter(comp -> !comp.drainAndStop(timeout)).count();
        log.debug(String.format("Drained and stopped %d failure", failures));
        return failures == 0L;
    }

    /**
     * Shuts down all pools using a parallel stream.
     */
    @Override
    public void shutdown() {
        log.debug("Shutdown ...");
        pools.parallelStream().forEach(MQComputationPool::shutdown);
        log.debug("Shutdown done");
    }

    /**
     * Computes the global low watermark: for each root of the topology the minimum watermark among the
     * computations returned by {@code getDescendantComputationNames(root)} is taken, then the minimum of
     * the retained trees is returned.
     *
     * @return the low watermark, or {@code 0} when no tree has a watermark greater than 1
     */
    @Override
    public long getLowWatermark() {
        Map<String, Long> watermarks = new HashMap<>(pools.size());
        Set<String> roots = topology.getRoots();
        Map<String, Long> watermarkTrees = new HashMap<>(roots.size());
        // compute low watermark for each tree of computation
        pools.forEach(pool -> watermarks.put(pool.getComputationName(), pool.getLowWatermark()));
        for (String root : roots) {
            watermarkTrees.put(root, topology.getDescendantComputationNames(root).stream().map(watermarks::get)
                    .min(Comparator.naturalOrder()).orElse(0L));
        }
        // return the minimum wm for all trees that are not 0
        // NOTE(review): the filter below drops values <= 1, not only 0 - confirm whether 1 is a reserved value
        long ret = watermarkTrees.values().stream()
                .filter(wm -> wm > 1).min(Comparator.naturalOrder()).orElse(0L);
        if (log.isTraceEnabled()) {
            log.trace("lowWatermark: " + ret);
            watermarkTrees.forEach((k, v) -> log.trace("tree " + k + ": " + v));
        }
        return ret;
    }

    /**
     * Computes the low watermark of a single computation.
     *
     * @param computationName the name of the computation, must not be {@code null}
     * @return the minimum between the computation's own watermark and the watermarks of its ancestors
     * @throws NullPointerException if {@code computationName} is {@code null}
     */
    @Override
    public long getLowWatermark(String computationName) {
        Objects.requireNonNull(computationName);
        // the low wm for a computation is the minimum watermark for all its ancestors
        Map<String, Long> watermarks = new HashMap<>(pools.size());
        pools.forEach(pool -> watermarks.put(pool.getComputationName(), pool.getLowWatermark()));
        // NOTE(review): when there is no ancestor the default of 0 caps the result at 0 - confirm this is intended
        long ret = topology.getAncestorComputationNames(computationName).stream().map(watermarks::get)
                .min(Comparator.naturalOrder()).orElse(0L);
        ret = Math.min(ret, watermarks.get(computationName));
        return ret;
    }

    /**
     * Checks a timestamp against the global low watermark.
     *
     * @param timestamp the timestamp to test
     * @return the result of {@code Watermark.isDone(timestamp)} on the current low watermark
     */
    @Override
    public boolean isDone(long timestamp) {
        return Watermark.ofValue(getLowWatermark()).isDone(timestamp);
    }

    /**
     * Builds one pool per computation metadata of the topology, each with its default assignments.
     */
    private List<MQComputationPool> initPools() {
        log.debug("Initializing pools");
        return topology.metadataList().stream()
                .map(meta -> new MQComputationPool(topology.getSupplier(meta.name()), meta,
                        getDefaultAssignments(meta), manager))
                .collect(Collectors.toList());
    }

    /**
     * Builds the partition assignments of a computation by passing its configured concurrency and the
     * partition count of each of its input streams to {@code KafkaUtils.roundRobinAssignments}.
     */
    private List<List<MQPartition>> getDefaultAssignments(ComputationMetadataMapping meta) {
        int threads = settings.getConcurrency(meta.name());
        Map<String, Integer> streams = new HashMap<>();
        meta.inputStreams().forEach(streamName -> streams.put(streamName, settings.getPartitions(streamName)));
        return KafkaUtils.roundRobinAssignments(threads, streams);
    }

    /**
     * Creates every stream of the topology that does not exist yet, with its configured partition count.
     */
    private void initStreams() {
        log.debug("Initializing streams");
        topology.streamsSet().forEach(streamName -> manager.createIfNotExists(streamName, settings.getPartitions(streamName)));
    }

}