001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import static org.nuxeo.lib.stream.tools.command.LatencyTrackerComputation.decodeKey;
022import static org.nuxeo.lib.stream.tools.command.PositionCommand.FIRST_READ_TIMEOUT;
023import static org.nuxeo.lib.stream.tools.command.PositionCommand.READ_TIMEOUT;
024import static org.nuxeo.lib.stream.tools.command.PositionCommand.getTimestampFromDate;
025import static org.nuxeo.lib.stream.tools.command.TrackerCommand.ALL_LOGS;
026import static org.nuxeo.lib.stream.tools.command.TrackerCommand.DEFAULT_LATENCIES_LOG;
027
028import java.nio.charset.StandardCharsets;
029import java.time.Instant;
030import java.util.Arrays;
031import java.util.HashMap;
032import java.util.List;
033import java.util.Map;
034import java.util.stream.Collectors;
035
036import org.apache.commons.cli.CommandLine;
037import org.apache.commons.cli.Option;
038import org.apache.commons.cli.Options;
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041import org.nuxeo.lib.stream.computation.Record;
042import org.nuxeo.lib.stream.computation.Watermark;
043import org.nuxeo.lib.stream.log.Latency;
044import org.nuxeo.lib.stream.log.LogManager;
045import org.nuxeo.lib.stream.log.LogOffset;
046import org.nuxeo.lib.stream.log.LogRecord;
047import org.nuxeo.lib.stream.log.LogTailer;
048import org.nuxeo.lib.stream.log.Name;
049import org.nuxeo.lib.stream.log.internals.LogPartitionGroup;
050
051/**
052 * Restore consumer positions using the latency tracker Log.
053 *
054 * @since 10.1
055 */
056public class RestoreCommand extends Command {
057    private static final Log log = LogFactory.getLog(RestoreCommand.class);
058
059    protected static final String NAME = "restore";
060
061    protected static final Name GROUP = Name.ofUrn("admin/tools");
062
063    protected boolean verbose = false;
064
065    protected Name input;
066
067    protected List<Name> logNames;
068
069    protected long date;
070
071    protected boolean dryRun;
072
073    protected String codec;
074
075    @Override
076    public String name() {
077        return NAME;
078    }
079
080    @Override
081    public void updateOptions(Options options) {
082        options.addOption(Option.builder("l")
083                                .longOpt("log-name")
084                                .desc("Restore consumers positions for this LOG, must be a computation Record, "
085                                        + "can be a comma separated list of log names or ALL")
086                                .required()
087                                .hasArg()
088                                .argName("LOG_NAME")
089                                .build());
090        options.addOption(Option.builder("i")
091                                .longOpt("log-input")
092                                .desc("Log name of the input default to " + DEFAULT_LATENCIES_LOG)
093                                .hasArg()
094                                .argName("LOG_INPUT")
095                                .build());
096        options.addOption(Option.builder()
097                                .longOpt("to-date")
098                                .desc("Sets the committed positions as they where at a specific date."
099                                        + " The date is specified in ISO-8601 format, eg. " + Instant.now())
100                                .hasArg()
101                                .argName("DATE")
102                                .build());
103        options.addOption(Option.builder()
104                                .longOpt("codec")
105                                .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson")
106                                .hasArg()
107                                .argName("CODEC")
108                                .build());
109        options.addOption(Option.builder().longOpt("verbose").build());
110        options.addOption(Option.builder().longOpt("dry-run").desc("Do not change any position").build());
111    }
112
113    @Override
114    public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException {
115        logNames = getLogNames(manager, cmd.getOptionValue("log-name"));
116        input = Name.ofUrn(cmd.getOptionValue("log-input", DEFAULT_LATENCIES_LOG));
117        date = getTimestampFromDate(cmd.getOptionValue("to-date"));
118        verbose = cmd.hasOption("verbose");
119        dryRun = cmd.hasOption("dry-run");
120        codec = cmd.getOptionValue("codec");
121        return restorePosition(manager);
122    }
123
124    protected boolean restorePosition(LogManager manager) throws InterruptedException {
125        Map<LogPartitionGroup, Latency> latencies = readLatencies(manager);
126        Map<LogPartitionGroup, LogOffset> offsets = searchOffsets(manager, latencies);
127        if (dryRun) {
128            log.info("# Dry run mode returning without doing any changes");
129            return true;
130        }
131        updatePositions(manager, offsets);
132        return true;
133    }
134
135    protected void updatePositions(LogManager manager, Map<LogPartitionGroup, LogOffset> offsets) {
136        log.info("# Update positions");
137        offsets.forEach((key, offset) -> updatePosition(manager, key, offset));
138    }
139
140    protected void updatePosition(LogManager manager, LogPartitionGroup key, LogOffset offset) {
141        if (offset == null) {
142            return;
143        }
144        log.info(key + " new position: " + offset);
145        try (LogTailer<Record> tailer = manager.createTailer(key.group, key.getLogPartition(), getRecordCodec(codec))) {
146            tailer.seek(offset);
147            tailer.commit();
148        }
149    }
150
151    protected Map<LogPartitionGroup, LogOffset> searchOffsets(LogManager manager,
152            Map<LogPartitionGroup, Latency> latencies) throws InterruptedException {
153        Map<LogPartitionGroup, LogOffset> ret = new HashMap<>(latencies.size());
154        log.info("# Searching records matching the latencies lower timestamp and key");
155        for (Map.Entry<LogPartitionGroup, Latency> entry : latencies.entrySet()) {
156            ret.put(entry.getKey(), findOffset(manager, entry.getKey(), entry.getValue()));
157        }
158        return ret;
159    }
160
161    protected LogOffset findOffset(LogManager manager, LogPartitionGroup key, Latency latency)
162            throws InterruptedException {
163        long targetWatermark = latency.lower();
164        String targetKey = latency.key();
165        try (LogTailer<Record> tailer = manager.createTailer(GROUP, key.getLogPartition(), getRecordCodec(codec))) {
166            for (LogRecord<Record> rec = tailer.read(FIRST_READ_TIMEOUT); rec != null; rec = tailer.read(
167                    READ_TIMEOUT)) {
168                if (targetKey != null && !targetKey.equals(rec.message().getKey())) {
169                    continue;
170                }
171                long timestamp = Watermark.ofValue(rec.message().getWatermark()).getTimestamp();
172                if (targetWatermark == timestamp) {
173                    log.info(String.format("%s: offset: %s wm: %d key: %s", key, rec.offset(),
174                            rec.message().getWatermark(), rec.message().getKey()));
175                    return rec.offset().nextOffset();
176                }
177            }
178        }
179        log.error("No offset found for: " + key + ", matching: " + latency.asJson());
180        return null;
181    }
182
183    protected Map<LogPartitionGroup, Latency> readLatencies(LogManager manager) throws InterruptedException {
184        Map<LogPartitionGroup, Latency> latencies = new HashMap<>();
185        log.info("# Reading latencies log: " + input + ", searching for the higher timestamp <= " + date);
186        try (LogTailer<Record> tailer = manager.createTailer(GROUP, input, getRecordCodec(codec))) {
187            for (LogRecord<Record> rec = tailer.read(FIRST_READ_TIMEOUT); rec != null; rec = tailer.read(
188                    READ_TIMEOUT)) {
189                long timestamp = Watermark.ofValue(rec.message().getWatermark()).getTimestamp();
190                if (date > 0 && timestamp > date) {
191                    continue;
192                }
193                LogPartitionGroup key = decodeKey(rec.message().getKey());
194                if (!logNames.contains(key.name)) {
195                    continue;
196                }
197                Latency latency = decodeLatency(rec.message().getData());
198                if (latency != null && latency.lower() > 0) {
199                    // we don't want latency.lower = 0, this means either no record either records with unset watermark
200                    latencies.put(key, latency);
201                }
202            }
203        }
204        log.info("# Latencies found (group:log:partition -> lat)");
205        latencies.forEach((key, latency) -> log.info(String.format("%s: %s", key, latency.asJson())));
206        return latencies;
207    }
208
209    protected Latency decodeLatency(byte[] data) {
210        return Latency.fromJson(new String(data, StandardCharsets.UTF_8));
211    }
212
213    protected List<Name> getLogNames(LogManager manager, String names) {
214        if (ALL_LOGS.equalsIgnoreCase(names)) {
215            return manager.listAllNames();
216        }
217        List<Name> ret = Arrays.stream(names.split(",")).map(Name::ofUrn).collect(Collectors.toList());
218        for (Name name : ret) {
219            if (!manager.exists(name)) {
220                throw new IllegalArgumentException("Unknown log name: " + name);
221            }
222        }
223        return ret;
224    }
225
226}