001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.time.Duration;
022
023import org.apache.commons.cli.CommandLine;
024import org.apache.commons.cli.Option;
025import org.apache.commons.cli.Options;
026import org.nuxeo.lib.stream.computation.Record;
027import org.nuxeo.lib.stream.log.LogAppender;
028import org.nuxeo.lib.stream.log.LogManager;
029import org.nuxeo.lib.stream.log.LogRecord;
030import org.nuxeo.lib.stream.log.LogTailer;
031
032/**
033 * @since 9.3
034 */
035public class CopyCommand extends Command {
036
037    protected static final String NAME = "copy";
038
039    @Override
040    public String name() {
041        return NAME;
042    }
043
044    @Override
045    public void updateOptions(Options options) {
046        options.addOption(Option.builder()
047                                .longOpt("src")
048                                .desc("Source log name")
049                                .required()
050                                .hasArg()
051                                .argName("LOG_NAME")
052                                .build());
053        options.addOption(Option.builder()
054                                .longOpt("dest")
055                                .desc("Target log name")
056                                .required()
057                                .hasArg()
058                                .argName("LOG_NAME")
059                                .build());
060        options.addOption(Option.builder("g")
061                                .longOpt("group")
062                                .desc("Source consumer group to use")
063                                .hasArg()
064                                .argName("GROUP")
065                                .build());
066    }
067
068    @Override
069    public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException {
070        String src = cmd.getOptionValue("src");
071        String dest = cmd.getOptionValue("dest");
072        String group = cmd.getOptionValue("group", "tools");
073        return copy(manager, src, dest, group);
074    }
075
076    protected boolean copy(LogManager manager, String src, String dest, String group) {
077        System.out.println(String.format("# Copy %s to %s", src, dest));
078        if (!manager.exists(src)) {
079            System.err.println("source log not found: " + src);
080            return false;
081        }
082        if (manager.exists(dest)) {
083            System.err.println("destination log already exists: " + dest);
084            return false;
085        }
086        manager.createIfNotExists(dest, manager.getAppender(src).size());
087        LogAppender<Record> appender = manager.getAppender(dest);
088        try (LogTailer<Record> tailer = manager.createTailer(group, src)) {
089            while (true) {
090                LogRecord<Record> record = tailer.read(Duration.ofSeconds(5));
091                if (record == null) {
092                    break;
093                }
094                appender.append(record.message().key, record.message());
095            }
096        } catch (InterruptedException e) {
097            Thread.currentThread().interrupt();
098            System.err.println("Interrupted");
099            return false;
100        }
101        return true;
102    }
103
104}