001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.platform.importer.mqueues.consumer; 020 021import org.nuxeo.ecm.core.api.Blob; 022import org.nuxeo.ecm.core.api.impl.blob.FileBlob; 023import org.nuxeo.ecm.core.api.impl.blob.StringBlob; 024import org.nuxeo.ecm.core.blob.BlobManager; 025import org.nuxeo.ecm.core.blob.BlobProvider; 026import org.nuxeo.ecm.platform.importer.mqueues.message.BlobMessage; 027import org.nuxeo.runtime.api.Framework; 028 029import java.io.BufferedWriter; 030import java.io.File; 031import java.io.FileWriter; 032import java.io.IOException; 033import java.io.PrintWriter; 034import java.nio.file.Path; 035import java.nio.file.Paths; 036 037/** 038 * @since 9.1 039 */ 040public class BlobMessageConsumer extends AbstractConsumer<BlobMessage> { 041 private static final java.lang.String DEFAULT_ENCODING = "UTF-8"; 042 private static final String HEADER = "key, digest, length, filename, mimetype, encoding\n"; 043 private final BlobProvider blobProvider; 044 private final PrintWriter outputWriter; 045 private final FileWriter outputFileWriter; 046 private final String blobProviderName; 047 048 public BlobMessageConsumer(int consumerId, String blobProviderName, Path outputBlobInfoDirectory) { 049 super(consumerId); 050 this.blobProviderName = blobProviderName; 051 this.blobProvider = Framework.getService(BlobManager.class).getBlobProvider(blobProviderName); 052 if (blobProvider == null) { 053 throw new IllegalArgumentException("Invalid blob provider: " + blobProviderName); 054 } 055 Path outputFile = Paths.get(outputBlobInfoDirectory.toString(), "bi-" + consumerId + ".csv"); 056 try { 057 outputBlobInfoDirectory.toFile().mkdirs(); 058 outputFileWriter = new FileWriter(outputFile.toFile(), true); 059 BufferedWriter bw = new BufferedWriter(outputFileWriter); 060 outputWriter = new PrintWriter(bw); 061 outputWriter.write(HEADER); 062 } catch (IOException e) { 063 throw new IllegalArgumentException("Invalid output path: " + outputFile, e); 064 } 065 } 066 067 @Override 068 public void close() throws Exception { 069 outputWriter.close(); 070 outputFileWriter.close(); 071 } 072 073 @Override 074 public void begin() { 075 076 } 077 078 @Override 079 public void accept(BlobMessage message) { 080 try { 081 Blob blob; 082 if (message.getPath() != null) { 083 blob = new FileBlob(new File(message.getPath())); 084 } else { 085 // we don't submit filename or encoding this is not saved in the binary store but in the document 086 blob = new StringBlob(message.getContent(), null, null, null); 087 } 088 BlobManager.BlobInfo bi = new BlobManager.BlobInfo(); 089 bi.digest = blobProvider.writeBlob(blob, null); 090 bi.key = blobProviderName + ":" + bi.digest; 091 bi.length = blob.getLength(); 092 bi.filename = message.getFilename(); 093 bi.mimeType = message.getMimetype(); 094 bi.encoding = message.getEncoding(); 095 saveBlobInfo(bi); 096 } catch (IOException e) { 097 throw new IllegalArgumentException("Invalid blob: " + message, e); 098 } 099 } 100 101 private void saveBlobInfo(BlobManager.BlobInfo bi) { 102 outputWriter.write(String.format("%s, %s, %d, \"%s\", %s, %s\n", 103 bi.key, bi.digest, bi.length, sanitize(bi.filename), sanitize(bi.mimeType), sanitize(bi.encoding))); 104 } 105 106 private String sanitize(String str) { 107 if (str == null || str.trim().isEmpty()) { 108 return ""; 109 } 110 return str; 111 } 112 113 @Override 114 public void commit() { 115 outputWriter.flush(); 116 } 117 118 @Override 119 public void rollback() { 120 121 } 122}