/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.pattern.consumer;

import java.util.List;

import org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner;

/**
 * Immutable summary returned by a {@link ConsumerRunner}: message and batch counters, the start and stop timestamps
 * of the run, and a failure flag.
 *
 * @since 9.1
 */
public class ConsumerStatus {
    /** Identifier of the consumer this status describes. */
    public final String consumerId;

    /** Start timestamp; treated as milliseconds when the elapsed time is rendered in seconds. */
    public final long startTime;

    /** Stop timestamp; treated as milliseconds when the elapsed time is rendered in seconds. */
    public final long stopTime;

    /** Number of accepted messages (reported as "include retries" by {@link #toString()}). */
    public final long accepted;

    /** Number of committed messages; this is the numerator of the reported throughput. */
    public final long committed;

    /** Number of failed batches. */
    public final long batchFailure;

    /** Number of committed batches. */
    public final long batchCommit;

    /** {@code true} when the consumer ended in failure. */
    public final boolean fail;

    /**
     * Creates a status snapshot. Note the argument order: {@code batchCommit} comes before {@code batchFailure}.
     *
     * @param consumerId identifier of the consumer
     * @param accepted number of accepted messages
     * @param committed number of committed messages
     * @param batchCommit number of committed batches
     * @param batchFailure number of failed batches
     * @param startTime start timestamp
     * @param stopTime stop timestamp
     * @param fail whether the consumer failed
     */
    public ConsumerStatus(String consumerId, long accepted, long committed, long batchCommit, long batchFailure,
            long startTime, long stopTime, boolean fail) {
        this.consumerId = consumerId;
        this.startTime = startTime;
        this.stopTime = stopTime;
        this.accepted = accepted;
        this.committed = committed;
        this.batchCommit = batchCommit;
        this.batchFailure = batchFailure;
        this.fail = fail;
    }

    /**
     * Renders this status on one line. A failed consumer yields a fixed failure message; otherwise the counters,
     * the elapsed seconds and the committed-messages-per-second throughput are reported.
     */
    @Override
    public String toString() {
        if (!fail) {
            double seconds = toSeconds(startTime, stopTime);
            double rate = messagesPerSecond(committed, seconds);
            return String.format(
                    "Consumer %s status: accepted (include retries): %s, committed: %d, batch: %d, batchFailure: %d, elapsed: %.2fs, throughput: %.2f msg/s.",
                    consumerId, accepted, committed, batchCommit, batchFailure, seconds, rate);
        }
        return "Consumer status FAILURE";
    }

    /**
     * Renders an aggregated one-line summary for several consumers: number of statuses, number of failed ones,
     * total committed messages, elapsed seconds between the earliest start and the latest stop, and the overall
     * throughput.
     *
     * @param stats the individual statuses; an empty list produces an all-zero summary
     * @return the formatted summary
     */
    static String toString(List<ConsumerStatus> stats) {
        long earliestStart = Long.MAX_VALUE;
        long latestStop = Long.MIN_VALUE;
        long totalCommitted = 0;
        long failed = 0;
        for (ConsumerStatus status : stats) {
            earliestStart = Math.min(earliestStart, status.startTime);
            latestStop = Math.max(latestStop, status.stopTime);
            totalCommitted += status.committed;
            if (status.fail) {
                failed++;
            }
        }
        if (stats.isEmpty()) {
            // nothing to aggregate: both bounds default to 0 so the elapsed time is 0
            earliestStart = 0;
            latestStop = 0;
        }
        double seconds = toSeconds(earliestStart, latestStop);
        double rate = messagesPerSecond(totalCommitted, seconds);
        int threads = stats.size();
        return String.format(
                "Consumers status: threads: %d, failure %d, messages committed: %d, elapsed: %.2fs, throughput: %.2f msg/s",
                threads, failed, totalCommitted, seconds, rate);
    }

    /** Converts the span between two timestamps to seconds by dividing the difference by 1000. */
    private static double toSeconds(long start, long stop) {
        return (stop - start) / 1000.0;
    }

    /** Returns messages per second, or 0.0 when no time elapsed so that no division by zero occurs. */
    private static double messagesPerSecond(long messages, double seconds) {
        if (seconds == 0) {
            return 0.0;
        }
        return messages / seconds;
    }
}