001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.io.IOException;
022import java.nio.file.Path;
023import java.nio.file.Paths;
024import java.time.Duration;
025
026import org.apache.avro.Schema;
027import org.apache.avro.file.CodecFactory;
028import org.apache.avro.file.DataFileWriter;
029import org.apache.avro.io.DatumWriter;
030import org.apache.avro.reflect.ReflectData;
031import org.apache.avro.reflect.ReflectDatumWriter;
032import org.apache.commons.cli.CommandLine;
033import org.apache.commons.cli.Option;
034import org.apache.commons.cli.Options;
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.nuxeo.lib.stream.StreamRuntimeException;
038import org.nuxeo.lib.stream.computation.Record;
039import org.nuxeo.lib.stream.log.LogManager;
040import org.nuxeo.lib.stream.log.LogPartition;
041import org.nuxeo.lib.stream.log.LogRecord;
042import org.nuxeo.lib.stream.log.LogTailer;
043import org.nuxeo.lib.stream.log.Name;
044
045/**
046 * Dump records from a Log into an Avro file.
047 *
048 * @since 10.2
049 */
050public class DumpCommand extends Command {
051    private static final Log log = LogFactory.getLog(DumpCommand.class);
052
053    protected static final String NAME = "dump";
054
055    @Override
056    public String name() {
057        return NAME;
058    }
059
060    @Override
061    public void updateOptions(Options options) {
062        options.addOption(Option.builder("n")
063                                .longOpt("count")
064                                .desc("Dump the first N records into a file")
065                                .hasArg()
066                                .argName("N")
067                                .build());
068        options.addOption(Option.builder("l")
069                                .longOpt("log-name")
070                                .desc("Log name")
071                                .required()
072                                .hasArg()
073                                .argName("LOG_NAME")
074                                .build());
075        options.addOption(
076                Option.builder("g").longOpt("group").desc("Consumer group").hasArg().argName("GROUP").build());
077        options.addOption(Option.builder()
078                                .longOpt("codec")
079                                .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson")
080                                .hasArg()
081                                .argName("CODEC")
082                                .build());
083        options.addOption(Option.builder("p")
084                                .longOpt("partition")
085                                .desc("Read only this partition")
086                                .hasArg()
087                                .argName("PARTITION")
088                                .build());
089        options.addOption(Option.builder()
090                                .longOpt("output")
091                                .desc("Avro file path to dump the records")
092                                .hasArg()
093                                .required()
094                                .argName("OUTPUT")
095                                .build());
096    }
097
098    @Override
099    public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException {
100        int limit = Integer.parseInt(cmd.getOptionValue("count", "-1"));
101        Name name = Name.ofUrn(cmd.getOptionValue("log-name"));
102        Name group = Name.ofUrn(cmd.getOptionValue("group", "admin/tools"));
103        String codec = cmd.getOptionValue("codec");
104        int partition = Integer.parseInt(cmd.getOptionValue("partition", "-1"));
105        String output = cmd.getOptionValue("output");
106        dump(manager, name, partition, group, limit, codec, Paths.get(output));
107        return true;
108    }
109
110    protected void dump(LogManager manager, Name name, int partition, Name group, int limit, String codec,
111            Path output) throws InterruptedException {
112        log.info("Dump record to file: " + output);
113        Schema schema = ReflectData.get().getSchema(Record.class);
114        DatumWriter<Record> datumWriter = new ReflectDatumWriter<>(schema);
115        int count = 0;
116        try (DataFileWriter<Record> dataFileWriter = new DataFileWriter<>(datumWriter)) {
117            dataFileWriter.setCodec(CodecFactory.snappyCodec());
118            dataFileWriter.create(schema, output.toFile());
119            try (LogTailer<Record> tailer = getTailer(manager, name, partition, group, codec)) {
120                do {
121                    LogRecord<Record> record = tailer.read(Duration.ofMillis(1000));
122                    if (record == null) {
123                        break;
124                    }
125                    dataFileWriter.append(record.message());
126                    count++;
127                } while (limit < 0 || (count < limit));
128            }
129        } catch (IOException e) {
130            throw new StreamRuntimeException(e);
131        }
132        log.info(String.format("%d record(s) dumped", count));
133    }
134
135    protected LogTailer<Record> getTailer(LogManager manager, Name name, int partition, Name group, String codec) {
136        if (partition >= 0) {
137            return manager.createTailer(group, new LogPartition(name, partition), getRecordCodec(codec));
138        }
139        return manager.createTailer(group, name, getRecordCodec(codec));
140    }
141}