001/* 002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.platform.importer.mqueues.consumer; 020 021import org.apache.commons.logging.Log; 022import org.apache.commons.logging.LogFactory; 023import org.nuxeo.ecm.core.api.Blob; 024import org.nuxeo.ecm.core.api.CoreInstance; 025import org.nuxeo.ecm.core.api.CoreSession; 026import org.nuxeo.ecm.core.api.DocumentModel; 027import org.nuxeo.ecm.core.api.model.PropertyNotFoundException; 028import org.nuxeo.ecm.core.blob.BlobManager; 029import org.nuxeo.ecm.core.blob.SimpleManagedBlob; 030import org.nuxeo.ecm.platform.importer.mqueues.message.DocumentMessage; 031import org.nuxeo.runtime.transaction.TransactionHelper; 032 033import java.io.Serializable; 034import java.util.Map; 035 036import static org.nuxeo.runtime.transaction.TransactionHelper.commitOrRollbackTransaction; 037 038/** 039 * Consumes DocumentMessage and produce Nuxeo document. 040 * 041 * @since 9.1 042 */ 043public class DocumentMessageConsumer extends AbstractConsumer<DocumentMessage> { 044 private static final Log log = LogFactory.getLog(DocumentMessageConsumer.class); 045 private final String rootPath; 046 private final String repositoryName; 047 private CoreSession session; 048 049 public DocumentMessageConsumer(int consumerId, String repositoryName, String rootPath) { 050 super(consumerId); 051 this.rootPath = rootPath; 052 this.repositoryName = repositoryName; 053 } 054 055 @Override 056 public void close() throws Exception { 057 super.close(); 058 if (session != null) { 059 session.close(); 060 TransactionHelper.commitOrRollbackTransaction(); 061 } 062 } 063 064 @Override 065 public void begin() { 066 TransactionHelper.startTransaction(); 067 if (session == null) { 068 this.session = CoreInstance.openCoreSessionSystem(repositoryName); 069 } 070 } 071 072 @Override 073 public void accept(DocumentMessage message) { 074 DocumentModel doc = session.createDocumentModel(rootPath + message.getParentPath(), message.getName(), message.getType()); 075 doc.putContextData(CoreSession.SKIP_DESTINATION_CHECK_ON_CREATE, true); 076 Blob blob = getBlob(message); 077 if (blob != null) { 078 // doc.setProperty("file", "filename", blob.getFilename()); 079 doc.setProperty("file", "content", blob); 080 } 081 Map<String, Serializable> props = message.getProperties(); 082 if (props != null && !props.isEmpty()) { 083 setDocumentProperties(doc, props); 084 } 085 doc = session.createDocument(doc); 086 } 087 088 private Blob getBlob(DocumentMessage message) { 089 Blob blob = null; 090 if (message.getBlob() != null) { 091 blob = message.getBlob(); 092 } else if (message.getBlobInfo() != null) { 093 BlobManager.BlobInfo blobInfo = message.getBlobInfo(); 094 blob = new SimpleManagedBlob(blobInfo); 095 } 096 return blob; 097 } 098 099 @Override 100 public void commit() { 101 log.debug("commit"); 102 session.save(); 103 // TODO here if tx is in rollback we must throw something 104 commitOrRollbackTransaction(); 105 } 106 107 @Override 108 public void rollback() { 109 log.info("rollback"); 110 TransactionHelper.setTransactionRollbackOnly(); 111 TransactionHelper.commitOrRollbackTransaction(); 112 } 113 114 protected void setDocumentProperties(DocumentModel doc, Map<String, Serializable> properties) { 115 for (Map.Entry<String, Serializable> entry : properties.entrySet()) { 116 try { 117 doc.setPropertyValue(entry.getKey(), entry.getValue()); 118 } catch (PropertyNotFoundException e) { 119 String message = String.format("Property '%s' not found on document type: %s. Skipping it.", 120 entry.getKey(), doc.getType()); 121 log.error(message, e); 122 } 123 } 124 } 125 126 127}