001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.producer;
020
021import java.util.List;
022
023/**
024 * The return status of a {@link ProducerRunner}
025 *
026 * @since 9.1
027 */
public class ProducerStatus {
    /** Time at which the producer started; treated as milliseconds when computing elapsed seconds. */
    public final long startTime;

    /** Time at which the producer stopped; treated as milliseconds when computing elapsed seconds. */
    public final long stopTime;

    /** Number of messages handled by the producer. */
    public final long nbProcessed;

    /** Numeric identifier of the producer, printed as a two-digit number in {@link #toString()}. */
    public final int producer;

    /** {@code true} when the producer ended in failure. */
    private final boolean fail;

    /**
     * Creates an immutable status record for a single producer.
     *
     * @param producer the producer identifier
     * @param nbProcessed the number of messages processed
     * @param startTime the start time, in milliseconds
     * @param stopTime the stop time, in milliseconds
     * @param fail whether the producer failed
     */
    public ProducerStatus(int producer, long nbProcessed, long startTime, long stopTime, boolean fail) {
        this.producer = producer;
        this.nbProcessed = nbProcessed;
        this.startTime = startTime;
        this.stopTime = stopTime;
        this.fail = fail;
    }

    /**
     * Returns a one-line summary of this producer: either a fixed failure message, or the message count, the
     * elapsed time in seconds and the throughput in messages per second.
     */
    @Override
    public String toString() {
        if (fail) {
            return "Producer status FAILURE";
        }
        double elapsed = (stopTime - startTime) / 1000.;
        double mps = throughput(nbProcessed, elapsed);
        return String.format("Producer %02d status: messages: %d, elapsed: %.2fs, throughput: %.2f msg/s.",
                producer, nbProcessed, elapsed, mps);
    }

    /**
     * Returns a one-line summary aggregating several producer statuses: number of producers, number of
     * failures, total messages, overall elapsed time and overall throughput.
     * <p>
     * The overall elapsed time spans from the earliest start to the latest stop among the given statuses.
     * An empty list yields a summary with all counters at zero.
     *
     * @param stats the statuses to aggregate, must not be {@code null}
     * @return the formatted summary
     */
    static String toString(List<ProducerStatus> stats) {
        long startTime = stats.stream().mapToLong(r -> r.startTime).min().orElse(0);
        // The run is over only when the last producer stops, hence max() and not min().
        long stopTime = stats.stream().mapToLong(r -> r.stopTime).max().orElse(0);
        double elapsed = (stopTime - startTime) / 1000.;
        long messages = stats.stream().mapToLong(r -> r.nbProcessed).sum();
        double mps = throughput(messages, elapsed);
        int producers = stats.size();
        long failures = stats.stream().filter(s -> s.fail).count();
        return String.format("Producers status: threads: %d, failures: %d, messages: %d, elapsed: %.2fs, throughput: %.2f msg/s",
                producers, failures, messages, elapsed, mps);
    }

    /** Computes messages per second, returning 0 when no time has elapsed to avoid dividing by zero. */
    private static double throughput(long messages, double elapsedSeconds) {
        return (elapsedSeconds != 0) ? messages / elapsedSeconds : 0.0;
    }
}