001/* 002 * (C) Copyright 2020 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools.command; 020 021import java.io.ByteArrayInputStream; 022import java.io.IOException; 023import java.io.ObjectInput; 024import java.io.ObjectInputStream; 025import java.lang.reflect.InvocationTargetException; 026import java.lang.reflect.Method; 027import java.time.Duration; 028 029import org.apache.commons.cli.CommandLine; 030import org.apache.commons.cli.Option; 031import org.apache.commons.cli.Options; 032import org.apache.commons.logging.Log; 033import org.apache.commons.logging.LogFactory; 034import org.nuxeo.lib.stream.computation.Record; 035import org.nuxeo.lib.stream.log.LogManager; 036import org.nuxeo.lib.stream.log.LogRecord; 037import org.nuxeo.lib.stream.log.LogTailer; 038import org.nuxeo.lib.stream.log.Name; 039 040/** 041 * Output information of Work in a StreamWorkManager Queue. 042 * 043 * @since 11.5 044 */ 045public class WorkCatCommand extends Command { 046 private static final Log log = LogFactory.getLog(WorkCatCommand.class); 047 048 protected static final String NAME = "workCat"; 049 050 @Override 051 public String name() { 052 return NAME; 053 } 054 055 @Override 056 public void updateOptions(Options options) { 057 options.addOption(Option.builder("n").desc("Limit to the first N Works").hasArg().argName("N").build()); 058 options.addOption(Option.builder("l") 059 .longOpt("log-name") 060 .desc("Work Queue Log name") 061 .required() 062 .hasArg() 063 .argName("LOG_NAME") 064 .build()); 065 options.addOption(Option.builder() 066 .longOpt("codec") 067 .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson") 068 .hasArg() 069 .argName("CODEC") 070 .build()); 071 } 072 073 @Override 074 public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException { 075 int limit = Integer.parseInt(cmd.getOptionValue("n", "-1")); 076 Name name = Name.ofUrn(cmd.getOptionValue("log-name")); 077 Name group = Name.ofUrn(cmd.getOptionValue("group", "admin/tools")); 078 String codec = cmd.getOptionValue("codec"); 079 workStat(manager, name, group, limit, codec); 080 return true; 081 } 082 083 protected void workStat(LogManager manager, Name name, Name group, int limit, String codec) 084 throws InterruptedException { 085 086 try (LogTailer<Record> tailer = manager.createTailer(group, name, getRecordCodec(codec))) { 087 int count = 0; 088 log.info("pos,class,fullname,category,name"); 089 do { 090 LogRecord<Record> record = tailer.read(Duration.ofMillis(1000)); 091 if (record == null) { 092 break; 093 } 094 count++; 095 log.info(record.offset().toString() + "," + deserialize(record.message().getData())); 096 } while (limit < 0 || (count < limit)); 097 } 098 099 } 100 101 public static String deserialize(byte[] data) { 102 ByteArrayInputStream bis = new ByteArrayInputStream(data); 103 ObjectInput in = null; 104 String ret = ""; 105 try { 106 in = new ObjectInputStream(bis); 107 Object work = in.readObject(); 108 String clazz = work.getClass().getSimpleName(); 109 ret += clazz; 110 ret += "," + work.getClass().getCanonicalName(); 111 Method getCategory = work.getClass().getMethod("getCategory"); 112 String category = (String) getCategory.invoke(work); 113 ret += "," + category; 114 ret += "," + work.toString().replace(",", "."); 115 return ret; 116 } catch (IOException | ClassNotFoundException | NoSuchMethodException e) { 117 throw new RuntimeException(e); 118 } catch (IllegalAccessException e) { 119 throw new RuntimeException(e); 120 } catch (InvocationTargetException e) { 121 throw new RuntimeException(e); 122 } finally { 123 try { 124 if (in != null) { 125 in.close(); 126 } 127 } catch (IOException ex) { 128 // ignore close exception so we cannot use a try-with-resources squid:S2093 129 } 130 } 131 } 132 133}