001/*
002 * (C) Copyright 2006-2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.core.event.impl;
021
022import java.util.List;
023import java.util.concurrent.Callable;
024import java.util.concurrent.ExecutionException;
025import java.util.concurrent.ExecutorService;
026import java.util.concurrent.FutureTask;
027import java.util.concurrent.RejectedExecutionException;
028import java.util.concurrent.SynchronousQueue;
029import java.util.concurrent.ThreadFactory;
030import java.util.concurrent.ThreadPoolExecutor;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.TimeoutException;
033import java.util.concurrent.atomic.AtomicInteger;
034
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.nuxeo.common.logging.SequenceTracer;
038import org.nuxeo.ecm.core.event.EventBundle;
039import org.nuxeo.ecm.core.event.EventStats;
040import org.nuxeo.ecm.core.event.ReconnectedEventBundle;
041import org.nuxeo.runtime.api.Framework;
042import org.nuxeo.runtime.transaction.TransactionHelper;
043
044/**
045 * Executor that passes an event bundle to post-commit asynchronous listeners (in a separated thread in order to manage
046 * transactions).
047 * <p>
 * Allows a bulk mode where transaction management is not per-listener but done once for the whole set of listeners.
049 */
050public class PostCommitEventExecutor {
051
052    private static final Log log = LogFactory.getLog(PostCommitEventExecutor.class);
053
054    public static final String TIMEOUT_MS_PROP = "org.nuxeo.ecm.core.event.tx.PostCommitExecutor.timeoutMs";
055
056    public static final int DEFAULT_TIMEOUT_MS = 300; // 0.3s
057
058    public static final int DEFAULT_TIMEOUT_TEST_MS = 60000; // 1 min
059
060    private Integer defaultTimeoutMs;
061
062    public static final String DEFAULT_BULK_TIMEOUT_S = "600"; // 10min
063
064    public static final String BULK_TIMEOUT_PROP = "org.nuxeo.ecm.core.event.tx.BulkExecutor.timeout";
065
066    private static final long KEEP_ALIVE_TIME_SECOND = 10;
067
068    private static final int MAX_POOL_SIZE = 100;
069
070    protected final ExecutorService executor;
071
072    /**
073     * Creates non-daemon threads at normal priority.
074     */
075    private static class NamedThreadFactory implements ThreadFactory {
076
077        private final AtomicInteger threadNumber = new AtomicInteger();
078
079        private final ThreadGroup group;
080
081        private final String prefix;
082
083        public NamedThreadFactory(String prefix) {
084            SecurityManager sm = System.getSecurityManager();
085            group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup();
086            this.prefix = prefix;
087        }
088
089        @Override
090        public Thread newThread(Runnable r) {
091            String name = prefix + threadNumber.incrementAndGet();
092            Thread thread = new Thread(group, r, name);
093            // do not set daemon
094            thread.setPriority(Thread.NORM_PRIORITY);
095            return thread;
096        }
097    }
098
099    public PostCommitEventExecutor() {
100        // use as much thread as needed up to MAX_POOL_SIZE
101        // keep them alive a moment for reuse
102        // have all threads torn down when there is no work to do
103        ThreadFactory threadFactory = new NamedThreadFactory("Nuxeo-Event-PostCommit-");
104        executor = new ThreadPoolExecutor(0, MAX_POOL_SIZE, KEEP_ALIVE_TIME_SECOND, TimeUnit.SECONDS,
105                new SynchronousQueue<Runnable>(), threadFactory);
106        ((ThreadPoolExecutor) executor).allowCoreThreadTimeOut(true);
107    }
108
109    protected int getDefaultTimeoutMs() {
110        if (defaultTimeoutMs == null) {
111            if (Framework.getProperty(TIMEOUT_MS_PROP) != null) {
112                defaultTimeoutMs = Integer.parseInt(Framework.getProperty(TIMEOUT_MS_PROP));
113            } else if (Framework.isTestModeSet()) {
114                defaultTimeoutMs = DEFAULT_TIMEOUT_TEST_MS;
115            } else {
116                defaultTimeoutMs = DEFAULT_TIMEOUT_MS;
117            }
118        }
119        return defaultTimeoutMs;
120    }
121
122    public void shutdown(long timeoutMillis) throws InterruptedException {
123        executor.shutdown();
124        executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
125        if (!executor.isTerminated()) {
126            executor.shutdownNow();
127        }
128    }
129
130    public void run(List<EventListenerDescriptor> listeners, EventBundle event) {
131        run(listeners, event, getDefaultTimeoutMs(), false);
132    }
133
134    public void runBulk(List<EventListenerDescriptor> listeners, EventBundle event) {
135        String timeoutSeconds = Framework.getProperty(BULK_TIMEOUT_PROP, DEFAULT_BULK_TIMEOUT_S);
136        run(listeners, event, Long.parseLong(timeoutSeconds) * 1000, true);
137    }
138
139    public void run(List<EventListenerDescriptor> listeners, EventBundle bundle, long timeoutMillis, boolean bulk) {
140        // check that there's at list one listener interested
141        boolean some = false;
142        for (EventListenerDescriptor listener : listeners) {
143            if (listener.acceptBundle(bundle)) {
144                some = true;
145                break;
146            }
147        }
148        if (!some) {
149            if (log.isDebugEnabled()) {
150                log.debug("Events postcommit execution has nothing to do");
151            }
152            return;
153        }
154
155        if (log.isDebugEnabled()) {
156            log.debug(String.format("Events postcommit execution starting with timeout %sms%s",
157                    Long.valueOf(timeoutMillis), bulk ? " in bulk mode" : ""));
158        }
159
160        Callable<Boolean> callable = !bulk ? new EventBundleRunner(listeners, bundle) : new EventBundleBulkRunner(
161                listeners, bundle);
162        FutureTask<Boolean> futureTask = new FutureTask<Boolean>(callable);
163        try {
164            executor.execute(futureTask);
165        } catch (RejectedExecutionException e) {
166            log.error("Events postcommit execution rejected", e);
167            return;
168        }
169        try {
170            // wait for runner to be finished, with timeout
171            Boolean ok = futureTask.get(timeoutMillis, TimeUnit.MILLISECONDS);
172            if (Boolean.FALSE.equals(ok)) {
173                log.error("Events postcommit bulk execution aborted due to previous error");
174            }
175        } catch (InterruptedException e) {
176            Thread.currentThread().interrupt();
177            // interrupt thread
178            futureTask.cancel(true); // mayInterruptIfRunning=true
179        } catch (TimeoutException e) {
180            if (!bulk) {
181                log.info(String.format("Events postcommit execution exceeded timeout of %sms, leaving thread running",
182                        Long.valueOf(timeoutMillis)));
183                // don't cancel task, let it run
184            } else {
185                log.error(String.format(
186                        "Events postcommit bulk execution exceeded timeout of %sms, interrupting thread",
187                        Long.valueOf(timeoutMillis)));
188                futureTask.cancel(true); // mayInterruptIfRunning=true
189            }
190        } catch (ExecutionException e) {
191            log.error("Events postcommit execution encountered unexpected exception", e.getCause());
192        }
193
194        if (log.isDebugEnabled()) {
195            log.debug("Events postcommit execution finished");
196        }
197    }
198
199    /**
200     * Lets the listeners process the event bundle.
201     * <p>
202     * For each listener, the event bundle is reconnected to a session and a transaction is started.
203     * <p>
204     * In case of exception in a listener, the transaction is rolled back for that listener but processing continues for
205     * the other listeners.
206     * <p>
207     * In case of timeout, an error is logged but processing continues for the other listeners (the thread is left
208     * running separately from the main thread that initiated post-commit processing).
209     */
210    protected static class EventBundleRunner implements Callable<Boolean> {
211
212        protected final List<EventListenerDescriptor> listeners;
213
214        protected final EventBundle bundle;
215
216        protected String callerThread;
217
218        public EventBundleRunner(List<EventListenerDescriptor> listeners, EventBundle bundle) {
219            this.listeners = listeners;
220            this.bundle = bundle;
221            callerThread = SequenceTracer.getThreadName();
222        }
223
224        @Override
225        public Boolean call() {
226            if (log.isDebugEnabled()) {
227                log.debug("Events postcommit execution starting in thread: " + Thread.currentThread().getName());
228            }
229            SequenceTracer.startFrom(callerThread, "Postcommit", "#ff410f");
230            long t0 = System.currentTimeMillis();
231            EventStats stats = Framework.getService(EventStats.class);
232
233            for (EventListenerDescriptor listener : listeners) {
234                EventBundle filtered = listener.filterBundle(bundle);
235                if (filtered.isEmpty()) {
236                    continue;
237                }
238                if (log.isDebugEnabled()) {
239                    log.debug("Events postcommit execution start for listener: " + listener.getName());
240                }
241                SequenceTracer.start("run listener " + listener.getName());
242                long t1 = System.currentTimeMillis();
243
244                boolean ok = false;
245                ReconnectedEventBundle reconnected = null;
246                // transaction timeout is managed by the FutureTask
247                boolean tx = TransactionHelper.startTransaction();
248                try {
249                    reconnected = new ReconnectedEventBundleImpl(filtered, listeners.toString());
250
251                    listener.asPostCommitListener().handleEvent(reconnected);
252
253                    ok = true;
254                    // don't check for interrupted flag, the event completed normally, no reason to rollback
255                } catch (RuntimeException e) {
256                    log.error("Events postcommit execution encountered exception for listener: " + listener.getName(),
257                            e);
258                    // don't rethrow, but rollback (ok=false) and continue loop
259                } finally {
260                    try {
261                        if (reconnected != null) {
262                            reconnected.disconnect();
263                        }
264                    } finally {
265                        if (tx) {
266                            if (!ok) {
267                                TransactionHelper.setTransactionRollbackOnly();
268                                log.error("Rolling back transaction");
269                            }
270                            TransactionHelper.commitOrRollbackTransaction();
271                        }
272                        long elapsed = System.currentTimeMillis() - t1;
273                        if (stats != null) {
274                            stats.logAsyncExec(listener, elapsed);
275                        }
276                        if (log.isDebugEnabled()) {
277                            log.debug("Events postcommit execution end for listener: " + listener.getName() + " in "
278                                    + elapsed + "ms");
279                        }
280                        SequenceTracer.stop("listener done " + elapsed + " ms");
281                    }
282                }
283                // even if interrupted due to timeout, we continue the loop
284            }
285            long elapsed = System.currentTimeMillis() - t0;
286            if (log.isDebugEnabled()) {
287                log.debug("Events postcommit execution finished in " + elapsed + "ms");
288            }
289            SequenceTracer.stop("postcommit done" + elapsed + " ms");
290            return Boolean.TRUE; // no error to report
291        }
292    }
293
294    /**
295     * Lets the listeners process the event bundle in bulk mode.
296     * <p>
297     * The event bundle is reconnected to a single session and a single transaction is started for all the listeners.
298     * <p>
299     * In case of exception in a listener, the transaction is rolled back and processing stops.
300     * <p>
301     * In case of timeout, the transaction is rolled back and processing stops.
302     */
303    protected static class EventBundleBulkRunner implements Callable<Boolean> {
304
305        protected final List<EventListenerDescriptor> listeners;
306
307        protected final EventBundle bundle;
308        protected final String callerThread;
309
310        public EventBundleBulkRunner(List<EventListenerDescriptor> listeners, EventBundle bundle) {
311            this.listeners = listeners;
312            this.bundle = bundle;
313            callerThread = SequenceTracer.getThreadName();
314        }
315
316        @Override
317        public Boolean call() {
318            SequenceTracer.startFrom(callerThread, "BulkPostcommit", "#ff410f");
319            if (log.isDebugEnabled()) {
320                log.debug("Events postcommit bulk execution starting in thread: " + Thread.currentThread().getName());
321            }
322            long t0 = System.currentTimeMillis();
323
324            boolean ok = false;
325            boolean interrupt = false;
326            ReconnectedEventBundle reconnected = null;
327            // transaction timeout is managed by the FutureTask
328            boolean tx = TransactionHelper.startTransaction();
329            try {
330                reconnected = new ReconnectedEventBundleImpl(bundle, listeners.toString());
331                for (EventListenerDescriptor listener : listeners) {
332                    EventBundle filtered = listener.filterBundle(reconnected);
333                    if (filtered.isEmpty()) {
334                        continue;
335                    }
336                    SequenceTracer.start("run listener " + listener.getName());
337                    if (log.isDebugEnabled()) {
338                        log.debug("Events postcommit bulk execution start for listener: " + listener.getName());
339                    }
340                    long t1 = System.currentTimeMillis();
341                    try {
342
343                        listener.asPostCommitListener().handleEvent(filtered);
344
345                        if (Thread.currentThread().isInterrupted()) {
346                            log.error("Events postcommit bulk execution interrupted for listener: "
347                                    + listener.getName() + ", will rollback and abort bulk processing");
348                            interrupt = true;
349                        }
350                    } catch (RuntimeException e) {
351                        log.error(
352                                "Events postcommit bulk execution encountered exception for listener: "
353                                        + listener.getName(), e);
354                        return Boolean.FALSE; // report error
355                    } finally {
356                        long elapsed = System.currentTimeMillis() - t1;
357                        if (log.isDebugEnabled()) {
358                            log.debug("Events postcommit bulk execution end for listener: " + listener.getName()
359                                    + " in " + elapsed + "ms");
360                        }
361                        SequenceTracer.stop("listener done " + elapsed + " ms");
362                    }
363                    if (interrupt) {
364                        break;
365                    }
366                }
367                ok = !interrupt;
368            } finally {
369                try {
370                    if (reconnected != null) {
371                        reconnected.disconnect();
372                    }
373                } finally {
374                    if (tx) {
375                        if (!ok) {
376                            TransactionHelper.setTransactionRollbackOnly();
377                            log.error("Rolling back transaction");
378                        }
379                        TransactionHelper.commitOrRollbackTransaction();
380                    }
381                }
382                long elapsed = System.currentTimeMillis() - t0;
383                SequenceTracer.stop("BulkPostcommit done " + elapsed + " ms");
384                if (log.isDebugEnabled()) {
385                    log.debug("Events postcommit bulk execution finished in " + elapsed + "ms");
386                }
387            }
388            return Boolean.TRUE; // no error to report
389        }
390    }
391}