001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.pattern.producer;
020
import java.util.List;
import java.util.Locale;

import org.nuxeo.lib.stream.pattern.producer.internals.ProducerRunner;
024
/**
 * The return status of a {@link ProducerRunner}.
 * <p>
 * Holds the counters and timestamps reported for one producer, and renders them as a one-line summary, either for a
 * single producer ({@link #toString()}) or aggregated over several ({@link #toString(List)}).
 * <p>
 * All fields are final. Elapsed times are computed as {@code (stopTime - startTime) / 1000} and displayed in
 * seconds, so the timestamps are treated as milliseconds.
 *
 * @since 9.1
 */
public class ProducerStatus {
    /** Start timestamp, treated as milliseconds when computing the elapsed time. */
    public final long startTime;

    /** Stop timestamp, treated as milliseconds when computing the elapsed time. */
    public final long stopTime;

    /** Number of messages processed, reported as "messages" in the summaries. */
    public final long nbProcessed;

    /** Producer number, rendered with two digits in the per-producer summary. */
    public final int producer;

    /** {@code true} when this status reports a failure. */
    protected final boolean fail;

    /**
     * Creates a status.
     *
     * @param producer the producer number
     * @param nbProcessed the number of messages processed
     * @param startTime the start timestamp
     * @param stopTime the stop timestamp
     * @param fail {@code true} to report a failure
     */
    public ProducerStatus(int producer, long nbProcessed, long startTime, long stopTime, boolean fail) {
        this.producer = producer;
        this.nbProcessed = nbProcessed;
        this.startTime = startTime;
        this.stopTime = stopTime;
        this.fail = fail;
    }

    /**
     * Returns a one-line summary of this status.
     * <p>
     * A failed status yields a fixed failure message; otherwise the summary lists the producer number, the message
     * count, the elapsed time in seconds and the throughput in messages per second.
     *
     * @return the summary, formatted independently of the JVM default locale
     */
    @Override
    public String toString() {
        if (fail) {
            return "Producer status FAILURE";
        }
        double elapsed = elapsedSeconds(startTime, stopTime);
        // Locale.ROOT keeps '.' as decimal separator: the message already uses ',' to separate its fields.
        return String.format(Locale.ROOT,
                "Producer %02d status: messages: %d, elapsed: %.2fs, throughput: %.2f msg/s.", producer, nbProcessed,
                elapsed, throughput(nbProcessed, elapsed));
    }

    /**
     * Returns a one-line summary aggregating several statuses.
     * <p>
     * The elapsed time spans from the smallest start timestamp to the largest stop timestamp, the message count is
     * the sum over all statuses (failed ones included) and the thread count is the size of the list. An empty list
     * yields a summary with all values at zero.
     *
     * @param stats the statuses to aggregate
     * @return the summary, formatted independently of the JVM default locale
     */
    static String toString(List<ProducerStatus> stats) {
        long firstStart = stats.stream().mapToLong(s -> s.startTime).min().orElse(0);
        long lastStop = stats.stream().mapToLong(s -> s.stopTime).max().orElse(0);
        double elapsed = elapsedSeconds(firstStart, lastStop);
        long messages = stats.stream().mapToLong(s -> s.nbProcessed).sum();
        long failures = stats.stream().filter(s -> s.fail).count();
        int producers = stats.size();
        // Locale.ROOT keeps '.' as decimal separator: the message already uses ',' to separate its fields.
        return String.format(Locale.ROOT,
                "Producers status: threads: %d, failures: %d, messages: %d, elapsed: %.2fs, throughput: %.2f msg/s",
                producers, failures, messages, elapsed, throughput(messages, elapsed));
    }

    /** Converts a pair of timestamps into an elapsed time in seconds. */
    private static double elapsedSeconds(long start, long stop) {
        return (stop - start) / 1000.;
    }

    /** Computes messages per second, or 0 when no time has elapsed (avoids a division by zero). */
    private static double throughput(long messages, double elapsedSeconds) {
        return (elapsedSeconds != 0) ? messages / elapsedSeconds : 0.0;
    }
}