001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.computation.log; 020 021import static java.util.concurrent.Executors.newFixedThreadPool; 022 023import java.time.Duration; 024import java.util.ArrayList; 025import java.util.List; 026import java.util.Objects; 027import java.util.Set; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.ThreadFactory; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicInteger; 032import java.util.function.Supplier; 033import java.util.stream.Collectors; 034 035import org.apache.commons.logging.Log; 036import org.apache.commons.logging.LogFactory; 037import org.nuxeo.lib.stream.codec.Codec; 038import org.nuxeo.lib.stream.computation.Computation; 039import org.nuxeo.lib.stream.computation.ComputationMetadataMapping; 040import org.nuxeo.lib.stream.computation.ComputationPolicy; 041import org.nuxeo.lib.stream.computation.Record; 042import org.nuxeo.lib.stream.computation.Watermark; 043import org.nuxeo.lib.stream.log.LogManager; 044import org.nuxeo.lib.stream.log.LogPartition; 045 046/** 047 * Pool of ComputationRunner 048 * 049 * @since 9.3 050 */ 051public class ComputationPool { 052 private static final Log log = LogFactory.getLog(ComputationPool.class); 053 054 protected final ComputationMetadataMapping metadata; 055 056 protected final int threads; 057 058 protected final LogManager manager; 059 060 protected final Supplier<Computation> supplier; 061 062 protected final List<List<LogPartition>> defaultAssignments; 063 064 protected final List<ComputationRunner> runners; 065 066 protected final Codec<Record> inputCodec; 067 068 protected final Codec<Record> outputCodec; 069 070 protected final ComputationPolicy policy; 071 072 protected ExecutorService threadPool; 073 074 public ComputationPool(Supplier<Computation> supplier, ComputationMetadataMapping metadata, 075 List<List<LogPartition>> defaultAssignments, LogManager manager, Codec<Record> inputCodec, 076 Codec<Record> outputCodec, ComputationPolicy policy) { 077 Objects.requireNonNull(inputCodec); 078 Objects.requireNonNull(outputCodec); 079 Objects.requireNonNull(policy); 080 this.supplier = supplier; 081 this.manager = manager; 082 this.metadata = metadata; 083 this.threads = defaultAssignments.size(); 084 this.inputCodec = inputCodec; 085 this.outputCodec = outputCodec; 086 this.defaultAssignments = defaultAssignments; 087 this.policy = policy; 088 this.runners = new ArrayList<>(threads); 089 } 090 091 public String getComputationName() { 092 return metadata.name(); 093 } 094 095 @SuppressWarnings("FutureReturnValueIgnored") 096 public void start() { 097 log.info(metadata.name() + ": Starting pool"); 098 threadPool = newFixedThreadPool(threads, new NamedThreadFactory(metadata.name() + "Pool")); 099 defaultAssignments.forEach(assignments -> { 100 ComputationRunner runner = new ComputationRunner(supplier, metadata, assignments, manager, inputCodec, 101 outputCodec, policy); 102 threadPool.submit(runner); 103 runners.add(runner); 104 }); 105 // close the pool no new admission 106 threadPool.shutdown(); 107 log.debug(metadata.name() + ": Pool started, threads: " + threads); 108 } 109 110 public boolean isTerminated() { 111 return threadPool.isTerminated(); 112 } 113 114 public boolean waitForAssignments(Duration timeout) throws InterruptedException { 115 log.info(metadata.name() + ": Wait for partitions assignments"); 116 if (threadPool == null || threadPool.isTerminated()) { 117 return true; 118 } 119 for (ComputationRunner runner : runners) { 120 if (!runner.waitForAssignments(timeout)) { 121 return false; 122 } 123 } 124 return true; 125 } 126 127 public boolean drainAndStop(Duration timeout) { 128 if (threadPool == null || threadPool.isTerminated()) { 129 return true; 130 } 131 log.info(metadata.name() + ": Draining"); 132 runners.forEach(ComputationRunner::drain); 133 boolean ret = awaitPoolTermination(timeout); 134 stop(Duration.ofSeconds(1)); 135 return ret; 136 } 137 138 public boolean stop(Duration timeout) { 139 if (threadPool == null || threadPool.isTerminated()) { 140 return true; 141 } 142 log.info(metadata.name() + ": Stopping"); 143 runners.forEach(ComputationRunner::stop); 144 boolean ret = awaitPoolTermination(timeout); 145 shutdown(); 146 return ret; 147 } 148 149 public void shutdown() { 150 if (threadPool != null && !threadPool.isTerminated()) { 151 log.info(metadata.name() + ": Shutting down"); 152 threadPool.shutdownNow(); 153 // give a chance to end threads with valid tailer when shutdown is followed by streams.close() 154 try { 155 threadPool.awaitTermination(1, TimeUnit.SECONDS); 156 } catch (InterruptedException e) { 157 Thread.currentThread().interrupt(); 158 log.warn(metadata.name() + ": Interrupted in shutdown"); 159 } 160 } 161 runners.clear(); 162 threadPool = null; 163 } 164 165 protected boolean awaitPoolTermination(Duration timeout) { 166 try { 167 if (!threadPool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) { 168 log.warn(metadata.name() + ": Timeout on wait for pool termination"); 169 return false; 170 } 171 } catch (InterruptedException e) { 172 Thread.currentThread().interrupt(); 173 log.warn(metadata.name() + ": Interrupted while waiting for pool termination"); 174 return false; 175 } 176 return true; 177 } 178 179 public long getLowWatermark() { 180 // Collect all the low watermark of the pool, filtering 0 (or 1 which is completed of 0) 181 Set<Watermark> watermarks = runners.stream() 182 .map(ComputationRunner::getLowWatermark) 183 .filter(wm -> wm.getValue() > 1) 184 .collect(Collectors.toSet()); 185 // Take the lowest watermark of unprocessed (not completed) records 186 long ret = watermarks.stream().filter(wm -> !wm.isCompleted()).mapToLong(Watermark::getValue).min().orElse(0); 187 boolean pending = true; 188 if (ret == 0) { 189 pending = false; 190 // There is no known pending records we take the max completed low watermark 191 ret = watermarks.stream().filter(Watermark::isCompleted).mapToLong(Watermark::getValue).max().orElse(0); 192 } 193 if (log.isTraceEnabled() && ret > 0) 194 log.trace(metadata.name() + ": low: " + ret + " " + (pending ? "Pending" : "Completed")); 195 return ret; 196 } 197 198 protected static class NamedThreadFactory implements ThreadFactory { 199 protected final AtomicInteger count = new AtomicInteger(0); 200 201 protected final String prefix; 202 203 public NamedThreadFactory(String prefix) { 204 this.prefix = prefix; 205 } 206 207 @SuppressWarnings("NullableProblems") 208 @Override 209 public Thread newThread(Runnable r) { 210 Thread t = new Thread(r, String.format("%s-%02d", prefix, count.getAndIncrement())); 211 t.setUncaughtExceptionHandler((t1, e) -> log.error("Uncaught exception: " + e.getMessage(), e)); 212 return t; 213 } 214 } 215 216}