001/*
002 * (C) Copyright 2006-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Bogdan Stefanescu
018 *     Thierry Delprat
019 *     Florent Guillaume
020 *     Andrei Nechaev
021 */
022package org.nuxeo.ecm.core.event.impl;
023
024import java.rmi.dgc.VMID;
025import java.util.ArrayList;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.CopyOnWriteArrayList;
031import java.util.stream.Collectors;
032
033import javax.naming.NamingException;
034import javax.transaction.RollbackException;
035import javax.transaction.Status;
036import javax.transaction.Synchronization;
037import javax.transaction.SystemException;
038
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041import org.nuxeo.common.logging.SequenceTracer;
042import org.nuxeo.ecm.core.api.NuxeoException;
043import org.nuxeo.ecm.core.api.RecoverableClientException;
044import org.nuxeo.ecm.core.event.Event;
045import org.nuxeo.ecm.core.event.EventBundle;
046import org.nuxeo.ecm.core.event.EventContext;
047import org.nuxeo.ecm.core.event.EventListener;
048import org.nuxeo.ecm.core.event.EventService;
049import org.nuxeo.ecm.core.event.EventServiceAdmin;
050import org.nuxeo.ecm.core.event.EventStats;
051import org.nuxeo.ecm.core.event.PostCommitEventListener;
052import org.nuxeo.ecm.core.event.ReconnectedEventBundle;
053import org.nuxeo.ecm.core.event.jms.AsyncProcessorConfig;
054import org.nuxeo.ecm.core.event.pipe.EventPipeDescriptor;
055import org.nuxeo.ecm.core.event.pipe.EventPipeRegistry;
056import org.nuxeo.ecm.core.event.pipe.dispatch.EventBundleDispatcher;
057import org.nuxeo.ecm.core.event.pipe.dispatch.EventDispatcherDescriptor;
058import org.nuxeo.ecm.core.event.pipe.dispatch.EventDispatcherRegistry;
059import org.nuxeo.runtime.api.Framework;
060import org.nuxeo.runtime.transaction.TransactionHelper;
061
062/**
063 * Implementation of the event service.
064 */
065public class EventServiceImpl implements EventService, EventServiceAdmin, Synchronization {
066
067    public static final VMID VMID = new VMID();
068
069    private static final Log log = LogFactory.getLog(EventServiceImpl.class);
070
071    protected static final ThreadLocal<CompositeEventBundle> threadBundles = new ThreadLocal<CompositeEventBundle>() {
072        @Override
073        protected CompositeEventBundle initialValue() {
074            return new CompositeEventBundle();
075        }
076    };
077
078    private static class CompositeEventBundle {
079
080        boolean registeredSynchronization;
081
082        final Map<String, EventBundle> byRepository = new HashMap<>();
083
084        void push(Event event) {
085            String repositoryName = event.getContext().getRepositoryName();
086            if (!byRepository.containsKey(repositoryName)) {
087                byRepository.put(repositoryName, new EventBundleImpl());
088            }
089            byRepository.get(repositoryName).push(event);
090        }
091
092    }
093
094    protected final EventListenerList listenerDescriptors;
095
096    protected PostCommitEventExecutor postCommitExec;
097
098    protected volatile AsyncEventExecutor asyncExec;
099
100    protected final List<AsyncWaitHook> asyncWaitHooks = new CopyOnWriteArrayList<>();
101
102    protected boolean blockAsyncProcessing = false;
103
104    protected boolean blockSyncPostCommitProcessing = false;
105
106    protected boolean bulkModeEnabled = false;
107
108    protected EventPipeRegistry registeredPipes = new EventPipeRegistry();
109
110    protected EventDispatcherRegistry dispatchers = new EventDispatcherRegistry();
111
112    protected EventBundleDispatcher pipeDispatcher;
113
114    public EventServiceImpl() {
115        listenerDescriptors = new EventListenerList();
116        postCommitExec = new PostCommitEventExecutor();
117        asyncExec = new AsyncEventExecutor();
118    }
119
120    public void init() {
121        asyncExec.init();
122
123        EventDispatcherDescriptor dispatcherDescriptor = dispatchers.getDispatcherDescriptor();
124        if (dispatcherDescriptor != null) {
125            List<EventPipeDescriptor> pipes = registeredPipes.getPipes();
126            if (!pipes.isEmpty()) {
127                pipeDispatcher = dispatcherDescriptor.getInstance();
128                pipeDispatcher.init(pipes, dispatcherDescriptor.getParameters());
129            }
130        }
131    }
132
133    public EventBundleDispatcher getEventBundleDispatcher() {
134        return pipeDispatcher;
135    }
136
137    public void shutdown(long timeoutMillis) throws InterruptedException {
138        postCommitExec.shutdown(timeoutMillis);
139        Set<AsyncWaitHook> notTerminated = asyncWaitHooks.stream().filter(hook -> !hook.shutdown()).collect(
140                Collectors.toSet());
141        if (!notTerminated.isEmpty()) {
142            throw new RuntimeException("Asynch services are still running : " + notTerminated);
143        }
144
145        if (!asyncExec.shutdown(timeoutMillis)) {
146            throw new RuntimeException("Async executor is still running, timeout expired");
147        }
148        if (pipeDispatcher != null) {
149            pipeDispatcher.shutdown();
150        }
151    }
152
153    public void registerForAsyncWait(AsyncWaitHook callback) {
154        asyncWaitHooks.add(callback);
155    }
156
157    public void unregisterForAsyncWait(AsyncWaitHook callback) {
158        asyncWaitHooks.remove(callback);
159    }
160
161    @Override
162    public void waitForAsyncCompletion() {
163        waitForAsyncCompletion(Long.MAX_VALUE);
164    }
165
166    @Override
167    public void waitForAsyncCompletion(long timeout) {
168        Set<AsyncWaitHook> notCompleted = asyncWaitHooks.stream()
169                                                        .filter(hook -> !hook.waitForAsyncCompletion())
170                                                        .collect(Collectors.toSet());
171        if (!notCompleted.isEmpty()) {
172            throw new RuntimeException("Async tasks are still running : " + notCompleted);
173        }
174        try {
175            if (!asyncExec.waitForCompletion(timeout)) {
176                throw new RuntimeException("Async event listeners thread pool is not terminated");
177            }
178        } catch (InterruptedException e) {
179            Thread.currentThread().interrupt();
180            // TODO change signature
181            throw new RuntimeException(e);
182        }
183        if (pipeDispatcher != null) {
184            try {
185                pipeDispatcher.waitForCompletion(timeout);
186            } catch (InterruptedException e) {
187                Thread.currentThread().interrupt();
188                throw new RuntimeException(e);
189            }
190        }
191    }
192
193    @Override
194    public void addEventListener(EventListenerDescriptor listener) {
195        listenerDescriptors.add(listener);
196        log.debug("Registered event listener: " + listener.getName());
197    }
198
199    public void addEventPipe(EventPipeDescriptor pipeDescriptor) {
200        registeredPipes.addContribution(pipeDescriptor);
201        log.debug("Registered event pipe: " + pipeDescriptor.getName());
202    }
203
204    public void addEventDispatcher(EventDispatcherDescriptor dispatcherDescriptor) {
205        dispatchers.addContrib(dispatcherDescriptor);
206        log.debug("Registered event dispatcher: " + dispatcherDescriptor.getName());
207    }
208
209    @Override
210    public void removeEventListener(EventListenerDescriptor listener) {
211        listenerDescriptors.removeDescriptor(listener);
212        log.debug("Unregistered event listener: " + listener.getName());
213    }
214
215    public void removeEventPipe(EventPipeDescriptor pipeDescriptor) {
216        registeredPipes.removeContribution(pipeDescriptor);
217        log.debug("Unregistered event pipe: " + pipeDescriptor.getName());
218    }
219
220    public void removeEventDispatcher(EventDispatcherDescriptor dispatcherDescriptor) {
221        dispatchers.removeContrib(dispatcherDescriptor);
222        log.debug("Unregistered event dispatcher: " + dispatcherDescriptor.getName());
223    }
224
225    @Override
226    public void fireEvent(String name, EventContext context) {
227        fireEvent(new EventImpl(name, context));
228    }
229
230    @Override
231    public void fireEvent(Event event) {
232
233        String ename = event.getName();
234        EventStats stats = Framework.getService(EventStats.class);
235        for (EventListenerDescriptor desc : listenerDescriptors.getEnabledInlineListenersDescriptors()) {
236            if (!desc.acceptEvent(ename)) {
237                continue;
238            }
239            try {
240                long t0 = System.currentTimeMillis();
241                SequenceTracer.start("Fire sync event " + event.getName());
242                desc.asEventListener().handleEvent(event);
243                long elapsed = System.currentTimeMillis() - t0;
244                SequenceTracer.stop("done in " + elapsed + " ms");
245                if (stats != null) {
246                    stats.logSyncExec(desc, elapsed);
247                }
248                if (event.isCanceled()) {
249                    // break loop
250                    return;
251                }
252            } catch (RuntimeException e) {
253                // get message
254                SequenceTracer.destroy("failure");
255                String message = "Exception during " + desc.getName() + " sync listener execution, ";
256                if (event.isBubbleException()) {
257                    message += "other listeners will be ignored";
258                } else if (event.isMarkedForRollBack()) {
259                    message += "transaction will be rolled back";
260                    if (event.getRollbackMessage() != null) {
261                        message += " (" + event.getRollbackMessage() + ")";
262                    }
263                } else {
264                    message += "continuing to run other listeners";
265                }
266                // log
267                if (e instanceof RecoverableClientException) {
268                    log.info(message + "\n" + e.getMessage());
269                    log.debug(message, e);
270                } else {
271                    log.error(message, e);
272                }
273                // rethrow or swallow
274                if (event.isBubbleException()) {
275                    throw e;
276                } else if (event.isMarkedForRollBack()) {
277                    Exception ee;
278                    if (event.getRollbackException() != null) {
279                        ee = event.getRollbackException();
280                    } else {
281                        ee = e;
282                    }
283                    // when marked for rollback, throw a generic
284                    // RuntimeException to make sure nobody catches it
285                    throw new RuntimeException(message, ee);
286                } else {
287                    // swallow exception
288                }
289            }
290        }
291
292        if (!event.isInline()) { // record the event
293            // don't record the complete event, only a shallow copy
294            ShallowEvent shallowEvent = ShallowEvent.create(event);
295            if (event.isImmediate()) {
296                EventBundleImpl b = new EventBundleImpl();
297                b.push(shallowEvent);
298                fireEventBundle(b);
299            } else {
300                recordEvent(shallowEvent);
301            }
302        }
303    }
304
305    @Override
306    public void fireEventBundle(EventBundle event) {
307        boolean comesFromJMS = false;
308
309        if (event instanceof ReconnectedEventBundle) {
310            if (((ReconnectedEventBundle) event).comesFromJMS()) {
311                comesFromJMS = true;
312            }
313        }
314
315        List<EventListenerDescriptor> postCommitSync = listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors();
316        List<EventListenerDescriptor> postCommitAsync = listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors();
317
318        if (bulkModeEnabled) {
319            // run all listeners synchronously in one transaction
320            List<EventListenerDescriptor> listeners = new ArrayList<>();
321            if (!blockSyncPostCommitProcessing) {
322                listeners = postCommitSync;
323            }
324            if (!blockAsyncProcessing) {
325                listeners.addAll(postCommitAsync);
326            }
327            if (!listeners.isEmpty()) {
328                postCommitExec.runBulk(listeners, event);
329            }
330            return;
331        }
332
333        // run sync listeners
334        if (blockSyncPostCommitProcessing) {
335            log.debug("Dropping PostCommit handler execution");
336        } else if (comesFromJMS) {
337            // when called from JMS we must skip sync listeners
338            // - postComit listeners should be on the core
339            // - there is no transaction started by JMS listener
340            log.debug("Deactivating sync post-commit listener since we are called from JMS");
341        } else {
342            if (!postCommitSync.isEmpty()) {
343                postCommitExec.run(postCommitSync, event);
344            }
345        }
346
347        if (blockAsyncProcessing) {
348            log.debug("Dopping bundle");
349            return;
350        }
351
352        // fire async listeners
353        if (AsyncProcessorConfig.forceJMSUsage() && !comesFromJMS) {
354            log.debug("Skipping async exec, this will be triggered via JMS");
355        } else {
356            if (pipeDispatcher == null) {
357                asyncExec.run(postCommitAsync, event);
358            } else {
359                // rather than sending to the WorkManager: send to the Pipe
360                pipeDispatcher.sendEventBundle(event);
361            }
362        }
363    }
364
365    @Override
366    public void fireEventBundleSync(EventBundle event) {
367        for (EventListenerDescriptor desc : listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors()) {
368            desc.asPostCommitListener().handleEvent(event);
369        }
370        for (EventListenerDescriptor desc : listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors()) {
371            desc.asPostCommitListener().handleEvent(event);
372        }
373    }
374
375    @Override
376    public List<EventListener> getEventListeners() {
377        return listenerDescriptors.getInLineListeners();
378    }
379
380    @Override
381    public List<PostCommitEventListener> getPostCommitEventListeners() {
382        List<PostCommitEventListener> result = new ArrayList<>();
383
384        result.addAll(listenerDescriptors.getSyncPostCommitListeners());
385        result.addAll(listenerDescriptors.getAsyncPostCommitListeners());
386
387        return result;
388    }
389
390    public EventListenerList getEventListenerList() {
391        return listenerDescriptors;
392    }
393
394    @Override
395    public EventListenerDescriptor getEventListener(String name) {
396        return listenerDescriptors.getDescriptor(name);
397    }
398
399    // methods for monitoring
400
401    @Override
402    public EventListenerList getListenerList() {
403        return listenerDescriptors;
404    }
405
406    @Override
407    public void setListenerEnabledFlag(String listenerName, boolean enabled) {
408        if (!listenerDescriptors.hasListener(listenerName)) {
409            return;
410        }
411
412        for (EventListenerDescriptor desc : listenerDescriptors.getAsyncPostCommitListenersDescriptors()) {
413            if (desc.getName().equals(listenerName)) {
414                desc.setEnabled(enabled);
415                synchronized (this) {
416                    listenerDescriptors.recomputeEnabledListeners();
417                }
418                return;
419            }
420        }
421
422        for (EventListenerDescriptor desc : listenerDescriptors.getSyncPostCommitListenersDescriptors()) {
423            if (desc.getName().equals(listenerName)) {
424                desc.setEnabled(enabled);
425                synchronized (this) {
426                    listenerDescriptors.recomputeEnabledListeners();
427                }
428                return;
429            }
430        }
431
432        for (EventListenerDescriptor desc : listenerDescriptors.getInlineListenersDescriptors()) {
433            if (desc.getName().equals(listenerName)) {
434                desc.setEnabled(enabled);
435                synchronized (this) {
436                    listenerDescriptors.recomputeEnabledListeners();
437                }
438                return;
439            }
440        }
441    }
442
443    @Override
444    public int getActiveThreadsCount() {
445        return asyncExec.getActiveCount();
446    }
447
448    @Override
449    public int getEventsInQueueCount() {
450        return asyncExec.getUnfinishedCount();
451    }
452
453    @Override
454    public boolean isBlockAsyncHandlers() {
455        return blockAsyncProcessing;
456    }
457
458    @Override
459    public boolean isBlockSyncPostCommitHandlers() {
460        return blockSyncPostCommitProcessing;
461    }
462
463    @Override
464    public void setBlockAsyncHandlers(boolean blockAsyncHandlers) {
465        blockAsyncProcessing = blockAsyncHandlers;
466    }
467
468    @Override
469    public void setBlockSyncPostCommitHandlers(boolean blockSyncPostComitHandlers) {
470        blockSyncPostCommitProcessing = blockSyncPostComitHandlers;
471    }
472
473    @Override
474    public boolean isBulkModeEnabled() {
475        return bulkModeEnabled;
476    }
477
478    @Override
479    public void setBulkModeEnabled(boolean bulkModeEnabled) {
480        this.bulkModeEnabled = bulkModeEnabled;
481    }
482
483    protected void recordEvent(Event event) {
484        CompositeEventBundle b = threadBundles.get();
485        b.push(event);
486        if (TransactionHelper.isTransactionActive()) {
487            if (!b.registeredSynchronization) {
488                // register as synchronization
489                try {
490                    TransactionHelper.lookupTransactionManager().getTransaction().registerSynchronization(this);
491                } catch (NamingException | SystemException | RollbackException e) {
492                    throw new RuntimeException("Cannot register Synchronization", e);
493                }
494                b.registeredSynchronization = true;
495            }
496        } else if (event.isCommitEvent()) {
497            handleTxCommited();
498        }
499    }
500
501    @Override
502    public void beforeCompletion() {
503    }
504
505    @Override
506    public void afterCompletion(int status) {
507        if (status == Status.STATUS_COMMITTED) {
508            handleTxCommited();
509        } else if (status == Status.STATUS_ROLLEDBACK) {
510            handleTxRollbacked();
511        } else {
512            log.error("Unexpected afterCompletion status: " + status);
513        }
514    }
515
516    protected void handleTxRollbacked() {
517        threadBundles.remove();
518    }
519
520    protected void handleTxCommited() {
521        CompositeEventBundle b = threadBundles.get();
522        threadBundles.remove();
523
524        // notify post commit event listeners
525        for (EventBundle bundle : b.byRepository.values()) {
526            try {
527                fireEventBundle(bundle);
528            } catch (NuxeoException e) {
529                log.error("Error while processing " + bundle, e);
530            }
531        }
532    }
533
534}