001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer; 020 021import org.nuxeo.ecm.core.api.Blob; 022import org.nuxeo.ecm.core.api.impl.blob.FileBlob; 023import org.nuxeo.ecm.core.api.impl.blob.StringBlob; 024import org.nuxeo.ecm.core.blob.BlobInfo; 025import org.nuxeo.ecm.core.blob.BlobManager; 026import org.nuxeo.ecm.core.blob.BlobProvider; 027import org.nuxeo.ecm.platform.importer.mqueues.pattern.message.BlobMessage; 028import org.nuxeo.runtime.api.Framework; 029 030import java.io.BufferedWriter; 031import java.io.File; 032import java.io.FileWriter; 033import java.io.IOException; 034import java.io.PrintWriter; 035import java.nio.file.Path; 036import java.nio.file.Paths; 037import java.util.concurrent.atomic.AtomicInteger; 038 039/** 040 * @since 9.1 041 */ 042public class BlobMessageConsumer extends AbstractConsumer<BlobMessage> { 043 private static final AtomicInteger consumerCounter = new AtomicInteger(0); 044 private final Integer id = consumerCounter.getAndIncrement(); 045 private static final java.lang.String DEFAULT_ENCODING = "UTF-8"; 046 private static final String HEADER = "key, digest, length, filename, mimetype, encoding\n"; 047 private final BlobProvider blobProvider; 048 private final PrintWriter outputWriter; 049 private final FileWriter outputFileWriter; 050 private final String blobProviderName; 051 052 public BlobMessageConsumer(String consumerId, String blobProviderName, Path outputBlobInfoDirectory) { 053 super(consumerId); 054 this.blobProviderName = blobProviderName; 055 this.blobProvider = Framework.getService(BlobManager.class).getBlobProvider(blobProviderName); 056 if (blobProvider == null) { 057 throw new IllegalArgumentException("Invalid blob provider: " + blobProviderName); 058 } 059 Path outputFile = Paths.get(outputBlobInfoDirectory.toString(), 060 String.format("bi-%02d.csv", id)); 061 try { 062 outputBlobInfoDirectory.toFile().mkdirs(); 063 outputFileWriter = new FileWriter(outputFile.toFile(), true); 064 BufferedWriter bw = new BufferedWriter(outputFileWriter); 065 outputWriter = new PrintWriter(bw); 066 outputWriter.write(HEADER); 067 } catch (IOException e) { 068 throw new IllegalArgumentException("Invalid output path: " + outputFile, e); 069 } 070 } 071 072 @Override 073 public void close() throws Exception { 074 outputWriter.close(); 075 outputFileWriter.close(); 076 } 077 078 @Override 079 public void begin() { 080 081 } 082 083 @Override 084 public void accept(BlobMessage message) { 085 try { 086 Blob blob; 087 if (message.getPath() != null) { 088 blob = new FileBlob(new File(message.getPath())); 089 } else { 090 // we don't submit filename or encoding this is not saved in the binary store but in the document 091 blob = new StringBlob(message.getContent(), null, null, null); 092 } 093 BlobInfo bi = new BlobInfo(); 094 bi.digest = blobProvider.writeBlob(blob); 095 bi.key = blobProviderName + ":" + bi.digest; 096 bi.length = blob.getLength(); 097 bi.filename = message.getFilename(); 098 bi.mimeType = message.getMimetype(); 099 bi.encoding = message.getEncoding(); 100 saveBlobInfo(bi); 101 } catch (IOException e) { 102 throw new IllegalArgumentException("Invalid blob: " + message, e); 103 } 104 } 105 106 private void saveBlobInfo(BlobInfo bi) { 107 outputWriter.write(String.format("%s, %s, %d, \"%s\", %s, %s\n", 108 bi.key, bi.digest, bi.length, sanitize(bi.filename), sanitize(bi.mimeType), sanitize(bi.encoding))); 109 } 110 111 private String sanitize(String str) { 112 if (str == null || str.trim().isEmpty()) { 113 return ""; 114 } 115 return str; 116 } 117 118 @Override 119 public void commit() { 120 outputWriter.flush(); 121 } 122 123 @Override 124 public void rollback() { 125 126 } 127}