001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.pattern.consumer;
020
021import java.util.List;
022
023import org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner;
024
025/**
026 * The return status of a {@link ConsumerRunner}
027 *
028 * @since 9.1
029 */
030public class ConsumerStatus {
031    public final String consumerId;
032
033    public final long startTime;
034
035    public final long stopTime;
036
037    public final long accepted;
038
039    public final long committed;
040
041    public final long batchFailure;
042
043    public final long batchCommit;
044
045    public final boolean fail;
046
047    public ConsumerStatus(String consumerId, long accepted, long committed, long batchCommit, long batchFailure,
048            long startTime, long stopTime, boolean fail) {
049        this.consumerId = consumerId;
050        this.accepted = accepted;
051        this.committed = committed;
052        this.batchCommit = batchCommit;
053        this.batchFailure = batchFailure;
054        this.startTime = startTime;
055        this.stopTime = stopTime;
056        this.fail = fail;
057    }
058
059    @Override
060    public String toString() {
061        if (fail) {
062            return "Consumer status FAILURE";
063        }
064        double elapsed = (stopTime - startTime) / 1000.;
065        double mps = (elapsed != 0) ? committed / elapsed : 0.0;
066        return String.format(
067                "Consumer %s status: accepted (include retries): %s, committed: %d, batch: %d, batchFailure: %d, elapsed: %.2fs, throughput: %.2f msg/s.",
068                consumerId, accepted, committed, batchCommit, batchFailure, elapsed, mps);
069    }
070
071    static String toString(List<ConsumerStatus> stats) {
072        long startTime = stats.stream().mapToLong(r -> r.startTime).min().orElse(0);
073        long stopTime = stats.stream().mapToLong(r -> r.stopTime).max().orElse(0);
074        double elapsed = (stopTime - startTime) / 1000.;
075        long committed = stats.stream().mapToLong(r -> r.committed).sum();
076        double mps = (elapsed != 0) ? committed / elapsed : 0.0;
077        int consumers = stats.size();
078        long failures = stats.stream().filter(s -> s.fail).count();
079        return String.format(
080                "Consumers status: threads: %d, failure %d, messages committed: %d, elapsed: %.2fs, throughput: %.2f msg/s",
081                consumers, failures, committed, elapsed, mps);
082
083    }
084}