001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.producer;
020
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023import org.nuxeo.ecm.platform.importer.mqueues.consumer.AbstractCallablePool;
024import org.nuxeo.ecm.platform.importer.mqueues.message.Message;
025import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQueues;
026
027import java.util.List;
028import java.util.concurrent.Callable;
029
030/**
031 * A Pool of ProducerRunner
032 *
033 * @since 9.1
034 */
035public class ProducerPool<M extends Message> extends AbstractCallablePool<ProducerStatus> {
036    private static final Log log = LogFactory.getLog(ProducerPool.class);
037    private final MQueues<M> mq;
038    private final ProducerFactory<M> factory;
039
040    public ProducerPool(MQueues<M> mq, ProducerFactory<M> factory, int nbThreads) {
041        super(nbThreads);
042        this.mq = mq;
043        this.factory = factory;
044    }
045
046    @Override
047    protected ProducerStatus getErrorStatus() {
048        return new ProducerStatus(0, 0, 0, 0, true);
049    }
050
051    @Override
052    protected Callable<ProducerStatus> getCallable(int i) {
053        return new ProducerRunner<>(factory, mq, i);
054    }
055
056    @Override
057    protected String getThreadPrefix() {
058        return "Nuxeo-Producer";
059    }
060
061    @Override
062    protected void afterCall(List<ProducerStatus> ret) {
063        ret.forEach(log::info);
064        log.warn(ProducerStatus.toString(ret));
065    }
066
067
068}