/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.ecm.platform.importer.mqueues.computation.mqueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.platform.importer.mqueues.computation.Computation;
import org.nuxeo.ecm.platform.importer.mqueues.computation.ComputationMetadataMapping;
import org.nuxeo.ecm.platform.importer.mqueues.computation.Record;
import org.nuxeo.ecm.platform.importer.mqueues.computation.Watermark;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQManager;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static java.util.concurrent.Executors.newFixedThreadPool;

/**
 * Pool of ComputationRunner.
 * <p>
 * One {@link MQComputationRunner} is created per entry of the default assignments, and each runner gets its own
 * thread in a fixed size thread pool.
 *
 * @since 9.2
 */
public class MQComputationPool {
    private static final Log log = LogFactory.getLog(MQComputationPool.class);

    private final ComputationMetadataMapping metadata;

    private final int threads;

    private final MQManager<Record> manager;

    private final Supplier<Computation> supplier;

    private final List<List<MQPartition>> defaultAssignments;

    private ExecutorService threadPool;

    private final List<MQComputationRunner> runners;

    /**
     * Creates a pool that is not yet started.
     *
     * @param supplier factory handed to each runner to obtain its computation
     * @param metadata metadata of the computation, its name is used in log messages and thread names
     * @param defaultAssignments one list of partitions per runner; its size defines the number of threads
     * @param manager the manager handed to each runner
     */
    public MQComputationPool(Supplier<Computation> supplier, ComputationMetadataMapping metadata,
            List<List<MQPartition>> defaultAssignments, MQManager<Record> manager) {
        this.supplier = supplier;
        this.manager = manager;
        this.metadata = metadata;
        this.threads = defaultAssignments.size();
        this.defaultAssignments = defaultAssignments;
        this.runners = new ArrayList<>(threads);
    }

    /**
     * Returns the name of the computation as given by the metadata.
     */
    public String getComputationName() {
        return metadata.name();
    }

    /**
     * Creates the thread pool and launches one runner per default assignment. The pool is closed to new admission
     * right after the runners have been handed over.
     */
    public void start() {
        log.info(metadata.name() + ": Starting pool");
        threadPool = newFixedThreadPool(threads, new NamedThreadFactory(metadata.name() + "Pool"));
        defaultAssignments.forEach(assignments -> {
            MQComputationRunner runner = new MQComputationRunner(supplier, metadata, assignments, manager);
            // execute() and not submit(): submit() captures anything thrown by the runner into a Future that
            // nobody reads, the failure would be lost and the uncaught exception handler installed by
            // NamedThreadFactory would never be invoked.
            threadPool.execute(runner);
            runners.add(runner);
        });
        // close the pool no new admission
        threadPool.shutdown();
        log.debug(metadata.name() + ": Pool started, threads: " + threads);
    }

