001/*
002 * Copyright (c) 2006-2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the Eclipse Public License v1.0
006 * which accompanies this distribution, and is available at
007 * http://www.eclipse.org/legal/epl-v10.html
008 *
009 * Contributors:
010 *     Bogdan Stefanescu
011 *     Thierry Delprat
012 *     Florent Guillaume
013 */
014package org.nuxeo.ecm.core.event.impl;
015
016import java.rmi.dgc.VMID;
017import java.util.ArrayList;
018import java.util.HashMap;
019import java.util.HashSet;
020import java.util.List;
021import java.util.Map;
022import java.util.Set;
023import java.util.concurrent.CopyOnWriteArrayList;
024
025import javax.naming.NamingException;
026import javax.transaction.RollbackException;
027import javax.transaction.Status;
028import javax.transaction.Synchronization;
029import javax.transaction.SystemException;
030
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.nuxeo.ecm.core.api.NuxeoException;
034import org.nuxeo.ecm.core.api.RecoverableClientException;
035import org.nuxeo.ecm.core.event.Event;
036import org.nuxeo.ecm.core.event.EventBundle;
037import org.nuxeo.ecm.core.event.EventContext;
038import org.nuxeo.ecm.core.event.EventListener;
039import org.nuxeo.ecm.core.event.EventService;
040import org.nuxeo.ecm.core.event.EventServiceAdmin;
041import org.nuxeo.ecm.core.event.EventStats;
042import org.nuxeo.ecm.core.event.PostCommitEventListener;
043import org.nuxeo.ecm.core.event.ReconnectedEventBundle;
044import org.nuxeo.ecm.core.event.jms.AsyncProcessorConfig;
045import org.nuxeo.runtime.api.Framework;
046import org.nuxeo.runtime.transaction.TransactionHelper;
047
048/**
049 * Implementation of the event service.
050 */
051public class EventServiceImpl implements EventService, EventServiceAdmin, Synchronization {
052
053    public static final VMID VMID = new VMID();
054
055    private static final Log log = LogFactory.getLog(EventServiceImpl.class);
056
057    protected static final ThreadLocal<CompositeEventBundle> threadBundles = new ThreadLocal<CompositeEventBundle>() {
058        @Override
059        protected CompositeEventBundle initialValue() {
060            return new CompositeEventBundle();
061        }
062    };
063
064    private static class CompositeEventBundle {
065
066        boolean registeredSynchronization;
067
068        final Map<String, EventBundle> byRepository = new HashMap<String, EventBundle>();
069
070        void push(Event event) {
071            String repositoryName = event.getContext().getRepositoryName();
072            if (!byRepository.containsKey(repositoryName)) {
073                byRepository.put(repositoryName, new EventBundleImpl());
074            }
075            byRepository.get(repositoryName).push(event);
076        }
077
078    }
079
080    protected final EventListenerList listenerDescriptors;
081
082    protected PostCommitEventExecutor postCommitExec;
083
084    protected volatile AsyncEventExecutor asyncExec;
085
086    protected final List<AsyncWaitHook> asyncWaitHooks = new CopyOnWriteArrayList<AsyncWaitHook>();
087
088    protected boolean blockAsyncProcessing = false;
089
090    protected boolean blockSyncPostCommitProcessing = false;
091
092    protected boolean bulkModeEnabled = false;
093
094    public EventServiceImpl() {
095        listenerDescriptors = new EventListenerList();
096        postCommitExec = new PostCommitEventExecutor();
097        asyncExec = new AsyncEventExecutor();
098    }
099
100    public void init() {
101        asyncExec.init();
102    }
103
104    public void shutdown(long timeoutMillis) throws InterruptedException {
105        postCommitExec.shutdown(timeoutMillis);
106        Set<AsyncWaitHook> notTerminated = new HashSet<AsyncWaitHook>();
107        for (AsyncWaitHook hook : asyncWaitHooks) {
108            if (hook.shutdown() == false) {
109                notTerminated.add(hook);
110            }
111        }
112        if (!notTerminated.isEmpty()) {
113            throw new RuntimeException("Asynch services are still running : " + notTerminated);
114        }
115        if (asyncExec.shutdown(timeoutMillis) == false) {
116            throw new RuntimeException("Async executor is still running, timeout expired");
117        }
118    }
119
120    public void registerForAsyncWait(AsyncWaitHook callback) {
121        asyncWaitHooks.add(callback);
122    }
123
124    public void unregisterForAsyncWait(AsyncWaitHook callback) {
125        asyncWaitHooks.remove(callback);
126    }
127
128    /**
129     * @deprecated use {@link #waitForAsyncCompletion()} instead.
130     */
131    @Deprecated
132    public int getActiveAsyncTaskCount() {
133        return asyncExec.getUnfinishedCount();
134    }
135
136    @Override
137    public void waitForAsyncCompletion() {
138        waitForAsyncCompletion(Long.MAX_VALUE);
139    }
140
141    @Override
142    public void waitForAsyncCompletion(long timeout) {
143        Set<AsyncWaitHook> notCompleted = new HashSet<AsyncWaitHook>();
144        for (AsyncWaitHook hook : asyncWaitHooks) {
145            if (!hook.waitForAsyncCompletion()) {
146                notCompleted.add(hook);
147            }
148        }
149        if (!notCompleted.isEmpty()) {
150            throw new RuntimeException("Async tasks are still running : " + notCompleted);
151        }
152        try {
153            if (!asyncExec.waitForCompletion(timeout)) {
154                throw new RuntimeException("Async event listeners thread pool is not terminated");
155            }
156        } catch (InterruptedException e) {
157            Thread.currentThread().interrupt();
158            // TODO change signature
159            throw new RuntimeException(e);
160        }
161    }
162
163    @Override
164    public void addEventListener(EventListenerDescriptor listener) {
165        listenerDescriptors.add(listener);
166        log.debug("Registered event listener: " + listener.getName());
167    }
168
169    @Override
170    public void removeEventListener(EventListenerDescriptor listener) {
171        listenerDescriptors.removeDescriptor(listener);
172        log.debug("Unregistered event listener: " + listener.getName());
173    }
174
175    @Override
176    public void fireEvent(String name, EventContext context) {
177        fireEvent(new EventImpl(name, context));
178    }
179
180    @Override
181    public void fireEvent(Event event) {
182
183        String ename = event.getName();
184        EventStats stats = Framework.getService(EventStats.class);
185        for (EventListenerDescriptor desc : listenerDescriptors.getEnabledInlineListenersDescriptors()) {
186            if (!desc.acceptEvent(ename)) {
187                continue;
188            }
189            try {
190                long t0 = System.currentTimeMillis();
191                desc.asEventListener().handleEvent(event);
192                if (stats != null) {
193                    stats.logSyncExec(desc, System.currentTimeMillis() - t0);
194                }
195                if (event.isCanceled()) {
196                    // break loop
197                    return;
198                }
199            } catch (RuntimeException e) {
200                // get message
201                String message = "Exception during " + desc.getName() + " sync listener execution, ";
202                if (event.isBubbleException()) {
203                    message += "other listeners will be ignored";
204                } else if (event.isMarkedForRollBack()) {
205                    message += "transaction will be rolled back";
206                    if (event.getRollbackMessage() != null) {
207                        message += " (" + event.getRollbackMessage() + ")";
208                    }
209                } else {
210                    message += "continuing to run other listeners";
211                }
212                // log
213                if (e instanceof RecoverableClientException) {
214                    log.info(message + "\n" + e.getMessage());
215                    log.debug(message, e);
216                } else {
217                    log.error(message, e);
218                }
219                // rethrow or swallow
220                if (event.isBubbleException()) {
221                    throw e;
222                } else if (event.isMarkedForRollBack()) {
223                    Exception ee;
224                    if (event.getRollbackException() != null) {
225                        ee = event.getRollbackException();
226                    } else {
227                        ee = e;
228                    }
229                    // when marked for rollback, throw a generic
230                    // RuntimeException to make sure nobody catches it
231                    throw new RuntimeException(message, ee);
232                } else {
233                    // swallow exception
234                }
235            }
236        }
237
238        if (!event.isInline()) { // record the event
239            // don't record the complete event, only a shallow copy
240            ShallowEvent shallowEvent = ShallowEvent.create(event);
241            if (event.isImmediate()) {
242                EventBundleImpl b = new EventBundleImpl();
243                b.push(shallowEvent);
244                fireEventBundle(b);
245            } else {
246                recordEvent(shallowEvent);
247            }
248        }
249    }
250
251    @Override
252    public void fireEventBundle(EventBundle event) {
253        boolean comesFromJMS = false;
254
255        if (event instanceof ReconnectedEventBundle) {
256            if (((ReconnectedEventBundle) event).comesFromJMS()) {
257                comesFromJMS = true;
258            }
259        }
260
261        List<EventListenerDescriptor> postCommitSync = listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors();
262        List<EventListenerDescriptor> postCommitAsync = listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors();
263
264        if (bulkModeEnabled) {
265            // run all listeners synchronously in one transaction
266            List<EventListenerDescriptor> listeners = new ArrayList<EventListenerDescriptor>();
267            if (!blockSyncPostCommitProcessing) {
268                listeners = postCommitSync;
269            }
270            if (!blockAsyncProcessing) {
271                listeners.addAll(postCommitAsync);
272            }
273            if (!listeners.isEmpty()) {
274                postCommitExec.runBulk(listeners, event);
275            }
276            return;
277        }
278
279        // run sync listeners
280        if (blockSyncPostCommitProcessing) {
281            log.debug("Dropping PostCommit handler execution");
282        } else if (comesFromJMS) {
283            // when called from JMS we must skip sync listeners
284            // - postComit listeners should be on the core
285            // - there is no transaction started by JMS listener
286            log.debug("Deactivating sync post-commit listener since we are called from JMS");
287        } else {
288            if (!postCommitSync.isEmpty()) {
289                postCommitExec.run(postCommitSync, event);
290            }
291        }
292
293        if (blockAsyncProcessing) {
294            log.debug("Dopping bundle");
295            return;
296        }
297
298        // fire async listeners
299        if (AsyncProcessorConfig.forceJMSUsage() && !comesFromJMS) {
300            log.debug("Skipping async exec, this will be triggered via JMS");
301        } else {
302            asyncExec.run(postCommitAsync, event);
303        }
304    }
305
306    @Override
307    public void fireEventBundleSync(EventBundle event) {
308        for (EventListenerDescriptor desc : listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors()) {
309            desc.asPostCommitListener().handleEvent(event);
310        }
311        for (EventListenerDescriptor desc : listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors()) {
312            desc.asPostCommitListener().handleEvent(event);
313        }
314    }
315
316    @Override
317    public List<EventListener> getEventListeners() {
318        return listenerDescriptors.getInLineListeners();
319    }
320
321    @Override
322    public List<PostCommitEventListener> getPostCommitEventListeners() {
323        List<PostCommitEventListener> result = new ArrayList<PostCommitEventListener>();
324
325        result.addAll(listenerDescriptors.getSyncPostCommitListeners());
326        result.addAll(listenerDescriptors.getAsyncPostCommitListeners());
327
328        return result;
329    }
330
331    public EventListenerList getEventListenerList() {
332        return listenerDescriptors;
333    }
334
335    @Override
336    public EventListenerDescriptor getEventListener(String name) {
337        return listenerDescriptors.getDescriptor(name);
338    }
339
340    // methods for monitoring
341
342    @Override
343    public EventListenerList getListenerList() {
344        return listenerDescriptors;
345    }
346
347    @Override
348    public void setListenerEnabledFlag(String listenerName, boolean enabled) {
349        if (!listenerDescriptors.hasListener(listenerName)) {
350            return;
351        }
352
353        for (EventListenerDescriptor desc : listenerDescriptors.getAsyncPostCommitListenersDescriptors()) {
354            if (desc.getName().equals(listenerName)) {
355                desc.setEnabled(enabled);
356                synchronized (this) {
357                    listenerDescriptors.recomputeEnabledListeners();
358                }
359                return;
360            }
361        }
362
363        for (EventListenerDescriptor desc : listenerDescriptors.getSyncPostCommitListenersDescriptors()) {
364            if (desc.getName().equals(listenerName)) {
365                desc.setEnabled(enabled);
366                synchronized (this) {
367                    listenerDescriptors.recomputeEnabledListeners();
368                }
369                return;
370            }
371        }
372
373        for (EventListenerDescriptor desc : listenerDescriptors.getInlineListenersDescriptors()) {
374            if (desc.getName().equals(listenerName)) {
375                desc.setEnabled(enabled);
376                synchronized (this) {
377                    listenerDescriptors.recomputeEnabledListeners();
378                }
379                return;
380            }
381        }
382    }
383
384    @Override
385    public int getActiveThreadsCount() {
386        return asyncExec.getActiveCount();
387    }
388
389    @Override
390    public int getEventsInQueueCount() {
391        return asyncExec.getUnfinishedCount();
392    }
393
394    @Override
395    public boolean isBlockAsyncHandlers() {
396        return blockAsyncProcessing;
397    }
398
399    @Override
400    public boolean isBlockSyncPostCommitHandlers() {
401        return blockSyncPostCommitProcessing;
402    }
403
404    @Override
405    public void setBlockAsyncHandlers(boolean blockAsyncHandlers) {
406        blockAsyncProcessing = blockAsyncHandlers;
407    }
408
409    @Override
410    public void setBlockSyncPostCommitHandlers(boolean blockSyncPostComitHandlers) {
411        blockSyncPostCommitProcessing = blockSyncPostComitHandlers;
412    }
413
414    @Override
415    public boolean isBulkModeEnabled() {
416        return bulkModeEnabled;
417    }
418
419    @Override
420    public void setBulkModeEnabled(boolean bulkModeEnabled) {
421        this.bulkModeEnabled = bulkModeEnabled;
422    }
423
424    protected void recordEvent(Event event) {
425        CompositeEventBundle b = threadBundles.get();
426        b.push(event);
427        if (TransactionHelper.isTransactionActive()) {
428            if (!b.registeredSynchronization) {
429                // register as synchronization
430                try {
431                    TransactionHelper.lookupTransactionManager().getTransaction().registerSynchronization(this);
432                } catch (NamingException | SystemException | RollbackException e) {
433                    throw new RuntimeException("Cannot register Synchronization", e);
434                }
435                b.registeredSynchronization = true;
436            }
437        } else if (event.isCommitEvent()) {
438            handleTxCommited();
439        }
440    }
441
442    @Override
443    public void beforeCompletion() {
444    }
445
446    @Override
447    public void afterCompletion(int status) {
448        if (status == Status.STATUS_COMMITTED) {
449            handleTxCommited();
450        } else if (status == Status.STATUS_ROLLEDBACK) {
451            handleTxRollbacked();
452        } else {
453            log.error("Unexpected afterCompletion status: " + status);
454        }
455    }
456
457    protected void handleTxRollbacked() {
458        threadBundles.remove();
459    }
460
461    protected void handleTxCommited() {
462        CompositeEventBundle b = threadBundles.get();
463        threadBundles.remove();
464
465        // notify post commit event listeners
466        for (EventBundle bundle : b.byRepository.values()) {
467            try {
468                fireEventBundle(bundle);
469            } catch (NuxeoException e) {
470                log.error("Error while processing " + bundle, e);
471            }
472        }
473    }
474
475}