001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation.log;
020
021import static java.util.concurrent.Executors.newFixedThreadPool;
022
023import java.time.Duration;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Objects;
027import java.util.Set;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.ThreadFactory;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicInteger;
032import java.util.function.Supplier;
033import java.util.stream.Collectors;
034
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.nuxeo.lib.stream.computation.Computation;
038import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
039import org.nuxeo.lib.stream.computation.ComputationPolicy;
040import org.nuxeo.lib.stream.computation.Watermark;
041import org.nuxeo.lib.stream.log.LogPartition;
042
043/**
044 * Pool of ComputationRunner
045 *
046 * @since 9.3
047 */
048public class ComputationPool {
049    private static final Log log = LogFactory.getLog(ComputationPool.class);
050
051    protected final ComputationMetadataMapping metadata;
052
053    protected final int threads;
054
055    protected final Supplier<Computation> supplier;
056
057    protected final List<List<LogPartition>> defaultAssignments;
058
059    protected final List<ComputationRunner> runners;
060
061    protected final LogStreamManager streamManager;
062
063    protected final ComputationPolicy policy;
064
065    protected ExecutorService threadPool;
066
067    public ComputationPool(Supplier<Computation> supplier, ComputationMetadataMapping metadata,
068            List<List<LogPartition>> defaultAssignments, LogStreamManager streamManager, ComputationPolicy policy) {
069        Objects.requireNonNull(policy);
070        this.supplier = supplier;
071        this.metadata = metadata;
072        this.threads = defaultAssignments.size();
073        this.streamManager = streamManager;
074        this.defaultAssignments = defaultAssignments;
075        this.policy = policy;
076        this.runners = new ArrayList<>(threads);
077    }
078
079    public String getComputationName() {
080        return metadata.name();
081    }
082
083    @SuppressWarnings("FutureReturnValueIgnored")
084    public void start() {
085        if (threads == 0) {
086            log.info(metadata.name() + ": Empty pool");
087            return;
088        }
089        log.info(metadata.name() + ": Starting pool");
090        threadPool = newFixedThreadPool(threads, new NamedThreadFactory(metadata.name() + "Pool"));
091        defaultAssignments.forEach(assignments -> {
092            ComputationRunner runner = new ComputationRunner(supplier, metadata, assignments, streamManager, policy);
093            threadPool.submit(runner);
094            runners.add(runner);
095        });
096        // close the pool no new admission
097        threadPool.shutdown();
098        log.debug(metadata.name() + ": Pool started, threads: " + threads);
099    }
100
101    public boolean isTerminated() {
102        return threadPool == null ? true : threadPool.isTerminated();
103    }
104
105    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
106        log.info(metadata.name() + ": Wait for partitions assignments");
107        if (threadPool == null || threadPool.isTerminated()) {
108            return true;
109        }
110        for (ComputationRunner runner : runners) {
111            if (!runner.waitForAssignments(timeout)) {
112                return false;
113            }
114        }
115        return true;
116    }
117
118    public boolean drainAndStop(Duration timeout) {
119        if (threadPool == null || threadPool.isTerminated()) {
120            return true;
121        }
122        log.info(metadata.name() + ": Draining");
123        runners.forEach(ComputationRunner::drain);
124        boolean ret = awaitPoolTermination(timeout);
125        stop(Duration.ofSeconds(1));
126        return ret;
127    }
128
129    public boolean stop(Duration timeout) {
130        if (threadPool == null || threadPool.isTerminated()) {
131            return true;
132        }
133        log.info(metadata.name() + ": Stopping");
134        runners.forEach(ComputationRunner::stop);
135        boolean ret = awaitPoolTermination(timeout);
136        shutdown();
137        return ret;
138    }
139
140    public void shutdown() {
141        if (threadPool != null && !threadPool.isTerminated()) {
142            log.info(metadata.name() + ": Shutting down");
143            threadPool.shutdownNow();
144            // give a chance to end threads with valid tailer when shutdown is followed by streams.close()
145            try {
146                threadPool.awaitTermination(1, TimeUnit.SECONDS);
147            } catch (InterruptedException e) {
148                Thread.currentThread().interrupt();
149                log.warn(metadata.name() + ": Interrupted in shutdown");
150            }
151        }
152        runners.clear();
153        threadPool = null;
154    }
155
156    protected boolean awaitPoolTermination(Duration timeout) {
157        try {
158            if (!threadPool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
159                log.warn(metadata.name() + ": Timeout on wait for pool termination");
160                return false;
161            }
162        } catch (InterruptedException e) {
163            Thread.currentThread().interrupt();
164            log.warn(metadata.name() + ": Interrupted while waiting for pool termination");
165            return false;
166        }
167        return true;
168    }
169
170    public long getLowWatermark() {
171        // Collect all the low watermark of the pool, filtering 0 (or 1 which is completed of 0)
172        Set<Watermark> watermarks = runners.stream()
173                                           .map(ComputationRunner::getLowWatermark)
174                                           .filter(wm -> wm.getValue() > 1)
175                                           .collect(Collectors.toSet());
176        // Take the lowest watermark of unprocessed (not completed) records
177        long ret = watermarks.stream().filter(wm -> !wm.isCompleted()).mapToLong(Watermark::getValue).min().orElse(0);
178        boolean pending = true;
179        if (ret == 0) {
180            pending = false;
181            // There is no known pending records we take the max completed low watermark
182            ret = watermarks.stream().filter(Watermark::isCompleted).mapToLong(Watermark::getValue).max().orElse(0);
183        }
184        if (log.isTraceEnabled() && ret > 0)
185            log.trace(metadata.name() + ": low: " + ret + " " + (pending ? "Pending" : "Completed"));
186        return ret;
187    }
188
189    protected static class NamedThreadFactory implements ThreadFactory {
190        protected final AtomicInteger count = new AtomicInteger(0);
191
192        protected final String prefix;
193
194        public NamedThreadFactory(String prefix) {
195            this.prefix = prefix;
196        }
197
198        @SuppressWarnings("NullableProblems")
199        @Override
200        public Thread newThread(Runnable r) {
201            Thread t = new Thread(r, String.format("%s-%02d", prefix, count.getAndIncrement()));
202            t.setUncaughtExceptionHandler((t1, e) -> log.error("Uncaught exception: " + e.getMessage(), e));
203            return t;
204        }
205    }
206
207}