001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.consumer;
020
021import org.nuxeo.ecm.core.api.Blob;
022import org.nuxeo.ecm.core.api.impl.blob.FileBlob;
023import org.nuxeo.ecm.core.api.impl.blob.StringBlob;
024import org.nuxeo.ecm.core.blob.BlobManager;
025import org.nuxeo.ecm.core.blob.BlobProvider;
026import org.nuxeo.ecm.platform.importer.mqueues.message.BlobMessage;
027import org.nuxeo.runtime.api.Framework;
028
029import java.io.BufferedWriter;
030import java.io.File;
031import java.io.FileWriter;
032import java.io.IOException;
033import java.io.PrintWriter;
034import java.nio.file.Path;
035import java.nio.file.Paths;
036
037/**
038 * @since 9.1
039 */
040public class BlobMessageConsumer extends AbstractConsumer<BlobMessage> {
041    private static final java.lang.String DEFAULT_ENCODING = "UTF-8";
042    private static final String HEADER = "key, digest, length, filename, mimetype, encoding\n";
043    private final BlobProvider blobProvider;
044    private final PrintWriter outputWriter;
045    private final FileWriter outputFileWriter;
046    private final String blobProviderName;
047
048    public BlobMessageConsumer(int consumerId, String blobProviderName, Path outputBlobInfoDirectory) {
049        super(consumerId);
050        this.blobProviderName = blobProviderName;
051        this.blobProvider = Framework.getService(BlobManager.class).getBlobProvider(blobProviderName);
052        if (blobProvider == null) {
053            throw new IllegalArgumentException("Invalid blob provider: " + blobProviderName);
054        }
055        Path outputFile = Paths.get(outputBlobInfoDirectory.toString(), "bi-" + consumerId + ".csv");
056        try {
057            outputBlobInfoDirectory.toFile().mkdirs();
058            outputFileWriter = new FileWriter(outputFile.toFile(), true);
059            BufferedWriter bw = new BufferedWriter(outputFileWriter);
060            outputWriter = new PrintWriter(bw);
061            outputWriter.write(HEADER);
062        } catch (IOException e) {
063            throw new IllegalArgumentException("Invalid output path: " + outputFile, e);
064        }
065    }
066
067    @Override
068    public void close() throws Exception {
069        outputWriter.close();
070        outputFileWriter.close();
071    }
072
073    @Override
074    public void begin() {
075
076    }
077
078    @Override
079    public void accept(BlobMessage message) {
080        try {
081            Blob blob;
082            if (message.getPath() != null) {
083                blob = new FileBlob(new File(message.getPath()));
084            } else {
085                // we don't submit filename or encoding this is not saved in the binary store but in the document
086                blob = new StringBlob(message.getContent(), null, null, null);
087            }
088            BlobManager.BlobInfo bi = new BlobManager.BlobInfo();
089            bi.digest = blobProvider.writeBlob(blob, null);
090            bi.key = blobProviderName + ":" + bi.digest;
091            bi.length = blob.getLength();
092            bi.filename = message.getFilename();
093            bi.mimeType = message.getMimetype();
094            bi.encoding = message.getEncoding();
095            saveBlobInfo(bi);
096        } catch (IOException e) {
097            throw new IllegalArgumentException("Invalid blob: " + message, e);
098        }
099    }
100
101    private void saveBlobInfo(BlobManager.BlobInfo bi) {
102        outputWriter.write(String.format("%s, %s, %d, \"%s\", %s, %s\n",
103                bi.key, bi.digest, bi.length, sanitize(bi.filename), sanitize(bi.mimeType), sanitize(bi.encoding)));
104    }
105
106    private String sanitize(String str) {
107        if (str == null || str.trim().isEmpty()) {
108            return "";
109        }
110        return str;
111    }
112
113    @Override
114    public void commit() {
115        outputWriter.flush();
116    }
117
118    @Override
119    public void rollback() {
120
121    }
122}