001/* 002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Gethin James 018 */ 019package org.nuxeo.lib.stream.tools.command; 020 021import java.io.Externalizable; 022import java.time.DateTimeException; 023import java.time.Duration; 024import java.time.Instant; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.Objects; 028 029import org.apache.commons.cli.CommandLine; 030import org.apache.commons.cli.Option; 031import org.apache.commons.cli.Options; 032import org.nuxeo.lib.stream.computation.Record; 033import org.nuxeo.lib.stream.computation.Watermark; 034import org.nuxeo.lib.stream.log.LogLag; 035import org.nuxeo.lib.stream.log.LogManager; 036import org.nuxeo.lib.stream.log.LogOffset; 037import org.nuxeo.lib.stream.log.LogPartition; 038import org.nuxeo.lib.stream.log.LogRecord; 039import org.nuxeo.lib.stream.log.LogTailer; 040 041/** 042 * Manipulates the consumer position to the beginning, end or a specific timestamp 043 * 044 * @since 10.1 045 */ 046public class PositionCommand extends Command { 047 048 public static final Duration FIRST_READ_TIMEOUT = Duration.ofMillis(1000); 049 050 public static final Duration READ_TIMEOUT = Duration.ofMillis(100); 051 052 protected static final String NAME = "position"; 053 054 protected static long getTimestampFromDate(String dateIso8601) { 055 if (dateIso8601 == null || dateIso8601.isEmpty()) { 056 return -1; 057 } 058 try { 059 Instant instant = Instant.parse(dateIso8601); 060 return instant.toEpochMilli(); 061 } catch (DateTimeException e) { 062 System.err.println("Failed to read the timeout: " + e.getMessage()); 063 System.err.println("The timestamp should be in ISO-8601 format, eg. " + Instant.now()); 064 } 065 return -1; 066 } 067 068 @Override 069 public String name() { 070 return NAME; 071 } 072 073 @Override 074 public void updateOptions(Options options) { 075 options.addOption(Option.builder("l") 076 .longOpt("log-name") 077 .desc("Log name") 078 .required() 079 .hasArg() 080 .argName("LOG_NAME") 081 .build()); 082 options.addOption( 083 Option.builder("g").longOpt("group").desc("Consumer group").hasArg().argName("GROUP").build()); 084 options.addOption( 085 Option.builder().longOpt("reset").desc("Resets all committed positions for the group").build()); 086 options.addOption(Option.builder() 087 .longOpt("to-end") 088 .desc("Sets the committed positions to the end of partitions for the group") 089 .build()); 090 options.addOption( 091 Option.builder() 092 .longOpt("after-date") 093 .desc("Sets the committed positions for the group to a specific date." 094 + " The date used to find the offset depends on the implementation, for Kafka this is the" 095 + " LogAppendTime. The position is set to the earliest offset whose timestamp is greater than or equal to the given date." 096 + " The date is specified in ISO-8601 format, eg. " + Instant.now() 097 + ". If no record offset is found with an appropriate timestamp then the command fails.") 098 .hasArg() 099 .argName("DATE") 100 .build()); 101 options.addOption( 102 Option.builder() 103 .longOpt("to-watermark") 104 .desc("Sets the committed positions for the group to a specific date." 105 + " The date used to find the offset is contained in a record watermark. " 106 + " This means that the LOG_NAME is expected to be a computation stream with records with populated watermark." 107 + " The position is set to the biggest record offset with a watermark date inferior or equals to the given date.\"" 108 + " The date is specified in ISO-8601 format, eg. " + Instant.now() 109 + ". If no record offset is found with an appropriate timestamp then the command fails.") 110 .hasArg() 111 .argName("DATE") 112 .build()); 113 } 114 115 @Override 116 public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException { 117 String name = cmd.getOptionValue("log-name"); 118 String group = cmd.getOptionValue("group", "tools"); 119 120 if (cmd.hasOption("after-date")) { 121 long timestamp = getTimestampFromDate(cmd.getOptionValue("after-date")); 122 if (timestamp >= 0) { 123 return positionAfterDate(manager, group, name, timestamp); 124 } 125 } else if (cmd.hasOption("to-watermark")) { 126 long timestamp = getTimestampFromDate(cmd.getOptionValue("to-watermark")); 127 if (timestamp >= 0) { 128 return positionToWatermark(manager, group, name, timestamp); 129 } 130 } else if (cmd.hasOption("to-end")) { 131 return toEnd(manager, group, name); 132 } else if (cmd.hasOption("reset")) { 133 return reset(manager, group, name); 134 } else { 135 System.err.println("Invalid option, try 'help position'"); 136 } 137 return false; 138 } 139 140 protected boolean toEnd(LogManager manager, String group, String name) { 141 LogLag lag = manager.getLag(name, group); 142 try (LogTailer<Externalizable> tailer = manager.createTailer(group, name)) { 143 tailer.toEnd(); 144 tailer.commit(); 145 } 146 System.out.println( 147 String.format("# Moved log %s, group: %s, from: %s to %s", name, group, lag.lower(), lag.upper())); 148 return true; 149 } 150 151 protected boolean reset(LogManager manager, String group, String name) { 152 LogLag lag = manager.getLag(name, group); 153 long pos = lag.lower(); 154 try (LogTailer<Externalizable> tailer = manager.createTailer(group, name)) { 155 tailer.reset(); 156 } 157 System.out.println(String.format("# Reset log %s, group: %s, from: %s to 0", name, group, pos)); 158 return true; 159 } 160 161 protected boolean positionAfterDate(LogManager manager, String group, String name, long timestamp) { 162 try (LogTailer<Externalizable> tailer = manager.createTailer(group, name)) { 163 boolean movedOffset = false; 164 for (int partition = 0; partition < manager.getAppender(name).size(); partition++) { 165 LogPartition logPartition = new LogPartition(name, partition); 166 LogOffset logOffset = tailer.offsetForTimestamp(logPartition, timestamp); 167 if (logOffset == null) { 168 System.err.println(String.format("# Could not find an offset for group: %s, partition: %s", group, 169 logPartition)); 170 continue; 171 } 172 tailer.seek(logOffset); 173 movedOffset = true; 174 System.out.println( 175 String.format("# Set log %s, group: %s, to offset %s", name, group, logOffset.offset())); 176 } 177 if (movedOffset) { 178 tailer.commit(); 179 return true; 180 } 181 } 182 System.err.println("No offset found for the specified date"); 183 return false; 184 } 185 186 protected boolean positionToWatermark(LogManager manager, String group, String name, long timestamp) 187 throws InterruptedException { 188 String newGroup = "tools"; 189 int size = manager.getAppender(name).size(); 190 List<LogOffset> offsets = new ArrayList<>(size); 191 List<LogLag> lags = manager.getLagPerPartition(name, newGroup); 192 int partition = 0; 193 // find the offsets first 194 for (LogLag lag : lags) { 195 if (lag.lag() == 0) { 196 // empty partition nothing to do 197 offsets.add(null); 198 } else { 199 try (LogTailer<Record> tailer = manager.createTailer(newGroup, new LogPartition(name, partition))) { 200 offsets.add(searchWatermarkOffset(tailer, timestamp)); 201 } 202 } 203 partition++; 204 } 205 if (offsets.stream().noneMatch(Objects::nonNull)) { 206 if (LogLag.of(lags).upper() == 0) { 207 System.err.println("No offsets found because log is empty"); 208 return false; 209 } 210 System.err.println("Timestamp: " + timestamp + " is earlier as any records, resetting positions"); 211 return reset(manager, group, name); 212 } 213 try (LogTailer<Externalizable> tailer = manager.createTailer(group, name)) { 214 offsets.stream().filter(Objects::nonNull).forEach(tailer::seek); 215 tailer.commit(); 216 offsets.stream().filter(Objects::nonNull).forEach( 217 offset -> System.out.println("# Moving consumer to: " + offset)); 218 } 219 return true; 220 } 221 222 protected LogOffset searchWatermarkOffset(LogTailer<Record> tailer, long timestamp) throws InterruptedException { 223 LogOffset lastOffset = null; 224 for (LogRecord<Record> rec = tailer.read(FIRST_READ_TIMEOUT); rec != null; rec = tailer.read(READ_TIMEOUT)) { 225 long recTimestamp = Watermark.ofValue(rec.message().watermark).getTimestamp(); 226 if (recTimestamp == timestamp) { 227 return rec.offset(); 228 } else if (recTimestamp > timestamp) { 229 return lastOffset; 230 } 231 if (recTimestamp == 0) { 232 throw new IllegalArgumentException("Cannot find position because Record has empty watermark: " + rec); 233 } 234 lastOffset = rec.offset(); 235 } 236 // not found returns last offset of partition 237 return lastOffset; 238 } 239 240}