001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import static org.nuxeo.lib.stream.tools.command.LatencyTrackerComputation.decodeKey;
022import static org.nuxeo.lib.stream.tools.command.PositionCommand.FIRST_READ_TIMEOUT;
023import static org.nuxeo.lib.stream.tools.command.PositionCommand.READ_TIMEOUT;
024import static org.nuxeo.lib.stream.tools.command.PositionCommand.getTimestampFromDate;
025import static org.nuxeo.lib.stream.tools.command.TrackerCommand.ALL_LOGS;
026import static org.nuxeo.lib.stream.tools.command.TrackerCommand.DEFAULT_LATENCIES_LOG;
027import static org.nuxeo.lib.stream.tools.command.TrackerCommand.INTERNAL_LOG_PREFIX;
028
029import java.io.UnsupportedEncodingException;
030import java.time.Instant;
031import java.util.Arrays;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.stream.Collectors;
036
037import org.apache.commons.cli.CommandLine;
038import org.apache.commons.cli.Option;
039import org.apache.commons.cli.Options;
040import org.jetbrains.annotations.NotNull;
041import org.nuxeo.lib.stream.computation.Record;
042import org.nuxeo.lib.stream.computation.Watermark;
043import org.nuxeo.lib.stream.log.Latency;
044import org.nuxeo.lib.stream.log.LogManager;
045import org.nuxeo.lib.stream.log.LogOffset;
046import org.nuxeo.lib.stream.log.LogRecord;
047import org.nuxeo.lib.stream.log.LogTailer;
048import org.nuxeo.lib.stream.log.internals.LogPartitionGroup;
049
050/**
051 * @since 10.1
052 */
053public class RestoreCommand extends Command {
054
055    protected static final String NAME = "restore";
056
057    protected static final String GROUP = "tools";
058
059    protected boolean verbose = false;
060
061    protected String input;
062
063    protected List<String> logNames;
064
065    protected long date;
066
067    protected boolean dryRun;
068
069    @Override
070    public String name() {
071        return NAME;
072    }
073
074    @Override
075    public void updateOptions(Options options) {
076        options.addOption(Option.builder("l")
077                                .longOpt("log-name")
078                                .desc("Restore consumers positions for this LOG, must be a computation Record, "
079                                        + "can be a comma separated list of log names or ALL")
080                                .required()
081                                .hasArg()
082                                .argName("LOG_NAME")
083                                .build());
084        options.addOption(Option.builder("i")
085                                .longOpt("log-input")
086                                .desc("Log name of the input default to " + DEFAULT_LATENCIES_LOG)
087                                .hasArg()
088                                .argName("LOG_OUTPUT")
089                                .build());
090        options.addOption(Option.builder()
091                                .longOpt("to-date")
092                                .desc("Sets the committed positions as they where at a specific date."
093                                        + " The date is specified in ISO-8601 format, eg. " + Instant.now())
094                                .hasArg()
095                                .argName("DATE")
096                                .build());
097        options.addOption(Option.builder().longOpt("verbose").build());
098        options.addOption(Option.builder().longOpt("dry-run").desc("Do not change any position").build());
099    }
100
101    @Override
102    public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException {
103        logNames = getLogNames(manager, cmd.getOptionValue("log-name"));
104        input = cmd.getOptionValue("log-input");
105        date = getTimestampFromDate(cmd.getOptionValue("to-date"));
106        verbose = cmd.hasOption("verbose");
107        dryRun = cmd.hasOption("dry-run");
108
109        return restorePosition(manager);
110    }
111
112    protected boolean restorePosition(LogManager manager) throws InterruptedException {
113        Map<LogPartitionGroup, Latency> latencies = readLatencies(manager);
114        Map<LogPartitionGroup, LogOffset> offsets = searchOffsets(manager, latencies);
115        if (dryRun) {
116            System.out.println("# Dry run mode returning without doing any changes");
117            return true;
118        }
119        updatePositions(manager, offsets);
120        return true;
121    }
122
123    protected void updatePositions(LogManager manager, Map<LogPartitionGroup, LogOffset> offsets) {
124        offsets.forEach((key, offset) -> updatePosition(manager, key, offset));
125    }
126
127    protected void updatePosition(LogManager manager, LogPartitionGroup key, LogOffset offset) {
128        if (offset == null) {
129            return;
130        }
131        System.out.println("# Commit : " + key);
132        try (LogTailer<Record> tailer = manager.createTailer(key.group, key.getLogPartition())) {
133            tailer.seek(offset);
134            tailer.commit();
135        }
136    }
137
138    protected Map<LogPartitionGroup, LogOffset> searchOffsets(LogManager manager,
139            Map<LogPartitionGroup, Latency> latencies) throws InterruptedException {
140        Map<LogPartitionGroup, LogOffset> ret = new HashMap<>(latencies.size());
141        System.out.println("# Searching offsets matching the latencies");
142        for (LogPartitionGroup key : latencies.keySet()) {
143            ret.put(key, findOffset(manager, key, latencies.get(key)));
144        }
145        return ret;
146    }
147
148    protected LogOffset findOffset(LogManager manager, LogPartitionGroup key, Latency latency)
149            throws InterruptedException {
150        long targetWatermark = latency.lower();
151        String targetKey = latency.key();
152        try (LogTailer<Record> tailer = manager.createTailer(key.group, key.getLogPartition())) {
153            for (LogRecord<Record> rec = tailer.read(FIRST_READ_TIMEOUT); rec != null; rec = tailer.read(
154                    READ_TIMEOUT)) {
155                if (targetKey != null && !targetKey.equals(rec.message().key)) {
156                    continue;
157                }
158                long timestamp = Watermark.ofValue(rec.message().watermark).getTimestamp();
159                if (targetWatermark == timestamp) {
160                    System.out.println("Offset found: " + key + ": " + rec.offset());
161                    return rec.offset().nextOffset();
162                }
163            }
164        }
165        System.err.println("No offset found for: " + key);
166        return null;
167    }
168
169    @NotNull
170    private Map<LogPartitionGroup, Latency> readLatencies(LogManager manager) throws InterruptedException {
171        Map<LogPartitionGroup, Latency> latencies = new HashMap<>();
172        System.out.println("# Reading latencies from: " + input + " ...");
173        try (LogTailer<Record> tailer = manager.createTailer(GROUP, input)) {
174            for (LogRecord<Record> rec = tailer.read(FIRST_READ_TIMEOUT); rec != null; rec = tailer.read(
175                    READ_TIMEOUT)) {
176                long timestamp = Watermark.ofValue(rec.message().watermark).getTimestamp();
177                if (date > 0 && timestamp > date) {
178                    continue;
179                }
180                LogPartitionGroup key = decodeKey(rec.message().key);
181                if (!logNames.contains(key.name)) {
182                    continue;
183                }
184                Latency latency = decodeLatency(rec.message().data);
185                if (latency != null) {
186                    latencies.put(key, latency);
187                }
188            }
189        }
190        System.out.println("# Latencies found: ");
191        latencies.forEach((key, latency) -> System.out.println(String.format("%s: %s", key, latency)));
192        return latencies;
193    }
194
195    protected Latency decodeLatency(byte[] data) {
196        try {
197            return Latency.fromJson(new String(data, "UTF-8"));
198        } catch (UnsupportedEncodingException e) {
199            System.err.println("Cannot decode message" + e.getMessage() + " " + Arrays.toString(data));
200        }
201        return null;
202    }
203
204    protected List<String> getLogNames(LogManager manager, String names) {
205        if (ALL_LOGS.equals(names.toLowerCase())) {
206            return manager.listAll().stream().filter(name -> !name.startsWith(INTERNAL_LOG_PREFIX)).collect(
207                    Collectors.toList());
208        }
209        List<String> ret = Arrays.asList(names.split(","));
210        for (String name : ret) {
211            if (!manager.exists(name)) {
212                throw new IllegalArgumentException("Unknown log name: " + name);
213            }
214        }
215        return ret;
216    }
217
218}