001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools.command; 020 021import static org.nuxeo.lib.stream.tools.command.CatCommand.NUXEO_SCHEMA_STORE; 022 023import java.nio.file.Paths; 024import java.time.Duration; 025 026import org.apache.commons.cli.CommandLine; 027import org.apache.commons.cli.Option; 028import org.apache.commons.cli.Options; 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031import org.nuxeo.lib.stream.computation.Record; 032import org.nuxeo.lib.stream.log.LogManager; 033import org.nuxeo.lib.stream.log.LogRecord; 034import org.nuxeo.lib.stream.log.LogTailer; 035import org.nuxeo.lib.stream.log.Name; 036import org.nuxeo.lib.stream.tools.renderer.Renderer; 037 038/** 039 * Output the last records on a Log. 040 * 041 * @since 9.3 042 */ 043public class TailCommand extends Command { 044 private static final Log log = LogFactory.getLog(TailCommand.class); 045 046 protected static final String NAME = "tail"; 047 048 @Override 049 public String name() { 050 return NAME; 051 } 052 053 @Override 054 public void updateOptions(Options options) { 055 options.addOption(Option.builder("n") 056 .longOpt("lines") 057 .desc("output the last NUM records") 058 .hasArg() 059 .argName("NUM") 060 .build()); 061 options.addOption("f", "follow", false, "output appended records"); 062 options.addOption(Option.builder("l") 063 .longOpt("log-name") 064 .desc("Log name") 065 .required() 066 .hasArg() 067 .argName("LOG_NAME") 068 .build()); 069 options.addOption( 070 Option.builder("g").longOpt("group").desc("Consumer group").hasArg().argName("GROUP").build()); 071 options.addOption(Option.builder() 072 .longOpt("codec") 073 .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson") 074 .hasArg() 075 .argName("CODEC") 076 .build()); 077 options.addOption(Option.builder() 078 .longOpt("data-size") 079 .desc("Maximum size of message data to render") 080 .hasArg() 081 .argName("L") 082 .build()); 083 options.addOption( 084 Option.builder().longOpt("render").desc("Output rendering").hasArg().argName("FORMAT").build()); 085 options.addOption(Option.builder("t") 086 .longOpt("timeout") 087 .desc("Timeout on follow in second") 088 .hasArg() 089 .argName("TIMEOUT") 090 .build()); 091 options.addOption(Option.builder() 092 .longOpt("schema-store") 093 .desc("Set path of a FileAvroSchemaStore to load Avro schemas") 094 .hasArg() 095 .argName("SCHEMA_STORE_PATH") 096 .build()); 097 } 098 099 @Override 100 public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException { 101 int lines = Integer.parseInt(cmd.getOptionValue("lines", "10")); 102 int dataSize = Integer.parseInt(cmd.getOptionValue("data-size", "256")); 103 Name name = Name.ofUrn(cmd.getOptionValue("log-name")); 104 String render = cmd.getOptionValue("render", "default"); 105 Name group = Name.ofUrn(cmd.getOptionValue("group", "admin/tools")); 106 String codec = cmd.getOptionValue("codec"); 107 String avroSchemaStorePath = cmd.getOptionValue("schema-store"); 108 if (avroSchemaStorePath == null && Paths.get(NUXEO_SCHEMA_STORE).toFile().exists()) { 109 avroSchemaStorePath = NUXEO_SCHEMA_STORE; 110 } 111 int timeout = Integer.parseInt(cmd.getOptionValue("timeout", "120")); 112 tail(manager, name, group, lines, getRecordRenderer(render, avroSchemaStorePath, dataSize), codec); 113 if (cmd.hasOption("follow")) { 114 follow(manager, name, group, getRecordRenderer(render, avroSchemaStorePath, dataSize), timeout, codec); 115 } 116 return true; 117 } 118 119 @SuppressWarnings("unchecked") 120 protected void tail(LogManager manager, Name name, Name group, int lines, Renderer render, String codec) 121 throws InterruptedException { 122 LogRecord<Record>[] records = new LogRecord[lines]; 123 render.header(); 124 int count = 0; 125 try (LogTailer<Record> tailer = manager.createTailer(group, name, getRecordCodec(codec))) { 126 LogRecord<Record> record; 127 do { 128 record = tailer.read(Duration.ofMillis(500)); 129 if (record != null) { 130 records[count++ % lines] = record; 131 } 132 } while (record != null); 133 } 134 for (int i = count; i < lines + count; i++) { 135 LogRecord<Record> record = records[i % lines]; 136 if (record != null) { 137 render.accept(record); 138 } 139 } 140 render.footer(); 141 } 142 143 protected void follow(LogManager manager, Name name, Name group, Renderer render, int timeout, String codec) 144 throws InterruptedException { 145 try (LogTailer<Record> tailer = manager.createTailer(group, name, getRecordCodec(codec))) { 146 tailer.toEnd(); 147 while (true) { 148 LogRecord<Record> record = tailer.read(Duration.ofSeconds(timeout)); 149 if (record == null) { 150 log.error("tail timeout"); 151 break; 152 } 153 render.accept(record); 154 } 155 } 156 } 157}