001/*
002 * (C) Copyright 2020 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.core.event.stream;
020
021import java.util.ArrayList;
022import java.util.List;
023
024import javax.naming.NamingException;
025import javax.transaction.RollbackException;
026import javax.transaction.Status;
027import javax.transaction.Synchronization;
028import javax.transaction.SystemException;
029import javax.transaction.TransactionManager;
030
031import org.apache.logging.log4j.LogManager;
032import org.apache.logging.log4j.Logger;
033import org.nuxeo.ecm.core.event.Event;
034import org.nuxeo.ecm.core.event.EventListener;
035import org.nuxeo.ecm.core.event.EventService;
036import org.nuxeo.lib.stream.computation.Record;
037import org.nuxeo.lib.stream.computation.StreamManager;
038import org.nuxeo.runtime.api.Framework;
039import org.nuxeo.runtime.stream.StreamService;
040import org.nuxeo.runtime.transaction.TransactionHelper;
041
042/**
043 * Dispatches core events to domain event producers.
044 *
045 * @since 11.4
046 */
047public class DomainEventProducerListener implements EventListener, Synchronization {
048    private static final Logger log = LogManager.getLogger(DomainEventProducerListener.class);
049
050    protected static final ThreadLocal<Boolean> isEnlisted = ThreadLocal.withInitial(() -> Boolean.FALSE);
051
052    protected static final ThreadLocal<List<DomainEventProducer>> producers = ThreadLocal.withInitial(ArrayList::new);
053
054    @Override
055    public void handleEvent(Event event) {
056        if (!Boolean.TRUE.equals(isEnlisted.get())) {
057            isEnlisted.set(registerSynchronization(this));
058            log.debug("Enlisted to transaction");
059            initDomainEventProducers();
060        }
061
062        producers.get().forEach(producer -> producer.addEvent(event));
063
064        if (!Boolean.TRUE.equals(isEnlisted.get())) {
065            // there is no transaction so don't wait for a commit
066            afterCompletion(Status.STATUS_COMMITTED);
067        }
068    }
069
070    protected void initDomainEventProducers() {
071        producers.set(Framework.getService(EventService.class).createDomainEventProducers());
072    }
073
074    protected void cleanDomainEventProducers() {
075        producers.remove();
076    }
077
078    @Override
079    public void beforeCompletion() {
080        log.debug("beforeCompletion");
081    }
082
083    @Override
084    public void afterCompletion(int status) {
085        try {
086            if (Status.STATUS_COMMITTED == status) {
087                log.debug("Sending domain events after commit");
088                produceDomainEvents();
089            } else {
090                log.info("Skip domain events transaction status is not committed: {}", status);
091            }
092        } finally {
093            isEnlisted.set(false);
094            cleanDomainEventProducers();
095        }
096    }
097
098    protected void produceDomainEvents() {
099        StreamService streamService = Framework.getService(StreamService.class);
100        for (DomainEventProducer producer : producers.get()) {
101            List<Record> records = producer.getDomainEvents();
102            if (records.isEmpty()) {
103                continue;
104            }
105            log.debug("Writing domain events");
106            StreamManager streamManager = streamService.getStreamManager();
107            String stream = producer.getStream();
108            records.forEach(record -> streamManager.append(stream, record));
109        }
110    }
111
112    protected boolean registerSynchronization(Synchronization sync) {
113        try {
114            TransactionManager tm = TransactionHelper.lookupTransactionManager();
115            if (tm != null) {
116                if (tm.getTransaction() != null) {
117                    tm.getTransaction().registerSynchronization(sync);
118                    return true;
119                }
120                return false;
121            } else {
122                log.error("Unable to register synchronization: no TransactionManager");
123                return false;
124            }
125        } catch (NamingException | IllegalStateException | SystemException | RollbackException e) {
126            log.error("Unable to register synchronization", e);
127            return false;
128        }
129    }
130
131}