001/* 002 * Copyright (c) 2006-2013 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the Eclipse Public License v1.0 006 * which accompanies this distribution, and is available at 007 * http://www.eclipse.org/legal/epl-v10.html 008 * 009 * Contributors: 010 * Bogdan Stefanescu 011 * Thierry Delprat 012 * Florent Guillaume 013 */ 014package org.nuxeo.ecm.core.event.impl; 015 016import java.rmi.dgc.VMID; 017import java.util.ArrayList; 018import java.util.HashMap; 019import java.util.HashSet; 020import java.util.List; 021import java.util.Map; 022import java.util.Set; 023import java.util.concurrent.CopyOnWriteArrayList; 024 025import javax.naming.NamingException; 026import javax.transaction.RollbackException; 027import javax.transaction.Status; 028import javax.transaction.Synchronization; 029import javax.transaction.SystemException; 030 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033import org.nuxeo.ecm.core.api.NuxeoException; 034import org.nuxeo.ecm.core.api.RecoverableClientException; 035import org.nuxeo.ecm.core.event.Event; 036import org.nuxeo.ecm.core.event.EventBundle; 037import org.nuxeo.ecm.core.event.EventContext; 038import org.nuxeo.ecm.core.event.EventListener; 039import org.nuxeo.ecm.core.event.EventService; 040import org.nuxeo.ecm.core.event.EventServiceAdmin; 041import org.nuxeo.ecm.core.event.EventStats; 042import org.nuxeo.ecm.core.event.PostCommitEventListener; 043import org.nuxeo.ecm.core.event.ReconnectedEventBundle; 044import org.nuxeo.ecm.core.event.jms.AsyncProcessorConfig; 045import org.nuxeo.runtime.api.Framework; 046import org.nuxeo.runtime.transaction.TransactionHelper; 047 048/** 049 * Implementation of the event service. 050 */ 051public class EventServiceImpl implements EventService, EventServiceAdmin, Synchronization { 052 053 public static final VMID VMID = new VMID(); 054 055 private static final Log log = LogFactory.getLog(EventServiceImpl.class); 056 057 protected static final ThreadLocal<CompositeEventBundle> threadBundles = new ThreadLocal<CompositeEventBundle>() { 058 @Override 059 protected CompositeEventBundle initialValue() { 060 return new CompositeEventBundle(); 061 } 062 }; 063 064 private static class CompositeEventBundle { 065 066 boolean registeredSynchronization; 067 068 final Map<String, EventBundle> byRepository = new HashMap<String, EventBundle>(); 069 070 void push(Event event) { 071 String repositoryName = event.getContext().getRepositoryName(); 072 if (!byRepository.containsKey(repositoryName)) { 073 byRepository.put(repositoryName, new EventBundleImpl()); 074 } 075 byRepository.get(repositoryName).push(event); 076 } 077 078 } 079 080 protected final EventListenerList listenerDescriptors; 081 082 protected PostCommitEventExecutor postCommitExec; 083 084 protected volatile AsyncEventExecutor asyncExec; 085 086 protected final List<AsyncWaitHook> asyncWaitHooks = new CopyOnWriteArrayList<AsyncWaitHook>(); 087 088 protected boolean blockAsyncProcessing = false; 089 090 protected boolean blockSyncPostCommitProcessing = false; 091 092 protected boolean bulkModeEnabled = false; 093 094 public EventServiceImpl() { 095 listenerDescriptors = new EventListenerList(); 096 postCommitExec = new PostCommitEventExecutor(); 097 asyncExec = new AsyncEventExecutor(); 098 } 099 100 public void init() { 101 asyncExec.init(); 102 } 103 104 public void shutdown(long timeoutMillis) throws InterruptedException { 105 postCommitExec.shutdown(timeoutMillis); 106 Set<AsyncWaitHook> notTerminated = new HashSet<AsyncWaitHook>(); 107 for (AsyncWaitHook hook : asyncWaitHooks) { 108 if (hook.shutdown() == false) { 109 notTerminated.add(hook); 110 } 111 } 112 if (!notTerminated.isEmpty()) { 113 throw new RuntimeException("Asynch services are still running : " + notTerminated); 114 } 115 if (asyncExec.shutdown(timeoutMillis) == false) { 116 throw new RuntimeException("Async executor is still running, timeout expired"); 117 } 118 } 119 120 public void registerForAsyncWait(AsyncWaitHook callback) { 121 asyncWaitHooks.add(callback); 122 } 123 124 public void unregisterForAsyncWait(AsyncWaitHook callback) { 125 asyncWaitHooks.remove(callback); 126 } 127 128 /** 129 * @deprecated use {@link #waitForAsyncCompletion()} instead. 130 */ 131 @Deprecated 132 public int getActiveAsyncTaskCount() { 133 return asyncExec.getUnfinishedCount(); 134 } 135 136 @Override 137 public void waitForAsyncCompletion() { 138 waitForAsyncCompletion(Long.MAX_VALUE); 139 } 140 141 @Override 142 public void waitForAsyncCompletion(long timeout) { 143 Set<AsyncWaitHook> notCompleted = new HashSet<AsyncWaitHook>(); 144 for (AsyncWaitHook hook : asyncWaitHooks) { 145 if (!hook.waitForAsyncCompletion()) { 146 notCompleted.add(hook); 147 } 148 } 149 if (!notCompleted.isEmpty()) { 150 throw new RuntimeException("Async tasks are still running : " + notCompleted); 151 } 152 try { 153 if (!asyncExec.waitForCompletion(timeout)) { 154 throw new RuntimeException("Async event listeners thread pool is not terminated"); 155 } 156 } catch (InterruptedException e) { 157 Thread.currentThread().interrupt(); 158 // TODO change signature 159 throw new RuntimeException(e); 160 } 161 } 162 163 @Override 164 public void addEventListener(EventListenerDescriptor listener) { 165 listenerDescriptors.add(listener); 166 log.debug("Registered event listener: " + listener.getName()); 167 } 168 169 @Override 170 public void removeEventListener(EventListenerDescriptor listener) { 171 listenerDescriptors.removeDescriptor(listener); 172 log.debug("Unregistered event listener: " + listener.getName()); 173 } 174 175 @Override 176 public void fireEvent(String name, EventContext context) { 177 fireEvent(new EventImpl(name, context)); 178 } 179 180 @Override 181 public void fireEvent(Event event) { 182 183 String ename = event.getName(); 184 EventStats stats = Framework.getService(EventStats.class); 185 for (EventListenerDescriptor desc : listenerDescriptors.getEnabledInlineListenersDescriptors()) { 186 if (!desc.acceptEvent(ename)) { 187 continue; 188 } 189 try { 190 long t0 = System.currentTimeMillis(); 191 desc.asEventListener().handleEvent(event); 192 if (stats != null) { 193 stats.logSyncExec(desc, System.currentTimeMillis() - t0); 194 } 195 if (event.isCanceled()) { 196 // break loop 197 return; 198 } 199 } catch (RuntimeException e) { 200 // get message 201 String message = "Exception during " + desc.getName() + " sync listener execution, "; 202 if (event.isBubbleException()) { 203 message += "other listeners will be ignored"; 204 } else if (event.isMarkedForRollBack()) { 205 message += "transaction will be rolled back"; 206 if (event.getRollbackMessage() != null) { 207 message += " (" + event.getRollbackMessage() + ")"; 208 } 209 } else { 210 message += "continuing to run other listeners"; 211 } 212 // log 213 if (e instanceof RecoverableClientException) { 214 log.info(message + "\n" + e.getMessage()); 215 log.debug(message, e); 216 } else { 217 log.error(message, e); 218 } 219 // rethrow or swallow 220 if (event.isBubbleException()) { 221 throw e; 222 } else if (event.isMarkedForRollBack()) { 223 Exception ee; 224 if (event.getRollbackException() != null) { 225 ee = event.getRollbackException(); 226 } else { 227 ee = e; 228 } 229 // when marked for rollback, throw a generic 230 // RuntimeException to make sure nobody catches it 231 throw new RuntimeException(message, ee); 232 } else { 233 // swallow exception 234 } 235 } 236 } 237 238 if (!event.isInline()) { // record the event 239 // don't record the complete event, only a shallow copy 240 ShallowEvent shallowEvent = ShallowEvent.create(event); 241 if (event.isImmediate()) { 242 EventBundleImpl b = new EventBundleImpl(); 243 b.push(shallowEvent); 244 fireEventBundle(b); 245 } else { 246 recordEvent(shallowEvent); 247 } 248 } 249 } 250 251 @Override 252 public void fireEventBundle(EventBundle event) { 253 boolean comesFromJMS = false; 254 255 if (event instanceof ReconnectedEventBundle) { 256 if (((ReconnectedEventBundle) event).comesFromJMS()) { 257 comesFromJMS = true; 258 } 259 } 260 261 List<EventListenerDescriptor> postCommitSync = listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors(); 262 List<EventListenerDescriptor> postCommitAsync = listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors(); 263 264 if (bulkModeEnabled) { 265 // run all listeners synchronously in one transaction 266 List<EventListenerDescriptor> listeners = new ArrayList<EventListenerDescriptor>(); 267 if (!blockSyncPostCommitProcessing) { 268 listeners = postCommitSync; 269 } 270 if (!blockAsyncProcessing) { 271 listeners.addAll(postCommitAsync); 272 } 273 if (!listeners.isEmpty()) { 274 postCommitExec.runBulk(listeners, event); 275 } 276 return; 277 } 278 279 // run sync listeners 280 if (blockSyncPostCommitProcessing) { 281 log.debug("Dropping PostCommit handler execution"); 282 } else if (comesFromJMS) { 283 // when called from JMS we must skip sync listeners 284 // - postComit listeners should be on the core 285 // - there is no transaction started by JMS listener 286 log.debug("Deactivating sync post-commit listener since we are called from JMS"); 287 } else { 288 if (!postCommitSync.isEmpty()) { 289 postCommitExec.run(postCommitSync, event); 290 } 291 } 292 293 if (blockAsyncProcessing) { 294 log.debug("Dopping bundle"); 295 return; 296 } 297 298 // fire async listeners 299 if (AsyncProcessorConfig.forceJMSUsage() && !comesFromJMS) { 300 log.debug("Skipping async exec, this will be triggered via JMS"); 301 } else { 302 asyncExec.run(postCommitAsync, event); 303 } 304 } 305 306 @Override 307 public void fireEventBundleSync(EventBundle event) { 308 for (EventListenerDescriptor desc : listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors()) { 309 desc.asPostCommitListener().handleEvent(event); 310 } 311 for (EventListenerDescriptor desc : listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors()) { 312 desc.asPostCommitListener().handleEvent(event); 313 } 314 } 315 316 @Override 317 public List<EventListener> getEventListeners() { 318 return listenerDescriptors.getInLineListeners(); 319 } 320 321 @Override 322 public List<PostCommitEventListener> getPostCommitEventListeners() { 323 List<PostCommitEventListener> result = new ArrayList<PostCommitEventListener>(); 324 325 result.addAll(listenerDescriptors.getSyncPostCommitListeners()); 326 result.addAll(listenerDescriptors.getAsyncPostCommitListeners()); 327 328 return result; 329 } 330 331 public EventListenerList getEventListenerList() { 332 return listenerDescriptors; 333 } 334 335 @Override 336 public EventListenerDescriptor getEventListener(String name) { 337 return listenerDescriptors.getDescriptor(name); 338 } 339 340 // methods for monitoring 341 342 @Override 343 public EventListenerList getListenerList() { 344 return listenerDescriptors; 345 } 346 347 @Override 348 public void setListenerEnabledFlag(String listenerName, boolean enabled) { 349 if (!listenerDescriptors.hasListener(listenerName)) { 350 return; 351 } 352 353 for (EventListenerDescriptor desc : listenerDescriptors.getAsyncPostCommitListenersDescriptors()) { 354 if (desc.getName().equals(listenerName)) { 355 desc.setEnabled(enabled); 356 synchronized (this) { 357 listenerDescriptors.recomputeEnabledListeners(); 358 } 359 return; 360 } 361 } 362 363 for (EventListenerDescriptor desc : listenerDescriptors.getSyncPostCommitListenersDescriptors()) { 364 if (desc.getName().equals(listenerName)) { 365 desc.setEnabled(enabled); 366 synchronized (this) { 367 listenerDescriptors.recomputeEnabledListeners(); 368 } 369 return; 370 } 371 } 372 373 for (EventListenerDescriptor desc : listenerDescriptors.getInlineListenersDescriptors()) { 374 if (desc.getName().equals(listenerName)) { 375 desc.setEnabled(enabled); 376 synchronized (this) { 377 listenerDescriptors.recomputeEnabledListeners(); 378 } 379 return; 380 } 381 } 382 } 383 384 @Override 385 public int getActiveThreadsCount() { 386 return asyncExec.getActiveCount(); 387 } 388 389 @Override 390 public int getEventsInQueueCount() { 391 return asyncExec.getUnfinishedCount(); 392 } 393 394 @Override 395 public boolean isBlockAsyncHandlers() { 396 return blockAsyncProcessing; 397 } 398 399 @Override 400 public boolean isBlockSyncPostCommitHandlers() { 401 return blockSyncPostCommitProcessing; 402 } 403 404 @Override 405 public void setBlockAsyncHandlers(boolean blockAsyncHandlers) { 406 blockAsyncProcessing = blockAsyncHandlers; 407 } 408 409 @Override 410 public void setBlockSyncPostCommitHandlers(boolean blockSyncPostComitHandlers) { 411 blockSyncPostCommitProcessing = blockSyncPostComitHandlers; 412 } 413 414 @Override 415 public boolean isBulkModeEnabled() { 416 return bulkModeEnabled; 417 } 418 419 @Override 420 public void setBulkModeEnabled(boolean bulkModeEnabled) { 421 this.bulkModeEnabled = bulkModeEnabled; 422 } 423 424 protected void recordEvent(Event event) { 425 CompositeEventBundle b = threadBundles.get(); 426 b.push(event); 427 if (TransactionHelper.isTransactionActive()) { 428 if (!b.registeredSynchronization) { 429 // register as synchronization 430 try { 431 TransactionHelper.lookupTransactionManager().getTransaction().registerSynchronization(this); 432 } catch (NamingException | SystemException | RollbackException e) { 433 throw new RuntimeException("Cannot register Synchronization", e); 434 } 435 b.registeredSynchronization = true; 436 } 437 } else if (event.isCommitEvent()) { 438 handleTxCommited(); 439 } 440 } 441 442 @Override 443 public void beforeCompletion() { 444 } 445 446 @Override 447 public void afterCompletion(int status) { 448 if (status == Status.STATUS_COMMITTED) { 449 handleTxCommited(); 450 } else if (status == Status.STATUS_ROLLEDBACK) { 451 handleTxRollbacked(); 452 } else { 453 log.error("Unexpected afterCompletion status: " + status); 454 } 455 } 456 457 protected void handleTxRollbacked() { 458 threadBundles.remove(); 459 } 460 461 protected void handleTxCommited() { 462 CompositeEventBundle b = threadBundles.get(); 463 threadBundles.remove(); 464 465 // notify post commit event listeners 466 for (EventBundle bundle : b.byRepository.values()) { 467 try { 468 fireEventBundle(bundle); 469 } catch (NuxeoException e) { 470 log.error("Error while processing " + bundle, e); 471 } 472 } 473 } 474 475}