/*
 * (C) Copyright 2006-2013 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Thierry Delprat
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.event.impl;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.common.logging.SequenceTracer;
import org.nuxeo.ecm.core.event.EventBundle;
import org.nuxeo.ecm.core.event.EventStats;
import org.nuxeo.ecm.core.event.ReconnectedEventBundle;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.transaction.TransactionHelper;

/**
 * Executor that passes an event bundle to post-commit asynchronous listeners (in a separated thread in order to manage
 * transactions).
 * <p>
 * Allows a bulk mode where transaction management is not per-listener but done once for the whole set of listeners.
 */
public class PostCommitEventExecutor {

    private static final Log log = LogFactory.getLog(PostCommitEventExecutor.class);

    /** Framework property overriding the non-bulk timeout, in milliseconds. */
    public static final String TIMEOUT_MS_PROP = "org.nuxeo.ecm.core.event.tx.PostCommitExecutor.timeoutMs";

    public static final int DEFAULT_TIMEOUT_MS = 300; // 0.3s

    public static final int DEFAULT_TIMEOUT_TEST_MS = 60000; // 1 min

    /** Lazily resolved non-bulk timeout; {@code null} until {@link #getDefaultTimeoutMs()} is first called. */
    private Integer defaultTimeoutMs;

    public static final String DEFAULT_BULK_TIMEOUT_S = "600"; // 10min

    /** Framework property overriding the bulk timeout, in seconds. */
    public static final String BULK_TIMEOUT_PROP = "org.nuxeo.ecm.core.event.tx.BulkExecutor.timeout";

    private static final long KEEP_ALIVE_TIME_SECOND = 10;

    private static final int MAX_POOL_SIZE = 100;

    protected final ExecutorService executor;

    /**
     * Creates named threads at normal priority.
     * <p>
     * The daemon flag is deliberately left untouched, so a new thread keeps whatever daemon status
     * {@link Thread#Thread(ThreadGroup, Runnable, String)} gives it (inherited from the creating thread).
     */
    private static class NamedThreadFactory implements ThreadFactory {

        private final AtomicInteger threadNumber = new AtomicInteger();

        private final ThreadGroup group;

        private final String prefix;

        /**
         * @param prefix the thread name prefix, to which a sequence number starting at 1 is appended
         */
        public NamedThreadFactory(String prefix) {
            SecurityManager sm = System.getSecurityManager();
            group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup();
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            String name = prefix + threadNumber.incrementAndGet();
            Thread thread = new Thread(group, r, name);
            // do not set daemon
            thread.setPriority(Thread.NORM_PRIORITY);
            return thread;
        }
    }

