001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools.command; 020 021import java.io.IOException; 022import java.nio.file.Path; 023import java.nio.file.Paths; 024import java.time.Duration; 025 026import org.apache.avro.Schema; 027import org.apache.avro.file.CodecFactory; 028import org.apache.avro.file.DataFileWriter; 029import org.apache.avro.io.DatumWriter; 030import org.apache.avro.reflect.ReflectData; 031import org.apache.avro.reflect.ReflectDatumWriter; 032import org.apache.commons.cli.CommandLine; 033import org.apache.commons.cli.Option; 034import org.apache.commons.cli.Options; 035import org.apache.commons.logging.Log; 036import org.apache.commons.logging.LogFactory; 037import org.nuxeo.lib.stream.StreamRuntimeException; 038import org.nuxeo.lib.stream.computation.Record; 039import org.nuxeo.lib.stream.log.LogManager; 040import org.nuxeo.lib.stream.log.LogPartition; 041import org.nuxeo.lib.stream.log.LogRecord; 042import org.nuxeo.lib.stream.log.LogTailer; 043import org.nuxeo.lib.stream.log.Name; 044 045/** 046 * Dump records from a Log into an Avro file. 047 * 048 * @since 10.2 049 */ 050public class DumpCommand extends Command { 051 private static final Log log = LogFactory.getLog(DumpCommand.class); 052 053 protected static final String NAME = "dump"; 054 055 @Override 056 public String name() { 057 return NAME; 058 } 059 060 @Override 061 public void updateOptions(Options options) { 062 options.addOption(Option.builder("n") 063 .longOpt("count") 064 .desc("Dump the first N records into a file") 065 .hasArg() 066 .argName("N") 067 .build()); 068 options.addOption(Option.builder("l") 069 .longOpt("log-name") 070 .desc("Log name") 071 .required() 072 .hasArg() 073 .argName("LOG_NAME") 074 .build()); 075 options.addOption( 076 Option.builder("g").longOpt("group").desc("Consumer group").hasArg().argName("GROUP").build()); 077 options.addOption(Option.builder() 078 .longOpt("codec") 079 .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson") 080 .hasArg() 081 .argName("CODEC") 082 .build()); 083 options.addOption(Option.builder("p") 084 .longOpt("partition") 085 .desc("Read only this partition") 086 .hasArg() 087 .argName("PARTITION") 088 .build()); 089 options.addOption(Option.builder() 090 .longOpt("output") 091 .desc("Avro file path to dump the records") 092 .hasArg() 093 .required() 094 .argName("OUTPUT") 095 .build()); 096 } 097 098 @Override 099 public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException { 100 int limit = Integer.parseInt(cmd.getOptionValue("count", "-1")); 101 Name name = Name.ofUrn(cmd.getOptionValue("log-name")); 102 Name group = Name.ofUrn(cmd.getOptionValue("group", "admin/tools")); 103 String codec = cmd.getOptionValue("codec"); 104 int partition = Integer.parseInt(cmd.getOptionValue("partition", "-1")); 105 String output = cmd.getOptionValue("output"); 106 dump(manager, name, partition, group, limit, codec, Paths.get(output)); 107 return true; 108 } 109 110 protected void dump(LogManager manager, Name name, int partition, Name group, int limit, String codec, 111 Path output) throws InterruptedException { 112 log.info("Dump record to file: " + output); 113 Schema schema = ReflectData.get().getSchema(Record.class); 114 DatumWriter<Record> datumWriter = new ReflectDatumWriter<>(schema); 115 int count = 0; 116 try (DataFileWriter<Record> dataFileWriter = new DataFileWriter<>(datumWriter)) { 117 dataFileWriter.setCodec(CodecFactory.snappyCodec()); 118 dataFileWriter.create(schema, output.toFile()); 119 try (LogTailer<Record> tailer = getTailer(manager, name, partition, group, codec)) { 120 do { 121 LogRecord<Record> record = tailer.read(Duration.ofMillis(1000)); 122 if (record == null) { 123 break; 124 } 125 dataFileWriter.append(record.message()); 126 count++; 127 } while (limit < 0 || (count < limit)); 128 } 129 } catch (IOException e) { 130 throw new StreamRuntimeException(e); 131 } 132 log.info(String.format("%d record(s) dumped", count)); 133 } 134 135 protected LogTailer<Record> getTailer(LogManager manager, Name name, int partition, Name group, String codec) { 136 if (partition >= 0) { 137 return manager.createTailer(group, new LogPartition(name, partition), getRecordCodec(codec)); 138 } 139 return manager.createTailer(group, name, getRecordCodec(codec)); 140 } 141}