001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019 020package org.nuxeo.ecm.platform.importer.mqueues.producer; 021 022import org.nuxeo.ecm.core.blob.BlobManager; 023import org.nuxeo.ecm.platform.importer.mqueues.message.DocumentMessage; 024 025import java.io.BufferedReader; 026import java.io.File; 027import java.io.FileNotFoundException; 028import java.io.FileReader; 029import java.io.IOException; 030import java.nio.file.Files; 031import java.nio.file.Path; 032import java.util.Collections; 033import java.util.List; 034import java.util.stream.Collectors; 035import java.util.stream.Stream; 036 037 038/** 039 * Provide a blob reference for a message. 040 * The blobs must be already imported in the binary store. 041 * 042 * @since 9.1 043 */ 044public class RandomBlobInfoProvider implements AutoCloseable { 045 private final Path basePath; 046 private final List<Path> fileList; 047 private File currentFile; 048 private int currentFileIndex; 049 private BufferedReader currentReader; 050 private FileReader currentFileReader; 051 052 /** 053 * Use the path of CSV files generated by the BlobMessageConsumer 054 * 055 */ 056 public RandomBlobInfoProvider(Path blobInfoDirectory, int seed) { 057 this.basePath = blobInfoDirectory; 058 this.currentFileIndex = seed; 059 this.fileList = listBlobInfoFiles(); 060 getNextBufferedReader(); 061 } 062 063 private List<Path> listBlobInfoFiles() { 064 final List<Path> ret; 065 try (Stream<Path> paths = Files.walk(basePath)) { 066 ret = paths.filter(path -> (Files.isRegularFile(path) && path.toString().endsWith("csv"))).collect(Collectors.toList()); 067 Collections.sort(ret, (p1, p2) -> p1.getFileName().compareTo(p2.getFileName())); 068 } catch (IOException e) { 069 throw new IllegalArgumentException("Invalid blobInfo directory: " + basePath, e); 070 } 071 if (ret.isEmpty()) { 072 throw new IllegalArgumentException("Invalid blobInfo directory no csv file found: " + basePath); 073 } 074 return ret; 075 } 076 077 public BlobManager.BlobInfo getBlobInfo(DocumentMessage.Builder builder) { 078 String line = getNextLine(); 079 String[] tokens = line.split(","); 080 if (tokens.length < 6) { 081 throw new IllegalArgumentException("Invalid csv file not enough field per line: " + currentFile + " " + line); 082 } 083 BlobManager.BlobInfo ret = new BlobManager.BlobInfo(); 084 ret.key = tokens[0].trim(); 085 ret.digest = tokens[1].trim(); 086 ret.length = Long.valueOf(tokens[2].trim()); 087 ret.filename = tokens[3].trim().replace("\"", ""); 088 ret.mimeType = tokens[4].trim(); 089 ret.encoding = tokens[5].trim(); 090 return ret; 091 } 092 093 private void getNextBufferedReader() { 094 currentFile = fileList.get(currentFileIndex % fileList.size()).toFile(); 095 currentFileIndex += 1; 096 try { 097 currentFileReader = new FileReader(currentFile); 098 currentReader = new BufferedReader(new FileReader(currentFile)); 099 // skip the header line 100 currentReader.readLine(); 101 } catch (FileNotFoundException e) { 102 throw new IllegalArgumentException("Invalid file: " + currentFile, e); 103 } catch (IOException e) { 104 throw new IllegalArgumentException("Can not read file: " + currentFile, e); 105 } 106 } 107 108 private String getNextLine() { 109 String ret; 110 try { 111 ret = currentReader.readLine(); 112 if (ret == null) { 113 currentReader.close(); 114 currentFileReader.close(); 115 getNextBufferedReader(); 116 return getNextLine(); 117 } 118 } catch (IOException e) { 119 throw new IllegalArgumentException("Can not read file: " + currentFile, e); 120 } 121 return ret; 122 } 123 124 @Override 125 public void close() throws Exception { 126 if (currentReader != null) { 127 currentReader.close(); 128 } 129 if (currentFileReader != null) { 130 currentFileReader.close(); 131 } 132 } 133}