001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Gethin James
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.io.Externalizable;
022import java.time.DateTimeException;
023import java.time.Duration;
024import java.time.Instant;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.Objects;
028
029import org.apache.commons.cli.CommandLine;
030import org.apache.commons.cli.Option;
031import org.apache.commons.cli.Options;
032import org.nuxeo.lib.stream.computation.Record;
033import org.nuxeo.lib.stream.computation.Watermark;
034import org.nuxeo.lib.stream.log.LogLag;
035import org.nuxeo.lib.stream.log.LogManager;
036import org.nuxeo.lib.stream.log.LogOffset;
037import org.nuxeo.lib.stream.log.LogPartition;
038import org.nuxeo.lib.stream.log.LogRecord;
039import org.nuxeo.lib.stream.log.LogTailer;
040
041/**
042 * Manipulates the consumer position to the beginning, end or a specific timestamp
043 *
044 * @since 10.1
045 */
046public class PositionCommand extends Command {
047
048    public static final Duration FIRST_READ_TIMEOUT = Duration.ofMillis(1000);
049
050    public static final Duration READ_TIMEOUT = Duration.ofMillis(100);
051
052    protected static final String NAME = "position";
053
054    protected static long getTimestampFromDate(String dateIso8601) {
055        if (dateIso8601 == null || dateIso8601.isEmpty()) {
056            return -1;
057        }
058        try {
059            Instant instant = Instant.parse(dateIso8601);
060            return instant.toEpochMilli();
061        } catch (DateTimeException e) {
062            System.err.println("Failed to read the timeout: " + e.getMessage());
063            System.err.println("The timestamp should be in ISO-8601 format, eg. " + Instant.now());
064        }
065        return -1;
066    }
067
068    @Override
069    public String name() {
070        return NAME;
071    }
072
073    @Override
074    public void updateOptions(Options options) {
075        options.addOption(Option.builder("l")
076                                .longOpt("log-name")
077                                .desc("Log name")
078                                .required()
079                                .hasArg()
080                                .argName("LOG_NAME")
081                                .build());
082        options.addOption(
083                Option.builder("g").longOpt("group").desc("Consumer group").hasArg().argName("GROUP").build());
084        options.addOption(
085                Option.builder().longOpt("reset").desc("Resets all committed positions for the group").build());
086        options.addOption(Option.builder()
087                                .longOpt("to-end")
088                                .desc("Sets the committed positions to the end of partitions for the group")
089                                .build());
090        options.addOption(
091                Option.builder()
092                      .longOpt("after-date")
093                      .desc("Sets the committed positions for the group to a specific date."
094                              + " The date used to find the offset depends on the implementation, for Kafka this is the"
095                              + " LogAppendTime. The position is set to the earliest offset whose timestamp is greater than or equal to the given date."
096                              + " The date is specified in ISO-8601 format, eg. " + Instant.now()
097                              + ". If no record offset is found with an appropriate timestamp then the command fails.")
098                      .hasArg()
099                      .argName("DATE")
100                      .build());
101        options.addOption(
102                Option.builder()
103                      .longOpt("to-watermark")
104                      .desc("Sets the committed positions for the group to a specific date."
105                              + " The date used to find the offset is contained in a record watermark. "
106                              + " This means that the LOG_NAME is expected to be a computation stream with records with populated watermark."
107                              + " The position is set to the biggest record offset with a watermark date inferior or equals to the given date.\""
108                              + " The date is specified in ISO-8601 format, eg. " + Instant.now()
109                              + ". If no record offset is found with an appropriate timestamp then the command fails.")
110                      .hasArg()
111                      .argName("DATE")
112                      .build());
113    }
114
115    @Override
116    public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException {
117        String name = cmd.getOptionValue("log-name");
118        String group = cmd.getOptionValue("group", "tools");
119
120        if (cmd.hasOption("after-date")) {
121            long timestamp = getTimestampFromDate(cmd.getOptionValue("after-date"));
122            if (timestamp >= 0) {
123                return positionAfterDate(manager, group, name, timestamp);
124            }
125        } else if (cmd.hasOption("to-watermark")) {
126            long timestamp = getTimestampFromDate(cmd.getOptionValue("to-watermark"));
127            if (timestamp >= 0) {
128                return positionToWatermark(manager, group, name, timestamp);
129            }
130        } else if (cmd.hasOption("to-end")) {
131            return toEnd(manager, group, name);
132        } else if (cmd.hasOption("reset")) {
133            return reset(manager, group, name);
134        } else {
135            System.err.println("Invalid option, try 'help position'");
136        }
137        return false;
138    }
139
140    protected boolean toEnd(LogManager manager, String group, String name) {
141        LogLag lag = manager.getLag(name, group);
142        try (LogTailer<Externalizable> tailer = manager.createTailer(group, name)) {
143            tailer.toEnd();
144            tailer.commit();
145        }
146        System.out.println(
147                String.format("# Moved log %s, group: %s, from: %s to %s", name, group, lag.lower(), lag.upper()));
148        return true;
149    }
150
151    protected boolean reset(LogManager manager, String group, String name) {
152        LogLag lag = manager.getLag(name, group);
153        long pos = lag.lower();
154        try (LogTailer<Externalizable> tailer = manager.createTailer(group, name)) {
155            tailer.reset();
156        }
157        System.out.println(String.format("# Reset log %s, group: %s, from: %s to 0", name, group, pos));
158        return true;
159    }
160
161    protected boolean positionAfterDate(LogManager manager, String group, String name, long timestamp) {
162        try (LogTailer<Externalizable> tailer = manager.createTailer(group, name)) {
163            boolean movedOffset = false;
164            for (int partition = 0; partition < manager.getAppender(name).size(); partition++) {
165                LogPartition logPartition = new LogPartition(name, partition);
166                LogOffset logOffset = tailer.offsetForTimestamp(logPartition, timestamp);
167                if (logOffset == null) {
168                    System.err.println(String.format("# Could not find an offset for group: %s, partition: %s", group,
169                            logPartition));
170                    continue;
171                }
172                tailer.seek(logOffset);
173                movedOffset = true;
174                System.out.println(
175                        String.format("# Set log %s, group: %s, to offset %s", name, group, logOffset.offset()));
176            }
177            if (movedOffset) {
178                tailer.commit();
179                return true;
180            }
181        }
182        System.err.println("No offset found for the specified date");
183        return false;
184    }
185
186    protected boolean positionToWatermark(LogManager manager, String group, String name, long timestamp)
187            throws InterruptedException {
188        String newGroup = "tools";
189        int size = manager.getAppender(name).size();
190        List<LogOffset> offsets = new ArrayList<>(size);
191        List<LogLag> lags = manager.getLagPerPartition(name, newGroup);
192        int partition = 0;
193        // find the offsets first
194        for (LogLag lag : lags) {
195            if (lag.lag() == 0) {
196                // empty partition nothing to do
197                offsets.add(null);
198            } else {
199                try (LogTailer<Record> tailer = manager.createTailer(newGroup, new LogPartition(name, partition))) {
200                    offsets.add(searchWatermarkOffset(tailer, timestamp));
201                }
202            }
203            partition++;
204        }
205        if (offsets.stream().noneMatch(Objects::nonNull)) {
206            if (LogLag.of(lags).upper() == 0) {
207                System.err.println("No offsets found because log is empty");
208                return false;
209            }
210            System.err.println("Timestamp: " + timestamp + " is earlier as any records, resetting positions");
211            return reset(manager, group, name);
212        }
213        try (LogTailer<Externalizable> tailer = manager.createTailer(group, name)) {
214            offsets.stream().filter(Objects::nonNull).forEach(tailer::seek);
215            tailer.commit();
216            offsets.stream().filter(Objects::nonNull).forEach(
217                    offset -> System.out.println("# Moving consumer to: " + offset));
218        }
219        return true;
220    }
221
222    protected LogOffset searchWatermarkOffset(LogTailer<Record> tailer, long timestamp) throws InterruptedException {
223        LogOffset lastOffset = null;
224        for (LogRecord<Record> rec = tailer.read(FIRST_READ_TIMEOUT); rec != null; rec = tailer.read(READ_TIMEOUT)) {
225            long recTimestamp = Watermark.ofValue(rec.message().watermark).getTimestamp();
226            if (recTimestamp == timestamp) {
227                return rec.offset();
228            } else if (recTimestamp > timestamp) {
229                return lastOffset;
230            }
231            if (recTimestamp == 0) {
232                throw new IllegalArgumentException("Cannot find position because Record has empty watermark: " + rec);
233            }
234            lastOffset = rec.offset();
235        }
236        // not found returns last offset of partition
237        return lastOffset;
238    }
239
240}