/*
 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Gethin James
 */
package org.nuxeo.lib.stream.tools.command;

import java.io.Externalizable;
import java.time.DateTimeException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.log.LogLag;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.Name;

/**
 * Manipulates the consumer position to the beginning, end or a specific timestamp
 *
 * @since 10.1
 */
public class PositionCommand extends Command {
    private static final Log log = LogFactory.getLog(PositionCommand.class);

    /** Timeout used for the first read when scanning a partition for a watermark. */
    public static final Duration FIRST_READ_TIMEOUT = Duration.ofMillis(1000);

    /** Timeout used for every read after the first one when scanning a partition for a watermark. */
    public static final Duration READ_TIMEOUT = Duration.ofMillis(100);

    protected static final String NAME = "position";

    public static final String AFTER_DATE_OPT = "after-date";

    public static final String TO_WATERMARK_OPT = "to-watermark";

    // @since 11.2
    public static final String TO_OFFSET_OPT = "to-offset";

    /**
     * Converts an ISO-8601 instant into epoch milliseconds.
     *
     * @param dateIso8601 the date to parse, may be {@code null} or empty
     * @return the epoch milliseconds, or {@code -1} when the input is null, empty or cannot be parsed (the parse
     *         failure is logged)
     */
    protected static long getTimestampFromDate(String dateIso8601) {
        if (dateIso8601 == null || dateIso8601.isEmpty()) {
            return -1;
        }
        try {
            Instant instant = Instant.parse(dateIso8601);
            return instant.toEpochMilli();
        } catch (DateTimeException e) {
            log.error("Failed to read the timestamp: " + e.getMessage());
            log.error("The timestamp should be in ISO-8601 format, eg. " + Instant.now());
        }
        return -1;
    }

    @Override
    public String name() {
        return NAME;
    }

    /**
     * Registers the command line options understood by this command.
     */
    @Override
    public void updateOptions(Options options) {
        options.addOption(Option.builder("l")
                                .longOpt("log-name")
                                .desc("Log name")
                                .required()
                                .hasArg()
                                .argName("LOG_NAME")
                                .build());
        options.addOption(Option.builder("p")
                                .longOpt("partition")
                                .desc("The log partition")
                                .hasArg()
                                .argName("PARTITION")
                                .build());
        options.addOption(
                Option.builder("g").longOpt("group").desc("Consumer group").hasArg().argName("GROUP").build());
        options.addOption(Option.builder()
                                .longOpt("codec")
                                .desc("Codec used to read record and extract watermark when using to-watermark")
                                .hasArg()
                                .argName("CODEC")
                                .build());
        options.addOption(
                Option.builder().longOpt("reset").desc("Resets all committed positions for the group").build());
        options.addOption(Option.builder()
                                .longOpt("to-end")
                                .desc("Sets the committed positions to the end of partitions for the group")
                                .build());
        options.addOption(
                Option.builder()
                      .longOpt(AFTER_DATE_OPT)
                      .desc("Sets the committed positions for the group to a specific date."
                              + " The date used to find the offset depends on the implementation, for Kafka this is the"
                              + " LogAppendTime. The position is set to the earliest offset whose timestamp is greater than or equal to the given date."
                              + " The date is specified in ISO-8601 format, eg. " + Instant.now()
                              + ". If no record offset is found with an appropriate timestamp then the command fails.")
                      .hasArg()
                      .argName("DATE")
                      .build());
        options.addOption(
                Option.builder()
                      .longOpt(TO_WATERMARK_OPT)
                      .desc("Sets the committed positions for the group to a specific date."
                              + " The date used to find the offset is contained in a record watermark. "
                              + " This means that the LOG_NAME is expected to be a computation stream with records with populated watermark."
                              + " The position is set to the biggest record offset with a watermark date inferior or equals to the given date."
                              + " The date is specified in ISO-8601 format, eg. " + Instant.now()
                              + ". If no record offset is found with an appropriate timestamp then the command fails.")
                      .hasArg()
                      .argName("DATE")
                      .build());
        options.addOption(Option.builder()
                                .longOpt(TO_OFFSET_OPT)
                                .desc("Sets the committed position to a specific offset."
                                        + " When restarted the consumer will continue on the next record.")
                                .hasArg()
                                .argName("OFFSET")
                                .build());
    }

