001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019 020package org.nuxeo.ecm.platform.importer.mqueues.pattern.producer; 021 022import org.nuxeo.ecm.core.blob.BlobInfo; 023import org.nuxeo.ecm.platform.importer.mqueues.pattern.message.DocumentMessage; 024 025import java.io.BufferedReader; 026import java.io.File; 027import java.io.FileNotFoundException; 028import java.io.FileReader; 029import java.io.IOException; 030import java.nio.file.Files; 031import java.nio.file.Path; 032import java.util.Collections; 033import java.util.Comparator; 034import java.util.List; 035import java.util.stream.Collectors; 036import java.util.stream.Stream; 037 038 039/** 040 * Provide a blob reference for a message. 041 * The blobs must be already imported in the binary store. 042 * 043 * @since 9.1 044 */ 045public class RandomBlobInfoProvider implements AutoCloseable { 046 private final Path basePath; 047 private final List<Path> fileList; 048 private File currentFile; 049 private int currentFileIndex; 050 private BufferedReader currentReader; 051 private FileReader currentFileReader; 052 053 /** 054 * Use the path of CSV files generated by the BlobMessageConsumer 055 * 056 */ 057 public RandomBlobInfoProvider(Path blobInfoDirectory, int seed) { 058 this.basePath = blobInfoDirectory; 059 this.currentFileIndex = seed; 060 this.fileList = listBlobInfoFiles(); 061 getNextBufferedReader(); 062 } 063 064 private List<Path> listBlobInfoFiles() { 065 final List<Path> ret; 066 try (Stream<Path> paths = Files.walk(basePath)) { 067 ret = paths.filter(path -> (Files.isRegularFile(path) && path.toString().endsWith("csv"))).collect(Collectors.toList()); 068 ret.sort(Comparator.comparing(Path::getFileName)); 069 } catch (IOException e) { 070 throw new IllegalArgumentException("Invalid blobInfo directory: " + basePath, e); 071 } 072 if (ret.isEmpty()) { 073 throw new IllegalArgumentException("Invalid blobInfo directory no csv file found: " + basePath); 074 } 075 return ret; 076 } 077 078 public BlobInfo getBlobInfo(DocumentMessage.Builder builder) { 079 String line = getNextLine(); 080 String[] tokens = line.split(","); 081 if (tokens.length < 6) { 082 throw new IllegalArgumentException("Invalid csv file not enough field per line: " + currentFile + " " + line); 083 } 084 BlobInfo ret = new BlobInfo(); 085 ret.key = tokens[0].trim(); 086 ret.digest = tokens[1].trim(); 087 ret.length = Long.valueOf(tokens[2].trim()); 088 ret.filename = tokens[3].trim().replace("\"", ""); 089 ret.mimeType = tokens[4].trim(); 090 ret.encoding = tokens[5].trim(); 091 return ret; 092 } 093 094 private void getNextBufferedReader() { 095 currentFile = fileList.get(currentFileIndex % fileList.size()).toFile(); 096 currentFileIndex += 1; 097 try { 098 currentFileReader = new FileReader(currentFile); 099 currentReader = new BufferedReader(new FileReader(currentFile)); 100 // skip the header line 101 currentReader.readLine(); 102 } catch (FileNotFoundException e) { 103 throw new IllegalArgumentException("Invalid file: " + currentFile, e); 104 } catch (IOException e) { 105 throw new IllegalArgumentException("Can not read file: " + currentFile, e); 106 } 107 } 108 109 private String getNextLine() { 110 String ret; 111 try { 112 ret = currentReader.readLine(); 113 if (ret == null) { 114 currentReader.close(); 115 currentFileReader.close(); 116 getNextBufferedReader(); 117 return getNextLine(); 118 } 119 } catch (IOException e) { 120 throw new IllegalArgumentException("Can not read file: " + currentFile, e); 121 } 122 return ret; 123 } 124 125 @Override 126 public void close() throws Exception { 127 if (currentReader != null) { 128 currentReader.close(); 129 } 130 if (currentFileReader != null) { 131 currentFileReader.close(); 132 } 133 } 134}