001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.pattern.consumer.internals;
020
021import static java.util.concurrent.Executors.newFixedThreadPool;
022
023import java.util.ArrayList;
024import java.util.List;
025import java.util.concurrent.Callable;
026import java.util.concurrent.CompletableFuture;
027import java.util.concurrent.ExecutionException;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030import java.util.concurrent.ThreadFactory;
031import java.util.concurrent.atomic.AtomicInteger;
032
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035
036/**
037 * Execute a pool of callables.
038 *
039 * @since 9.1
040 */
041public abstract class AbstractCallablePool<T> implements AutoCloseable {
042    private static final Log log = LogFactory.getLog(AbstractCallablePool.class);
043
044    protected final short nbThreads;
045
046    protected ExecutorService threadPool;
047
048    protected ExecutorService supplyThreadPool;
049
050    public AbstractCallablePool(short nbThreads) {
051        this.nbThreads = nbThreads;
052    }
053
054    /**
055     * Value to return when there was an exception during execution
056     */
057    protected abstract T getErrorStatus();
058
059    protected abstract Callable<T> getCallable(int i);
060
061    protected abstract String getThreadPrefix();
062
063    protected abstract void afterCall(List<T> ret);
064
065    public int getNbThreads() {
066        return nbThreads;
067    }
068
069    public CompletableFuture<List<T>> start() {
070        supplyThreadPool = Executors.newSingleThreadExecutor(new NamedThreadFactory(getThreadPrefix() + "Pool"));
071        CompletableFuture<List<T>> ret = new CompletableFuture<>();
072        CompletableFuture.supplyAsync(() -> {
073            try {
074                ret.complete(runPool());
075            } catch (InterruptedException e) {
076                Thread.currentThread().interrupt();
077                ret.completeExceptionally(e);
078            } catch (Exception e) {
079                log.error("Exception catch in pool: " + e.getMessage(), e);
080                ret.completeExceptionally(e);
081            }
082            return ret;
083        }, supplyThreadPool);
084        // the threadpool will shutdown once the task is done
085        supplyThreadPool.shutdown();
086        return ret;
087    }
088
089    protected List<T> runPool() throws InterruptedException {
090        threadPool = newFixedThreadPool(nbThreads, new NamedThreadFactory(getThreadPrefix()));
091        log.warn("Start " + getThreadPrefix() + " Pool on " + nbThreads + " thread(s).");
092        List<CompletableFuture<T>> futures = new ArrayList<>(nbThreads);
093
094        for (int i = 0; i < nbThreads; i++) {
095            Callable<T> callable = getCallable(i);
096            CompletableFuture<T> future = new CompletableFuture<>();
097            CompletableFuture.supplyAsync(() -> {
098                try {
099                    future.complete(callable.call());
100                } catch (InterruptedException e) {
101                    Thread.currentThread().interrupt();
102                    future.completeExceptionally(e);
103                } catch (Exception e) {
104                    log.error("Exception catch in runner: " + e.getMessage(), e);
105                    future.completeExceptionally(e);
106                }
107                return future;
108            }, threadPool);
109            futures.add(future);
110        }
111        log.info("Pool is up and running");
112        threadPool.shutdown();
113        // We may return here and wait only in the get impl, but the sync cost should be cheap here
114        List<T> ret = new ArrayList<>(nbThreads);
115        for (CompletableFuture<T> future : futures) {
116            T status;
117            try {
118                status = future.get();
119            } catch (InterruptedException e) {
120                Thread.currentThread().interrupt();
121                log.error("End of consumer interrupted.");
122                status = getErrorStatus();
123            } catch (ExecutionException e) {
124                log.error("End of consumer in error: " + e.getMessage() + future.toString());
125                status = getErrorStatus();
126            }
127            ret.add(status);
128        }
129        afterCall(ret);
130        return ret;
131    }
132
133    @Override
134    public void close() {
135        supplyThreadPool.shutdownNow();
136        threadPool.shutdownNow();
137    }
138
139    protected static class NamedThreadFactory implements ThreadFactory {
140        protected final AtomicInteger count = new AtomicInteger(0);
141
142        protected final String prefix;
143
144        public NamedThreadFactory(String prefix) {
145            this.prefix = prefix;
146        }
147
148        @SuppressWarnings("NullableProblems")
149        @Override
150        public Thread newThread(Runnable r) {
151            Thread t = new Thread(r, String.format("%s-%02d", prefix, count.getAndIncrement()));
152            t.setUncaughtExceptionHandler((t1, e) -> log.error("Uncaught exception: " + e.getMessage(), e));
153            return t;
154        }
155    }
156
157}