001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation.log;
020
021import static java.util.concurrent.Executors.newFixedThreadPool;
022
023import java.time.Duration;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Set;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.ThreadFactory;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicInteger;
031import java.util.function.Supplier;
032import java.util.stream.Collectors;
033
034import org.apache.commons.logging.Log;
035import org.apache.commons.logging.LogFactory;
036import org.nuxeo.lib.stream.computation.Computation;
037import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
038import org.nuxeo.lib.stream.computation.Watermark;
039import org.nuxeo.lib.stream.log.LogManager;
040import org.nuxeo.lib.stream.log.LogPartition;
041
042/**
043 * Pool of ComputationRunner
044 *
045 * @since 9.3
046 */
047public class ComputationPool {
048    private static final Log log = LogFactory.getLog(ComputationPool.class);
049
050    protected final ComputationMetadataMapping metadata;
051
052    protected final int threads;
053
054    protected final LogManager manager;
055
056    protected final Supplier<Computation> supplier;
057
058    protected final List<List<LogPartition>> defaultAssignments;
059
060    protected final List<ComputationRunner> runners;
061
062    protected ExecutorService threadPool;
063
064    public ComputationPool(Supplier<Computation> supplier, ComputationMetadataMapping metadata,
065            List<List<LogPartition>> defaultAssignments, LogManager manager) {
066        this.supplier = supplier;
067        this.manager = manager;
068        this.metadata = metadata;
069        this.threads = defaultAssignments.size();
070        this.defaultAssignments = defaultAssignments;
071        this.runners = new ArrayList<>(threads);
072    }
073
074    public String getComputationName() {
075        return metadata.name();
076    }
077
078    @SuppressWarnings("FutureReturnValueIgnored")
079    public void start() {
080        log.info(metadata.name() + ": Starting pool");
081        threadPool = newFixedThreadPool(threads, new NamedThreadFactory(metadata.name() + "Pool"));
082        defaultAssignments.forEach(assignments -> {
083            ComputationRunner runner = new ComputationRunner(supplier, metadata, assignments, manager);
084            threadPool.submit(runner);
085            runners.add(runner);
086        });
087        // close the pool no new admission
088        threadPool.shutdown();
089        log.debug(metadata.name() + ": Pool started, threads: " + threads);
090    }
091
092    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
093        log.info(metadata.name() + ": Wait for partitions assignments");
094        if (threadPool == null || threadPool.isTerminated()) {
095            return true;
096        }
097        for (ComputationRunner runner : runners) {
098            if (!runner.waitForAssignments(timeout)) {
099                return false;
100            }
101        }
102        return true;
103    }
104
105    public boolean drainAndStop(Duration timeout) {
106        if (threadPool == null || threadPool.isTerminated()) {
107            return true;
108        }
109        log.info(metadata.name() + ": Draining");
110        runners.forEach(ComputationRunner::drain);
111        boolean ret = awaitPoolTermination(timeout);
112        stop(Duration.ofSeconds(1));
113        return ret;
114    }
115
116    public boolean stop(Duration timeout) {
117        if (threadPool == null || threadPool.isTerminated()) {
118            return true;
119        }
120        log.info(metadata.name() + ": Stopping");
121        runners.forEach(ComputationRunner::stop);
122        boolean ret = awaitPoolTermination(timeout);
123        shutdown();
124        return ret;
125    }
126
127    public void shutdown() {
128        if (threadPool != null && !threadPool.isTerminated()) {
129            log.info(metadata.name() + ": Shutting down");
130            threadPool.shutdownNow();
131            // give a chance to end threads with valid tailer when shutdown is followed by streams.close()
132            try {
133                threadPool.awaitTermination(1, TimeUnit.SECONDS);
134            } catch (InterruptedException e) {
135                Thread.currentThread().interrupt();
136                log.warn(metadata.name() + ": Interrupted in shutdown");
137            }
138        }
139        runners.clear();
140        threadPool = null;
141    }
142
143    protected boolean awaitPoolTermination(Duration timeout) {
144        try {
145            if (!threadPool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
146                log.warn(metadata.name() + ": Timeout on wait for pool termination");
147                return false;
148            }
149        } catch (InterruptedException e) {
150            log.warn(metadata.name() + ": Interrupted while waiting for pool termination");
151            Thread.currentThread().interrupt();
152            return false;
153        }
154        return true;
155    }
156
157    public long getLowWatermark() {
158        // Collect all the low watermark of the pool, filtering 0 (or 1 which is completed of 0)
159        Set<Watermark> watermarks = runners.stream()
160                                           .map(ComputationRunner::getLowWatermark)
161                                           .filter(wm -> wm.getValue() > 1)
162                                           .collect(Collectors.toSet());
163        // Take the lowest watermark of unprocessed (not completed) records
164        long ret = watermarks.stream().filter(wm -> !wm.isCompleted()).mapToLong(Watermark::getValue).min().orElse(0);
165        boolean pending = true;
166        if (ret == 0) {
167            pending = false;
168            // There is no known pending records we take the max completed low watermark
169            ret = watermarks.stream().filter(Watermark::isCompleted).mapToLong(Watermark::getValue).max().orElse(0);
170        }
171        if (log.isTraceEnabled() && ret > 0)
172            log.trace(metadata.name() + ": low: " + ret + " " + (pending ? "Pending" : "Completed"));
173        return ret;
174    }
175
176    protected static class NamedThreadFactory implements ThreadFactory {
177        protected final AtomicInteger count = new AtomicInteger(0);
178
179        protected final String prefix;
180
181        public NamedThreadFactory(String prefix) {
182            this.prefix = prefix;
183        }
184
185        @SuppressWarnings("NullableProblems")
186        @Override
187        public Thread newThread(Runnable r) {
188            Thread t = new Thread(r, String.format("%s-%02d", prefix, count.getAndIncrement()));
189            t.setUncaughtExceptionHandler((t1, e) -> log.error("Uncaught exception: " + e.getMessage(), e));
190            return t;
191        }
192    }
193
194}