001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.pattern.consumer.internals;
020
021import static java.util.concurrent.Executors.newFixedThreadPool;
022
023import java.util.ArrayList;
024import java.util.List;
025import java.util.concurrent.Callable;
026import java.util.concurrent.CompletableFuture;
027import java.util.concurrent.ExecutionException;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030import java.util.concurrent.ThreadFactory;
031import java.util.concurrent.atomic.AtomicInteger;
032
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035
036/**
037 * Execute a pool of callables.
038 *
039 * @since 9.1
040 */
041public abstract class AbstractCallablePool<T> implements AutoCloseable {
042    private static final Log log = LogFactory.getLog(AbstractCallablePool.class);
043
044    protected final short nbThreads;
045
046    protected ExecutorService threadPool;
047
048    protected ExecutorService supplyThreadPool;
049
050    public AbstractCallablePool(short nbThreads) {
051        this.nbThreads = nbThreads;
052    }
053
054    /**
055     * Value to return when there was an exception during execution
056     */
057    protected abstract T getErrorStatus();
058
059    protected abstract Callable<T> getCallable(int i);
060
061    protected abstract String getThreadPrefix();
062
063    protected abstract void afterCall(List<T> ret);
064
065    public int getNbThreads() {
066        return nbThreads;
067    }
068
069    public CompletableFuture<List<T>> start() {
070        supplyThreadPool = Executors.newSingleThreadExecutor(new NamedThreadFactory(getThreadPrefix() + "Pool"));
071        CompletableFuture<List<T>> ret = new CompletableFuture<>();
072        CompletableFuture.supplyAsync(() -> {
073            try {
074                ret.complete(runPool());
075            } catch (InterruptedException e) {
076                Thread.currentThread().interrupt();
077                ret.completeExceptionally(e);
078            } catch (Exception e) {
079                log.error("Exception catch in pool: " + e.getMessage(), e);
080                ret.completeExceptionally(e);
081            }
082            return ret;
083        }, supplyThreadPool);
084        // the threadpool will shutdown once the task is done
085        supplyThreadPool.shutdown();
086        return ret;
087    }
088
089    protected List<T> runPool() throws InterruptedException {
090        threadPool = newFixedThreadPool(nbThreads, new NamedThreadFactory(getThreadPrefix()));
091        log.warn("Start " + getThreadPrefix() + " Pool on " + nbThreads + " thread(s).");
092        List<CompletableFuture<T>> futures = new ArrayList<>(nbThreads);
093
094        for (int i = 0; i < nbThreads; i++) {
095            Callable<T> callable = getCallable(i);
096            CompletableFuture<T> future = new CompletableFuture<>();
097            CompletableFuture.supplyAsync(() -> {
098                try {
099                    future.complete(callable.call());
100                } catch (InterruptedException e) {
101                    Thread.currentThread().interrupt();
102                    future.completeExceptionally(e);
103                } catch (Throwable e) { // NOSONAR
104                    // Throwable is needed to catch all kind of problem that can happen in custom code
105                    // when using future the UncaughtExceptionHandler is not reporting all errors.
106                    // A LinkageError will stop silently the thread resulting in a hang during future.get
107                    log.error("Exception catch in runner: " + e.getMessage(), e);
108                    future.completeExceptionally(e);
109                }
110                return future;
111            }, threadPool);
112            futures.add(future);
113        }
114        log.info("Pool is up and running");
115        threadPool.shutdown();
116        // We may return here and wait only in the get impl, but the sync cost should be cheap here
117        List<T> ret = new ArrayList<>(nbThreads);
118        for (CompletableFuture<T> future : futures) {
119            T status;
120            try {
121                status = future.get();
122            } catch (InterruptedException e) {
123                Thread.currentThread().interrupt();
124                log.error("End of consumer interrupted.");
125                status = getErrorStatus();
126            } catch (ExecutionException e) {
127                log.error("End of consumer in error: " + e.getMessage() + future.toString());
128                status = getErrorStatus();
129            }
130            ret.add(status);
131        }
132        afterCall(ret);
133        return ret;
134    }
135
136    @Override
137    public void close() {
138        supplyThreadPool.shutdownNow();
139        threadPool.shutdownNow();
140    }
141
142    protected static class NamedThreadFactory implements ThreadFactory {
143        protected final AtomicInteger count = new AtomicInteger(0);
144
145        protected final String prefix;
146
147        public NamedThreadFactory(String prefix) {
148            this.prefix = prefix;
149        }
150
151        @SuppressWarnings("NullableProblems")
152        @Override
153        public Thread newThread(Runnable r) {
154            Thread t = new Thread(r, String.format("%s-%02d", prefix, count.getAndIncrement()));
155            t.setUncaughtExceptionHandler((t1, e) -> log.error("Uncaught exception: " + e.getMessage(), e));
156            return t;
157        }
158    }
159
160}