001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools.command; 020 021import static org.nuxeo.lib.stream.tools.command.LatencyTrackerComputation.decodeKey; 022import static org.nuxeo.lib.stream.tools.command.PositionCommand.FIRST_READ_TIMEOUT; 023import static org.nuxeo.lib.stream.tools.command.PositionCommand.READ_TIMEOUT; 024import static org.nuxeo.lib.stream.tools.command.PositionCommand.getTimestampFromDate; 025import static org.nuxeo.lib.stream.tools.command.TrackerCommand.ALL_LOGS; 026import static org.nuxeo.lib.stream.tools.command.TrackerCommand.DEFAULT_LATENCIES_LOG; 027 028import java.nio.charset.StandardCharsets; 029import java.time.Instant; 030import java.util.Arrays; 031import java.util.HashMap; 032import java.util.List; 033import java.util.Map; 034import java.util.stream.Collectors; 035 036import org.apache.commons.cli.CommandLine; 037import org.apache.commons.cli.Option; 038import org.apache.commons.cli.Options; 039import org.apache.commons.logging.Log; 040import org.apache.commons.logging.LogFactory; 041import org.nuxeo.lib.stream.computation.Record; 042import org.nuxeo.lib.stream.computation.Watermark; 043import org.nuxeo.lib.stream.log.Latency; 044import org.nuxeo.lib.stream.log.LogManager; 045import org.nuxeo.lib.stream.log.LogOffset; 046import org.nuxeo.lib.stream.log.LogRecord; 047import org.nuxeo.lib.stream.log.LogTailer; 048import org.nuxeo.lib.stream.log.Name; 049import org.nuxeo.lib.stream.log.internals.LogPartitionGroup; 050 051/** 052 * Restore consumer positions using the latency tracker Log. 053 * 054 * @since 10.1 055 */ 056public class RestoreCommand extends Command { 057 private static final Log log = LogFactory.getLog(RestoreCommand.class); 058 059 protected static final String NAME = "restore"; 060 061 protected static final Name GROUP = Name.ofUrn("admin/tools"); 062 063 protected boolean verbose = false; 064 065 protected Name input; 066 067 protected List<Name> logNames; 068 069 protected long date; 070 071 protected boolean dryRun; 072 073 protected String codec; 074 075 @Override 076 public String name() { 077 return NAME; 078 } 079 080 @Override 081 public void updateOptions(Options options) { 082 options.addOption(Option.builder("l") 083 .longOpt("log-name") 084 .desc("Restore consumers positions for this LOG, must be a computation Record, " 085 + "can be a comma separated list of log names or ALL") 086 .required() 087 .hasArg() 088 .argName("LOG_NAME") 089 .build()); 090 options.addOption(Option.builder("i") 091 .longOpt("log-input") 092 .desc("Log name of the input default to " + DEFAULT_LATENCIES_LOG) 093 .hasArg() 094 .argName("LOG_INPUT") 095 .build()); 096 options.addOption(Option.builder() 097 .longOpt("to-date") 098 .desc("Sets the committed positions as they where at a specific date." 099 + " The date is specified in ISO-8601 format, eg. " + Instant.now()) 100 .hasArg() 101 .argName("DATE") 102 .build()); 103 options.addOption(Option.builder() 104 .longOpt("codec") 105 .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson") 106 .hasArg() 107 .argName("CODEC") 108 .build()); 109 options.addOption(Option.builder().longOpt("verbose").build()); 110 options.addOption(Option.builder().longOpt("dry-run").desc("Do not change any position").build()); 111 } 112 113 @Override 114 public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException { 115 logNames = getLogNames(manager, cmd.getOptionValue("log-name")); 116 input = Name.ofUrn(cmd.getOptionValue("log-input", DEFAULT_LATENCIES_LOG)); 117 date = getTimestampFromDate(cmd.getOptionValue("to-date")); 118 verbose = cmd.hasOption("verbose"); 119 dryRun = cmd.hasOption("dry-run"); 120 codec = cmd.getOptionValue("codec"); 121 return restorePosition(manager); 122 } 123 124 protected boolean restorePosition(LogManager manager) throws InterruptedException { 125 Map<LogPartitionGroup, Latency> latencies = readLatencies(manager); 126 Map<LogPartitionGroup, LogOffset> offsets = searchOffsets(manager, latencies); 127 if (dryRun) { 128 log.info("# Dry run mode returning without doing any changes"); 129 return true; 130 } 131 updatePositions(manager, offsets); 132 return true; 133 } 134 135 protected void updatePositions(LogManager manager, Map<LogPartitionGroup, LogOffset> offsets) { 136 log.info("# Update positions"); 137 offsets.forEach((key, offset) -> updatePosition(manager, key, offset)); 138 } 139 140 protected void updatePosition(LogManager manager, LogPartitionGroup key, LogOffset offset) { 141 if (offset == null) { 142 return; 143 } 144 log.info(key + " new position: " + offset); 145 try (LogTailer<Record> tailer = manager.createTailer(key.group, key.getLogPartition(), getRecordCodec(codec))) { 146 tailer.seek(offset); 147 tailer.commit(); 148 } 149 } 150 151 protected Map<LogPartitionGroup, LogOffset> searchOffsets(LogManager manager, 152 Map<LogPartitionGroup, Latency> latencies) throws InterruptedException { 153 Map<LogPartitionGroup, LogOffset> ret = new HashMap<>(latencies.size()); 154 log.info("# Searching records matching the latencies lower timestamp and key"); 155 for (Map.Entry<LogPartitionGroup, Latency> entry : latencies.entrySet()) { 156 ret.put(entry.getKey(), findOffset(manager, entry.getKey(), entry.getValue())); 157 } 158 return ret; 159 } 160 161 protected LogOffset findOffset(LogManager manager, LogPartitionGroup key, Latency latency) 162 throws InterruptedException { 163 long targetWatermark = latency.lower(); 164 String targetKey = latency.key(); 165 try (LogTailer<Record> tailer = manager.createTailer(GROUP, key.getLogPartition(), getRecordCodec(codec))) { 166 for (LogRecord<Record> rec = tailer.read(FIRST_READ_TIMEOUT); rec != null; rec = tailer.read( 167 READ_TIMEOUT)) { 168 if (targetKey != null && !targetKey.equals(rec.message().getKey())) { 169 continue; 170 } 171 long timestamp = Watermark.ofValue(rec.message().getWatermark()).getTimestamp(); 172 if (targetWatermark == timestamp) { 173 log.info(String.format("%s: offset: %s wm: %d key: %s", key, rec.offset(), 174 rec.message().getWatermark(), rec.message().getKey())); 175 return rec.offset().nextOffset(); 176 } 177 } 178 } 179 log.error("No offset found for: " + key + ", matching: " + latency.asJson()); 180 return null; 181 } 182 183 protected Map<LogPartitionGroup, Latency> readLatencies(LogManager manager) throws InterruptedException { 184 Map<LogPartitionGroup, Latency> latencies = new HashMap<>(); 185 log.info("# Reading latencies log: " + input + ", searching for the higher timestamp <= " + date); 186 try (LogTailer<Record> tailer = manager.createTailer(GROUP, input, getRecordCodec(codec))) { 187 for (LogRecord<Record> rec = tailer.read(FIRST_READ_TIMEOUT); rec != null; rec = tailer.read( 188 READ_TIMEOUT)) { 189 long timestamp = Watermark.ofValue(rec.message().getWatermark()).getTimestamp(); 190 if (date > 0 && timestamp > date) { 191 continue; 192 } 193 LogPartitionGroup key = decodeKey(rec.message().getKey()); 194 if (!logNames.contains(key.name)) { 195 continue; 196 } 197 Latency latency = decodeLatency(rec.message().getData()); 198 if (latency != null && latency.lower() > 0) { 199 // we don't want latency.lower = 0, this means either no record either records with unset watermark 200 latencies.put(key, latency); 201 } 202 } 203 } 204 log.info("# Latencies found (group:log:partition -> lat)"); 205 latencies.forEach((key, latency) -> log.info(String.format("%s: %s", key, latency.asJson()))); 206 return latencies; 207 } 208 209 protected Latency decodeLatency(byte[] data) { 210 return Latency.fromJson(new String(data, StandardCharsets.UTF_8)); 211 } 212 213 protected List<Name> getLogNames(LogManager manager, String names) { 214 if (ALL_LOGS.equalsIgnoreCase(names)) { 215 return manager.listAllNames(); 216 } 217 List<Name> ret = Arrays.stream(names.split(",")).map(Name::ofUrn).collect(Collectors.toList()); 218 for (Name name : ret) { 219 if (!manager.exists(name)) { 220 throw new IllegalArgumentException("Unknown log name: " + name); 221 } 222 } 223 return ret; 224 } 225 226}