001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools.command; 020 021import java.time.Duration; 022 023import org.apache.commons.cli.CommandLine; 024import org.apache.commons.cli.Option; 025import org.apache.commons.cli.Options; 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.nuxeo.lib.stream.computation.Record; 029import org.nuxeo.lib.stream.log.LogManager; 030import org.nuxeo.lib.stream.log.LogRecord; 031import org.nuxeo.lib.stream.log.LogTailer; 032import org.nuxeo.lib.stream.tools.renderer.Renderer; 033 034/** 035 * Output the last records on a Log. 036 * 037 * @since 9.3 038 */ 039public class TailCommand extends Command { 040 private static final Log log = LogFactory.getLog(TailCommand.class); 041 042 protected static final String NAME = "tail"; 043 044 @Override 045 public String name() { 046 return NAME; 047 } 048 049 @Override 050 public void updateOptions(Options options) { 051 options.addOption(Option.builder("n") 052 .longOpt("lines") 053 .desc("output the last NUM records") 054 .hasArg() 055 .argName("NUM") 056 .build()); 057 options.addOption("f", "follow", false, "output appended records"); 058 options.addOption(Option.builder("l") 059 .longOpt("log-name") 060 .desc("Log name") 061 .required() 062 .hasArg() 063 .argName("LOG_NAME") 064 .build()); 065 options.addOption( 066 Option.builder("g").longOpt("group").desc("Consumer group").hasArg().argName("GROUP").build()); 067 options.addOption(Option.builder() 068 .longOpt("codec") 069 .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson") 070 .hasArg() 071 .argName("CODEC") 072 .build()); 073 options.addOption( 074 Option.builder().longOpt("render").desc("Output rendering").hasArg().argName("FORMAT").build()); 075 options.addOption(Option.builder("t") 076 .longOpt("timeout") 077 .desc("Timeout on follow in second") 078 .hasArg() 079 .argName("TIMEOUT") 080 .build()); 081 } 082 083 @Override 084 public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException { 085 int lines = Integer.parseInt(cmd.getOptionValue("lines", "10")); 086 String name = cmd.getOptionValue("log-name"); 087 String render = cmd.getOptionValue("render", "default"); 088 String group = cmd.getOptionValue("group", "tools"); 089 String codec = cmd.getOptionValue("codec"); 090 int timeout = Integer.parseInt(cmd.getOptionValue("timeout", "120")); 091 tail(manager, name, group, lines, getRecordRenderer(render), codec); 092 if (cmd.hasOption("follow")) { 093 follow(manager, name, group, getRecordRenderer(render), timeout, codec); 094 } 095 return true; 096 } 097 098 @SuppressWarnings("unchecked") 099 protected void tail(LogManager manager, String name, String group, int lines, Renderer render, String codec) 100 throws InterruptedException { 101 LogRecord<Record>[] records = new LogRecord[lines]; 102 render.header(); 103 int count = 0; 104 try (LogTailer<Record> tailer = manager.createTailer(group, name, getRecordCodec(codec))) { 105 LogRecord<Record> record; 106 do { 107 record = tailer.read(Duration.ofMillis(500)); 108 if (record != null) { 109 records[count++ % lines] = record; 110 } 111 } while (record != null); 112 } 113 for (int i = count; i < lines + count; i++) { 114 LogRecord<Record> record = records[i % lines]; 115 if (record != null) { 116 render.accept(record); 117 } 118 } 119 render.footer(); 120 } 121 122 protected void follow(LogManager manager, String name, String group, Renderer render, int timeout, String codec) 123 throws InterruptedException { 124 try (LogTailer<Record> tailer = manager.createTailer(group, name, getRecordCodec(codec))) { 125 tailer.toEnd(); 126 while (true) { 127 LogRecord<Record> record = tailer.read(Duration.ofSeconds(timeout)); 128 if (record == null) { 129 log.error("tail timeout"); 130 break; 131 } 132 render.accept(record); 133 } 134 } 135 } 136}