001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.pattern.producer.internals;
020
021import static java.lang.Thread.currentThread;
022
023import java.util.concurrent.Callable;
024
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.nuxeo.lib.stream.log.LogAppender;
028import org.nuxeo.lib.stream.pattern.Message;
029import org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner;
030import org.nuxeo.lib.stream.pattern.producer.ProducerFactory;
031import org.nuxeo.lib.stream.pattern.producer.ProducerIterator;
032import org.nuxeo.lib.stream.pattern.producer.ProducerStatus;
033
034import com.codahale.metrics.Counter;
035import com.codahale.metrics.MetricRegistry;
036import com.codahale.metrics.SharedMetricRegistries;
037import com.codahale.metrics.Timer;
038
039/**
040 * A callable pulling a producer iterator in loop.
041 *
042 * @since 9.1
043 */
044public class ProducerRunner<M extends Message> implements Callable<ProducerStatus> {
045    private static final Log log = LogFactory.getLog(ProducerRunner.class);
046
047    protected final int producerId;
048
049    protected final LogAppender<M> appender;
050
051    protected final ProducerFactory<M> factory;
052
053    protected String threadName;
054
055    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(
056            ConsumerRunner.NUXEO_METRICS_REGISTRY_NAME);
057
058    protected final Timer producerTimer;
059
060    protected final Counter producersCount;
061
062    protected long counter;
063
064    public ProducerRunner(ProducerFactory<M> factory, LogAppender<M> appender, int producerId) {
065        this.factory = factory;
066        this.producerId = producerId;
067        this.appender = appender;
068        producerTimer = registry.timer(MetricRegistry.name("nuxeo", "importer", "stream", "producer"));
069        producersCount = registry.counter(MetricRegistry.name("nuxeo", "importer", "stream", "producers"));
070        log.debug("ProducerIterator thread created: " + producerId);
071    }
072
073    @Override
074    public ProducerStatus call() throws Exception {
075        threadName = currentThread().getName();
076        long start = System.currentTimeMillis();
077        producersCount.inc();
078        try (ProducerIterator<M> producer = factory.createProducer(producerId)) {
079            producerLoop(producer);
080        } finally {
081            producersCount.dec();
082        }
083        return new ProducerStatus(producerId, counter, start, System.currentTimeMillis(), false);
084    }
085
086    protected void producerLoop(ProducerIterator<M> producer) {
087        M message;
088        while (producer.hasNext()) {
089            try (Timer.Context ignored = producerTimer.time()) {
090                message = producer.next();
091                setThreadName(message);
092                counter++;
093            }
094            appender.append(producer.getPartition(message, appender.size()), message);
095        }
096    }
097
098    protected void setThreadName(M message) {
099        String name = threadName + "-" + counter;
100        if (message != null) {
101            name += "-" + message.getId();
102        } else {
103            name += "-null";
104        }
105        currentThread().setName(name);
106    }
107}