001/* 002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.importer.stream.consumer; 020 021import static org.nuxeo.runtime.transaction.TransactionHelper.commitOrRollbackTransaction; 022 023import java.io.Serializable; 024import java.util.Map; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.nuxeo.ecm.core.api.Blob; 029import org.nuxeo.ecm.core.api.CoreInstance; 030import org.nuxeo.ecm.core.api.CoreSession; 031import org.nuxeo.ecm.core.api.DocumentModel; 032import org.nuxeo.ecm.core.api.model.PropertyNotFoundException; 033import org.nuxeo.ecm.core.blob.BlobInfo; 034import org.nuxeo.ecm.core.blob.SimpleManagedBlob; 035import org.nuxeo.importer.stream.message.DocumentMessage; 036import org.nuxeo.lib.stream.pattern.consumer.AbstractConsumer; 037import org.nuxeo.runtime.transaction.TransactionHelper; 038 039/** 040 * Consumes DocumentMessage and produce Nuxeo document. 041 * 042 * @since 9.1 043 */ 044public class DocumentMessageConsumer extends AbstractConsumer<DocumentMessage> { 045 private static final Log log = LogFactory.getLog(DocumentMessageConsumer.class); 046 047 protected final String rootPath; 048 049 protected final String repositoryName; 050 051 protected CoreSession session; 052 053 public DocumentMessageConsumer(String consumerId, String repositoryName, String rootPath) { 054 super(consumerId); 055 this.rootPath = rootPath; 056 this.repositoryName = repositoryName; 057 } 058 059 @Override 060 public void close() throws Exception { 061 super.close(); 062 TransactionHelper.commitOrRollbackTransaction(); 063 } 064 065 @Override 066 public void begin() { 067 TransactionHelper.startTransaction(); 068 if (session == null) { 069 this.session = CoreInstance.getCoreSessionSystem(repositoryName); 070 } 071 } 072 073 @Override 074 public void accept(DocumentMessage message) { 075 DocumentModel doc = session.createDocumentModel(rootPath + message.getParentPath(), message.getName(), 076 message.getType()); 077 doc.putContextData(CoreSession.SKIP_DESTINATION_CHECK_ON_CREATE, true); 078 Blob blob = getBlob(message); 079 if (blob != null) { 080 doc.setProperty("file", "content", blob); 081 } 082 Map<String, Serializable> props = message.getProperties(); 083 if (props != null && !props.isEmpty()) { 084 setDocumentProperties(doc, props); 085 } 086 doc = session.createDocument(doc); 087 } 088 089 protected Blob getBlob(DocumentMessage message) { 090 Blob blob = null; 091 if (message.getBlob() != null) { 092 blob = message.getBlob(); 093 } else if (message.getBlobInfo() != null) { 094 BlobInfo blobInfo = message.getBlobInfo(); 095 blob = new SimpleManagedBlob(blobInfo); 096 } 097 return blob; 098 } 099 100 @Override 101 public void commit() { 102 log.debug("commit"); 103 session.save(); 104 // TODO: here if tx is in rollback we must throw something 105 commitOrRollbackTransaction(); 106 } 107 108 @Override 109 public void rollback() { 110 log.info("rollback"); 111 TransactionHelper.setTransactionRollbackOnly(); 112 TransactionHelper.commitOrRollbackTransaction(); 113 } 114 115 protected void setDocumentProperties(DocumentModel doc, Map<String, Serializable> properties) { 116 for (Map.Entry<String, Serializable> entry : properties.entrySet()) { 117 try { 118 doc.setPropertyValue(entry.getKey(), entry.getValue()); 119 } catch (PropertyNotFoundException e) { 120 String message = String.format("Property '%s' not found on document type: %s. Skipping it.", 121 entry.getKey(), doc.getType()); 122 log.error(message, e); 123 } 124 } 125 } 126 127}