001/*
002 * (C) Copyright 2020 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.io.ByteArrayInputStream;
022import java.io.IOException;
023import java.io.ObjectInput;
024import java.io.ObjectInputStream;
025import java.lang.reflect.InvocationTargetException;
026import java.lang.reflect.Method;
027import java.time.Duration;
028
029import org.apache.commons.cli.CommandLine;
030import org.apache.commons.cli.Option;
031import org.apache.commons.cli.Options;
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.nuxeo.lib.stream.computation.Record;
035import org.nuxeo.lib.stream.log.LogManager;
036import org.nuxeo.lib.stream.log.LogRecord;
037import org.nuxeo.lib.stream.log.LogTailer;
038import org.nuxeo.lib.stream.log.Name;
039
040/**
041 * Output information of Work in a StreamWorkManager Queue.
042 *
043 * @since 11.5
044 */
045public class WorkCatCommand extends Command {
046    private static final Log log = LogFactory.getLog(WorkCatCommand.class);
047
048    protected static final String NAME = "workCat";
049
050    @Override
051    public String name() {
052        return NAME;
053    }
054
055    @Override
056    public void updateOptions(Options options) {
057        options.addOption(Option.builder("n").desc("Limit to the first N Works").hasArg().argName("N").build());
058        options.addOption(Option.builder("l")
059                                .longOpt("log-name")
060                                .desc("Work Queue Log name")
061                                .required()
062                                .hasArg()
063                                .argName("LOG_NAME")
064                                .build());
065        options.addOption(Option.builder()
066                                .longOpt("codec")
067                                .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson")
068                                .hasArg()
069                                .argName("CODEC")
070                                .build());
071    }
072
073    @Override
074    public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException {
075        int limit = Integer.parseInt(cmd.getOptionValue("n", "-1"));
076        Name name = Name.ofUrn(cmd.getOptionValue("log-name"));
077        Name group = Name.ofUrn(cmd.getOptionValue("group", "admin/tools"));
078        String codec = cmd.getOptionValue("codec");
079        workStat(manager, name, group, limit, codec);
080        return true;
081    }
082
083    protected void workStat(LogManager manager, Name name, Name group, int limit, String codec)
084            throws InterruptedException {
085
086        try (LogTailer<Record> tailer = manager.createTailer(group, name, getRecordCodec(codec))) {
087            int count = 0;
088            log.info("pos,class,fullname,category,name");
089            do {
090                LogRecord<Record> record = tailer.read(Duration.ofMillis(1000));
091                if (record == null) {
092                    break;
093                }
094                count++;
095                log.info(record.offset().toString() + "," + deserialize(record.message().getData()));
096            } while (limit < 0 || (count < limit));
097        }
098
099    }
100
101    public static String deserialize(byte[] data) {
102        ByteArrayInputStream bis = new ByteArrayInputStream(data);
103        ObjectInput in = null;
104        String ret = "";
105        try {
106            in = new ObjectInputStream(bis);
107            Object work = in.readObject();
108            String clazz = work.getClass().getSimpleName();
109            ret += clazz;
110            ret += "," + work.getClass().getCanonicalName();
111            Method getCategory = work.getClass().getMethod("getCategory");
112            String category = (String) getCategory.invoke(work);
113            ret += "," + category;
114            ret += "," + work.toString().replace(",", ".");
115            return ret;
116        } catch (IOException | ClassNotFoundException | NoSuchMethodException e) {
117            throw new RuntimeException(e);
118        } catch (IllegalAccessException e) {
119            throw new RuntimeException(e);
120        } catch (InvocationTargetException e) {
121            throw new RuntimeException(e);
122        } finally {
123            try {
124                if (in != null) {
125                    in.close();
126                }
127            } catch (IOException ex) {
128                // ignore close exception so we cannot use a try-with-resources squid:S2093
129            }
130        }
131    }
132
133}