001/* 002 * (C) Copyright 2006-2013 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat 018 * Florent Guillaume 019 */ 020package org.nuxeo.ecm.core.event.impl; 021 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.Callable; 026import java.util.concurrent.ExecutionException; 027import java.util.concurrent.ExecutorService; 028import java.util.concurrent.FutureTask; 029import java.util.concurrent.RejectedExecutionException; 030import java.util.concurrent.SynchronousQueue; 031import java.util.concurrent.ThreadFactory; 032import java.util.concurrent.ThreadPoolExecutor; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.TimeoutException; 035import java.util.concurrent.atomic.AtomicInteger; 036 037import org.apache.logging.log4j.LogManager; 038import org.apache.logging.log4j.Logger; 039import org.nuxeo.ecm.core.event.EventBundle; 040import org.nuxeo.ecm.core.event.EventStats; 041import org.nuxeo.ecm.core.event.ReconnectedEventBundle; 042import org.nuxeo.runtime.api.Framework; 043import org.nuxeo.runtime.transaction.TransactionHelper; 044 045import io.opencensus.common.Scope; 046import io.opencensus.trace.AttributeValue; 047import io.opencensus.trace.BlankSpan; 048import io.opencensus.trace.Link; 049import io.opencensus.trace.Span; 050import io.opencensus.trace.SpanContext; 051import io.opencensus.trace.Status; 052import io.opencensus.trace.Tracer; 053import io.opencensus.trace.Tracing; 054import io.opencensus.trace.propagation.BinaryFormat; 055import io.opencensus.trace.propagation.SpanContextParseException; 056 057/** 058 * Executor that passes an event bundle to post-commit asynchronous listeners (in a separated thread in order to manage 059 * transactions). 060 * <p> 061 * Allows a bulk mode where transaction management is not per-listener done once for the whole set of listeners. 062 */ 063public class PostCommitEventExecutor { 064 065 private static final Logger log = LogManager.getLogger(PostCommitEventExecutor.class); 066 067 public static final String TIMEOUT_MS_PROP = "org.nuxeo.ecm.core.event.tx.PostCommitExecutor.timeoutMs"; 068 069 public static final int DEFAULT_TIMEOUT_MS = 300; // 0.3s 070 071 public static final int DEFAULT_TIMEOUT_TEST_MS = 60000; // 1 min 072 073 private Integer defaultTimeoutMs; 074 075 public static final String DEFAULT_BULK_TIMEOUT_S = "600"; // 10min 076 077 public static final String BULK_TIMEOUT_PROP = "org.nuxeo.ecm.core.event.tx.BulkExecutor.timeout"; 078 079 private static final long KEEP_ALIVE_TIME_SECOND = 10; 080 081 private static final int MAX_POOL_SIZE = 100; 082 083 protected final ExecutorService executor; 084 085 /** 086 * Creates non-daemon threads at normal priority. 087 */ 088 private static class NamedThreadFactory implements ThreadFactory { 089 090 private final AtomicInteger threadNumber = new AtomicInteger(); 091 092 private final ThreadGroup group; 093 094 private final String prefix; 095 096 public NamedThreadFactory(String prefix) { 097 SecurityManager sm = System.getSecurityManager(); 098 group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup(); 099 this.prefix = prefix; 100 } 101 102 @Override 103 public Thread newThread(Runnable r) { 104 String name = prefix + threadNumber.incrementAndGet(); 105 Thread thread = new Thread(group, r, name); 106 // do not set daemon 107 thread.setPriority(Thread.NORM_PRIORITY); 108 return thread; 109 } 110 } 111 112 public PostCommitEventExecutor() { 113 // use as much thread as needed up to MAX_POOL_SIZE 114 // keep them alive a moment for reuse 115 // have all threads torn down when there is no work to do 116 ThreadFactory threadFactory = new NamedThreadFactory("Nuxeo-Event-PostCommit-"); 117 executor = new ThreadPoolExecutor(0, MAX_POOL_SIZE, KEEP_ALIVE_TIME_SECOND, TimeUnit.SECONDS, 118 new SynchronousQueue<Runnable>(), threadFactory); 119 ((ThreadPoolExecutor) executor).allowCoreThreadTimeOut(true); 120 } 121 122 protected int getDefaultTimeoutMs() { 123 if (defaultTimeoutMs == null) { 124 if (Framework.getProperty(TIMEOUT_MS_PROP) != null) { 125 defaultTimeoutMs = Integer.parseInt(Framework.getProperty(TIMEOUT_MS_PROP)); 126 } else if (Framework.isTestModeSet()) { 127 defaultTimeoutMs = DEFAULT_TIMEOUT_TEST_MS; 128 } else { 129 defaultTimeoutMs = DEFAULT_TIMEOUT_MS; 130 } 131 } 132 return defaultTimeoutMs; 133 } 134 135 public void shutdown(long timeoutMillis) throws InterruptedException { 136 executor.shutdown(); 137 executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS); 138 if (!executor.isTerminated()) { 139 executor.shutdownNow(); 140 } 141 } 142 143 public void run(List<EventListenerDescriptor> listeners, EventBundle event) { 144 log.warn( 145 "Running post commit event listeners: {}. Post commit event listener execution will soon be deprecated," 146 + " see NXP-27986. As explained in NXP-26911, please update your post commit event listener" 147 + " contributions to make the listeners asynchronous with <listener async=\"true\"...>.\n" 148 + " You can disable this warning by following the instructions provided in NXP-26911.", 149 listeners); 150 run(listeners, event, getDefaultTimeoutMs(), false); 151 } 152 153 public void runBulk(List<EventListenerDescriptor> listeners, EventBundle event) { 154 String timeoutSeconds = Framework.getProperty(BULK_TIMEOUT_PROP, DEFAULT_BULK_TIMEOUT_S); 155 run(listeners, event, Long.parseLong(timeoutSeconds) * 1000, true); 156 } 157 158 public void run(List<EventListenerDescriptor> listeners, EventBundle bundle, long timeoutMillis, boolean bulk) { 159 // check that there's at list one listener interested 160 boolean some = false; 161 for (EventListenerDescriptor listener : listeners) { 162 if (listener.acceptBundle(bundle)) { 163 some = true; 164 break; 165 } 166 } 167 if (!some) { 168 log.debug("Events postcommit execution has nothing to do"); 169 return; 170 } 171 172 log.debug("Events postcommit execution starting with timeout {}ms{}", () -> Long.valueOf(timeoutMillis), 173 () -> bulk ? " in bulk mode" : ""); 174 175 Callable<Boolean> callable = !bulk ? new EventBundleRunner(listeners, bundle) 176 : new EventBundleBulkRunner(listeners, bundle); 177 FutureTask<Boolean> futureTask = new FutureTask<>(callable); 178 try { 179 executor.execute(futureTask); 180 } catch (RejectedExecutionException e) { 181 log.error("Events postcommit execution rejected", e); 182 return; 183 } 184 try { 185 // wait for runner to be finished, with timeout 186 Boolean ok = futureTask.get(timeoutMillis, TimeUnit.MILLISECONDS); 187 if (Boolean.FALSE.equals(ok)) { 188 log.error("Events postcommit bulk execution aborted due to previous error"); 189 } 190 } catch (InterruptedException e) { 191 Thread.currentThread().interrupt(); 192 // interrupt thread 193 futureTask.cancel(true); // mayInterruptIfRunning=true 194 } catch (TimeoutException e) { 195 if (!bulk) { 196 log.info("Events postcommit execution exceeded timeout of {}ms, leaving thread running", 197 () -> Long.valueOf(timeoutMillis)); 198 // don't cancel task, let it run 199 } else { 200 log.error("Events postcommit bulk execution exceeded timeout of {}ms, interrupting thread", 201 () -> Long.valueOf(timeoutMillis)); 202 futureTask.cancel(true); // mayInterruptIfRunning=true 203 } 204 } catch (ExecutionException e) { 205 log.error("Events postcommit execution encountered unexpected exception", e::getCause); 206 } 207 208 log.debug("Events postcommit execution finished"); 209 } 210 211 /** 212 * Lets the listeners process the event bundle. 213 * <p> 214 * For each listener, the event bundle is reconnected to a session and a transaction is started. 215 * <p> 216 * In case of exception in a listener, the transaction is rolled back for that listener but processing continues for 217 * the other listeners. 218 * <p> 219 * In case of timeout, an error is logged but processing continues for the other listeners (the thread is left 220 * running separately from the main thread that initiated post-commit processing). 221 */ 222 protected static class EventBundleRunner implements Callable<Boolean> { 223 224 protected final List<EventListenerDescriptor> listeners; 225 226 protected final EventBundle bundle; 227 228 protected final byte[] traceContext; 229 230 protected String callerThread; 231 232 public EventBundleRunner(List<EventListenerDescriptor> listeners, EventBundle bundle) { 233 this.listeners = listeners; 234 this.bundle = bundle; 235 callerThread = Thread.currentThread().getName(); 236 traceContext = Tracing.getPropagationComponent() 237 .getBinaryFormat() 238 .toByteArray(Tracing.getTracer().getCurrentSpan().getContext()); 239 } 240 241 @Override 242 public Boolean call() { 243 log.debug("Events postcommit execution starting in thread: {}", () -> Thread.currentThread().getName()); 244 long t0 = System.currentTimeMillis(); 245 EventStats stats = Framework.getService(EventStats.class); 246 Span span = getTracingSpan("postcommit/EventBundleBulkRunner"); 247 try (Scope scope = Tracing.getTracer().withSpan(span)) { 248 for (EventListenerDescriptor listener : listeners) { 249 EventBundle filtered = listener.filterBundle(bundle); 250 if (filtered.isEmpty()) { 251 continue; 252 } 253 log.debug("Events postcommit execution start for listener: {}", listener::getName); 254 long t1 = System.currentTimeMillis(); 255 256 boolean ok = false; 257 ReconnectedEventBundle reconnected = null; 258 // transaction timeout is managed by the FutureTask 259 boolean tx = TransactionHelper.startTransaction(); 260 try { 261 reconnected = new ReconnectedEventBundleImpl(filtered, listeners.toString()); 262 263 listener.asPostCommitListener().handleEvent(reconnected); 264 265 ok = true; 266 // don't check for interrupted flag, the event completed normally, no reason to rollback 267 } catch (RuntimeException e) { 268 log.error("Events postcommit execution encountered exception for listener: {}", 269 listener::getName, () -> e); 270 // don't rethrow, but rollback (ok=false) and continue loop 271 span.setStatus(Status.UNKNOWN); 272 } finally { 273 try { 274 if (reconnected != null) { 275 reconnected.disconnect(); 276 } 277 } finally { 278 if (tx) { 279 if (!ok) { 280 TransactionHelper.setTransactionRollbackOnly(); 281 log.error("Rolling back transaction"); 282 } 283 TransactionHelper.commitOrRollbackTransaction(); 284 } 285 long elapsed = System.currentTimeMillis() - t1; 286 if (stats != null) { 287 stats.logAsyncExec(listener, elapsed); 288 } 289 log.debug("Events postcommit execution end for listener: {} in {}ms", listener::getName, 290 () -> elapsed); 291 span.addAnnotation("PostCommitEventExecutor#Listener " + listener.getName() + " " + elapsed + " ms"); 292 } 293 } 294 // even if interrupted due to timeout, we continue the loop 295 } 296 span.setStatus(Status.OK); 297 } finally { 298 span.end(); 299 } 300 301 long elapsed = System.currentTimeMillis() - t0; 302 log.debug("Events postcommit execution finished in {}ms", elapsed); 303 return Boolean.TRUE; // no error to report 304 } 305 306 protected Span getTracingSpan(String spanName) { 307 if (traceContext == null) { 308 return BlankSpan.INSTANCE; 309 } 310 Tracer tracer = Tracing.getTracer(); 311 BinaryFormat binaryFormat = Tracing.getPropagationComponent().getBinaryFormat(); 312 try { 313 SpanContext spanContext = binaryFormat.fromByteArray(traceContext); 314 Span span = tracer.spanBuilderWithRemoteParent(spanName, spanContext).startSpan(); 315 span.addLink(Link.fromSpanContext(spanContext, Link.Type.PARENT_LINKED_SPAN)); 316 Map<String, AttributeValue> map = new HashMap<>(); 317 map.put("tx.thread", AttributeValue.stringAttributeValue(Thread.currentThread().getName())); 318 map.put("bundle.event_count", AttributeValue.longAttributeValue(bundle.size())); 319 map.put("bundle.caller_thread", AttributeValue.stringAttributeValue(callerThread)); 320 span.putAttributes(map); 321 return span; 322 } catch (SpanContextParseException e) { 323 log.warn("Invalid trace context: " + traceContext.length, e); 324 return BlankSpan.INSTANCE; 325 } 326 } 327 } 328 329 /** 330 * Lets the listeners process the event bundle in bulk mode. 331 * <p> 332 * The event bundle is reconnected to a single session and a single transaction is started for all the listeners. 333 * <p> 334 * In case of exception in a listener, the transaction is rolled back and processing stops. 335 * <p> 336 * In case of timeout, the transaction is rolled back and processing stops. 337 */ 338 protected static class EventBundleBulkRunner implements Callable<Boolean> { 339 340 protected final List<EventListenerDescriptor> listeners; 341 342 protected final EventBundle bundle; 343 344 protected final String callerThread; 345 346 protected final byte[] traceContext; 347 348 public EventBundleBulkRunner(List<EventListenerDescriptor> listeners, EventBundle bundle) { 349 this.listeners = listeners; 350 this.bundle = bundle; 351 callerThread = Thread.currentThread().getName(); 352 traceContext = Tracing.getPropagationComponent() 353 .getBinaryFormat() 354 .toByteArray(Tracing.getTracer().getCurrentSpan().getContext()); 355 } 356 357 @Override 358 public Boolean call() { 359 Span span = getTracingSpan("postcommit/EventBundleBulkRunner"); 360 log.debug("Events postcommit bulk execution starting in thread: {}", 361 () -> Thread.currentThread().getName()); 362 long t0 = System.currentTimeMillis(); 363 364 boolean ok = false; 365 boolean interrupt = false; 366 ReconnectedEventBundle reconnected = null; 367 // transaction timeout is managed by the FutureTask 368 boolean tx = TransactionHelper.startTransaction(); 369 try (Scope scope = Tracing.getTracer().withSpan(span)) { 370 reconnected = new ReconnectedEventBundleImpl(bundle, listeners.toString()); 371 for (EventListenerDescriptor listener : listeners) { 372 EventBundle filtered = listener.filterBundle(reconnected); 373 if (filtered.isEmpty()) { 374 continue; 375 } 376 log.debug("Events postcommit bulk execution start for listener: {}", listener::getName); 377 long t1 = System.currentTimeMillis(); 378 try { 379 380 listener.asPostCommitListener().handleEvent(filtered); 381 382 if (Thread.currentThread().isInterrupted()) { 383 log.error("Events postcommit bulk execution interrupted for listener: {}, will rollback and" 384 + " abort bulk processing", listener::getName); 385 interrupt = true; 386 } 387 } catch (RuntimeException e) { 388 log.error("Events postcommit bulk execution encountered exception for listener: {}", 389 listener::getName, () -> e); 390 span.setStatus(Status.UNKNOWN); 391 return Boolean.FALSE; // report error 392 } finally { 393 long elapsed = System.currentTimeMillis() - t1; 394 log.debug("Events postcommit bulk execution end for listener: {} in {}ms", listener::getName, 395 () -> elapsed); 396 span.addAnnotation("PostCommitEventExecutor Listener " + listener.getName() + " " + elapsed + " ms"); 397 } 398 if (interrupt) { 399 break; 400 } 401 } 402 ok = !interrupt; 403 } finally { 404 try { 405 if (reconnected != null) { 406 reconnected.disconnect(); 407 } 408 } finally { 409 if (tx) { 410 if (!ok) { 411 TransactionHelper.setTransactionRollbackOnly(); 412 log.error("Rolling back transaction"); 413 } 414 TransactionHelper.commitOrRollbackTransaction(); 415 } 416 } 417 long elapsed = System.currentTimeMillis() - t0; 418 log.debug("Events postcommit bulk execution finished in {}ms", elapsed); 419 span.end(); 420 } 421 return Boolean.TRUE; // no error to report 422 } 423 424 protected Span getTracingSpan(String spanName) { 425 if (traceContext == null) { 426 return BlankSpan.INSTANCE; 427 } 428 Tracer tracer = Tracing.getTracer(); 429 BinaryFormat binaryFormat = Tracing.getPropagationComponent().getBinaryFormat(); 430 try { 431 SpanContext spanContext = binaryFormat.fromByteArray(traceContext); 432 Span span = tracer.spanBuilderWithRemoteParent(spanName, spanContext).startSpan(); 433 span.addLink(Link.fromSpanContext(spanContext, Link.Type.PARENT_LINKED_SPAN)); 434 Map<String, AttributeValue> map = new HashMap<>(); 435 map.put("tx.thread", AttributeValue.stringAttributeValue(Thread.currentThread().getName())); 436 map.put("bundle.event_count", AttributeValue.longAttributeValue(bundle.size())); 437 map.put("bundle.caller_thread", AttributeValue.stringAttributeValue(callerThread)); 438 span.putAttributes(map); 439 return span; 440 } catch (SpanContextParseException e) { 441 log.warn("Invalid trace context: " + traceContext.length, e); 442 return BlankSpan.INSTANCE; 443 } 444 } 445 } 446}