001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.nio.file.Paths;
022import java.time.Duration;
023
024import org.apache.commons.cli.CommandLine;
025import org.apache.commons.cli.Option;
026import org.apache.commons.cli.Options;
027import org.nuxeo.lib.stream.computation.Record;
028import org.nuxeo.lib.stream.log.LogManager;
029import org.nuxeo.lib.stream.log.LogRecord;
030import org.nuxeo.lib.stream.log.LogTailer;
031import org.nuxeo.lib.stream.log.Name;
032import org.nuxeo.lib.stream.tools.renderer.Renderer;
033
034/**
035 * Output records to stdout.
036 *
037 * @since 9.3
038 */
039public class CatCommand extends Command {
040
041    protected static final String NUXEO_SCHEMA_STORE = "nxserver/data/avro";
042
043    protected static final String NAME = "cat";
044
045    @Override
046    public String name() {
047        return NAME;
048    }
049
050    @Override
051    public void updateOptions(Options options) {
052        options.addOption(
053                Option.builder("n").longOpt("lines").desc("Render the first N records").hasArg().argName("N").build());
054        options.addOption(Option.builder("l")
055                                .longOpt("log-name")
056                                .desc("Log name")
057                                .required()
058                                .hasArg()
059                                .argName("LOG_NAME")
060                                .build());
061        options.addOption(
062                Option.builder("g").longOpt("group").desc("Consumer group").hasArg().argName("GROUP").build());
063        options.addOption(Option.builder()
064                                .longOpt("codec")
065                                .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson")
066                                .hasArg()
067                                .argName("CODEC")
068                                .build());
069        options.addOption(Option.builder()
070                                .longOpt("data-size")
071                                .desc("Maximum size of message data to render")
072                                .hasArg()
073                                .argName("L")
074                                .build());
075        options.addOption(
076                Option.builder().longOpt("render").desc("Output rendering").hasArg().argName("FORMAT").build());
077        options.addOption(Option.builder()
078                                .longOpt("schema-store")
079                                .desc("Set path of a FileAvroSchemaStore to load Avro schemas")
080                                .hasArg()
081                                .argName("SCHEMA_STORE_PATH")
082                                .build());
083    }
084
085    @Override
086    public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException {
087        int limit = Integer.parseInt(cmd.getOptionValue("lines", "-1"));
088        int dataSize = Integer.parseInt(cmd.getOptionValue("data-size", "256"));
089        Name name = Name.ofUrn(cmd.getOptionValue("log-name"));
090        String render = cmd.getOptionValue("render", "default");
091        Name group = Name.ofUrn(cmd.getOptionValue("group", "admin/tools"));
092        String codec = cmd.getOptionValue("codec");
093        String avroSchemaStorePath = cmd.getOptionValue("schema-store");
094        if (avroSchemaStorePath == null && Paths.get(NUXEO_SCHEMA_STORE).toFile().exists()) {
095            avroSchemaStorePath = NUXEO_SCHEMA_STORE;
096        }
097        cat(manager, name, group, limit, getRecordRenderer(render, avroSchemaStorePath, dataSize), codec);
098        return true;
099    }
100
101    protected void cat(LogManager manager, Name name, Name group, int limit, Renderer render, String codec)
102            throws InterruptedException {
103        render.header();
104        try (LogTailer<Record> tailer = manager.createTailer(group, name, getRecordCodec(codec))) {
105            int count = 0;
106            do {
107                LogRecord<Record> record = tailer.read(Duration.ofMillis(1000));
108                if (record == null) {
109                    break;
110                }
111                count++;
112                render.accept(record);
113            } while (limit < 0 || (count < limit));
114        }
115        render.footer();
116    }
117}