001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools; 020 021import java.nio.file.Paths; 022import java.util.Arrays; 023import java.util.HashMap; 024import java.util.Map; 025import java.util.Objects; 026 027import org.apache.commons.cli.CommandLine; 028import org.apache.commons.cli.CommandLineParser; 029import org.apache.commons.cli.DefaultParser; 030import org.apache.commons.cli.Option; 031import org.apache.commons.cli.Options; 032import org.apache.commons.cli.ParseException; 033import org.nuxeo.lib.stream.log.LogManager; 034import org.nuxeo.lib.stream.log.chronicle.ChronicleLogManager; 035import org.nuxeo.lib.stream.log.kafka.KafkaLogManager; 036import org.nuxeo.lib.stream.tools.command.Command; 037import org.nuxeo.lib.stream.tools.command.HelpCommand; 038 039/** 040 * @since 9.3 041 */ 042public class Main { 043 044 protected static final String NUXEO_KAFKA_FILE_CONF = "nxserver/config/kafka-config.xml"; 045 046 protected static final String NUXEO_KAFKA_CONF = "default"; 047 048 protected final Map<String, Command> commandMap = new HashMap<>(); 049 050 protected final Options options = new Options(); 051 052 protected String command; 053 054 protected LogManager manager; 055 056 public static void main(final String[] args) { 057 boolean success = new Main().run(args); 058 if (!success) { 059 System.exit(-1); 060 } 061 } 062 063 public boolean run(String[] args) { 064 initDefaultOptions(); 065 if (args == null || args.length == 0 || args[0].trim().isEmpty()) { 066 command = "help"; 067 return runWithArgs(null); 068 } 069 command = Objects.requireNonNull(args)[0]; 070 return runWithArgs(Arrays.copyOfRange(args, 1, args.length)); 071 } 072 073 protected boolean runWithArgs(String[] args) { 074 try { 075 Command cmd = getCommand(); 076 cmd.updateOptions(options); 077 CommandLineParser parser = new DefaultParser(); 078 CommandLine cmdLine = parser.parse(options, args); 079 createManager(cmd, cmdLine); 080 return cmd.run(manager, cmdLine); 081 } catch (ParseException e) { 082 System.err.println("Parse error: " + e.getMessage() + ", try: help " + command); 083 } catch (IllegalArgumentException e) { 084 System.err.println(e.getMessage()); 085 } catch (InterruptedException e) { 086 Thread.currentThread().interrupt(); 087 System.err.println("Interrupted: " + e.getMessage()); 088 } 089 return false; 090 } 091 092 protected void createManager(Command cmd, CommandLine cmdLine) { 093 if (cmd instanceof HelpCommand) { 094 return; 095 } 096 if (cmdLine.hasOption("chronicle")) { 097 createChronicleManager(cmdLine.getOptionValue("chronicle")); 098 } else if (cmdLine.hasOption("kafka") || cmdLine.hasOption("k")) { 099 String contribPath = cmdLine.getOptionValue("kafka", NUXEO_KAFKA_FILE_CONF); 100 createKafkaManager(contribPath, cmdLine.getOptionValue("kafka-config", NUXEO_KAFKA_CONF)); 101 } else { 102 throw new IllegalArgumentException( 103 "Missing required option: --chronicle or --kafka"); 104 } 105 } 106 107 protected void createKafkaManager(String kafkaContribution, String kafkaConfig) { 108 KafkaConfigParser config = new KafkaConfigParser(Paths.get(kafkaContribution), kafkaConfig); 109 manager = new KafkaLogManager(config.getZkServers(), config.getPrefix(), config.getProducerProperties(), 110 config.getConsumerProperties()); 111 } 112 113 protected void createChronicleManager(String basePath) { 114 manager = new ChronicleLogManager(Paths.get(basePath)); 115 } 116 117 protected Command getCommand() { 118 if (commandMap.isEmpty()) { 119 new CommandRegistry().commands().forEach(cmd -> commandMap.put(cmd.name(), cmd)); 120 } 121 if ("-h".equals(command) || "--help".equals(command)) { 122 command = "help"; 123 } else if (!commandMap.containsKey(command)) { 124 throw new IllegalArgumentException("Unknown command: " + command); 125 } 126 return commandMap.get(command); 127 } 128 129 protected void initDefaultOptions() { 130 options.addOption(Option.builder() 131 .longOpt("chronicle") 132 .desc("Base path of the Chronicle Queue LogManager") 133 .hasArg() 134 .argName("PATH") 135 .build()); 136 options.addOption(Option.builder() 137 .longOpt("kafka") 138 .desc("Nuxeo Kafka configuration contribution file: nxserver/config/kafka-config.xml") 139 .hasArg() 140 .argName("PATH") 141 .build()); 142 options.addOption(Option.builder() 143 .longOpt("kafka-config") 144 .desc("Config name in the Nuxeo Kafka configuration contribution") 145 .hasArg() 146 .argName("CONFIG") 147 .build()); 148 options.addOption("k", "nuxeo-kafka", false, "Use the default Nuxeo Kafka configuration"); 149 } 150 151}