001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.internals;
020
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023
024import java.util.ArrayList;
025import java.util.List;
026import java.util.concurrent.Callable;
027import java.util.concurrent.CompletableFuture;
028import java.util.concurrent.ExecutionException;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Executors;
031import java.util.concurrent.ThreadFactory;
032import java.util.concurrent.atomic.AtomicInteger;
033
034import static java.util.concurrent.Executors.newFixedThreadPool;
035
036/**
037 * Execute a pool of callables.
038 *
039 * @since 9.1
040 */
041public abstract class AbstractCallablePool<T> implements AutoCloseable {
042    private static final Log log = LogFactory.getLog(AbstractCallablePool.class);
043    private final short nbThreads;
044    private ExecutorService threadPool;
045
046    public AbstractCallablePool(short nbThreads) {
047        this.nbThreads = nbThreads;
048    }
049
050    /**
051     * Value to return when there was an exception during execution
052     */
053    protected abstract T getErrorStatus();
054
055    protected abstract Callable<T> getCallable(int i);
056
057    protected abstract String getThreadPrefix();
058
059    protected abstract void afterCall(List<T> ret);
060
061    public int getNbThreads() {
062        return nbThreads;
063    }
064
065    public CompletableFuture<List<T>> start() {
066        ExecutorService supplyThreadPool = Executors.newSingleThreadExecutor(new NamedThreadFactory(getThreadPrefix() + "Pool"));
067        CompletableFuture<List<T>> ret = new CompletableFuture<>();
068        CompletableFuture.supplyAsync(() -> {
069            try {
070                ret.complete(runPool());
071            } catch (Throwable t) {
072                ret.completeExceptionally(t);
073            }
074            return ret;
075        }, supplyThreadPool);
076        // the threadpool will shutdown once the task is done
077        supplyThreadPool.shutdown();
078        return ret;
079    }
080
081    protected List<T> runPool() throws InterruptedException {
082        threadPool = newFixedThreadPool(nbThreads, new NamedThreadFactory(getThreadPrefix()));
083        log.warn("Start " + getThreadPrefix() + " Pool on " + nbThreads + " thread(s).");
084        List<CompletableFuture<T>> futures = new ArrayList<>(nbThreads);
085
086        for (int i = 0; i < nbThreads; i++) {
087            Callable<T> callable = getCallable(i);
088            CompletableFuture<T> future = new CompletableFuture<>();
089            CompletableFuture.supplyAsync(() -> {
090                try {
091                    future.complete(callable.call());
092                } catch (Throwable t) {
093                    log.error("Exception catch in runner: " + t.getMessage(), t);
094                    future.completeExceptionally(t);
095                }
096                return future;
097            }, threadPool);
098            futures.add(future);
099        }
100        log.info("Pool is up and running");
101        threadPool.shutdown();
102        // TODO: we may return here and wait only in the get impl, but the sync cost should be cheap here
103        List<T> ret = new ArrayList<>(nbThreads);
104        for (CompletableFuture<T> future : futures) {
105            T status;
106            try {
107                status = future.get();
108            } catch (ExecutionException e) {
109                log.error("End of consumer in error: " + e.getMessage() + future.toString());
110                status = getErrorStatus();
111            }
112            ret.add(status);
113        }
114        afterCall(ret);
115        return ret;
116    }
117
118    @Override
119    public void close() throws Exception {
120        threadPool.shutdownNow();
121    }
122
123    protected static class NamedThreadFactory implements ThreadFactory {
124        private final AtomicInteger count = new AtomicInteger(0);
125        private final String prefix;
126
127        public NamedThreadFactory(String prefix) {
128            this.prefix = prefix;
129        }
130
131        @SuppressWarnings("NullableProblems")
132        @Override
133        public Thread newThread(Runnable r) {
134            return new Thread(r, String.format("%s-%02d", prefix, count.getAndIncrement()));
135        }
136    }
137
138}