001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer;
020
021import org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.internals.ConsumerRunner;
022
023import java.util.List;
024
025/**
026 * The return status of a {@link ConsumerRunner}
027 *
028 * @since 9.1
029 */
030public class ConsumerStatus {
031    public final String consumerId;
032    public final long startTime;
033    public final long stopTime;
034    public final long accepted;
035    public final long committed;
036    public final long batchFailure;
037    public final long batchCommit;
038    public final boolean fail;
039
040    public ConsumerStatus(String consumerId, long accepted, long committed, long batchCommit, long batchFailure, long startTime, long stopTime, boolean fail) {
041        this.consumerId = consumerId;
042        this.accepted = accepted;
043        this.committed = committed;
044        this.batchCommit = batchCommit;
045        this.batchFailure = batchFailure;
046        this.startTime = startTime;
047        this.stopTime = stopTime;
048        this.fail = fail;
049    }
050
051    @Override
052    public String toString() {
053        if (fail) {
054            return "Consumer status FAILURE";
055        }
056        double elapsed = (stopTime - startTime) / 1000.;
057        double mps = (elapsed != 0) ? committed / elapsed : 0.0;
058        return String.format("Consumer %s status: accepted (include retries): %s, committed: %d, batch: %d, batchFailure: %d, elapsed: %.2fs, throughput: %.2f msg/s.",
059                consumerId, accepted, committed, batchCommit, batchFailure, elapsed, mps);
060    }
061
062    static String toString(List<ConsumerStatus> stats) {
063        long startTime = stats.stream().mapToLong(r -> r.startTime).min().orElse(0);
064        long stopTime = stats.stream().mapToLong(r -> r.stopTime).min().orElse(0);
065        double elapsed = (stopTime - startTime) / 1000.;
066        long committed = stats.stream().mapToLong(r -> r.committed).sum();
067        double mps = (elapsed != 0) ? committed / elapsed : 0.0;
068        int consumers = stats.size();
069        long failures = stats.stream().filter(s -> s.fail).count();
070        return String.format("Consumers status: threads: %d, failure %d, messages committed: %d, elapsed: %.2fs, throughput: %.2f msg/s",
071                consumers, failures, committed, elapsed, mps);
072
073    }
074}