/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.ecm.platform.importer.mqueues.pattern.producer;

import org.nuxeo.ecm.platform.importer.mqueues.pattern.producer.internals.ProducerRunner;

import java.util.List;

/**
 * The return status of a {@link ProducerRunner}.
 * <p>
 * Immutable value holder: every field is assigned once in the constructor. The {@code toString} variants
 * divide time differences by 1000 and print the result as seconds, i.e. they treat {@link #startTime} and
 * {@link #stopTime} as millisecond values.
 *
 * @since 9.1
 */
public class ProducerStatus {
    /** Start timestamp of the producer (treated as milliseconds when formatting). */
    public final long startTime;

    /** Stop timestamp of the producer (treated as milliseconds when formatting). */
    public final long stopTime;

    /** Number of messages accounted to this producer. */
    public final long nbProcessed;

    /** Numeric identifier of the producer, printed on two digits by {@link #toString()}. */
    public final int producer;

    /** {@code true} when this status represents a failure. */
    private final boolean fail;

    /**
     * Creates a status.
     *
     * @param producer the producer identifier
     * @param nbProcessed the number of messages processed
     * @param startTime the start timestamp
     * @param stopTime the stop timestamp
     * @param fail {@code true} to mark the status as failed
     */
    public ProducerStatus(int producer, long nbProcessed, long startTime, long stopTime, boolean fail) {
        this.producer = producer;
        this.nbProcessed = nbProcessed;
        this.startTime = startTime;
        this.stopTime = stopTime;
        this.fail = fail;
    }

    /**
     * Describes this single producer.
     *
     * @return a fixed failure message when the status is failed, otherwise a line with the producer id, the
     *         message count, the elapsed time in seconds and the throughput in messages per second
     */
    @Override
    public String toString() {
        if (fail) {
            return "Producer status FAILURE";
        }
        double elapsed = (stopTime - startTime) / 1000.;
        // A zero-length run has no meaningful rate; report 0 instead of dividing by zero.
        double mps = (elapsed != 0) ? nbProcessed / elapsed : 0.0;
        return String.format("Producer %02d status: messages: %d, elapsed: %.2fs, throughput: %.2f msg/s.",
                producer, nbProcessed, elapsed, mps);
    }

    /**
     * Summarizes a list of producer statuses into a single line.
     * <p>
     * The elapsed time covers the window from the earliest start to the latest stop, so that the throughput
     * matches the total number of messages summed over all producers. An empty list yields zeros.
     *
     * @param stats the statuses to aggregate, must not be {@code null}
     * @return a line with the number of producers, the number of failures, the total message count, the
     *         overall elapsed time in seconds and the overall throughput in messages per second
     */
    static String toString(List<ProducerStatus> stats) {
        long startTime = stats.stream().mapToLong(r -> r.startTime).min().orElse(0);
        // Latest stop, not earliest: the window must include the slowest producer, otherwise the elapsed
        // time is understated and the throughput overstated.
        long stopTime = stats.stream().mapToLong(r -> r.stopTime).max().orElse(0);
        double elapsed = (stopTime - startTime) / 1000.;
        long messages = stats.stream().mapToLong(r -> r.nbProcessed).sum();
        double mps = (elapsed != 0) ? messages / elapsed : 0.0;
        int producers = stats.size();
        long failures = stats.stream().filter(s -> s.fail).count();
        return String.format("Producers status: threads: %d, failures: %d, messages: %d, elapsed: %.2fs, throughput: %.2f msg/s",
                producers, failures, messages, elapsed, mps);
    }
}