001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019
020package org.nuxeo.ecm.platform.importer.mqueues.pattern.producer;
021
022import org.nuxeo.ecm.core.blob.BlobInfo;
023import org.nuxeo.ecm.platform.importer.mqueues.pattern.message.DocumentMessage;
024
025import java.io.BufferedReader;
026import java.io.File;
027import java.io.FileNotFoundException;
028import java.io.FileReader;
029import java.io.IOException;
030import java.nio.file.Files;
031import java.nio.file.Path;
032import java.util.Collections;
033import java.util.Comparator;
034import java.util.List;
035import java.util.stream.Collectors;
036import java.util.stream.Stream;
037
038
039/**
040 * Provide a blob reference for a message.
041 * The blobs must be already imported in the binary store.
042 *
043 * @since 9.1
044 */
045public class RandomBlobInfoProvider implements AutoCloseable {
046    private final Path basePath;
047    private final List<Path> fileList;
048    private File currentFile;
049    private int currentFileIndex;
050    private BufferedReader currentReader;
051    private FileReader currentFileReader;
052
053    /**
054     * Use the path of CSV files generated by the BlobMessageConsumer
055     *
056     */
057    public RandomBlobInfoProvider(Path blobInfoDirectory, int seed) {
058        this.basePath = blobInfoDirectory;
059        this.currentFileIndex = seed;
060        this.fileList = listBlobInfoFiles();
061        getNextBufferedReader();
062    }
063
064    private List<Path> listBlobInfoFiles() {
065        final List<Path> ret;
066        try (Stream<Path> paths = Files.walk(basePath)) {
067            ret = paths.filter(path -> (Files.isRegularFile(path) && path.toString().endsWith("csv"))).collect(Collectors.toList());
068            ret.sort(Comparator.comparing(Path::getFileName));
069        } catch (IOException e) {
070            throw new IllegalArgumentException("Invalid blobInfo directory: " + basePath, e);
071        }
072        if (ret.isEmpty()) {
073            throw new IllegalArgumentException("Invalid blobInfo directory no csv file found: " + basePath);
074        }
075        return ret;
076    }
077
078    public BlobInfo getBlobInfo(DocumentMessage.Builder builder) {
079        String line = getNextLine();
080        String[] tokens = line.split(",");
081        if (tokens.length < 6) {
082            throw new IllegalArgumentException("Invalid csv file not enough field per line: " + currentFile + " " + line);
083        }
084        BlobInfo ret = new BlobInfo();
085        ret.key = tokens[0].trim();
086        ret.digest = tokens[1].trim();
087        ret.length = Long.valueOf(tokens[2].trim());
088        ret.filename = tokens[3].trim().replace("\"", "");
089        ret.mimeType = tokens[4].trim();
090        ret.encoding = tokens[5].trim();
091        return ret;
092    }
093
094    private void getNextBufferedReader() {
095        currentFile = fileList.get(currentFileIndex % fileList.size()).toFile();
096        currentFileIndex += 1;
097        try {
098            currentFileReader = new FileReader(currentFile);
099            currentReader = new BufferedReader(new FileReader(currentFile));
100            // skip the header line
101            currentReader.readLine();
102        } catch (FileNotFoundException e) {
103            throw new IllegalArgumentException("Invalid file: " + currentFile, e);
104        } catch (IOException e) {
105            throw new IllegalArgumentException("Can not read file: " + currentFile, e);
106        }
107    }
108
109    private String getNextLine() {
110        String ret;
111        try {
112            ret = currentReader.readLine();
113            if (ret == null) {
114                currentReader.close();
115                currentFileReader.close();
116                getNextBufferedReader();
117                return getNextLine();
118            }
119        } catch (IOException e) {
120            throw new IllegalArgumentException("Can not read file: " + currentFile, e);
121        }
122        return ret;
123    }
124
125    @Override
126    public void close() throws Exception {
127        if (currentReader != null) {
128            currentReader.close();
129        }
130        if (currentFileReader != null) {
131            currentFileReader.close();
132        }
133    }
134}