/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.tools.command;

import java.nio.file.Paths;
import java.time.Duration;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.tools.renderer.Renderer;

/**
 * Output records to stdout.
 * <p>
 * Reads records of a Log through a tailer and hands each one to a {@link Renderer}.
 *
 * @since 9.3
 */
public class CatCommand extends Command {

    /** Relative path used as the Avro schema store when {@code --schema-store} is not given and the path exists. */
    protected static final String NUXEO_SCHEMA_STORE = "nxserver/data/avro";

    /** Name of this command, as returned by {@link #name()}. */
    protected static final String NAME = "cat";

    /**
     * @return the command name, {@value #NAME}
     */
    @Override
    public String name() {
        return NAME;
    }

    /**
     * Registers the options understood by this command.
     * <p>
     * Only {@code --log-name} is required; the others fall back to the defaults applied in
     * {@link #run(LogManager, CommandLine)}.
     *
     * @param options the option set to add this command's options to
     */
    @Override
    public void updateOptions(Options options) {
        options.addOption(
                Option.builder("n").longOpt("lines").desc("Render the first N records").hasArg().argName("N").build());
        options.addOption(Option.builder("l")
                                .longOpt("log-name")
                                .desc("Log name")
                                .required()
                                .hasArg()
                                .argName("LOG_NAME")
                                .build());
        options.addOption(
                Option.builder("g").longOpt("group").desc("Consumer group").hasArg().argName("GROUP").build());
        options.addOption(Option.builder()
                                .longOpt("codec")
                                .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson")
                                .hasArg()
                                .argName("CODEC")
                                .build());
        options.addOption(Option.builder()
                                .longOpt("data-size")
                                .desc("Maximum size of message data to render")
                                .hasArg()
                                .argName("L")
                                .build());
        options.addOption(
                Option.builder().longOpt("render").desc("Output rendering").hasArg().argName("FORMAT").build());
        options.addOption(Option.builder()
                                .longOpt("schema-store")
                                .desc("Set path of a FileAvroSchemaStore to load Avro schemas")
                                .hasArg()
                                .argName("SCHEMA_STORE_PATH")
                                .build());
    }

    /**
     * Parses the command line and renders the requested records.
     * <p>
     * Defaults: {@code lines=-1} (no limit), {@code data-size=256}, {@code render=default}, {@code group=tools}.
     * When {@code --schema-store} is absent and {@value #NUXEO_SCHEMA_STORE} exists relative to the working
     * directory, that path is used as the schema store.
     *
     * @param manager the log manager used to create the tailer
     * @param cmd the parsed command line
     * @return always {@code true}
     * @throws InterruptedException if interrupted while reading records
     * @throws NumberFormatException if {@code --lines} or {@code --data-size} is not a valid integer
     */
    @Override
    public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException {
        int limit = Integer.parseInt(cmd.getOptionValue("lines", "-1"));
        int dataSize = Integer.parseInt(cmd.getOptionValue("data-size", "256"));
        String name = cmd.getOptionValue("log-name");
        String render = cmd.getOptionValue("render", "default");
        String group = cmd.getOptionValue("group", "tools");
        String codec = cmd.getOptionValue("codec");
        String avroSchemaStorePath = cmd.getOptionValue("schema-store");
        if (avroSchemaStorePath == null && Paths.get(NUXEO_SCHEMA_STORE).toFile().exists()) {
            // No explicit store given: fall back to the conventional location when it is present.
            avroSchemaStorePath = NUXEO_SCHEMA_STORE;
        }
        // getRecordRenderer is not defined in this class; presumably inherited from Command.
        cat(manager, name, group, limit, getRecordRenderer(render, avroSchemaStorePath, dataSize), codec);
        return true;
    }

    /**
     * Renders records of a Log: header, then up to {@code limit} records, then footer.
     * <p>
     * Reading stops as soon as {@code limit} records have been rendered or a read returns {@code null}.
     *
     * @param manager the log manager used to create the tailer
     * @param name the name of the Log to read
     * @param group the consumer group used to create the tailer
     * @param limit the maximum number of records to render; a negative value means no limit and {@code 0} renders
     *            no record
     * @param render the renderer receiving the header, each record and the footer
     * @param codec the codec name passed to {@code getRecordCodec} (not defined in this class; presumably inherited
     *            from Command), may be {@code null} when the option is not set
     * @throws InterruptedException if interrupted while reading records
     */
    protected void cat(LogManager manager, String name, String group, int limit, Renderer render, String codec)
            throws InterruptedException {
        render.header();
        try (LogTailer<Record> tailer = manager.createTailer(group, name, getRecordCodec(codec))) {
            int count = 0;
            // Check the limit BEFORE reading so that a limit of 0 renders nothing
            // (a do/while would always render one record first).
            while (limit < 0 || count < limit) {
                // The 1000 ms duration is presumably a read timeout; a null result ends the loop.
                LogRecord<Record> record = tailer.read(Duration.ofMillis(1000));
                if (record == null) {
                    break;
                }
                count++;
                render.accept(record);
            }
        }
        render.footer();
    }
}