001/* 002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.importer.stream.consumer; 020 021import static org.nuxeo.runtime.transaction.TransactionHelper.commitOrRollbackTransaction; 022 023import java.io.Serializable; 024import java.util.Map; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.nuxeo.ecm.core.api.Blob; 029import org.nuxeo.ecm.core.api.CoreInstance; 030import org.nuxeo.ecm.core.api.CoreSession; 031import org.nuxeo.ecm.core.api.DocumentModel; 032import org.nuxeo.ecm.core.api.model.PropertyNotFoundException; 033import org.nuxeo.ecm.core.blob.BlobInfo; 034import org.nuxeo.ecm.core.blob.SimpleManagedBlob; 035import org.nuxeo.importer.stream.message.DocumentMessage; 036import org.nuxeo.lib.stream.pattern.consumer.AbstractConsumer; 037import org.nuxeo.runtime.transaction.TransactionHelper; 038 039/** 040 * Consumes DocumentMessage and produce Nuxeo document. 041 * 042 * @since 9.1 043 */ 044public class DocumentMessageConsumer extends AbstractConsumer<DocumentMessage> { 045 private static final Log log = LogFactory.getLog(DocumentMessageConsumer.class); 046 047 protected final String rootPath; 048 049 protected final String repositoryName; 050 051 protected CoreSession session; 052 053 public DocumentMessageConsumer(String consumerId, String repositoryName, String rootPath) { 054 super(consumerId); 055 this.rootPath = rootPath; 056 this.repositoryName = repositoryName; 057 } 058 059 @Override 060 public void close() throws Exception { 061 super.close(); 062 if (session != null) { 063 session.close(); 064 TransactionHelper.commitOrRollbackTransaction(); 065 } 066 } 067 068 @Override 069 public void begin() { 070 TransactionHelper.startTransaction(); 071 if (session == null) { 072 this.session = CoreInstance.openCoreSessionSystem(repositoryName); 073 } 074 } 075 076 @Override 077 public void accept(DocumentMessage message) { 078 DocumentModel doc = session.createDocumentModel(rootPath + message.getParentPath(), message.getName(), 079 message.getType()); 080 doc.putContextData(CoreSession.SKIP_DESTINATION_CHECK_ON_CREATE, true); 081 Blob blob = getBlob(message); 082 if (blob != null) { 083 // doc.setProperty("file", "filename", blob.getFilename()); 084 doc.setProperty("file", "content", blob); 085 } 086 Map<String, Serializable> props = message.getProperties(); 087 if (props != null && !props.isEmpty()) { 088 setDocumentProperties(doc, props); 089 } 090 doc = session.createDocument(doc); 091 } 092 093 protected Blob getBlob(DocumentMessage message) { 094 Blob blob = null; 095 if (message.getBlob() != null) { 096 blob = message.getBlob(); 097 } else if (message.getBlobInfo() != null) { 098 BlobInfo blobInfo = message.getBlobInfo(); 099 blob = new SimpleManagedBlob(blobInfo); 100 } 101 return blob; 102 } 103 104 @Override 105 public void commit() { 106 log.debug("commit"); 107 session.save(); 108 // TODO: here if tx is in rollback we must throw something 109 commitOrRollbackTransaction(); 110 } 111 112 @Override 113 public void rollback() { 114 log.info("rollback"); 115 TransactionHelper.setTransactionRollbackOnly(); 116 TransactionHelper.commitOrRollbackTransaction(); 117 } 118 119 protected void setDocumentProperties(DocumentModel doc, Map<String, Serializable> properties) { 120 for (Map.Entry<String, Serializable> entry : properties.entrySet()) { 121 try { 122 doc.setPropertyValue(entry.getKey(), entry.getValue()); 123 } catch (PropertyNotFoundException e) { 124 String message = String.format("Property '%s' not found on document type: %s. Skipping it.", 125 entry.getKey(), doc.getType()); 126 log.error(message, e); 127 } 128 } 129 } 130 131}