001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.nio.file.Paths;
022import java.time.Duration;
023
024import org.apache.commons.cli.CommandLine;
025import org.apache.commons.cli.Option;
026import org.apache.commons.cli.Options;
027import org.nuxeo.lib.stream.computation.Record;
028import org.nuxeo.lib.stream.log.LogManager;
029import org.nuxeo.lib.stream.log.LogRecord;
030import org.nuxeo.lib.stream.log.LogTailer;
031import org.nuxeo.lib.stream.tools.renderer.Renderer;
032
033/**
034 * Output records to stdout.
035 *
036 * @since 9.3
037 */
038public class CatCommand extends Command {
039
040    protected static final String NUXEO_SCHEMA_STORE = "nxserver/data/avro";
041
042    protected static final String NAME = "cat";
043
044    @Override
045    public String name() {
046        return NAME;
047    }
048
049    @Override
050    public void updateOptions(Options options) {
051        options.addOption(
052                Option.builder("n").longOpt("lines").desc("Render the first N records").hasArg().argName("N").build());
053        options.addOption(Option.builder("l")
054                                .longOpt("log-name")
055                                .desc("Log name")
056                                .required()
057                                .hasArg()
058                                .argName("LOG_NAME")
059                                .build());
060        options.addOption(
061                Option.builder("g").longOpt("group").desc("Consumer group").hasArg().argName("GROUP").build());
062        options.addOption(Option.builder()
063                                .longOpt("codec")
064                                .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson")
065                                .hasArg()
066                                .argName("CODEC")
067                                .build());
068        options.addOption(Option.builder()
069                                .longOpt("data-size")
070                                .desc("Maximum size of message data to render")
071                                .hasArg()
072                                .argName("L")
073                                .build());
074        options.addOption(
075                Option.builder().longOpt("render").desc("Output rendering").hasArg().argName("FORMAT").build());
076        options.addOption(Option.builder()
077                                .longOpt("schema-store")
078                                .desc("Set path of a FileAvroSchemaStore to load Avro schemas")
079                                .hasArg()
080                                .argName("SCHEMA_STORE_PATH")
081                                .build());
082    }
083
084    @Override
085    public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException {
086        int limit = Integer.parseInt(cmd.getOptionValue("lines", "-1"));
087        int dataSize = Integer.parseInt(cmd.getOptionValue("data-size", "256"));
088        String name = cmd.getOptionValue("log-name");
089        String render = cmd.getOptionValue("render", "default");
090        String group = cmd.getOptionValue("group", "tools");
091        String codec = cmd.getOptionValue("codec");
092        String avroSchemaStorePath = cmd.getOptionValue("schema-store");
093        if (avroSchemaStorePath == null && Paths.get(NUXEO_SCHEMA_STORE).toFile().exists()) {
094            avroSchemaStorePath = NUXEO_SCHEMA_STORE;
095        }
096        cat(manager, name, group, limit, getRecordRenderer(render, avroSchemaStorePath, dataSize), codec);
097        return true;
098    }
099
100    protected void cat(LogManager manager, String name, String group, int limit, Renderer render, String codec)
101            throws InterruptedException {
102        render.header();
103        try (LogTailer<Record> tailer = manager.createTailer(group, name, getRecordCodec(codec))) {
104            int count = 0;
105            do {
106                LogRecord<Record> record = tailer.read(Duration.ofMillis(1000));
107                if (record == null) {
108                    break;
109                }
110                count++;
111                render.accept(record);
112            } while (limit < 0 || (count < limit));
113        }
114        render.footer();
115    }
116}