001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.computation.mqueue;
020
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023import org.nuxeo.ecm.platform.importer.mqueues.computation.Computation;
024import org.nuxeo.ecm.platform.importer.mqueues.computation.ComputationMetadataMapping;
025import org.nuxeo.ecm.platform.importer.mqueues.computation.Record;
026import org.nuxeo.ecm.platform.importer.mqueues.computation.Watermark;
027import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQManager;
028import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;
029
030import java.time.Duration;
031import java.util.ArrayList;
032import java.util.Comparator;
033import java.util.List;
034import java.util.Set;
035import java.util.concurrent.ExecutorService;
036import java.util.concurrent.ThreadFactory;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.atomic.AtomicInteger;
039import java.util.function.Supplier;
040import java.util.stream.Collectors;
041
042import static java.util.concurrent.Executors.newFixedThreadPool;
043
044/**
045 * Pool of ComputationRunner
046 *
047 * @since 9.2
048 */
049public class MQComputationPool {
050    private static final Log log = LogFactory.getLog(MQComputationPool.class);
051    private final ComputationMetadataMapping metadata;
052    private final int threads;
053    private final MQManager<Record> manager;
054    private final Supplier<Computation> supplier;
055    private final List<List<MQPartition>> defaultAssignments;
056    private ExecutorService threadPool;
057    private final List<MQComputationRunner> runners;
058
059    public MQComputationPool(Supplier<Computation> supplier, ComputationMetadataMapping metadata, List<List<MQPartition>> defaultAssignments, MQManager<Record> manager) {
060        this.supplier = supplier;
061        this.manager = manager;
062        this.metadata = metadata;
063        this.threads = defaultAssignments.size();
064        this.defaultAssignments = defaultAssignments;
065        this.runners = new ArrayList<>(threads);
066    }
067
068    public String getComputationName() {
069        return metadata.name();
070    }
071
072    public void start() {
073        log.info(metadata.name() + ": Starting pool");
074        threadPool = newFixedThreadPool(threads, new NamedThreadFactory(metadata.name() + "Pool"));
075        defaultAssignments.forEach(assignments -> {
076            MQComputationRunner runner = new MQComputationRunner(supplier, metadata, assignments, manager);
077            threadPool.submit(runner);
078            runners.add(runner);
079        });
080        // close the pool no new admission
081        threadPool.shutdown();
082        log.debug(metadata.name() + ": Pool started, threads: " + threads);
083    }
084
085    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
086        log.info(metadata.name() + ": Wait for partitions assignments");
087        if (threadPool == null || threadPool.isTerminated()) {
088            return true;
089        }
090        for (MQComputationRunner runner : runners) {
091            if (!runner.waitForAssignments(timeout)) {
092                return false;
093            }
094        }
095        return true;
096    }
097
098    public boolean drainAndStop(Duration timeout) {
099        if (threadPool == null || threadPool.isTerminated()) {
100            return true;
101        }
102        log.info(metadata.name() + ": Draining");
103        runners.forEach(MQComputationRunner::drain);
104        boolean ret = awaitPoolTermination(timeout);
105        stop(Duration.ofSeconds(1));
106        return ret;
107    }
108
109    public boolean stop(Duration timeout) {
110        if (threadPool == null || threadPool.isTerminated()) {
111            return true;
112        }
113        log.info(metadata.name() + ": Stopping");
114        runners.forEach(MQComputationRunner::stop);
115        boolean ret = awaitPoolTermination(timeout);
116        shutdown();
117        return ret;
118    }
119
120    public void shutdown() {
121        if (threadPool != null && !threadPool.isTerminated()) {
122            log.info(metadata.name() + ": Shutting down");
123            threadPool.shutdownNow();
124            // give a chance to end threads with valid tailer when shutdown is followed by streams.close()
125            try {
126                threadPool.awaitTermination(1, TimeUnit.SECONDS);
127            } catch (InterruptedException e) {
128                log.warn(metadata.name() + ": Interrupted in shutdown");
129                Thread.currentThread().interrupt();
130            }
131        }
132        runners.clear();
133        threadPool = null;
134    }
135
136    private boolean awaitPoolTermination(Duration timeout) {
137        try {
138            if (!threadPool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
139                log.warn(metadata.name() + ": Timeout on wait for pool termination");
140                return false;
141            }
142        } catch (InterruptedException e) {
143            log.warn(metadata.name() + ": Interrupted while waiting for pool termination");
144            Thread.currentThread().interrupt();
145            return false;
146        }
147        return true;
148    }
149
150    public long getLowWatermark() {
151        // Collect all the low watermark of the pool, filtering 0 (or 1 which is completed of 0)
152        Set<Watermark> watermarks = runners.stream().map(MQComputationRunner::getLowWatermark)
153                .filter(wm -> wm.getValue() > 1).collect(Collectors.toSet());
154        // Take the lowest watermark of unprocessed (not completed) records
155        long ret = watermarks.stream().filter(wm -> !wm.isCompleted()).map(Watermark::getValue)
156                .min(Comparator.naturalOrder()).orElse(0L);
157        boolean pending = true;
158        if (ret == 0) {
159            pending = false;
160            // There is no known pending records we take the max completed low watermark
161            ret = watermarks.stream().filter(Watermark::isCompleted).map(Watermark::getValue)
162                    .max(Comparator.naturalOrder()).orElse(0L);
163        }
164        if (log.isTraceEnabled() && ret > 0)
165            log.trace(metadata.name() + ": low: " + ret + " " + (pending ? "Pending" : "Completed"));
166        return ret;
167    }
168
169    protected static class NamedThreadFactory implements ThreadFactory {
170        private final AtomicInteger count = new AtomicInteger(0);
171        private final String prefix;
172
173        public NamedThreadFactory(String prefix) {
174            this.prefix = prefix;
175        }
176
177        @SuppressWarnings("NullableProblems")
178        @Override
179        public Thread newThread(Runnable r) {
180            Thread t = new Thread(r, String.format("%s-%02d", prefix, count.getAndIncrement()));
181            t.setUncaughtExceptionHandler((t1, e) -> log.error("Uncaught exception: " + e.getMessage(), e));
182            return t;
183        }
184    }
185
186}