/*
 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Gethin James
 */
package org.nuxeo.lib.stream.tools.command;

import java.io.Externalizable;
import java.time.DateTimeException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.log.LogLag;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;

/**
 * Manipulates the consumer position to the beginning, end or a specific timestamp
 *
 * @since 10.1
 */
public class PositionCommand extends Command {
    private static final Log log = LogFactory.getLog(PositionCommand.class);

    /** Timeout used for the first read when scanning a partition for a watermark. */
    public static final Duration FIRST_READ_TIMEOUT = Duration.ofMillis(1000);

    /** Timeout used for every subsequent read when scanning a partition for a watermark. */
    public static final Duration READ_TIMEOUT = Duration.ofMillis(100);

    protected static final String NAME = "position";

    public static final String AFTER_DATE_OPT = "after-date";

    public static final String TO_WATERMARK_OPT = "to-watermark";

    /**
     * Converts an ISO-8601 instant into epoch milliseconds.
     *
     * @param dateIso8601 the date to parse, may be {@code null} or empty
     * @return the epoch milliseconds, or {@code -1} when the input is null, empty or cannot be parsed (the parse
     *         failure is logged as an error)
     */
    protected static long getTimestampFromDate(String dateIso8601) {
        if (dateIso8601 == null || dateIso8601.isEmpty()) {
            return -1;
        }
        try {
            Instant instant = Instant.parse(dateIso8601);
            return instant.toEpochMilli();
        } catch (DateTimeException e) {
            // the value being parsed is a timestamp, not a timeout
            log.error("Failed to read the timestamp: " + e.getMessage());
            log.error("The timestamp should be in ISO-8601 format, eg. " + Instant.now());
        }
        return -1;
    }

    @Override
    public String name() {
        return NAME;
    }

    /**
     * Registers the options understood by this command: the mandatory log name, the optional partition and consumer
     * group, and the action to perform (reset, to-end, after-date or to-watermark).
     */
    @Override
    public void updateOptions(Options options) {
        options.addOption(Option.builder("l")
                                .longOpt("log-name")
                                .desc("Log name")
                                .required()
                                .hasArg()
                                .argName("LOG_NAME")
                                .build());
        options.addOption(Option.builder("p")
                                .longOpt("partition")
                                .desc("Read only this partition")
                                .hasArg()
                                .argName("PARTITION")
                                .build());
        options.addOption(
                Option.builder("g").longOpt("group").desc("Consumer group").hasArg().argName("GROUP").build());
        options.addOption(
                Option.builder().longOpt("reset").desc("Resets all committed positions for the group").build());
        options.addOption(Option.builder()
                                .longOpt("to-end")
                                .desc("Sets the committed positions to the end of partitions for the group")
                                .build());
        options.addOption(
                Option.builder()
                      .longOpt(AFTER_DATE_OPT)
                      .desc("Sets the committed positions for the group to a specific date."
                              + " The date used to find the offset depends on the implementation, for Kafka this is the"
                              + " LogAppendTime. The position is set to the earliest offset whose timestamp is greater than or equal to the given date."
                              + " The date is specified in ISO-8601 format, eg. " + Instant.now()
                              + ". If no record offset is found with an appropriate timestamp then the command fails.")
                      .hasArg()
                      .argName("DATE")
                      .build());
        options.addOption(
                Option.builder()
                      .longOpt(TO_WATERMARK_OPT)
                      .desc("Sets the committed positions for the group to a specific date."
                              + " The date used to find the offset is contained in a record watermark. "
                              + " This means that the LOG_NAME is expected to be a computation stream with records with populated watermark."
                              + " The position is set to the biggest record offset with a watermark date inferior or equals to the given date."
                              + " The date is specified in ISO-8601 format, eg. " + Instant.now()
                              + ". If no record offset is found with an appropriate timestamp then the command fails.")
                      .hasArg()
                      .argName("DATE")
                      .build());
    }