    /**
     * Creates the executor with its own thread pool, whose threads are named {@code Nuxeo-Event-PostCommit-N}.
     */
    public PostCommitEventExecutor() {
        // use as much thread as needed up to MAX_POOL_SIZE
        // keep them alive a moment for reuse
        // have all threads torn down when there is no work to do
        ThreadFactory threadFactory = new NamedThreadFactory("Nuxeo-Event-PostCommit-");
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, MAX_POOL_SIZE, KEEP_ALIVE_TIME_SECOND, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), threadFactory);
        pool.allowCoreThreadTimeOut(true);
        executor = pool;
    }

    /**
     * Gets the timeout used by {@link #run(List, EventBundle)}.
     * <p>
     * The value is resolved on first call and then cached: the {@link #TIMEOUT_MS_PROP} framework property if set,
     * otherwise {@link #DEFAULT_TIMEOUT_TEST_MS} in test mode, otherwise {@link #DEFAULT_TIMEOUT_MS}.
     *
     * @return the timeout in milliseconds
     * @throws NumberFormatException if the configured property is not a valid integer
     */
    protected int getDefaultTimeoutMs() {
        if (defaultTimeoutMs == null) {
            // read the property once instead of once for the test and once for the parse
            String configured = Framework.getProperty(TIMEOUT_MS_PROP);
            if (configured != null) {
                defaultTimeoutMs = Integer.parseInt(configured);
            } else if (Framework.isTestModeSet()) {
                defaultTimeoutMs = DEFAULT_TIMEOUT_TEST_MS;
            } else {
                defaultTimeoutMs = DEFAULT_TIMEOUT_MS;
            }
        }
        return defaultTimeoutMs;
    }

    /**
     * Shuts down the underlying thread pool.
     * <p>
     * No new task is accepted; running tasks get up to {@code timeoutMillis} to finish, after which they are
     * interrupted through {@link ExecutorService#shutdownNow()}.
     *
     * @param timeoutMillis the maximum time to wait for running tasks, in milliseconds
     * @throws InterruptedException if the calling thread is interrupted while waiting; the running tasks are
     *             interrupted before the exception is propagated
     */
    public void shutdown(long timeoutMillis) throws InterruptedException {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            // don't leave worker threads running just because the wait itself was interrupted
            executor.shutdownNow();
            throw e;
        }
    }

    /**
     * Runs the listeners on the bundle, one transaction per listener, using the default timeout.
     *
     * @param listeners the post-commit listeners
     * @param event the event bundle to process
     */
    public void run(List<EventListenerDescriptor> listeners, EventBundle event) {
        run(listeners, event, getDefaultTimeoutMs(), false);
    }

    /**
     * Runs the listeners on the bundle in bulk mode (a single transaction for all listeners).
     * <p>
     * The timeout is read from the {@link #BULK_TIMEOUT_PROP} framework property (in seconds), defaulting to
     * {@link #DEFAULT_BULK_TIMEOUT_S}.
     *
     * @param listeners the post-commit listeners
     * @param event the event bundle to process
     * @throws NumberFormatException if the configured property is not a valid integer
     */
    public void runBulk(List<EventListenerDescriptor> listeners, EventBundle event) {
        String timeoutSeconds = Framework.getProperty(BULK_TIMEOUT_PROP, DEFAULT_BULK_TIMEOUT_S);
        run(listeners, event, Long.parseLong(timeoutSeconds) * 1000, true);
    }

    /**
     * Submits the bundle processing to the thread pool and waits for it, up to the given timeout.
     * <p>
     * Nothing is submitted when no listener accepts the bundle. On timeout, a non-bulk run is left running in its
     * own thread whereas a bulk run is cancelled (its thread is interrupted). If the calling thread is interrupted
     * while waiting, its interrupted status is restored and the task is cancelled.
     *
     * @param listeners the post-commit listeners
     * @param bundle the event bundle to process
     * @param timeoutMillis the maximum time to wait for the processing, in milliseconds
     * @param bulk {@code true} to use a single transaction for all the listeners
     */
    public void run(List<EventListenerDescriptor> listeners, EventBundle bundle, long timeoutMillis, boolean bulk) {
        // check that there's at least one listener interested
        boolean some = false;
        for (EventListenerDescriptor listener : listeners) {
            if (listener.acceptBundle(bundle)) {
                some = true;
                break;
            }
        }
        if (!some) {
            if (log.isDebugEnabled()) {
                log.debug("Events postcommit execution has nothing to do");
            }
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug(String.format("Events postcommit execution starting with timeout %sms%s",
                    Long.valueOf(timeoutMillis), bulk ? " in bulk mode" : ""));
        }

        Callable<Boolean> callable = !bulk ? new EventBundleRunner(listeners, bundle) : new EventBundleBulkRunner(
                listeners, bundle);
        FutureTask<Boolean> futureTask = new FutureTask<Boolean>(callable);
        try {
            executor.execute(futureTask);
        } catch (RejectedExecutionException e) {
            log.error("Events postcommit execution rejected", e);
            return;
        }
        try {
            // wait for runner to be finished, with timeout
            Boolean ok = futureTask.get(timeoutMillis, TimeUnit.MILLISECONDS);
            if (Boolean.FALSE.equals(ok)) {
                log.error("Events postcommit bulk execution aborted due to previous error");
            }
        } catch (InterruptedException e) {
            // restore interrupted status
            Thread.currentThread().interrupt();
            // interrupt thread
            futureTask.cancel(true); // mayInterruptIfRunning=true
        } catch (TimeoutException e) {
            if (!bulk) {
                log.info(String.format("Events postcommit execution exceeded timeout of %sms, leaving thread running",
                        Long.valueOf(timeoutMillis)));
                // don't cancel task, let it run
            } else {
                log.error(String.format(
                        "Events postcommit bulk execution exceeded timeout of %sms, interrupting thread",
                        Long.valueOf(timeoutMillis)));
                futureTask.cancel(true); // mayInterruptIfRunning=true
            }
        } catch (ExecutionException e) {
            log.error("Events postcommit execution encountered unexpected exception", e.getCause());
        }

        if (log.isDebugEnabled()) {
            log.debug("Events postcommit execution finished");
        }
    }

    /**
     * Lets the listeners process the event bundle.
     * <p>
     * For each listener, the event bundle is reconnected to a session and a transaction is started.
     * <p>
     * In case of exception in a listener, the transaction is rolled back for that listener but processing continues for
     * the other listeners.
     * <p>
     * In case of timeout, a message is logged by the caller but processing continues for the other listeners (the
     * thread is left running separately from the main thread that initiated post-commit processing).
     */
    protected static class EventBundleRunner implements Callable<Boolean> {

        protected final List<EventListenerDescriptor> listeners;

        protected final EventBundle bundle;

        /** Name of the thread that created this runner, used to link the tracer sequences. */
        protected String callerThread;

        public EventBundleRunner(List<EventListenerDescriptor> listeners, EventBundle bundle) {
            this.listeners = listeners;
            this.bundle = bundle;
            callerThread = SequenceTracer.getThreadName();
        }

        /**
         * Runs every interested listener in its own transaction.
         *
         * @return always {@link Boolean#TRUE}: per-listener failures are logged and rolled back, not reported
         */
        @Override
        public Boolean call() {
            if (log.isDebugEnabled()) {
                log.debug("Events postcommit execution starting in thread: " + Thread.currentThread().getName());
            }
            SequenceTracer.startFrom(callerThread, "Postcommit", "#ff410f");
            long t0 = System.currentTimeMillis();
            EventStats stats = Framework.getService(EventStats.class);

            for (EventListenerDescriptor listener : listeners) {
                EventBundle filtered = listener.filterBundle(bundle);
                if (filtered.isEmpty()) {
                    continue;
                }
                if (log.isDebugEnabled()) {
                    log.debug("Events postcommit execution start for listener: " + listener.getName());
                }
                SequenceTracer.start("run listener " + listener.getName());
                long t1 = System.currentTimeMillis();

                boolean ok = false;
                ReconnectedEventBundle reconnected = null;
                // transaction timeout is managed by the FutureTask
                boolean tx = TransactionHelper.startTransaction();
                try {
                    // NOTE(review): the whole listener list (not the current listener) is passed as second
                    // argument, same as in bulk mode -- confirm this is intended
                    reconnected = new ReconnectedEventBundleImpl(filtered, listeners.toString());

                    listener.asPostCommitListener().handleEvent(reconnected);

                    if (Thread.currentThread().isInterrupted()) {
                        log.error("Events postcommit execution interrupted for listener: " + listener.getName());
                        SequenceTracer.destroy("interrupted");
                        ok = false;
                    } else {
                        ok = true;
                    }
                } catch (RuntimeException e) {
                    log.error("Events postcommit execution encountered exception for listener: " + listener.getName(),
                            e);
                    // don't rethrow, but rollback (ok=false) and continue loop
                } finally {
                    try {
                        if (reconnected != null) {
                            reconnected.disconnect();
                        }
                    } finally {
                        // end the transaction even if disconnect failed
                        if (tx) {
                            if (!ok) {
                                TransactionHelper.setTransactionRollbackOnly();
                                log.error("Rolling back transaction");
                            }
                            TransactionHelper.commitOrRollbackTransaction();
                        }
                        long elapsed = System.currentTimeMillis() - t1;
                        if (stats != null) {
                            stats.logAsyncExec(listener, elapsed);
                        }
                        if (log.isDebugEnabled()) {
                            log.debug("Events postcommit execution end for listener: " + listener.getName() + " in "
                                    + elapsed + "ms");
                        }
                        SequenceTracer.stop("listener done " + elapsed + " ms");
                    }
                }
                // even if interrupted due to timeout, we continue the loop
            }
            long elapsed = System.currentTimeMillis() - t0;
            if (log.isDebugEnabled()) {
                log.debug("Events postcommit execution finished in " + elapsed + "ms");
            }
            SequenceTracer.stop("postcommit done " + elapsed + " ms");
            return Boolean.TRUE; // no error to report
        }
    }

    /**
     * Lets the listeners process the event bundle in bulk mode.
     * <p>
     * The event bundle is reconnected to a single session and a single transaction is started for all the listeners.
     * <p>
     * In case of exception in a listener, the transaction is rolled back and processing stops.
     * <p>
     * In case of timeout, the transaction is rolled back and processing stops.
     */
    protected static class EventBundleBulkRunner implements Callable<Boolean> {

        protected final List<EventListenerDescriptor> listeners;

        protected final EventBundle bundle;

        /** Name of the thread that created this runner, used to link the tracer sequences. */
        protected final String callerThread;

        public EventBundleBulkRunner(List<EventListenerDescriptor> listeners, EventBundle bundle) {
            this.listeners = listeners;
            this.bundle = bundle;
            callerThread = SequenceTracer.getThreadName();
        }

        /**
         * Runs all interested listeners inside a single transaction.
         *
         * @return {@link Boolean#FALSE} if a listener threw a {@link RuntimeException}, {@link Boolean#TRUE}
         *         otherwise (including when processing was cut short by an interruption)
         */
        @Override
        public Boolean call() {
            SequenceTracer.startFrom(callerThread, "BulkPostcommit", "#ff410f");
            if (log.isDebugEnabled()) {
                log.debug("Events postcommit bulk execution starting in thread: " + Thread.currentThread().getName());
            }
            long t0 = System.currentTimeMillis();

            boolean ok = false;
            boolean interrupt = false;
            ReconnectedEventBundle reconnected = null;
            // transaction timeout is managed by the FutureTask
            boolean tx = TransactionHelper.startTransaction();
            try {
                reconnected = new ReconnectedEventBundleImpl(bundle, listeners.toString());
                for (EventListenerDescriptor listener : listeners) {
                    EventBundle filtered = listener.filterBundle(reconnected);
                    if (filtered.isEmpty()) {
                        continue;
                    }
                    SequenceTracer.start("run listener " + listener.getName());
                    if (log.isDebugEnabled()) {
                        log.debug("Events postcommit bulk execution start for listener: " + listener.getName());
                    }
                    long t1 = System.currentTimeMillis();
                    try {

                        listener.asPostCommitListener().handleEvent(filtered);

                        if (Thread.currentThread().isInterrupted()) {
                            log.error("Events postcommit bulk execution interrupted for listener: "
                                    + listener.getName() + ", will rollback and abort bulk processing");
                            interrupt = true;
                        }
                    } catch (RuntimeException e) {
                        log.error(
                                "Events postcommit bulk execution encountered exception for listener: "
                                        + listener.getName(), e);
                        // ok is still false here, so the outer finally rolls the transaction back
                        return Boolean.FALSE; // report error
                    } finally {
                        long elapsed = System.currentTimeMillis() - t1;
                        if (log.isDebugEnabled()) {
                            log.debug("Events postcommit bulk execution end for listener: " + listener.getName()
                                    + " in " + elapsed + "ms");
                        }
                        SequenceTracer.stop("listener done " + elapsed + " ms");
                    }
                    if (interrupt) {
                        break;
                    }
                }
                ok = !interrupt;
            } finally {
                try {
                    if (reconnected != null) {
                        reconnected.disconnect();
                    }
                } finally {
                    // end the transaction even if disconnect failed
                    if (tx) {
                        if (!ok) {
                            TransactionHelper.setTransactionRollbackOnly();
                            log.error("Rolling back transaction");
                        }
                        TransactionHelper.commitOrRollbackTransaction();
                    }
                }
                long elapsed = System.currentTimeMillis() - t0;
                SequenceTracer.stop("BulkPostcommit done " + elapsed + " ms");
                if (log.isDebugEnabled()) {
                    log.debug("Events postcommit bulk execution finished in " + elapsed + "ms");
                }
            }
            return Boolean.TRUE; // no error to report
        }
    }
}