    /**
     * Waits until every runner reports that it has its assignments.
     * <p>
     * The timeout is a budget for the whole call: the time spent waiting on a runner is deducted from the time
     * granted to the next ones, so the call does not last {@code threads * timeout} in the worst case.
     *
     * @param timeout maximum time to wait for all the runners
     * @return {@code true} if the pool is not running or if all runners returned {@code true}, {@code false} as
     *         soon as one runner returns {@code false}
     * @throws InterruptedException if interrupted while waiting on a runner
     */
    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
        log.info(metadata.name() + ": Wait for partitions assignments");
        if (threadPool == null || threadPool.isTerminated()) {
            return true;
        }
        long startNanos = System.nanoTime();
        for (MQComputationRunner runner : runners) {
            Duration remaining = timeout.minus(Duration.ofNanos(System.nanoTime() - startNanos));
            if (remaining.isNegative()) {
                // NOTE(review): assumes the runner returns immediately when given a zero duration - confirm
                remaining = Duration.ZERO;
            }
            if (!runner.waitForAssignments(remaining)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Asks every runner to drain, waits for the pool to terminate, then stops the pool.
     *
     * @param timeout maximum time to wait for the pool termination after the drain request
     * @return {@code true} if the pool is not running or terminated within the timeout, {@code false} on timeout
     *         or interruption
     */
    public boolean drainAndStop(Duration timeout) {
        if (threadPool == null || threadPool.isTerminated()) {
            return true;
        }
        log.info(metadata.name() + ": Draining");
        runners.forEach(MQComputationRunner::drain);
        boolean ret = awaitPoolTermination(timeout);
        // whatever the drain outcome, make sure the pool is stopped; the result of the drain is what is returned
        stop(Duration.ofSeconds(1));
        return ret;
    }

    /**
     * Asks every runner to stop, waits for the pool to terminate, then shuts the pool down.
     *
     * @param timeout maximum time to wait for the pool termination after the stop request
     * @return {@code true} if the pool is not running or terminated within the timeout, {@code false} on timeout
     *         or interruption
     */
    public boolean stop(Duration timeout) {
        if (threadPool == null || threadPool.isTerminated()) {
            return true;
        }
        log.info(metadata.name() + ": Stopping");
        runners.forEach(MQComputationRunner::stop);
        boolean ret = awaitPoolTermination(timeout);
        shutdown();
        return ret;
    }

    /**
     * Shuts the pool down immediately if it is still alive, then forgets the runners and the pool.
     */
    public void shutdown() {
        if (threadPool != null && !threadPool.isTerminated()) {
            log.info(metadata.name() + ": Shutting down");
            threadPool.shutdownNow();
            // give a chance to end threads with valid tailer when shutdown is followed by streams.close()
            try {
                threadPool.awaitTermination(1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                log.warn(metadata.name() + ": Interrupted in shutdown");
                // restore the interrupt flag for the caller
                Thread.currentThread().interrupt();
            }
        }
        runners.clear();
        threadPool = null;
    }

    /**
     * Waits for the pool termination. Callers must have checked that the pool is not null.
     *
     * @return {@code true} if the pool terminated within the timeout, {@code false} on timeout or interruption
     */
    private boolean awaitPoolTermination(Duration timeout) {
        try {
            if (!threadPool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
                log.warn(metadata.name() + ": Timeout on wait for pool termination");
                return false;
            }
        } catch (InterruptedException e) {
            log.warn(metadata.name() + ": Interrupted while waiting for pool termination");
            // restore the interrupt flag for the caller
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }

    /**
     * Returns the low watermark of the pool.
     * <p>
     * This is the lowest value among the runners' watermarks that are not completed. When there is none, this is
     * the highest value among the completed ones, or {@code 0} when no runner has a watermark value above 1.
     */
    public long getLowWatermark() {
        // Collect all the low watermark of the pool, filtering 0 (or 1 which is completed of 0)
        Set<Watermark> watermarks = runners.stream()
                                           .map(MQComputationRunner::getLowWatermark)
                                           .filter(wm -> wm.getValue() > 1)
                                           .collect(Collectors.toSet());
        // Take the lowest watermark of unprocessed (not completed) records
        long ret = watermarks.stream()
                             .filter(wm -> !wm.isCompleted())
                             .map(Watermark::getValue)
                             .min(Comparator.naturalOrder())
                             .orElse(0L);
        boolean pending = true;
        if (ret == 0) {
            pending = false;
            // There is no known pending records we take the max completed low watermark
            ret = watermarks.stream()
                            .filter(Watermark::isCompleted)
                            .map(Watermark::getValue)
                            .max(Comparator.naturalOrder())
                            .orElse(0L);
        }
        if (log.isTraceEnabled() && ret > 0) {
            log.trace(metadata.name() + ": low: " + ret + " " + (pending ? "Pending" : "Completed"));
        }
        return ret;
    }

    /**
     * Thread factory that names threads {@code <prefix>-NN} and logs uncaught exceptions as errors.
     */
    protected static class NamedThreadFactory implements ThreadFactory {
        private final AtomicInteger count = new AtomicInteger(0);

        private final String prefix;

        public NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @SuppressWarnings("NullableProblems")
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, String.format("%s-%02d", prefix, count.getAndIncrement()));
            t.setUncaughtExceptionHandler((t1, e) -> log.error("Uncaught exception: " + e.getMessage(), e));
            return t;
        }
    }

}