001/*
002 * (C) Copyright 2006-2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Thierry Delprat
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.core.event.impl;
021
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.Callable;
026import java.util.concurrent.ExecutionException;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.FutureTask;
029import java.util.concurrent.RejectedExecutionException;
030import java.util.concurrent.SynchronousQueue;
031import java.util.concurrent.ThreadFactory;
032import java.util.concurrent.ThreadPoolExecutor;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.TimeoutException;
035import java.util.concurrent.atomic.AtomicInteger;
036
037import org.apache.logging.log4j.LogManager;
038import org.apache.logging.log4j.Logger;
039import org.nuxeo.ecm.core.event.EventBundle;
040import org.nuxeo.ecm.core.event.EventStats;
041import org.nuxeo.ecm.core.event.ReconnectedEventBundle;
042import org.nuxeo.runtime.api.Framework;
043import org.nuxeo.runtime.transaction.TransactionHelper;
044
045import io.opencensus.common.Scope;
046import io.opencensus.trace.AttributeValue;
047import io.opencensus.trace.BlankSpan;
048import io.opencensus.trace.Link;
049import io.opencensus.trace.Span;
050import io.opencensus.trace.SpanContext;
051import io.opencensus.trace.Status;
052import io.opencensus.trace.Tracer;
053import io.opencensus.trace.Tracing;
054import io.opencensus.trace.propagation.BinaryFormat;
055import io.opencensus.trace.propagation.SpanContextParseException;
056
/**
 * Executor that passes an event bundle to post-commit asynchronous listeners (in a separate thread in order to manage
 * transactions).
 * <p>
 * Allows a bulk mode where transaction management is not done per listener but once for the whole set of listeners.
 */
