001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.tools.command;
020
021import java.time.Duration;
022
023import org.apache.commons.cli.CommandLine;
024import org.apache.commons.cli.Option;
025import org.apache.commons.cli.Options;
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.nuxeo.lib.stream.computation.Record;
029import org.nuxeo.lib.stream.log.LogAppender;
030import org.nuxeo.lib.stream.log.LogManager;
031import org.nuxeo.lib.stream.log.LogRecord;
032import org.nuxeo.lib.stream.log.LogTailer;
033import org.nuxeo.lib.stream.log.Name;
034
035/**
036 * Copy a Log to another
037 *
038 * @since 9.3
039 */
040public class CopyCommand extends Command {
041    private static final Log log = LogFactory.getLog(CopyCommand.class);
042
043    protected static final String NAME = "copy";
044
045    @Override
046    public String name() {
047        return NAME;
048    }
049
050    @Override
051    public void updateOptions(Options options) {
052        options.addOption(Option.builder()
053                                .longOpt("src")
054                                .desc("Source log name")
055                                .required()
056                                .hasArg()
057                                .argName("LOG_NAME")
058                                .build());
059        options.addOption(Option.builder()
060                                .longOpt("srcCodec")
061                                .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson")
062                                .hasArg()
063                                .argName("CODEC")
064                                .build());
065        options.addOption(Option.builder()
066                                .longOpt("dest")
067                                .desc("Target log name")
068                                .required()
069                                .hasArg()
070                                .argName("LOG_NAME")
071                                .build());
072        options.addOption(Option.builder()
073                                .longOpt("destCodec")
074                                .desc("Codec used to write record, can be: java, avro, avroBinary, avroJson")
075                                .hasArg()
076                                .argName("CODEC")
077                                .build());
078        options.addOption(Option.builder("g")
079                                .longOpt("group")
080                                .desc("Source consumer group to use")
081                                .hasArg()
082                                .argName("GROUP")
083                                .build());
084    }
085
086    @Override
087    public boolean run(LogManager manager, CommandLine cmd) {
088        Name src = Name.ofUrn(cmd.getOptionValue("src"));
089        Name dest = Name.ofUrn(cmd.getOptionValue("dest"));
090        Name group = Name.ofUrn(cmd.getOptionValue("group", "admin/tools"));
091        String srcCodec = cmd.getOptionValue("srcCodec");
092        String destCodec = cmd.getOptionValue("destCodec");
093        return copy(manager, src, srcCodec, dest, destCodec, group);
094    }
095
096    protected boolean copy(LogManager manager, Name src, String srcCodec, Name dest, String destCodec, Name group) {
097        log.info(String.format("# Copy %s to %s", src, dest));
098        if (!manager.exists(src)) {
099            log.error("source log not found: " + src);
100            return false;
101        }
102        if (manager.exists(dest)) {
103            log.error("destination log already exists: " + dest);
104            return false;
105        }
106        manager.createIfNotExists(dest, manager.size(src));
107        LogAppender<Record> appender = manager.getAppender(dest, getRecordCodec(destCodec));
108        try (LogTailer<Record> tailer = manager.createTailer(group, src, getRecordCodec(srcCodec))) {
109            while (true) {
110                LogRecord<Record> record = tailer.read(Duration.ofSeconds(5));
111                if (record == null) {
112                    break;
113                }
114                appender.append(record.message().getKey(), record.message());
115            }
116        } catch (InterruptedException e) {
117            Thread.currentThread().interrupt();
118            log.error("Interrupted");
119            return false;
120        }
121        return true;
122    }
123
124}