001/* 002 * (C) Copyright 2006-2013 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat 018 * Florent Guillaume 019 */ 020package org.nuxeo.ecm.core.event.impl; 021 022import java.util.List; 023import java.util.concurrent.Callable; 024import java.util.concurrent.ExecutionException; 025import java.util.concurrent.ExecutorService; 026import java.util.concurrent.FutureTask; 027import java.util.concurrent.RejectedExecutionException; 028import java.util.concurrent.SynchronousQueue; 029import java.util.concurrent.ThreadFactory; 030import java.util.concurrent.ThreadPoolExecutor; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.TimeoutException; 033import java.util.concurrent.atomic.AtomicInteger; 034 035import org.apache.commons.logging.Log; 036import org.apache.commons.logging.LogFactory; 037import org.nuxeo.common.logging.SequenceTracer; 038import org.nuxeo.ecm.core.event.EventBundle; 039import org.nuxeo.ecm.core.event.EventStats; 040import org.nuxeo.ecm.core.event.ReconnectedEventBundle; 041import org.nuxeo.runtime.api.Framework; 042import org.nuxeo.runtime.transaction.TransactionHelper; 043 044/** 045 * Executor that passes an event bundle to post-commit asynchronous listeners (in a separated thread in order to manage 046 * transactions). 047 * <p> 048 * Allows a bulk mode where transaction management is not per-listener done once for the whole set of listeners. 049 */ 050public class PostCommitEventExecutor { 051 052 private static final Log log = LogFactory.getLog(PostCommitEventExecutor.class); 053 054 public static final String TIMEOUT_MS_PROP = "org.nuxeo.ecm.core.event.tx.PostCommitExecutor.timeoutMs"; 055 056 public static final int DEFAULT_TIMEOUT_MS = 300; // 0.3s 057 058 public static final int DEFAULT_TIMEOUT_TEST_MS = 60000; // 1 min 059 060 private Integer defaultTimeoutMs; 061 062 public static final String DEFAULT_BULK_TIMEOUT_S = "600"; // 10min 063 064 public static final String BULK_TIMEOUT_PROP = "org.nuxeo.ecm.core.event.tx.BulkExecutor.timeout"; 065 066 private static final long KEEP_ALIVE_TIME_SECOND = 10; 067 068 private static final int MAX_POOL_SIZE = 100; 069 070 protected final ExecutorService executor; 071 072 /** 073 * Creates non-daemon threads at normal priority. 074 */ 075 private static class NamedThreadFactory implements ThreadFactory { 076 077 private final AtomicInteger threadNumber = new AtomicInteger(); 078 079 private final ThreadGroup group; 080 081 private final String prefix; 082 083 public NamedThreadFactory(String prefix) { 084 SecurityManager sm = System.getSecurityManager(); 085 group = sm == null ? Thread.currentThread().getThreadGroup() : sm.getThreadGroup(); 086 this.prefix = prefix; 087 } 088 089 @Override 090 public Thread newThread(Runnable r) { 091 String name = prefix + threadNumber.incrementAndGet(); 092 Thread thread = new Thread(group, r, name); 093 // do not set daemon 094 thread.setPriority(Thread.NORM_PRIORITY); 095 return thread; 096 } 097 } 098 099 public PostCommitEventExecutor() { 100 // use as much thread as needed up to MAX_POOL_SIZE 101 // keep them alive a moment for reuse 102 // have all threads torn down when there is no work to do 103 ThreadFactory threadFactory = new NamedThreadFactory("Nuxeo-Event-PostCommit-"); 104 executor = new ThreadPoolExecutor(0, MAX_POOL_SIZE, KEEP_ALIVE_TIME_SECOND, TimeUnit.SECONDS, 105 new SynchronousQueue<Runnable>(), threadFactory); 106 ((ThreadPoolExecutor) executor).allowCoreThreadTimeOut(true); 107 } 108 109 protected int getDefaultTimeoutMs() { 110 if (defaultTimeoutMs == null) { 111 if (Framework.getProperty(TIMEOUT_MS_PROP) != null) { 112 defaultTimeoutMs = Integer.parseInt(Framework.getProperty(TIMEOUT_MS_PROP)); 113 } else if (Framework.isTestModeSet()) { 114 defaultTimeoutMs = DEFAULT_TIMEOUT_TEST_MS; 115 } else { 116 defaultTimeoutMs = DEFAULT_TIMEOUT_MS; 117 } 118 } 119 return defaultTimeoutMs; 120 } 121 122 public void shutdown(long timeoutMillis) throws InterruptedException { 123 executor.shutdown(); 124 executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS); 125 if (!executor.isTerminated()) { 126 executor.shutdownNow(); 127 } 128 } 129 130 public void run(List<EventListenerDescriptor> listeners, EventBundle event) { 131 run(listeners, event, getDefaultTimeoutMs(), false); 132 } 133 134 public void runBulk(List<EventListenerDescriptor> listeners, EventBundle event) { 135 String timeoutSeconds = Framework.getProperty(BULK_TIMEOUT_PROP, DEFAULT_BULK_TIMEOUT_S); 136 run(listeners, event, Long.parseLong(timeoutSeconds) * 1000, true); 137 } 138 139 public void run(List<EventListenerDescriptor> listeners, EventBundle bundle, long timeoutMillis, boolean bulk) { 140 // check that there's at list one listener interested 141 boolean some = false; 142 for (EventListenerDescriptor listener : listeners) { 143 if (listener.acceptBundle(bundle)) { 144 some = true; 145 break; 146 } 147 } 148 if (!some) { 149 if (log.isDebugEnabled()) { 150 log.debug("Events postcommit execution has nothing to do"); 151 } 152 return; 153 } 154 155 if (log.isDebugEnabled()) { 156 log.debug(String.format("Events postcommit execution starting with timeout %sms%s", 157 Long.valueOf(timeoutMillis), bulk ? " in bulk mode" : "")); 158 } 159 160 Callable<Boolean> callable = !bulk ? new EventBundleRunner(listeners, bundle) : new EventBundleBulkRunner( 161 listeners, bundle); 162 FutureTask<Boolean> futureTask = new FutureTask<Boolean>(callable); 163 try { 164 executor.execute(futureTask); 165 } catch (RejectedExecutionException e) { 166 log.error("Events postcommit execution rejected", e); 167 return; 168 } 169 try { 170 // wait for runner to be finished, with timeout 171 Boolean ok = futureTask.get(timeoutMillis, TimeUnit.MILLISECONDS); 172 if (Boolean.FALSE.equals(ok)) { 173 log.error("Events postcommit bulk execution aborted due to previous error"); 174 } 175 } catch (InterruptedException e) { 176 Thread.currentThread().interrupt(); 177 // interrupt thread 178 futureTask.cancel(true); // mayInterruptIfRunning=true 179 } catch (TimeoutException e) { 180 if (!bulk) { 181 log.info(String.format("Events postcommit execution exceeded timeout of %sms, leaving thread running", 182 Long.valueOf(timeoutMillis))); 183 // don't cancel task, let it run 184 } else { 185 log.error(String.format( 186 "Events postcommit bulk execution exceeded timeout of %sms, interrupting thread", 187 Long.valueOf(timeoutMillis))); 188 futureTask.cancel(true); // mayInterruptIfRunning=true 189 } 190 } catch (ExecutionException e) { 191 log.error("Events postcommit execution encountered unexpected exception", e.getCause()); 192 } 193 194 if (log.isDebugEnabled()) { 195 log.debug("Events postcommit execution finished"); 196 } 197 } 198 199 /** 200 * Lets the listeners process the event bundle. 201 * <p> 202 * For each listener, the event bundle is reconnected to a session and a transaction is started. 203 * <p> 204 * In case of exception in a listener, the transaction is rolled back for that listener but processing continues for 205 * the other listeners. 206 * <p> 207 * In case of timeout, an error is logged but processing continues for the other listeners (the thread is left 208 * running separately from the main thread that initiated post-commit processing). 209 */ 210 protected static class EventBundleRunner implements Callable<Boolean> { 211 212 protected final List<EventListenerDescriptor> listeners; 213 214 protected final EventBundle bundle; 215 216 protected String callerThread; 217 218 public EventBundleRunner(List<EventListenerDescriptor> listeners, EventBundle bundle) { 219 this.listeners = listeners; 220 this.bundle = bundle; 221 callerThread = SequenceTracer.getThreadName(); 222 } 223 224 @Override 225 public Boolean call() { 226 if (log.isDebugEnabled()) { 227 log.debug("Events postcommit execution starting in thread: " + Thread.currentThread().getName()); 228 } 229 SequenceTracer.startFrom(callerThread, "Postcommit", "#ff410f"); 230 long t0 = System.currentTimeMillis(); 231 EventStats stats = Framework.getService(EventStats.class); 232 233 for (EventListenerDescriptor listener : listeners) { 234 EventBundle filtered = listener.filterBundle(bundle); 235 if (filtered.isEmpty()) { 236 continue; 237 } 238 if (log.isDebugEnabled()) { 239 log.debug("Events postcommit execution start for listener: " + listener.getName()); 240 } 241 SequenceTracer.start("run listener " + listener.getName()); 242 long t1 = System.currentTimeMillis(); 243 244 boolean ok = false; 245 ReconnectedEventBundle reconnected = null; 246 // transaction timeout is managed by the FutureTask 247 boolean tx = TransactionHelper.startTransaction(); 248 try { 249 reconnected = new ReconnectedEventBundleImpl(filtered, listeners.toString()); 250 251 listener.asPostCommitListener().handleEvent(reconnected); 252 253 ok = true; 254 // don't check for interrupted flag, the event completed normally, no reason to rollback 255 } catch (RuntimeException e) { 256 log.error("Events postcommit execution encountered exception for listener: " + listener.getName(), 257 e); 258 // don't rethrow, but rollback (ok=false) and continue loop 259 } finally { 260 try { 261 if (reconnected != null) { 262 reconnected.disconnect(); 263 } 264 } finally { 265 if (tx) { 266 if (!ok) { 267 TransactionHelper.setTransactionRollbackOnly(); 268 log.error("Rolling back transaction"); 269 } 270 TransactionHelper.commitOrRollbackTransaction(); 271 } 272 long elapsed = System.currentTimeMillis() - t1; 273 if (stats != null) { 274 stats.logAsyncExec(listener, elapsed); 275 } 276 if (log.isDebugEnabled()) { 277 log.debug("Events postcommit execution end for listener: " + listener.getName() + " in " 278 + elapsed + "ms"); 279 } 280 SequenceTracer.stop("listener done " + elapsed + " ms"); 281 } 282 } 283 // even if interrupted due to timeout, we continue the loop 284 } 285 long elapsed = System.currentTimeMillis() - t0; 286 if (log.isDebugEnabled()) { 287 log.debug("Events postcommit execution finished in " + elapsed + "ms"); 288 } 289 SequenceTracer.stop("postcommit done" + elapsed + " ms"); 290 return Boolean.TRUE; // no error to report 291 } 292 } 293 294 /** 295 * Lets the listeners process the event bundle in bulk mode. 296 * <p> 297 * The event bundle is reconnected to a single session and a single transaction is started for all the listeners. 298 * <p> 299 * In case of exception in a listener, the transaction is rolled back and processing stops. 300 * <p> 301 * In case of timeout, the transaction is rolled back and processing stops. 302 */ 303 protected static class EventBundleBulkRunner implements Callable<Boolean> { 304 305 protected final List<EventListenerDescriptor> listeners; 306 307 protected final EventBundle bundle; 308 protected final String callerThread; 309 310 public EventBundleBulkRunner(List<EventListenerDescriptor> listeners, EventBundle bundle) { 311 this.listeners = listeners; 312 this.bundle = bundle; 313 callerThread = SequenceTracer.getThreadName(); 314 } 315 316 @Override 317 public Boolean call() { 318 SequenceTracer.startFrom(callerThread, "BulkPostcommit", "#ff410f"); 319 if (log.isDebugEnabled()) { 320 log.debug("Events postcommit bulk execution starting in thread: " + Thread.currentThread().getName()); 321 } 322 long t0 = System.currentTimeMillis(); 323 324 boolean ok = false; 325 boolean interrupt = false; 326 ReconnectedEventBundle reconnected = null; 327 // transaction timeout is managed by the FutureTask 328 boolean tx = TransactionHelper.startTransaction(); 329 try { 330 reconnected = new ReconnectedEventBundleImpl(bundle, listeners.toString()); 331 for (EventListenerDescriptor listener : listeners) { 332 EventBundle filtered = listener.filterBundle(reconnected); 333 if (filtered.isEmpty()) { 334 continue; 335 } 336 SequenceTracer.start("run listener " + listener.getName()); 337 if (log.isDebugEnabled()) { 338 log.debug("Events postcommit bulk execution start for listener: " + listener.getName()); 339 } 340 long t1 = System.currentTimeMillis(); 341 try { 342 343 listener.asPostCommitListener().handleEvent(filtered); 344 345 if (Thread.currentThread().isInterrupted()) { 346 log.error("Events postcommit bulk execution interrupted for listener: " 347 + listener.getName() + ", will rollback and abort bulk processing"); 348 interrupt = true; 349 } 350 } catch (RuntimeException e) { 351 log.error( 352 "Events postcommit bulk execution encountered exception for listener: " 353 + listener.getName(), e); 354 return Boolean.FALSE; // report error 355 } finally { 356 long elapsed = System.currentTimeMillis() - t1; 357 if (log.isDebugEnabled()) { 358 log.debug("Events postcommit bulk execution end for listener: " + listener.getName() 359 + " in " + elapsed + "ms"); 360 } 361 SequenceTracer.stop("listener done " + elapsed + " ms"); 362 } 363 if (interrupt) { 364 break; 365 } 366 } 367 ok = !interrupt; 368 } finally { 369 try { 370 if (reconnected != null) { 371 reconnected.disconnect(); 372 } 373 } finally { 374 if (tx) { 375 if (!ok) { 376 TransactionHelper.setTransactionRollbackOnly(); 377 log.error("Rolling back transaction"); 378 } 379 TransactionHelper.commitOrRollbackTransaction(); 380 } 381 } 382 long elapsed = System.currentTimeMillis() - t0; 383 SequenceTracer.stop("BulkPostcommit done " + elapsed + " ms"); 384 if (log.isDebugEnabled()) { 385 log.debug("Events postcommit bulk execution finished in " + elapsed + "ms"); 386 } 387 } 388 return Boolean.TRUE; // no error to report 389 } 390 } 391}