001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.consumer;
020
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023
024import java.util.ArrayList;
025import java.util.List;
026import java.util.concurrent.Callable;
027import java.util.concurrent.CompletableFuture;
028import java.util.concurrent.ExecutionException;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Executors;
031import java.util.concurrent.ThreadFactory;
032import java.util.concurrent.atomic.AtomicInteger;
033
034import static java.util.concurrent.Executors.newFixedThreadPool;
035
036/**
037 * Execute a pool of callables.
038 *
039 * @since 9.1
040 */
041public abstract class AbstractCallablePool<T> implements AutoCloseable {
042    private static final Log log = LogFactory.getLog(AbstractCallablePool.class);
043    private final int nbThreads;
044    private ExecutorService threadPool;
045
046    public AbstractCallablePool(int nbThreads) {
047        this.nbThreads = nbThreads;
048    }
049
050    /**
051     * Value to return when there was an exception during excution
052     */
053    protected abstract T getErrorStatus();
054
055    protected abstract Callable<T> getCallable(int i);
056
057    protected abstract String getThreadPrefix();
058
059    protected abstract void afterCall(List<T> ret);
060
061    public CompletableFuture<List<T>> start() {
062        ExecutorService supplyThreadPool = Executors.newSingleThreadExecutor(new NamedThreadFactory(getThreadPrefix() + "Pool"));
063        CompletableFuture<List<T>> ret = new CompletableFuture<>();
064        CompletableFuture.supplyAsync(() -> {
065            try {
066                ret.complete(runPool());
067            } catch (Throwable t) {
068                ret.completeExceptionally(t);
069            }
070            return ret;
071        }, supplyThreadPool);
072        // the threadpool will shutdown once the task is done
073        supplyThreadPool.shutdown();
074        return ret;
075    }
076
077    protected List<T> runPool() throws InterruptedException {
078        threadPool = newFixedThreadPool(nbThreads, new NamedThreadFactory(getThreadPrefix()));
079        log.warn("Start " + getThreadPrefix() + " Pool on " + nbThreads + " thread(s).");
080        List<CompletableFuture<T>> futures = new ArrayList<>(nbThreads);
081
082        for (int i = 0; i < nbThreads; i++) {
083            Callable<T> callable = getCallable(i);
084            CompletableFuture<T> future = new CompletableFuture<>();
085            CompletableFuture.supplyAsync(() -> {
086                try {
087                    future.complete(callable.call());
088                } catch (Throwable t) {
089                    log.error("Exception catch in runner: " + t.getMessage(), t);
090                    future.completeExceptionally(t);
091                }
092                return future;
093            }, threadPool);
094            futures.add(future);
095        }
096        log.info("Pool is up and running");
097        threadPool.shutdown();
098        // TODO: we may return here and wait only in the get impl, but the sync cost should be cheap here
099        List<T> ret = new ArrayList<>(nbThreads);
100        for (CompletableFuture<T> future : futures) {
101            T status;
102            try {
103                status = future.get();
104            } catch (ExecutionException e) {
105                log.error("End of consumer in error: " + e.getMessage() + future.toString());
106                status = getErrorStatus();
107            }
108            ret.add(status);
109        }
110        afterCall(ret);
111        return ret;
112    }
113
114    @Override
115    public void close() throws Exception {
116        threadPool.shutdownNow();
117    }
118
119    protected static class NamedThreadFactory implements ThreadFactory {
120        private final AtomicInteger count = new AtomicInteger(0);
121        private final String prefix;
122
123        public NamedThreadFactory(String prefix) {
124            this.prefix = prefix;
125        }
126
127        @SuppressWarnings("NullableProblems")
128        @Override
129        public Thread newThread(Runnable r) {
130            return new Thread(r, String.format("%s-%02d", prefix, count.getAndIncrement()));
131        }
132    }
133
134}