001/*
002 * (C) Copyright 2006-2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.core.event.impl;
021
022import java.util.List;
023import java.util.concurrent.Callable;
024import java.util.concurrent.ExecutionException;
025import java.util.concurrent.ExecutorService;
026import java.util.concurrent.FutureTask;
027import java.util.concurrent.RejectedExecutionException;
028import java.util.concurrent.SynchronousQueue;
029import java.util.concurrent.ThreadFactory;
030import java.util.concurrent.ThreadPoolExecutor;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.TimeoutException;
033import java.util.concurrent.atomic.AtomicInteger;
034
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.nuxeo.common.logging.SequenceTracer;
038import org.nuxeo.ecm.core.event.EventBundle;
039import org.nuxeo.ecm.core.event.EventStats;
040import org.nuxeo.ecm.core.event.ReconnectedEventBundle;
041import org.nuxeo.runtime.api.Framework;
042import org.nuxeo.runtime.transaction.TransactionHelper;
043
044/**
045 * Executor that passes an event bundle to post-commit asynchronous listeners (in a separated thread in order to manage
046 * transactions).
047 * <p>
048 * Allows a bulk mode where transaction management is not per-listener done once for the whole set of listeners.
049 */
050public class PostCommitEventExecutor {
051
052    private static final Log log = LogFactory.getLog(PostCommitEventExecutor.class);
053
054    public static final String TIMEOUT_MS_PROP = "org.nuxeo.ecm.core.event.tx.PostCommitExecutor.timeoutMs";
055
056    public static final int DEFAULT_TIMEOUT_MS = 300; // 0.3s
057
058    public static final int DEFAULT_TIMEOUT_TEST_MS = 60000; // 1 min
059
060    private Integer defaultTimeoutMs;
061
062    public static final String DEFAULT_BULK_TIMEOUT_S = "600"; // 10min
063
064    public static final String BULK_TIMEOUT_PROP = "org.nuxeo.ecm.core.event.tx.BulkExecutor.timeout";
065
066    private static final long KEEP_ALIVE_TIME_SECOND = 10;
067
068    private static final int MAX_POOL_SIZE = 100;
069
070    protected final ExecutorService executor;
071
072    /**
073     * Creates non-daemon threads at normal priority.
074     */
075    private static class NamedThreadFactory implements ThreadFactory {
076
077        private final AtomicInteger threadNumber = new AtomicInteger();
078
079        private final ThreadGroup group;
080
081        private final String prefix;
082
083        public NamedThreadFactory(String prefix) {
084            SecurityManager sm = System.getSecurityManager();
085            group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup();
086            this.prefix = prefix;
087        }
088
089        @Override
090        public Thread newThread(Runnable r) {
091            String name = prefix + threadNumber.incrementAndGet();
092            Thread thread = new Thread(group, r, name);
093            // do not set daemon
094            thread.setPriority(Thread.NORM_PRIORITY);
095            return thread;
096        }
097    }
098
099    public PostCommitEventExecutor() {
100        // use as much thread as needed up to MAX_POOL_SIZE
101        // keep them alive a moment for reuse
102        // have all threads torn down when there is no work to do
103        ThreadFactory threadFactory = new NamedThreadFactory("Nuxeo-Event-PostCommit-");
104        executor = new ThreadPoolExecutor(0, MAX_POOL_SIZE, KEEP_ALIVE_TIME_SECOND, TimeUnit.SECONDS,
105                new SynchronousQueue<Runnable>(), threadFactory);
106        ((ThreadPoolExecutor) executor).allowCoreThreadTimeOut(true);
107    }
108
109    protected int getDefaultTimeoutMs() {
110        if (defaultTimeoutMs == null) {
111            if (Framework.getProperty(TIMEOUT_MS_PROP) != null) {
112                defaultTimeoutMs = Integer.parseInt(Framework.getProperty(TIMEOUT_MS_PROP));
113            } else if (Framework.isTestModeSet()) {
114                defaultTimeoutMs = DEFAULT_TIMEOUT_TEST_MS;
115            } else {
116                defaultTimeoutMs = DEFAULT_TIMEOUT_MS;
117            }
118        }
119        return defaultTimeoutMs;
120    }
121
122    public void shutdown(long timeoutMillis) throws InterruptedException {
123        executor.shutdown();
124        executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
125        if (!executor.isTerminated()) {
126            executor.shutdownNow();
127        }
128    }
129
130    public void run(List<EventListenerDescriptor> listeners, EventBundle event) {
131        run(listeners, event, getDefaultTimeoutMs(), false);
132    }
133
134    public void runBulk(List<EventListenerDescriptor> listeners, EventBundle event) {
135        String timeoutSeconds = Framework.getProperty(BULK_TIMEOUT_PROP, DEFAULT_BULK_TIMEOUT_S);
136        run(listeners, event, Long.parseLong(timeoutSeconds) * 1000, true);
137    }
138
139    public void run(List<EventListenerDescriptor> listeners, EventBundle bundle, long timeoutMillis, boolean bulk) {
140        // check that there's at list one listener interested
141        boolean some = false;
142        for (EventListenerDescriptor listener : listeners) {
143            if (listener.acceptBundle(bundle)) {
144                some = true;
145                break;
146            }
147        }
148        if (!some) {
149            if (log.isDebugEnabled()) {
150                log.debug("Events postcommit execution has nothing to do");
151            }
152            return;
153        }
154
155        if (log.isDebugEnabled()) {
156            log.debug(String.format("Events postcommit execution starting with timeout %sms%s",
157                    Long.valueOf(timeoutMillis), bulk ? " in bulk mode" : ""));
158        }
159
160        Callable<Boolean> callable = !bulk ? new EventBundleRunner(listeners, bundle) : new EventBundleBulkRunner(
161                listeners, bundle);
162        FutureTask<Boolean> futureTask = new FutureTask<Boolean>(callable);
163        try {
164            executor.execute(futureTask);
165        } catch (RejectedExecutionException e) {
166            log.error("Events postcommit execution rejected", e);
167            return;
168        }
169        try {
170            // wait for runner to be finished, with timeout
171            Boolean ok = futureTask.get(timeoutMillis, TimeUnit.MILLISECONDS);
172            if (Boolean.FALSE.equals(ok)) {
173                log.error("Events postcommit bulk execution aborted due to previous error");
174            }
175        } catch (InterruptedException e) {
176            // restore interrupted status
177            Thread.currentThread().interrupt();
178            // interrupt thread
179            futureTask.cancel(true); // mayInterruptIfRunning=true
180        } catch (TimeoutException e) {
181            if (!bulk) {
182                log.warn(String.format("Events postcommit execution exceeded timeout of %sms, leaving thread running",
183                        Long.valueOf(timeoutMillis)));
184                // don't cancel task, let it run
185            } else {
186                log.error(String.format(
187                        "Events postcommit bulk execution exceeded timeout of %sms, interrupting thread",
188                        Long.valueOf(timeoutMillis)));
189                futureTask.cancel(true); // mayInterruptIfRunning=true
190            }
191        } catch (ExecutionException e) {
192            log.error("Events postcommit execution encountered unexpected exception", e.getCause());
193        }
194
195        if (log.isDebugEnabled()) {
196            log.debug("Events postcommit execution finished");
197        }
198    }
199
200    /**
201     * Lets the listeners process the event bundle.
202     * <p>
203     * For each listener, the event bundle is reconnected to a session and a transaction is started.
204     * <p>
205     * In case of exception in a listener, the transaction is rolled back for that listener but processing continues for
206     * the other listeners.
207     * <p>
208     * In case of timeout, an error is logged but processing continues for the other listeners (the thread is left
209     * running separately from the main thread that initiated post-commit processing).
210     */
211    protected static class EventBundleRunner implements Callable<Boolean> {
212
213        protected final List<EventListenerDescriptor> listeners;
214
215        protected final EventBundle bundle;
216
217        protected String callerThread;
218
219        public EventBundleRunner(List<EventListenerDescriptor> listeners, EventBundle bundle) {
220            this.listeners = listeners;
221            this.bundle = bundle;
222            callerThread = SequenceTracer.getThreadName();
223        }
224
225        @Override
226        public Boolean call() {
227            if (log.isDebugEnabled()) {
228                log.debug("Events postcommit execution starting in thread: " + Thread.currentThread().getName());
229            }
230            SequenceTracer.startFrom(callerThread, "Postcommit", "#ff410f");
231            long t0 = System.currentTimeMillis();
232            EventStats stats = Framework.getLocalService(EventStats.class);
233
234            for (EventListenerDescriptor listener : listeners) {
235                EventBundle filtered = listener.filterBundle(bundle);
236                if (filtered.isEmpty()) {
237                    continue;
238                }
239                if (log.isDebugEnabled()) {
240                    log.debug("Events postcommit execution start for listener: " + listener.getName());
241                }
242                SequenceTracer.start("run listener " + listener.getName());
243                long t1 = System.currentTimeMillis();
244
245                boolean ok = false;
246                ReconnectedEventBundle reconnected = null;
247                // transaction timeout is managed by the FutureTask
248                boolean tx = TransactionHelper.startTransaction();
249                try {
250                    reconnected = new ReconnectedEventBundleImpl(filtered, listeners.toString());
251
252                    listener.asPostCommitListener().handleEvent(reconnected);
253
254                    if (Thread.currentThread().isInterrupted()) {
255                        log.error("Events postcommit execution interrupted for listener: " + listener.getName());
256                        SequenceTracer.destroy("interrupted");
257                        ok = false;
258                    } else {
259                        ok = true;
260                    }
261                } catch (RuntimeException e) {
262                    log.error("Events postcommit execution encountered exception for listener: " + listener.getName(),
263                            e);
264                    // don't rethrow, but rollback (ok=false) and continue loop
265                } finally {
266                    try {
267                        if (reconnected != null) {
268                            reconnected.disconnect();
269                        }
270                    } finally {
271                        if (tx) {
272                            if (!ok) {
273                                TransactionHelper.setTransactionRollbackOnly();
274                                log.error("Rolling back transaction");
275                            }
276                            TransactionHelper.commitOrRollbackTransaction();
277                        }
278                        long elapsed = System.currentTimeMillis() - t1;
279                        if (stats != null) {
280                            stats.logAsyncExec(listener, elapsed);
281                        }
282                        if (log.isDebugEnabled()) {
283                            log.debug("Events postcommit execution end for listener: " + listener.getName() + " in "
284                                    + elapsed + "ms");
285                        }
286                        SequenceTracer.stop("listener done " + elapsed + " ms");
287                    }
288                }
289                // even if interrupted due to timeout, we continue the loop
290            }
291            long elapsed = System.currentTimeMillis() - t0;
292            if (log.isDebugEnabled()) {
293                log.debug("Events postcommit execution finished in " + elapsed + "ms");
294            }
295            SequenceTracer.stop("postcommit done" + elapsed + " ms");
296            return Boolean.TRUE; // no error to report
297        }
298    }
299
300    /**
301     * Lets the listeners process the event bundle in bulk mode.
302     * <p>
303     * The event bundle is reconnected to a single session and a single transaction is started for all the listeners.
304     * <p>
305     * In case of exception in a listener, the transaction is rolled back and processing stops.
306     * <p>
307     * In case of timeout, the transaction is rolled back and processing stops.
308     */
309    protected static class EventBundleBulkRunner implements Callable<Boolean> {
310
311        protected final List<EventListenerDescriptor> listeners;
312
313        protected final EventBundle bundle;
314        protected final String callerThread;
315
316        public EventBundleBulkRunner(List<EventListenerDescriptor> listeners, EventBundle bundle) {
317            this.listeners = listeners;
318            this.bundle = bundle;
319            callerThread = SequenceTracer.getThreadName();
320        }
321
322        @Override
323        public Boolean call() {
324            SequenceTracer.startFrom(callerThread, "BulkPostcommit", "#ff410f");
325            if (log.isDebugEnabled()) {
326                log.debug("Events postcommit bulk execution starting in thread: " + Thread.currentThread().getName());
327            }
328            long t0 = System.currentTimeMillis();
329
330            boolean ok = false;
331            boolean interrupt = false;
332            ReconnectedEventBundle reconnected = null;
333            // transaction timeout is managed by the FutureTask
334            boolean tx = TransactionHelper.startTransaction();
335            try {
336                reconnected = new ReconnectedEventBundleImpl(bundle, listeners.toString());
337                for (EventListenerDescriptor listener : listeners) {
338                    EventBundle filtered = listener.filterBundle(reconnected);
339                    if (filtered.isEmpty()) {
340                        continue;
341                    }
342                    SequenceTracer.start("run listener " + listener.getName());
343                    if (log.isDebugEnabled()) {
344                        log.debug("Events postcommit bulk execution start for listener: " + listener.getName());
345                    }
346                    long t1 = System.currentTimeMillis();
347                    try {
348
349                        listener.asPostCommitListener().handleEvent(filtered);
350
351                        if (Thread.currentThread().isInterrupted()) {
352                            log.error("Events postcommit bulk execution interrupted for listener: "
353                                    + listener.getName() + ", will rollback and abort bulk processing");
354                            interrupt = true;
355                        }
356                    } catch (RuntimeException e) {
357                        log.error(
358                                "Events postcommit bulk execution encountered exception for listener: "
359                                        + listener.getName(), e);
360                        return Boolean.FALSE; // report error
361                    } finally {
362                        long elapsed = System.currentTimeMillis() - t1;
363                        if (log.isDebugEnabled()) {
364                            log.debug("Events postcommit bulk execution end for listener: " + listener.getName()
365                                    + " in " + elapsed + "ms");
366                        }
367                        SequenceTracer.stop("listener done " + elapsed + " ms");
368                    }
369                    if (interrupt) {
370                        break;
371                    }
372                }
373                ok = !interrupt;
374            } finally {
375                try {
376                    if (reconnected != null) {
377                        reconnected.disconnect();
378                    }
379                } finally {
380                    if (tx) {
381                        if (!ok) {
382                            TransactionHelper.setTransactionRollbackOnly();
383                            log.error("Rolling back transaction");
384                        }
385                        TransactionHelper.commitOrRollbackTransaction();
386                    }
387                }
388                long elapsed = System.currentTimeMillis() - t0;
389                SequenceTracer.stop("BulkPostcommit done " + elapsed + " ms");
390                if (log.isDebugEnabled()) {
391                    log.debug("Events postcommit bulk execution finished in " + elapsed + "ms");
392                }
393            }
394            return Boolean.TRUE; // no error to report
395        }
396    }
397}