001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation.log;
020
021import static java.util.concurrent.Executors.newFixedThreadPool;
022
023import java.time.Duration;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Objects;
027import java.util.Set;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.ThreadFactory;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicInteger;
032import java.util.function.Supplier;
033import java.util.stream.Collectors;
034
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.nuxeo.lib.stream.codec.Codec;
038import org.nuxeo.lib.stream.computation.Computation;
039import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
040import org.nuxeo.lib.stream.computation.ComputationPolicy;
041import org.nuxeo.lib.stream.computation.Record;
042import org.nuxeo.lib.stream.computation.Watermark;
043import org.nuxeo.lib.stream.log.LogManager;
044import org.nuxeo.lib.stream.log.LogPartition;
045
046/**
047 * Pool of ComputationRunner
048 *
049 * @since 9.3
050 */
051public class ComputationPool {
052    private static final Log log = LogFactory.getLog(ComputationPool.class);
053
054    protected final ComputationMetadataMapping metadata;
055
056    protected final int threads;
057
058    protected final LogManager manager;
059
060    protected final Supplier<Computation> supplier;
061
062    protected final List<List<LogPartition>> defaultAssignments;
063
064    protected final List<ComputationRunner> runners;
065
066    protected final Codec<Record> inputCodec;
067
068    protected final Codec<Record> outputCodec;
069
070    protected final ComputationPolicy policy;
071
072    protected ExecutorService threadPool;
073
074    public ComputationPool(Supplier<Computation> supplier, ComputationMetadataMapping metadata,
075            List<List<LogPartition>> defaultAssignments, LogManager manager, Codec<Record> inputCodec,
076            Codec<Record> outputCodec, ComputationPolicy policy) {
077        Objects.requireNonNull(inputCodec);
078        Objects.requireNonNull(outputCodec);
079        Objects.requireNonNull(policy);
080        this.supplier = supplier;
081        this.manager = manager;
082        this.metadata = metadata;
083        this.threads = defaultAssignments.size();
084        this.inputCodec = inputCodec;
085        this.outputCodec = outputCodec;
086        this.defaultAssignments = defaultAssignments;
087        this.policy = policy;
088        this.runners = new ArrayList<>(threads);
089    }
090
091    public String getComputationName() {
092        return metadata.name();
093    }
094
095    @SuppressWarnings("FutureReturnValueIgnored")
096    public void start() {
097        log.info(metadata.name() + ": Starting pool");
098        threadPool = newFixedThreadPool(threads, new NamedThreadFactory(metadata.name() + "Pool"));
099        defaultAssignments.forEach(assignments -> {
100            ComputationRunner runner = new ComputationRunner(supplier, metadata, assignments, manager, inputCodec,
101                    outputCodec, policy);
102            threadPool.submit(runner);
103            runners.add(runner);
104        });
105        // close the pool no new admission
106        threadPool.shutdown();
107        log.debug(metadata.name() + ": Pool started, threads: " + threads);
108    }
109
110    public boolean isTerminated() {
111        return threadPool.isTerminated();
112    }
113
114    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
115        log.info(metadata.name() + ": Wait for partitions assignments");
116        if (threadPool == null || threadPool.isTerminated()) {
117            return true;
118        }
119        for (ComputationRunner runner : runners) {
120            if (!runner.waitForAssignments(timeout)) {
121                return false;
122            }
123        }
124        return true;
125    }
126
127    public boolean drainAndStop(Duration timeout) {
128        if (threadPool == null || threadPool.isTerminated()) {
129            return true;
130        }
131        log.info(metadata.name() + ": Draining");
132        runners.forEach(ComputationRunner::drain);
133        boolean ret = awaitPoolTermination(timeout);
134        stop(Duration.ofSeconds(1));
135        return ret;
136    }
137
138    public boolean stop(Duration timeout) {
139        if (threadPool == null || threadPool.isTerminated()) {
140            return true;
141        }
142        log.info(metadata.name() + ": Stopping");
143        runners.forEach(ComputationRunner::stop);
144        boolean ret = awaitPoolTermination(timeout);
145        shutdown();
146        return ret;
147    }
148
149    public void shutdown() {
150        if (threadPool != null && !threadPool.isTerminated()) {
151            log.info(metadata.name() + ": Shutting down");
152            threadPool.shutdownNow();
153            // give a chance to end threads with valid tailer when shutdown is followed by streams.close()
154            try {
155                threadPool.awaitTermination(1, TimeUnit.SECONDS);
156            } catch (InterruptedException e) {
157                Thread.currentThread().interrupt();
158                log.warn(metadata.name() + ": Interrupted in shutdown");
159            }
160        }
161        runners.clear();
162        threadPool = null;
163    }
164
165    protected boolean awaitPoolTermination(Duration timeout) {
166        try {
167            if (!threadPool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
168                log.warn(metadata.name() + ": Timeout on wait for pool termination");
169                return false;
170            }
171        } catch (InterruptedException e) {
172            Thread.currentThread().interrupt();
173            log.warn(metadata.name() + ": Interrupted while waiting for pool termination");
174            return false;
175        }
176        return true;
177    }
178
179    public long getLowWatermark() {
180        // Collect all the low watermark of the pool, filtering 0 (or 1 which is completed of 0)
181        Set<Watermark> watermarks = runners.stream()
182                                           .map(ComputationRunner::getLowWatermark)
183                                           .filter(wm -> wm.getValue() > 1)
184                                           .collect(Collectors.toSet());
185        // Take the lowest watermark of unprocessed (not completed) records
186        long ret = watermarks.stream().filter(wm -> !wm.isCompleted()).mapToLong(Watermark::getValue).min().orElse(0);
187        boolean pending = true;
188        if (ret == 0) {
189            pending = false;
190            // There is no known pending records we take the max completed low watermark
191            ret = watermarks.stream().filter(Watermark::isCompleted).mapToLong(Watermark::getValue).max().orElse(0);
192        }
193        if (log.isTraceEnabled() && ret > 0)
194            log.trace(metadata.name() + ": low: " + ret + " " + (pending ? "Pending" : "Completed"));
195        return ret;
196    }
197
198    protected static class NamedThreadFactory implements ThreadFactory {
199        protected final AtomicInteger count = new AtomicInteger(0);
200
201        protected final String prefix;
202
203        public NamedThreadFactory(String prefix) {
204            this.prefix = prefix;
205        }
206
207        @SuppressWarnings("NullableProblems")
208        @Override
209        public Thread newThread(Runnable r) {
210            Thread t = new Thread(r, String.format("%s-%02d", prefix, count.getAndIncrement()));
211            t.setUncaughtExceptionHandler((t1, e) -> log.error("Uncaught exception: " + e.getMessage(), e));
212            return t;
213        }
214    }
215
216}