001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation.log;
020
021import static java.util.concurrent.Executors.newFixedThreadPool;
022
023import java.time.Duration;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Set;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.ThreadFactory;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicInteger;
031import java.util.function.Supplier;
032import java.util.stream.Collectors;
033
034import org.apache.commons.logging.Log;
035import org.apache.commons.logging.LogFactory;
036import org.nuxeo.lib.stream.computation.Computation;
037import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
038import org.nuxeo.lib.stream.computation.Watermark;
039import org.nuxeo.lib.stream.log.LogManager;
040import org.nuxeo.lib.stream.log.LogPartition;
041
042/**
043 * Pool of ComputationRunner
044 *
045 * @since 9.3
046 */
047public class ComputationPool {
048    private static final Log log = LogFactory.getLog(ComputationPool.class);
049
050    protected final ComputationMetadataMapping metadata;
051
052    protected final int threads;
053
054    protected final LogManager manager;
055
056    protected final Supplier<Computation> supplier;
057
058    protected final List<List<LogPartition>> defaultAssignments;
059
060    protected final List<ComputationRunner> runners;
061
062    protected ExecutorService threadPool;
063
064    public ComputationPool(Supplier<Computation> supplier, ComputationMetadataMapping metadata,
065            List<List<LogPartition>> defaultAssignments, LogManager manager) {
066        this.supplier = supplier;
067        this.manager = manager;
068        this.metadata = metadata;
069        this.threads = defaultAssignments.size();
070        this.defaultAssignments = defaultAssignments;
071        this.runners = new ArrayList<>(threads);
072    }
073
074    public String getComputationName() {
075        return metadata.name();
076    }
077
078    @SuppressWarnings("FutureReturnValueIgnored")
079    public void start() {
080        log.info(metadata.name() + ": Starting pool");
081        threadPool = newFixedThreadPool(threads, new NamedThreadFactory(metadata.name() + "Pool"));
082        defaultAssignments.forEach(assignments -> {
083            ComputationRunner runner = new ComputationRunner(supplier, metadata, assignments, manager);
084            threadPool.submit(runner);
085            runners.add(runner);
086        });
087        // close the pool no new admission
088        threadPool.shutdown();
089        log.debug(metadata.name() + ": Pool started, threads: " + threads);
090    }
091
092    public boolean isTerminated() {
093        return threadPool.isTerminated();
094    }
095
096    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
097        log.info(metadata.name() + ": Wait for partitions assignments");
098        if (threadPool == null || threadPool.isTerminated()) {
099            return true;
100        }
101        for (ComputationRunner runner : runners) {
102            if (!runner.waitForAssignments(timeout)) {
103                return false;
104            }
105        }
106        return true;
107    }
108
109    public boolean drainAndStop(Duration timeout) {
110        if (threadPool == null || threadPool.isTerminated()) {
111            return true;
112        }
113        log.info(metadata.name() + ": Draining");
114        runners.forEach(ComputationRunner::drain);
115        boolean ret = awaitPoolTermination(timeout);
116        stop(Duration.ofSeconds(1));
117        return ret;
118    }
119
120    public boolean stop(Duration timeout) {
121        if (threadPool == null || threadPool.isTerminated()) {
122            return true;
123        }
124        log.info(metadata.name() + ": Stopping");
125        runners.forEach(ComputationRunner::stop);
126        boolean ret = awaitPoolTermination(timeout);
127        shutdown();
128        return ret;
129    }
130
131    public void shutdown() {
132        if (threadPool != null && !threadPool.isTerminated()) {
133            log.info(metadata.name() + ": Shutting down");
134            threadPool.shutdownNow();
135            // give a chance to end threads with valid tailer when shutdown is followed by streams.close()
136            try {
137                threadPool.awaitTermination(1, TimeUnit.SECONDS);
138            } catch (InterruptedException e) {
139                Thread.currentThread().interrupt();
140                log.warn(metadata.name() + ": Interrupted in shutdown");
141            }
142        }
143        runners.clear();
144        threadPool = null;
145    }
146
147    protected boolean awaitPoolTermination(Duration timeout) {
148        try {
149            if (!threadPool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
150                log.warn(metadata.name() + ": Timeout on wait for pool termination");
151                return false;
152            }
153        } catch (InterruptedException e) {
154            Thread.currentThread().interrupt();
155            log.warn(metadata.name() + ": Interrupted while waiting for pool termination");
156            return false;
157        }
158        return true;
159    }
160
161    public long getLowWatermark() {
162        // Collect all the low watermark of the pool, filtering 0 (or 1 which is completed of 0)
163        Set<Watermark> watermarks = runners.stream()
164                                           .map(ComputationRunner::getLowWatermark)
165                                           .filter(wm -> wm.getValue() > 1)
166                                           .collect(Collectors.toSet());
167        // Take the lowest watermark of unprocessed (not completed) records
168        long ret = watermarks.stream().filter(wm -> !wm.isCompleted()).mapToLong(Watermark::getValue).min().orElse(0);
169        boolean pending = true;
170        if (ret == 0) {
171            pending = false;
172            // There is no known pending records we take the max completed low watermark
173            ret = watermarks.stream().filter(Watermark::isCompleted).mapToLong(Watermark::getValue).max().orElse(0);
174        }
175        if (log.isTraceEnabled() && ret > 0)
176            log.trace(metadata.name() + ": low: " + ret + " " + (pending ? "Pending" : "Completed"));
177        return ret;
178    }
179
180    protected static class NamedThreadFactory implements ThreadFactory {
181        protected final AtomicInteger count = new AtomicInteger(0);
182
183        protected final String prefix;
184
185        public NamedThreadFactory(String prefix) {
186            this.prefix = prefix;
187        }
188
189        @SuppressWarnings("NullableProblems")
190        @Override
191        public Thread newThread(Runnable r) {
192            Thread t = new Thread(r, String.format("%s-%02d", prefix, count.getAndIncrement()));
193            t.setUncaughtExceptionHandler((t1, e) -> log.error("Uncaught exception: " + e.getMessage(), e));
194            return t;
195        }
196    }
197
198}