/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.tools.command;

import static org.nuxeo.lib.stream.tools.command.LatencyTrackerComputation.decodeKey;
import static org.nuxeo.lib.stream.tools.command.PositionCommand.FIRST_READ_TIMEOUT;
import static org.nuxeo.lib.stream.tools.command.PositionCommand.READ_TIMEOUT;
import static org.nuxeo.lib.stream.tools.command.PositionCommand.getTimestampFromDate;
import static org.nuxeo.lib.stream.tools.command.TrackerCommand.ALL_LOGS;
import static org.nuxeo.lib.stream.tools.command.TrackerCommand.DEFAULT_LATENCIES_LOG;
import static org.nuxeo.lib.stream.tools.command.TrackerCommand.INTERNAL_LOG_PREFIX;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.jetbrains.annotations.NotNull;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.log.Latency;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.internals.LogPartitionGroup;

/**
 * Command that restores consumer positions from latency records previously written to a latencies Log.
 * <p>
 * The command reads the latency records from the input Log, keeps the last one seen for each
 * log/partition/group, searches each partition for the record matching that latency and, unless running in dry
 * run mode, commits the position following the matching record.
 *
 * @since 10.1
 */
public class RestoreCommand extends Command {

    protected static final String NAME = "restore";

    /** Consumer group used to read the latencies Log. */
    protected static final String GROUP = "tools";

    protected boolean verbose = false;

    /** Name of the Log holding the latency records. */
    protected String input;

    /** Names of the Logs whose consumer positions are restored. */
    protected List<String> logNames;

    /** Upper bound timestamp for the latency records to take into account, ignored when not positive. */
    protected long date;

    /** When set, offsets are searched and reported but no position is committed. */
    protected boolean dryRun;

    @Override
    public String name() {
        return NAME;
    }

    @Override
    public void updateOptions(Options options) {
        options.addOption(Option.builder("l")
                                .longOpt("log-name")
                                .desc("Restore consumers positions for this LOG, must be a computation Record, "
                                        + "can be a comma separated list of log names or ALL")
                                .required()
                                .hasArg()
                                .argName("LOG_NAME")
                                .build());
        options.addOption(Option.builder("i")
                                .longOpt("log-input")
                                .desc("Log name of the input default to " + DEFAULT_LATENCIES_LOG)
                                .hasArg()
                                .argName("LOG_OUTPUT")
                                .build());
        options.addOption(Option.builder()
                                .longOpt("to-date")
                                .desc("Sets the committed positions as they where at a specific date."
                                        + " The date is specified in ISO-8601 format, eg. " + Instant.now())
                                .hasArg()
                                .argName("DATE")
                                .build());
        options.addOption(Option.builder().longOpt("verbose").build());
        options.addOption(Option.builder().longOpt("dry-run").desc("Do not change any position").build());
    }

    @Override
    public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException {
        logNames = getLogNames(manager, cmd.getOptionValue("log-name"));
        // The option is documented as defaulting to DEFAULT_LATENCIES_LOG: apply that default instead of
        // leaving the input null when the option is omitted.
        input = cmd.getOptionValue("log-input", DEFAULT_LATENCIES_LOG);
        date = getTimestampFromDate(cmd.getOptionValue("to-date"));
        verbose = cmd.hasOption("verbose");
        dryRun = cmd.hasOption("dry-run");

        return restorePosition(manager);
    }

    /**
     * Reads the latencies, searches the matching offsets and, unless in dry run mode, commits them.
     *
     * @return always {@code true}
     */
    protected boolean restorePosition(LogManager manager) throws InterruptedException {
        Map<LogPartitionGroup, Latency> latencies = readLatencies(manager);
        Map<LogPartitionGroup, LogOffset> offsets = searchOffsets(manager, latencies);
        if (dryRun) {
            System.out.println("# Dry run mode returning without doing any changes");
            return true;
        }
        updatePositions(manager, offsets);
        return true;
    }

    /**
     * Commits every offset of the given map, entries with a {@code null} offset are skipped.
     */
    protected void updatePositions(LogManager manager, Map<LogPartitionGroup, LogOffset> offsets) {
        offsets.forEach((key, offset) -> updatePosition(manager, key, offset));
    }

    /**
     * Seeks a tailer of the key's group on the key's partition to the given offset and commits it.
     *
     * @param offset the position to commit, nothing is done when {@code null}
     */
    protected void updatePosition(LogManager manager, LogPartitionGroup key, LogOffset offset) {
        if (offset == null) {
            return;
        }
        System.out.println("# Commit : " + key);
        try (LogTailer<Record> tailer = manager.createTailer(key.group, key.getLogPartition())) {
            tailer.seek(offset);
            tailer.commit();
        }
    }

