001/* 002 * (C) Copyright 2006-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Bogdan Stefanescu 018 * Thierry Delprat 019 * Florent Guillaume 020 * Andrei Nechaev 021 */ 022package org.nuxeo.ecm.core.event.impl; 023 024import java.rmi.dgc.VMID; 025import java.util.ArrayList; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.concurrent.CopyOnWriteArrayList; 031import java.util.stream.Collectors; 032 033import javax.naming.NamingException; 034import javax.transaction.RollbackException; 035import javax.transaction.Status; 036import javax.transaction.Synchronization; 037import javax.transaction.SystemException; 038 039import org.apache.commons.logging.Log; 040import org.apache.commons.logging.LogFactory; 041import org.nuxeo.common.logging.SequenceTracer; 042import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 043import org.nuxeo.ecm.core.api.NuxeoException; 044import org.nuxeo.ecm.core.api.RecoverableClientException; 045import org.nuxeo.ecm.core.event.Event; 046import org.nuxeo.ecm.core.event.EventBundle; 047import org.nuxeo.ecm.core.event.EventContext; 048import org.nuxeo.ecm.core.event.EventListener; 049import org.nuxeo.ecm.core.event.EventService; 050import org.nuxeo.ecm.core.event.EventServiceAdmin; 051import org.nuxeo.ecm.core.event.EventStats; 052import org.nuxeo.ecm.core.event.PostCommitEventListener; 053import org.nuxeo.ecm.core.event.ReconnectedEventBundle; 054import org.nuxeo.ecm.core.event.jms.AsyncProcessorConfig; 055import org.nuxeo.ecm.core.event.pipe.EventPipeDescriptor; 056import org.nuxeo.ecm.core.event.pipe.EventPipeRegistry; 057import org.nuxeo.ecm.core.event.pipe.dispatch.EventBundleDispatcher; 058import org.nuxeo.ecm.core.event.pipe.dispatch.EventDispatcherDescriptor; 059import org.nuxeo.ecm.core.event.pipe.dispatch.EventDispatcherRegistry; 060import org.nuxeo.runtime.api.Framework; 061import org.nuxeo.runtime.transaction.TransactionHelper; 062 063/** 064 * Implementation of the event service. 065 */ 066public class EventServiceImpl implements EventService, EventServiceAdmin, Synchronization { 067 068 public static final VMID VMID = new VMID(); 069 070 private static final Log log = LogFactory.getLog(EventServiceImpl.class); 071 072 protected static final ThreadLocal<CompositeEventBundle> threadBundles = new ThreadLocal<CompositeEventBundle>() { 073 @Override 074 protected CompositeEventBundle initialValue() { 075 return new CompositeEventBundle(); 076 } 077 }; 078 079 private static class CompositeEventBundle { 080 081 boolean registeredSynchronization; 082 083 final Map<String, EventBundle> byRepository = new HashMap<>(); 084 085 void push(Event event) { 086 String repositoryName = event.getContext().getRepositoryName(); 087 if (!byRepository.containsKey(repositoryName)) { 088 byRepository.put(repositoryName, new EventBundleImpl()); 089 } 090 byRepository.get(repositoryName).push(event); 091 } 092 093 } 094 095 protected final EventListenerList listenerDescriptors; 096 097 protected PostCommitEventExecutor postCommitExec; 098 099 protected volatile AsyncEventExecutor asyncExec; 100 101 protected final List<AsyncWaitHook> asyncWaitHooks = new CopyOnWriteArrayList<>(); 102 103 protected boolean blockAsyncProcessing = false; 104 105 protected boolean blockSyncPostCommitProcessing = false; 106 107 protected boolean bulkModeEnabled = false; 108 109 protected EventPipeRegistry registeredPipes = new EventPipeRegistry(); 110 111 protected EventDispatcherRegistry dispatchers = new EventDispatcherRegistry(); 112 113 protected EventBundleDispatcher pipeDispatcher; 114 115 public EventServiceImpl() { 116 listenerDescriptors = new EventListenerList(); 117 postCommitExec = new PostCommitEventExecutor(); 118 asyncExec = new AsyncEventExecutor(); 119 } 120 121 public void init() { 122 asyncExec.init(); 123 124 EventDispatcherDescriptor dispatcherDescriptor = dispatchers.getDispatcherDescriptor(); 125 if (dispatcherDescriptor != null) { 126 List<EventPipeDescriptor> pipes = registeredPipes.getPipes(); 127 if (!pipes.isEmpty()) { 128 pipeDispatcher = dispatcherDescriptor.getInstance(); 129 pipeDispatcher.init(pipes, dispatcherDescriptor.getParameters()); 130 } 131 } 132 } 133 134 public EventBundleDispatcher getEventBundleDispatcher() { 135 return pipeDispatcher; 136 } 137 138 public void shutdown(long timeoutMillis) throws InterruptedException { 139 postCommitExec.shutdown(timeoutMillis); 140 Set<AsyncWaitHook> notTerminated = asyncWaitHooks.stream().filter(hook -> !hook.shutdown()).collect( 141 Collectors.toSet()); 142 if (!notTerminated.isEmpty()) { 143 throw new RuntimeException("Asynch services are still running : " + notTerminated); 144 } 145 146 if (!asyncExec.shutdown(timeoutMillis)) { 147 throw new RuntimeException("Async executor is still running, timeout expired"); 148 } 149 if (pipeDispatcher != null) { 150 pipeDispatcher.shutdown(); 151 } 152 } 153 154 public void registerForAsyncWait(AsyncWaitHook callback) { 155 asyncWaitHooks.add(callback); 156 } 157 158 public void unregisterForAsyncWait(AsyncWaitHook callback) { 159 asyncWaitHooks.remove(callback); 160 } 161 162 @Override 163 public void waitForAsyncCompletion() { 164 waitForAsyncCompletion(Long.MAX_VALUE); 165 } 166 167 @Override 168 public void waitForAsyncCompletion(long timeout) { 169 Set<AsyncWaitHook> notCompleted = asyncWaitHooks.stream() 170 .filter(hook -> !hook.waitForAsyncCompletion()) 171 .collect(Collectors.toSet()); 172 if (!notCompleted.isEmpty()) { 173 throw new RuntimeException("Async tasks are still running : " + notCompleted); 174 } 175 try { 176 if (!asyncExec.waitForCompletion(timeout)) { 177 throw new RuntimeException("Async event listeners thread pool is not terminated"); 178 } 179 } catch (InterruptedException e) { 180 Thread.currentThread().interrupt(); 181 // TODO change signature 182 throw new RuntimeException(e); 183 } 184 if (pipeDispatcher != null) { 185 try { 186 pipeDispatcher.waitForCompletion(timeout); 187 } catch (InterruptedException e) { 188 Thread.currentThread().interrupt(); 189 throw new RuntimeException(e); 190 } 191 } 192 } 193 194 @Override 195 public void addEventListener(EventListenerDescriptor listener) { 196 listenerDescriptors.add(listener); 197 log.debug("Registered event listener: " + listener.getName()); 198 } 199 200 public void addEventPipe(EventPipeDescriptor pipeDescriptor) { 201 registeredPipes.addContribution(pipeDescriptor); 202 log.debug("Registered event pipe: " + pipeDescriptor.getName()); 203 } 204 205 public void addEventDispatcher(EventDispatcherDescriptor dispatcherDescriptor) { 206 dispatchers.addContrib(dispatcherDescriptor); 207 log.debug("Registered event dispatcher: " + dispatcherDescriptor.getName()); 208 } 209 210 @Override 211 public void removeEventListener(EventListenerDescriptor listener) { 212 listenerDescriptors.removeDescriptor(listener); 213 log.debug("Unregistered event listener: " + listener.getName()); 214 } 215 216 public void removeEventPipe(EventPipeDescriptor pipeDescriptor) { 217 registeredPipes.removeContribution(pipeDescriptor); 218 log.debug("Unregistered event pipe: " + pipeDescriptor.getName()); 219 } 220 221 public void removeEventDispatcher(EventDispatcherDescriptor dispatcherDescriptor) { 222 dispatchers.removeContrib(dispatcherDescriptor); 223 log.debug("Unregistered event dispatcher: " + dispatcherDescriptor.getName()); 224 } 225 226 @Override 227 public void fireEvent(String name, EventContext context) { 228 fireEvent(new EventImpl(name, context)); 229 } 230 231 @Override 232 public void fireEvent(Event event) { 233 234 String ename = event.getName(); 235 EventStats stats = Framework.getService(EventStats.class); 236 for (EventListenerDescriptor desc : listenerDescriptors.getEnabledInlineListenersDescriptors()) { 237 if (!desc.acceptEvent(ename)) { 238 continue; 239 } 240 try { 241 long t0 = System.currentTimeMillis(); 242 SequenceTracer.start("Fire sync event " + event.getName()); 243 desc.asEventListener().handleEvent(event); 244 long elapsed = System.currentTimeMillis() - t0; 245 SequenceTracer.stop("done in " + elapsed + " ms"); 246 if (stats != null) { 247 stats.logSyncExec(desc, elapsed); 248 } 249 if (event.isCanceled()) { 250 // break loop 251 return; 252 } 253 } catch (ConcurrentUpdateException e) { 254 // never swallow ConcurrentUpdateException 255 throw e; 256 } catch (RuntimeException e) { 257 // get message 258 SequenceTracer.destroy("failure"); 259 String message = "Exception during " + desc.getName() + " sync listener execution, "; 260 if (event.isBubbleException()) { 261 message += "other listeners will be ignored"; 262 } else if (event.isMarkedForRollBack()) { 263 message += "transaction will be rolled back"; 264 if (event.getRollbackMessage() != null) { 265 message += " (" + event.getRollbackMessage() + ")"; 266 } 267 } else { 268 message += "continuing to run other listeners"; 269 } 270 // log 271 if (e instanceof RecoverableClientException) { 272 log.info(message + "\n" + e.getMessage()); 273 log.debug(message, e); 274 } else { 275 log.error(message, e); 276 } 277 // rethrow or swallow 278 if (TransactionHelper.isTransactionMarkedRollback()) { 279 throw e; 280 } else if (event.isBubbleException()) { 281 throw e; 282 } else if (event.isMarkedForRollBack()) { 283 Exception ee; 284 if (event.getRollbackException() != null) { 285 ee = event.getRollbackException(); 286 } else { 287 ee = e; 288 } 289 // when marked for rollback, throw a generic 290 // RuntimeException to make sure nobody catches it 291 throw new RuntimeException(message, ee); 292 } else { 293 // swallow exception 294 } 295 } 296 } 297 298 if (!event.isInline()) { // record the event 299 // don't record the complete event, only a shallow copy 300 ShallowEvent shallowEvent = ShallowEvent.create(event); 301 if (event.isImmediate()) { 302 EventBundleImpl b = new EventBundleImpl(); 303 b.push(shallowEvent); 304 fireEventBundle(b); 305 } else { 306 recordEvent(shallowEvent); 307 } 308 } 309 } 310 311 @Override 312 public void fireEventBundle(EventBundle event) { 313 boolean comesFromJMS = false; 314 315 if (event instanceof ReconnectedEventBundle) { 316 if (((ReconnectedEventBundle) event).comesFromJMS()) { 317 comesFromJMS = true; 318 } 319 } 320 321 List<EventListenerDescriptor> postCommitSync = listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors(); 322 List<EventListenerDescriptor> postCommitAsync = listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors(); 323 324 if (bulkModeEnabled) { 325 // run all listeners synchronously in one transaction 326 List<EventListenerDescriptor> listeners = new ArrayList<>(); 327 if (!blockSyncPostCommitProcessing) { 328 listeners = postCommitSync; 329 } 330 if (!blockAsyncProcessing) { 331 listeners.addAll(postCommitAsync); 332 } 333 if (!listeners.isEmpty()) { 334 postCommitExec.runBulk(listeners, event); 335 } 336 return; 337 } 338 339 // run sync listeners 340 if (blockSyncPostCommitProcessing) { 341 log.debug("Dropping PostCommit handler execution"); 342 } else if (comesFromJMS) { 343 // when called from JMS we must skip sync listeners 344 // - postComit listeners should be on the core 345 // - there is no transaction started by JMS listener 346 log.debug("Deactivating sync post-commit listener since we are called from JMS"); 347 } else { 348 if (!postCommitSync.isEmpty()) { 349 postCommitExec.run(postCommitSync, event); 350 } 351 } 352 353 if (blockAsyncProcessing) { 354 log.debug("Dopping bundle"); 355 return; 356 } 357 358 // fire async listeners 359 if (AsyncProcessorConfig.forceJMSUsage() && !comesFromJMS) { 360 log.debug("Skipping async exec, this will be triggered via JMS"); 361 } else { 362 if (pipeDispatcher == null) { 363 asyncExec.run(postCommitAsync, event); 364 } else { 365 // rather than sending to the WorkManager: send to the Pipe 366 pipeDispatcher.sendEventBundle(event); 367 } 368 } 369 } 370 371 @Override 372 public void fireEventBundleSync(EventBundle event) { 373 for (EventListenerDescriptor desc : listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors()) { 374 desc.asPostCommitListener().handleEvent(event); 375 } 376 for (EventListenerDescriptor desc : listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors()) { 377 desc.asPostCommitListener().handleEvent(event); 378 } 379 } 380 381 @Override 382 public List<EventListener> getEventListeners() { 383 return listenerDescriptors.getInLineListeners(); 384 } 385 386 @Override 387 public List<PostCommitEventListener> getPostCommitEventListeners() { 388 List<PostCommitEventListener> result = new ArrayList<>(); 389 390 result.addAll(listenerDescriptors.getSyncPostCommitListeners()); 391 result.addAll(listenerDescriptors.getAsyncPostCommitListeners()); 392 393 return result; 394 } 395 396 public EventListenerList getEventListenerList() { 397 return listenerDescriptors; 398 } 399 400 @Override 401 public EventListenerDescriptor getEventListener(String name) { 402 return listenerDescriptors.getDescriptor(name); 403 } 404 405 // methods for monitoring 406 407 @Override 408 public EventListenerList getListenerList() { 409 return listenerDescriptors; 410 } 411 412 @Override 413 public void setListenerEnabledFlag(String listenerName, boolean enabled) { 414 if (!listenerDescriptors.hasListener(listenerName)) { 415 return; 416 } 417 418 for (EventListenerDescriptor desc : listenerDescriptors.getAsyncPostCommitListenersDescriptors()) { 419 if (desc.getName().equals(listenerName)) { 420 desc.setEnabled(enabled); 421 synchronized (this) { 422 listenerDescriptors.recomputeEnabledListeners(); 423 } 424 return; 425 } 426 } 427 428 for (EventListenerDescriptor desc : listenerDescriptors.getSyncPostCommitListenersDescriptors()) { 429 if (desc.getName().equals(listenerName)) { 430 desc.setEnabled(enabled); 431 synchronized (this) { 432 listenerDescriptors.recomputeEnabledListeners(); 433 } 434 return; 435 } 436 } 437 438 for (EventListenerDescriptor desc : listenerDescriptors.getInlineListenersDescriptors()) { 439 if (desc.getName().equals(listenerName)) { 440 desc.setEnabled(enabled); 441 synchronized (this) { 442 listenerDescriptors.recomputeEnabledListeners(); 443 } 444 return; 445 } 446 } 447 } 448 449 @Override 450 public int getActiveThreadsCount() { 451 return asyncExec.getActiveCount(); 452 } 453 454 @Override 455 public int getEventsInQueueCount() { 456 return asyncExec.getUnfinishedCount(); 457 } 458 459 @Override 460 public boolean isBlockAsyncHandlers() { 461 return blockAsyncProcessing; 462 } 463 464 @Override 465 public boolean isBlockSyncPostCommitHandlers() { 466 return blockSyncPostCommitProcessing; 467 } 468 469 @Override 470 public void setBlockAsyncHandlers(boolean blockAsyncHandlers) { 471 blockAsyncProcessing = blockAsyncHandlers; 472 } 473 474 @Override 475 public void setBlockSyncPostCommitHandlers(boolean blockSyncPostComitHandlers) { 476 blockSyncPostCommitProcessing = blockSyncPostComitHandlers; 477 } 478 479 @Override 480 public boolean isBulkModeEnabled() { 481 return bulkModeEnabled; 482 } 483 484 @Override 485 public void setBulkModeEnabled(boolean bulkModeEnabled) { 486 this.bulkModeEnabled = bulkModeEnabled; 487 } 488 489 protected void recordEvent(Event event) { 490 CompositeEventBundle b = threadBundles.get(); 491 b.push(event); 492 if (TransactionHelper.isTransactionActive()) { 493 if (!b.registeredSynchronization) { 494 // register as synchronization 495 try { 496 TransactionHelper.lookupTransactionManager().getTransaction().registerSynchronization(this); 497 } catch (NamingException | SystemException | RollbackException e) { 498 throw new RuntimeException("Cannot register Synchronization", e); 499 } 500 b.registeredSynchronization = true; 501 } 502 } else if (event.isCommitEvent()) { 503 handleTxCommited(); 504 } 505 } 506 507 @Override 508 public void beforeCompletion() { 509 } 510 511 @Override 512 public void afterCompletion(int status) { 513 if (status == Status.STATUS_COMMITTED) { 514 handleTxCommited(); 515 } else if (status == Status.STATUS_ROLLEDBACK) { 516 handleTxRollbacked(); 517 } else { 518 log.error("Unexpected afterCompletion status: " + status); 519 } 520 } 521 522 protected void handleTxRollbacked() { 523 threadBundles.remove(); 524 } 525 526 protected void handleTxCommited() { 527 CompositeEventBundle b = threadBundles.get(); 528 threadBundles.remove(); 529 530 // notify post commit event listeners 531 for (EventBundle bundle : b.byRepository.values()) { 532 try { 533 fireEventBundle(bundle); 534 } catch (NuxeoException e) { 535 log.error("Error while processing " + bundle, e); 536 } 537 } 538 } 539 540}