001/* 002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.importer.stream.producer; 020 021import java.io.File; 022import java.io.IOException; 023import java.nio.file.Files; 024import java.util.Iterator; 025import java.util.stream.Stream; 026 027import org.apache.commons.lang3.StringUtils; 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.nuxeo.importer.stream.message.BlobMessage; 031import org.nuxeo.lib.stream.pattern.producer.AbstractProducer; 032 033/** 034 * Use a listing file to generate BlobMessage. 035 * 036 * @since 9.1 037 */ 038public class FileBlobMessageProducer extends AbstractProducer<BlobMessage> { 039 private static final Log log = LogFactory.getLog(FileBlobMessageProducer.class); 040 041 protected final File listFile; 042 043 protected final String basePath; 044 045 protected long count = 0; 046 047 protected final long nbBlobs; 048 049 protected Iterator<String> fileIterator; 050 051 protected Stream<String> lines; 052 053 public FileBlobMessageProducer(int producerId, File listFile, String basePath, long nbBlobs) { 054 super(producerId); 055 this.listFile = listFile; 056 this.nbBlobs = nbBlobs; 057 this.basePath = StringUtils.defaultString(basePath); 058 log.info("Producer using file list: " + listFile.getAbsolutePath()); 059 getFileIterator(); 060 061 } 062 063 protected void getFileIterator() { 064 try { 065 if (lines != null) { 066 lines.close(); 067 } 068 lines = Files.lines(listFile.toPath()); 069 fileIterator = lines.iterator(); 070 } catch (IOException e) { 071 String msg = "Failed to read file: " + listFile.getAbsolutePath(); 072 log.error(msg, e); 073 throw new IllegalArgumentException(e); 074 } 075 } 076 077 @Override 078 public int getPartition(BlobMessage message, int partitions) { 079 return ((int) count) % partitions; 080 } 081 082 @Override 083 public void close() throws Exception { 084 super.close(); 085 fileIterator = null; 086 if (lines != null) { 087 lines.close(); 088 } 089 lines = null; 090 } 091 092 @Override 093 public boolean hasNext() { 094 if (nbBlobs > 0 && count >= nbBlobs) { 095 return false; 096 } 097 if (!fileIterator.hasNext()) { 098 if (nbBlobs == 0) { 099 return false; 100 } 101 // loop until we get the nb blobs 102 getFileIterator(); 103 } 104 return true; 105 } 106 107 @Override 108 public BlobMessage next() { 109 String filePath = fileIterator.next(); 110 count += 1; 111 return new BlobMessage.FileMessageBuilder(new File(basePath, filePath).toString()).build(); 112 } 113}