001/* 002 * Copyright (c) 2006-2013 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the Eclipse Public License v1.0 006 * which accompanies this distribution, and is available at 007 * http://www.eclipse.org/legal/epl-v10.html 008 * 009 * Contributors: 010 * Thierry Delprat 011 * Florent Guillaume 012 */ 013package org.nuxeo.ecm.core.event.impl; 014 015import java.util.List; 016import java.util.concurrent.Callable; 017import java.util.concurrent.ExecutionException; 018import java.util.concurrent.ExecutorService; 019import java.util.concurrent.FutureTask; 020import java.util.concurrent.RejectedExecutionException; 021import java.util.concurrent.SynchronousQueue; 022import java.util.concurrent.ThreadFactory; 023import java.util.concurrent.ThreadPoolExecutor; 024import java.util.concurrent.TimeUnit; 025import java.util.concurrent.TimeoutException; 026import java.util.concurrent.atomic.AtomicInteger; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.nuxeo.ecm.core.event.EventBundle; 031import org.nuxeo.ecm.core.event.EventStats; 032import org.nuxeo.ecm.core.event.ReconnectedEventBundle; 033import org.nuxeo.runtime.api.Framework; 034import org.nuxeo.runtime.transaction.TransactionHelper; 035 036/** 037 * Executor that passes an event bundle to post-commit asynchronous listeners (in a separated thread in order to manage 038 * transactions). 039 * <p> 040 * Allows a bulk mode where transaction management is not per-listener done once for the whole set of listeners. 041 */ 042public class PostCommitEventExecutor { 043 044 private static final Log log = LogFactory.getLog(PostCommitEventExecutor.class); 045 046 public static final String TIMEOUT_MS_PROP = "org.nuxeo.ecm.core.event.tx.PostCommitExecutor.timeoutMs"; 047 048 public static final int DEFAULT_TIMEOUT_MS = 300; // 0.3s 049 050 public static final int DEFAULT_TIMEOUT_TEST_MS = 60000; // 1 min 051 052 private Integer defaultTimeoutMs; 053 054 public static final String DEFAULT_BULK_TIMEOUT_S = "600"; // 10min 055 056 public static final String BULK_TIMEOUT_PROP = "org.nuxeo.ecm.core.event.tx.BulkExecutor.timeout"; 057 058 private static final long KEEP_ALIVE_TIME_SECOND = 10; 059 060 private static final int MAX_POOL_SIZE = 100; 061 062 protected final ExecutorService executor; 063 064 /** 065 * Creates non-daemon threads at normal priority. 066 */ 067 private static class NamedThreadFactory implements ThreadFactory { 068 069 private final AtomicInteger threadNumber = new AtomicInteger(); 070 071 private final ThreadGroup group; 072 073 private final String prefix; 074 075 public NamedThreadFactory(String prefix) { 076 SecurityManager sm = System.getSecurityManager(); 077 group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup(); 078 this.prefix = prefix; 079 } 080 081 @Override 082 public Thread newThread(Runnable r) { 083 String name = prefix + threadNumber.incrementAndGet(); 084 Thread thread = new Thread(group, r, name); 085 // do not set daemon 086 thread.setPriority(Thread.NORM_PRIORITY); 087 return thread; 088 } 089 } 090 091 public PostCommitEventExecutor() { 092 // use as much thread as needed up to MAX_POOL_SIZE 093 // keep them alive a moment for reuse 094 // have all threads torn down when there is no work to do 095 ThreadFactory threadFactory = new NamedThreadFactory("Nuxeo-Event-PostCommit-"); 096 executor = new ThreadPoolExecutor(0, MAX_POOL_SIZE, KEEP_ALIVE_TIME_SECOND, TimeUnit.SECONDS, 097 new SynchronousQueue<Runnable>(), threadFactory); 098 ((ThreadPoolExecutor) executor).allowCoreThreadTimeOut(true); 099 } 100 101 protected int getDefaultTimeoutMs() { 102 if (defaultTimeoutMs == null) { 103 if (Framework.getProperty(TIMEOUT_MS_PROP) != null) { 104 defaultTimeoutMs = Integer.parseInt(Framework.getProperty(TIMEOUT_MS_PROP)); 105 } else if (Framework.isTestModeSet()) { 106 defaultTimeoutMs = DEFAULT_TIMEOUT_TEST_MS; 107 } else { 108 defaultTimeoutMs = DEFAULT_TIMEOUT_MS; 109 } 110 } 111 return defaultTimeoutMs; 112 } 113 114 public void shutdown(long timeoutMillis) throws InterruptedException { 115 executor.shutdown(); 116 executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS); 117 if (!executor.isTerminated()) { 118 executor.shutdownNow(); 119 } 120 } 121 122 public void run(List<EventListenerDescriptor> listeners, EventBundle event) { 123 run(listeners, event, getDefaultTimeoutMs(), false); 124 } 125 126 public void runBulk(List<EventListenerDescriptor> listeners, EventBundle event) { 127 String timeoutSeconds = Framework.getProperty(BULK_TIMEOUT_PROP, DEFAULT_BULK_TIMEOUT_S); 128 run(listeners, event, Long.parseLong(timeoutSeconds) * 1000, true); 129 } 130 131 public void run(List<EventListenerDescriptor> listeners, EventBundle bundle, long timeoutMillis, boolean bulk) { 132 // check that there's at list one listener interested 133 boolean some = false; 134 for (EventListenerDescriptor listener : listeners) { 135 if (listener.acceptBundle(bundle)) { 136 some = true; 137 break; 138 } 139 } 140 if (!some) { 141 if (log.isDebugEnabled()) { 142 log.debug("Events postcommit execution has nothing to do"); 143 } 144 return; 145 } 146 147 if (log.isDebugEnabled()) { 148 log.debug(String.format("Events postcommit execution starting with timeout %sms%s", 149 Long.valueOf(timeoutMillis), bulk ? " in bulk mode" : "")); 150 } 151 152 Callable<Boolean> callable = !bulk ? new EventBundleRunner(listeners, bundle) : new EventBundleBulkRunner( 153 listeners, bundle); 154 FutureTask<Boolean> futureTask = new FutureTask<Boolean>(callable); 155 try { 156 executor.execute(futureTask); 157 } catch (RejectedExecutionException e) { 158 log.error("Events postcommit execution rejected", e); 159 return; 160 } 161 try { 162 // wait for runner to be finished, with timeout 163 Boolean ok = futureTask.get(timeoutMillis, TimeUnit.MILLISECONDS); 164 if (Boolean.FALSE.equals(ok)) { 165 log.error("Events postcommit bulk execution aborted due to previous error"); 166 } 167 } catch (InterruptedException e) { 168 // restore interrupted status 169 Thread.currentThread().interrupt(); 170 // interrupt thread 171 futureTask.cancel(true); // mayInterruptIfRunning=true 172 } catch (TimeoutException e) { 173 if (!bulk) { 174 log.warn(String.format("Events postcommit execution exceeded timeout of %sms, leaving thread running", 175 Long.valueOf(timeoutMillis))); 176 // don't cancel task, let it run 177 } else { 178 log.error(String.format( 179 "Events postcommit bulk execution exceeded timeout of %sms, interrupting thread", 180 Long.valueOf(timeoutMillis))); 181 futureTask.cancel(true); // mayInterruptIfRunning=true 182 } 183 } catch (ExecutionException e) { 184 log.error("Events postcommit execution encountered unexpected exception", e.getCause()); 185 } 186 187 if (log.isDebugEnabled()) { 188 log.debug("Events postcommit execution finished"); 189 } 190 } 191 192 /** 193 * Lets the listeners process the event bundle. 194 * <p> 195 * For each listener, the event bundle is reconnected to a session and a transaction is started. 196 * <p> 197 * In case of exception in a listener, the transaction is rolled back for that listener but processing continues for 198 * the other listeners. 199 * <p> 200 * In case of timeout, an error is logged but processing continues for the other listeners (the thread is left 201 * running separately from the main thread that initiated post-commit processing). 202 */ 203 protected static class EventBundleRunner implements Callable<Boolean> { 204 205 protected final List<EventListenerDescriptor> listeners; 206 207 protected final EventBundle bundle; 208 209 public EventBundleRunner(List<EventListenerDescriptor> listeners, EventBundle bundle) { 210 this.listeners = listeners; 211 this.bundle = bundle; 212 } 213 214 @Override 215 public Boolean call() { 216 if (log.isDebugEnabled()) { 217 log.debug("Events postcommit execution starting in thread: " + Thread.currentThread().getName()); 218 } 219 long t0 = System.currentTimeMillis(); 220 EventStats stats = Framework.getLocalService(EventStats.class); 221 222 for (EventListenerDescriptor listener : listeners) { 223 EventBundle filtered = listener.filterBundle(bundle); 224 if (filtered.isEmpty()) { 225 continue; 226 } 227 if (log.isDebugEnabled()) { 228 log.debug("Events postcommit execution start for listener: " + listener.getName()); 229 } 230 long t1 = System.currentTimeMillis(); 231 232 boolean ok = false; 233 ReconnectedEventBundle reconnected = null; 234 // transaction timeout is managed by the FutureTask 235 boolean tx = TransactionHelper.startTransaction(); 236 try { 237 reconnected = new ReconnectedEventBundleImpl(filtered, listeners.toString()); 238 239 listener.asPostCommitListener().handleEvent(reconnected); 240 241 if (Thread.currentThread().isInterrupted()) { 242 log.error("Events postcommit execution interrupted for listener: " + listener.getName()); 243 ok = false; 244 } else { 245 ok = true; 246 } 247 } catch (RuntimeException e) { 248 log.error("Events postcommit execution encountered exception for listener: " + listener.getName(), 249 e); 250 // don't rethrow, but rollback (ok=false) and continue loop 251 } finally { 252 try { 253 if (reconnected != null) { 254 reconnected.disconnect(); 255 } 256 } finally { 257 if (tx) { 258 if (!ok) { 259 TransactionHelper.setTransactionRollbackOnly(); 260 log.error("Rolling back transaction"); 261 } 262 TransactionHelper.commitOrRollbackTransaction(); 263 } 264 if (stats != null) { 265 stats.logAsyncExec(listener, System.currentTimeMillis() - t1); 266 } 267 if (log.isDebugEnabled()) { 268 log.debug("Events postcommit execution end for listener: " + listener.getName() + " in " 269 + (System.currentTimeMillis() - t1) + "ms"); 270 } 271 } 272 } 273 // even if interrupted due to timeout, we continue the loop 274 } 275 if (log.isDebugEnabled()) { 276 log.debug("Events postcommit execution finished in " + (System.currentTimeMillis() - t0) + "ms"); 277 } 278 return Boolean.TRUE; // no error to report 279 } 280 } 281 282 /** 283 * Lets the listeners process the event bundle in bulk mode. 284 * <p> 285 * The event bundle is reconnected to a single session and a single transaction is started for all the listeners. 286 * <p> 287 * In case of exception in a listener, the transaction is rolled back and processing stops. 288 * <p> 289 * In case of timeout, the transaction is rolled back and processing stops. 290 */ 291 protected static class EventBundleBulkRunner implements Callable<Boolean> { 292 293 protected final List<EventListenerDescriptor> listeners; 294 295 protected final EventBundle bundle; 296 297 public EventBundleBulkRunner(List<EventListenerDescriptor> listeners, EventBundle bundle) { 298 this.listeners = listeners; 299 this.bundle = bundle; 300 } 301 302 @Override 303 public Boolean call() { 304 if (log.isDebugEnabled()) { 305 log.debug("Events postcommit bulk execution starting in thread: " + Thread.currentThread().getName()); 306 } 307 long t0 = System.currentTimeMillis(); 308 309 boolean ok = false; 310 boolean interrupt = false; 311 ReconnectedEventBundle reconnected = null; 312 // transaction timeout is managed by the FutureTask 313 boolean tx = TransactionHelper.startTransaction(); 314 try { 315 reconnected = new ReconnectedEventBundleImpl(bundle, listeners.toString()); 316 for (EventListenerDescriptor listener : listeners) { 317 EventBundle filtered = listener.filterBundle(reconnected); 318 if (filtered.isEmpty()) { 319 continue; 320 } 321 if (log.isDebugEnabled()) { 322 log.debug("Events postcommit bulk execution start for listener: " + listener.getName()); 323 } 324 long t1 = System.currentTimeMillis(); 325 try { 326 327 listener.asPostCommitListener().handleEvent(filtered); 328 329 if (Thread.currentThread().isInterrupted()) { 330 log.error("Events postcommit bulk execution interrupted for listener: " 331 + listener.getName() + ", will rollback and abort bulk processing"); 332 interrupt = true; 333 } 334 } catch (RuntimeException e) { 335 log.error( 336 "Events postcommit bulk execution encountered exception for listener: " 337 + listener.getName(), e); 338 return Boolean.FALSE; // report error 339 } finally { 340 if (log.isDebugEnabled()) { 341 log.debug("Events postcommit bulk execution end for listener: " + listener.getName() 342 + " in " + (System.currentTimeMillis() - t1) + "ms"); 343 } 344 } 345 if (interrupt) { 346 break; 347 } 348 } 349 ok = !interrupt; 350 } finally { 351 try { 352 if (reconnected != null) { 353 reconnected.disconnect(); 354 } 355 } finally { 356 if (tx) { 357 if (!ok) { 358 TransactionHelper.setTransactionRollbackOnly(); 359 log.error("Rolling back transaction"); 360 } 361 TransactionHelper.commitOrRollbackTransaction(); 362 } 363 } 364 if (log.isDebugEnabled()) { 365 log.debug("Events postcommit bulk execution finished in " + (System.currentTimeMillis() - t0) 366 + "ms"); 367 } 368 } 369 return Boolean.TRUE; // no error to report 370 } 371 } 372}