001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools; 020 021import java.nio.file.Paths; 022import java.util.Arrays; 023import java.util.HashMap; 024import java.util.Map; 025import java.util.Objects; 026 027import org.apache.commons.cli.CommandLine; 028import org.apache.commons.cli.CommandLineParser; 029import org.apache.commons.cli.DefaultParser; 030import org.apache.commons.cli.Option; 031import org.apache.commons.cli.Options; 032import org.apache.commons.cli.ParseException; 033import org.apache.commons.logging.Log; 034import org.apache.commons.logging.LogFactory; 035import org.nuxeo.lib.stream.log.LogManager; 036import org.nuxeo.lib.stream.log.chronicle.ChronicleLogManager; 037import org.nuxeo.lib.stream.log.kafka.KafkaLogManager; 038import org.nuxeo.lib.stream.tools.command.Command; 039import org.nuxeo.lib.stream.tools.command.HelpCommand; 040 041/** 042 * @since 9.3 043 */ 044public class Main { 045 private static final Log log = LogFactory.getLog(Main.class); 046 047 protected static final String NUXEO_KAFKA_FILE_CONF = "nxserver/config/kafka-config.xml"; 048 049 protected static final String NUXEO_KAFKA_CONF = "default"; 050 051 protected static final String CHRONICLE_OPT = "chronicle"; 052 053 protected static final String KAFKA_OPT = "kafka"; 054 055 protected final Map<String, Command> commandMap = new HashMap<>(); 056 057 protected final Options options = new Options(); 058 059 protected String command; 060 061 protected LogManager manager; 062 063 public static void main(final String[] args) { 064 boolean success = new Main().run(args); 065 if (!success) { 066 System.exit(-1); 067 } 068 } 069 070 public boolean run(String[] args) { 071 initDefaultOptions(); 072 if (args == null || args.length == 0 || args[0].trim().isEmpty()) { 073 command = "help"; 074 return runWithArgs(null); 075 } 076 command = Objects.requireNonNull(args)[0]; 077 return runWithArgs(Arrays.copyOfRange(args, 1, args.length)); 078 } 079 080 protected boolean runWithArgs(String[] args) { 081 try { 082 Command cmd = getCommand(); 083 cmd.updateOptions(options); 084 CommandLineParser parser = new DefaultParser(); 085 CommandLine cmdLine = parser.parse(options, args); 086 createManager(cmd, cmdLine); 087 return cmd.run(manager, cmdLine); 088 } catch (ParseException e) { 089 log.error("Parse error: " + e.getMessage() + ", try: help " + command); 090 } catch (IllegalArgumentException e) { 091 log.error(e.getMessage(), e); 092 } catch (InterruptedException e) { 093 Thread.currentThread().interrupt(); 094 log.error("Interrupted: " + e.getMessage()); 095 } 096 return false; 097 } 098 099 protected void createManager(Command cmd, CommandLine cmdLine) { 100 if (cmd instanceof HelpCommand) { 101 return; 102 } 103 if (cmdLine.hasOption(CHRONICLE_OPT)) { 104 createChronicleManager(cmdLine.getOptionValue(CHRONICLE_OPT)); 105 } else if (cmdLine.hasOption(KAFKA_OPT) || cmdLine.hasOption("k")) { 106 String contribPath = cmdLine.getOptionValue(KAFKA_OPT, NUXEO_KAFKA_FILE_CONF); 107 createKafkaManager(contribPath, cmdLine.getOptionValue("kafka-config", NUXEO_KAFKA_CONF)); 108 } else { 109 throw new IllegalArgumentException("Missing required option: --chronicle or --kafka"); 110 } 111 } 112 113 protected void createKafkaManager(String kafkaContribution, String kafkaConfig) { 114 KafkaConfigParser config = new KafkaConfigParser(Paths.get(kafkaContribution), kafkaConfig); 115 manager = new KafkaLogManager(config.getPrefix(), config.getProducerProperties(), 116 config.getConsumerProperties()); 117 } 118 119 protected void createChronicleManager(String basePath) { 120 manager = new ChronicleLogManager(Paths.get(basePath)); 121 } 122 123 protected Command getCommand() { 124 if (commandMap.isEmpty()) { 125 new CommandRegistry().commands().forEach(cmd -> commandMap.put(cmd.name(), cmd)); 126 } 127 if ("-h".equals(command) || "--help".equals(command)) { 128 command = "help"; 129 } else if (!commandMap.containsKey(command)) { 130 throw new IllegalArgumentException("Unknown command: " + command); 131 } 132 return commandMap.get(command); 133 } 134 135 protected void initDefaultOptions() { 136 options.addOption(Option.builder() 137 .longOpt(CHRONICLE_OPT) 138 .desc("Base path of the Chronicle Queue LogManager") 139 .hasArg() 140 .argName("PATH") 141 .build()); 142 options.addOption(Option.builder() 143 .longOpt(KAFKA_OPT) 144 .desc("Nuxeo Kafka configuration contribution file: nxserver/config/kafka-config.xml") 145 .hasArg() 146 .argName("PATH") 147 .build()); 148 options.addOption(Option.builder() 149 .longOpt("kafka-config") 150 .desc("Config name in the Nuxeo Kafka configuration contribution") 151 .hasArg() 152 .argName("CONFIG") 153 .build()); 154 options.addOption("k", "nuxeo-kafka", false, "Use the default Nuxeo Kafka configuration"); 155 } 156 157 public LogManager getLogManager(String[] args) { 158 run(args); 159 return manager; 160 } 161}