001/*
002 * (C) Copyright 2006-2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Bogdan Stefanescu
018 *     Thierry Delprat
019 *     Florent Guillaume
020 *     Andrei Nechaev
021 */
022package org.nuxeo.ecm.core.event.impl;
023
024import org.apache.commons.logging.Log;
025import org.apache.commons.logging.LogFactory;
026import org.nuxeo.common.logging.SequenceTracer;
027import org.nuxeo.ecm.core.api.NuxeoException;
028import org.nuxeo.ecm.core.api.RecoverableClientException;
029import org.nuxeo.ecm.core.event.Event;
030import org.nuxeo.ecm.core.event.EventBundle;
031import org.nuxeo.ecm.core.event.EventContext;
032import org.nuxeo.ecm.core.event.EventListener;
033import org.nuxeo.ecm.core.event.EventService;
034import org.nuxeo.ecm.core.event.EventServiceAdmin;
035import org.nuxeo.ecm.core.event.EventStats;
036import org.nuxeo.ecm.core.event.PostCommitEventListener;
037import org.nuxeo.ecm.core.event.ReconnectedEventBundle;
038import org.nuxeo.ecm.core.event.jms.AsyncProcessorConfig;
039import org.nuxeo.ecm.core.event.pipe.EventPipeDescriptor;
040import org.nuxeo.ecm.core.event.pipe.EventPipeRegistry;
041import org.nuxeo.ecm.core.event.pipe.dispatch.EventBundleDispatcher;
042import org.nuxeo.ecm.core.event.pipe.dispatch.EventDispatcherDescriptor;
043import org.nuxeo.ecm.core.event.pipe.dispatch.EventDispatcherRegistry;
044import org.nuxeo.runtime.api.Framework;
045import org.nuxeo.runtime.transaction.TransactionHelper;
046
047import javax.naming.NamingException;
048import javax.transaction.RollbackException;
049import javax.transaction.Status;
050import javax.transaction.Synchronization;
051import javax.transaction.SystemException;
052import java.rmi.dgc.VMID;
053import java.util.ArrayList;
054import java.util.HashMap;
055import java.util.List;
056import java.util.Map;
057import java.util.Set;
058import java.util.concurrent.CopyOnWriteArrayList;
059import java.util.stream.Collectors;
060
061/**
062 * Implementation of the event service.
063 */
064public class EventServiceImpl implements EventService, EventServiceAdmin, Synchronization {
065
066    public static final VMID VMID = new VMID();
067
068    private static final Log log = LogFactory.getLog(EventServiceImpl.class);
069
070    protected static final ThreadLocal<CompositeEventBundle> threadBundles = new ThreadLocal<CompositeEventBundle>() {
071        @Override
072        protected CompositeEventBundle initialValue() {
073            return new CompositeEventBundle();
074        }
075    };
076
077    private static class CompositeEventBundle {
078
079        boolean registeredSynchronization;
080
081        final Map<String, EventBundle> byRepository = new HashMap<String, EventBundle>();
082
083        void push(Event event) {
084            String repositoryName = event.getContext().getRepositoryName();
085            if (!byRepository.containsKey(repositoryName)) {
086                byRepository.put(repositoryName, new EventBundleImpl());
087            }
088            byRepository.get(repositoryName).push(event);
089        }
090
091    }
092
093    protected final EventListenerList listenerDescriptors;
094
095    protected PostCommitEventExecutor postCommitExec;
096
097    protected volatile AsyncEventExecutor asyncExec;
098
099    protected final List<AsyncWaitHook> asyncWaitHooks = new CopyOnWriteArrayList<AsyncWaitHook>();
100
101    protected boolean blockAsyncProcessing = false;
102
103    protected boolean blockSyncPostCommitProcessing = false;
104
105    protected boolean bulkModeEnabled = false;
106
107    protected EventPipeRegistry registeredPipes = new EventPipeRegistry();
108
109    protected EventDispatcherRegistry dispatchers = new EventDispatcherRegistry();
110
111    protected EventBundleDispatcher pipeDispatcher;
112
113    public EventServiceImpl() {
114        listenerDescriptors = new EventListenerList();
115        postCommitExec = new PostCommitEventExecutor();
116        asyncExec = new AsyncEventExecutor();
117    }
118
119    public void init() {
120        asyncExec.init();
121
122        EventDispatcherDescriptor dispatcherDescriptor = dispatchers.getDispatcherDescriptor();
123        if (dispatcherDescriptor != null) {
124            List<EventPipeDescriptor> pipes = registeredPipes.getPipes();
125            if (!pipes.isEmpty()) {
126                pipeDispatcher = dispatcherDescriptor.getInstance();
127                pipeDispatcher.init(pipes, dispatcherDescriptor.getParameters());
128            }
129        }
130    }
131
132    public EventBundleDispatcher getEventBundleDispatcher() {
133        return pipeDispatcher;
134    }
135
136    public void shutdown(long timeoutMillis) throws InterruptedException {
137        postCommitExec.shutdown(timeoutMillis);
138        Set<AsyncWaitHook> notTerminated = asyncWaitHooks.stream()
139                .filter(hook -> hook.shutdown() == false)
140                .collect(Collectors.toSet());
141        if (!notTerminated.isEmpty()) {
142            throw new RuntimeException("Asynch services are still running : " + notTerminated);
143        }
144
145        if (asyncExec.shutdown(timeoutMillis) == false) {
146            throw new RuntimeException("Async executor is still running, timeout expired");
147        }
148        if (pipeDispatcher != null) {
149            pipeDispatcher.shutdown();
150        }
151    }
152
153    public void registerForAsyncWait(AsyncWaitHook callback) {
154        asyncWaitHooks.add(callback);
155    }
156
157    public void unregisterForAsyncWait(AsyncWaitHook callback) {
158        asyncWaitHooks.remove(callback);
159    }
160
161    /**
162     * @deprecated use {@link #waitForAsyncCompletion()} instead.
163     */
164    @Deprecated
165    public int getActiveAsyncTaskCount() {
166        return asyncExec.getUnfinishedCount();
167    }
168
169    @Override
170    public void waitForAsyncCompletion() {
171        waitForAsyncCompletion(Long.MAX_VALUE);
172    }
173
174    @Override
175    public void waitForAsyncCompletion(long timeout) {
176        Set<AsyncWaitHook> notCompleted = asyncWaitHooks.stream()
177                .filter(hook -> !hook.waitForAsyncCompletion())
178                .collect(Collectors.toSet());
179        if (!notCompleted.isEmpty()) {
180            throw new RuntimeException("Async tasks are still running : " + notCompleted);
181        }
182        try {
183            if (!asyncExec.waitForCompletion(timeout)) {
184                throw new RuntimeException("Async event listeners thread pool is not terminated");
185            }
186        } catch (InterruptedException e) {
187            Thread.currentThread().interrupt();
188            // TODO change signature
189            throw new RuntimeException(e);
190        }
191        if (pipeDispatcher != null) {
192            try {
193                pipeDispatcher.waitForCompletion(timeout);
194            } catch (InterruptedException e) {
195                Thread.currentThread().interrupt();
196                throw new RuntimeException(e);
197            }
198        }
199    }
200
201    @Override
202    public void addEventListener(EventListenerDescriptor listener) {
203        listenerDescriptors.add(listener);
204        log.debug("Registered event listener: " + listener.getName());
205    }
206
207    public void addEventPipe(EventPipeDescriptor pipeDescriptor) {
208        registeredPipes.addContribution(pipeDescriptor);
209        log.debug("Registered event pipe: " + pipeDescriptor.getName());
210    }
211
212    public void addEventDispatcher(EventDispatcherDescriptor dispatcherDescriptor) {
213        dispatchers.addContrib(dispatcherDescriptor);
214        log.debug("Registered event dispatcher: " + dispatcherDescriptor.getName());
215    }
216
217    @Override
218    public void removeEventListener(EventListenerDescriptor listener) {
219        listenerDescriptors.removeDescriptor(listener);
220        log.debug("Unregistered event listener: " + listener.getName());
221    }
222
223    public void removeEventPipe(EventPipeDescriptor pipeDescriptor) {
224        registeredPipes.removeContribution(pipeDescriptor);
225        log.debug("Unregistered event pipe: " + pipeDescriptor.getName());
226    }
227
228    public void removeEventDispatcher(EventDispatcherDescriptor dispatcherDescriptor) {
229        dispatchers.removeContrib(dispatcherDescriptor);
230        log.debug("Unregistered event dispatcher: " + dispatcherDescriptor.getName());
231    }
232
233    @Override
234    public void fireEvent(String name, EventContext context) {
235        fireEvent(new EventImpl(name, context));
236    }
237
238    @Override
239    public void fireEvent(Event event) {
240
241        String ename = event.getName();
242        EventStats stats = Framework.getService(EventStats.class);
243        for (EventListenerDescriptor desc : listenerDescriptors.getEnabledInlineListenersDescriptors()) {
244            if (!desc.acceptEvent(ename)) {
245                continue;
246            }
247            try {
248                long t0 = System.currentTimeMillis();
249                SequenceTracer.start("Fire sync event " + event.getName());
250                desc.asEventListener().handleEvent(event);
251                long elapsed = System.currentTimeMillis() - t0;
252                SequenceTracer.stop("done in " + elapsed + " ms");
253                if (stats != null) {
254                    stats.logSyncExec(desc, elapsed);
255                }
256                if (event.isCanceled()) {
257                    // break loop
258                    return;
259                }
260            } catch (RuntimeException e) {
261                // get message
262                SequenceTracer.destroy("failure");
263                String message = "Exception during " + desc.getName() + " sync listener execution, ";
264                if (event.isBubbleException()) {
265                    message += "other listeners will be ignored";
266                } else if (event.isMarkedForRollBack()) {
267                    message += "transaction will be rolled back";
268                    if (event.getRollbackMessage() != null) {
269                        message += " (" + event.getRollbackMessage() + ")";
270                    }
271                } else {
272                    message += "continuing to run other listeners";
273                }
274                // log
275                if (e instanceof RecoverableClientException) {
276                    log.info(message + "\n" + e.getMessage());
277                    log.debug(message, e);
278                } else {
279                    log.error(message, e);
280                }
281                // rethrow or swallow
282                if (event.isBubbleException()) {
283                    throw e;
284                } else if (event.isMarkedForRollBack()) {
285                    Exception ee;
286                    if (event.getRollbackException() != null) {
287                        ee = event.getRollbackException();
288                    } else {
289                        ee = e;
290                    }
291                    // when marked for rollback, throw a generic
292                    // RuntimeException to make sure nobody catches it
293                    throw new RuntimeException(message, ee);
294                } else {
295                    // swallow exception
296                }
297            }
298        }
299
300        if (!event.isInline()) { // record the event
301            // don't record the complete event, only a shallow copy
302            ShallowEvent shallowEvent = ShallowEvent.create(event);
303            if (event.isImmediate()) {
304                EventBundleImpl b = new EventBundleImpl();
305                b.push(shallowEvent);
306                fireEventBundle(b);
307            } else {
308                recordEvent(shallowEvent);
309            }
310        }
311    }
312
313    @Override
314    public void fireEventBundle(EventBundle event) {
315        boolean comesFromJMS = false;
316
317        if (event instanceof ReconnectedEventBundle) {
318            if (((ReconnectedEventBundle) event).comesFromJMS()) {
319                comesFromJMS = true;
320            }
321        }
322
323        List<EventListenerDescriptor> postCommitSync = listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors();
324        List<EventListenerDescriptor> postCommitAsync = listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors();
325
326        if (bulkModeEnabled) {
327            // run all listeners synchronously in one transaction
328            List<EventListenerDescriptor> listeners = new ArrayList<EventListenerDescriptor>();
329            if (!blockSyncPostCommitProcessing) {
330                listeners = postCommitSync;
331            }
332            if (!blockAsyncProcessing) {
333                listeners.addAll(postCommitAsync);
334            }
335            if (!listeners.isEmpty()) {
336                postCommitExec.runBulk(listeners, event);
337            }
338            return;
339        }
340
341        // run sync listeners
342        if (blockSyncPostCommitProcessing) {
343            log.debug("Dropping PostCommit handler execution");
344        } else if (comesFromJMS) {
345            // when called from JMS we must skip sync listeners
346            // - postComit listeners should be on the core
347            // - there is no transaction started by JMS listener
348            log.debug("Deactivating sync post-commit listener since we are called from JMS");
349        } else {
350            if (!postCommitSync.isEmpty()) {
351                postCommitExec.run(postCommitSync, event);
352            }
353        }
354
355        if (blockAsyncProcessing) {
356            log.debug("Dopping bundle");
357            return;
358        }
359
360        // fire async listeners
361        if (AsyncProcessorConfig.forceJMSUsage() && !comesFromJMS) {
362            log.debug("Skipping async exec, this will be triggered via JMS");
363        } else {
364            if (pipeDispatcher == null) {
365                asyncExec.run(postCommitAsync, event);
366            } else {
367                // rather than sending to the WorkManager: send to the Pipe
368                pipeDispatcher.sendEventBundle(event);
369            }
370        }
371    }
372
373    @Override
374    public void fireEventBundleSync(EventBundle event) {
375        for (EventListenerDescriptor desc : listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors()) {
376            desc.asPostCommitListener().handleEvent(event);
377        }
378        for (EventListenerDescriptor desc : listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors()) {
379            desc.asPostCommitListener().handleEvent(event);
380        }
381    }
382
383    @Override
384    public List<EventListener> getEventListeners() {
385        return listenerDescriptors.getInLineListeners();
386    }
387
388    @Override
389    public List<PostCommitEventListener> getPostCommitEventListeners() {
390        List<PostCommitEventListener> result = new ArrayList<PostCommitEventListener>();
391
392        result.addAll(listenerDescriptors.getSyncPostCommitListeners());
393        result.addAll(listenerDescriptors.getAsyncPostCommitListeners());
394
395        return result;
396    }
397
398    public EventListenerList getEventListenerList() {
399        return listenerDescriptors;
400    }
401
402    @Override
403    public EventListenerDescriptor getEventListener(String name) {
404        return listenerDescriptors.getDescriptor(name);
405    }
406
407    // methods for monitoring
408
409    @Override
410    public EventListenerList getListenerList() {
411        return listenerDescriptors;
412    }
413
414    @Override
415    public void setListenerEnabledFlag(String listenerName, boolean enabled) {
416        if (!listenerDescriptors.hasListener(listenerName)) {
417            return;
418        }
419
420        for (EventListenerDescriptor desc : listenerDescriptors.getAsyncPostCommitListenersDescriptors()) {
421            if (desc.getName().equals(listenerName)) {
422                desc.setEnabled(enabled);
423                synchronized (this) {
424                    listenerDescriptors.recomputeEnabledListeners();
425                }
426                return;
427            }
428        }
429
430        for (EventListenerDescriptor desc : listenerDescriptors.getSyncPostCommitListenersDescriptors()) {
431            if (desc.getName().equals(listenerName)) {
432                desc.setEnabled(enabled);
433                synchronized (this) {
434                    listenerDescriptors.recomputeEnabledListeners();
435                }
436                return;
437            }
438        }
439
440        for (EventListenerDescriptor desc : listenerDescriptors.getInlineListenersDescriptors()) {
441            if (desc.getName().equals(listenerName)) {
442                desc.setEnabled(enabled);
443                synchronized (this) {
444                    listenerDescriptors.recomputeEnabledListeners();
445                }
446                return;
447            }
448        }
449    }
450
451    @Override
452    public int getActiveThreadsCount() {
453        return asyncExec.getActiveCount();
454    }
455
456    @Override
457    public int getEventsInQueueCount() {
458        return asyncExec.getUnfinishedCount();
459    }
460
461    @Override
462    public boolean isBlockAsyncHandlers() {
463        return blockAsyncProcessing;
464    }
465
466    @Override
467    public boolean isBlockSyncPostCommitHandlers() {
468        return blockSyncPostCommitProcessing;
469    }
470
471    @Override
472    public void setBlockAsyncHandlers(boolean blockAsyncHandlers) {
473        blockAsyncProcessing = blockAsyncHandlers;
474    }
475
476    @Override
477    public void setBlockSyncPostCommitHandlers(boolean blockSyncPostComitHandlers) {
478        blockSyncPostCommitProcessing = blockSyncPostComitHandlers;
479    }
480
481    @Override
482    public boolean isBulkModeEnabled() {
483        return bulkModeEnabled;
484    }
485
486    @Override
487    public void setBulkModeEnabled(boolean bulkModeEnabled) {
488        this.bulkModeEnabled = bulkModeEnabled;
489    }
490
491    protected void recordEvent(Event event) {
492        CompositeEventBundle b = threadBundles.get();
493        b.push(event);
494        if (TransactionHelper.isTransactionActive()) {
495            if (!b.registeredSynchronization) {
496                // register as synchronization
497                try {
498                    TransactionHelper.lookupTransactionManager().getTransaction().registerSynchronization(this);
499                } catch (NamingException | SystemException | RollbackException e) {
500                    throw new RuntimeException("Cannot register Synchronization", e);
501                }
502                b.registeredSynchronization = true;
503            }
504        } else if (event.isCommitEvent()) {
505            handleTxCommited();
506        }
507    }
508
509    @Override
510    public void beforeCompletion() {
511    }
512
513    @Override
514    public void afterCompletion(int status) {
515        if (status == Status.STATUS_COMMITTED) {
516            handleTxCommited();
517        } else if (status == Status.STATUS_ROLLEDBACK) {
518            handleTxRollbacked();
519        } else {
520            log.error("Unexpected afterCompletion status: " + status);
521        }
522    }
523
524    protected void handleTxRollbacked() {
525        threadBundles.remove();
526    }
527
528    protected void handleTxCommited() {
529        CompositeEventBundle b = threadBundles.get();
530        threadBundles.remove();
531
532        // notify post commit event listeners
533        for (EventBundle bundle : b.byRepository.values()) {
534            try {
535                fireEventBundle(bundle);
536            } catch (NuxeoException e) {
537                log.error("Error while processing " + bundle, e);
538            }
539        }
540    }
541
542}