001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools.command; 020 021import java.io.IOException; 022import java.nio.file.Path; 023import java.nio.file.Paths; 024 025import org.apache.avro.Schema; 026import org.apache.avro.file.DataFileReader; 027import org.apache.avro.io.DatumReader; 028import org.apache.avro.reflect.ReflectData; 029import org.apache.avro.reflect.ReflectDatumReader; 030import org.apache.commons.cli.CommandLine; 031import org.apache.commons.cli.Option; 032import org.apache.commons.cli.Options; 033import org.apache.commons.logging.Log; 034import org.apache.commons.logging.LogFactory; 035import org.nuxeo.lib.stream.StreamRuntimeException; 036import org.nuxeo.lib.stream.computation.Record; 037import org.nuxeo.lib.stream.log.LogAppender; 038import org.nuxeo.lib.stream.log.LogManager; 039import org.nuxeo.lib.stream.log.Name; 040 041/** 042 * Appends records from a dump file into an a Log partition. 043 * 044 * @since 10.2 045 */ 046public class AppendCommand extends Command { 047 private static final Log log = LogFactory.getLog(AppendCommand.class); 048 049 protected static final String NAME = "append"; 050 051 @Override 052 public String name() { 053 return NAME; 054 } 055 056 @Override 057 public void updateOptions(Options options) { 058 options.addOption(Option.builder() 059 .longOpt("input") 060 .desc("Avro file to append to a stream") 061 .hasArg() 062 .required() 063 .argName("INPUT") 064 .build()); 065 options.addOption(Option.builder("l") 066 .longOpt("log-name") 067 .desc("Log name") 068 .required() 069 .hasArg() 070 .argName("LOG_NAME") 071 .build()); 072 options.addOption(Option.builder("p") 073 .longOpt("partition") 074 .desc("Write to this partition") 075 .required() 076 .hasArg() 077 .argName("PARTITION") 078 .build()); 079 options.addOption(Option.builder() 080 .longOpt("codec") 081 .desc("Codec used to write the records, can be: java, avro, avroBinary, avroJson") 082 .required() 083 .hasArg() 084 .argName("CODEC") 085 .build()); 086 } 087 088 @Override 089 public boolean run(LogManager manager, CommandLine cmd) { 090 Name name = Name.ofUrn(cmd.getOptionValue("log-name")); 091 String codec = cmd.getOptionValue("codec"); 092 int partition = Integer.parseInt(cmd.getOptionValue("partition")); 093 String input = cmd.getOptionValue("input"); 094 append(manager, name, partition, codec, Paths.get(input)); 095 return true; 096 } 097 098 protected void append(LogManager manager, Name name, int partition, String codec, Path input) { 099 log.info(String.format("Append records from %s to stream: %s, partition: %d", input, name, partition)); 100 Schema schema = ReflectData.get().getSchema(Record.class); 101 DatumReader<Record> datumReader = new ReflectDatumReader<>(schema); 102 LogAppender<Record> appender = manager.getAppender(name, getRecordCodec(codec)); 103 int count = 0; 104 try (DataFileReader<Record> dataFileReader = new DataFileReader<>(input.toFile(), datumReader)) { 105 while (dataFileReader.hasNext()) { 106 Record record = dataFileReader.next(); 107 appender.append(partition, record); 108 count++; 109 } 110 } catch (IOException e) { 111 throw new StreamRuntimeException(e); 112 } 113 log.info(String.format("%d record(s) appended", count)); 114 } 115 116}