/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.computation.log;

import static java.util.concurrent.Executors.newFixedThreadPool;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.computation.Computation;
import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
import org.nuxeo.lib.stream.computation.ComputationPolicy;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.log.LogPartition;

/**
 * Pool of ComputationRunner.
 * <p>
 * The pool owns one {@link ComputationRunner} per entry of the default assignments and runs each of them on its own
 * thread of a fixed size executor.
 *
 * @since 9.3
 */
public class ComputationPool {
    private static final Log log = LogFactory.getLog(ComputationPool.class);

    protected final ComputationMetadataMapping metadata;

    /** Number of runners (and threads) in the pool, equals to the number of default assignments. */
    protected final int threads;

    protected final Supplier<Computation> supplier;

    protected final List<List<LogPartition>> defaultAssignments;

    protected final List<ComputationRunner> runners;

    protected final LogStreamManager streamManager;

    protected final ComputationPolicy policy;

    /** The executor running the runners, {@code null} until {@link #start()} and again after {@link #shutdown()}. */
    protected ExecutorService threadPool;

    /**
     * Creates a pool, nothing is started until {@link #start()} is invoked.
     *
     * @param supplier the supplier handed over to each runner
     * @param metadata the computation metadata, its name is used for logging and for naming threads
     * @param defaultAssignments one list of partitions per runner, its size gives the number of threads
     * @param streamManager the stream manager handed over to each runner
     * @param policy the computation policy handed over to each runner, must not be {@code null}
     * @throws NullPointerException if {@code policy} is {@code null}
     */
    public ComputationPool(Supplier<Computation> supplier, ComputationMetadataMapping metadata,
            List<List<LogPartition>> defaultAssignments, LogStreamManager streamManager, ComputationPolicy policy) {
        Objects.requireNonNull(policy);
        this.supplier = supplier;
        this.metadata = metadata;
        this.threads = defaultAssignments.size();
        this.streamManager = streamManager;
        this.defaultAssignments = defaultAssignments;
        this.policy = policy;
        this.runners = new ArrayList<>(threads);
    }

    /**
     * @return the name of the computation as given by the metadata
     */
    public String getComputationName() {
        return metadata.name();
    }

    /**
     * Starts one runner per default assignment, each on its own thread.
     * <p>
     * Does nothing but logging when there is no assignment. Once all runners are submitted the executor is closed to
     * new admissions, so it terminates by itself when all runners have returned.
     */
    @SuppressWarnings("FutureReturnValueIgnored")
    public void start() {
        if (threads == 0) {
            log.info(metadata.name() + ": Empty pool");
            return;
        }
        log.info(metadata.name() + ": Starting pool");
        threadPool = newFixedThreadPool(threads, new NamedThreadFactory(metadata.name() + "Pool"));
        defaultAssignments.forEach(assignments -> {
            ComputationRunner runner = new ComputationRunner(supplier, metadata, assignments, streamManager, policy);
            // NOTE(review): submit() keeps any exception thrown by the task inside the returned Future, which is
            // discarded here, so such an exception does not reach the thread uncaught exception handler.
            threadPool.submit(runner);
            runners.add(runner);
        });
        // close the pool no new admission
        threadPool.shutdown();
        log.debug(metadata.name() + ": Pool started, threads: " + threads);
    }

    /**
     * @return {@code true} if the pool has no executor (not started or already shut down) or if the executor has
     *         terminated
     */
    public boolean isTerminated() {
        return threadPool == null || threadPool.isTerminated();
    }

