001/* 002 * (C) Copyright 2006-2013 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Bogdan Stefanescu 018 * Thierry Delprat 019 * Florent Guillaume 020 */ 021package org.nuxeo.ecm.core.event.impl; 022 023import java.rmi.dgc.VMID; 024import java.util.ArrayList; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.concurrent.CopyOnWriteArrayList; 031 032import javax.naming.NamingException; 033import javax.transaction.RollbackException; 034import javax.transaction.Status; 035import javax.transaction.Synchronization; 036import javax.transaction.SystemException; 037 038import org.apache.commons.logging.Log; 039import org.apache.commons.logging.LogFactory; 040import org.nuxeo.common.logging.SequenceTracer; 041import org.nuxeo.ecm.core.api.NuxeoException; 042import org.nuxeo.ecm.core.api.RecoverableClientException; 043import org.nuxeo.ecm.core.event.Event; 044import org.nuxeo.ecm.core.event.EventBundle; 045import org.nuxeo.ecm.core.event.EventContext; 046import org.nuxeo.ecm.core.event.EventListener; 047import org.nuxeo.ecm.core.event.EventService; 048import org.nuxeo.ecm.core.event.EventServiceAdmin; 049import org.nuxeo.ecm.core.event.EventStats; 050import org.nuxeo.ecm.core.event.PostCommitEventListener; 051import org.nuxeo.ecm.core.event.ReconnectedEventBundle; 052import org.nuxeo.ecm.core.event.jms.AsyncProcessorConfig; 053import org.nuxeo.runtime.api.Framework; 054import org.nuxeo.runtime.transaction.TransactionHelper; 055 056/** 057 * Implementation of the event service. 058 */ 059public class EventServiceImpl implements EventService, EventServiceAdmin, Synchronization { 060 061 public static final VMID VMID = new VMID(); 062 063 private static final Log log = LogFactory.getLog(EventServiceImpl.class); 064 065 protected static final ThreadLocal<CompositeEventBundle> threadBundles = new ThreadLocal<CompositeEventBundle>() { 066 @Override 067 protected CompositeEventBundle initialValue() { 068 return new CompositeEventBundle(); 069 } 070 }; 071 072 private static class CompositeEventBundle { 073 074 boolean registeredSynchronization; 075 076 final Map<String, EventBundle> byRepository = new HashMap<String, EventBundle>(); 077 078 void push(Event event) { 079 String repositoryName = event.getContext().getRepositoryName(); 080 if (!byRepository.containsKey(repositoryName)) { 081 byRepository.put(repositoryName, new EventBundleImpl()); 082 } 083 byRepository.get(repositoryName).push(event); 084 } 085 086 } 087 088 protected final EventListenerList listenerDescriptors; 089 090 protected PostCommitEventExecutor postCommitExec; 091 092 protected volatile AsyncEventExecutor asyncExec; 093 094 protected final List<AsyncWaitHook> asyncWaitHooks = new CopyOnWriteArrayList<AsyncWaitHook>(); 095 096 protected boolean blockAsyncProcessing = false; 097 098 protected boolean blockSyncPostCommitProcessing = false; 099 100 protected boolean bulkModeEnabled = false; 101 102 public EventServiceImpl() { 103 listenerDescriptors = new EventListenerList(); 104 postCommitExec = new PostCommitEventExecutor(); 105 asyncExec = new AsyncEventExecutor(); 106 } 107 108 public void init() { 109 asyncExec.init(); 110 } 111 112 public void shutdown(long timeoutMillis) throws InterruptedException { 113 postCommitExec.shutdown(timeoutMillis); 114 Set<AsyncWaitHook> notTerminated = new HashSet<AsyncWaitHook>(); 115 for (AsyncWaitHook hook : asyncWaitHooks) { 116 if (hook.shutdown() == false) { 117 notTerminated.add(hook); 118 } 119 } 120 if (!notTerminated.isEmpty()) { 121 throw new RuntimeException("Asynch services are still running : " + notTerminated); 122 } 123 if (asyncExec.shutdown(timeoutMillis) == false) { 124 throw new RuntimeException("Async executor is still running, timeout expired"); 125 } 126 } 127 128 public void registerForAsyncWait(AsyncWaitHook callback) { 129 asyncWaitHooks.add(callback); 130 } 131 132 public void unregisterForAsyncWait(AsyncWaitHook callback) { 133 asyncWaitHooks.remove(callback); 134 } 135 136 /** 137 * @deprecated use {@link #waitForAsyncCompletion()} instead. 138 */ 139 @Deprecated 140 public int getActiveAsyncTaskCount() { 141 return asyncExec.getUnfinishedCount(); 142 } 143 144 @Override 145 public void waitForAsyncCompletion() { 146 waitForAsyncCompletion(Long.MAX_VALUE); 147 } 148 149 @Override 150 public void waitForAsyncCompletion(long timeout) { 151 Set<AsyncWaitHook> notCompleted = new HashSet<AsyncWaitHook>(); 152 for (AsyncWaitHook hook : asyncWaitHooks) { 153 if (!hook.waitForAsyncCompletion()) { 154 notCompleted.add(hook); 155 } 156 } 157 if (!notCompleted.isEmpty()) { 158 throw new RuntimeException("Async tasks are still running : " + notCompleted); 159 } 160 try { 161 if (!asyncExec.waitForCompletion(timeout)) { 162 throw new RuntimeException("Async event listeners thread pool is not terminated"); 163 } 164 } catch (InterruptedException e) { 165 Thread.currentThread().interrupt(); 166 // TODO change signature 167 throw new RuntimeException(e); 168 } 169 } 170 171 @Override 172 public void addEventListener(EventListenerDescriptor listener) { 173 listenerDescriptors.add(listener); 174 log.debug("Registered event listener: " + listener.getName()); 175 } 176 177 @Override 178 public void removeEventListener(EventListenerDescriptor listener) { 179 listenerDescriptors.removeDescriptor(listener); 180 log.debug("Unregistered event listener: " + listener.getName()); 181 } 182 183 @Override 184 public void fireEvent(String name, EventContext context) { 185 fireEvent(new EventImpl(name, context)); 186 } 187 188 @Override 189 public void fireEvent(Event event) { 190 191 String ename = event.getName(); 192 EventStats stats = Framework.getService(EventStats.class); 193 for (EventListenerDescriptor desc : listenerDescriptors.getEnabledInlineListenersDescriptors()) { 194 if (!desc.acceptEvent(ename)) { 195 continue; 196 } 197 try { 198 long t0 = System.currentTimeMillis(); 199 SequenceTracer.start("Fire sync event " + event.getName()); 200 desc.asEventListener().handleEvent(event); 201 long elapsed = System.currentTimeMillis() - t0; 202 SequenceTracer.stop("done in " + elapsed + " ms"); 203 if (stats != null) { 204 stats.logSyncExec(desc, elapsed); 205 } 206 if (event.isCanceled()) { 207 // break loop 208 return; 209 } 210 } catch (RuntimeException e) { 211 // get message 212 SequenceTracer.destroy("failure"); 213 String message = "Exception during " + desc.getName() + " sync listener execution, "; 214 if (event.isBubbleException()) { 215 message += "other listeners will be ignored"; 216 } else if (event.isMarkedForRollBack()) { 217 message += "transaction will be rolled back"; 218 if (event.getRollbackMessage() != null) { 219 message += " (" + event.getRollbackMessage() + ")"; 220 } 221 } else { 222 message += "continuing to run other listeners"; 223 } 224 // log 225 if (e instanceof RecoverableClientException) { 226 log.info(message + "\n" + e.getMessage()); 227 log.debug(message, e); 228 } else { 229 log.error(message, e); 230 } 231 // rethrow or swallow 232 if (event.isBubbleException()) { 233 throw e; 234 } else if (event.isMarkedForRollBack()) { 235 Exception ee; 236 if (event.getRollbackException() != null) { 237 ee = event.getRollbackException(); 238 } else { 239 ee = e; 240 } 241 // when marked for rollback, throw a generic 242 // RuntimeException to make sure nobody catches it 243 throw new RuntimeException(message, ee); 244 } else { 245 // swallow exception 246 } 247 } 248 } 249 250 if (!event.isInline()) { // record the event 251 // don't record the complete event, only a shallow copy 252 ShallowEvent shallowEvent = ShallowEvent.create(event); 253 if (event.isImmediate()) { 254 EventBundleImpl b = new EventBundleImpl(); 255 b.push(shallowEvent); 256 fireEventBundle(b); 257 } else { 258 recordEvent(shallowEvent); 259 } 260 } 261 } 262 263 @Override 264 public void fireEventBundle(EventBundle event) { 265 boolean comesFromJMS = false; 266 267 if (event instanceof ReconnectedEventBundle) { 268 if (((ReconnectedEventBundle) event).comesFromJMS()) { 269 comesFromJMS = true; 270 } 271 } 272 273 List<EventListenerDescriptor> postCommitSync = listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors(); 274 List<EventListenerDescriptor> postCommitAsync = listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors(); 275 276 if (bulkModeEnabled) { 277 // run all listeners synchronously in one transaction 278 List<EventListenerDescriptor> listeners = new ArrayList<EventListenerDescriptor>(); 279 if (!blockSyncPostCommitProcessing) { 280 listeners = postCommitSync; 281 } 282 if (!blockAsyncProcessing) { 283 listeners.addAll(postCommitAsync); 284 } 285 if (!listeners.isEmpty()) { 286 postCommitExec.runBulk(listeners, event); 287 } 288 return; 289 } 290 291 // run sync listeners 292 if (blockSyncPostCommitProcessing) { 293 log.debug("Dropping PostCommit handler execution"); 294 } else if (comesFromJMS) { 295 // when called from JMS we must skip sync listeners 296 // - postComit listeners should be on the core 297 // - there is no transaction started by JMS listener 298 log.debug("Deactivating sync post-commit listener since we are called from JMS"); 299 } else { 300 if (!postCommitSync.isEmpty()) { 301 postCommitExec.run(postCommitSync, event); 302 } 303 } 304 305 if (blockAsyncProcessing) { 306 log.debug("Dopping bundle"); 307 return; 308 } 309 310 // fire async listeners 311 if (AsyncProcessorConfig.forceJMSUsage() && !comesFromJMS) { 312 log.debug("Skipping async exec, this will be triggered via JMS"); 313 } else { 314 asyncExec.run(postCommitAsync, event); 315 } 316 } 317 318 @Override 319 public void fireEventBundleSync(EventBundle event) { 320 for (EventListenerDescriptor desc : listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors()) { 321 desc.asPostCommitListener().handleEvent(event); 322 } 323 for (EventListenerDescriptor desc : listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors()) { 324 desc.asPostCommitListener().handleEvent(event); 325 } 326 } 327 328 @Override 329 public List<EventListener> getEventListeners() { 330 return listenerDescriptors.getInLineListeners(); 331 } 332 333 @Override 334 public List<PostCommitEventListener> getPostCommitEventListeners() { 335 List<PostCommitEventListener> result = new ArrayList<PostCommitEventListener>(); 336 337 result.addAll(listenerDescriptors.getSyncPostCommitListeners()); 338 result.addAll(listenerDescriptors.getAsyncPostCommitListeners()); 339 340 return result; 341 } 342 343 public EventListenerList getEventListenerList() { 344 return listenerDescriptors; 345 } 346 347 @Override 348 public EventListenerDescriptor getEventListener(String name) { 349 return listenerDescriptors.getDescriptor(name); 350 } 351 352 // methods for monitoring 353 354 @Override 355 public EventListenerList getListenerList() { 356 return listenerDescriptors; 357 } 358 359 @Override 360 public void setListenerEnabledFlag(String listenerName, boolean enabled) { 361 if (!listenerDescriptors.hasListener(listenerName)) { 362 return; 363 } 364 365 for (EventListenerDescriptor desc : listenerDescriptors.getAsyncPostCommitListenersDescriptors()) { 366 if (desc.getName().equals(listenerName)) { 367 desc.setEnabled(enabled); 368 synchronized (this) { 369 listenerDescriptors.recomputeEnabledListeners(); 370 } 371 return; 372 } 373 } 374 375 for (EventListenerDescriptor desc : listenerDescriptors.getSyncPostCommitListenersDescriptors()) { 376 if (desc.getName().equals(listenerName)) { 377 desc.setEnabled(enabled); 378 synchronized (this) { 379 listenerDescriptors.recomputeEnabledListeners(); 380 } 381 return; 382 } 383 } 384 385 for (EventListenerDescriptor desc : listenerDescriptors.getInlineListenersDescriptors()) { 386 if (desc.getName().equals(listenerName)) { 387 desc.setEnabled(enabled); 388 synchronized (this) { 389 listenerDescriptors.recomputeEnabledListeners(); 390 } 391 return; 392 } 393 } 394 } 395 396 @Override 397 public int getActiveThreadsCount() { 398 return asyncExec.getActiveCount(); 399 } 400 401 @Override 402 public int getEventsInQueueCount() { 403 return asyncExec.getUnfinishedCount(); 404 } 405 406 @Override 407 public boolean isBlockAsyncHandlers() { 408 return blockAsyncProcessing; 409 } 410 411 @Override 412 public boolean isBlockSyncPostCommitHandlers() { 413 return blockSyncPostCommitProcessing; 414 } 415 416 @Override 417 public void setBlockAsyncHandlers(boolean blockAsyncHandlers) { 418 blockAsyncProcessing = blockAsyncHandlers; 419 } 420 421 @Override 422 public void setBlockSyncPostCommitHandlers(boolean blockSyncPostComitHandlers) { 423 blockSyncPostCommitProcessing = blockSyncPostComitHandlers; 424 } 425 426 @Override 427 public boolean isBulkModeEnabled() { 428 return bulkModeEnabled; 429 } 430 431 @Override 432 public void setBulkModeEnabled(boolean bulkModeEnabled) { 433 this.bulkModeEnabled = bulkModeEnabled; 434 } 435 436 protected void recordEvent(Event event) { 437 CompositeEventBundle b = threadBundles.get(); 438 b.push(event); 439 if (TransactionHelper.isTransactionActive()) { 440 if (!b.registeredSynchronization) { 441 // register as synchronization 442 try { 443 TransactionHelper.lookupTransactionManager().getTransaction().registerSynchronization(this); 444 } catch (NamingException | SystemException | RollbackException e) { 445 throw new RuntimeException("Cannot register Synchronization", e); 446 } 447 b.registeredSynchronization = true; 448 } 449 } else if (event.isCommitEvent()) { 450 handleTxCommited(); 451 } 452 } 453 454 @Override 455 public void beforeCompletion() { 456 } 457 458 @Override 459 public void afterCompletion(int status) { 460 if (status == Status.STATUS_COMMITTED) { 461 handleTxCommited(); 462 } else if (status == Status.STATUS_ROLLEDBACK) { 463 handleTxRollbacked(); 464 } else { 465 log.error("Unexpected afterCompletion status: " + status); 466 } 467 } 468 469 protected void handleTxRollbacked() { 470 threadBundles.remove(); 471 } 472 473 protected void handleTxCommited() { 474 CompositeEventBundle b = threadBundles.get(); 475 threadBundles.remove(); 476 477 // notify post commit event listeners 478 for (EventBundle bundle : b.byRepository.values()) { 479 try { 480 fireEventBundle(bundle); 481 } catch (NuxeoException e) { 482 log.error("Error while processing " + bundle, e); 483 } 484 } 485 } 486 487}