001/*
002 * (C) Copyright 2006-2008 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bstefanescu
018 */
019package org.nuxeo.ecm.core.event.jms;
020
021import javax.jms.JMSException;
022import javax.jms.ObjectMessage;
023import javax.jms.Topic;
024import javax.jms.TopicConnection;
025import javax.jms.TopicConnectionFactory;
026import javax.jms.TopicPublisher;
027import javax.jms.TopicSession;
028import javax.naming.InitialContext;
029import javax.naming.NamingException;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.nuxeo.ecm.core.event.EventBundle;
034import org.nuxeo.ecm.core.event.PostCommitEventListener;
035import org.nuxeo.ecm.core.event.ReconnectedEventBundle;
036
037/**
038 * Forwards Core EventBundles to JMS topics.
039 *
040 * @author Tiry
041 */
042public class JmsEventForwarder implements PostCommitEventListener {
043
044    public static final String NUXEO_JMS_TOPIC = "topic/NuxeoMessages";
045
046    private static final Log log = LogFactory.getLog(JmsEventForwarder.class);
047
048    protected boolean jmsBusIsActive = true;
049
050    protected void produceJMSMessage(SerializableEventBundle message) throws JMSBusNotActiveException {
051        InitialContext ctx;
052        Topic nuxeoTopic;
053        try {
054            ctx = new InitialContext();
055            nuxeoTopic = (Topic) ctx.lookup(NUXEO_JMS_TOPIC);
056        } catch (NamingException e) {
057            jmsBusIsActive = false;
058            throw new JMSBusNotActiveException(e);
059        }
060
061        TopicConnection nuxeoTopicConnection = null;
062        TopicSession nuxeoTopicSession = null;
063        TopicPublisher nuxeoMessagePublisher = null;
064        try {
065            TopicConnectionFactory factory = (TopicConnectionFactory) ctx.lookup("TopicConnectionFactory");
066            nuxeoTopicConnection = factory.createTopicConnection();
067            nuxeoTopicSession = nuxeoTopicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
068
069            ObjectMessage jmsMessage = nuxeoTopicSession.createObjectMessage(message);
070
071            // add Headers for JMS message
072            jmsMessage.setStringProperty("BundleEvent", message.getEventBundleName());
073
074            nuxeoMessagePublisher = nuxeoTopicSession.createPublisher(nuxeoTopic);
075
076            nuxeoMessagePublisher.send(jmsMessage);
077            log.debug("Event bundle " + message.getEventBundleName() + " forwarded to JMS topic");
078
079        } catch (NamingException | JMSException e) {
080            log.error("Error during JMS forwarding", e);
081        } finally {
082            if (nuxeoTopicSession != null) {
083                try {
084                    if (nuxeoMessagePublisher != null) {
085                        nuxeoMessagePublisher.close();
086                    }
087                    nuxeoTopicConnection.close();
088                    nuxeoTopicSession.close();
089                } catch (JMSException e) {
090                    log.error("Error during JMS cleanup", e);
091                }
092            }
093        }
094    }
095
096    @Override
097    public void handleEvent(EventBundle events) {
098        if (!canForwardMessage(events)) {
099            return;
100        }
101        try {
102            produceJMSMessage(new SerializableEventBundle(events));
103        } catch (JMSBusNotActiveException e) {
104            log.debug("JMS Bus is not active, cannot forward message");
105        }
106    }
107
108    protected boolean canForwardMessage(EventBundle events) {
109        // Check Bus is Active
110        if (!jmsBusIsActive) {
111            log.debug("JMS Bus is not active, cannot forward message");
112            return false;
113        }
114        if (events instanceof ReconnectedEventBundle) {
115            if (((ReconnectedEventBundle) events).comesFromJMS()) {
116                log.debug("Message already comes from JMS bus, not forwarding");
117                return false;
118            }
119        }
120        return true;
121    }
122
123}