001/*
002 * (C) Copyright 2006-2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Bogdan Stefanescu
018 *     Thierry Delprat
019 *     Florent Guillaume
020 */
021package org.nuxeo.ecm.core.event.impl;
022
023import java.rmi.dgc.VMID;
024import java.util.ArrayList;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.CopyOnWriteArrayList;
031
032import javax.naming.NamingException;
033import javax.transaction.RollbackException;
034import javax.transaction.Status;
035import javax.transaction.Synchronization;
036import javax.transaction.SystemException;
037
038import org.apache.commons.logging.Log;
039import org.apache.commons.logging.LogFactory;
040import org.nuxeo.common.logging.SequenceTracer;
041import org.nuxeo.ecm.core.api.NuxeoException;
042import org.nuxeo.ecm.core.api.RecoverableClientException;
043import org.nuxeo.ecm.core.event.Event;
044import org.nuxeo.ecm.core.event.EventBundle;
045import org.nuxeo.ecm.core.event.EventContext;
046import org.nuxeo.ecm.core.event.EventListener;
047import org.nuxeo.ecm.core.event.EventService;
048import org.nuxeo.ecm.core.event.EventServiceAdmin;
049import org.nuxeo.ecm.core.event.EventStats;
050import org.nuxeo.ecm.core.event.PostCommitEventListener;
051import org.nuxeo.ecm.core.event.ReconnectedEventBundle;
052import org.nuxeo.ecm.core.event.jms.AsyncProcessorConfig;
053import org.nuxeo.runtime.api.Framework;
054import org.nuxeo.runtime.transaction.TransactionHelper;
055
056/**
057 * Implementation of the event service.
058 */
059public class EventServiceImpl implements EventService, EventServiceAdmin, Synchronization {
060
061    public static final VMID VMID = new VMID();
062
063    private static final Log log = LogFactory.getLog(EventServiceImpl.class);
064
065    protected static final ThreadLocal<CompositeEventBundle> threadBundles = new ThreadLocal<CompositeEventBundle>() {
066        @Override
067        protected CompositeEventBundle initialValue() {
068            return new CompositeEventBundle();
069        }
070    };
071
072    private static class CompositeEventBundle {
073
074        boolean registeredSynchronization;
075
076        final Map<String, EventBundle> byRepository = new HashMap<String, EventBundle>();
077
078        void push(Event event) {
079            String repositoryName = event.getContext().getRepositoryName();
080            if (!byRepository.containsKey(repositoryName)) {
081                byRepository.put(repositoryName, new EventBundleImpl());
082            }
083            byRepository.get(repositoryName).push(event);
084        }
085
086    }
087
088    protected final EventListenerList listenerDescriptors;
089
090    protected PostCommitEventExecutor postCommitExec;
091
092    protected volatile AsyncEventExecutor asyncExec;
093
094    protected final List<AsyncWaitHook> asyncWaitHooks = new CopyOnWriteArrayList<AsyncWaitHook>();
095
096    protected boolean blockAsyncProcessing = false;
097
098    protected boolean blockSyncPostCommitProcessing = false;
099
100    protected boolean bulkModeEnabled = false;
101
102    public EventServiceImpl() {
103        listenerDescriptors = new EventListenerList();
104        postCommitExec = new PostCommitEventExecutor();
105        asyncExec = new AsyncEventExecutor();
106    }
107
108    public void init() {
109        asyncExec.init();
110    }
111
112    public void shutdown(long timeoutMillis) throws InterruptedException {
113        postCommitExec.shutdown(timeoutMillis);
114        Set<AsyncWaitHook> notTerminated = new HashSet<AsyncWaitHook>();
115        for (AsyncWaitHook hook : asyncWaitHooks) {
116            if (hook.shutdown() == false) {
117                notTerminated.add(hook);
118            }
119        }
120        if (!notTerminated.isEmpty()) {
121            throw new RuntimeException("Asynch services are still running : " + notTerminated);
122        }
123        if (asyncExec.shutdown(timeoutMillis) == false) {
124            throw new RuntimeException("Async executor is still running, timeout expired");
125        }
126    }
127
128    public void registerForAsyncWait(AsyncWaitHook callback) {
129        asyncWaitHooks.add(callback);
130    }
131
132    public void unregisterForAsyncWait(AsyncWaitHook callback) {
133        asyncWaitHooks.remove(callback);
134    }
135
136    /**
137     * @deprecated use {@link #waitForAsyncCompletion()} instead.
138     */
139    @Deprecated
140    public int getActiveAsyncTaskCount() {
141        return asyncExec.getUnfinishedCount();
142    }
143
144    @Override
145    public void waitForAsyncCompletion() {
146        waitForAsyncCompletion(Long.MAX_VALUE);
147    }
148
149    @Override
150    public void waitForAsyncCompletion(long timeout) {
151        Set<AsyncWaitHook> notCompleted = new HashSet<AsyncWaitHook>();
152        for (AsyncWaitHook hook : asyncWaitHooks) {
153            if (!hook.waitForAsyncCompletion()) {
154                notCompleted.add(hook);
155            }
156        }
157        if (!notCompleted.isEmpty()) {
158            throw new RuntimeException("Async tasks are still running : " + notCompleted);
159        }
160        try {
161            if (!asyncExec.waitForCompletion(timeout)) {
162                throw new RuntimeException("Async event listeners thread pool is not terminated");
163            }
164        } catch (InterruptedException e) {
165            Thread.currentThread().interrupt();
166            // TODO change signature
167            throw new RuntimeException(e);
168        }
169    }
170
171    @Override
172    public void addEventListener(EventListenerDescriptor listener) {
173        listenerDescriptors.add(listener);
174        log.debug("Registered event listener: " + listener.getName());
175    }
176
177    @Override
178    public void removeEventListener(EventListenerDescriptor listener) {
179        listenerDescriptors.removeDescriptor(listener);
180        log.debug("Unregistered event listener: " + listener.getName());
181    }
182
183    @Override
184    public void fireEvent(String name, EventContext context) {
185        fireEvent(new EventImpl(name, context));
186    }
187
188    @Override
189    public void fireEvent(Event event) {
190
191        String ename = event.getName();
192        EventStats stats = Framework.getService(EventStats.class);
193        for (EventListenerDescriptor desc : listenerDescriptors.getEnabledInlineListenersDescriptors()) {
194            if (!desc.acceptEvent(ename)) {
195                continue;
196            }
197            try {
198                long t0 = System.currentTimeMillis();
199                SequenceTracer.start("Fire sync event " + event.getName());
200                desc.asEventListener().handleEvent(event);
201                long elapsed = System.currentTimeMillis() - t0;
202                SequenceTracer.stop("done in " + elapsed + " ms");
203                if (stats != null) {
204                    stats.logSyncExec(desc, elapsed);
205                }
206                if (event.isCanceled()) {
207                    // break loop
208                    return;
209                }
210            } catch (RuntimeException e) {
211                // get message
212                SequenceTracer.destroy("failure");
213                String message = "Exception during " + desc.getName() + " sync listener execution, ";
214                if (event.isBubbleException()) {
215                    message += "other listeners will be ignored";
216                } else if (event.isMarkedForRollBack()) {
217                    message += "transaction will be rolled back";
218                    if (event.getRollbackMessage() != null) {
219                        message += " (" + event.getRollbackMessage() + ")";
220                    }
221                } else {
222                    message += "continuing to run other listeners";
223                }
224                // log
225                if (e instanceof RecoverableClientException) {
226                    log.info(message + "\n" + e.getMessage());
227                    log.debug(message, e);
228                } else {
229                    log.error(message, e);
230                }
231                // rethrow or swallow
232                if (event.isBubbleException()) {
233                    throw e;
234                } else if (event.isMarkedForRollBack()) {
235                    Exception ee;
236                    if (event.getRollbackException() != null) {
237                        ee = event.getRollbackException();
238                    } else {
239                        ee = e;
240                    }
241                    // when marked for rollback, throw a generic
242                    // RuntimeException to make sure nobody catches it
243                    throw new RuntimeException(message, ee);
244                } else {
245                    // swallow exception
246                }
247            }
248        }
249
250        if (!event.isInline()) { // record the event
251            // don't record the complete event, only a shallow copy
252            ShallowEvent shallowEvent = ShallowEvent.create(event);
253            if (event.isImmediate()) {
254                EventBundleImpl b = new EventBundleImpl();
255                b.push(shallowEvent);
256                fireEventBundle(b);
257            } else {
258                recordEvent(shallowEvent);
259            }
260        }
261    }
262
263    @Override
264    public void fireEventBundle(EventBundle event) {
265        boolean comesFromJMS = false;
266
267        if (event instanceof ReconnectedEventBundle) {
268            if (((ReconnectedEventBundle) event).comesFromJMS()) {
269                comesFromJMS = true;
270            }
271        }
272
273        List<EventListenerDescriptor> postCommitSync = listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors();
274        List<EventListenerDescriptor> postCommitAsync = listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors();
275
276        if (bulkModeEnabled) {
277            // run all listeners synchronously in one transaction
278            List<EventListenerDescriptor> listeners = new ArrayList<EventListenerDescriptor>();
279            if (!blockSyncPostCommitProcessing) {
280                listeners = postCommitSync;
281            }
282            if (!blockAsyncProcessing) {
283                listeners.addAll(postCommitAsync);
284            }
285            if (!listeners.isEmpty()) {
286                postCommitExec.runBulk(listeners, event);
287            }
288            return;
289        }
290
291        // run sync listeners
292        if (blockSyncPostCommitProcessing) {
293            log.debug("Dropping PostCommit handler execution");
294        } else if (comesFromJMS) {
295            // when called from JMS we must skip sync listeners
296            // - postComit listeners should be on the core
297            // - there is no transaction started by JMS listener
298            log.debug("Deactivating sync post-commit listener since we are called from JMS");
299        } else {
300            if (!postCommitSync.isEmpty()) {
301                postCommitExec.run(postCommitSync, event);
302            }
303        }
304
305        if (blockAsyncProcessing) {
306            log.debug("Dopping bundle");
307            return;
308        }
309
310        // fire async listeners
311        if (AsyncProcessorConfig.forceJMSUsage() && !comesFromJMS) {
312            log.debug("Skipping async exec, this will be triggered via JMS");
313        } else {
314            asyncExec.run(postCommitAsync, event);
315        }
316    }
317
318    @Override
319    public void fireEventBundleSync(EventBundle event) {
320        for (EventListenerDescriptor desc : listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors()) {
321            desc.asPostCommitListener().handleEvent(event);
322        }
323        for (EventListenerDescriptor desc : listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors()) {
324            desc.asPostCommitListener().handleEvent(event);
325        }
326    }
327
328    @Override
329    public List<EventListener> getEventListeners() {
330        return listenerDescriptors.getInLineListeners();
331    }
332
333    @Override
334    public List<PostCommitEventListener> getPostCommitEventListeners() {
335        List<PostCommitEventListener> result = new ArrayList<PostCommitEventListener>();
336
337        result.addAll(listenerDescriptors.getSyncPostCommitListeners());
338        result.addAll(listenerDescriptors.getAsyncPostCommitListeners());
339
340        return result;
341    }
342
343    public EventListenerList getEventListenerList() {
344        return listenerDescriptors;
345    }
346
347    @Override
348    public EventListenerDescriptor getEventListener(String name) {
349        return listenerDescriptors.getDescriptor(name);
350    }
351
352    // methods for monitoring
353
354    @Override
355    public EventListenerList getListenerList() {
356        return listenerDescriptors;
357    }
358
359    @Override
360    public void setListenerEnabledFlag(String listenerName, boolean enabled) {
361        if (!listenerDescriptors.hasListener(listenerName)) {
362            return;
363        }
364
365        for (EventListenerDescriptor desc : listenerDescriptors.getAsyncPostCommitListenersDescriptors()) {
366            if (desc.getName().equals(listenerName)) {
367                desc.setEnabled(enabled);
368                synchronized (this) {
369                    listenerDescriptors.recomputeEnabledListeners();
370                }
371                return;
372            }
373        }
374
375        for (EventListenerDescriptor desc : listenerDescriptors.getSyncPostCommitListenersDescriptors()) {
376            if (desc.getName().equals(listenerName)) {
377                desc.setEnabled(enabled);
378                synchronized (this) {
379                    listenerDescriptors.recomputeEnabledListeners();
380                }
381                return;
382            }
383        }
384
385        for (EventListenerDescriptor desc : listenerDescriptors.getInlineListenersDescriptors()) {
386            if (desc.getName().equals(listenerName)) {
387                desc.setEnabled(enabled);
388                synchronized (this) {
389                    listenerDescriptors.recomputeEnabledListeners();
390                }
391                return;
392            }
393        }
394    }
395
396    @Override
397    public int getActiveThreadsCount() {
398        return asyncExec.getActiveCount();
399    }
400
401    @Override
402    public int getEventsInQueueCount() {
403        return asyncExec.getUnfinishedCount();
404    }
405
406    @Override
407    public boolean isBlockAsyncHandlers() {
408        return blockAsyncProcessing;
409    }
410
411    @Override
412    public boolean isBlockSyncPostCommitHandlers() {
413        return blockSyncPostCommitProcessing;
414    }
415
416    @Override
417    public void setBlockAsyncHandlers(boolean blockAsyncHandlers) {
418        blockAsyncProcessing = blockAsyncHandlers;
419    }
420
421    @Override
422    public void setBlockSyncPostCommitHandlers(boolean blockSyncPostComitHandlers) {
423        blockSyncPostCommitProcessing = blockSyncPostComitHandlers;
424    }
425
426    @Override
427    public boolean isBulkModeEnabled() {
428        return bulkModeEnabled;
429    }
430
431    @Override
432    public void setBulkModeEnabled(boolean bulkModeEnabled) {
433        this.bulkModeEnabled = bulkModeEnabled;
434    }
435
436    protected void recordEvent(Event event) {
437        CompositeEventBundle b = threadBundles.get();
438        b.push(event);
439        if (TransactionHelper.isTransactionActive()) {
440            if (!b.registeredSynchronization) {
441                // register as synchronization
442                try {
443                    TransactionHelper.lookupTransactionManager().getTransaction().registerSynchronization(this);
444                } catch (NamingException | SystemException | RollbackException e) {
445                    throw new RuntimeException("Cannot register Synchronization", e);
446                }
447                b.registeredSynchronization = true;
448            }
449        } else if (event.isCommitEvent()) {
450            handleTxCommited();
451        }
452    }
453
454    @Override
455    public void beforeCompletion() {
456    }
457
458    @Override
459    public void afterCompletion(int status) {
460        if (status == Status.STATUS_COMMITTED) {
461            handleTxCommited();
462        } else if (status == Status.STATUS_ROLLEDBACK) {
463            handleTxRollbacked();
464        } else {
465            log.error("Unexpected afterCompletion status: " + status);
466        }
467    }
468
469    protected void handleTxRollbacked() {
470        threadBundles.remove();
471    }
472
473    protected void handleTxCommited() {
474        CompositeEventBundle b = threadBundles.get();
475        threadBundles.remove();
476
477        // notify post commit event listeners
478        for (EventBundle bundle : b.byRepository.values()) {
479            try {
480                fireEventBundle(bundle);
481            } catch (NuxeoException e) {
482                log.error("Error while processing " + bundle, e);
483            }
484        }
485    }
486
487}