001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools.command; 020 021import java.io.IOException; 022import java.nio.file.Path; 023import java.nio.file.Paths; 024import java.time.Duration; 025 026import org.apache.avro.Schema; 027import org.apache.avro.file.CodecFactory; 028import org.apache.avro.file.DataFileWriter; 029import org.apache.avro.io.DatumWriter; 030import org.apache.avro.reflect.ReflectData; 031import org.apache.avro.reflect.ReflectDatumWriter; 032import org.apache.commons.cli.CommandLine; 033import org.apache.commons.cli.Option; 034import org.apache.commons.cli.Options; 035import org.apache.commons.logging.Log; 036import org.apache.commons.logging.LogFactory; 037import org.nuxeo.lib.stream.StreamRuntimeException; 038import org.nuxeo.lib.stream.computation.Record; 039import org.nuxeo.lib.stream.log.LogManager; 040import org.nuxeo.lib.stream.log.LogPartition; 041import org.nuxeo.lib.stream.log.LogRecord; 042import org.nuxeo.lib.stream.log.LogTailer; 043 044/** 045 * Dump records from a Log into an Avro file. 046 * 047 * @since 10.2 048 */ 049public class DumpCommand extends Command { 050 private static final Log log = LogFactory.getLog(DumpCommand.class); 051 052 protected static final String NAME = "dump"; 053 054 @Override 055 public String name() { 056 return NAME; 057 } 058 059 @Override 060 public void updateOptions(Options options) { 061 options.addOption(Option.builder("n") 062 .longOpt("count") 063 .desc("Dump the first N records into a file") 064 .hasArg() 065 .argName("N") 066 .build()); 067 options.addOption(Option.builder("l") 068 .longOpt("log-name") 069 .desc("Log name") 070 .required() 071 .hasArg() 072 .argName("LOG_NAME") 073 .build()); 074 options.addOption( 075 Option.builder("g").longOpt("group").desc("Consumer group").hasArg().argName("GROUP").build()); 076 options.addOption(Option.builder() 077 .longOpt("codec") 078 .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson") 079 .hasArg() 080 .argName("CODEC") 081 .build()); 082 options.addOption(Option.builder("p") 083 .longOpt("partition") 084 .desc("Read only this partition") 085 .hasArg() 086 .argName("PARTITION") 087 .build()); 088 options.addOption(Option.builder() 089 .longOpt("output") 090 .desc("Avro file path to dump the records") 091 .hasArg() 092 .required() 093 .argName("OUTPUT") 094 .build()); 095 } 096 097 @Override 098 public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException { 099 int limit = Integer.parseInt(cmd.getOptionValue("count", "-1")); 100 String name = cmd.getOptionValue("log-name"); 101 String group = cmd.getOptionValue("group", "tools"); 102 String codec = cmd.getOptionValue("codec"); 103 int partition = Integer.parseInt(cmd.getOptionValue("partition", "-1")); 104 String output = cmd.getOptionValue("output"); 105 dump(manager, name, partition, group, limit, codec, Paths.get(output)); 106 return true; 107 } 108 109 protected void dump(LogManager manager, String name, int partition, String group, int limit, String codec, 110 Path output) throws InterruptedException { 111 log.info("Dump record to file: " + output); 112 Schema schema = ReflectData.get().getSchema(Record.class); 113 DatumWriter<Record> datumWriter = new ReflectDatumWriter<>(schema); 114 int count = 0; 115 try (DataFileWriter<Record> dataFileWriter = new DataFileWriter<>(datumWriter)) { 116 dataFileWriter.setCodec(CodecFactory.snappyCodec()); 117 dataFileWriter.create(schema, output.toFile()); 118 try (LogTailer<Record> tailer = getTailer(manager, name, partition, group, codec)) { 119 do { 120 LogRecord<Record> record = tailer.read(Duration.ofMillis(1000)); 121 if (record == null) { 122 break; 123 } 124 dataFileWriter.append(record.message()); 125 count++; 126 } while (limit < 0 || (count < limit)); 127 } 128 } catch (IOException e) { 129 throw new StreamRuntimeException(e); 130 } 131 log.info(String.format("%d record(s) dumped", count)); 132 } 133 134 protected LogTailer<Record> getTailer(LogManager manager, String name, int partition, String group, String codec) { 135 if (partition >= 0) { 136 return manager.createTailer(group, new LogPartition(name, partition), getRecordCodec(codec)); 137 } 138 return manager.createTailer(group, name, getRecordCodec(codec)); 139 } 140}