001/*
002 * (C) Copyright 2006-2008 Nuxeo SAS (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     bstefanescu
016 */
017package org.nuxeo.ecm.core.event.jms;
018
019import javax.jms.JMSException;
020import javax.jms.ObjectMessage;
021import javax.jms.Topic;
022import javax.jms.TopicConnection;
023import javax.jms.TopicConnectionFactory;
024import javax.jms.TopicPublisher;
025import javax.jms.TopicSession;
026import javax.naming.InitialContext;
027import javax.naming.NamingException;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.nuxeo.ecm.core.event.EventBundle;
032import org.nuxeo.ecm.core.event.PostCommitEventListener;
033import org.nuxeo.ecm.core.event.ReconnectedEventBundle;
034
035/**
036 * Forwards Core EventBundles to JMS topics.
037 *
038 * @author Tiry
039 */
040public class JmsEventForwarder implements PostCommitEventListener {
041
042    public static final String NUXEO_JMS_TOPIC = "topic/NuxeoMessages";
043
044    private static final Log log = LogFactory.getLog(JmsEventForwarder.class);
045
046    protected boolean jmsBusIsActive = true;
047
048    protected void produceJMSMessage(SerializableEventBundle message) throws JMSBusNotActiveException {
049        InitialContext ctx;
050        Topic nuxeoTopic;
051        try {
052            ctx = new InitialContext();
053            nuxeoTopic = (Topic) ctx.lookup(NUXEO_JMS_TOPIC);
054        } catch (NamingException e) {
055            jmsBusIsActive = false;
056            throw new JMSBusNotActiveException(e);
057        }
058
059        TopicConnection nuxeoTopicConnection = null;
060        TopicSession nuxeoTopicSession = null;
061        TopicPublisher nuxeoMessagePublisher = null;
062        try {
063            TopicConnectionFactory factory = (TopicConnectionFactory) ctx.lookup("TopicConnectionFactory");
064            nuxeoTopicConnection = factory.createTopicConnection();
065            nuxeoTopicSession = nuxeoTopicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
066
067            ObjectMessage jmsMessage = nuxeoTopicSession.createObjectMessage(message);
068
069            // add Headers for JMS message
070            jmsMessage.setStringProperty("BundleEvent", message.getEventBundleName());
071
072            nuxeoMessagePublisher = nuxeoTopicSession.createPublisher(nuxeoTopic);
073
074            nuxeoMessagePublisher.send(jmsMessage);
075            log.debug("Event bundle " + message.getEventBundleName() + " forwarded to JMS topic");
076
077        } catch (NamingException | JMSException e) {
078            log.error("Error during JMS forwarding", e);
079        } finally {
080            if (nuxeoTopicSession != null) {
081                try {
082                    if (nuxeoMessagePublisher != null) {
083                        nuxeoMessagePublisher.close();
084                    }
085                    nuxeoTopicConnection.close();
086                    nuxeoTopicSession.close();
087                } catch (JMSException e) {
088                    log.error("Error during JMS cleanup", e);
089                }
090            }
091        }
092    }
093
094    @Override
095    public void handleEvent(EventBundle events) {
096        if (!canForwardMessage(events)) {
097            return;
098        }
099        try {
100            produceJMSMessage(new SerializableEventBundle(events));
101        } catch (JMSBusNotActiveException e) {
102            log.debug("JMS Bus is not active, cannot forward message");
103        }
104    }
105
106    protected boolean canForwardMessage(EventBundle events) {
107        // Check Bus is Active
108        if (!jmsBusIsActive) {
109            log.debug("JMS Bus is not active, cannot forward message");
110            return false;
111        }
112        if (events instanceof ReconnectedEventBundle) {
113            if (((ReconnectedEventBundle) events).comesFromJMS()) {
114                log.debug("Message already comes from JMS bus, not forwarding");
115                return false;
116            }
117        }
118        return true;
119    }
120
121}