/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.computation.log;

import static java.util.concurrent.Executors.newFixedThreadPool;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.computation.Computation;
import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogPartition;

/**
 * Pool of ComputationRunner.
 * <p>
 * One {@link ComputationRunner} is created per entry of the default assignments and each runner is submitted to a
 * fixed size thread pool whose size equals the number of assignment lists.
 *
 * @since 9.3
 */
public class ComputationPool {
    private static final Log log = LogFactory.getLog(ComputationPool.class);

    protected final ComputationMetadataMapping metadata;

    /** Number of threads in the pool, this is the number of default assignment lists. */
    protected final int threads;

    protected final LogManager manager;

    protected final Supplier<Computation> supplier;

    /** One list of partitions per runner. */
    protected final List<List<LogPartition>> defaultAssignments;

    /** Runners created by {@link #start()}, emptied by {@link #shutdown()}. */
    protected final List<ComputationRunner> runners;

    /** The executor running the runners, {@code null} before {@link #start()} and after {@link #shutdown()}. */
    protected ExecutorService threadPool;

    /**
     * Creates a pool that is not yet started.
     *
     * @param supplier the computation supplier handed over to each runner
     * @param metadata the computation metadata, its name is used for logging and thread naming
     * @param defaultAssignments the partitions for each runner, its size gives the number of threads
     * @param manager the log manager handed over to each runner
     */
    public ComputationPool(Supplier<Computation> supplier, ComputationMetadataMapping metadata,
            List<List<LogPartition>> defaultAssignments, LogManager manager) {
        this.supplier = supplier;
        this.manager = manager;
        this.metadata = metadata;
        this.threads = defaultAssignments.size();
        this.defaultAssignments = defaultAssignments;
        this.runners = new ArrayList<>(threads);
    }

    /**
     * Returns the name of the computation as given by the metadata.
     */
    public String getComputationName() {
        return metadata.name();
    }

    /**
     * Creates the thread pool, then creates and submits one runner per default assignment list.
     * <p>
     * The executor is shut down right after the submissions so that it accepts no other task; the submitted runners
     * keep running.
     */
    @SuppressWarnings("FutureReturnValueIgnored")
    public void start() {
        log.info(metadata.name() + ": Starting pool");
        threadPool = newFixedThreadPool(threads, new NamedThreadFactory(metadata.name() + "Pool"));
        defaultAssignments.forEach(assignments -> {
            ComputationRunner runner = new ComputationRunner(supplier, metadata, assignments, manager);
            threadPool.submit(runner);
            runners.add(runner);
        });
        // close the pool no new admission
        threadPool.shutdown();
        log.debug(metadata.name() + ": Pool started, threads: " + threads);
    }

