001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools;
020
021import java.nio.file.Paths;
022import java.util.Arrays;
023import java.util.HashMap;
024import java.util.Map;
025import java.util.Objects;
026
027import org.apache.commons.cli.CommandLine;
028import org.apache.commons.cli.CommandLineParser;
029import org.apache.commons.cli.DefaultParser;
030import org.apache.commons.cli.Option;
031import org.apache.commons.cli.Options;
032import org.apache.commons.cli.ParseException;
033import org.nuxeo.lib.stream.log.LogManager;
034import org.nuxeo.lib.stream.log.chronicle.ChronicleLogManager;
035import org.nuxeo.lib.stream.log.kafka.KafkaLogManager;
036import org.nuxeo.lib.stream.tools.command.Command;
037import org.nuxeo.lib.stream.tools.command.HelpCommand;
038
039/**
040 * @since 9.3
041 */
042public class Main {
043
044    protected static final String NUXEO_KAFKA_FILE_CONF = "nxserver/config/kafka-config.xml";
045
046    protected static final String NUXEO_KAFKA_CONF = "default";
047
048    protected final Map<String, Command> commandMap = new HashMap<>();
049
050    protected final Options options = new Options();
051
052    protected String command;
053
054    protected LogManager manager;
055
056    public static void main(final String[] args) {
057        boolean success = new Main().run(args);
058        if (!success) {
059            System.exit(-1);
060        }
061    }
062
063    public boolean run(String[] args) {
064        initDefaultOptions();
065        if (args == null || args.length == 0 || args[0].trim().isEmpty()) {
066            command = "help";
067            return runWithArgs(null);
068        }
069        command = Objects.requireNonNull(args)[0];
070        return runWithArgs(Arrays.copyOfRange(args, 1, args.length));
071    }
072
073    protected boolean runWithArgs(String[] args) {
074        try {
075            Command cmd = getCommand();
076            cmd.updateOptions(options);
077            CommandLineParser parser = new DefaultParser();
078            CommandLine cmdLine = parser.parse(options, args);
079            createManager(cmd, cmdLine);
080            return cmd.run(manager, cmdLine);
081        } catch (ParseException e) {
082            System.err.println("Parse error: " + e.getMessage() + ", try: help " + command);
083        } catch (IllegalArgumentException e) {
084            System.err.println(e.getMessage());
085        } catch (InterruptedException e) {
086            Thread.currentThread().interrupt();
087            System.err.println("Interrupted: " + e.getMessage());
088        }
089        return false;
090    }
091
092    protected void createManager(Command cmd, CommandLine cmdLine) {
093        if (cmd instanceof HelpCommand) {
094            return;
095        }
096        if (cmdLine.hasOption("chronicle")) {
097            createChronicleManager(cmdLine.getOptionValue("chronicle"));
098        } else if (cmdLine.hasOption("kafka") || cmdLine.hasOption("k")) {
099            String contribPath = cmdLine.getOptionValue("kafka", NUXEO_KAFKA_FILE_CONF);
100            createKafkaManager(contribPath, cmdLine.getOptionValue("kafka-config", NUXEO_KAFKA_CONF));
101        } else {
102            throw new IllegalArgumentException(
103                    "Missing required option: --chronicle or --kafka");
104        }
105    }
106
107    protected void createKafkaManager(String kafkaContribution, String kafkaConfig) {
108        KafkaConfigParser config = new KafkaConfigParser(Paths.get(kafkaContribution), kafkaConfig);
109        manager = new KafkaLogManager(config.getZkServers(), config.getPrefix(), config.getProducerProperties(),
110                config.getConsumerProperties());
111    }
112
113    protected void createChronicleManager(String basePath) {
114        manager = new ChronicleLogManager(Paths.get(basePath));
115    }
116
117    protected Command getCommand() {
118        if (commandMap.isEmpty()) {
119            new CommandRegistry().commands().forEach(cmd -> commandMap.put(cmd.name(), cmd));
120        }
121        if ("-h".equals(command) || "--help".equals(command)) {
122            command = "help";
123        } else if (!commandMap.containsKey(command)) {
124            throw new IllegalArgumentException("Unknown command: " + command);
125        }
126        return commandMap.get(command);
127    }
128
129    protected void initDefaultOptions() {
130        options.addOption(Option.builder()
131                                .longOpt("chronicle")
132                                .desc("Base path of the Chronicle Queue LogManager")
133                                .hasArg()
134                                .argName("PATH")
135                                .build());
136        options.addOption(Option.builder()
137                                .longOpt("kafka")
138                                .desc("Nuxeo Kafka configuration contribution file: nxserver/config/kafka-config.xml")
139                                .hasArg()
140                                .argName("PATH")
141                                .build());
142        options.addOption(Option.builder()
143                                .longOpt("kafka-config")
144                                .desc("Config name in the Nuxeo Kafka configuration contribution")
145                                .hasArg()
146                                .argName("CONFIG")
147                                .build());
148        options.addOption("k", "nuxeo-kafka", false, "Use the default Nuxeo Kafka configuration");
149    }
150
151}