001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools.command; 020 021import java.nio.file.Paths; 022import java.time.Duration; 023 024import org.apache.commons.cli.CommandLine; 025import org.apache.commons.cli.Option; 026import org.apache.commons.cli.Options; 027import org.nuxeo.lib.stream.computation.Record; 028import org.nuxeo.lib.stream.log.LogManager; 029import org.nuxeo.lib.stream.log.LogRecord; 030import org.nuxeo.lib.stream.log.LogTailer; 031import org.nuxeo.lib.stream.log.Name; 032import org.nuxeo.lib.stream.tools.renderer.Renderer; 033 034/** 035 * Output records to stdout. 036 * 037 * @since 9.3 038 */ 039public class CatCommand extends Command { 040 041 protected static final String NUXEO_SCHEMA_STORE = "nxserver/data/avro"; 042 043 protected static final String NAME = "cat"; 044 045 @Override 046 public String name() { 047 return NAME; 048 } 049 050 @Override 051 public void updateOptions(Options options) { 052 options.addOption( 053 Option.builder("n").longOpt("lines").desc("Render the first N records").hasArg().argName("N").build()); 054 options.addOption(Option.builder("l") 055 .longOpt("log-name") 056 .desc("Log name") 057 .required() 058 .hasArg() 059 .argName("LOG_NAME") 060 .build()); 061 options.addOption( 062 Option.builder("g").longOpt("group").desc("Consumer group").hasArg().argName("GROUP").build()); 063 options.addOption(Option.builder() 064 .longOpt("codec") 065 .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson") 066 .hasArg() 067 .argName("CODEC") 068 .build()); 069 options.addOption(Option.builder() 070 .longOpt("data-size") 071 .desc("Maximum size of message data to render") 072 .hasArg() 073 .argName("L") 074 .build()); 075 options.addOption( 076 Option.builder().longOpt("render").desc("Output rendering").hasArg().argName("FORMAT").build()); 077 options.addOption(Option.builder() 078 .longOpt("schema-store") 079 .desc("Set path of a FileAvroSchemaStore to load Avro schemas") 080 .hasArg() 081 .argName("SCHEMA_STORE_PATH") 082 .build()); 083 } 084 085 @Override 086 public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException { 087 int limit = Integer.parseInt(cmd.getOptionValue("lines", "-1")); 088 int dataSize = Integer.parseInt(cmd.getOptionValue("data-size", "256")); 089 Name name = Name.ofUrn(cmd.getOptionValue("log-name")); 090 String render = cmd.getOptionValue("render", "default"); 091 Name group = Name.ofUrn(cmd.getOptionValue("group", "admin/tools")); 092 String codec = cmd.getOptionValue("codec"); 093 String avroSchemaStorePath = cmd.getOptionValue("schema-store"); 094 if (avroSchemaStorePath == null && Paths.get(NUXEO_SCHEMA_STORE).toFile().exists()) { 095 avroSchemaStorePath = NUXEO_SCHEMA_STORE; 096 } 097 cat(manager, name, group, limit, getRecordRenderer(render, avroSchemaStorePath, dataSize), codec); 098 return true; 099 } 100 101 protected void cat(LogManager manager, Name name, Name group, int limit, Renderer render, String codec) 102 throws InterruptedException { 103 render.header(); 104 try (LogTailer<Record> tailer = manager.createTailer(group, name, getRecordCodec(codec))) { 105 int count = 0; 106 do { 107 LogRecord<Record> record = tailer.read(Duration.ofMillis(1000)); 108 if (record == null) { 109 break; 110 } 111 count++; 112 render.accept(record); 113 } while (limit < 0 || (count < limit)); 114 } 115 render.footer(); 116 } 117}