/*
 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.ecm.platform.importer.mqueues.pattern.producer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.platform.importer.mqueues.pattern.message.BlobMessage;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.stream.Stream;

/**
 * Use a listing file to generate BlobMessage.
 * <p>
 * The listing file is read lazily, one line at a time; each line is passed as-is to
 * {@link BlobMessage.FileMessageBuilder} to build one message. The underlying file stays open until
 * {@link #close()} is called.
 *
 * @since 9.1
 */
public class FileBlobMessageProducer extends AbstractProducer<BlobMessage> {
    private static final Log log = LogFactory.getLog(FileBlobMessageProducer.class);

    private final File listFile;

    /** Number of messages returned by {@link #next()} so far. */
    private int count = 0;

    /** Open line stream over the listing file; owns the file handle and must be closed. */
    private Stream<String> lines;

    /** Iterator view of {@link #lines}; {@code null} once the producer is closed. */
    private Iterator<String> fileIterator;

    /**
     * Opens the listing file for lazy, line-by-line reading.
     *
     * @param producerId identifier forwarded to the parent producer
     * @param listFile the listing file, one entry per line
     * @throws IllegalArgumentException if the listing file cannot be opened
     */
    public FileBlobMessageProducer(int producerId, File listFile) {
        super(producerId);
        this.listFile = listFile;
        log.info("Producer using file list: " + listFile.getAbsolutePath());
        try {
            // Keep a reference to the stream itself: the iterator alone cannot release the file handle.
            lines = Files.lines(listFile.toPath());
            fileIterator = lines.iterator();
        } catch (IOException e) {
            String msg = "Failed to read file: " + listFile.getAbsolutePath();
            log.error(msg, e);
            // Carry the message along with the cause so callers see which file failed.
            throw new IllegalArgumentException(msg, e);
        }
    }

    /**
     * Spreads messages over partitions based on the number of messages produced so far; the message
     * content is not inspected.
     *
     * @param message the message to route (unused)
     * @param partitions the number of available partitions
     * @return a partition index in {@code [0, partitions)} for a positive {@code partitions}
     */
    @Override
    public int getPartition(BlobMessage message, int partitions) {
        // floorMod equals % while count is non-negative and stays in range if the int counter wraps.
        return Math.floorMod(count, partitions);
    }

    /**
     * Closes the parent producer and releases the listing file.
     *
     * @throws Exception if the parent producer fails to close
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            // Release the file handle even when super.close() fails.
            fileIterator = null;
            if (lines != null) {
                lines.close();
                lines = null;
            }
        }
    }

    /**
     * @return {@code true} if the listing file has another line; {@code false} at end of file or once
     *         the producer has been closed
     */
    @Override
    public boolean hasNext() {
        return fileIterator != null && fileIterator.hasNext();
    }

    /**
     * Reads the next line of the listing file and wraps it in a message.
     *
     * @return a message built from the next line of the listing file
     * @throws NoSuchElementException if the producer is closed or the listing file is exhausted
     */
    @Override
    public BlobMessage next() {
        if (fileIterator == null) {
            throw new NoSuchElementException("Producer is closed: " + listFile.getAbsolutePath());
        }
        String filePath = fileIterator.next();
        count += 1;
        // TODO guess mimetype, length ?
        return new BlobMessage.FileMessageBuilder(filePath).build();
    }
}