/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.tools.command;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.nuxeo.lib.stream.computation.Settings;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.lib.stream.computation.log.LogStreamProcessor;
import org.nuxeo.lib.stream.log.LogManager;

/**
 * Track consumer positions so they can be restored in case of fail-over.
 * <p>
 * The command builds a topology made of a single {@code LatencyTrackerComputation} that is given the selected log
 * names, and runs it in a {@link LogStreamProcessor} until that processor terminates or the calling thread is
 * interrupted.
 *
 * @since 10.1
 */
public class TrackerCommand extends Command {

    /** Name given to the tracker computation. */
    public static final String COMPUTATION_NAME = "LatencyTracker";

    /** Name of the stream bound to the computation input {@code i1}. */
    public static final String INPUT_STREAM = "log_null";

    /** Logs whose name starts with this prefix are skipped when all logs are requested. */
    public static final String INTERNAL_LOG_PREFIX = "_";

    protected static final String NAME = "tracker";

    /** Default value of the {@code interval} option (documented in seconds by the option description). */
    protected static final String DEFAULT_INTERVAL = "60";

    /** Default value of the {@code count} option. */
    protected static final String DEFAULT_COUNT = "-1";

    /** Keyword (matched case-insensitively) selecting every non internal log. */
    protected static final String ALL_LOGS = "all";

    /** Output log used when {@code log-output} is not given. */
    protected static final String DEFAULT_LATENCIES_LOG = "_consumer_latencies";

    protected boolean verbose = false;

    protected String output;

    protected List<String> logNames;

    protected int interval;

    protected int count;

    protected Topology topology;

    protected LogStreamProcessor processor;

    protected String codec;

    @Override
    public String name() {
        return NAME;
    }

    /**
     * Registers the command line options of the tracker command: the mandatory {@code log-name} and the optional
     * {@code log-output}, {@code interval}, {@code count}, {@code codec} and {@code verbose}.
     *
     * @param options the option set to complete
     */
    @Override
    public void updateOptions(Options options) {
        options.addOption(Option.builder("l")
                                .longOpt("log-name")
                                .desc("Track consumers latency for this LOG, must be a computation Record, "
                                        + "can be a comma separated list of log names or ALL")
                                .required()
                                .hasArg()
                                .argName("LOG_NAME")
                                .build());
        options.addOption(Option.builder("o")
                                .longOpt("log-output")
                                .desc("Log name of the output")
                                .hasArg()
                                .argName("LOG_OUTPUT")
                                .build());
        options.addOption(Option.builder("i")
                                .longOpt("interval")
                                .desc("send latency spaced at the specified interval in seconds")
                                .hasArg()
                                .argName("INTERVAL")
                                .build());
        options.addOption(Option.builder("c")
                                .longOpt("count")
                                .desc("number of time to send the latency information")
                                .hasArg()
                                .argName("COUNT")
                                .build());
        options.addOption(Option.builder()
                                .longOpt("codec")
                                .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson")
                                .hasArg()
                                .argName("CODEC")
                                .build());
        options.addOption(Option.builder().longOpt("verbose").build());
    }

    /**
     * Reads the options from the command line, builds the topology and runs it.
     *
     * @param manager the log manager used to resolve the log names and to run the processor
     * @param cmd the parsed command line
     * @return {@code true} when the processor terminated, {@code false} when the wait was interrupted
     * @throws IllegalArgumentException if a requested log does not exist, or if {@code interval} or {@code count}
     *             is not an integer ({@link NumberFormatException})
     */
    @Override
    public boolean run(LogManager manager, CommandLine cmd) {
        logNames = getLogNames(manager, cmd.getOptionValue("log-name"));
        output = cmd.getOptionValue("log-output", DEFAULT_LATENCIES_LOG);
        codec = cmd.getOptionValue("codec");
        verbose = cmd.hasOption("verbose");
        interval = Integer.parseInt(cmd.getOptionValue("interval", DEFAULT_INTERVAL));
        count = Integer.parseInt(cmd.getOptionValue("count", DEFAULT_COUNT));

        initTopology(manager);
        return runProcessor(manager);
    }

    /**
     * Resolves the value of the {@code log-name} option into a list of log names.
     * <p>
     * The keyword {@value #ALL_LOGS} (any case) selects every log returned by the manager whose name does not start
     * with {@value #INTERNAL_LOG_PREFIX}. Otherwise the value is split on commas; whitespace around each name is
     * ignored and empty entries are dropped, then each remaining name must exist.
     *
     * @param manager the log manager used to list and check logs
     * @param names the raw option value
     * @return the log names to track
     * @throws IllegalArgumentException if {@code names} is {@code null}, contains no name, or refers to a log that
     *             does not exist
     */
    protected List<String> getLogNames(LogManager manager, String names) {
        if (names == null) {
            throw new IllegalArgumentException("Missing log name");
        }
        if (ALL_LOGS.equalsIgnoreCase(names.trim())) {
            return manager.listAll()
                          .stream()
                          .filter(name -> !name.startsWith(INTERNAL_LOG_PREFIX))
                          .collect(Collectors.toList());
        }
        // "a, b" must resolve to [a, b]: untrimmed entries would be reported as unknown logs
        List<String> ret = Arrays.stream(names.split(","))
                                 .map(String::trim)
                                 .filter(name -> !name.isEmpty())
                                 .collect(Collectors.toList());
        if (ret.isEmpty()) {
            // String.split drops trailing empty strings, so an input like "," yields nothing to track
            throw new IllegalArgumentException("No log name provided: " + names);
        }
        for (String name : ret) {
            if (!manager.exists(name)) {
                throw new IllegalArgumentException("Unknown log name: " + name);
            }
        }
        return ret;
    }

    /**
     * Builds the topology: one {@code LatencyTrackerComputation} whose stream {@code i1} is bound to
     * {@value #INPUT_STREAM} and whose stream {@code o1} is bound to the configured output log.
     * <p>
     * Must be called after the {@code logNames}, {@code interval}, {@code count}, {@code verbose}, {@code codec} and
     * {@code output} fields are set, as done by {@link #run(LogManager, CommandLine)}.
     *
     * @param manager the log manager handed over to the computation
     */
    protected void initTopology(LogManager manager) {
        List<String> streams = Arrays.asList("i1:" + INPUT_STREAM, "o1:" + output);
        topology = Topology.builder()
                           // the supplier is evaluated by the topology, so the fields are read at that time
                           .addComputation(() -> new LatencyTrackerComputation(manager, logNames, COMPUTATION_NAME,
                                   interval, count, verbose, getRecordCodec(codec)), streams)
                           .build();
    }

    /**
     * Starts a {@link LogStreamProcessor} on the topology and blocks, polling once per second, until the processor
     * reports it is terminated.
     *
     * @param manager the log manager backing the processor
     * @return {@code true} when the processor terminated, {@code false} when the calling thread was interrupted, in
     *         which case the interrupt flag is restored and the processor is asked to shut down
     */
    protected boolean runProcessor(LogManager manager) {
        processor = new LogStreamProcessor(manager);
        Settings settings = new Settings(1, 1, getRecordCodec(codec));
        processor.init(topology, settings).start();
        while (!processor.isTerminated()) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                processor.shutdown();
                return false;
            }
        }
        return true;
    }

}