001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.io.IOException;
022import java.nio.file.Path;
023import java.nio.file.Paths;
024
025import org.apache.avro.Schema;
026import org.apache.avro.file.DataFileReader;
027import org.apache.avro.io.DatumReader;
028import org.apache.avro.reflect.ReflectData;
029import org.apache.avro.reflect.ReflectDatumReader;
030import org.apache.commons.cli.CommandLine;
031import org.apache.commons.cli.Option;
032import org.apache.commons.cli.Options;
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.nuxeo.lib.stream.StreamRuntimeException;
036import org.nuxeo.lib.stream.computation.Record;
037import org.nuxeo.lib.stream.log.LogAppender;
038import org.nuxeo.lib.stream.log.LogManager;
039import org.nuxeo.lib.stream.log.Name;
040
041/**
042 * Appends records from a dump file into an a Log partition.
043 *
044 * @since 10.2
045 */
046public class AppendCommand extends Command {
047    private static final Log log = LogFactory.getLog(AppendCommand.class);
048
049    protected static final String NAME = "append";
050
051    @Override
052    public String name() {
053        return NAME;
054    }
055
056    @Override
057    public void updateOptions(Options options) {
058        options.addOption(Option.builder()
059                                .longOpt("input")
060                                .desc("Avro file to append to a stream")
061                                .hasArg()
062                                .required()
063                                .argName("INPUT")
064                                .build());
065        options.addOption(Option.builder("l")
066                                .longOpt("log-name")
067                                .desc("Log name")
068                                .required()
069                                .hasArg()
070                                .argName("LOG_NAME")
071                                .build());
072        options.addOption(Option.builder("p")
073                                .longOpt("partition")
074                                .desc("Write to this partition")
075                                .required()
076                                .hasArg()
077                                .argName("PARTITION")
078                                .build());
079        options.addOption(Option.builder()
080                                .longOpt("codec")
081                                .desc("Codec used to write the records, can be: java, avro, avroBinary, avroJson")
082                                .required()
083                                .hasArg()
084                                .argName("CODEC")
085                                .build());
086    }
087
088    @Override
089    public boolean run(LogManager manager, CommandLine cmd) {
090        Name name = Name.ofUrn(cmd.getOptionValue("log-name"));
091        String codec = cmd.getOptionValue("codec");
092        int partition = Integer.parseInt(cmd.getOptionValue("partition"));
093        String input = cmd.getOptionValue("input");
094        append(manager, name, partition, codec, Paths.get(input));
095        return true;
096    }
097
098    protected void append(LogManager manager, Name name, int partition, String codec, Path input) {
099        log.info(String.format("Append records from %s to stream: %s, partition: %d", input, name, partition));
100        Schema schema = ReflectData.get().getSchema(Record.class);
101        DatumReader<Record> datumReader = new ReflectDatumReader<>(schema);
102        LogAppender<Record> appender = manager.getAppender(name, getRecordCodec(codec));
103        int count = 0;
104        try (DataFileReader<Record> dataFileReader = new DataFileReader<>(input.toFile(), datumReader)) {
105            while (dataFileReader.hasNext()) {
106                Record record = dataFileReader.next();
107                appender.append(partition, record);
108                count++;
109            }
110        } catch (IOException e) {
111            throw new StreamRuntimeException(e);
112        }
113        log.info(String.format("%d record(s) appended", count));
114    }
115
116}