    /**
     * Waits until every runner reports that its assignments are done.
     * <p>
     * The timeout is a budget shared by all the runners: the time spent waiting on a runner is deducted from what is
     * left for the following ones, so that the call does not last about {@code timeout} multiplied by the number of
     * runners.
     *
     * @param timeout the maximum time to wait for the whole pool
     * @return {@code true} if the pool is not running or if all runners returned {@code true}, {@code false} as soon
     *         as a runner returns {@code false}
     * @throws InterruptedException if interrupted while waiting on a runner
     */
    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
        log.info(metadata.name() + ": Wait for partitions assignments");
        if (threadPool == null || threadPool.isTerminated()) {
            return true;
        }
        long startNanos = System.nanoTime();
        for (ComputationRunner runner : runners) {
            Duration remaining = timeout.minusNanos(System.nanoTime() - startNanos);
            if (remaining.isNegative()) {
                // budget exhausted: presumably a zero timeout makes the runner report its current state without
                // blocking -- TODO confirm against ComputationRunner#waitForAssignments
                remaining = Duration.ZERO;
            }
            if (!runner.waitForAssignments(remaining)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Asks every runner to drain, waits for the pool to terminate, then stops whatever is left.
     *
     * @param timeout the maximum time to wait for the pool termination after the drain request
     * @return {@code true} if the pool is not running or has terminated within the timeout, {@code false} on timeout
     *         or interruption
     */
    public boolean drainAndStop(Duration timeout) {
        if (threadPool == null || threadPool.isTerminated()) {
            return true;
        }
        log.info(metadata.name() + ": Draining");
        runners.forEach(ComputationRunner::drain);
        boolean ret = awaitPoolTermination(timeout);
        // whatever the outcome of the drain, make sure the pool is stopped; the result reflects the drain only
        stop(Duration.ofSeconds(1));
        return ret;
    }

    /**
     * Asks every runner to stop, waits for the pool to terminate, then shuts the pool down.
     *
     * @param timeout the maximum time to wait for the pool termination after the stop request
     * @return {@code true} if the pool is not running or has terminated within the timeout, {@code false} on timeout
     *         or interruption
     */
    public boolean stop(Duration timeout) {
        if (threadPool == null || threadPool.isTerminated()) {
            return true;
        }
        log.info(metadata.name() + ": Stopping");
        runners.forEach(ComputationRunner::stop);
        boolean ret = awaitPoolTermination(timeout);
        shutdown();
        return ret;
    }

    /**
     * Shuts the executor down immediately if it is still running, then forgets the runners and the executor.
     * <p>
     * If the calling thread is interrupted during the grace period, its interrupt status is restored and a warning
     * is logged.
     */
    public void shutdown() {
        if (threadPool != null && !threadPool.isTerminated()) {
            log.info(metadata.name() + ": Shutting down");
            threadPool.shutdownNow();
            // give a chance to end threads with valid tailer when shutdown is followed by streams.close()
            try {
                threadPool.awaitTermination(1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn(metadata.name() + ": Interrupted in shutdown");
            }
        }
        runners.clear();
        threadPool = null;
    }

    /**
     * Waits for the executor to terminate. Must only be called when {@link #threadPool} is not {@code null}.
     *
     * @param timeout the maximum time to wait
     * @return {@code true} if the executor terminated, {@code false} on timeout or if the calling thread has been
     *         interrupted (in which case its interrupt status is restored)
     */
    protected boolean awaitPoolTermination(Duration timeout) {
        try {
            if (!threadPool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
                log.warn(metadata.name() + ": Timeout on wait for pool termination");
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn(metadata.name() + ": Interrupted while waiting for pool termination");
            return false;
        }
        return true;
    }

    /**
     * Computes the low watermark of the pool from the low watermarks of its runners.
     * <p>
     * When at least one runner has a not completed watermark the result is the lowest of them, otherwise it is the
     * highest completed watermark.
     *
     * @return the low watermark value, or 0 if no runner has a watermark value greater than 1
     */
    public long getLowWatermark() {
        // Collect all the low watermark of the pool, filtering 0 (or 1 which is completed of 0)
        Set<Watermark> watermarks = runners.stream()
                                           .map(ComputationRunner::getLowWatermark)
                                           .filter(wm -> wm.getValue() > 1)
                                           .collect(Collectors.toSet());
        // Take the lowest watermark of unprocessed (not completed) records
        long ret = watermarks.stream().filter(wm -> !wm.isCompleted()).mapToLong(Watermark::getValue).min().orElse(0);
        boolean pending = true;
        if (ret == 0) {
            pending = false;
            // There is no known pending records we take the max completed low watermark
            ret = watermarks.stream().filter(Watermark::isCompleted).mapToLong(Watermark::getValue).max().orElse(0);
        }
        if (log.isTraceEnabled() && ret > 0) {
            log.trace(metadata.name() + ": low: " + ret + " " + (pending ? "Pending" : "Completed"));
        }
        return ret;
    }

    /**
     * Thread factory naming its threads {@code <prefix>-NN}, where NN is a counter starting at 00, and installing an
     * uncaught exception handler that logs the exception as an error.
     */
    protected static class NamedThreadFactory implements ThreadFactory {
        protected final AtomicInteger count = new AtomicInteger(0);

        protected final String prefix;

        /**
         * @param prefix the prefix of the name given to each created thread
         */
        public NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @SuppressWarnings("NullableProblems")
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, String.format("%s-%02d", prefix, count.getAndIncrement()));
            t.setUncaughtExceptionHandler((t1, e) -> log.error("Uncaught exception: " + e.getMessage(), e));
            return t;
        }
    }

}