001/* 002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools.command; 020 021import java.nio.charset.StandardCharsets; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.List; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.nuxeo.lib.stream.codec.Codec; 029import org.nuxeo.lib.stream.computation.AbstractComputation; 030import org.nuxeo.lib.stream.computation.ComputationContext; 031import org.nuxeo.lib.stream.computation.Record; 032import org.nuxeo.lib.stream.computation.Watermark; 033import org.nuxeo.lib.stream.log.Latency; 034import org.nuxeo.lib.stream.log.LogManager; 035import org.nuxeo.lib.stream.log.Name; 036import org.nuxeo.lib.stream.log.internals.LogPartitionGroup; 037 038/** 039 * A computation that sends periodically latencies information into a Log. 040 * 041 * @since 10.1 042 */ 043public class LatencyTrackerComputation extends AbstractComputation { 044 private static final Log log = LogFactory.getLog(LatencyTrackerComputation.class); 045 046 protected static final String OUTPUT_STREAM = "o1"; 047 048 protected final LogManager manager; 049 050 protected final List<Name> logNames; 051 052 protected final int intervalMs; 053 054 protected final int count; 055 056 protected final boolean verbose; 057 058 protected final Codec<Record> codec; 059 060 protected int remaining; 061 062 protected final List<LogPartitionGroup> logGroups = new ArrayList<>(); 063 064 protected int refreshGroupCounter; 065 066 public LatencyTrackerComputation(LogManager manager, List<Name> logNames, String computationName, 067 int intervalSecond, int count, boolean verbose, Codec<Record> codec, int outputStream) { 068 super(computationName, 1, outputStream); 069 this.manager = manager; 070 this.logNames = logNames; 071 this.intervalMs = 1000 * intervalSecond; 072 this.count = count; 073 this.remaining = count; 074 this.verbose = verbose; 075 this.codec = codec; 076 } 077 078 @Override 079 public void init(ComputationContext context) { 080 log.info(String.format("Tracking %d streams: %s, count: %d, interval: %dms", logNames.size(), 081 Arrays.toString(logNames.toArray()), count, 082 intervalMs)); 083 context.setTimer("tracker", System.currentTimeMillis() + intervalMs); 084 } 085 086 @Override 087 public void processTimer(ComputationContext context, String key, long timestamp) { 088 if (remaining == 0) { 089 if (verbose) { 090 log.info("Exiting after " + count + " captures"); 091 } 092 context.askForTermination(); 093 return; 094 } 095 if (verbose) { 096 log.info(String.format("Tracking latency %d/%d", count - remaining, count)); 097 } 098 List<LogPartitionGroup> toRemove = new ArrayList<>(); 099 for (LogPartitionGroup logGroup : getLogGroup()) { 100 try { 101 List<Latency> latencies = manager.getLatencyPerPartition(logGroup.name, logGroup.group, codec, 102 (rec -> Watermark.ofValue(rec.getWatermark()).getTimestamp()), (Record::getKey)); 103 if (!latencies.isEmpty()) { 104 processLatencies(context, logGroup, latencies); 105 } 106 } catch (Exception e) { 107 if (e.getCause() instanceof ClassNotFoundException || e.getCause() instanceof ClassCastException 108 || e instanceof IllegalStateException || e instanceof IllegalArgumentException) { 109 log.warn("log does not contains computation Record, removing partition: " + logGroup); 110 toRemove.add(logGroup); 111 continue; 112 } 113 throw e; 114 } 115 } 116 context.askForCheckpoint(); 117 context.setTimer("tracker", System.currentTimeMillis() + intervalMs); 118 remaining--; 119 if (!toRemove.isEmpty()) { 120 logGroups.removeAll(toRemove); 121 if (logGroups.isEmpty()) { 122 log.error("Exiting because all logs have been skipped"); 123 context.askForTermination(); 124 } 125 } 126 } 127 128 protected List<LogPartitionGroup> getLogGroup() { 129 if (logGroups.isEmpty() || refreshGroup()) { 130 logGroups.clear(); 131 logNames.forEach(name -> { 132 for (Name group : manager.listConsumerGroups(name)) { 133 logGroups.add(new LogPartitionGroup(group, name, 0)); 134 } 135 }); 136 if (verbose) { 137 log.info("Update list of consumers: " + Arrays.toString(logGroups.toArray())); 138 } 139 } 140 return logGroups; 141 } 142 143 protected boolean refreshGroup() { 144 refreshGroupCounter += 1; 145 return (refreshGroupCounter % 5) == 0; 146 } 147 148 protected void processLatencies(ComputationContext context, LogPartitionGroup logGroup, List<Latency> latencies) { 149 for (int partition = 0; partition < latencies.size(); partition++) { 150 Latency latency = latencies.get(partition); 151 if (latency.lower() <= 0) { 152 // lower is the watermark timestamp for the latest processed record, without this info we cannot do 153 // anything 154 continue; 155 } 156 // upper is the time when the latency has been measured it is used as the watermark 157 long recordWatermark = Watermark.ofTimestamp(latency.upper()).getValue(); 158 String recordKey = encodeKey(logGroup, partition); 159 byte[] recordValue = encodeLatency(latency); 160 Record record = new Record(recordKey, recordValue, recordWatermark); 161 if (verbose) { 162 log.info("out: " + record); 163 } 164 context.produceRecord(OUTPUT_STREAM, record); 165 context.setSourceLowWatermark(recordWatermark); 166 } 167 } 168 169 protected byte[] encodeLatency(Latency latency) { 170 return latency.asJson().getBytes(StandardCharsets.UTF_8); 171 } 172 173 public static String encodeKey(LogPartitionGroup logGroup, int partition) { 174 return String.format("%s:%s:%s", logGroup.group.getId(), logGroup.name.getId(), partition); 175 } 176 177 public static LogPartitionGroup decodeKey(String key) { 178 String[] parts = key.split(":"); 179 return new LogPartitionGroup(Name.ofId(parts[0]), Name.ofId(parts[1]), Integer.parseInt(parts[2])); 180 } 181 182 @Override 183 public void destroy() { 184 log.info("Good bye"); 185 } 186 187 @Override 188 public void processRecord(ComputationContext context, String inputStreamName, Record record) { 189 log.error("Receiving a record is not expected: " + record); 190 } 191 192}