001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools;
020
021import java.nio.file.Paths;
022import java.util.Arrays;
023import java.util.HashMap;
024import java.util.Map;
025import java.util.Objects;
026
027import org.apache.commons.cli.CommandLine;
028import org.apache.commons.cli.CommandLineParser;
029import org.apache.commons.cli.DefaultParser;
030import org.apache.commons.cli.Option;
031import org.apache.commons.cli.Options;
032import org.apache.commons.cli.ParseException;
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.nuxeo.lib.stream.log.LogManager;
036import org.nuxeo.lib.stream.log.chronicle.ChronicleLogManager;
037import org.nuxeo.lib.stream.log.kafka.KafkaLogManager;
038import org.nuxeo.lib.stream.tools.command.Command;
039import org.nuxeo.lib.stream.tools.command.HelpCommand;
040
041/**
042 * @since 9.3
043 */
044public class Main {
045    private static final Log log = LogFactory.getLog(Main.class);
046
047    protected static final String NUXEO_KAFKA_FILE_CONF = "nxserver/config/kafka-config.xml";
048
049    protected static final String NUXEO_KAFKA_CONF = "default";
050
051    protected static final String CHRONICLE_OPT = "chronicle";
052
053    protected static final String KAFKA_OPT = "kafka";
054
055    protected final Map<String, Command> commandMap = new HashMap<>();
056
057    protected final Options options = new Options();
058
059    protected String command;
060
061    protected LogManager manager;
062
063    public static void main(final String[] args) {
064        boolean success = new Main().run(args);
065        if (!success) {
066            System.exit(-1);
067        }
068    }
069
070    public boolean run(String[] args) {
071        initDefaultOptions();
072        if (args == null || args.length == 0 || args[0].trim().isEmpty()) {
073            command = "help";
074            return runWithArgs(null);
075        }
076        command = Objects.requireNonNull(args)[0];
077        return runWithArgs(Arrays.copyOfRange(args, 1, args.length));
078    }
079
080    protected boolean runWithArgs(String[] args) {
081        try {
082            Command cmd = getCommand();
083            cmd.updateOptions(options);
084            CommandLineParser parser = new DefaultParser();
085            CommandLine cmdLine = parser.parse(options, args);
086            createManager(cmd, cmdLine);
087            return cmd.run(manager, cmdLine);
088        } catch (ParseException e) {
089            log.error("Parse error: " + e.getMessage() + ", try: help " + command);
090        } catch (IllegalArgumentException e) {
091            log.error(e.getMessage(), e);
092        } catch (InterruptedException e) {
093            Thread.currentThread().interrupt();
094            log.error("Interrupted: " + e.getMessage());
095        }
096        return false;
097    }
098
099    protected void createManager(Command cmd, CommandLine cmdLine) {
100        if (cmd instanceof HelpCommand) {
101            return;
102        }
103        if (cmdLine.hasOption(CHRONICLE_OPT)) {
104            createChronicleManager(cmdLine.getOptionValue(CHRONICLE_OPT));
105        } else if (cmdLine.hasOption(KAFKA_OPT) || cmdLine.hasOption("k")) {
106            String contribPath = cmdLine.getOptionValue(KAFKA_OPT, NUXEO_KAFKA_FILE_CONF);
107            createKafkaManager(contribPath, cmdLine.getOptionValue("kafka-config", NUXEO_KAFKA_CONF));
108        } else {
109            throw new IllegalArgumentException("Missing required option: --chronicle or --kafka");
110        }
111    }
112
113    protected void createKafkaManager(String kafkaContribution, String kafkaConfig) {
114        KafkaConfigParser config = new KafkaConfigParser(Paths.get(kafkaContribution), kafkaConfig);
115        manager = new KafkaLogManager(config.getPrefix(), config.getProducerProperties(),
116                config.getConsumerProperties());
117    }
118
119    protected void createChronicleManager(String basePath) {
120        manager = new ChronicleLogManager(Paths.get(basePath));
121    }
122
123    protected Command getCommand() {
124        if (commandMap.isEmpty()) {
125            new CommandRegistry().commands().forEach(cmd -> commandMap.put(cmd.name(), cmd));
126        }
127        if ("-h".equals(command) || "--help".equals(command)) {
128            command = "help";
129        } else if (!commandMap.containsKey(command)) {
130            throw new IllegalArgumentException("Unknown command: " + command);
131        }
132        return commandMap.get(command);
133    }
134
135    protected void initDefaultOptions() {
136        options.addOption(Option.builder()
137                                .longOpt(CHRONICLE_OPT)
138                                .desc("Base path of the Chronicle Queue LogManager")
139                                .hasArg()
140                                .argName("PATH")
141                                .build());
142        options.addOption(Option.builder()
143                                .longOpt(KAFKA_OPT)
144                                .desc("Nuxeo Kafka configuration contribution file: nxserver/config/kafka-config.xml")
145                                .hasArg()
146                                .argName("PATH")
147                                .build());
148        options.addOption(Option.builder()
149                                .longOpt("kafka-config")
150                                .desc("Config name in the Nuxeo Kafka configuration contribution")
151                                .hasArg()
152                                .argName("CONFIG")
153                                .build());
154        options.addOption("k", "nuxeo-kafka", false, "Use the default Nuxeo Kafka configuration");
155    }
156
157    public LogManager getLogManager(String[] args) {
158        run(args);
159        return manager;
160    }
161}