001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.producer;
020
021import com.codahale.metrics.Counter;
022import com.codahale.metrics.MetricRegistry;
023import com.codahale.metrics.SharedMetricRegistries;
024import com.codahale.metrics.Timer;
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.nuxeo.ecm.platform.importer.mqueues.message.Message;
028import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQueues;
029
030import java.util.concurrent.Callable;
031
032import static java.lang.Thread.currentThread;
033import static org.nuxeo.ecm.platform.importer.mqueues.consumer.ConsumerRunner.NUXEO_METRICS_REGISTRY_NAME;
034
035/**
036 * A callable pulling a producer iterator in loop.
037 *
038 * @since 9.1
039 */
040public class ProducerRunner<M extends Message> implements Callable<ProducerStatus> {
041    private static final Log log = LogFactory.getLog(ProducerRunner.class);
042    private final int producerId;
043    private final MQueues<M> mq;
044    private final ProducerFactory<M> factory;
045    private String threadName;
046
047    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(NUXEO_METRICS_REGISTRY_NAME);
048    protected final Timer producerTimer;
049    protected final Counter producersCount;
050
051    public ProducerRunner(ProducerFactory<M> factory, MQueues<M> mQueues, int producerId) {
052        this.factory = factory;
053        this.producerId = producerId;
054        this.mq = mQueues;
055        producerTimer = newTimer(MetricRegistry.name("nuxeo", "importer", "queue", "producer", String.valueOf(producerId)));
056        producersCount = newCounter(MetricRegistry.name("nuxeo", "importer", "queue", "producers"));
057        log.debug("ProducerIterator thread created: " + producerId);
058    }
059
060    private Counter newCounter(String name) {
061        registry.remove(name);
062        return registry.counter(name);
063    }
064
065    private Timer newTimer(String name) {
066        registry.remove(name);
067        return registry.timer(name);
068    }
069
070    @Override
071    public ProducerStatus call() throws Exception {
072        threadName = currentThread().getName();
073        long start = System.currentTimeMillis();
074        producersCount.inc();
075        try (ProducerIterator<M> producer = factory.createProducer(producerId)) {
076            producerLoop(producer);
077        } finally {
078            producersCount.dec();
079        }
080        return new ProducerStatus(producerId, producerTimer.getCount(), start, System.currentTimeMillis(), false);
081    }
082
083    private void producerLoop(ProducerIterator<M> producer) {
084        M message;
085        while (producer.hasNext()) {
086            try (Timer.Context ignored = producerTimer.time()) {
087                message = producer.next();
088                setThreadName(message);
089            }
090            mq.append(producer.getShard(message, mq.size()), message);
091        }
092    }
093
094    private void setThreadName(M message) {
095        String name = threadName + "-" + producerTimer.getCount();
096        if (message != null) {
097            name += "-" + message.getId();
098        } else {
099            name += "-null";
100        }
101        currentThread().setName(name);
102    }
103}