/*
 * (C) Copyright 2006-2013 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Bogdan Stefanescu
 *     Thierry Delprat
 *     Florent Guillaume
 *     Andrei Nechaev
 */
package org.nuxeo.ecm.core.event.impl;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.common.logging.SequenceTracer;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.RecoverableClientException;
import org.nuxeo.ecm.core.event.Event;
import org.nuxeo.ecm.core.event.EventBundle;
import org.nuxeo.ecm.core.event.EventContext;
import org.nuxeo.ecm.core.event.EventListener;
import org.nuxeo.ecm.core.event.EventService;
import org.nuxeo.ecm.core.event.EventServiceAdmin;
import org.nuxeo.ecm.core.event.EventStats;
import org.nuxeo.ecm.core.event.PostCommitEventListener;
import org.nuxeo.ecm.core.event.ReconnectedEventBundle;
import org.nuxeo.ecm.core.event.jms.AsyncProcessorConfig;
import org.nuxeo.ecm.core.event.pipe.EventPipeDescriptor;
import org.nuxeo.ecm.core.event.pipe.EventPipeRegistry;
import org.nuxeo.ecm.core.event.pipe.dispatch.EventBundleDispatcher;
import org.nuxeo.ecm.core.event.pipe.dispatch.EventDispatcherDescriptor;
import org.nuxeo.ecm.core.event.pipe.dispatch.EventDispatcherRegistry;
import org.nuxeo.runtime.api.Framework;
045import org.nuxeo.runtime.transaction.TransactionHelper; 046 047import javax.naming.NamingException; 048import javax.transaction.RollbackException; 049import javax.transaction.Status; 050import javax.transaction.Synchronization; 051import javax.transaction.SystemException; 052import java.rmi.dgc.VMID; 053import java.util.ArrayList; 054import java.util.HashMap; 055import java.util.List; 056import java.util.Map; 057import java.util.Set; 058import java.util.concurrent.CopyOnWriteArrayList; 059import java.util.stream.Collectors; 060 061/** 062 * Implementation of the event service. 063 */ 064public class EventServiceImpl implements EventService, EventServiceAdmin, Synchronization { 065 066 public static final VMID VMID = new VMID(); 067 068 private static final Log log = LogFactory.getLog(EventServiceImpl.class); 069 070 protected static final ThreadLocal<CompositeEventBundle> threadBundles = new ThreadLocal<CompositeEventBundle>() { 071 @Override 072 protected CompositeEventBundle initialValue() { 073 return new CompositeEventBundle(); 074 } 075 }; 076 077 private static class CompositeEventBundle { 078 079 boolean registeredSynchronization; 080 081 final Map<String, EventBundle> byRepository = new HashMap<String, EventBundle>(); 082 083 void push(Event event) { 084 String repositoryName = event.getContext().getRepositoryName(); 085 if (!byRepository.containsKey(repositoryName)) { 086 byRepository.put(repositoryName, new EventBundleImpl()); 087 } 088 byRepository.get(repositoryName).push(event); 089 } 090 091 } 092 093 protected final EventListenerList listenerDescriptors; 094 095 protected PostCommitEventExecutor postCommitExec; 096 097 protected volatile AsyncEventExecutor asyncExec; 098 099 protected final List<AsyncWaitHook> asyncWaitHooks = new CopyOnWriteArrayList<AsyncWaitHook>(); 100 101 protected boolean blockAsyncProcessing = false; 102 103 protected boolean blockSyncPostCommitProcessing = false; 104 105 protected boolean bulkModeEnabled = false; 106 107 
protected EventPipeRegistry registeredPipes = new EventPipeRegistry(); 108 109 protected EventDispatcherRegistry dispatchers = new EventDispatcherRegistry(); 110 111 protected EventBundleDispatcher pipeDispatcher; 112 113 public EventServiceImpl() { 114 listenerDescriptors = new EventListenerList(); 115 postCommitExec = new PostCommitEventExecutor(); 116 asyncExec = new AsyncEventExecutor(); 117 } 118 119 public void init() { 120 asyncExec.init(); 121 122 EventDispatcherDescriptor dispatcherDescriptor = dispatchers.getDispatcherDescriptor(); 123 if (dispatcherDescriptor != null) { 124 List<EventPipeDescriptor> pipes = registeredPipes.getPipes(); 125 if (!pipes.isEmpty()) { 126 pipeDispatcher = dispatcherDescriptor.getInstance(); 127 pipeDispatcher.init(pipes, dispatcherDescriptor.getParameters()); 128 } 129 } 130 } 131 132 public EventBundleDispatcher getEventBundleDispatcher() { 133 return pipeDispatcher; 134 } 135 136 public void shutdown(long timeoutMillis) throws InterruptedException { 137 postCommitExec.shutdown(timeoutMillis); 138 Set<AsyncWaitHook> notTerminated = asyncWaitHooks.stream() 139 .filter(hook -> hook.shutdown() == false) 140 .collect(Collectors.toSet()); 141 if (!notTerminated.isEmpty()) { 142 throw new RuntimeException("Asynch services are still running : " + notTerminated); 143 } 144 145 if (asyncExec.shutdown(timeoutMillis) == false) { 146 throw new RuntimeException("Async executor is still running, timeout expired"); 147 } 148 if (pipeDispatcher != null) { 149 pipeDispatcher.shutdown(); 150 } 151 } 152 153 public void registerForAsyncWait(AsyncWaitHook callback) { 154 asyncWaitHooks.add(callback); 155 } 156 157 public void unregisterForAsyncWait(AsyncWaitHook callback) { 158 asyncWaitHooks.remove(callback); 159 } 160 161 /** 162 * @deprecated use {@link #waitForAsyncCompletion()} instead. 
163 */ 164 @Deprecated 165 public int getActiveAsyncTaskCount() { 166 return asyncExec.getUnfinishedCount(); 167 } 168 169 @Override 170 public void waitForAsyncCompletion() { 171 waitForAsyncCompletion(Long.MAX_VALUE); 172 } 173 174 @Override 175 public void waitForAsyncCompletion(long timeout) { 176 Set<AsyncWaitHook> notCompleted = asyncWaitHooks.stream() 177 .filter(hook -> !hook.waitForAsyncCompletion()) 178 .collect(Collectors.toSet()); 179 if (!notCompleted.isEmpty()) { 180 throw new RuntimeException("Async tasks are still running : " + notCompleted); 181 } 182 try { 183 if (!asyncExec.waitForCompletion(timeout)) { 184 throw new RuntimeException("Async event listeners thread pool is not terminated"); 185 } 186 } catch (InterruptedException e) { 187 Thread.currentThread().interrupt(); 188 // TODO change signature 189 throw new RuntimeException(e); 190 } 191 if (pipeDispatcher != null) { 192 try { 193 pipeDispatcher.waitForCompletion(timeout); 194 } catch (InterruptedException e) { 195 Thread.currentThread().interrupt(); 196 throw new RuntimeException(e); 197 } 198 } 199 } 200 201 @Override 202 public void addEventListener(EventListenerDescriptor listener) { 203 listenerDescriptors.add(listener); 204 log.debug("Registered event listener: " + listener.getName()); 205 } 206 207 public void addEventPipe(EventPipeDescriptor pipeDescriptor) { 208 registeredPipes.addContribution(pipeDescriptor); 209 log.debug("Registered event pipe: " + pipeDescriptor.getName()); 210 } 211 212 public void addEventDispatcher(EventDispatcherDescriptor dispatcherDescriptor) { 213 dispatchers.addContrib(dispatcherDescriptor); 214 log.debug("Registered event dispatcher: " + dispatcherDescriptor.getName()); 215 } 216 217 @Override 218 public void removeEventListener(EventListenerDescriptor listener) { 219 listenerDescriptors.removeDescriptor(listener); 220 log.debug("Unregistered event listener: " + listener.getName()); 221 } 222 223 public void removeEventPipe(EventPipeDescriptor 
pipeDescriptor) { 224 registeredPipes.removeContribution(pipeDescriptor); 225 log.debug("Unregistered event pipe: " + pipeDescriptor.getName()); 226 } 227 228 public void removeEventDispatcher(EventDispatcherDescriptor dispatcherDescriptor) { 229 dispatchers.removeContrib(dispatcherDescriptor); 230 log.debug("Unregistered event dispatcher: " + dispatcherDescriptor.getName()); 231 } 232 233 @Override 234 public void fireEvent(String name, EventContext context) { 235 fireEvent(new EventImpl(name, context)); 236 } 237 238 @Override 239 public void fireEvent(Event event) { 240 241 String ename = event.getName(); 242 EventStats stats = Framework.getService(EventStats.class); 243 for (EventListenerDescriptor desc : listenerDescriptors.getEnabledInlineListenersDescriptors()) { 244 if (!desc.acceptEvent(ename)) { 245 continue; 246 } 247 try { 248 long t0 = System.currentTimeMillis(); 249 SequenceTracer.start("Fire sync event " + event.getName()); 250 desc.asEventListener().handleEvent(event); 251 long elapsed = System.currentTimeMillis() - t0; 252 SequenceTracer.stop("done in " + elapsed + " ms"); 253 if (stats != null) { 254 stats.logSyncExec(desc, elapsed); 255 } 256 if (event.isCanceled()) { 257 // break loop 258 return; 259 } 260 } catch (RuntimeException e) { 261 // get message 262 SequenceTracer.destroy("failure"); 263 String message = "Exception during " + desc.getName() + " sync listener execution, "; 264 if (event.isBubbleException()) { 265 message += "other listeners will be ignored"; 266 } else if (event.isMarkedForRollBack()) { 267 message += "transaction will be rolled back"; 268 if (event.getRollbackMessage() != null) { 269 message += " (" + event.getRollbackMessage() + ")"; 270 } 271 } else { 272 message += "continuing to run other listeners"; 273 } 274 // log 275 if (e instanceof RecoverableClientException) { 276 log.info(message + "\n" + e.getMessage()); 277 log.debug(message, e); 278 } else { 279 log.error(message, e); 280 } 281 // rethrow or swallow 282 
if (event.isBubbleException()) { 283 throw e; 284 } else if (event.isMarkedForRollBack()) { 285 Exception ee; 286 if (event.getRollbackException() != null) { 287 ee = event.getRollbackException(); 288 } else { 289 ee = e; 290 } 291 // when marked for rollback, throw a generic 292 // RuntimeException to make sure nobody catches it 293 throw new RuntimeException(message, ee); 294 } else { 295 // swallow exception 296 } 297 } 298 } 299 300 if (!event.isInline()) { // record the event 301 // don't record the complete event, only a shallow copy 302 ShallowEvent shallowEvent = ShallowEvent.create(event); 303 if (event.isImmediate()) { 304 EventBundleImpl b = new EventBundleImpl(); 305 b.push(shallowEvent); 306 fireEventBundle(b); 307 } else { 308 recordEvent(shallowEvent); 309 } 310 } 311 } 312 313 @Override 314 public void fireEventBundle(EventBundle event) { 315 boolean comesFromJMS = false; 316 317 if (event instanceof ReconnectedEventBundle) { 318 if (((ReconnectedEventBundle) event).comesFromJMS()) { 319 comesFromJMS = true; 320 } 321 } 322 323 List<EventListenerDescriptor> postCommitSync = listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors(); 324 List<EventListenerDescriptor> postCommitAsync = listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors(); 325 326 if (bulkModeEnabled) { 327 // run all listeners synchronously in one transaction 328 List<EventListenerDescriptor> listeners = new ArrayList<EventListenerDescriptor>(); 329 if (!blockSyncPostCommitProcessing) { 330 listeners = postCommitSync; 331 } 332 if (!blockAsyncProcessing) { 333 listeners.addAll(postCommitAsync); 334 } 335 if (!listeners.isEmpty()) { 336 postCommitExec.runBulk(listeners, event); 337 } 338 return; 339 } 340 341 // run sync listeners 342 if (blockSyncPostCommitProcessing) { 343 log.debug("Dropping PostCommit handler execution"); 344 } else if (comesFromJMS) { 345 // when called from JMS we must skip sync listeners 346 // - postComit listeners should be on the core 
347 // - there is no transaction started by JMS listener 348 log.debug("Deactivating sync post-commit listener since we are called from JMS"); 349 } else { 350 if (!postCommitSync.isEmpty()) { 351 postCommitExec.run(postCommitSync, event); 352 } 353 } 354 355 if (blockAsyncProcessing) { 356 log.debug("Dopping bundle"); 357 return; 358 } 359 360 // fire async listeners 361 if (AsyncProcessorConfig.forceJMSUsage() && !comesFromJMS) { 362 log.debug("Skipping async exec, this will be triggered via JMS"); 363 } else { 364 if (pipeDispatcher == null) { 365 asyncExec.run(postCommitAsync, event); 366 } else { 367 // rather than sending to the WorkManager: send to the Pipe 368 pipeDispatcher.sendEventBundle(event); 369 } 370 } 371 } 372 373 @Override 374 public void fireEventBundleSync(EventBundle event) { 375 for (EventListenerDescriptor desc : listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors()) { 376 desc.asPostCommitListener().handleEvent(event); 377 } 378 for (EventListenerDescriptor desc : listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors()) { 379 desc.asPostCommitListener().handleEvent(event); 380 } 381 } 382 383 @Override 384 public List<EventListener> getEventListeners() { 385 return listenerDescriptors.getInLineListeners(); 386 } 387 388 @Override 389 public List<PostCommitEventListener> getPostCommitEventListeners() { 390 List<PostCommitEventListener> result = new ArrayList<PostCommitEventListener>(); 391 392 result.addAll(listenerDescriptors.getSyncPostCommitListeners()); 393 result.addAll(listenerDescriptors.getAsyncPostCommitListeners()); 394 395 return result; 396 } 397 398 public EventListenerList getEventListenerList() { 399 return listenerDescriptors; 400 } 401 402 @Override 403 public EventListenerDescriptor getEventListener(String name) { 404 return listenerDescriptors.getDescriptor(name); 405 } 406 407 // methods for monitoring 408 409 @Override 410 public EventListenerList getListenerList() { 411 return 
listenerDescriptors; 412 } 413 414 @Override 415 public void setListenerEnabledFlag(String listenerName, boolean enabled) { 416 if (!listenerDescriptors.hasListener(listenerName)) { 417 return; 418 } 419 420 for (EventListenerDescriptor desc : listenerDescriptors.getAsyncPostCommitListenersDescriptors()) { 421 if (desc.getName().equals(listenerName)) { 422 desc.setEnabled(enabled); 423 synchronized (this) { 424 listenerDescriptors.recomputeEnabledListeners(); 425 } 426 return; 427 } 428 } 429 430 for (EventListenerDescriptor desc : listenerDescriptors.getSyncPostCommitListenersDescriptors()) { 431 if (desc.getName().equals(listenerName)) { 432 desc.setEnabled(enabled); 433 synchronized (this) { 434 listenerDescriptors.recomputeEnabledListeners(); 435 } 436 return; 437 } 438 } 439 440 for (EventListenerDescriptor desc : listenerDescriptors.getInlineListenersDescriptors()) { 441 if (desc.getName().equals(listenerName)) { 442 desc.setEnabled(enabled); 443 synchronized (this) { 444 listenerDescriptors.recomputeEnabledListeners(); 445 } 446 return; 447 } 448 } 449 } 450 451 @Override 452 public int getActiveThreadsCount() { 453 return asyncExec.getActiveCount(); 454 } 455 456 @Override 457 public int getEventsInQueueCount() { 458 return asyncExec.getUnfinishedCount(); 459 } 460 461 @Override 462 public boolean isBlockAsyncHandlers() { 463 return blockAsyncProcessing; 464 } 465 466 @Override 467 public boolean isBlockSyncPostCommitHandlers() { 468 return blockSyncPostCommitProcessing; 469 } 470 471 @Override 472 public void setBlockAsyncHandlers(boolean blockAsyncHandlers) { 473 blockAsyncProcessing = blockAsyncHandlers; 474 } 475 476 @Override 477 public void setBlockSyncPostCommitHandlers(boolean blockSyncPostComitHandlers) { 478 blockSyncPostCommitProcessing = blockSyncPostComitHandlers; 479 } 480 481 @Override 482 public boolean isBulkModeEnabled() { 483 return bulkModeEnabled; 484 } 485 486 @Override 487 public void setBulkModeEnabled(boolean bulkModeEnabled) { 488 
this.bulkModeEnabled = bulkModeEnabled; 489 } 490 491 protected void recordEvent(Event event) { 492 CompositeEventBundle b = threadBundles.get(); 493 b.push(event); 494 if (TransactionHelper.isTransactionActive()) { 495 if (!b.registeredSynchronization) { 496 // register as synchronization 497 try { 498 TransactionHelper.lookupTransactionManager().getTransaction().registerSynchronization(this); 499 } catch (NamingException | SystemException | RollbackException e) { 500 throw new RuntimeException("Cannot register Synchronization", e); 501 } 502 b.registeredSynchronization = true; 503 } 504 } else if (event.isCommitEvent()) { 505 handleTxCommited(); 506 } 507 } 508 509 @Override 510 public void beforeCompletion() { 511 } 512 513 @Override 514 public void afterCompletion(int status) { 515 if (status == Status.STATUS_COMMITTED) { 516 handleTxCommited(); 517 } else if (status == Status.STATUS_ROLLEDBACK) { 518 handleTxRollbacked(); 519 } else { 520 log.error("Unexpected afterCompletion status: " + status); 521 } 522 } 523 524 protected void handleTxRollbacked() { 525 threadBundles.remove(); 526 } 527 528 protected void handleTxCommited() { 529 CompositeEventBundle b = threadBundles.get(); 530 threadBundles.remove(); 531 532 // notify post commit event listeners 533 for (EventBundle bundle : b.byRepository.values()) { 534 try { 535 fireEventBundle(bundle); 536 } catch (NuxeoException e) { 537 log.error("Error while processing " + bundle, e); 538 } 539 } 540 } 541 542}