001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Gethin James
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.io.Externalizable;
022import java.time.DateTimeException;
023import java.time.Duration;
024import java.time.Instant;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.Objects;
028
029import org.apache.commons.cli.CommandLine;
030import org.apache.commons.cli.Option;
031import org.apache.commons.cli.Options;
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.nuxeo.lib.stream.computation.Record;
035import org.nuxeo.lib.stream.computation.Watermark;
036import org.nuxeo.lib.stream.log.LogLag;
037import org.nuxeo.lib.stream.log.LogManager;
038import org.nuxeo.lib.stream.log.LogOffset;
039import org.nuxeo.lib.stream.log.LogPartition;
040import org.nuxeo.lib.stream.log.LogRecord;
041import org.nuxeo.lib.stream.log.LogTailer;
042
043/**
044 * Manipulates the consumer position to the beginning, end or a specific timestamp
045 *
046 * @since 10.1
047 */
048public class PositionCommand extends Command {
049    private static final Log log = LogFactory.getLog(PositionCommand.class);
050
051    public static final Duration FIRST_READ_TIMEOUT = Duration.ofMillis(1000);
052
053    public static final Duration READ_TIMEOUT = Duration.ofMillis(100);
054
055    protected static final String NAME = "position";
056
057    public static final String AFTER_DATE_OPT = "after-date";
058
059    public static final String TO_WATERMARK_OPT = "to-watermark";
060
061    protected static long getTimestampFromDate(String dateIso8601) {
062        if (dateIso8601 == null || dateIso8601.isEmpty()) {
063            return -1;
064        }
065        try {
066            Instant instant = Instant.parse(dateIso8601);
067            return instant.toEpochMilli();
068        } catch (DateTimeException e) {
069            log.error("Failed to read the timeout: " + e.getMessage());
070            log.error("The timestamp should be in ISO-8601 format, eg. " + Instant.now());
071        }
072        return -1;
073    }
074
075    @Override
076    public String name() {
077        return NAME;
078    }
079
080    @Override
081    public void updateOptions(Options options) {
082        options.addOption(Option.builder("l")
083                                .longOpt("log-name")
084                                .desc("Log name")
085                                .required()
086                                .hasArg()
087                                .argName("LOG_NAME")
088                                .build());
089        options.addOption(Option.builder("p")
090                                .longOpt("partition")
091                                .desc("Read only this partition")
092                                .hasArg()
093                                .argName("PARTITION")
094                                .build());
095        options.addOption(
096                Option.builder("g").longOpt("group").desc("Consumer group").hasArg().argName("GROUP").build());
097        options.addOption(
098                Option.builder().longOpt("reset").desc("Resets all committed positions for the group").build());
099        options.addOption(Option.builder()
100                                .longOpt("to-end")
101                                .desc("Sets the committed positions to the end of partitions for the group")
102                                .build());
103        options.addOption(
104                Option.builder()
105                      .longOpt(AFTER_DATE_OPT)
106                      .desc("Sets the committed positions for the group to a specific date."
107                              + " The date used to find the offset depends on the implementation, for Kafka this is the"
108                              + " LogAppendTime. The position is set to the earliest offset whose timestamp is greater than or equal to the given date."
109                              + " The date is specified in ISO-8601 format, eg. " + Instant.now()
110                              + ". If no record offset is found with an appropriate timestamp then the command fails.")
111                      .hasArg()
112                      .argName("DATE")
113                      .build());
114        options.addOption(
115                Option.builder()
116                      .longOpt(TO_WATERMARK_OPT)
117                      .desc("Sets the committed positions for the group to a specific date."
118                              + " The date used to find the offset is contained in a record watermark. "
119                              + " This means that the LOG_NAME is expected to be a computation stream with records with populated watermark."
120                              + " The position is set to the biggest record offset with a watermark date inferior or equals to the given date.\""
121                              + " The date is specified in ISO-8601 format, eg. " + Instant.now()
122                              + ". If no record offset is found with an appropriate timestamp then the command fails.")
123                      .hasArg()
124                      .argName("DATE")
125                      .build());
126    }
127
128    @Override
129    public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException {
130        String name = cmd.getOptionValue("log-name");
131        String group = cmd.getOptionValue("group", "tools");
132        int partition = Integer.parseInt(cmd.getOptionValue("partition", "-1"));
133        if (cmd.hasOption(AFTER_DATE_OPT)) {
134            long timestamp = getTimestampFromDate(cmd.getOptionValue(AFTER_DATE_OPT));
135            if (timestamp >= 0) {
136                return positionAfterDate(manager, group, name, partition, timestamp);
137            }
138        } else if (cmd.hasOption(TO_WATERMARK_OPT)) {
139            long timestamp = getTimestampFromDate(cmd.getOptionValue(TO_WATERMARK_OPT));
140            if (timestamp >= 0) {
141                return positionToWatermark(manager, group, name, partition, timestamp);
142            }
143        } else if (cmd.hasOption("to-end")) {
144            return toEnd(manager, group, name, partition);
145        } else if (cmd.hasOption("reset")) {
146            return reset(manager, group, name, partition);
147        } else {
148            log.error("Invalid option, try 'help position'");
149        }
150        return false;
151    }
152
153    protected boolean toEnd(LogManager manager, String group, String name, int partition) {
154        LogLag lag = getLag(manager, group, name, partition);
155        try (LogTailer<Externalizable> tailer = createTailer(manager, name, partition, group)) {
156            tailer.toEnd();
157            tailer.commit();
158        }
159        log.info(String.format("# Moved log %s, group: %s, from: %s to %s", labelFor(name, partition), group,
160                lag.lower(), lag.upper()));
161        return true;
162    }
163
164    protected String labelFor(int partition) {
165        return partition >= 0 ? Integer.toString(partition) : "all";
166    }
167
168    protected String labelFor(String name, int partition) {
169        return partition >= 0 ?  name + ":" + labelFor(partition): name;
170    }
171
172    protected LogLag getLag(LogManager manager, String group, String name, int partition) {
173        if (partition >= 0) {
174            return manager.getLagPerPartition(name, group).get(partition);
175        } else {
176            return manager.getLag(name, group);
177        }
178    }
179
180    protected <T extends Externalizable> LogTailer<T> createTailer(LogManager manager, String name, int partition,
181            String group) {
182        if (partition >= 0) {
183            return manager.createTailer(group, new LogPartition(name, partition));
184        }
185        return manager.createTailer(group, name);
186    }
187
188    protected boolean reset(LogManager manager, String group, String name, int partition) {
189        LogLag lag = getLag(manager, group, name, partition);
190        long pos = lag.lower();
191        try (LogTailer<Externalizable> tailer = createTailer(manager, name, partition, group)) {
192            tailer.reset();
193        }
194        log.warn(String.format("# Reset log %s, group: %s, from: %s to 0", labelFor(name, partition), group, pos));
195        return true;
196    }
197
198    protected boolean positionAfterDate(LogManager manager, String group, String name, int partition, long timestamp) {
199        try (LogTailer<Externalizable> tailer = manager.createTailer(group, name)) {
200            boolean movedOffset = false;
201            for (int part = 0; part < manager.size(name); part++) {
202                if (partition >= 0 && part != partition) {
203                    continue;
204                }
205                LogPartition logPartition = new LogPartition(name, part);
206                LogOffset logOffset = tailer.offsetForTimestamp(logPartition, timestamp);
207                if (logOffset == null) {
208                    log.error(String.format("# Could not find an offset for group: %s, partition: %s", group,
209                            logPartition));
210                    continue;
211                }
212                tailer.seek(logOffset);
213                movedOffset = true;
214                log.info(String.format("# Set log %s, group: %s, to offset %s", labelFor(name, part), group, logOffset.offset()));
215            }
216            if (movedOffset) {
217                tailer.commit();
218                return true;
219            }
220        }
221        log.error("No offset found for the specified date");
222        return false;
223    }
224
225    protected boolean positionToWatermark(LogManager manager, String group, String name, int partition, long timestamp)
226            throws InterruptedException {
227        String newGroup = "tools";
228        int size = manager.size(name);
229        List<LogOffset> offsets = new ArrayList<>(size);
230        List<LogLag> lags = manager.getLagPerPartition(name, newGroup);
231        int part = 0;
232        // find the offsets first
233        for (LogLag lag : lags) {
234            if (lag.lag() == 0) {
235                // empty partition nothing to do
236                offsets.add(null);
237            } else {
238                if (partition >= 0 && part!= partition) {
239                    offsets.add(null);
240                }
241                try (LogTailer<Record> tailer = manager.createTailer(newGroup, new LogPartition(name, part))) {
242                    offsets.add(searchWatermarkOffset(tailer, timestamp));
243                }
244            }
245            part++;
246        }
247        if (offsets.stream().noneMatch(Objects::nonNull)) {
248            if (LogLag.of(lags).upper() == 0) {
249                log.error("No offsets found because log is empty");
250                return false;
251            }
252            log.error("Timestamp: " + timestamp + " is earlier as any records, resetting positions");
253            return reset(manager, group, name, partition);
254        }
255        try (LogTailer<Externalizable> tailer = manager.createTailer(group, name)) {
256            offsets.stream().filter(Objects::nonNull).forEach(tailer::seek);
257            tailer.commit();
258            offsets.stream().filter(Objects::nonNull).forEach(offset -> log.info("# Moving consumer to: " + offset));
259        }
260        return true;
261    }
262
263    protected LogOffset searchWatermarkOffset(LogTailer<Record> tailer, long timestamp) throws InterruptedException {
264        LogOffset lastOffset = null;
265        for (LogRecord<Record> rec = tailer.read(FIRST_READ_TIMEOUT); rec != null; rec = tailer.read(READ_TIMEOUT)) {
266            long recTimestamp = Watermark.ofValue(rec.message().getWatermark()).getTimestamp();
267            if (recTimestamp == timestamp) {
268                return rec.offset();
269            } else if (recTimestamp > timestamp) {
270                return lastOffset;
271            }
272            if (recTimestamp == 0) {
273                throw new IllegalArgumentException("Cannot find position because Record has empty watermark: " + rec);
274            }
275            lastOffset = rec.offset();
276        }
277        // not found returns last offset of partition
278        return lastOffset;
279    }
280
281}