001/*
002 * (C) Copyright 2006-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Bogdan Stefanescu
018 *     Thierry Delprat
019 *     Florent Guillaume
020 *     Andrei Nechaev
021 */
022package org.nuxeo.ecm.core.event.impl;
023
024import java.rmi.dgc.VMID;
025import java.util.ArrayList;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.CopyOnWriteArrayList;
031import java.util.stream.Collectors;
032
033import javax.naming.NamingException;
034import javax.transaction.RollbackException;
035import javax.transaction.Status;
036import javax.transaction.Synchronization;
037import javax.transaction.SystemException;
038
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041import org.nuxeo.common.logging.SequenceTracer;
042import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
043import org.nuxeo.ecm.core.api.NuxeoException;
044import org.nuxeo.ecm.core.api.RecoverableClientException;
045import org.nuxeo.ecm.core.event.Event;
046import org.nuxeo.ecm.core.event.EventBundle;
047import org.nuxeo.ecm.core.event.EventContext;
048import org.nuxeo.ecm.core.event.EventListener;
049import org.nuxeo.ecm.core.event.EventService;
050import org.nuxeo.ecm.core.event.EventServiceAdmin;
051import org.nuxeo.ecm.core.event.EventStats;
052import org.nuxeo.ecm.core.event.PostCommitEventListener;
053import org.nuxeo.ecm.core.event.ReconnectedEventBundle;
054import org.nuxeo.ecm.core.event.pipe.EventPipeDescriptor;
055import org.nuxeo.ecm.core.event.pipe.EventPipeRegistry;
056import org.nuxeo.ecm.core.event.pipe.dispatch.EventBundleDispatcher;
057import org.nuxeo.ecm.core.event.pipe.dispatch.EventDispatcherDescriptor;
058import org.nuxeo.ecm.core.event.pipe.dispatch.EventDispatcherRegistry;
059import org.nuxeo.runtime.api.Framework;
060import org.nuxeo.runtime.transaction.TransactionHelper;
061
062/**
063 * Implementation of the event service.
064 */
065public class EventServiceImpl implements EventService, EventServiceAdmin, Synchronization {
066
067    public static final VMID VMID = new VMID();
068
069    private static final Log log = LogFactory.getLog(EventServiceImpl.class);
070
071    protected static final ThreadLocal<CompositeEventBundle> threadBundles = new ThreadLocal<CompositeEventBundle>() {
072        @Override
073        protected CompositeEventBundle initialValue() {
074            return new CompositeEventBundle();
075        }
076    };
077
078    private static class CompositeEventBundle {
079
080        boolean registeredSynchronization;
081
082        final Map<String, EventBundle> byRepository = new HashMap<>();
083
084        void push(Event event) {
085            String repositoryName = event.getContext().getRepositoryName();
086            if (!byRepository.containsKey(repositoryName)) {
087                byRepository.put(repositoryName, new EventBundleImpl());
088            }
089            byRepository.get(repositoryName).push(event);
090        }
091
092    }
093
094    protected final EventListenerList listenerDescriptors;
095
096    protected PostCommitEventExecutor postCommitExec;
097
098    protected volatile AsyncEventExecutor asyncExec;
099
100    protected final List<AsyncWaitHook> asyncWaitHooks = new CopyOnWriteArrayList<>();
101
102    protected boolean blockAsyncProcessing = false;
103
104    protected boolean blockSyncPostCommitProcessing = false;
105
106    protected boolean bulkModeEnabled = false;
107
108    protected EventPipeRegistry registeredPipes = new EventPipeRegistry();
109
110    protected EventDispatcherRegistry dispatchers = new EventDispatcherRegistry();
111
112    protected EventBundleDispatcher pipeDispatcher;
113
114    public EventServiceImpl() {
115        listenerDescriptors = new EventListenerList();
116        postCommitExec = new PostCommitEventExecutor();
117        asyncExec = new AsyncEventExecutor();
118    }
119
120    public void init() {
121        asyncExec.init();
122
123        EventDispatcherDescriptor dispatcherDescriptor = dispatchers.getDispatcherDescriptor();
124        if (dispatcherDescriptor != null) {
125            List<EventPipeDescriptor> pipes = registeredPipes.getPipes();
126            if (!pipes.isEmpty()) {
127                pipeDispatcher = dispatcherDescriptor.getInstance();
128                pipeDispatcher.init(pipes, dispatcherDescriptor.getParameters());
129            }
130        }
131    }
132
133    public EventBundleDispatcher getEventBundleDispatcher() {
134        return pipeDispatcher;
135    }
136
137    public void shutdown(long timeoutMillis) throws InterruptedException {
138        postCommitExec.shutdown(timeoutMillis);
139        Set<AsyncWaitHook> notTerminated = asyncWaitHooks.stream().filter(hook -> !hook.shutdown()).collect(
140                Collectors.toSet());
141        if (!notTerminated.isEmpty()) {
142            throw new RuntimeException("Asynch services are still running : " + notTerminated);
143        }
144
145        if (!asyncExec.shutdown(timeoutMillis)) {
146            throw new RuntimeException("Async executor is still running, timeout expired");
147        }
148        if (pipeDispatcher != null) {
149            pipeDispatcher.shutdown();
150        }
151    }
152
153    public void registerForAsyncWait(AsyncWaitHook callback) {
154        asyncWaitHooks.add(callback);
155    }
156
157    public void unregisterForAsyncWait(AsyncWaitHook callback) {
158        asyncWaitHooks.remove(callback);
159    }
160
161    @Override
162    public void waitForAsyncCompletion() {
163        waitForAsyncCompletion(Long.MAX_VALUE);
164    }
165
166    @Override
167    public void waitForAsyncCompletion(long timeout) {
168        Set<AsyncWaitHook> notCompleted = asyncWaitHooks.stream()
169                                                        .filter(hook -> !hook.waitForAsyncCompletion())
170                                                        .collect(Collectors.toSet());
171        if (!notCompleted.isEmpty()) {
172            throw new RuntimeException("Async tasks are still running : " + notCompleted);
173        }
174        try {
175            if (!asyncExec.waitForCompletion(timeout)) {
176                throw new RuntimeException("Async event listeners thread pool is not terminated");
177            }
178        } catch (InterruptedException e) {
179            Thread.currentThread().interrupt();
180            // TODO change signature
181            throw new RuntimeException(e);
182        }
183        if (pipeDispatcher != null) {
184            try {
185                pipeDispatcher.waitForCompletion(timeout);
186            } catch (InterruptedException e) {
187                Thread.currentThread().interrupt();
188                throw new RuntimeException(e);
189            }
190        }
191    }
192
193    @Override
194    public void addEventListener(EventListenerDescriptor listener) {
195        listenerDescriptors.add(listener);
196        log.debug("Registered event listener: " + listener.getName());
197    }
198
199    public void addEventPipe(EventPipeDescriptor pipeDescriptor) {
200        registeredPipes.addContribution(pipeDescriptor);
201        log.debug("Registered event pipe: " + pipeDescriptor.getName());
202    }
203
204    public void addEventDispatcher(EventDispatcherDescriptor dispatcherDescriptor) {
205        dispatchers.addContrib(dispatcherDescriptor);
206        log.debug("Registered event dispatcher: " + dispatcherDescriptor.getName());
207    }
208
209    @Override
210    public void removeEventListener(EventListenerDescriptor listener) {
211        listenerDescriptors.removeDescriptor(listener);
212        log.debug("Unregistered event listener: " + listener.getName());
213    }
214
215    public void removeEventPipe(EventPipeDescriptor pipeDescriptor) {
216        registeredPipes.removeContribution(pipeDescriptor);
217        log.debug("Unregistered event pipe: " + pipeDescriptor.getName());
218    }
219
220    public void removeEventDispatcher(EventDispatcherDescriptor dispatcherDescriptor) {
221        dispatchers.removeContrib(dispatcherDescriptor);
222        log.debug("Unregistered event dispatcher: " + dispatcherDescriptor.getName());
223    }
224
225    @Override
226    public void fireEvent(String name, EventContext context) {
227        fireEvent(new EventImpl(name, context));
228    }
229
230    @Override
231    public void fireEvent(Event event) {
232
233        String ename = event.getName();
234        EventStats stats = Framework.getService(EventStats.class);
235        for (EventListenerDescriptor desc : listenerDescriptors.getEnabledInlineListenersDescriptors()) {
236            if (!desc.acceptEvent(ename)) {
237                continue;
238            }
239            try {
240                long t0 = System.currentTimeMillis();
241                SequenceTracer.start("Fire sync event " + event.getName());
242                desc.asEventListener().handleEvent(event);
243                long elapsed = System.currentTimeMillis() - t0;
244                SequenceTracer.stop("done in " + elapsed + " ms");
245                if (stats != null) {
246                    stats.logSyncExec(desc, elapsed);
247                }
248                if (event.isCanceled()) {
249                    // break loop
250                    return;
251                }
252            } catch (ConcurrentUpdateException e) {
253                // never swallow ConcurrentUpdateException
254                throw e;
255            } catch (RuntimeException e) {
256                // get message
257                SequenceTracer.destroy("failure");
258                String message = "Exception during " + desc.getName() + " sync listener execution, ";
259                if (event.isBubbleException()) {
260                    message += "other listeners will be ignored";
261                } else if (event.isMarkedForRollBack()) {
262                    message += "transaction will be rolled back";
263                    if (event.getRollbackMessage() != null) {
264                        message += " (" + event.getRollbackMessage() + ")";
265                    }
266                } else {
267                    message += "continuing to run other listeners";
268                }
269                // log
270                if (e instanceof RecoverableClientException) {
271                    log.info(message + "\n" + e.getMessage());
272                    log.debug(message, e);
273                } else {
274                    log.error(message, e);
275                }
276                // rethrow or swallow
277                if (TransactionHelper.isTransactionMarkedRollback()) {
278                    throw e;
279                } else if (event.isBubbleException()) {
280                    throw e;
281                } else if (event.isMarkedForRollBack()) {
282                    Exception ee;
283                    if (event.getRollbackException() != null) {
284                        ee = event.getRollbackException();
285                    } else {
286                        ee = e;
287                    }
288                    // when marked for rollback, throw a generic
289                    // RuntimeException to make sure nobody catches it
290                    throw new RuntimeException(message, ee);
291                } else {
292                    // swallow exception
293                }
294            }
295        }
296
297        if (!event.isInline()) { // record the event
298            // don't record the complete event, only a shallow copy
299            ShallowEvent shallowEvent = ShallowEvent.create(event);
300            if (event.isImmediate()) {
301                EventBundleImpl b = new EventBundleImpl();
302                b.push(shallowEvent);
303                fireEventBundle(b);
304            } else {
305                recordEvent(shallowEvent);
306            }
307        }
308    }
309
310    @Override
311    public void fireEventBundle(EventBundle event) {
312        List<EventListenerDescriptor> postCommitSync = listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors();
313        List<EventListenerDescriptor> postCommitAsync = listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors();
314
315        if (bulkModeEnabled) {
316            // run all listeners synchronously in one transaction
317            List<EventListenerDescriptor> listeners = new ArrayList<>();
318            if (!blockSyncPostCommitProcessing) {
319                listeners = postCommitSync;
320            }
321            if (!blockAsyncProcessing) {
322                listeners.addAll(postCommitAsync);
323            }
324            if (!listeners.isEmpty()) {
325                postCommitExec.runBulk(listeners, event);
326            }
327            return;
328        }
329
330        // run sync listeners
331        if (blockSyncPostCommitProcessing) {
332            log.debug("Dropping PostCommit handler execution");
333        } else {
334            if (!postCommitSync.isEmpty()) {
335                postCommitExec.run(postCommitSync, event);
336            }
337        }
338
339        if (blockAsyncProcessing) {
340            log.debug("Dopping bundle");
341            return;
342        }
343
344        // fire async listeners
345        if (pipeDispatcher == null) {
346            asyncExec.run(postCommitAsync, event);
347        } else {
348            // rather than sending to the WorkManager: send to the Pipe
349            pipeDispatcher.sendEventBundle(event);
350        }
351    }
352
353    @Override
354    public void fireEventBundleSync(EventBundle event) {
355        for (EventListenerDescriptor desc : listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors()) {
356            desc.asPostCommitListener().handleEvent(event);
357        }
358        for (EventListenerDescriptor desc : listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors()) {
359            desc.asPostCommitListener().handleEvent(event);
360        }
361    }
362
363    @Override
364    public List<EventListener> getEventListeners() {
365        return listenerDescriptors.getInLineListeners();
366    }
367
368    @Override
369    public List<PostCommitEventListener> getPostCommitEventListeners() {
370        List<PostCommitEventListener> result = new ArrayList<>();
371
372        result.addAll(listenerDescriptors.getSyncPostCommitListeners());
373        result.addAll(listenerDescriptors.getAsyncPostCommitListeners());
374
375        return result;
376    }
377
378    public EventListenerList getEventListenerList() {
379        return listenerDescriptors;
380    }
381
382    @Override
383    public EventListenerDescriptor getEventListener(String name) {
384        return listenerDescriptors.getDescriptor(name);
385    }
386
387    // methods for monitoring
388
389    @Override
390    public EventListenerList getListenerList() {
391        return listenerDescriptors;
392    }
393
394    @Override
395    public void setListenerEnabledFlag(String listenerName, boolean enabled) {
396        if (!listenerDescriptors.hasListener(listenerName)) {
397            return;
398        }
399
400        for (EventListenerDescriptor desc : listenerDescriptors.getAsyncPostCommitListenersDescriptors()) {
401            if (desc.getName().equals(listenerName)) {
402                desc.setEnabled(enabled);
403                synchronized (this) {
404                    listenerDescriptors.recomputeEnabledListeners();
405                }
406                return;
407            }
408        }
409
410        for (EventListenerDescriptor desc : listenerDescriptors.getSyncPostCommitListenersDescriptors()) {
411            if (desc.getName().equals(listenerName)) {
412                desc.setEnabled(enabled);
413                synchronized (this) {
414                    listenerDescriptors.recomputeEnabledListeners();
415                }
416                return;
417            }
418        }
419
420        for (EventListenerDescriptor desc : listenerDescriptors.getInlineListenersDescriptors()) {
421            if (desc.getName().equals(listenerName)) {
422                desc.setEnabled(enabled);
423                synchronized (this) {
424                    listenerDescriptors.recomputeEnabledListeners();
425                }
426                return;
427            }
428        }
429    }
430
431    @Override
432    public int getActiveThreadsCount() {
433        return asyncExec.getActiveCount();
434    }
435
436    @Override
437    public int getEventsInQueueCount() {
438        return asyncExec.getUnfinishedCount();
439    }
440
441    @Override
442    public boolean isBlockAsyncHandlers() {
443        return blockAsyncProcessing;
444    }
445
446    @Override
447    public boolean isBlockSyncPostCommitHandlers() {
448        return blockSyncPostCommitProcessing;
449    }
450
451    @Override
452    public void setBlockAsyncHandlers(boolean blockAsyncHandlers) {
453        blockAsyncProcessing = blockAsyncHandlers;
454    }
455
456    @Override
457    public void setBlockSyncPostCommitHandlers(boolean blockSyncPostComitHandlers) {
458        blockSyncPostCommitProcessing = blockSyncPostComitHandlers;
459    }
460
461    @Override
462    public boolean isBulkModeEnabled() {
463        return bulkModeEnabled;
464    }
465
466    @Override
467    public void setBulkModeEnabled(boolean bulkModeEnabled) {
468        this.bulkModeEnabled = bulkModeEnabled;
469    }
470
471    protected void recordEvent(Event event) {
472        CompositeEventBundle b = threadBundles.get();
473        b.push(event);
474        if (TransactionHelper.isTransactionActive()) {
475            if (!b.registeredSynchronization) {
476                // register as synchronization
477                try {
478                    TransactionHelper.lookupTransactionManager().getTransaction().registerSynchronization(this);
479                } catch (NamingException | SystemException | RollbackException e) {
480                    throw new RuntimeException("Cannot register Synchronization", e);
481                }
482                b.registeredSynchronization = true;
483            }
484        } else if (event.isCommitEvent()) {
485            handleTxCommited();
486        }
487    }
488
489    @Override
490    public void beforeCompletion() {
491    }
492
493    @Override
494    public void afterCompletion(int status) {
495        if (status == Status.STATUS_COMMITTED) {
496            handleTxCommited();
497        } else if (status == Status.STATUS_ROLLEDBACK) {
498            handleTxRollbacked();
499        } else {
500            log.error("Unexpected afterCompletion status: " + status);
501        }
502    }
503
504    protected void handleTxRollbacked() {
505        threadBundles.remove();
506    }
507
508    protected void handleTxCommited() {
509        CompositeEventBundle b = threadBundles.get();
510        threadBundles.remove();
511
512        // notify post commit event listeners
513        for (EventBundle bundle : b.byRepository.values()) {
514            try {
515                fireEventBundle(bundle);
516            } catch (NuxeoException e) {
517                log.error("Error while processing " + bundle, e);
518            }
519        }
520    }
521
522}