    /**
     * Searches, for each latency, the offset to restore.
     *
     * @return a map with the same keys as {@code latencies}, the value is {@code null} when no offset was found
     */
    protected Map<LogPartitionGroup, LogOffset> searchOffsets(LogManager manager,
            Map<LogPartitionGroup, Latency> latencies) throws InterruptedException {
        Map<LogPartitionGroup, LogOffset> ret = new HashMap<>(latencies.size());
        System.out.println("# Searching offsets matching the latencies");
        for (Map.Entry<LogPartitionGroup, Latency> entry : latencies.entrySet()) {
            ret.put(entry.getKey(), findOffset(manager, entry.getKey(), entry.getValue()));
        }
        return ret;
    }

    /**
     * Reads the partition with a tailer of the key's group until a record matching the latency is found.
     * <p>
     * A record matches when its watermark timestamp equals {@code latency.lower()} and, if the latency has a
     * key, when the record key equals it.
     *
     * @return the offset following the matching record, or {@code null} when the read times out without a match
     */
    protected LogOffset findOffset(LogManager manager, LogPartitionGroup key, Latency latency)
            throws InterruptedException {
        long targetWatermark = latency.lower();
        String targetKey = latency.key();
        try (LogTailer<Record> tailer = manager.createTailer(key.group, key.getLogPartition())) {
            for (LogRecord<Record> rec = tailer.read(FIRST_READ_TIMEOUT); rec != null; rec = tailer.read(
                    READ_TIMEOUT)) {
                if (targetKey != null && !targetKey.equals(rec.message().key)) {
                    continue;
                }
                long timestamp = Watermark.ofValue(rec.message().watermark).getTimestamp();
                if (targetWatermark == timestamp) {
                    System.out.println("Offset found: " + key + ": " + rec.offset());
                    // the position to commit is the one after the last processed record
                    return rec.offset().nextOffset();
                }
            }
        }
        System.err.println("No offset found for: " + key);
        return null;
    }

    /**
     * Reads the input Log and keeps, for each log/partition/group belonging to the requested log names, the
     * last latency read whose watermark timestamp is not after the requested date (when a date is set).
     */
    @NotNull
    private Map<LogPartitionGroup, Latency> readLatencies(LogManager manager) throws InterruptedException {
        Map<LogPartitionGroup, Latency> latencies = new HashMap<>();
        System.out.println("# Reading latencies from: " + input + " ...");
        try (LogTailer<Record> tailer = manager.createTailer(GROUP, input)) {
            for (LogRecord<Record> rec = tailer.read(FIRST_READ_TIMEOUT); rec != null; rec = tailer.read(
                    READ_TIMEOUT)) {
                long timestamp = Watermark.ofValue(rec.message().watermark).getTimestamp();
                if (date > 0 && timestamp > date) {
                    continue;
                }
                LogPartitionGroup key = decodeKey(rec.message().key);
                if (!logNames.contains(key.name)) {
                    continue;
                }
                Latency latency = decodeLatency(rec.message().data);
                if (latency != null) {
                    // later records override earlier ones
                    latencies.put(key, latency);
                }
            }
        }
        System.out.println("# Latencies found: ");
        latencies.forEach((key, latency) -> System.out.println(String.format("%s: %s", key, latency)));
        return latencies;
    }

    /**
     * Decodes a latency from the UTF-8 encoded JSON payload of a record.
     */
    protected Latency decodeLatency(byte[] data) {
        // StandardCharsets.UTF_8 cannot fail, unlike the charset lookup by name
        return Latency.fromJson(new String(data, StandardCharsets.UTF_8));
    }

    /**
     * Resolves the log names given on the command line.
     *
     * @param names a comma separated list of log names, or {@code ALL_LOGS} (case insensitive) to select every
     *            Log listed by the manager whose name does not start with the internal prefix
     * @return the list of log names
     * @throws IllegalArgumentException if one of the listed names is unknown to the manager
     */
    protected List<String> getLogNames(LogManager manager, String names) {
        // Locale.ROOT keeps the comparison independent of the default locale
        if (ALL_LOGS.equals(names.toLowerCase(Locale.ROOT))) {
            return manager.listAll().stream().filter(name -> !name.startsWith(INTERNAL_LOG_PREFIX)).collect(
                    Collectors.toList());
        }
        List<String> ret = Arrays.asList(names.split(","));
        for (String name : ret) {
            if (!manager.exists(name)) {
                throw new IllegalArgumentException("Unknown log name: " + name);
            }
        }
        return ret;
    }

}