    /**
     * Dispatches to the positioning strategy selected on the command line. The options are checked in this order:
     * after-date, to-watermark, to-offset, to-end, reset.
     *
     * @return {@code true} when the position has been moved, {@code false} on invalid input or when no position could
     *         be found
     */
    @Override
    public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException {
        Name name = Name.ofUrn(cmd.getOptionValue("log-name"));
        Name group = Name.ofUrn(cmd.getOptionValue("group", "admin/tools"));
        String codec = cmd.getOptionValue("codec");
        int partition;
        try {
            // -1 means "no partition selected", i.e. all partitions
            partition = Integer.parseInt(cmd.getOptionValue("partition", "-1"));
        } catch (NumberFormatException e) {
            // report invalid numbers the same way invalid dates are reported: log and fail
            log.error("Invalid partition, expecting an integer: " + cmd.getOptionValue("partition"));
            return false;
        }
        if (cmd.hasOption(AFTER_DATE_OPT)) {
            long timestamp = getTimestampFromDate(cmd.getOptionValue(AFTER_DATE_OPT));
            if (timestamp >= 0) {
                return positionAfterDate(manager, group, name, partition, timestamp);
            }
        } else if (cmd.hasOption(TO_WATERMARK_OPT)) {
            long timestamp = getTimestampFromDate(cmd.getOptionValue(TO_WATERMARK_OPT));
            if (timestamp >= 0) {
                return positionToWatermark(manager, group, name, partition, timestamp, codec);
            }
        } else if (cmd.hasOption(TO_OFFSET_OPT)) {
            long offset;
            try {
                offset = Long.parseLong(cmd.getOptionValue(TO_OFFSET_OPT));
            } catch (NumberFormatException e) {
                log.error("Invalid offset, expecting an integer: " + cmd.getOptionValue(TO_OFFSET_OPT));
                return false;
            }
            return positionToOffset(manager, group, name, partition, offset);
        } else if (cmd.hasOption("to-end")) {
            return toEnd(manager, group, name, partition);
        } else if (cmd.hasOption("reset")) {
            return reset(manager, group, name, partition);
        } else {
            log.error("Invalid option, try 'help position'");
        }
        // reached when a date option was given but could not be parsed, or when no positioning option was given
        return false;
    }

    /**
     * Moves the group to the end of the log (or of the given partition) and commits.
     *
     * @return always {@code true}
     */
    protected boolean toEnd(LogManager manager, Name group, Name name, int partition) {
        // capture the lag before moving so the log message can report the from/to positions
        LogLag lag = getLag(manager, group, name, partition);
        try (LogTailer<Externalizable> tailer = createTailer(manager, name, partition, group)) {
            tailer.toEnd();
            tailer.commit();
        }
        log.info(String.format("# Moved log %s, group: %s, from: %s to %s", labelFor(name, partition), group,
                lag.lower(), lag.upper()));
        return true;
    }

    /**
     * Returns the partition number as a string, or {@code "all"} when the partition is negative.
     */
    protected String labelFor(int partition) {
        return partition >= 0 ? Integer.toString(partition) : "all";
    }

    /**
     * Returns {@code urn:partition} when a partition is selected, else the log URN alone.
     */
    protected String labelFor(Name name, int partition) {
        return partition >= 0 ? name.getUrn() + ":" + labelFor(partition) : name.getUrn();
    }

    /**
     * Returns the lag of the group on the given partition, or on the whole log when the partition is negative.
     */
    protected LogLag getLag(LogManager manager, Name group, Name name, int partition) {
        if (partition >= 0) {
            return manager.getLagPerPartition(name, group).get(partition);
        } else {
            return manager.getLag(name, group);
        }
    }

    /**
     * Creates a tailer for the group on the given partition, or on the whole log when the partition is negative.
     */
    protected <T extends Externalizable> LogTailer<T> createTailer(LogManager manager, Name name, int partition,
            Name group) {
        if (partition >= 0) {
            return manager.createTailer(group, new LogPartition(name, partition));
        }
        return manager.createTailer(group, name);
    }

    /**
     * Resets the committed positions of the group on the log (or on the given partition).
     *
     * @return always {@code true}
     */
    protected boolean reset(LogManager manager, Name group, Name name, int partition) {
        LogLag lag = getLag(manager, group, name, partition);
        long pos = lag.lower();
        try (LogTailer<Externalizable> tailer = createTailer(manager, name, partition, group)) {
            tailer.reset();
        }
        log.warn(String.format("# Reset log %s, group: %s, from: %s to 0", labelFor(name, partition), group, pos));
        return true;
    }

    /**
     * Seeks each selected partition to the offset returned by the tailer for the given timestamp and commits.
     * Partitions without a matching offset are reported and left untouched.
     *
     * @param partition the partition to move, or a negative value to move all partitions
     * @return {@code true} when at least one partition has been moved, {@code false} otherwise
     */
    protected boolean positionAfterDate(LogManager manager, Name group, Name name, int partition, long timestamp) {
        try (LogTailer<Externalizable> tailer = manager.createTailer(group, name)) {
            boolean movedOffset = false;
            for (int part = 0; part < manager.size(name); part++) {
                if (partition >= 0 && part != partition) {
                    continue;
                }
                LogPartition logPartition = new LogPartition(name, part);
                LogOffset logOffset = tailer.offsetForTimestamp(logPartition, timestamp);
                if (logOffset == null) {
                    log.error(String.format("# Could not find an offset for group: %s, partition: %s", group,
                            logPartition));
                    continue;
                }
                tailer.seek(logOffset);
                movedOffset = true;
                log.warn(String.format("# Set log %s, group: %s, to offset %s", labelFor(name, part), group,
                        logOffset.offset()));
            }
            if (movedOffset) {
                tailer.commit();
                return true;
            }
        }
        log.error("No offset found for the specified date");
        return false;
    }

