001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.pattern.producer.internals;
020
021import com.codahale.metrics.Counter;
022import com.codahale.metrics.MetricRegistry;
023import com.codahale.metrics.SharedMetricRegistries;
024import com.codahale.metrics.Timer;
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQAppender;
028import org.nuxeo.ecm.platform.importer.mqueues.pattern.Message;
029import org.nuxeo.ecm.platform.importer.mqueues.pattern.producer.ProducerFactory;
030import org.nuxeo.ecm.platform.importer.mqueues.pattern.producer.ProducerIterator;
031import org.nuxeo.ecm.platform.importer.mqueues.pattern.producer.ProducerStatus;
032
033import java.util.concurrent.Callable;
034
035import static java.lang.Thread.currentThread;
036import static org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.internals.ConsumerRunner.NUXEO_METRICS_REGISTRY_NAME;
037
038/**
039 * A callable pulling a producer iterator in loop.
040 *
041 * @since 9.1
042 */
043public class ProducerRunner<M extends Message> implements Callable<ProducerStatus> {
044    private static final Log log = LogFactory.getLog(ProducerRunner.class);
045    private final int producerId;
046    private final MQAppender<M> mq;
047    private final ProducerFactory<M> factory;
048    private String threadName;
049
050    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(NUXEO_METRICS_REGISTRY_NAME);
051    protected final Timer producerTimer;
052    protected final Counter producersCount;
053
054    public ProducerRunner(ProducerFactory<M> factory, MQAppender<M> mQueue, int producerId) {
055        this.factory = factory;
056        this.producerId = producerId;
057        this.mq = mQueue;
058        producerTimer = newTimer(MetricRegistry.name("nuxeo", "importer", "queue", "producer", String.valueOf(producerId)));
059        producersCount = newCounter(MetricRegistry.name("nuxeo", "importer", "queue", "producers"));
060        log.debug("ProducerIterator thread created: " + producerId);
061    }
062
063    private Counter newCounter(String name) {
064        registry.remove(name);
065        return registry.counter(name);
066    }
067
068    private Timer newTimer(String name) {
069        registry.remove(name);
070        return registry.timer(name);
071    }
072
073    @Override
074    public ProducerStatus call() throws Exception {
075        threadName = currentThread().getName();
076        long start = System.currentTimeMillis();
077        producersCount.inc();
078        try (ProducerIterator<M> producer = factory.createProducer(producerId)) {
079            producerLoop(producer);
080        } finally {
081            producersCount.dec();
082        }
083        return new ProducerStatus(producerId, producerTimer.getCount(), start, System.currentTimeMillis(), false);
084    }
085
086    private void producerLoop(ProducerIterator<M> producer) {
087        M message;
088        while (producer.hasNext()) {
089            try (Timer.Context ignored = producerTimer.time()) {
090                message = producer.next();
091                setThreadName(message);
092            }
093            mq.append(producer.getPartition(message, mq.size()), message);
094        }
095    }
096
097    private void setThreadName(M message) {
098        String name = threadName + "-" + producerTimer.getCount();
099        if (message != null) {
100            name += "-" + message.getId();
101        } else {
102            name += "-null";
103        }
104        currentThread().setName(name);
105    }
106}