063public class PostCommitEventExecutor {
064
    /** Logger shared by this executor and its nested runner classes. */
    private static final Logger log = LogManager.getLogger(PostCommitEventExecutor.class);

    /** Framework property read to override the default (non-bulk) timeout, expressed in milliseconds. */
    public static final String TIMEOUT_MS_PROP = "org.nuxeo.ecm.core.event.tx.PostCommitExecutor.timeoutMs";

    /** Non-bulk timeout used when the property is not set and the framework is not in test mode. */
    public static final int DEFAULT_TIMEOUT_MS = 300; // 0.3s

    /** Non-bulk timeout used when the property is not set and the framework is in test mode. */
    public static final int DEFAULT_TIMEOUT_TEST_MS = 60000; // 1 min

    /** Lazily resolved non-bulk timeout; stays {@code null} until {@link #getDefaultTimeoutMs()} is first called. */
    private Integer defaultTimeoutMs;

    /** Default bulk timeout in seconds, kept as a string because it is used as a property default value. */
    public static final String DEFAULT_BULK_TIMEOUT_S = "600"; // 10min

    /** Framework property read to override the bulk timeout, expressed in seconds. */
    public static final String BULK_TIMEOUT_PROP = "org.nuxeo.ecm.core.event.tx.BulkExecutor.timeout";

    /** Keep-alive time, in seconds, given to the thread pool for its idle threads. */
    private static final long KEEP_ALIVE_TIME_SECOND = 10;

    /** Maximum number of threads in the pool. */
    private static final int MAX_POOL_SIZE = 100;

    /** Pool on which the post-commit runners are executed. */
    protected final ExecutorService executor;
084
085    /**
086     * Creates non-daemon threads at normal priority.
087     */
088    private static class NamedThreadFactory implements ThreadFactory {
089
090        private final AtomicInteger threadNumber = new AtomicInteger();
091
092        private final ThreadGroup group;
093
094        private final String prefix;
095
096        public NamedThreadFactory(String prefix) {
097            SecurityManager sm = System.getSecurityManager();
098            group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup();
099            this.prefix = prefix;
100        }
101
102        @Override
103        public Thread newThread(Runnable r) {
104            String name = prefix + threadNumber.incrementAndGet();
105            Thread thread = new Thread(group, r, name);
106            // do not set daemon
107            thread.setPriority(Thread.NORM_PRIORITY);
108            return thread;
109        }
110    }
111
112    public PostCommitEventExecutor() {
113        // use as much thread as needed up to MAX_POOL_SIZE
114        // keep them alive a moment for reuse
115        // have all threads torn down when there is no work to do
116        ThreadFactory threadFactory = new NamedThreadFactory("Nuxeo-Event-PostCommit-");
117        executor = new ThreadPoolExecutor(0, MAX_POOL_SIZE, KEEP_ALIVE_TIME_SECOND, TimeUnit.SECONDS,
118                new SynchronousQueue<Runnable>(), threadFactory);
119        ((ThreadPoolExecutor) executor).allowCoreThreadTimeOut(true);
120    }
121
122    protected int getDefaultTimeoutMs() {
123        if (defaultTimeoutMs == null) {
124            if (Framework.getProperty(TIMEOUT_MS_PROP) != null) {
125                defaultTimeoutMs = Integer.parseInt(Framework.getProperty(TIMEOUT_MS_PROP));
126            } else if (Framework.isTestModeSet()) {
127                defaultTimeoutMs = DEFAULT_TIMEOUT_TEST_MS;
128            } else {
129                defaultTimeoutMs = DEFAULT_TIMEOUT_MS;
130            }
131        }
132        return defaultTimeoutMs;
133    }
134
135    public void shutdown(long timeoutMillis) throws InterruptedException {
136        executor.shutdown();
137        executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
138        if (!executor.isTerminated()) {
139            executor.shutdownNow();
140        }
141    }
142
143    public void run(List<EventListenerDescriptor> listeners, EventBundle event) {
144        log.warn(
145                "Running post commit event listeners: {}. Post commit event listener execution will soon be deprecated,"
146                        + " see NXP-27986. As explained in NXP-26911, please update your post commit event listener"
147                        + " contributions to make the listeners asynchronous with <listener async=\"true\"...>.\n"
148                        + " You can disable this warning by following the instructions provided in NXP-26911.",
149                listeners);
150        run(listeners, event, getDefaultTimeoutMs(), false);
151    }
152
153    public void runBulk(List<EventListenerDescriptor> listeners, EventBundle event) {
154        String timeoutSeconds = Framework.getProperty(BULK_TIMEOUT_PROP, DEFAULT_BULK_TIMEOUT_S);
155        run(listeners, event, Long.parseLong(timeoutSeconds) * 1000, true);
156    }
157
158    public void run(List<EventListenerDescriptor> listeners, EventBundle bundle, long timeoutMillis, boolean bulk) {
159        // check that there's at list one listener interested
160        boolean some = false;
161        for (EventListenerDescriptor listener : listeners) {
162            if (listener.acceptBundle(bundle)) {
163                some = true;
164                break;
165            }
166        }
167        if (!some) {
168            log.debug("Events postcommit execution has nothing to do");
169            return;
170        }
171
172        log.debug("Events postcommit execution starting with timeout {}ms{}", () -> Long.valueOf(timeoutMillis),
173                () -> bulk ? " in bulk mode" : "");
174
175        Callable<Boolean> callable = !bulk ? new EventBundleRunner(listeners, bundle)
176                : new EventBundleBulkRunner(listeners, bundle);
177        FutureTask<Boolean> futureTask = new FutureTask<>(callable);
178        try {
179            executor.execute(futureTask);
180        } catch (RejectedExecutionException e) {
181            log.error("Events postcommit execution rejected", e);
182            return;
183        }
184        try {
185            // wait for runner to be finished, with timeout
186            Boolean ok = futureTask.get(timeoutMillis, TimeUnit.MILLISECONDS);
187            if (Boolean.FALSE.equals(ok)) {
188                log.error("Events postcommit bulk execution aborted due to previous error");
189            }
190        } catch (InterruptedException e) {
191            Thread.currentThread().interrupt();
192            // interrupt thread
193            futureTask.cancel(true); // mayInterruptIfRunning=true
194        } catch (TimeoutException e) {
195            if (!bulk) {
196                log.info("Events postcommit execution exceeded timeout of {}ms, leaving thread running",
197                        () -> Long.valueOf(timeoutMillis));
198                // don't cancel task, let it run
199            } else {
200                log.error("Events postcommit bulk execution exceeded timeout of {}ms, interrupting thread",
201                        () -> Long.valueOf(timeoutMillis));
202                futureTask.cancel(true); // mayInterruptIfRunning=true
203            }
204        } catch (ExecutionException e) {
205            log.error("Events postcommit execution encountered unexpected exception", e::getCause);
206        }
207
208        log.debug("Events postcommit execution finished");
209    }
210
211    /**
212     * Lets the listeners process the event bundle.
213     * <p>
214     * For each listener, the event bundle is reconnected to a session and a transaction is started.
215     * <p>
216     * In case of exception in a listener, the transaction is rolled back for that listener but processing continues for
217     * the other listeners.
218     * <p>
219     * In case of timeout, an error is logged but processing continues for the other listeners (the thread is left
220     * running separately from the main thread that initiated post-commit processing).
221     */
222    protected static class EventBundleRunner implements Callable<Boolean> {
223
224        protected final List<EventListenerDescriptor> listeners;
225
226        protected final EventBundle bundle;
227
228        protected final byte[] traceContext;
229
230        protected String callerThread;
231
232        public EventBundleRunner(List<EventListenerDescriptor> listeners, EventBundle bundle) {
233            this.listeners = listeners;
234            this.bundle = bundle;
235            callerThread = Thread.currentThread().getName();
236            traceContext = Tracing.getPropagationComponent()
237                                  .getBinaryFormat()
238                                  .toByteArray(Tracing.getTracer().getCurrentSpan().getContext());
239        }
240
241        @Override
242        public Boolean call() {
243            log.debug("Events postcommit execution starting in thread: {}", () -> Thread.currentThread().getName());
244            long t0 = System.currentTimeMillis();
245            EventStats stats = Framework.getService(EventStats.class);
246            Span span = getTracingSpan("postcommit/EventBundleBulkRunner");
247            try (Scope scope = Tracing.getTracer().withSpan(span)) {
248                for (EventListenerDescriptor listener : listeners) {
249                    EventBundle filtered = listener.filterBundle(bundle);
250                    if (filtered.isEmpty()) {
251                        continue;
252                    }
253                    log.debug("Events postcommit execution start for listener: {}", listener::getName);
254                    long t1 = System.currentTimeMillis();
255
256                    boolean ok = false;
257                    ReconnectedEventBundle reconnected = null;
258                    // transaction timeout is managed by the FutureTask
259                    boolean tx = TransactionHelper.startTransaction();
260                    try {
261                        reconnected = new ReconnectedEventBundleImpl(filtered, listeners.toString());
262
263                        listener.asPostCommitListener().handleEvent(reconnected);
264
265                        ok = true;
266                        // don't check for interrupted flag, the event completed normally, no reason to rollback
267                    } catch (RuntimeException e) {
268                        log.error("Events postcommit execution encountered exception for listener: {}",
269                                listener::getName, () -> e);
270                        // don't rethrow, but rollback (ok=false) and continue loop
271                        span.setStatus(Status.UNKNOWN);
272                    } finally {
273                        try {
274                            if (reconnected != null) {
275                                reconnected.disconnect();
276                            }
277                        } finally {
278                            if (tx) {
279                                if (!ok) {
280                                    TransactionHelper.setTransactionRollbackOnly();
281                                    log.error("Rolling back transaction");
282                                }
283                                TransactionHelper.commitOrRollbackTransaction();
284                            }
285                            long elapsed = System.currentTimeMillis() - t1;
286                            if (stats != null) {
287                                stats.logAsyncExec(listener, elapsed);
288                            }
289                            log.debug("Events postcommit execution end for listener: {} in {}ms", listener::getName,
290                                    () -> elapsed);
291                            span.addAnnotation("PostCommitEventExecutor#Listener " + listener.getName() + " " + elapsed + " ms");
292                        }
293                    }
294                    // even if interrupted due to timeout, we continue the loop
295                }
296                span.setStatus(Status.OK);
297            } finally {
298                span.end();
299            }
300
301            long elapsed = System.currentTimeMillis() - t0;
302            log.debug("Events postcommit execution finished in {}ms", elapsed);
303            return Boolean.TRUE; // no error to report
304        }
305
306        protected Span getTracingSpan(String spanName) {
307            if (traceContext == null) {
308                return BlankSpan.INSTANCE;
309            }
310            Tracer tracer = Tracing.getTracer();
311            BinaryFormat binaryFormat = Tracing.getPropagationComponent().getBinaryFormat();
312            try {
313                SpanContext spanContext = binaryFormat.fromByteArray(traceContext);
314                Span span = tracer.spanBuilderWithRemoteParent(spanName, spanContext).startSpan();
315                span.addLink(Link.fromSpanContext(spanContext, Link.Type.PARENT_LINKED_SPAN));
316                Map<String, AttributeValue> map = new HashMap<>();
317                map.put("tx.thread", AttributeValue.stringAttributeValue(Thread.currentThread().getName()));
318                map.put("bundle.event_count", AttributeValue.longAttributeValue(bundle.size()));
319                map.put("bundle.caller_thread", AttributeValue.stringAttributeValue(callerThread));
320                span.putAttributes(map);
321                return span;
322            } catch (SpanContextParseException e) {
323                log.warn("Invalid trace context: " + traceContext.length, e);
324                return BlankSpan.INSTANCE;
325            }
326        }
327    }
328
329    /**
330     * Lets the listeners process the event bundle in bulk mode.
331     * <p>
332     * The event bundle is reconnected to a single session and a single transaction is started for all the listeners.
333     * <p>
334     * In case of exception in a listener, the transaction is rolled back and processing stops.
335     * <p>
336     * In case of timeout, the transaction is rolled back and processing stops.
337     */
338    protected static class EventBundleBulkRunner implements Callable<Boolean> {
339
340        protected final List<EventListenerDescriptor> listeners;
341
342        protected final EventBundle bundle;
343
344        protected final String callerThread;
345
346        protected final byte[] traceContext;
347
348        public EventBundleBulkRunner(List<EventListenerDescriptor> listeners, EventBundle bundle) {
349            this.listeners = listeners;
350            this.bundle = bundle;
351            callerThread = Thread.currentThread().getName();
352            traceContext = Tracing.getPropagationComponent()
353                                  .getBinaryFormat()
354                                  .toByteArray(Tracing.getTracer().getCurrentSpan().getContext());
355        }
356
357        @Override
358        public Boolean call() {
359            Span span = getTracingSpan("postcommit/EventBundleBulkRunner");
360            log.debug("Events postcommit bulk execution starting in thread: {}",
361                    () -> Thread.currentThread().getName());
362            long t0 = System.currentTimeMillis();
363
364            boolean ok = false;
365            boolean interrupt = false;
366            ReconnectedEventBundle reconnected = null;
367            // transaction timeout is managed by the FutureTask
368            boolean tx = TransactionHelper.startTransaction();
369            try (Scope scope = Tracing.getTracer().withSpan(span)) {
370                reconnected = new ReconnectedEventBundleImpl(bundle, listeners.toString());
371                for (EventListenerDescriptor listener : listeners) {
372                    EventBundle filtered = listener.filterBundle(reconnected);
373                    if (filtered.isEmpty()) {
374                        continue;
375                    }
376                    log.debug("Events postcommit bulk execution start for listener: {}", listener::getName);
377                    long t1 = System.currentTimeMillis();
378                    try {
379
380                        listener.asPostCommitListener().handleEvent(filtered);
381
382                        if (Thread.currentThread().isInterrupted()) {
383                            log.error("Events postcommit bulk execution interrupted for listener: {}, will rollback and"
384                                    + " abort bulk processing", listener::getName);
385                            interrupt = true;
386                        }
387                    } catch (RuntimeException e) {
388                        log.error("Events postcommit bulk execution encountered exception for listener: {}",
389                                listener::getName, () -> e);
390                        span.setStatus(Status.UNKNOWN);
391                        return Boolean.FALSE; // report error
392                    } finally {
393                        long elapsed = System.currentTimeMillis() - t1;
394                        log.debug("Events postcommit bulk execution end for listener: {} in {}ms", listener::getName,
395                                () -> elapsed);
396                        span.addAnnotation("PostCommitEventExecutor Listener " + listener.getName() + " " + elapsed + " ms");
397                    }
398                    if (interrupt) {
399                        break;
400                    }
401                }
402                ok = !interrupt;
403            } finally {
404                try {
405                    if (reconnected != null) {
406                        reconnected.disconnect();
407                    }
408                } finally {
409                    if (tx) {
410                        if (!ok) {
411                            TransactionHelper.setTransactionRollbackOnly();
412                            log.error("Rolling back transaction");
413                        }
414                        TransactionHelper.commitOrRollbackTransaction();
415                    }
416                }
417                long elapsed = System.currentTimeMillis() - t0;
418                log.debug("Events postcommit bulk execution finished in {}ms", elapsed);
419                span.end();
420            }
421            return Boolean.TRUE; // no error to report
422        }
423
424        protected Span getTracingSpan(String spanName) {
425            if (traceContext == null) {
426                return BlankSpan.INSTANCE;
427            }
428            Tracer tracer = Tracing.getTracer();
429            BinaryFormat binaryFormat = Tracing.getPropagationComponent().getBinaryFormat();
430            try {
431                SpanContext spanContext = binaryFormat.fromByteArray(traceContext);
432                Span span = tracer.spanBuilderWithRemoteParent(spanName, spanContext).startSpan();
433                span.addLink(Link.fromSpanContext(spanContext, Link.Type.PARENT_LINKED_SPAN));
434                Map<String, AttributeValue> map = new HashMap<>();
435                map.put("tx.thread", AttributeValue.stringAttributeValue(Thread.currentThread().getName()));
436                map.put("bundle.event_count", AttributeValue.longAttributeValue(bundle.size()));
437                map.put("bundle.caller_thread", AttributeValue.stringAttributeValue(callerThread));
438                span.putAttributes(map);
439                return span;
440            } catch (SpanContextParseException e) {
441                log.warn("Invalid trace context: " + traceContext.length, e);
442                return BlankSpan.INSTANCE;
443            }
444        }
445    }
446}