001/* 002 * (C) Copyright 2006-2008 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bstefanescu 018 */ 019package org.nuxeo.ecm.core.event.jms; 020 021import javax.jms.JMSException; 022import javax.jms.ObjectMessage; 023import javax.jms.Topic; 024import javax.jms.TopicConnection; 025import javax.jms.TopicConnectionFactory; 026import javax.jms.TopicPublisher; 027import javax.jms.TopicSession; 028import javax.naming.InitialContext; 029import javax.naming.NamingException; 030 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033import org.nuxeo.ecm.core.event.EventBundle; 034import org.nuxeo.ecm.core.event.PostCommitEventListener; 035import org.nuxeo.ecm.core.event.ReconnectedEventBundle; 036 037/** 038 * Forwards Core EventBundles to JMS topics. 039 * 040 * @author Tiry 041 */ 042public class JmsEventForwarder implements PostCommitEventListener { 043 044 public static final String NUXEO_JMS_TOPIC = "topic/NuxeoMessages"; 045 046 private static final Log log = LogFactory.getLog(JmsEventForwarder.class); 047 048 protected boolean jmsBusIsActive = true; 049 050 protected void produceJMSMessage(SerializableEventBundle message) throws JMSBusNotActiveException { 051 InitialContext ctx; 052 Topic nuxeoTopic; 053 try { 054 ctx = new InitialContext(); 055 nuxeoTopic = (Topic) ctx.lookup(NUXEO_JMS_TOPIC); 056 } catch (NamingException e) { 057 jmsBusIsActive = false; 058 throw new JMSBusNotActiveException(e); 059 } 060 061 TopicConnection nuxeoTopicConnection = null; 062 TopicSession nuxeoTopicSession = null; 063 TopicPublisher nuxeoMessagePublisher = null; 064 try { 065 TopicConnectionFactory factory = (TopicConnectionFactory) ctx.lookup("TopicConnectionFactory"); 066 nuxeoTopicConnection = factory.createTopicConnection(); 067 nuxeoTopicSession = nuxeoTopicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); 068 069 ObjectMessage jmsMessage = nuxeoTopicSession.createObjectMessage(message); 070 071 // add Headers for JMS message 072 jmsMessage.setStringProperty("BundleEvent", message.getEventBundleName()); 073 074 nuxeoMessagePublisher = nuxeoTopicSession.createPublisher(nuxeoTopic); 075 076 nuxeoMessagePublisher.send(jmsMessage); 077 log.debug("Event bundle " + message.getEventBundleName() + " forwarded to JMS topic"); 078 079 } catch (NamingException | JMSException e) { 080 log.error("Error during JMS forwarding", e); 081 } finally { 082 if (nuxeoTopicSession != null) { 083 try { 084 if (nuxeoMessagePublisher != null) { 085 nuxeoMessagePublisher.close(); 086 } 087 nuxeoTopicConnection.close(); 088 nuxeoTopicSession.close(); 089 } catch (JMSException e) { 090 log.error("Error during JMS cleanup", e); 091 } 092 } 093 } 094 } 095 096 @Override 097 public void handleEvent(EventBundle events) { 098 if (!canForwardMessage(events)) { 099 return; 100 } 101 try { 102 produceJMSMessage(new SerializableEventBundle(events)); 103 } catch (JMSBusNotActiveException e) { 104 log.debug("JMS Bus is not active, cannot forward message"); 105 } 106 } 107 108 protected boolean canForwardMessage(EventBundle events) { 109 // Check Bus is Active 110 if (!jmsBusIsActive) { 111 log.debug("JMS Bus is not active, cannot forward message"); 112 return false; 113 } 114 if (events instanceof ReconnectedEventBundle) { 115 if (((ReconnectedEventBundle) events).comesFromJMS()) { 116 log.debug("Message already comes from JMS bus, not forwarding"); 117 return false; 118 } 119 } 120 return true; 121 } 122 123}