001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Gethin James
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.io.Externalizable;
022import java.time.DateTimeException;
023import java.time.Duration;
024import java.time.Instant;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.Objects;
028
029import org.apache.commons.cli.CommandLine;
030import org.apache.commons.cli.Option;
031import org.apache.commons.cli.Options;
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.nuxeo.lib.stream.computation.Record;
035import org.nuxeo.lib.stream.computation.Watermark;
036import org.nuxeo.lib.stream.log.LogLag;
037import org.nuxeo.lib.stream.log.LogManager;
038import org.nuxeo.lib.stream.log.LogOffset;
039import org.nuxeo.lib.stream.log.LogPartition;
040import org.nuxeo.lib.stream.log.LogRecord;
041import org.nuxeo.lib.stream.log.LogTailer;
042import org.nuxeo.lib.stream.log.Name;
043
044/**
045 * Manipulates the consumer position to the beginning, end or a specific timestamp
046 *
047 * @since 10.1
048 */
049public class PositionCommand extends Command {
050    private static final Log log = LogFactory.getLog(PositionCommand.class);
051
052    public static final Duration FIRST_READ_TIMEOUT = Duration.ofMillis(1000);
053
054    public static final Duration READ_TIMEOUT = Duration.ofMillis(100);
055
056    protected static final String NAME = "position";
057
058    public static final String AFTER_DATE_OPT = "after-date";
059
060    public static final String TO_WATERMARK_OPT = "to-watermark";
061
062    // @since 11.2
063    public static final String TO_OFFSET_OPT = "to-offset";
064
065    protected static long getTimestampFromDate(String dateIso8601) {
066        if (dateIso8601 == null || dateIso8601.isEmpty()) {
067            return -1;
068        }
069        try {
070            Instant instant = Instant.parse(dateIso8601);
071            return instant.toEpochMilli();
072        } catch (DateTimeException e) {
073            log.error("Failed to read the timeout: " + e.getMessage());
074            log.error("The timestamp should be in ISO-8601 format, eg. " + Instant.now());
075        }
076        return -1;
077    }
078
079    @Override
080    public String name() {
081        return NAME;
082    }
083
084    @Override
085    public void updateOptions(Options options) {
086        options.addOption(Option.builder("l")
087                                .longOpt("log-name")
088                                .desc("Log name")
089                                .required()
090                                .hasArg()
091                                .argName("LOG_NAME")
092                                .build());
093        options.addOption(Option.builder("p")
094                                .longOpt("partition")
095                                .desc("The log partition")
096                                .hasArg()
097                                .argName("PARTITION")
098                                .build());
099        options.addOption(
100                Option.builder("g").longOpt("group").desc("Consumer group").hasArg().argName("GROUP").build());
101        options.addOption(Option.builder()
102                                .longOpt("codec")
103                                .desc("Codec used to read record and extract watermark when using to-watermark")
104                                .hasArg()
105                                .argName("CODEC")
106                                .build());
107        options.addOption(
108                Option.builder().longOpt("reset").desc("Resets all committed positions for the group").build());
109        options.addOption(Option.builder()
110                                .longOpt("to-end")
111                                .desc("Sets the committed positions to the end of partitions for the group")
112                                .build());
113        options.addOption(
114                Option.builder()
115                      .longOpt(AFTER_DATE_OPT)
116                      .desc("Sets the committed positions for the group to a specific date."
117                              + " The date used to find the offset depends on the implementation, for Kafka this is the"
118                              + " LogAppendTime. The position is set to the earliest offset whose timestamp is greater than or equal to the given date."
119                              + " The date is specified in ISO-8601 format, eg. " + Instant.now()
120                              + ". If no record offset is found with an appropriate timestamp then the command fails.")
121                      .hasArg()
122                      .argName("DATE")
123                      .build());
124        options.addOption(
125                Option.builder()
126                      .longOpt(TO_WATERMARK_OPT)
127                      .desc("Sets the committed positions for the group to a specific date."
128                              + " The date used to find the offset is contained in a record watermark. "
129                              + " This means that the LOG_NAME is expected to be a computation stream with records with populated watermark."
130                              + " The position is set to the biggest record offset with a watermark date inferior or equals to the given date.\""
131                              + " The date is specified in ISO-8601 format, eg. " + Instant.now()
132                              + ". If no record offset is found with an appropriate timestamp then the command fails.")
133                      .hasArg()
134                      .argName("DATE")
135                      .build());
136        options.addOption(Option.builder()
137                                .longOpt(TO_OFFSET_OPT)
138                                .desc("Sets the committed position to a specific offset."
139                                        + " When restarted the consumer will continue on the next record.")
140                                .hasArg()
141                                .argName("OFFSET")
142                                .build());
143    }
144
145    @Override
146    public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException {
147        Name name = Name.ofUrn(cmd.getOptionValue("log-name"));
148        Name group = Name.ofUrn(cmd.getOptionValue("group", "admin/tools"));
149        String codec = cmd.getOptionValue("codec");
150        int partition = Integer.parseInt(cmd.getOptionValue("partition", "-1"));
151        if (cmd.hasOption(AFTER_DATE_OPT)) {
152            long timestamp = getTimestampFromDate(cmd.getOptionValue(AFTER_DATE_OPT));
153            if (timestamp >= 0) {
154                return positionAfterDate(manager, group, name, partition, timestamp);
155            }
156        } else if (cmd.hasOption(TO_WATERMARK_OPT)) {
157            long timestamp = getTimestampFromDate(cmd.getOptionValue(TO_WATERMARK_OPT));
158            if (timestamp >= 0) {
159                return positionToWatermark(manager, group, name, partition, timestamp, codec);
160            }
161        } else if (cmd.hasOption(TO_OFFSET_OPT)) {
162            long offset = Long.parseLong(cmd.getOptionValue(TO_OFFSET_OPT));
163            return positionToOffset(manager, group, name, partition, offset);
164        } else if (cmd.hasOption("to-end")) {
165            return toEnd(manager, group, name, partition);
166        } else if (cmd.hasOption("reset")) {
167            return reset(manager, group, name, partition);
168        } else {
169            log.error("Invalid option, try 'help position'");
170        }
171        return false;
172    }
173
174    protected boolean toEnd(LogManager manager, Name group, Name name, int partition) {
175        LogLag lag = getLag(manager, group, name, partition);
176        try (LogTailer<Externalizable> tailer = createTailer(manager, name, partition, group)) {
177            tailer.toEnd();
178            tailer.commit();
179        }
180        log.info(String.format("# Moved log %s, group: %s, from: %s to %s", labelFor(name, partition), group,
181                lag.lower(), lag.upper()));
182        return true;
183    }
184
185    protected String labelFor(int partition) {
186        return partition >= 0 ? Integer.toString(partition) : "all";
187    }
188
189    protected String labelFor(Name name, int partition) {
190        return partition >= 0 ? name.getUrn() + ":" + labelFor(partition) : name.getUrn();
191    }
192
193    protected LogLag getLag(LogManager manager, Name group, Name name, int partition) {
194        if (partition >= 0) {
195            return manager.getLagPerPartition(name, group).get(partition);
196        } else {
197            return manager.getLag(name, group);
198        }
199    }
200
201    protected <T extends Externalizable> LogTailer<T> createTailer(LogManager manager, Name name, int partition,
202            Name group) {
203        if (partition >= 0) {
204            return manager.createTailer(group, new LogPartition(name, partition));
205        }
206        return manager.createTailer(group, name);
207    }
208
209    protected boolean reset(LogManager manager, Name group, Name name, int partition) {
210        LogLag lag = getLag(manager, group, name, partition);
211        long pos = lag.lower();
212        try (LogTailer<Externalizable> tailer = createTailer(manager, name, partition, group)) {
213            tailer.reset();
214        }
215        log.warn(String.format("# Reset log %s, group: %s, from: %s to 0", labelFor(name, partition), group, pos));
216        return true;
217    }
218
219    protected boolean positionAfterDate(LogManager manager, Name group, Name name, int partition, long timestamp) {
220        try (LogTailer<Externalizable> tailer = manager.createTailer(group, name)) {
221            boolean movedOffset = false;
222            for (int part = 0; part < manager.size(name); part++) {
223                if (partition >= 0 && part != partition) {
224                    continue;
225                }
226                LogPartition logPartition = new LogPartition(name, part);
227                LogOffset logOffset = tailer.offsetForTimestamp(logPartition, timestamp);
228                if (logOffset == null) {
229                    log.error(String.format("# Could not find an offset for group: %s, partition: %s", group,
230                            logPartition));
231                    continue;
232                }
233                tailer.seek(logOffset);
234                movedOffset = true;
235                log.warn(String.format("# Set log %s, group: %s, to offset %s", labelFor(name, part), group,
236                        logOffset.offset()));
237            }
238            if (movedOffset) {
239                tailer.commit();
240                return true;
241            }
242        }
243        log.error("No offset found for the specified date");
244        return false;
245    }
246
247    protected boolean positionToWatermark(LogManager manager, Name group, Name name, int partition, long timestamp,
248            String codec)
249            throws InterruptedException {
250        Name newGroup = Name.ofUrn("admin/tools");
251        int size = manager.size(name);
252        List<LogOffset> offsets = new ArrayList<>(size);
253        List<LogLag> lags = manager.getLagPerPartition(name, newGroup);
254        int part = 0;
255        // find the offsets first
256        for (LogLag lag : lags) {
257            if (lag.lag() == 0) {
258                // empty partition nothing to do
259                offsets.add(null);
260            } else {
261                if (partition >= 0 && part != partition) {
262                    offsets.add(null);
263                }
264                try (LogTailer<Record> tailer = manager.createTailer(newGroup, new LogPartition(name, part),
265                        getRecordCodec(codec))) {
266                    offsets.add(searchWatermarkOffset(tailer, timestamp));
267                }
268            }
269            part++;
270        }
271        if (offsets.stream().noneMatch(Objects::nonNull)) {
272            if (LogLag.of(lags).upper() == 0) {
273                log.error("No offsets found because log is empty");
274                return false;
275            }
276            log.error("Timestamp: " + timestamp + " is earlier as any records, resetting positions");
277            return reset(manager, group, name, partition);
278        }
279        try (LogTailer<Externalizable> tailer = manager.createTailer(group, name)) {
280            offsets.stream().filter(Objects::nonNull).forEach(tailer::seek);
281            tailer.commit();
282            offsets.stream().filter(Objects::nonNull).forEach(offset -> log.warn("# Moving consumer to: " + offset));
283        }
284        return true;
285    }
286
287    protected LogOffset searchWatermarkOffset(LogTailer<Record> tailer, long timestamp) throws InterruptedException {
288        LogOffset lastOffset = null;
289        for (LogRecord<Record> rec = tailer.read(FIRST_READ_TIMEOUT); rec != null; rec = tailer.read(READ_TIMEOUT)) {
290            long recTimestamp = Watermark.ofValue(rec.message().getWatermark()).getTimestamp();
291            if (recTimestamp == timestamp) {
292                return rec.offset();
293            } else if (recTimestamp > timestamp) {
294                return lastOffset;
295            }
296            if (recTimestamp == 0) {
297                throw new IllegalArgumentException("Cannot find position because Record has empty watermark: " + rec);
298            }
299            lastOffset = rec.offset();
300        }
301        // not found returns last offset of partition
302        return lastOffset;
303    }
304
305    protected boolean positionToOffset(LogManager manager, Name group, Name name, int partition, long offset) {
306        LogLag lag = getLag(manager, group, name, partition);
307        long pos = lag.lower();
308        boolean goToStart = false;
309        if (offset < pos) {
310            goToStart = true;
311        } else if (offset == pos) {
312            log.error(String.format("Current offset for group %s on %s is already %d", group, labelFor(name, partition),
313                    pos));
314            return false;
315        }
316        try (LogTailer<Externalizable> tailer = manager.createTailer(group, LogPartition.of(name, partition))) {
317            if (goToStart) {
318                tailer.toStart();
319            }
320            for (;;) {
321                LogRecord<Externalizable> record = tailer.read(Duration.ofSeconds(1));
322                if (record == null || record.offset().offset() > offset) {
323                    log.error("No offset found for the specified partition");
324                    return false;
325                }
326                if (record.offset().offset() == offset) {
327                    tailer.commit();
328                    log.warn(String.format("# Move group %s on %s from %d to %d", group, labelFor(name, partition), pos,
329                            offset));
330                    return true;
331                }
332            }
333        } catch (InterruptedException e) {
334            Thread.currentThread().interrupt();
335            log.error("Interrupted");
336            return false;
337        }
338    }
339}