    /**
     * Waits until all the runners report that their assignments are done.
     * <p>
     * The timeout is a budget for the whole call: each runner is given only the time that remains, so the total wait
     * does not grow with the number of runners.
     *
     * @param timeout the maximum time to wait for all the runners
     * @return {@code true} if the pool is not started or already terminated, or if all the runners returned
     *         {@code true}; {@code false} as soon as one runner returns {@code false}
     * @throws InterruptedException if the wait on a runner is interrupted
     */
    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
        log.info(metadata.name() + ": Wait for partitions assignments");
        if (threadPool == null || threadPool.isTerminated()) {
            return true;
        }
        long start = System.nanoTime();
        for (ComputationRunner runner : runners) {
            // Only hand over what is left of the caller's budget.
            Duration remaining = timeout.minus(Duration.ofNanos(System.nanoTime() - start));
            if (remaining.isNegative()) {
                // NOTE(review): assumes the runner treats a zero timeout as "check without waiting" - confirm
                remaining = Duration.ZERO;
            }
            if (!runner.waitForAssignments(remaining)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Asks all the runners to drain, waits for the pool termination and then stops the pool.
     *
     * @param timeout the maximum time to wait for the pool termination after the drain request
     * @return {@code true} if the pool is not started or already terminated, or if it terminated within the timeout
     */
    public boolean drainAndStop(Duration timeout) {
        if (threadPool == null || threadPool.isTerminated()) {
            return true;
        }
        log.info(metadata.name() + ": Draining");
        runners.forEach(ComputationRunner::drain);
        boolean ret = awaitPoolTermination(timeout);
        // stop is a no-op when the drain terminated the pool, else runners get one more second to stop
        stop(Duration.ofSeconds(1));
        return ret;
    }

    /**
     * Asks all the runners to stop, waits for the pool termination and then shuts the pool down.
     *
     * @param timeout the maximum time to wait for the pool termination after the stop request
     * @return {@code true} if the pool is not started or already terminated, or if it terminated within the timeout
     */
    public boolean stop(Duration timeout) {
        if (threadPool == null || threadPool.isTerminated()) {
            return true;
        }
        log.info(metadata.name() + ": Stopping");
        runners.forEach(ComputationRunner::stop);
        boolean ret = awaitPoolTermination(timeout);
        shutdown();
        return ret;
    }

    /**
     * Interrupts the running threads if the pool is not terminated, then forgets the runners and the pool.
     * <p>
     * If the calling thread is interrupted during the grace period, its interrupt flag is restored.
     */
    public void shutdown() {
        if (threadPool != null && !threadPool.isTerminated()) {
            log.info(metadata.name() + ": Shutting down");
            threadPool.shutdownNow();
            // give a chance to end threads with valid tailer when shutdown is followed by streams.close()
            try {
                threadPool.awaitTermination(1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn(metadata.name() + ": Interrupted in shutdown");
            }
        }
        runners.clear();
        threadPool = null;
    }

    /**
     * Waits for the termination of the thread pool, the pool must have been started.
     *
     * @param timeout the maximum time to wait
     * @return {@code true} if the pool terminated within the timeout, {@code false} on timeout or if the calling
     *         thread is interrupted (in which case its interrupt flag is restored)
     */
    protected boolean awaitPoolTermination(Duration timeout) {
        try {
            if (!threadPool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
                log.warn(metadata.name() + ": Timeout on wait for pool termination");
                return false;
            }
        } catch (InterruptedException e) {
            log.warn(metadata.name() + ": Interrupted while waiting for pool termination");
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }

    /**
     * Returns the low watermark of the pool.
     * <p>
     * This is the lowest value among the runners' watermarks that are not completed; if there is none, this is the
     * highest value among the completed ones. Watermarks with a value of 0 or 1 are ignored.
     *
     * @return the low watermark value, or 0 when no runner has a watermark value greater than 1
     */
    public long getLowWatermark() {
        // Collect all the low watermark of the pool, filtering 0 (or 1 which is completed of 0)
        Set<Watermark> watermarks = runners.stream()
                                           .map(ComputationRunner::getLowWatermark)
                                           .filter(wm -> wm.getValue() > 1)
                                           .collect(Collectors.toSet());
        // Take the lowest watermark of unprocessed (not completed) records
        long ret = watermarks.stream().filter(wm -> !wm.isCompleted()).mapToLong(Watermark::getValue).min().orElse(0);
        boolean pending = true;
        if (ret == 0) {
            pending = false;
            // There is no known pending records we take the max completed low watermark
            ret = watermarks.stream().filter(Watermark::isCompleted).mapToLong(Watermark::getValue).max().orElse(0);
        }
        if (log.isTraceEnabled() && ret > 0) {
            log.trace(metadata.name() + ": low: " + ret + " " + (pending ? "Pending" : "Completed"));
        }
        return ret;
    }

    /**
     * Thread factory that names threads {@code <prefix>-NN} where NN is a counter starting at 00, and that logs
     * uncaught exceptions as errors.
     */
    protected static class NamedThreadFactory implements ThreadFactory {
        protected final AtomicInteger count = new AtomicInteger(0);

        protected final String prefix;

        /**
         * @param prefix the prefix of the thread names
         */
        public NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @SuppressWarnings("NullableProblems")
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, String.format("%s-%02d", prefix, count.getAndIncrement()));
            t.setUncaughtExceptionHandler((t1, e) -> log.error("Uncaught exception: " + e.getMessage(), e));
            return t;
        }
    }

}