001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.computation.log; 020 021import static java.util.concurrent.Executors.newFixedThreadPool; 022 023import java.time.Duration; 024import java.util.ArrayList; 025import java.util.List; 026import java.util.Set; 027import java.util.concurrent.ExecutorService; 028import java.util.concurrent.ThreadFactory; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.atomic.AtomicInteger; 031import java.util.function.Supplier; 032import java.util.stream.Collectors; 033 034import org.apache.commons.logging.Log; 035import org.apache.commons.logging.LogFactory; 036import org.nuxeo.lib.stream.computation.Computation; 037import org.nuxeo.lib.stream.computation.ComputationMetadataMapping; 038import org.nuxeo.lib.stream.computation.Watermark; 039import org.nuxeo.lib.stream.log.LogManager; 040import org.nuxeo.lib.stream.log.LogPartition; 041 042/** 043 * Pool of ComputationRunner 044 * 045 * @since 9.3 046 */ 047public class ComputationPool { 048 private static final Log log = LogFactory.getLog(ComputationPool.class); 049 050 protected final ComputationMetadataMapping metadata; 051 052 protected final int threads; 053 054 protected final LogManager manager; 055 056 protected final Supplier<Computation> supplier; 057 058 protected final List<List<LogPartition>> defaultAssignments; 059 060 protected final List<ComputationRunner> runners; 061 062 protected ExecutorService threadPool; 063 064 public ComputationPool(Supplier<Computation> supplier, ComputationMetadataMapping metadata, 065 List<List<LogPartition>> defaultAssignments, LogManager manager) { 066 this.supplier = supplier; 067 this.manager = manager; 068 this.metadata = metadata; 069 this.threads = defaultAssignments.size(); 070 this.defaultAssignments = defaultAssignments; 071 this.runners = new ArrayList<>(threads); 072 } 073 074 public String getComputationName() { 075 return metadata.name(); 076 } 077 078 @SuppressWarnings("FutureReturnValueIgnored") 079 public void start() { 080 log.info(metadata.name() + ": Starting pool"); 081 threadPool = newFixedThreadPool(threads, new NamedThreadFactory(metadata.name() + "Pool")); 082 defaultAssignments.forEach(assignments -> { 083 ComputationRunner runner = new ComputationRunner(supplier, metadata, assignments, manager); 084 threadPool.submit(runner); 085 runners.add(runner); 086 }); 087 // close the pool no new admission 088 threadPool.shutdown(); 089 log.debug(metadata.name() + ": Pool started, threads: " + threads); 090 } 091 092 public boolean isTerminated() { 093 return threadPool.isTerminated(); 094 } 095 096 public boolean waitForAssignments(Duration timeout) throws InterruptedException { 097 log.info(metadata.name() + ": Wait for partitions assignments"); 098 if (threadPool == null || threadPool.isTerminated()) { 099 return true; 100 } 101 for (ComputationRunner runner : runners) { 102 if (!runner.waitForAssignments(timeout)) { 103 return false; 104 } 105 } 106 return true; 107 } 108 109 public boolean drainAndStop(Duration timeout) { 110 if (threadPool == null || threadPool.isTerminated()) { 111 return true; 112 } 113 log.info(metadata.name() + ": Draining"); 114 runners.forEach(ComputationRunner::drain); 115 boolean ret = awaitPoolTermination(timeout); 116 stop(Duration.ofSeconds(1)); 117 return ret; 118 } 119 120 public boolean stop(Duration timeout) { 121 if (threadPool == null || threadPool.isTerminated()) { 122 return true; 123 } 124 log.info(metadata.name() + ": Stopping"); 125 runners.forEach(ComputationRunner::stop); 126 boolean ret = awaitPoolTermination(timeout); 127 shutdown(); 128 return ret; 129 } 130 131 public void shutdown() { 132 if (threadPool != null && !threadPool.isTerminated()) { 133 log.info(metadata.name() + ": Shutting down"); 134 threadPool.shutdownNow(); 135 // give a chance to end threads with valid tailer when shutdown is followed by streams.close() 136 try { 137 threadPool.awaitTermination(1, TimeUnit.SECONDS); 138 } catch (InterruptedException e) { 139 Thread.currentThread().interrupt(); 140 log.warn(metadata.name() + ": Interrupted in shutdown"); 141 } 142 } 143 runners.clear(); 144 threadPool = null; 145 } 146 147 protected boolean awaitPoolTermination(Duration timeout) { 148 try { 149 if (!threadPool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) { 150 log.warn(metadata.name() + ": Timeout on wait for pool termination"); 151 return false; 152 } 153 } catch (InterruptedException e) { 154 Thread.currentThread().interrupt(); 155 log.warn(metadata.name() + ": Interrupted while waiting for pool termination"); 156 return false; 157 } 158 return true; 159 } 160 161 public long getLowWatermark() { 162 // Collect all the low watermark of the pool, filtering 0 (or 1 which is completed of 0) 163 Set<Watermark> watermarks = runners.stream() 164 .map(ComputationRunner::getLowWatermark) 165 .filter(wm -> wm.getValue() > 1) 166 .collect(Collectors.toSet()); 167 // Take the lowest watermark of unprocessed (not completed) records 168 long ret = watermarks.stream().filter(wm -> !wm.isCompleted()).mapToLong(Watermark::getValue).min().orElse(0); 169 boolean pending = true; 170 if (ret == 0) { 171 pending = false; 172 // There is no known pending records we take the max completed low watermark 173 ret = watermarks.stream().filter(Watermark::isCompleted).mapToLong(Watermark::getValue).max().orElse(0); 174 } 175 if (log.isTraceEnabled() && ret > 0) 176 log.trace(metadata.name() + ": low: " + ret + " " + (pending ? "Pending" : "Completed")); 177 return ret; 178 } 179 180 protected static class NamedThreadFactory implements ThreadFactory { 181 protected final AtomicInteger count = new AtomicInteger(0); 182 183 protected final String prefix; 184 185 public NamedThreadFactory(String prefix) { 186 this.prefix = prefix; 187 } 188 189 @SuppressWarnings("NullableProblems") 190 @Override 191 public Thread newThread(Runnable r) { 192 Thread t = new Thread(r, String.format("%s-%02d", prefix, count.getAndIncrement())); 193 t.setUncaughtExceptionHandler((t1, e) -> log.error("Uncaught exception: " + e.getMessage(), e)); 194 return t; 195 } 196 } 197 198}