001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools.command; 020 021import java.util.Arrays; 022import java.util.List; 023import java.util.stream.Collectors; 024 025import org.apache.commons.cli.CommandLine; 026import org.apache.commons.cli.Option; 027import org.apache.commons.cli.Options; 028import org.nuxeo.lib.stream.computation.Settings; 029import org.nuxeo.lib.stream.computation.StreamManager; 030import org.nuxeo.lib.stream.computation.StreamProcessor; 031import org.nuxeo.lib.stream.computation.Topology; 032import org.nuxeo.lib.stream.computation.log.LogStreamManager; 033import org.nuxeo.lib.stream.log.LogManager; 034import org.nuxeo.lib.stream.log.Name; 035 036/** 037 * Track consumer positions so they can be restored in case of fail-over 038 * 039 * @since 10.1 040 */ 041public class TrackerCommand extends Command { 042 043 public static final String COMPUTATION_NAME = "tools/LatencyTracker"; 044 045 public static final String INPUT_STREAM = "input/null"; 046 047 public static final String INTERNAL_LOG_PREFIX = "_"; 048 049 protected static final String NAME = "tracker"; 050 051 protected static final String DEFAULT_INTERVAL = "60"; 052 053 protected static final String DEFAULT_COUNT = "-1"; 054 055 protected static final String ALL_LOGS = "all"; 056 057 protected static final String DEFAULT_LATENCIES_LOG = "_consumer_latencies"; 058 059 protected boolean verbose = false; 060 061 protected String output; 062 063 protected List<Name> logNames; 064 065 protected int interval; 066 067 protected int count; 068 069 protected Topology topology; 070 071 protected StreamProcessor processor; 072 073 protected String codec; 074 075 @Override 076 public String name() { 077 return NAME; 078 } 079 080 @Override 081 public void updateOptions(Options options) { 082 options.addOption(Option.builder("l") 083 .longOpt("log-name") 084 .desc("Track consumers latency for this LOG, must be a computation Record, " 085 + "can be a comma separated list of log names or ALL") 086 .required() 087 .hasArg() 088 .argName("LOG_NAME") 089 .build()); 090 options.addOption(Option.builder("o") 091 .longOpt("log-output") 092 .desc("Log name of the output") 093 .hasArg() 094 .argName("LOG_OUTPUT") 095 .build()); 096 options.addOption(Option.builder("i") 097 .longOpt("interval") 098 .desc("send latency spaced at the specified interval in seconds") 099 .hasArg() 100 .argName("INTERVAL") 101 .build()); 102 options.addOption(Option.builder("c") 103 .longOpt("count") 104 .desc("number of time to send the latency information") 105 .hasArg() 106 .argName("COUNT") 107 .build()); 108 options.addOption(Option.builder() 109 .longOpt("codec") 110 .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson") 111 .hasArg() 112 .argName("CODEC") 113 .build()); 114 options.addOption(Option.builder().longOpt("verbose").build()); 115 } 116 117 @Override 118 public boolean run(LogManager manager, CommandLine cmd) { 119 logNames = getLogNames(manager, cmd.getOptionValue("log-name")); 120 output = cmd.getOptionValue("log-output", DEFAULT_LATENCIES_LOG); 121 codec = cmd.getOptionValue("codec"); 122 verbose = cmd.hasOption("verbose"); 123 interval = Integer.parseInt(cmd.getOptionValue("interval", DEFAULT_INTERVAL)); 124 count = Integer.parseInt(cmd.getOptionValue("count", DEFAULT_COUNT)); 125 126 initTopology(manager); 127 return runProcessor(manager); 128 } 129 130 protected List<Name> getLogNames(LogManager manager, String names) { 131 if (ALL_LOGS.equalsIgnoreCase(names)) { 132 return manager.listAllNames().stream().filter(name -> !INPUT_STREAM.equals(name.getName())) 133 .collect(Collectors.toList()); 134 } 135 List<Name> ret = Arrays.stream(names.split(",")).map(Name::ofUrn).collect(Collectors.toList()); 136 if (ret.isEmpty()) { 137 throw new IllegalArgumentException("No log name provided or found."); 138 } 139 for (Name name : ret) { 140 if (!manager.exists(name)) { 141 throw new IllegalArgumentException("Unknown log name: " + name); 142 } 143 } 144 return ret; 145 } 146 147 protected void initTopology(LogManager manager) { 148 topology = Topology.builder() 149 .addComputation( 150 () -> new LatencyTrackerComputation(manager, logNames, COMPUTATION_NAME, interval, 151 count, verbose, getRecordCodec(codec), 1), 152 Arrays.asList("i1:" + INPUT_STREAM, "o1:" + output)) 153 .build(); 154 } 155 156 protected boolean runProcessor(LogManager manager) { 157 StreamManager streamManager = new LogStreamManager(manager); 158 Settings settings = new Settings(1, 1, getRecordCodec(codec)); 159 processor = streamManager.registerAndCreateProcessor("tracker", topology, settings); 160 processor.start(); 161 while (!processor.isTerminated()) { 162 try { 163 Thread.sleep(1000); 164 } catch (InterruptedException e) { 165 Thread.currentThread().interrupt(); 166 processor.shutdown(); 167 return false; 168 } 169 } 170 return true; 171 } 172 173}