001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019
020package org.nuxeo.ecm.platform.importer.mqueues.producer;
021
022import org.nuxeo.ecm.core.blob.BlobManager;
023import org.nuxeo.ecm.platform.importer.mqueues.message.DocumentMessage;
024
025import java.io.BufferedReader;
026import java.io.File;
027import java.io.FileNotFoundException;
028import java.io.FileReader;
029import java.io.IOException;
030import java.nio.file.Files;
031import java.nio.file.Path;
032import java.util.Collections;
033import java.util.List;
034import java.util.stream.Collectors;
035import java.util.stream.Stream;
036
037
038/**
039 * Provide a blob reference for a message.
040 * The blobs must be already imported in the binary store.
041 *
042 * @since 9.1
043 */
044public class RandomBlobInfoProvider implements AutoCloseable {
045    private final Path basePath;
046    private final List<Path> fileList;
047    private File currentFile;
048    private int currentFileIndex;
049    private BufferedReader currentReader;
050    private FileReader currentFileReader;
051
052    /**
053     * Use the path of CSV files generated by the BlobMessageConsumer
054     *
055     */
056    public RandomBlobInfoProvider(Path blobInfoDirectory, int seed) {
057        this.basePath = blobInfoDirectory;
058        this.currentFileIndex = seed;
059        this.fileList = listBlobInfoFiles();
060        getNextBufferedReader();
061    }
062
063    private List<Path> listBlobInfoFiles() {
064        final List<Path> ret;
065        try (Stream<Path> paths = Files.walk(basePath)) {
066            ret = paths.filter(path -> (Files.isRegularFile(path) && path.toString().endsWith("csv"))).collect(Collectors.toList());
067            Collections.sort(ret, (p1, p2) -> p1.getFileName().compareTo(p2.getFileName()));
068        } catch (IOException e) {
069            throw new IllegalArgumentException("Invalid blobInfo directory: " + basePath, e);
070        }
071        if (ret.isEmpty()) {
072            throw new IllegalArgumentException("Invalid blobInfo directory no csv file found: " + basePath);
073        }
074        return ret;
075    }
076
077    public BlobManager.BlobInfo getBlobInfo(DocumentMessage.Builder builder) {
078        String line = getNextLine();
079        String[] tokens = line.split(",");
080        if (tokens.length < 6) {
081            throw new IllegalArgumentException("Invalid csv file not enough field per line: " + currentFile + " " + line);
082        }
083        BlobManager.BlobInfo ret = new BlobManager.BlobInfo();
084        ret.key = tokens[0].trim();
085        ret.digest = tokens[1].trim();
086        ret.length = Long.valueOf(tokens[2].trim());
087        ret.filename = tokens[3].trim().replace("\"", "");
088        ret.mimeType = tokens[4].trim();
089        ret.encoding = tokens[5].trim();
090        return ret;
091    }
092
093    private void getNextBufferedReader() {
094        currentFile = fileList.get(currentFileIndex % fileList.size()).toFile();
095        currentFileIndex += 1;
096        try {
097            currentFileReader = new FileReader(currentFile);
098            currentReader = new BufferedReader(new FileReader(currentFile));
099            // skip the header line
100            currentReader.readLine();
101        } catch (FileNotFoundException e) {
102            throw new IllegalArgumentException("Invalid file: " + currentFile, e);
103        } catch (IOException e) {
104            throw new IllegalArgumentException("Can not read file: " + currentFile, e);
105        }
106    }
107
108    private String getNextLine() {
109        String ret;
110        try {
111            ret = currentReader.readLine();
112            if (ret == null) {
113                currentReader.close();
114                currentFileReader.close();
115                getNextBufferedReader();
116                return getNextLine();
117            }
118        } catch (IOException e) {
119            throw new IllegalArgumentException("Can not read file: " + currentFile, e);
120        }
121        return ret;
122    }
123
124    @Override
125    public void close() throws Exception {
126        if (currentReader != null) {
127            currentReader.close();
128        }
129        if (currentFileReader != null) {
130            currentFileReader.close();
131        }
132    }
133}