001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools.command; 020 021import static org.nuxeo.lib.stream.tools.command.CatCommand.NUXEO_SCHEMA_STORE; 022 023import java.nio.file.Paths; 024import java.time.Duration; 025 026import org.apache.commons.cli.CommandLine; 027import org.apache.commons.cli.Option; 028import org.apache.commons.cli.Options; 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031import org.nuxeo.lib.stream.computation.Record; 032import org.nuxeo.lib.stream.log.LogManager; 033import org.nuxeo.lib.stream.log.LogRecord; 034import org.nuxeo.lib.stream.log.LogTailer; 035import org.nuxeo.lib.stream.tools.renderer.Renderer; 036 037/** 038 * Output the last records on a Log. 039 * 040 * @since 9.3 041 */ 042public class TailCommand extends Command { 043 private static final Log log = LogFactory.getLog(TailCommand.class); 044 045 protected static final String NAME = "tail"; 046 047 @Override 048 public String name() { 049 return NAME; 050 } 051 052 @Override 053 public void updateOptions(Options options) { 054 options.addOption(Option.builder("n") 055 .longOpt("lines") 056 .desc("output the last NUM records") 057 .hasArg() 058 .argName("NUM") 059 .build()); 060 options.addOption("f", "follow", false, "output appended records"); 061 options.addOption(Option.builder("l") 062 .longOpt("log-name") 063 .desc("Log name") 064 .required() 065 .hasArg() 066 .argName("LOG_NAME") 067 .build()); 068 options.addOption( 069 Option.builder("g").longOpt("group").desc("Consumer group").hasArg().argName("GROUP").build()); 070 options.addOption(Option.builder() 071 .longOpt("codec") 072 .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson") 073 .hasArg() 074 .argName("CODEC") 075 .build()); 076 options.addOption(Option.builder() 077 .longOpt("data-size") 078 .desc("Maximum size of message data to render") 079 .hasArg() 080 .argName("L") 081 .build()); 082 options.addOption( 083 Option.builder().longOpt("render").desc("Output rendering").hasArg().argName("FORMAT").build()); 084 options.addOption(Option.builder("t") 085 .longOpt("timeout") 086 .desc("Timeout on follow in second") 087 .hasArg() 088 .argName("TIMEOUT") 089 .build()); 090 options.addOption(Option.builder() 091 .longOpt("schema-store") 092 .desc("Set path of a FileAvroSchemaStore to load Avro schemas") 093 .hasArg() 094 .argName("SCHEMA_STORE_PATH") 095 .build()); 096 } 097 098 @Override 099 public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException { 100 int lines = Integer.parseInt(cmd.getOptionValue("lines", "10")); 101 int dataSize = Integer.parseInt(cmd.getOptionValue("data-size", "256")); 102 String name = cmd.getOptionValue("log-name"); 103 String render = cmd.getOptionValue("render", "default"); 104 String group = cmd.getOptionValue("group", "tools"); 105 String codec = cmd.getOptionValue("codec"); 106 String avroSchemaStorePath = cmd.getOptionValue("schema-store"); 107 if (avroSchemaStorePath == null && Paths.get(NUXEO_SCHEMA_STORE).toFile().exists()) { 108 avroSchemaStorePath = NUXEO_SCHEMA_STORE; 109 } 110 int timeout = Integer.parseInt(cmd.getOptionValue("timeout", "120")); 111 tail(manager, name, group, lines, getRecordRenderer(render, avroSchemaStorePath, dataSize), codec); 112 if (cmd.hasOption("follow")) { 113 follow(manager, name, group, getRecordRenderer(render, avroSchemaStorePath, dataSize), timeout, codec); 114 } 115 return true; 116 } 117 118 @SuppressWarnings("unchecked") 119 protected void tail(LogManager manager, String name, String group, int lines, Renderer render, String codec) 120 throws InterruptedException { 121 LogRecord<Record>[] records = new LogRecord[lines]; 122 render.header(); 123 int count = 0; 124 try (LogTailer<Record> tailer = manager.createTailer(group, name, getRecordCodec(codec))) { 125 LogRecord<Record> record; 126 do { 127 record = tailer.read(Duration.ofMillis(500)); 128 if (record != null) { 129 records[count++ % lines] = record; 130 } 131 } while (record != null); 132 } 133 for (int i = count; i < lines + count; i++) { 134 LogRecord<Record> record = records[i % lines]; 135 if (record != null) { 136 render.accept(record); 137 } 138 } 139 render.footer(); 140 } 141 142 protected void follow(LogManager manager, String name, String group, Renderer render, int timeout, String codec) 143 throws InterruptedException { 144 try (LogTailer<Record> tailer = manager.createTailer(group, name, getRecordCodec(codec))) { 145 tailer.toEnd(); 146 while (true) { 147 LogRecord<Record> record = tailer.read(Duration.ofSeconds(timeout)); 148 if (record == null) { 149 log.error("tail timeout"); 150 break; 151 } 152 render.accept(record); 153 } 154 } 155 } 156}