001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.importer.stream.consumer; 020 021import java.io.File; 022import java.io.IOException; 023import java.util.concurrent.atomic.AtomicInteger; 024 025import org.nuxeo.ecm.core.api.Blob; 026import org.nuxeo.ecm.core.api.impl.blob.FileBlob; 027import org.nuxeo.ecm.core.api.impl.blob.StringBlob; 028import org.nuxeo.ecm.core.blob.BlobInfo; 029import org.nuxeo.ecm.core.blob.BlobManager; 030import org.nuxeo.ecm.core.blob.BlobProvider; 031import org.nuxeo.importer.stream.message.BlobMessage; 032import org.nuxeo.lib.stream.pattern.consumer.AbstractConsumer; 033import org.nuxeo.runtime.api.Framework; 034 035/** 036 * Import BlobMessage into a Nuxeo BlobProvider, persist BlobInformation. 037 * 038 * @since 9.1 039 */ 040public class BlobMessageConsumer extends AbstractConsumer<BlobMessage> { 041 protected static final AtomicInteger consumerCounter = new AtomicInteger(0); 042 043 protected final BlobProvider blobProvider; 044 045 protected final String blobProviderName; 046 047 protected final BlobInfoWriter blobInfoWriter; 048 049 public BlobMessageConsumer(String consumerId, String blobProviderName, BlobInfoWriter blobInfoWriter) { 050 super(consumerId); 051 this.blobProviderName = blobProviderName; 052 this.blobProvider = Framework.getService(BlobManager.class).getBlobProvider(blobProviderName); 053 if (blobProvider == null) { 054 throw new IllegalArgumentException("Invalid blob provider: " + blobProviderName); 055 } 056 this.blobInfoWriter = blobInfoWriter; 057 } 058 059 @Override 060 public void begin() { 061 062 } 063 064 @Override 065 public void accept(BlobMessage message) { 066 try { 067 Blob blob; 068 if (message.getPath() != null) { 069 blob = new FileBlob(new File(message.getPath())); 070 } else { 071 // we don't submit filename or encoding this is not saved in the binary store but in the document 072 blob = new StringBlob(message.getContent(), null, null, null); 073 } 074 String digest = blobProvider.writeBlob(blob); 075 long length = blob.getLength(); 076 saveBlobInfo(message, digest, length); 077 } catch (IOException e) { 078 throw new IllegalArgumentException("Invalid blob: " + message, e); 079 } 080 } 081 082 protected void saveBlobInfo(BlobMessage message, String digest, long length) throws IOException { 083 BlobInfo bi = new BlobInfo(); 084 bi.digest = digest; 085 bi.key = blobProviderName + ":" + bi.digest; 086 bi.length = length; 087 bi.filename = message.getFilename(); 088 bi.mimeType = message.getMimetype(); 089 bi.encoding = message.getEncoding(); 090 blobInfoWriter.save(null, bi); 091 } 092 093 @Override 094 public void commit() { 095 096 } 097 098 @Override 099 public void rollback() { 100 101 } 102}