001/*
002 * (C) Copyright 2006-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Bogdan Stefanescu
018 *     Thierry Delprat
019 *     Florent Guillaume
020 *     Andrei Nechaev
021 */
022package org.nuxeo.ecm.core.event.impl;
023
024import java.rmi.dgc.VMID;
025import java.util.ArrayList;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.CopyOnWriteArrayList;
031import java.util.stream.Collectors;
032
033import javax.naming.NamingException;
034import javax.transaction.RollbackException;
035import javax.transaction.Status;
036import javax.transaction.Synchronization;
037import javax.transaction.SystemException;
038
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041import org.nuxeo.common.utils.Path;
042import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
043import org.nuxeo.ecm.core.api.NuxeoException;
044import org.nuxeo.ecm.core.api.RecoverableClientException;
045import org.nuxeo.ecm.core.event.Event;
046import org.nuxeo.ecm.core.event.EventBundle;
047import org.nuxeo.ecm.core.event.EventContext;
048import org.nuxeo.ecm.core.event.EventListener;
049import org.nuxeo.ecm.core.event.EventService;
050import org.nuxeo.ecm.core.event.EventServiceAdmin;
051import org.nuxeo.ecm.core.event.EventServiceComponent;
052import org.nuxeo.ecm.core.event.EventStats;
053import org.nuxeo.ecm.core.event.PostCommitEventListener;
054import org.nuxeo.ecm.core.event.pipe.EventPipeDescriptor;
055import org.nuxeo.ecm.core.event.pipe.EventPipeRegistry;
056import org.nuxeo.ecm.core.event.pipe.dispatch.EventBundleDispatcher;
057import org.nuxeo.ecm.core.event.pipe.dispatch.EventDispatcherDescriptor;
058import org.nuxeo.ecm.core.event.pipe.dispatch.EventDispatcherRegistry;
059import org.nuxeo.ecm.core.event.stream.DomainEventProducer;
060import org.nuxeo.ecm.core.event.stream.DomainEventProducerDescriptor;
061import org.nuxeo.lib.stream.computation.Record;
062import org.nuxeo.lib.stream.computation.Settings;
063import org.nuxeo.runtime.api.Framework;
064import org.nuxeo.runtime.codec.CodecService;
065import org.nuxeo.runtime.model.DescriptorRegistry;
066import org.nuxeo.runtime.stream.StreamService;
067import org.nuxeo.runtime.transaction.TransactionHelper;
068
069import io.opencensus.trace.AttributeValue;
070import io.opencensus.trace.Span;
071import io.opencensus.trace.Tracer;
072import io.opencensus.trace.Tracing;
073
074/**
075 * Implementation of the event service.
076 */
077public class EventServiceImpl implements EventService, EventServiceAdmin, Synchronization {
078
079    public static final VMID VMID = new VMID();
080
081    private static final Log log = LogFactory.getLog(EventServiceImpl.class);
082
083    protected static final ThreadLocal<CompositeEventBundle> threadBundles = new ThreadLocal<CompositeEventBundle>() {
084        @Override
085        protected CompositeEventBundle initialValue() {
086            return new CompositeEventBundle();
087        }
088    };
089
090    private static class CompositeEventBundle {
091
092        boolean registeredSynchronization;
093
094        final Map<String, EventBundle> byRepository = new HashMap<>();
095
096        void push(Event event) {
097            String repositoryName = event.getContext().getRepositoryName();
098            if (!byRepository.containsKey(repositoryName)) {
099                byRepository.put(repositoryName, new EventBundleImpl());
100            }
101            byRepository.get(repositoryName).push(event);
102        }
103
104    }
105
106    protected final EventListenerList listenerDescriptors;
107
108    protected PostCommitEventExecutor postCommitExec;
109
110    protected volatile AsyncEventExecutor asyncExec;
111
112    protected final List<AsyncWaitHook> asyncWaitHooks = new CopyOnWriteArrayList<>();
113
114    protected boolean blockAsyncProcessing = false;
115
116    protected boolean blockSyncPostCommitProcessing = false;
117
118    protected boolean bulkModeEnabled = false;
119
120    protected EventPipeRegistry registeredPipes = new EventPipeRegistry();
121
122    protected EventDispatcherRegistry dispatchers = new EventDispatcherRegistry();
123
124    protected EventBundleDispatcher pipeDispatcher;
125
126    // @since 11.4
127    protected DescriptorRegistry domainEventProducers = new DescriptorRegistry();
128
129    // @since 11.4
130    protected static final String REGISTRY_TARGET_NAME = "EventService";
131
132    public EventServiceImpl() {
133        listenerDescriptors = new EventListenerList();
134        postCommitExec = new PostCommitEventExecutor();
135        asyncExec = new AsyncEventExecutor();
136    }
137
138    public void init() {
139        asyncExec.init();
140
141        EventDispatcherDescriptor dispatcherDescriptor = dispatchers.getDispatcherDescriptor();
142        if (dispatcherDescriptor != null) {
143            List<EventPipeDescriptor> pipes = registeredPipes.getPipes();
144            if (!pipes.isEmpty()) {
145                pipeDispatcher = dispatcherDescriptor.getInstance();
146                pipeDispatcher.init(pipes, dispatcherDescriptor.getParameters());
147            }
148        }
149        initDomainEventStreams();
150    }
151
152    public EventBundleDispatcher getEventBundleDispatcher() {
153        return pipeDispatcher;
154    }
155
156    public void addDomainEventProducer(DomainEventProducerDescriptor descriptor) {
157        if (descriptor.isEnabled()) {
158            domainEventProducers.register(REGISTRY_TARGET_NAME, EventServiceComponent.DOMAIN_EVENT_PRODUCER_XP,
159                    descriptor);
160            log.debug("Registered domain event producer: " + descriptor.getName());
161        } else {
162            domainEventProducers.unregister(REGISTRY_TARGET_NAME, EventServiceComponent.DOMAIN_EVENT_PRODUCER_XP,
163                    descriptor);
164            log.debug("Unregistered domain event producer (disabled): " + descriptor.getName());
165        }
166    }
167
168    public void removeDomainEventProducer(DomainEventProducerDescriptor descriptor) {
169        domainEventProducers.unregister(REGISTRY_TARGET_NAME, EventServiceComponent.DOMAIN_EVENT_PRODUCER_XP,
170                descriptor);
171        log.debug("Unregistered domain event producer: " + descriptor.getName());
172    }
173
174    public void shutdown(long timeoutMillis) throws InterruptedException {
175        postCommitExec.shutdown(timeoutMillis);
176        Set<AsyncWaitHook> notTerminated = asyncWaitHooks.stream().filter(hook -> !hook.shutdown()).collect(
177                Collectors.toSet());
178        if (!notTerminated.isEmpty()) {
179            throw new RuntimeException("Asynch services are still running : " + notTerminated);
180        }
181
182        if (!asyncExec.shutdown(timeoutMillis)) {
183            throw new RuntimeException("Async executor is still running, timeout expired");
184        }
185        if (pipeDispatcher != null) {
186            pipeDispatcher.shutdown();
187        }
188    }
189
190    public void registerForAsyncWait(AsyncWaitHook callback) {
191        asyncWaitHooks.add(callback);
192    }
193
194    public void unregisterForAsyncWait(AsyncWaitHook callback) {
195        asyncWaitHooks.remove(callback);
196    }
197
198    @Override
199    public void waitForAsyncCompletion() {
200        waitForAsyncCompletion(Long.MAX_VALUE);
201    }
202
203    @Override
204    public void waitForAsyncCompletion(long timeout) {
205        Set<AsyncWaitHook> notCompleted = asyncWaitHooks.stream()
206                                                        .filter(hook -> !hook.waitForAsyncCompletion())
207                                                        .collect(Collectors.toSet());
208        if (!notCompleted.isEmpty()) {
209            throw new RuntimeException("Async tasks are still running : " + notCompleted);
210        }
211        try {
212            if (!asyncExec.waitForCompletion(timeout)) {
213                throw new RuntimeException("Async event listeners thread pool is not terminated");
214            }
215        } catch (InterruptedException e) {
216            Thread.currentThread().interrupt();
217            // TODO change signature
218            throw new RuntimeException(e);
219        }
220        if (pipeDispatcher != null) {
221            try {
222                pipeDispatcher.waitForCompletion(timeout);
223            } catch (InterruptedException e) {
224                Thread.currentThread().interrupt();
225                throw new RuntimeException(e);
226            }
227        }
228    }
229
230    @Override
231    public void addEventListener(EventListenerDescriptor listener) {
232        listenerDescriptors.add(listener);
233        log.debug("Registered event listener: " + listener.getName());
234    }
235
236    public void addEventPipe(EventPipeDescriptor pipeDescriptor) {
237        registeredPipes.addContribution(pipeDescriptor);
238        log.debug("Registered event pipe: " + pipeDescriptor.getName());
239    }
240
241    public void addEventDispatcher(EventDispatcherDescriptor dispatcherDescriptor) {
242        dispatchers.addContrib(dispatcherDescriptor);
243        log.debug("Registered event dispatcher: " + dispatcherDescriptor.getName());
244    }
245
246    @Override
247    public void removeEventListener(EventListenerDescriptor listener) {
248        listenerDescriptors.removeDescriptor(listener);
249        log.debug("Unregistered event listener: " + listener.getName());
250    }
251
252    public void removeEventPipe(EventPipeDescriptor pipeDescriptor) {
253        registeredPipes.removeContribution(pipeDescriptor);
254        log.debug("Unregistered event pipe: " + pipeDescriptor.getName());
255    }
256
257    public void removeEventDispatcher(EventDispatcherDescriptor dispatcherDescriptor) {
258        dispatchers.removeContrib(dispatcherDescriptor);
259        log.debug("Unregistered event dispatcher: " + dispatcherDescriptor.getName());
260    }
261
262    @Override
263    public void fireEvent(String name, EventContext context) {
264        fireEvent(new EventImpl(name, context));
265    }
266
267    @Override
268    public void fireEvent(Event event) {
269
270        String ename = event.getName();
271        EventStats stats = Framework.getService(EventStats.class);
272        Tracer tracer = Tracing.getTracer();
273        for (EventListenerDescriptor desc : listenerDescriptors.getEnabledInlineListenersDescriptors()) {
274            if (!desc.acceptEvent(ename)) {
275                continue;
276            }
277            try {
278                long t0 = System.currentTimeMillis();
279                desc.asEventListener().handleEvent(event);
280                long elapsed = System.currentTimeMillis() - t0;
281                traceAddAnnotation(event, tracer, elapsed, desc.getName());
282                if (stats != null) {
283                    stats.logSyncExec(desc, elapsed);
284                }
285                if (event.isCanceled()) {
286                    // break loop
287                    return;
288                }
289            } catch (ConcurrentUpdateException e) {
290                // never swallow ConcurrentUpdateException
291                throw e;
292            } catch (RuntimeException e) {
293                // get message
294                String message = "Exception during " + desc.getName() + " sync listener execution, ";
295                if (event.isBubbleException()) {
296                    message += "other listeners will be ignored";
297                } else if (event.isMarkedForRollBack()) {
298                    message += "transaction will be rolled back";
299                    if (event.getRollbackMessage() != null) {
300                        message += " (" + event.getRollbackMessage() + ")";
301                    }
302                } else {
303                    message += "continuing to run other listeners";
304                }
305                // log
306                tracer.getCurrentSpan().addAnnotation("EventService#fireEvent " + event.getName() + ": " + message);
307                if (e instanceof RecoverableClientException) {
308                    log.info(message + "\n" + e.getMessage());
309                    log.debug(message, e);
310                } else {
311                    log.error(message, e);
312                }
313                // rethrow or swallow
314                if (TransactionHelper.isTransactionMarkedRollback()) {
315                    throw e;
316                } else if (event.isBubbleException()) {
317                    throw e;
318                } else if (event.isMarkedForRollBack()) {
319                    // make sure the transaction is marked rollback-only,
320                    // even if some code later incorrectly swallows the rethrown exception
321                    TransactionHelper.setTransactionRollbackOnly();
322
323                    Exception ee;
324                    if (event.getRollbackException() != null) {
325                        ee = event.getRollbackException();
326                    } else {
327                        ee = e;
328                    }
329
330                    if (ee instanceof NuxeoException) {
331                        throw (NuxeoException) ee;
332                    } else {
333                        throw new NuxeoException(message, ee);
334                    }
335                } else {
336                    // swallow exception
337                }
338            }
339        }
340
341        if (!event.isInline()) { // record the event
342            // don't record the complete event, only a shallow copy
343            ShallowEvent shallowEvent = ShallowEvent.create(event);
344            if (event.isImmediate()) {
345                EventBundleImpl b = new EventBundleImpl();
346                b.push(shallowEvent);
347                tracer.getCurrentSpan().addAnnotation("EventService#fireEvent firing immediate: " + event.getName());
348                fireEventBundle(b);
349            } else {
350                recordEvent(shallowEvent);
351            }
352        }
353    }
354
355    protected void traceAddAnnotation(Event event, Tracer tracer, long elapsed, String listener) {
356        Map<String, AttributeValue> attributes = new HashMap<>();
357        attributes.put("event", AttributeValue.stringAttributeValue(event.getName()));
358        attributes.put("listener", AttributeValue.stringAttributeValue(listener));
359        attributes.put("duration_ms", AttributeValue.longAttributeValue(elapsed));
360        EventContext eventContext = event.getContext();
361        if (eventContext instanceof DocumentEventContext) {
362            DocumentEventContext docContext = (DocumentEventContext) eventContext;
363            if (docContext.getSourceDocument() != null) {
364                Path docPath = docContext.getSourceDocument().getPath();
365                if (docPath != null) {
366                    attributes.put("doc", AttributeValue.stringAttributeValue(docPath.toString()));
367                }
368                String id = docContext.getSourceDocument().getId();
369                if (id != null) {
370                    attributes.put("doc_id", AttributeValue.stringAttributeValue(id));
371                }
372            }
373        }
374        tracer.getCurrentSpan().addAnnotation("EventService#fireEvent Event fired", attributes);
375    }
376
377    @Override
378    public void fireEventBundle(EventBundle event) {
379        Span span = Tracing.getTracer().getCurrentSpan();
380        span.addAnnotation("EventService#fireEventBundle");
381        try {
382            List<EventListenerDescriptor> postCommitSync = listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors();
383            List<EventListenerDescriptor> postCommitAsync = listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors();
384
385            if (bulkModeEnabled) {
386                // run all listeners synchronously in one transaction
387                List<EventListenerDescriptor> listeners = new ArrayList<>();
388                if (!blockSyncPostCommitProcessing) {
389                    listeners = postCommitSync;
390                }
391                if (!blockAsyncProcessing) {
392                    listeners.addAll(postCommitAsync);
393                }
394                if (!listeners.isEmpty()) {
395                    postCommitExec.runBulk(listeners, event);
396                }
397                return;
398            }
399
400            // run sync listeners
401            if (blockSyncPostCommitProcessing) {
402                log.debug("Dropping PostCommit handler execution");
403            } else {
404                if (!postCommitSync.isEmpty()) {
405                    postCommitExec.run(postCommitSync, event);
406                }
407            }
408
409            if (blockAsyncProcessing) {
410                log.debug("Dopping bundle");
411                return;
412            }
413
414            // fire async listeners
415            if (pipeDispatcher == null) {
416                asyncExec.run(postCommitAsync, event);
417            } else {
418                // rather than sending to the WorkManager: send to the Pipe
419                pipeDispatcher.sendEventBundle(event);
420            }
421        } finally {
422            span.addAnnotation("EventService#fireEventBundle.done");
423        }
424    }
425
426    @Override
427    public void fireEventBundleSync(EventBundle event) {
428        for (EventListenerDescriptor desc : listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors()) {
429            desc.asPostCommitListener().handleEvent(event);
430        }
431        for (EventListenerDescriptor desc : listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors()) {
432            desc.asPostCommitListener().handleEvent(event);
433        }
434    }
435
436    @Override
437    public List<EventListener> getEventListeners() {
438        return listenerDescriptors.getInLineListeners();
439    }
440
441    @Override
442    public List<PostCommitEventListener> getPostCommitEventListeners() {
443        List<PostCommitEventListener> result = new ArrayList<>();
444
445        result.addAll(listenerDescriptors.getSyncPostCommitListeners());
446        result.addAll(listenerDescriptors.getAsyncPostCommitListeners());
447
448        return result;
449    }
450
451    public EventListenerList getEventListenerList() {
452        return listenerDescriptors;
453    }
454
455    @Override
456    public EventListenerDescriptor getEventListener(String name) {
457        return listenerDescriptors.getDescriptor(name);
458    }
459
460    // methods for monitoring
461
462    @Override
463    public EventListenerList getListenerList() {
464        return listenerDescriptors;
465    }
466
467    @Override
468    public void setListenerEnabledFlag(String listenerName, boolean enabled) {
469        if (!listenerDescriptors.hasListener(listenerName)) {
470            return;
471        }
472
473        for (EventListenerDescriptor desc : listenerDescriptors.getAsyncPostCommitListenersDescriptors()) {
474            if (desc.getName().equals(listenerName)) {
475                desc.setEnabled(enabled);
476                synchronized (this) {
477                    listenerDescriptors.recomputeEnabledListeners();
478                }
479                return;
480            }
481        }
482
483        for (EventListenerDescriptor desc : listenerDescriptors.getSyncPostCommitListenersDescriptors()) {
484            if (desc.getName().equals(listenerName)) {
485                desc.setEnabled(enabled);
486                synchronized (this) {
487                    listenerDescriptors.recomputeEnabledListeners();
488                }
489                return;
490            }
491        }
492
493        for (EventListenerDescriptor desc : listenerDescriptors.getInlineListenersDescriptors()) {
494            if (desc.getName().equals(listenerName)) {
495                desc.setEnabled(enabled);
496                synchronized (this) {
497                    listenerDescriptors.recomputeEnabledListeners();
498                }
499                return;
500            }
501        }
502    }
503
504    @Override
505    public int getActiveThreadsCount() {
506        return asyncExec.getActiveCount();
507    }
508
509    @Override
510    public int getEventsInQueueCount() {
511        return asyncExec.getUnfinishedCount();
512    }
513
514    @Override
515    public boolean isBlockAsyncHandlers() {
516        return blockAsyncProcessing;
517    }
518
519    @Override
520    public boolean isBlockSyncPostCommitHandlers() {
521        return blockSyncPostCommitProcessing;
522    }
523
524    @Override
525    public void setBlockAsyncHandlers(boolean blockAsyncHandlers) {
526        blockAsyncProcessing = blockAsyncHandlers;
527    }
528
529    @Override
530    public void setBlockSyncPostCommitHandlers(boolean blockSyncPostComitHandlers) {
531        blockSyncPostCommitProcessing = blockSyncPostComitHandlers;
532    }
533
534    @Override
535    public boolean isBulkModeEnabled() {
536        return bulkModeEnabled;
537    }
538
539    @Override
540    public void setBulkModeEnabled(boolean bulkModeEnabled) {
541        this.bulkModeEnabled = bulkModeEnabled;
542    }
543
544    protected void recordEvent(Event event) {
545        CompositeEventBundle b = threadBundles.get();
546        b.push(event);
547        if (TransactionHelper.isTransactionActive()) {
548            if (!b.registeredSynchronization) {
549                // register as synchronization
550                try {
551                    TransactionHelper.lookupTransactionManager().getTransaction().registerSynchronization(this);
552                } catch (NamingException | SystemException | RollbackException e) {
553                    throw new RuntimeException("Cannot register Synchronization", e);
554                }
555                b.registeredSynchronization = true;
556            }
557        } else if (event.isCommitEvent()) {
558            handleTxCommited();
559        }
560    }
561
562    @Override
563    public void beforeCompletion() {
564        Span span = Tracing.getTracer().getCurrentSpan();
565        span.addAnnotation("EventService#beforeCompletion");
566    }
567
568    @Override
569    public void afterCompletion(int status) {
570        Span span = Tracing.getTracer().getCurrentSpan();
571        if (status == Status.STATUS_COMMITTED) {
572            span.addAnnotation("EventService#afterCompletion committed");
573            handleTxCommited();
574        } else if (status == Status.STATUS_ROLLEDBACK) {
575            span.addAnnotation("EventService#afterCompletion ROLLBACK");
576            handleTxRollbacked();
577        } else {
578            log.error("Unexpected afterCompletion status: " + status);
579        }
580        span.addAnnotation("EventService#afterCompletion.done");
581    }
582
583    protected void handleTxRollbacked() {
584        threadBundles.remove();
585    }
586
587    protected void handleTxCommited() {
588        CompositeEventBundle b = threadBundles.get();
589        threadBundles.remove();
590
591        // notify post commit event listeners
592        for (EventBundle bundle : b.byRepository.values()) {
593            try {
594                fireEventBundle(bundle);
595            } catch (NuxeoException e) {
596                log.error("Error while processing " + bundle, e);
597            }
598        }
599    }
600
601    @Override
602    public List<DomainEventProducer> createDomainEventProducers() {
603        // TODO: optimize this by keeping an immutable list
604        List<DomainEventProducerDescriptor> descriptors = domainEventProducers.getDescriptors(REGISTRY_TARGET_NAME,
605                EventServiceComponent.DOMAIN_EVENT_PRODUCER_XP);
606        return descriptors.stream().map(DomainEventProducerDescriptor::newInstance).collect(Collectors.toList());
607    }
608
609    protected void initDomainEventStreams() {
610        List<DomainEventProducerDescriptor> descriptors = domainEventProducers.getDescriptors(REGISTRY_TARGET_NAME,
611                EventServiceComponent.DOMAIN_EVENT_PRODUCER_XP);
612        Settings settings = new Settings(1, 1);
613        List<String> streams = new ArrayList<>();
614        CodecService codecService = Framework.getService(CodecService.class);
615        descriptors.forEach(descriptor -> {
616            String streamName = descriptor.getStream().name;
617            streams.add(streamName);
618            settings.setPartitions(streamName, descriptor.getStream().partitions);
619            String codec = descriptor.getStream().codec;
620            if (codec != null) {
621                settings.setCodec(streamName, codecService.getCodec(codec, Record.class));
622            }
623            descriptor.getStream().filters.forEach(filter -> settings.addFilter(streamName, filter.getFilter()));
624        });
625        StreamService streamService = Framework.getService(StreamService.class);
626        if (streamService == null) {
627            log.warn("StreamService is not available, you might want to use RuntimeStreamFeature.class test Feature");
628        } else {
629            streamService.getStreamManager().register(streams, settings);
630        }
631    }
632
633}