/*
 * (C) Copyright 2006-2016 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Bogdan Stefanescu
 *     Thierry Delprat
 *     Florent Guillaume
 *     Andrei Nechaev
 */
022package org.nuxeo.ecm.core.event.impl;
023
024import java.rmi.dgc.VMID;
025import java.util.ArrayList;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.CopyOnWriteArrayList;
031import java.util.stream.Collectors;
032
033import javax.naming.NamingException;
034import javax.transaction.RollbackException;
035import javax.transaction.Status;
036import javax.transaction.Synchronization;
037import javax.transaction.SystemException;
038
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041import org.nuxeo.common.logging.SequenceTracer;
042import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
043import org.nuxeo.ecm.core.api.NuxeoException;
044import org.nuxeo.ecm.core.api.RecoverableClientException;
045import org.nuxeo.ecm.core.event.Event;
046import org.nuxeo.ecm.core.event.EventBundle;
047import org.nuxeo.ecm.core.event.EventContext;
048import org.nuxeo.ecm.core.event.EventListener;
049import org.nuxeo.ecm.core.event.EventService;
050import org.nuxeo.ecm.core.event.EventServiceAdmin;
051import org.nuxeo.ecm.core.event.EventStats;
052import org.nuxeo.ecm.core.event.PostCommitEventListener;
053import org.nuxeo.ecm.core.event.ReconnectedEventBundle;
054import org.nuxeo.ecm.core.event.jms.AsyncProcessorConfig;
055import org.nuxeo.ecm.core.event.pipe.EventPipeDescriptor;
056import org.nuxeo.ecm.core.event.pipe.EventPipeRegistry;
057import org.nuxeo.ecm.core.event.pipe.dispatch.EventBundleDispatcher;
058import org.nuxeo.ecm.core.event.pipe.dispatch.EventDispatcherDescriptor;
059import org.nuxeo.ecm.core.event.pipe.dispatch.EventDispatcherRegistry;
060import org.nuxeo.runtime.api.Framework;
061import org.nuxeo.runtime.transaction.TransactionHelper;
062
063/**
064 * Implementation of the event service.
065 */
066public class EventServiceImpl implements EventService, EventServiceAdmin, Synchronization {
067
068    public static final VMID VMID = new VMID();
069
070    private static final Log log = LogFactory.getLog(EventServiceImpl.class);
071
072    protected static final ThreadLocal<CompositeEventBundle> threadBundles = new ThreadLocal<CompositeEventBundle>() {
073        @Override
074        protected CompositeEventBundle initialValue() {
075            return new CompositeEventBundle();
076        }
077    };
078
079    private static class CompositeEventBundle {
080
081        boolean registeredSynchronization;
082
083        final Map<String, EventBundle> byRepository = new HashMap<>();
084
085        void push(Event event) {
086            String repositoryName = event.getContext().getRepositoryName();
087            if (!byRepository.containsKey(repositoryName)) {
088                byRepository.put(repositoryName, new EventBundleImpl());
089            }
090            byRepository.get(repositoryName).push(event);
091        }
092
093    }
094
095    protected final EventListenerList listenerDescriptors;
096
097    protected PostCommitEventExecutor postCommitExec;
098
099    protected volatile AsyncEventExecutor asyncExec;
100
101    protected final List<AsyncWaitHook> asyncWaitHooks = new CopyOnWriteArrayList<>();
102
103    protected boolean blockAsyncProcessing = false;
104
105    protected boolean blockSyncPostCommitProcessing = false;
106
107    protected boolean bulkModeEnabled = false;
108
109    protected EventPipeRegistry registeredPipes = new EventPipeRegistry();
110
111    protected EventDispatcherRegistry dispatchers = new EventDispatcherRegistry();
112
113    protected EventBundleDispatcher pipeDispatcher;
114
115    public EventServiceImpl() {
116        listenerDescriptors = new EventListenerList();
117        postCommitExec = new PostCommitEventExecutor();
118        asyncExec = new AsyncEventExecutor();
119    }
120
121    public void init() {
122        asyncExec.init();
123
124        EventDispatcherDescriptor dispatcherDescriptor = dispatchers.getDispatcherDescriptor();
125        if (dispatcherDescriptor != null) {
126            List<EventPipeDescriptor> pipes = registeredPipes.getPipes();
127            if (!pipes.isEmpty()) {
128                pipeDispatcher = dispatcherDescriptor.getInstance();
129                pipeDispatcher.init(pipes, dispatcherDescriptor.getParameters());
130            }
131        }
132    }
133
134    public EventBundleDispatcher getEventBundleDispatcher() {
135        return pipeDispatcher;
136    }
137
138    public void shutdown(long timeoutMillis) throws InterruptedException {
139        postCommitExec.shutdown(timeoutMillis);
140        Set<AsyncWaitHook> notTerminated = asyncWaitHooks.stream().filter(hook -> !hook.shutdown()).collect(
141                Collectors.toSet());
142        if (!notTerminated.isEmpty()) {
143            throw new RuntimeException("Asynch services are still running : " + notTerminated);
144        }
145
146        if (!asyncExec.shutdown(timeoutMillis)) {
147            throw new RuntimeException("Async executor is still running, timeout expired");
148        }
149        if (pipeDispatcher != null) {
150            pipeDispatcher.shutdown();
151        }
152    }
153
154    public void registerForAsyncWait(AsyncWaitHook callback) {
155        asyncWaitHooks.add(callback);
156    }
157
158    public void unregisterForAsyncWait(AsyncWaitHook callback) {
159        asyncWaitHooks.remove(callback);
160    }
161
162    @Override
163    public void waitForAsyncCompletion() {
164        waitForAsyncCompletion(Long.MAX_VALUE);
165    }
166
167    @Override
168    public void waitForAsyncCompletion(long timeout) {
169        Set<AsyncWaitHook> notCompleted = asyncWaitHooks.stream()
170                                                        .filter(hook -> !hook.waitForAsyncCompletion())
171                                                        .collect(Collectors.toSet());
172        if (!notCompleted.isEmpty()) {
173            throw new RuntimeException("Async tasks are still running : " + notCompleted);
174        }
175        try {
176            if (!asyncExec.waitForCompletion(timeout)) {
177                throw new RuntimeException("Async event listeners thread pool is not terminated");
178            }
179        } catch (InterruptedException e) {
180            Thread.currentThread().interrupt();
181            // TODO change signature
182            throw new RuntimeException(e);
183        }
184        if (pipeDispatcher != null) {
185            try {
186                pipeDispatcher.waitForCompletion(timeout);
187            } catch (InterruptedException e) {
188                Thread.currentThread().interrupt();
189                throw new RuntimeException(e);
190            }
191        }
192    }
193
194    @Override
195    public void addEventListener(EventListenerDescriptor listener) {
196        listenerDescriptors.add(listener);
197        log.debug("Registered event listener: " + listener.getName());
198    }
199
200    public void addEventPipe(EventPipeDescriptor pipeDescriptor) {
201        registeredPipes.addContribution(pipeDescriptor);
202        log.debug("Registered event pipe: " + pipeDescriptor.getName());
203    }
204
205    public void addEventDispatcher(EventDispatcherDescriptor dispatcherDescriptor) {
206        dispatchers.addContrib(dispatcherDescriptor);
207        log.debug("Registered event dispatcher: " + dispatcherDescriptor.getName());
208    }
209
210    @Override
211    public void removeEventListener(EventListenerDescriptor listener) {
212        listenerDescriptors.removeDescriptor(listener);
213        log.debug("Unregistered event listener: " + listener.getName());
214    }
215
216    public void removeEventPipe(EventPipeDescriptor pipeDescriptor) {
217        registeredPipes.removeContribution(pipeDescriptor);
218        log.debug("Unregistered event pipe: " + pipeDescriptor.getName());
219    }
220
221    public void removeEventDispatcher(EventDispatcherDescriptor dispatcherDescriptor) {
222        dispatchers.removeContrib(dispatcherDescriptor);
223        log.debug("Unregistered event dispatcher: " + dispatcherDescriptor.getName());
224    }
225
226    @Override
227    public void fireEvent(String name, EventContext context) {
228        fireEvent(new EventImpl(name, context));
229    }
230
231    @Override
232    public void fireEvent(Event event) {
233
234        String ename = event.getName();
235        EventStats stats = Framework.getService(EventStats.class);
236        for (EventListenerDescriptor desc : listenerDescriptors.getEnabledInlineListenersDescriptors()) {
237            if (!desc.acceptEvent(ename)) {
238                continue;
239            }
240            try {
241                long t0 = System.currentTimeMillis();
242                SequenceTracer.start("Fire sync event " + event.getName());
243                desc.asEventListener().handleEvent(event);
244                long elapsed = System.currentTimeMillis() - t0;
245                SequenceTracer.stop("done in " + elapsed + " ms");
246                if (stats != null) {
247                    stats.logSyncExec(desc, elapsed);
248                }
249                if (event.isCanceled()) {
250                    // break loop
251                    return;
252                }
253            } catch (ConcurrentUpdateException e) {
254                // never swallow ConcurrentUpdateException
255                throw e;
256            } catch (RuntimeException e) {
257                // get message
258                SequenceTracer.destroy("failure");
259                String message = "Exception during " + desc.getName() + " sync listener execution, ";
260                if (event.isBubbleException()) {
261                    message += "other listeners will be ignored";
262                } else if (event.isMarkedForRollBack()) {
263                    message += "transaction will be rolled back";
264                    if (event.getRollbackMessage() != null) {
265                        message += " (" + event.getRollbackMessage() + ")";
266                    }
267                } else {
268                    message += "continuing to run other listeners";
269                }
270                // log
271                if (e instanceof RecoverableClientException) {
272                    log.info(message + "\n" + e.getMessage());
273                    log.debug(message, e);
274                } else {
275                    log.error(message, e);
276                }
277                // rethrow or swallow
278                if (TransactionHelper.isTransactionMarkedRollback()) {
279                    throw e;
280                } else if (event.isBubbleException()) {
281                    throw e;
282                } else if (event.isMarkedForRollBack()) {
283                    Exception ee;
284                    if (event.getRollbackException() != null) {
285                        ee = event.getRollbackException();
286                    } else {
287                        ee = e;
288                    }
289                    // when marked for rollback, throw a generic
290                    // RuntimeException to make sure nobody catches it
291                    throw new RuntimeException(message, ee);
292                } else {
293                    // swallow exception
294                }
295            }
296        }
297
298        if (!event.isInline()) { // record the event
299            // don't record the complete event, only a shallow copy
300            ShallowEvent shallowEvent = ShallowEvent.create(event);
301            if (event.isImmediate()) {
302                EventBundleImpl b = new EventBundleImpl();
303                b.push(shallowEvent);
304                fireEventBundle(b);
305            } else {
306                recordEvent(shallowEvent);
307            }
308        }
309    }
310
311    @Override
312    public void fireEventBundle(EventBundle event) {
313        boolean comesFromJMS = false;
314
315        if (event instanceof ReconnectedEventBundle) {
316            if (((ReconnectedEventBundle) event).comesFromJMS()) {
317                comesFromJMS = true;
318            }
319        }
320
321        List<EventListenerDescriptor> postCommitSync = listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors();
322        List<EventListenerDescriptor> postCommitAsync = listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors();
323
324        if (bulkModeEnabled) {
325            // run all listeners synchronously in one transaction
326            List<EventListenerDescriptor> listeners = new ArrayList<>();
327            if (!blockSyncPostCommitProcessing) {
328                listeners = postCommitSync;
329            }
330            if (!blockAsyncProcessing) {
331                listeners.addAll(postCommitAsync);
332            }
333            if (!listeners.isEmpty()) {
334                postCommitExec.runBulk(listeners, event);
335            }
336            return;
337        }
338
339        // run sync listeners
340        if (blockSyncPostCommitProcessing) {
341            log.debug("Dropping PostCommit handler execution");
342        } else if (comesFromJMS) {
343            // when called from JMS we must skip sync listeners
344            // - postComit listeners should be on the core
345            // - there is no transaction started by JMS listener
346            log.debug("Deactivating sync post-commit listener since we are called from JMS");
347        } else {
348            if (!postCommitSync.isEmpty()) {
349                postCommitExec.run(postCommitSync, event);
350            }
351        }
352
353        if (blockAsyncProcessing) {
354            log.debug("Dopping bundle");
355            return;
356        }
357
358        // fire async listeners
359        if (AsyncProcessorConfig.forceJMSUsage() && !comesFromJMS) {
360            log.debug("Skipping async exec, this will be triggered via JMS");
361        } else {
362            if (pipeDispatcher == null) {
363                asyncExec.run(postCommitAsync, event);
364            } else {
365                // rather than sending to the WorkManager: send to the Pipe
366                pipeDispatcher.sendEventBundle(event);
367            }
368        }
369    }
370
371    @Override
372    public void fireEventBundleSync(EventBundle event) {
373        for (EventListenerDescriptor desc : listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors()) {
374            desc.asPostCommitListener().handleEvent(event);
375        }
376        for (EventListenerDescriptor desc : listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors()) {
377            desc.asPostCommitListener().handleEvent(event);
378        }
379    }
380
381    @Override
382    public List<EventListener> getEventListeners() {
383        return listenerDescriptors.getInLineListeners();
384    }
385
386    @Override
387    public List<PostCommitEventListener> getPostCommitEventListeners() {
388        List<PostCommitEventListener> result = new ArrayList<>();
389
390        result.addAll(listenerDescriptors.getSyncPostCommitListeners());
391        result.addAll(listenerDescriptors.getAsyncPostCommitListeners());
392
393        return result;
394    }
395
396    public EventListenerList getEventListenerList() {
397        return listenerDescriptors;
398    }
399
400    @Override
401    public EventListenerDescriptor getEventListener(String name) {
402        return listenerDescriptors.getDescriptor(name);
403    }
404
405    // methods for monitoring
406
407    @Override
408    public EventListenerList getListenerList() {
409        return listenerDescriptors;
410    }
411
412    @Override
413    public void setListenerEnabledFlag(String listenerName, boolean enabled) {
414        if (!listenerDescriptors.hasListener(listenerName)) {
415            return;
416        }
417
418        for (EventListenerDescriptor desc : listenerDescriptors.getAsyncPostCommitListenersDescriptors()) {
419            if (desc.getName().equals(listenerName)) {
420                desc.setEnabled(enabled);
421                synchronized (this) {
422                    listenerDescriptors.recomputeEnabledListeners();
423                }
424                return;
425            }
426        }
427
428        for (EventListenerDescriptor desc : listenerDescriptors.getSyncPostCommitListenersDescriptors()) {
429            if (desc.getName().equals(listenerName)) {
430                desc.setEnabled(enabled);
431                synchronized (this) {
432                    listenerDescriptors.recomputeEnabledListeners();
433                }
434                return;
435            }
436        }
437
438        for (EventListenerDescriptor desc : listenerDescriptors.getInlineListenersDescriptors()) {
439            if (desc.getName().equals(listenerName)) {
440                desc.setEnabled(enabled);
441                synchronized (this) {
442                    listenerDescriptors.recomputeEnabledListeners();
443                }
444                return;
445            }
446        }
447    }
448
449    @Override
450    public int getActiveThreadsCount() {
451        return asyncExec.getActiveCount();
452    }
453
454    @Override
455    public int getEventsInQueueCount() {
456        return asyncExec.getUnfinishedCount();
457    }
458
459    @Override
460    public boolean isBlockAsyncHandlers() {
461        return blockAsyncProcessing;
462    }
463
464    @Override
465    public boolean isBlockSyncPostCommitHandlers() {
466        return blockSyncPostCommitProcessing;
467    }
468
469    @Override
470    public void setBlockAsyncHandlers(boolean blockAsyncHandlers) {
471        blockAsyncProcessing = blockAsyncHandlers;
472    }
473
474    @Override
475    public void setBlockSyncPostCommitHandlers(boolean blockSyncPostComitHandlers) {
476        blockSyncPostCommitProcessing = blockSyncPostComitHandlers;
477    }
478
479    @Override
480    public boolean isBulkModeEnabled() {
481        return bulkModeEnabled;
482    }
483
484    @Override
485    public void setBulkModeEnabled(boolean bulkModeEnabled) {
486        this.bulkModeEnabled = bulkModeEnabled;
487    }
488
489    protected void recordEvent(Event event) {
490        CompositeEventBundle b = threadBundles.get();
491        b.push(event);
492        if (TransactionHelper.isTransactionActive()) {
493            if (!b.registeredSynchronization) {
494                // register as synchronization
495                try {
496                    TransactionHelper.lookupTransactionManager().getTransaction().registerSynchronization(this);
497                } catch (NamingException | SystemException | RollbackException e) {
498                    throw new RuntimeException("Cannot register Synchronization", e);
499                }
500                b.registeredSynchronization = true;
501            }
502        } else if (event.isCommitEvent()) {
503            handleTxCommited();
504        }
505    }
506
507    @Override
508    public void beforeCompletion() {
509    }
510
511    @Override
512    public void afterCompletion(int status) {
513        if (status == Status.STATUS_COMMITTED) {
514            handleTxCommited();
515        } else if (status == Status.STATUS_ROLLEDBACK) {
516            handleTxRollbacked();
517        } else {
518            log.error("Unexpected afterCompletion status: " + status);
519        }
520    }
521
522    protected void handleTxRollbacked() {
523        threadBundles.remove();
524    }
525
526    protected void handleTxCommited() {
527        CompositeEventBundle b = threadBundles.get();
528        threadBundles.remove();
529
530        // notify post commit event listeners
531        for (EventBundle bundle : b.byRepository.values()) {
532            try {
533                fireEventBundle(bundle);
534            } catch (NuxeoException e) {
535                log.error("Error while processing " + bundle, e);
536            }
537        }
538    }
539
540}