    /**
     * Dispatches to the requested positioning action. The options are checked in this order: after-date,
     * to-watermark, to-end, reset; only the first one present is executed.
     *
     * @param manager the log manager giving access to the log
     * @param cmd the parsed command line
     * @return {@code true} when the position has been changed, {@code false} when no action option was given, when
     *         the date could not be parsed, or when the action itself failed
     * @throws InterruptedException if interrupted while reading records during a watermark search
     */
    @Override
    public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException {
        String name = cmd.getOptionValue("log-name");
        String group = cmd.getOptionValue("group", "tools");
        // -1 means "all partitions"
        int partition = Integer.parseInt(cmd.getOptionValue("partition", "-1"));
        if (cmd.hasOption(AFTER_DATE_OPT)) {
            long timestamp = getTimestampFromDate(cmd.getOptionValue(AFTER_DATE_OPT));
            if (timestamp >= 0) {
                return positionAfterDate(manager, group, name, partition, timestamp);
            }
        } else if (cmd.hasOption(TO_WATERMARK_OPT)) {
            long timestamp = getTimestampFromDate(cmd.getOptionValue(TO_WATERMARK_OPT));
            if (timestamp >= 0) {
                return positionToWatermark(manager, group, name, partition, timestamp);
            }
        } else if (cmd.hasOption("to-end")) {
            return toEnd(manager, group, name, partition);
        } else if (cmd.hasOption("reset")) {
            return reset(manager, group, name, partition);
        } else {
            log.error("Invalid option, try 'help position'");
        }
        // reached on invalid option or unparsable date (already logged)
        return false;
    }

    /**
     * Moves the tailer of the group to the end of the log (or of the given partition) and commits.
     *
     * @param partition the partition to move, or a negative value for all partitions
     * @return always {@code true}
     */
    protected boolean toEnd(LogManager manager, String group, String name, int partition) {
        // captured before moving so that the log line can report the previous and new bounds
        LogLag lag = getLag(manager, group, name, partition);
        try (LogTailer<Externalizable> tailer = createTailer(manager, name, partition, group)) {
            tailer.toEnd();
            tailer.commit();
        }
        log.info(String.format("# Moved log %s, group: %s, from: %s to %s", labelFor(name, partition), group,
                lag.lower(), lag.upper()));
        return true;
    }

    /**
     * Returns the partition number as a string, or {@code "all"} when the partition is negative.
     */
    protected String labelFor(int partition) {
        return partition >= 0 ? Integer.toString(partition) : "all";
    }

    /**
     * Returns {@code name:partition} for a specific partition, or just the log name when the partition is negative.
     */
    protected String labelFor(String name, int partition) {
        return partition >= 0 ? name + ":" + labelFor(partition) : name;
    }

    /**
     * Returns the lag of the group on the given partition, or on the whole log when the partition is negative.
     */
    protected LogLag getLag(LogManager manager, String group, String name, int partition) {
        if (partition >= 0) {
            return manager.getLagPerPartition(name, group).get(partition);
        } else {
            return manager.getLag(name, group);
        }
    }

    /**
     * Creates a tailer for the group on the given partition, or on the whole log when the partition is negative.
     */
    protected <T extends Externalizable> LogTailer<T> createTailer(LogManager manager, String name, int partition,
            String group) {
        if (partition >= 0) {
            return manager.createTailer(group, new LogPartition(name, partition));
        }
        return manager.createTailer(group, name);
    }

    /**
     * Resets the position of the group on the log (or on the given partition).
     *
     * @param partition the partition to reset, or a negative value for all partitions
     * @return always {@code true}
     */
    protected boolean reset(LogManager manager, String group, String name, int partition) {
        LogLag lag = getLag(manager, group, name, partition);
        // remember the previous position for the log message
        long pos = lag.lower();
        try (LogTailer<Externalizable> tailer = createTailer(manager, name, partition, group)) {
            tailer.reset();
        }
        log.warn(String.format("# Reset log %s, group: %s, from: %s to 0", labelFor(name, partition), group, pos));
        return true;
    }

    /**
     * Seeks the group to the offset returned by {@link LogTailer#offsetForTimestamp} for each selected partition and
     * commits when at least one partition has been moved.
     *
     * @param partition the only partition to move, or a negative value for all partitions
     * @param timestamp the date in epoch milliseconds
     * @return {@code true} when at least one offset has been found and committed, {@code false} otherwise
     */
    protected boolean positionAfterDate(LogManager manager, String group, String name, int partition, long timestamp) {
        try (LogTailer<Externalizable> tailer = manager.createTailer(group, name)) {
            boolean movedOffset = false;
            for (int part = 0; part < manager.size(name); part++) {
                if (partition >= 0 && part != partition) {
                    // not the requested partition
                    continue;
                }
                LogPartition logPartition = new LogPartition(name, part);
                LogOffset logOffset = tailer.offsetForTimestamp(logPartition, timestamp);
                if (logOffset == null) {
                    log.error(String.format("# Could not find an offset for group: %s, partition: %s", group,
                            logPartition));
                    continue;
                }
                tailer.seek(logOffset);
                movedOffset = true;
                log.info(String.format("# Set log %s, group: %s, to offset %s", labelFor(name, part), group,
                        logOffset.offset()));
            }
            if (movedOffset) {
                tailer.commit();
                return true;
            }
        }
        log.error("No offset found for the specified date");
        return false;
    }

