001/* 002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.importer.stream.consumer; 020 021import static org.nuxeo.runtime.transaction.TransactionHelper.commitOrRollbackTransaction; 022 023import java.io.Serializable; 024import java.util.Map; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.nuxeo.ecm.core.api.Blob; 029import org.nuxeo.ecm.core.api.CloseableCoreSession; 030import org.nuxeo.ecm.core.api.CoreInstance; 031import org.nuxeo.ecm.core.api.CoreSession; 032import org.nuxeo.ecm.core.api.DocumentModel; 033import org.nuxeo.ecm.core.api.model.PropertyNotFoundException; 034import org.nuxeo.ecm.core.blob.BlobInfo; 035import org.nuxeo.ecm.core.blob.SimpleManagedBlob; 036import org.nuxeo.importer.stream.message.DocumentMessage; 037import org.nuxeo.lib.stream.pattern.consumer.AbstractConsumer; 038import org.nuxeo.runtime.transaction.TransactionHelper; 039 040/** 041 * Consumes DocumentMessage and produce Nuxeo document. 042 * 043 * @since 9.1 044 */ 045public class DocumentMessageConsumer extends AbstractConsumer<DocumentMessage> { 046 private static final Log log = LogFactory.getLog(DocumentMessageConsumer.class); 047 048 protected final String rootPath; 049 050 protected final String repositoryName; 051 052 protected CoreSession session; 053 054 public DocumentMessageConsumer(String consumerId, String repositoryName, String rootPath) { 055 super(consumerId); 056 this.rootPath = rootPath; 057 this.repositoryName = repositoryName; 058 } 059 060 @Override 061 public void close() throws Exception { 062 super.close(); 063 if (session != null) { 064 ((CloseableCoreSession) session).close(); 065 TransactionHelper.commitOrRollbackTransaction(); 066 } 067 } 068 069 @Override 070 public void begin() { 071 TransactionHelper.startTransaction(); 072 if (session == null) { 073 this.session = CoreInstance.openCoreSessionSystem(repositoryName); 074 } 075 } 076 077 @Override 078 public void accept(DocumentMessage message) { 079 DocumentModel doc = session.createDocumentModel(rootPath + message.getParentPath(), message.getName(), 080 message.getType()); 081 doc.putContextData(CoreSession.SKIP_DESTINATION_CHECK_ON_CREATE, true); 082 Blob blob = getBlob(message); 083 if (blob != null) { 084 // doc.setProperty("file", "filename", blob.getFilename()); 085 doc.setProperty("file", "content", blob); 086 } 087 Map<String, Serializable> props = message.getProperties(); 088 if (props != null && !props.isEmpty()) { 089 setDocumentProperties(doc, props); 090 } 091 doc = session.createDocument(doc); 092 } 093 094 protected Blob getBlob(DocumentMessage message) { 095 Blob blob = null; 096 if (message.getBlob() != null) { 097 blob = message.getBlob(); 098 } else if (message.getBlobInfo() != null) { 099 BlobInfo blobInfo = message.getBlobInfo(); 100 blob = new SimpleManagedBlob(blobInfo); 101 } 102 return blob; 103 } 104 105 @Override 106 public void commit() { 107 log.debug("commit"); 108 session.save(); 109 // TODO: here if tx is in rollback we must throw something 110 commitOrRollbackTransaction(); 111 } 112 113 @Override 114 public void rollback() { 115 log.info("rollback"); 116 TransactionHelper.setTransactionRollbackOnly(); 117 TransactionHelper.commitOrRollbackTransaction(); 118 } 119 120 protected void setDocumentProperties(DocumentModel doc, Map<String, Serializable> properties) { 121 for (Map.Entry<String, Serializable> entry : properties.entrySet()) { 122 try { 123 doc.setPropertyValue(entry.getKey(), entry.getValue()); 124 } catch (PropertyNotFoundException e) { 125 String message = String.format("Property '%s' not found on document type: %s. Skipping it.", 126 entry.getKey(), doc.getType()); 127 log.error(message, e); 128 } 129 } 130 } 131 132}