001/*
002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.importer.stream.producer;
020
021import java.io.File;
022import java.io.IOException;
023import java.nio.file.Files;
024import java.util.Iterator;
025import java.util.stream.Stream;
026
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import org.nuxeo.importer.stream.message.BlobMessage;
030import org.nuxeo.lib.stream.pattern.producer.AbstractProducer;
031
032/**
033 * Use a listing file to generate BlobMessage.
034 *
035 * @since 9.1
036 */
037public class FileBlobMessageProducer extends AbstractProducer<BlobMessage> {
038    private static final Log log = LogFactory.getLog(FileBlobMessageProducer.class);
039
040    protected final File listFile;
041
042    protected int count = 0;
043
044    protected Stream<String> stream;
045
046    protected Iterator<String> fileIterator;
047
048    public FileBlobMessageProducer(int producerId, File listFile) {
049        super(producerId);
050        this.listFile = listFile;
051        log.info("Producer using file list: " + listFile.getAbsolutePath());
052        try {
053            stream = Files.lines(listFile.toPath());
054            fileIterator = stream.iterator();
055        } catch (IOException e) {
056            String msg = "Failed to read file: " + listFile.getAbsolutePath();
057            log.error(msg, e);
058            throw new IllegalArgumentException(e);
059        }
060
061    }
062
063    @Override
064    public int getPartition(BlobMessage message, int partitions) {
065        return count % partitions;
066    }
067
068    @Override
069    public void close() throws Exception {
070        super.close();
071        stream.close();
072        stream = null;
073        fileIterator = null;
074    }
075
076    @Override
077    public boolean hasNext() {
078        return fileIterator.hasNext();
079    }
080
081    @Override
082    public BlobMessage next() {
083        String filePath = fileIterator.next();
084        count += 1;
085        // TODO: guess mimetype, length ?
086        return new BlobMessage.FileMessageBuilder(filePath).build();
087    }
088}