    /**
     * Positions the group on the record offsets found by {@link #searchWatermarkOffset} for each selected, non empty
     * partition. The search is done with tailers of the {@code "tools"} group, the resulting offsets are then
     * committed for the requested group.
     * <p>
     * When no offset is found at all, the command fails if the log is empty, otherwise the positions of the group are
     * reset.
     *
     * @param partition the only partition to move, or a negative value for all partitions
     * @param timestamp the date in epoch milliseconds, compared with the record watermark timestamps
     * @return {@code true} when positions have been moved or reset, {@code false} when the log is empty
     * @throws InterruptedException if interrupted while reading records
     */
    protected boolean positionToWatermark(LogManager manager, String group, String name, int partition, long timestamp)
            throws InterruptedException {
        // NOTE(review): the scan relies on the "tools" group lag and tailers, presumably expecting this group to
        // start from the beginning of each partition - confirm
        String newGroup = "tools";
        int size = manager.size(name);
        List<LogOffset> offsets = new ArrayList<>(size);
        List<LogLag> lags = manager.getLagPerPartition(name, newGroup);
        int part = 0;
        // find the offsets first
        for (LogLag lag : lags) {
            boolean emptyPartition = lag.lag() == 0;
            boolean notSelected = partition >= 0 && part != partition;
            if (emptyPartition || notSelected) {
                // nothing to do for this partition: exactly one placeholder, and no scan, so that a partition
                // excluded by the --partition filter is never moved
                offsets.add(null);
            } else {
                try (LogTailer<Record> tailer = manager.createTailer(newGroup, new LogPartition(name, part))) {
                    offsets.add(searchWatermarkOffset(tailer, timestamp));
                }
            }
            part++;
        }
        if (offsets.stream().noneMatch(Objects::nonNull)) {
            if (LogLag.of(lags).upper() == 0) {
                log.error("No offsets found because log is empty");
                return false;
            }
            log.error("Timestamp: " + timestamp + " is earlier as any records, resetting positions");
            return reset(manager, group, name, partition);
        }
        // now move the requested group to the offsets found
        try (LogTailer<Externalizable> tailer = manager.createTailer(group, name)) {
            offsets.stream().filter(Objects::nonNull).forEach(tailer::seek);
            tailer.commit();
            offsets.stream().filter(Objects::nonNull).forEach(offset -> log.info("# Moving consumer to: " + offset));
        }
        return true;
    }

    /**
     * Reads records from the tailer until a record with a watermark timestamp greater than or equal to the given
     * timestamp is found, or until a read returns no record.
     *
     * @param tailer the tailer to read from
     * @param timestamp the date in epoch milliseconds
     * @return the offset of the record whose watermark timestamp equals the given timestamp, else the offset of the
     *         last record read with a lower watermark timestamp, or {@code null} when there is no such record
     * @throws IllegalArgumentException if a record with an empty watermark is read before a match is found
     * @throws InterruptedException if interrupted while reading
     */
    protected LogOffset searchWatermarkOffset(LogTailer<Record> tailer, long timestamp) throws InterruptedException {
        LogOffset lastOffset = null;
        // the first read uses a longer timeout than the following ones
        for (LogRecord<Record> rec = tailer.read(FIRST_READ_TIMEOUT); rec != null; rec = tailer.read(READ_TIMEOUT)) {
            long recTimestamp = Watermark.ofValue(rec.message().getWatermark()).getTimestamp();
            if (recTimestamp == timestamp) {
                return rec.offset();
            } else if (recTimestamp > timestamp) {
                return lastOffset;
            }
            if (recTimestamp == 0) {
                throw new IllegalArgumentException("Cannot find position because Record has empty watermark: " + rec);
            }
            lastOffset = rec.offset();
        }
        // not found returns last offset of partition
        return lastOffset;
    }

}