001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer;
020
021import org.nuxeo.ecm.core.api.Blob;
022import org.nuxeo.ecm.core.api.impl.blob.FileBlob;
023import org.nuxeo.ecm.core.api.impl.blob.StringBlob;
024import org.nuxeo.ecm.core.blob.BlobInfo;
025import org.nuxeo.ecm.core.blob.BlobManager;
026import org.nuxeo.ecm.core.blob.BlobProvider;
027import org.nuxeo.ecm.platform.importer.mqueues.pattern.message.BlobMessage;
028import org.nuxeo.runtime.api.Framework;
029
030import java.io.BufferedWriter;
031import java.io.File;
032import java.io.FileWriter;
033import java.io.IOException;
034import java.io.PrintWriter;
035import java.nio.file.Path;
036import java.nio.file.Paths;
037import java.util.concurrent.atomic.AtomicInteger;
038
039/**
040 * @since 9.1
041 */
042public class BlobMessageConsumer extends AbstractConsumer<BlobMessage> {
043    private static final AtomicInteger consumerCounter = new AtomicInteger(0);
044    private final Integer id = consumerCounter.getAndIncrement();
045    private static final java.lang.String DEFAULT_ENCODING = "UTF-8";
046    private static final String HEADER = "key, digest, length, filename, mimetype, encoding\n";
047    private final BlobProvider blobProvider;
048    private final PrintWriter outputWriter;
049    private final FileWriter outputFileWriter;
050    private final String blobProviderName;
051
052    public BlobMessageConsumer(String consumerId, String blobProviderName, Path outputBlobInfoDirectory) {
053        super(consumerId);
054        this.blobProviderName = blobProviderName;
055        this.blobProvider = Framework.getService(BlobManager.class).getBlobProvider(blobProviderName);
056        if (blobProvider == null) {
057            throw new IllegalArgumentException("Invalid blob provider: " + blobProviderName);
058        }
059        Path outputFile = Paths.get(outputBlobInfoDirectory.toString(),
060                String.format("bi-%02d.csv", id));
061        try {
062            outputBlobInfoDirectory.toFile().mkdirs();
063            outputFileWriter = new FileWriter(outputFile.toFile(), true);
064            BufferedWriter bw = new BufferedWriter(outputFileWriter);
065            outputWriter = new PrintWriter(bw);
066            outputWriter.write(HEADER);
067        } catch (IOException e) {
068            throw new IllegalArgumentException("Invalid output path: " + outputFile, e);
069        }
070    }
071
072    @Override
073    public void close() throws Exception {
074        outputWriter.close();
075        outputFileWriter.close();
076    }
077
078    @Override
079    public void begin() {
080
081    }
082
083    @Override
084    public void accept(BlobMessage message) {
085        try {
086            Blob blob;
087            if (message.getPath() != null) {
088                blob = new FileBlob(new File(message.getPath()));
089            } else {
090                // we don't submit filename or encoding this is not saved in the binary store but in the document
091                blob = new StringBlob(message.getContent(), null, null, null);
092            }
093            BlobInfo bi = new BlobInfo();
094            bi.digest = blobProvider.writeBlob(blob);
095            bi.key = blobProviderName + ":" + bi.digest;
096            bi.length = blob.getLength();
097            bi.filename = message.getFilename();
098            bi.mimeType = message.getMimetype();
099            bi.encoding = message.getEncoding();
100            saveBlobInfo(bi);
101        } catch (IOException e) {
102            throw new IllegalArgumentException("Invalid blob: " + message, e);
103        }
104    }
105
106    private void saveBlobInfo(BlobInfo bi) {
107        outputWriter.write(String.format("%s, %s, %d, \"%s\", %s, %s\n",
108                bi.key, bi.digest, bi.length, sanitize(bi.filename), sanitize(bi.mimeType), sanitize(bi.encoding)));
109    }
110
111    private String sanitize(String str) {
112        if (str == null || str.trim().isEmpty()) {
113            return "";
114        }
115        return str;
116    }
117
118    @Override
119    public void commit() {
120        outputWriter.flush();
121    }
122
123    @Override
124    public void rollback() {
125
126    }
127}