001/*
002 * Copyright (c) 2006-2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the Eclipse Public License v1.0
006 * which accompanies this distribution, and is available at
007 * http://www.eclipse.org/legal/epl-v10.html
008 *
009 * Contributors:
010 *     Thierry Delprat
011 *     Florent Guillaume
012 */
013package org.nuxeo.ecm.core.event.impl;
014
015import java.util.List;
016import java.util.concurrent.Callable;
017import java.util.concurrent.ExecutionException;
018import java.util.concurrent.ExecutorService;
019import java.util.concurrent.FutureTask;
020import java.util.concurrent.RejectedExecutionException;
021import java.util.concurrent.SynchronousQueue;
022import java.util.concurrent.ThreadFactory;
023import java.util.concurrent.ThreadPoolExecutor;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.TimeoutException;
026import java.util.concurrent.atomic.AtomicInteger;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.nuxeo.ecm.core.event.EventBundle;
031import org.nuxeo.ecm.core.event.EventStats;
032import org.nuxeo.ecm.core.event.ReconnectedEventBundle;
033import org.nuxeo.runtime.api.Framework;
034import org.nuxeo.runtime.transaction.TransactionHelper;
035
036/**
 * Executor that passes an event bundle to post-commit asynchronous listeners (in a separate thread in order to manage
 * transactions).
 * <p>
 * Allows a bulk mode where transaction management is not done per listener but once for the whole set of listeners.
041 */
042public class PostCommitEventExecutor {
043
044    private static final Log log = LogFactory.getLog(PostCommitEventExecutor.class);
045
046    public static final String TIMEOUT_MS_PROP = "org.nuxeo.ecm.core.event.tx.PostCommitExecutor.timeoutMs";
047
048    public static final int DEFAULT_TIMEOUT_MS = 300; // 0.3s
049
050    public static final int DEFAULT_TIMEOUT_TEST_MS = 60000; // 1 min
051
052    private Integer defaultTimeoutMs;
053
054    public static final String DEFAULT_BULK_TIMEOUT_S = "600"; // 10min
055
056    public static final String BULK_TIMEOUT_PROP = "org.nuxeo.ecm.core.event.tx.BulkExecutor.timeout";
057
058    private static final long KEEP_ALIVE_TIME_SECOND = 10;
059
060    private static final int MAX_POOL_SIZE = 100;
061
062    protected final ExecutorService executor;
063
064    /**
065     * Creates non-daemon threads at normal priority.
066     */
067    private static class NamedThreadFactory implements ThreadFactory {
068
069        private final AtomicInteger threadNumber = new AtomicInteger();
070
071        private final ThreadGroup group;
072
073        private final String prefix;
074
075        public NamedThreadFactory(String prefix) {
076            SecurityManager sm = System.getSecurityManager();
077            group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup();
078            this.prefix = prefix;
079        }
080
081        @Override
082        public Thread newThread(Runnable r) {
083            String name = prefix + threadNumber.incrementAndGet();
084            Thread thread = new Thread(group, r, name);
085            // do not set daemon
086            thread.setPriority(Thread.NORM_PRIORITY);
087            return thread;
088        }
089    }
090
091    public PostCommitEventExecutor() {
092        // use as much thread as needed up to MAX_POOL_SIZE
093        // keep them alive a moment for reuse
094        // have all threads torn down when there is no work to do
095        ThreadFactory threadFactory = new NamedThreadFactory("Nuxeo-Event-PostCommit-");
096        executor = new ThreadPoolExecutor(0, MAX_POOL_SIZE, KEEP_ALIVE_TIME_SECOND, TimeUnit.SECONDS,
097                new SynchronousQueue<Runnable>(), threadFactory);
098        ((ThreadPoolExecutor) executor).allowCoreThreadTimeOut(true);
099    }
100
101    protected int getDefaultTimeoutMs() {
102        if (defaultTimeoutMs == null) {
103            if (Framework.getProperty(TIMEOUT_MS_PROP) != null) {
104                defaultTimeoutMs = Integer.parseInt(Framework.getProperty(TIMEOUT_MS_PROP));
105            } else if (Framework.isTestModeSet()) {
106                defaultTimeoutMs = DEFAULT_TIMEOUT_TEST_MS;
107            } else {
108                defaultTimeoutMs = DEFAULT_TIMEOUT_MS;
109            }
110        }
111        return defaultTimeoutMs;
112    }
113
114    public void shutdown(long timeoutMillis) throws InterruptedException {
115        executor.shutdown();
116        executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
117        if (!executor.isTerminated()) {
118            executor.shutdownNow();
119        }
120    }
121
122    public void run(List<EventListenerDescriptor> listeners, EventBundle event) {
123        run(listeners, event, getDefaultTimeoutMs(), false);
124    }
125
126    public void runBulk(List<EventListenerDescriptor> listeners, EventBundle event) {
127        String timeoutSeconds = Framework.getProperty(BULK_TIMEOUT_PROP, DEFAULT_BULK_TIMEOUT_S);
128        run(listeners, event, Long.parseLong(timeoutSeconds) * 1000, true);
129    }
130
131    public void run(List<EventListenerDescriptor> listeners, EventBundle bundle, long timeoutMillis, boolean bulk) {
132        // check that there's at list one listener interested
133        boolean some = false;
134        for (EventListenerDescriptor listener : listeners) {
135            if (listener.acceptBundle(bundle)) {
136                some = true;
137                break;
138            }
139        }
140        if (!some) {
141            if (log.isDebugEnabled()) {
142                log.debug("Events postcommit execution has nothing to do");
143            }
144            return;
145        }
146
147        if (log.isDebugEnabled()) {
148            log.debug(String.format("Events postcommit execution starting with timeout %sms%s",
149                    Long.valueOf(timeoutMillis), bulk ? " in bulk mode" : ""));
150        }
151
152        Callable<Boolean> callable = !bulk ? new EventBundleRunner(listeners, bundle) : new EventBundleBulkRunner(
153                listeners, bundle);
154        FutureTask<Boolean> futureTask = new FutureTask<Boolean>(callable);
155        try {
156            executor.execute(futureTask);
157        } catch (RejectedExecutionException e) {
158            log.error("Events postcommit execution rejected", e);
159            return;
160        }
161        try {
162            // wait for runner to be finished, with timeout
163            Boolean ok = futureTask.get(timeoutMillis, TimeUnit.MILLISECONDS);
164            if (Boolean.FALSE.equals(ok)) {
165                log.error("Events postcommit bulk execution aborted due to previous error");
166            }
167        } catch (InterruptedException e) {
168            // restore interrupted status
169            Thread.currentThread().interrupt();
170            // interrupt thread
171            futureTask.cancel(true); // mayInterruptIfRunning=true
172        } catch (TimeoutException e) {
173            if (!bulk) {
174                log.warn(String.format("Events postcommit execution exceeded timeout of %sms, leaving thread running",
175                        Long.valueOf(timeoutMillis)));
176                // don't cancel task, let it run
177            } else {
178                log.error(String.format(
179                        "Events postcommit bulk execution exceeded timeout of %sms, interrupting thread",
180                        Long.valueOf(timeoutMillis)));
181                futureTask.cancel(true); // mayInterruptIfRunning=true
182            }
183        } catch (ExecutionException e) {
184            log.error("Events postcommit execution encountered unexpected exception", e.getCause());
185        }
186
187        if (log.isDebugEnabled()) {
188            log.debug("Events postcommit execution finished");
189        }
190    }
191
192    /**
193     * Lets the listeners process the event bundle.
194     * <p>
195     * For each listener, the event bundle is reconnected to a session and a transaction is started.
196     * <p>
197     * In case of exception in a listener, the transaction is rolled back for that listener but processing continues for
198     * the other listeners.
199     * <p>
200     * In case of timeout, an error is logged but processing continues for the other listeners (the thread is left
201     * running separately from the main thread that initiated post-commit processing).
202     */
203    protected static class EventBundleRunner implements Callable<Boolean> {
204
205        protected final List<EventListenerDescriptor> listeners;
206
207        protected final EventBundle bundle;
208
209        public EventBundleRunner(List<EventListenerDescriptor> listeners, EventBundle bundle) {
210            this.listeners = listeners;
211            this.bundle = bundle;
212        }
213
214        @Override
215        public Boolean call() {
216            if (log.isDebugEnabled()) {
217                log.debug("Events postcommit execution starting in thread: " + Thread.currentThread().getName());
218            }
219            long t0 = System.currentTimeMillis();
220            EventStats stats = Framework.getLocalService(EventStats.class);
221
222            for (EventListenerDescriptor listener : listeners) {
223                EventBundle filtered = listener.filterBundle(bundle);
224                if (filtered.isEmpty()) {
225                    continue;
226                }
227                if (log.isDebugEnabled()) {
228                    log.debug("Events postcommit execution start for listener: " + listener.getName());
229                }
230                long t1 = System.currentTimeMillis();
231
232                boolean ok = false;
233                ReconnectedEventBundle reconnected = null;
234                // transaction timeout is managed by the FutureTask
235                boolean tx = TransactionHelper.startTransaction();
236                try {
237                    reconnected = new ReconnectedEventBundleImpl(filtered, listeners.toString());
238
239                    listener.asPostCommitListener().handleEvent(reconnected);
240
241                    if (Thread.currentThread().isInterrupted()) {
242                        log.error("Events postcommit execution interrupted for listener: " + listener.getName());
243                        ok = false;
244                    } else {
245                        ok = true;
246                    }
247                } catch (RuntimeException e) {
248                    log.error("Events postcommit execution encountered exception for listener: " + listener.getName(),
249                            e);
250                    // don't rethrow, but rollback (ok=false) and continue loop
251                } finally {
252                    try {
253                        if (reconnected != null) {
254                            reconnected.disconnect();
255                        }
256                    } finally {
257                        if (tx) {
258                            if (!ok) {
259                                TransactionHelper.setTransactionRollbackOnly();
260                                log.error("Rolling back transaction");
261                            }
262                            TransactionHelper.commitOrRollbackTransaction();
263                        }
264                        if (stats != null) {
265                            stats.logAsyncExec(listener, System.currentTimeMillis() - t1);
266                        }
267                        if (log.isDebugEnabled()) {
268                            log.debug("Events postcommit execution end for listener: " + listener.getName() + " in "
269                                    + (System.currentTimeMillis() - t1) + "ms");
270                        }
271                    }
272                }
273                // even if interrupted due to timeout, we continue the loop
274            }
275            if (log.isDebugEnabled()) {
276                log.debug("Events postcommit execution finished in " + (System.currentTimeMillis() - t0) + "ms");
277            }
278            return Boolean.TRUE; // no error to report
279        }
280    }
281
282    /**
283     * Lets the listeners process the event bundle in bulk mode.
284     * <p>
285     * The event bundle is reconnected to a single session and a single transaction is started for all the listeners.
286     * <p>
287     * In case of exception in a listener, the transaction is rolled back and processing stops.
288     * <p>
289     * In case of timeout, the transaction is rolled back and processing stops.
290     */
291    protected static class EventBundleBulkRunner implements Callable<Boolean> {
292
293        protected final List<EventListenerDescriptor> listeners;
294
295        protected final EventBundle bundle;
296
297        public EventBundleBulkRunner(List<EventListenerDescriptor> listeners, EventBundle bundle) {
298            this.listeners = listeners;
299            this.bundle = bundle;
300        }
301
302        @Override
303        public Boolean call() {
304            if (log.isDebugEnabled()) {
305                log.debug("Events postcommit bulk execution starting in thread: " + Thread.currentThread().getName());
306            }
307            long t0 = System.currentTimeMillis();
308
309            boolean ok = false;
310            boolean interrupt = false;
311            ReconnectedEventBundle reconnected = null;
312            // transaction timeout is managed by the FutureTask
313            boolean tx = TransactionHelper.startTransaction();
314            try {
315                reconnected = new ReconnectedEventBundleImpl(bundle, listeners.toString());
316                for (EventListenerDescriptor listener : listeners) {
317                    EventBundle filtered = listener.filterBundle(reconnected);
318                    if (filtered.isEmpty()) {
319                        continue;
320                    }
321                    if (log.isDebugEnabled()) {
322                        log.debug("Events postcommit bulk execution start for listener: " + listener.getName());
323                    }
324                    long t1 = System.currentTimeMillis();
325                    try {
326
327                        listener.asPostCommitListener().handleEvent(filtered);
328
329                        if (Thread.currentThread().isInterrupted()) {
330                            log.error("Events postcommit bulk execution interrupted for listener: "
331                                    + listener.getName() + ", will rollback and abort bulk processing");
332                            interrupt = true;
333                        }
334                    } catch (RuntimeException e) {
335                        log.error(
336                                "Events postcommit bulk execution encountered exception for listener: "
337                                        + listener.getName(), e);
338                        return Boolean.FALSE; // report error
339                    } finally {
340                        if (log.isDebugEnabled()) {
341                            log.debug("Events postcommit bulk execution end for listener: " + listener.getName()
342                                    + " in " + (System.currentTimeMillis() - t1) + "ms");
343                        }
344                    }
345                    if (interrupt) {
346                        break;
347                    }
348                }
349                ok = !interrupt;
350            } finally {
351                try {
352                    if (reconnected != null) {
353                        reconnected.disconnect();
354                    }
355                } finally {
356                    if (tx) {
357                        if (!ok) {
358                            TransactionHelper.setTransactionRollbackOnly();
359                            log.error("Rolling back transaction");
360                        }
361                        TransactionHelper.commitOrRollbackTransaction();
362                    }
363                }
364                if (log.isDebugEnabled()) {
365                    log.debug("Events postcommit bulk execution finished in " + (System.currentTimeMillis() - t0)
366                            + "ms");
367                }
368            }
369            return Boolean.TRUE; // no error to report
370        }
371    }
372}