001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.pattern.producer;
020
021import org.nuxeo.ecm.platform.importer.mqueues.pattern.producer.internals.ProducerRunner;
022
023import java.util.List;
024
025/**
026 * The return status of a {@link ProducerRunner}
027 *
028 * @since 9.1
029 */
030public class ProducerStatus {
031    public final long startTime;
032    public final long stopTime;
033    public final long nbProcessed;
034    public final int producer;
035    private final boolean fail;
036
037    public ProducerStatus(int producer, long nbProcessed, long startTime, long stopTime, boolean fail) {
038        this.producer = producer;
039        this.nbProcessed = nbProcessed;
040        this.startTime = startTime;
041        this.stopTime = stopTime;
042        this.fail = fail;
043    }
044
045    @Override
046    public String toString() {
047        if (fail) {
048            return "Producer status FAILURE";
049        }
050        double elapsed = (stopTime - startTime) / 1000.;
051        double mps = (elapsed != 0) ? nbProcessed / elapsed : 0.0;
052        return String.format("Producer %02d status: messages: %d, elapsed: %.2fs, throughput: %.2f msg/s.",
053                producer, nbProcessed, elapsed, mps);
054    }
055
056    static String toString(List<ProducerStatus> stats) {
057        long startTime = stats.stream().mapToLong(r -> r.startTime).min().orElse(0);
058        long stopTime = stats.stream().mapToLong(r -> r.stopTime).min().orElse(0);
059        double elapsed = (stopTime - startTime) / 1000.;
060        long messages = stats.stream().mapToLong(r -> r.nbProcessed).sum();
061        double mps = (elapsed != 0) ? messages / elapsed : 0.0;
062        int producers = stats.size();
063        long failures = stats.stream().filter(s -> s.fail).count();
064        return String.format("Producers status: threads: %d, failures: %d, messages: %d, elapsed: %.2fs, throughput: %.2f msg/s",
065                producers, failures, messages, elapsed, mps);
066
067    }
068}