001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.time.Duration;
022
023import org.apache.commons.cli.CommandLine;
024import org.apache.commons.cli.Option;
025import org.apache.commons.cli.Options;
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.nuxeo.lib.stream.computation.Record;
029import org.nuxeo.lib.stream.log.LogAppender;
030import org.nuxeo.lib.stream.log.LogManager;
031import org.nuxeo.lib.stream.log.LogRecord;
032import org.nuxeo.lib.stream.log.LogTailer;
033
034/**
035 * Copy a Log to another
036 * @since 9.3
037 */
038public class CopyCommand extends Command {
039    private static final Log log = LogFactory.getLog(CopyCommand.class);
040
041    protected static final String NAME = "copy";
042
043    @Override
044    public String name() {
045        return NAME;
046    }
047
048    @Override
049    public void updateOptions(Options options) {
050        options.addOption(Option.builder()
051                                .longOpt("src")
052                                .desc("Source log name")
053                                .required()
054                                .hasArg()
055                                .argName("LOG_NAME")
056                                .build());
057        options.addOption(Option.builder()
058                                .longOpt("srcCodec")
059                                .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson")
060                                .hasArg()
061                                .argName("CODEC")
062                                .build());
063        options.addOption(Option.builder()
064                                .longOpt("dest")
065                                .desc("Target log name")
066                                .required()
067                                .hasArg()
068                                .argName("LOG_NAME")
069                                .build());
070        options.addOption(Option.builder()
071                                .longOpt("destCodec")
072                                .desc("Codec used to write record, can be: java, avro, avroBinary, avroJson")
073                                .hasArg()
074                                .argName("CODEC")
075                                .build());
076        options.addOption(Option.builder("g")
077                                .longOpt("group")
078                                .desc("Source consumer group to use")
079                                .hasArg()
080                                .argName("GROUP")
081                                .build());
082    }
083
084    @Override
085    public boolean run(LogManager manager, CommandLine cmd) {
086        String src = cmd.getOptionValue("src");
087        String dest = cmd.getOptionValue("dest");
088        String group = cmd.getOptionValue("group", "tools");
089        String srcCodec = cmd.getOptionValue("srcCodec");
090        String destCodec = cmd.getOptionValue("destCodec");
091        return copy(manager, src, srcCodec, dest, destCodec, group);
092    }
093
094    protected boolean copy(LogManager manager, String src, String srcCodec, String dest, String destCodec,
095            String group) {
096        log.info(String.format("# Copy %s to %s", src, dest));
097        if (!manager.exists(src)) {
098            log.error("source log not found: " + src);
099            return false;
100        }
101        if (manager.exists(dest)) {
102            log.error("destination log already exists: " + dest);
103            return false;
104        }
105        manager.createIfNotExists(dest, manager.size(src));
106        LogAppender<Record> appender = manager.getAppender(dest, getRecordCodec(destCodec));
107        try (LogTailer<Record> tailer = manager.createTailer(group, src, getRecordCodec(srcCodec))) {
108            while (true) {
109                LogRecord<Record> record = tailer.read(Duration.ofSeconds(5));
110                if (record == null) {
111                    break;
112                }
113                appender.append(record.message().getKey(), record.message());
114            }
115        } catch (InterruptedException e) {
116            Thread.currentThread().interrupt();
117            log.error("Interrupted");
118            return false;
119        }
120        return true;
121    }
122
123}