001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools.command; 020 021import java.io.IOException; 022import java.nio.file.Path; 023import java.nio.file.Paths; 024 025import org.apache.avro.Schema; 026import org.apache.avro.file.DataFileReader; 027import org.apache.avro.io.DatumReader; 028import org.apache.avro.reflect.ReflectData; 029import org.apache.avro.reflect.ReflectDatumReader; 030import org.apache.commons.cli.CommandLine; 031import org.apache.commons.cli.Option; 032import org.apache.commons.cli.Options; 033import org.apache.commons.logging.Log; 034import org.apache.commons.logging.LogFactory; 035import org.nuxeo.lib.stream.StreamRuntimeException; 036import org.nuxeo.lib.stream.computation.Record; 037import org.nuxeo.lib.stream.log.LogAppender; 038import org.nuxeo.lib.stream.log.LogManager; 039 040/** 041 * Appends records from a dump file into an a Log partition. 042 * 043 * @since 10.2 044 */ 045public class AppendCommand extends Command { 046 private static final Log log = LogFactory.getLog(AppendCommand.class); 047 048 protected static final String NAME = "append"; 049 050 @Override 051 public String name() { 052 return NAME; 053 } 054 055 @Override 056 public void updateOptions(Options options) { 057 options.addOption(Option.builder() 058 .longOpt("input") 059 .desc("Avro file to append to a stream") 060 .hasArg() 061 .required() 062 .argName("INPUT") 063 .build()); 064 options.addOption(Option.builder("l") 065 .longOpt("log-name") 066 .desc("Log name") 067 .required() 068 .hasArg() 069 .argName("LOG_NAME") 070 .build()); 071 options.addOption(Option.builder("p") 072 .longOpt("partition") 073 .desc("Write to this partition") 074 .required() 075 .hasArg() 076 .argName("PARTITION") 077 .build()); 078 options.addOption(Option.builder() 079 .longOpt("codec") 080 .desc("Codec used to write the records, can be: java, avro, avroBinary, avroJson") 081 .required() 082 .hasArg() 083 .argName("CODEC") 084 .build()); 085 } 086 087 @Override 088 public boolean run(LogManager manager, CommandLine cmd) { 089 String name = cmd.getOptionValue("log-name"); 090 String codec = cmd.getOptionValue("codec"); 091 int partition = Integer.parseInt(cmd.getOptionValue("partition")); 092 String input = cmd.getOptionValue("input"); 093 append(manager, name, partition, codec, Paths.get(input)); 094 return true; 095 } 096 097 protected void append(LogManager manager, String name, int partition, String codec, Path input) { 098 log.info(String.format("Append records from %s to stream: %s, partition: %d", input, name, partition)); 099 Schema schema = ReflectData.get().getSchema(Record.class); 100 DatumReader<Record> datumReader = new ReflectDatumReader<>(schema); 101 LogAppender<Record> appender = manager.getAppender(name, getRecordCodec(codec)); 102 int count = 0; 103 try (DataFileReader<Record> dataFileReader = new DataFileReader<>(input.toFile(), datumReader)) { 104 while (dataFileReader.hasNext()) { 105 Record record = dataFileReader.next(); 106 appender.append(partition, record); 107 count++; 108 } 109 } catch (IOException e) { 110 throw new StreamRuntimeException(e); 111 } 112 log.info(String.format("%d record(s) appended", count)); 113 } 114 115}