    /**
     * Moves the group to the record offsets matching the given watermark timestamp.
     * <p>
     * The offsets are first searched by reading each selected, non empty partition with the {@code admin/tools} group,
     * then the requested group is moved to the offsets found and committed. When no offset is found on a non empty
     * log the positions are reset.
     *
     * @param partition the partition to move, or a negative value to move all partitions
     * @return {@code false} when the log is empty, {@code true} otherwise
     */
    protected boolean positionToWatermark(LogManager manager, Name group, Name name, int partition, long timestamp,
            String codec) throws InterruptedException {
        Name newGroup = Name.ofUrn("admin/tools");
        int size = manager.size(name);
        List<LogOffset> offsets = new ArrayList<>(size);
        List<LogLag> lags = manager.getLagPerPartition(name, newGroup);
        int part = 0;
        // find the offsets first
        for (LogLag lag : lags) {
            if (lag.lag() == 0 || (partition >= 0 && part != partition)) {
                // empty partition or partition not selected: nothing to do, exactly one placeholder per partition
                offsets.add(null);
            } else {
                try (LogTailer<Record> tailer = manager.createTailer(newGroup, new LogPartition(name, part),
                        getRecordCodec(codec))) {
                    offsets.add(searchWatermarkOffset(tailer, timestamp));
                }
            }
            part++;
        }
        if (offsets.stream().noneMatch(Objects::nonNull)) {
            if (LogLag.of(lags).upper() == 0) {
                log.error("No offsets found because log is empty");
                return false;
            }
            log.error("Timestamp: " + timestamp + " is earlier as any records, resetting positions");
            return reset(manager, group, name, partition);
        }
        try (LogTailer<Externalizable> tailer = manager.createTailer(group, name)) {
            offsets.stream().filter(Objects::nonNull).forEach(tailer::seek);
            tailer.commit();
            offsets.stream().filter(Objects::nonNull).forEach(offset -> log.warn("# Moving consumer to: " + offset));
        }
        return true;
    }

    /**
     * Reads the tailer sequentially looking for the given watermark timestamp.
     *
     * @return the offset of the record whose watermark timestamp equals the given timestamp; else the offset of the
     *         last record read before a record with a greater timestamp or before the reads ran dry; {@code null}
     *         when no such record has been read
     * @throws IllegalArgumentException when a record with a watermark timestamp of 0 is read before a match
     */
    protected LogOffset searchWatermarkOffset(LogTailer<Record> tailer, long timestamp) throws InterruptedException {
        LogOffset lastOffset = null;
        for (LogRecord<Record> rec = tailer.read(FIRST_READ_TIMEOUT); rec != null; rec = tailer.read(READ_TIMEOUT)) {
            long recTimestamp = Watermark.ofValue(rec.message().getWatermark()).getTimestamp();
            if (recTimestamp == timestamp) {
                return rec.offset();
            } else if (recTimestamp > timestamp) {
                return lastOffset;
            }
            if (recTimestamp == 0) {
                throw new IllegalArgumentException("Cannot find position because Record has empty watermark: " + rec);
            }
            lastOffset = rec.offset();
        }
        // not found returns last offset of partition
        return lastOffset;
    }

    /**
     * Moves the group on a single partition so that the record at the given offset is the last one consumed.
     * <p>
     * Records are read until the requested offset is reached, then the position is committed. When the requested
     * offset is lower than the current position the tailer restarts from the beginning of the partition.
     *
     * @param partition the partition to move, must not be negative
     * @return {@code true} when the position has been committed, {@code false} when no partition is selected, the
     *         group is already at this offset, the offset is not found or the thread is interrupted
     * @since 11.2
     */
    protected boolean positionToOffset(LogManager manager, Name group, Name name, int partition, long offset) {
        if (partition < 0) {
            // an offset only makes sense within one partition, a negative value means no partition was selected
            log.error("A partition is required to set the position to an offset");
            return false;
        }
        LogLag lag = getLag(manager, group, name, partition);
        long pos = lag.lower();
        boolean goToStart = false;
        if (offset < pos) {
            goToStart = true;
        } else if (offset == pos) {
            log.error(String.format("Current offset for group %s on %s is already %d", group, labelFor(name, partition),
                    pos));
            return false;
        }
        try (LogTailer<Externalizable> tailer = manager.createTailer(group, LogPartition.of(name, partition))) {
            if (goToStart) {
                tailer.toStart();
            }
            for (;;) {
                LogRecord<Externalizable> record = tailer.read(Duration.ofSeconds(1));
                if (record == null || record.offset().offset() > offset) {
                    log.error("No offset found for the specified partition");
                    return false;
                }
                if (record.offset().offset() == offset) {
                    tailer.commit();
                    log.warn(String.format("# Move group %s on %s from %d to %d", group, labelFor(name, partition), pos,
                            offset));
                    return true;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Interrupted");
            return false;
        }
    }
}