001/*
002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.importer.stream.producer;
020
021import java.io.File;
022import java.io.IOException;
023import java.nio.file.Files;
024import java.util.Iterator;
025import java.util.stream.Stream;
026
027import org.apache.commons.lang3.StringUtils;
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.nuxeo.importer.stream.message.BlobMessage;
031import org.nuxeo.lib.stream.pattern.producer.AbstractProducer;
032
033/**
034 * Use a listing file to generate BlobMessage.
035 *
036 * @since 9.1
037 */
038public class FileBlobMessageProducer extends AbstractProducer<BlobMessage> {
039    private static final Log log = LogFactory.getLog(FileBlobMessageProducer.class);
040
041    protected final File listFile;
042
043    protected final String basePath;
044
045    protected long count = 0;
046
047    protected final long nbBlobs;
048
049    protected Iterator<String> fileIterator;
050
051    protected Stream<String> lines;
052
053    public FileBlobMessageProducer(int producerId, File listFile, String basePath, long nbBlobs) {
054        super(producerId);
055        this.listFile = listFile;
056        this.nbBlobs = nbBlobs;
057        this.basePath = StringUtils.defaultString(basePath);
058        log.info("Producer using file list: " + listFile.getAbsolutePath());
059        getFileIterator();
060
061    }
062
063    protected void getFileIterator() {
064        try {
065            if (lines != null) {
066                lines.close();
067            }
068            lines = Files.lines(listFile.toPath());
069            fileIterator = lines.iterator();
070        } catch (IOException e) {
071            String msg = "Failed to read file: " + listFile.getAbsolutePath();
072            log.error(msg, e);
073            throw new IllegalArgumentException(e);
074        }
075    }
076
077    @Override
078    public int getPartition(BlobMessage message, int partitions) {
079        return ((int) count) % partitions;
080    }
081
082    @Override
083    public void close() throws Exception {
084        super.close();
085        fileIterator = null;
086        if (lines != null) {
087            lines.close();
088        }
089        lines = null;
090    }
091
092    @Override
093    public boolean hasNext() {
094        if (nbBlobs > 0 && count >= nbBlobs) {
095            return false;
096        }
097        if (!fileIterator.hasNext()) {
098            if (nbBlobs == 0) {
099                return false;
100            }
101            // loop until we get the nb blobs
102            getFileIterator();
103        }
104        return true;
105    }
106
107    @Override
108    public BlobMessage next() {
109        String filePath = fileIterator.next();
110        count += 1;
111        return new BlobMessage.FileMessageBuilder(new File(basePath, filePath).toString()).build();
112    }
113}