001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.consumer;
020
021import java.util.List;
022
/**
 * The return status of a {@link ConsumerRunner}.
 * <p>
 * Immutable value holder for the counters and timestamps reported for one consumer, plus a static helper that
 * summarizes a list of statuses into a single line.
 *
 * @since 9.1
 */
public class ConsumerStatus {
    /** Consumer number, rendered as a two-digit identifier in {@link #toString()}. */
    public final int consumer;

    /** Start timestamp; {@link #toString()} treats {@code stopTime - startTime} as milliseconds. */
    public final long startTime;

    /** Stop timestamp, in the same unit as {@link #startTime}. */
    public final long stopTime;

    /** Number of accepted messages (retries included, as reported by {@link #toString()}). */
    public final long accepted;

    /** Number of committed messages; used as the numerator of the throughput. */
    public final long committed;

    /** Number of batches in failure. */
    public final long batchFailure;

    /** Number of batches committed. */
    public final long batchCommit;

    /** {@code true} when this status represents a failed consumer. */
    public final boolean fail;

    /**
     * Creates a status from its raw counters.
     *
     * @param consumer the consumer number
     * @param accepted the number of accepted messages
     * @param committed the number of committed messages
     * @param batchCommit the number of committed batches
     * @param batchFailure the number of failed batches
     * @param startTime the start timestamp
     * @param stopTime the stop timestamp, same unit as {@code startTime}
     * @param fail whether the consumer is in failure
     */
    public ConsumerStatus(int consumer, long accepted, long committed, long batchCommit, long batchFailure,
            long startTime, long stopTime, boolean fail) {
        this.consumer = consumer;
        this.accepted = accepted;
        this.committed = committed;
        this.batchCommit = batchCommit;
        this.batchFailure = batchFailure;
        this.startTime = startTime;
        this.stopTime = stopTime;
        this.fail = fail;
    }

    /**
     * Returns a one-line report for this consumer, or a fixed failure message when {@link #fail} is set.
     *
     * @return the formatted status line
     */
    @Override
    public String toString() {
        if (fail) {
            return "Consumer status FAILURE";
        }
        double elapsed = (stopTime - startTime) / 1000.;
        // A zero-length run has no meaningful rate; report 0 instead of dividing by zero.
        double mps = (elapsed != 0) ? committed / elapsed : 0.0;
        return String.format(
                "Consumer %02d status: accepted (include retries): %d, committed: %d, batch: %d, batchFailure: %d, elapsed: %.2fs, throughput: %.2f msg/s.",
                consumer, accepted, committed, batchCommit, batchFailure, elapsed, mps);
    }

    /**
     * Summarizes several consumer statuses into a single line.
     * <p>
     * The elapsed time runs from the earliest start to the latest stop among all statuses, and the throughput is the
     * total number of committed messages divided by that elapsed time. An empty list yields zeroed values.
     *
     * @param stats the statuses to aggregate
     * @return the formatted summary line
     */
    static String toString(List<ConsumerStatus> stats) {
        long startTime = stats.stream().mapToLong(r -> r.startTime).min().orElse(0);
        // The overall run ends when the LAST consumer stops, hence max() and not min().
        long stopTime = stats.stream().mapToLong(r -> r.stopTime).max().orElse(0);
        double elapsed = (stopTime - startTime) / 1000.;
        long committed = stats.stream().mapToLong(r -> r.committed).sum();
        // Guard against division by zero (empty list or zero-length run).
        double mps = (elapsed != 0) ? committed / elapsed : 0.0;
        int consumers = stats.size();
        long failures = stats.stream().filter(s -> s.fail).count();
        return String.format(
                "Consumers status: threads: %d, failure %d, messages committed: %d, elapsed: %.2fs, throughput: %.2f msg/s",
                consumers, failures, committed, elapsed, mps);
    }
}