001/* 002 * (C) Copyright 2006-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Bogdan Stefanescu 018 * Thierry Delprat 019 * Florent Guillaume 020 * Andrei Nechaev 021 */ 022package org.nuxeo.ecm.core.event.impl; 023 024import java.rmi.dgc.VMID; 025import java.util.ArrayList; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.concurrent.CopyOnWriteArrayList; 031import java.util.stream.Collectors; 032 033import javax.naming.NamingException; 034import javax.transaction.RollbackException; 035import javax.transaction.Status; 036import javax.transaction.Synchronization; 037import javax.transaction.SystemException; 038 039import org.apache.commons.logging.Log; 040import org.apache.commons.logging.LogFactory; 041import org.nuxeo.common.utils.Path; 042import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 043import org.nuxeo.ecm.core.api.NuxeoException; 044import org.nuxeo.ecm.core.api.RecoverableClientException; 045import org.nuxeo.ecm.core.event.Event; 046import org.nuxeo.ecm.core.event.EventBundle; 047import org.nuxeo.ecm.core.event.EventContext; 048import org.nuxeo.ecm.core.event.EventListener; 049import org.nuxeo.ecm.core.event.EventService; 050import org.nuxeo.ecm.core.event.EventServiceAdmin; 051import org.nuxeo.ecm.core.event.EventServiceComponent; 052import org.nuxeo.ecm.core.event.EventStats; 053import org.nuxeo.ecm.core.event.PostCommitEventListener; 054import org.nuxeo.ecm.core.event.pipe.EventPipeDescriptor; 055import org.nuxeo.ecm.core.event.pipe.EventPipeRegistry; 056import org.nuxeo.ecm.core.event.pipe.dispatch.EventBundleDispatcher; 057import org.nuxeo.ecm.core.event.pipe.dispatch.EventDispatcherDescriptor; 058import org.nuxeo.ecm.core.event.pipe.dispatch.EventDispatcherRegistry; 059import org.nuxeo.ecm.core.event.stream.DomainEventProducer; 060import org.nuxeo.ecm.core.event.stream.DomainEventProducerDescriptor; 061import org.nuxeo.lib.stream.computation.Record; 062import org.nuxeo.lib.stream.computation.Settings; 063import org.nuxeo.runtime.api.Framework; 064import org.nuxeo.runtime.codec.CodecService; 065import org.nuxeo.runtime.model.DescriptorRegistry; 066import org.nuxeo.runtime.stream.StreamService; 067import org.nuxeo.runtime.transaction.TransactionHelper; 068 069import io.opencensus.trace.AttributeValue; 070import io.opencensus.trace.Span; 071import io.opencensus.trace.Tracer; 072import io.opencensus.trace.Tracing; 073 074/** 075 * Implementation of the event service. 076 */ 077public class EventServiceImpl implements EventService, EventServiceAdmin, Synchronization { 078 079 public static final VMID VMID = new VMID(); 080 081 private static final Log log = LogFactory.getLog(EventServiceImpl.class); 082 083 protected static final ThreadLocal<CompositeEventBundle> threadBundles = new ThreadLocal<CompositeEventBundle>() { 084 @Override 085 protected CompositeEventBundle initialValue() { 086 return new CompositeEventBundle(); 087 } 088 }; 089 090 private static class CompositeEventBundle { 091 092 boolean registeredSynchronization; 093 094 final Map<String, EventBundle> byRepository = new HashMap<>(); 095 096 void push(Event event) { 097 String repositoryName = event.getContext().getRepositoryName(); 098 if (!byRepository.containsKey(repositoryName)) { 099 byRepository.put(repositoryName, new EventBundleImpl()); 100 } 101 byRepository.get(repositoryName).push(event); 102 } 103 104 } 105 106 protected final EventListenerList listenerDescriptors; 107 108 protected PostCommitEventExecutor postCommitExec; 109 110 protected volatile AsyncEventExecutor asyncExec; 111 112 protected final List<AsyncWaitHook> asyncWaitHooks = new CopyOnWriteArrayList<>(); 113 114 protected boolean blockAsyncProcessing = false; 115 116 protected boolean blockSyncPostCommitProcessing = false; 117 118 protected boolean bulkModeEnabled = false; 119 120 protected EventPipeRegistry registeredPipes = new EventPipeRegistry(); 121 122 protected EventDispatcherRegistry dispatchers = new EventDispatcherRegistry(); 123 124 protected EventBundleDispatcher pipeDispatcher; 125 126 // @since 11.4 127 protected DescriptorRegistry domainEventProducers = new DescriptorRegistry(); 128 129 // @since 11.4 130 protected static final String REGISTRY_TARGET_NAME = "EventService"; 131 132 public EventServiceImpl() { 133 listenerDescriptors = new EventListenerList(); 134 postCommitExec = new PostCommitEventExecutor(); 135 asyncExec = new AsyncEventExecutor(); 136 } 137 138 public void init() { 139 asyncExec.init(); 140 141 EventDispatcherDescriptor dispatcherDescriptor = dispatchers.getDispatcherDescriptor(); 142 if (dispatcherDescriptor != null) { 143 List<EventPipeDescriptor> pipes = registeredPipes.getPipes(); 144 if (!pipes.isEmpty()) { 145 pipeDispatcher = dispatcherDescriptor.getInstance(); 146 pipeDispatcher.init(pipes, dispatcherDescriptor.getParameters()); 147 } 148 } 149 initDomainEventStreams(); 150 } 151 152 public EventBundleDispatcher getEventBundleDispatcher() { 153 return pipeDispatcher; 154 } 155 156 public void addDomainEventProducer(DomainEventProducerDescriptor descriptor) { 157 if (descriptor.isEnabled()) { 158 domainEventProducers.register(REGISTRY_TARGET_NAME, EventServiceComponent.DOMAIN_EVENT_PRODUCER_XP, 159 descriptor); 160 log.debug("Registered domain event producer: " + descriptor.getName()); 161 } else { 162 domainEventProducers.unregister(REGISTRY_TARGET_NAME, EventServiceComponent.DOMAIN_EVENT_PRODUCER_XP, 163 descriptor); 164 log.debug("Unregistered domain event producer (disabled): " + descriptor.getName()); 165 } 166 } 167 168 public void removeDomainEventProducer(DomainEventProducerDescriptor descriptor) { 169 domainEventProducers.unregister(REGISTRY_TARGET_NAME, EventServiceComponent.DOMAIN_EVENT_PRODUCER_XP, 170 descriptor); 171 log.debug("Unregistered domain event producer: " + descriptor.getName()); 172 } 173 174 public void shutdown(long timeoutMillis) throws InterruptedException { 175 postCommitExec.shutdown(timeoutMillis); 176 Set<AsyncWaitHook> notTerminated = asyncWaitHooks.stream().filter(hook -> !hook.shutdown()).collect( 177 Collectors.toSet()); 178 if (!notTerminated.isEmpty()) { 179 throw new RuntimeException("Asynch services are still running : " + notTerminated); 180 } 181 182 if (!asyncExec.shutdown(timeoutMillis)) { 183 throw new RuntimeException("Async executor is still running, timeout expired"); 184 } 185 if (pipeDispatcher != null) { 186 pipeDispatcher.shutdown(); 187 } 188 } 189 190 public void registerForAsyncWait(AsyncWaitHook callback) { 191 asyncWaitHooks.add(callback); 192 } 193 194 public void unregisterForAsyncWait(AsyncWaitHook callback) { 195 asyncWaitHooks.remove(callback); 196 } 197 198 @Override 199 public void waitForAsyncCompletion() { 200 waitForAsyncCompletion(Long.MAX_VALUE); 201 } 202 203 @Override 204 public void waitForAsyncCompletion(long timeout) { 205 Set<AsyncWaitHook> notCompleted = asyncWaitHooks.stream() 206 .filter(hook -> !hook.waitForAsyncCompletion()) 207 .collect(Collectors.toSet()); 208 if (!notCompleted.isEmpty()) { 209 throw new RuntimeException("Async tasks are still running : " + notCompleted); 210 } 211 try { 212 if (!asyncExec.waitForCompletion(timeout)) { 213 throw new RuntimeException("Async event listeners thread pool is not terminated"); 214 } 215 } catch (InterruptedException e) { 216 Thread.currentThread().interrupt(); 217 // TODO change signature 218 throw new RuntimeException(e); 219 } 220 if (pipeDispatcher != null) { 221 try { 222 pipeDispatcher.waitForCompletion(timeout); 223 } catch (InterruptedException e) { 224 Thread.currentThread().interrupt(); 225 throw new RuntimeException(e); 226 } 227 } 228 } 229 230 @Override 231 public void addEventListener(EventListenerDescriptor listener) { 232 listenerDescriptors.add(listener); 233 log.debug("Registered event listener: " + listener.getName()); 234 } 235 236 public void addEventPipe(EventPipeDescriptor pipeDescriptor) { 237 registeredPipes.addContribution(pipeDescriptor); 238 log.debug("Registered event pipe: " + pipeDescriptor.getName()); 239 } 240 241 public void addEventDispatcher(EventDispatcherDescriptor dispatcherDescriptor) { 242 dispatchers.addContrib(dispatcherDescriptor); 243 log.debug("Registered event dispatcher: " + dispatcherDescriptor.getName()); 244 } 245 246 @Override 247 public void removeEventListener(EventListenerDescriptor listener) { 248 listenerDescriptors.removeDescriptor(listener); 249 log.debug("Unregistered event listener: " + listener.getName()); 250 } 251 252 public void removeEventPipe(EventPipeDescriptor pipeDescriptor) { 253 registeredPipes.removeContribution(pipeDescriptor); 254 log.debug("Unregistered event pipe: " + pipeDescriptor.getName()); 255 } 256 257 public void removeEventDispatcher(EventDispatcherDescriptor dispatcherDescriptor) { 258 dispatchers.removeContrib(dispatcherDescriptor); 259 log.debug("Unregistered event dispatcher: " + dispatcherDescriptor.getName()); 260 } 261 262 @Override 263 public void fireEvent(String name, EventContext context) { 264 fireEvent(new EventImpl(name, context)); 265 } 266 267 @Override 268 public void fireEvent(Event event) { 269 270 String ename = event.getName(); 271 EventStats stats = Framework.getService(EventStats.class); 272 Tracer tracer = Tracing.getTracer(); 273 for (EventListenerDescriptor desc : listenerDescriptors.getEnabledInlineListenersDescriptors()) { 274 if (!desc.acceptEvent(ename)) { 275 continue; 276 } 277 try { 278 long t0 = System.currentTimeMillis(); 279 desc.asEventListener().handleEvent(event); 280 long elapsed = System.currentTimeMillis() - t0; 281 traceAddAnnotation(event, tracer, elapsed, desc.getName()); 282 if (stats != null) { 283 stats.logSyncExec(desc, elapsed); 284 } 285 if (event.isCanceled()) { 286 // break loop 287 return; 288 } 289 } catch (ConcurrentUpdateException e) { 290 // never swallow ConcurrentUpdateException 291 throw e; 292 } catch (RuntimeException e) { 293 // get message 294 String message = "Exception during " + desc.getName() + " sync listener execution, "; 295 if (event.isBubbleException()) { 296 message += "other listeners will be ignored"; 297 } else if (event.isMarkedForRollBack()) { 298 message += "transaction will be rolled back"; 299 if (event.getRollbackMessage() != null) { 300 message += " (" + event.getRollbackMessage() + ")"; 301 } 302 } else { 303 message += "continuing to run other listeners"; 304 } 305 // log 306 tracer.getCurrentSpan().addAnnotation("EventService#fireEvent " + event.getName() + ": " + message); 307 if (e instanceof RecoverableClientException) { 308 log.info(message + "\n" + e.getMessage()); 309 log.debug(message, e); 310 } else { 311 log.error(message, e); 312 } 313 // rethrow or swallow 314 if (TransactionHelper.isTransactionMarkedRollback()) { 315 throw e; 316 } else if (event.isBubbleException()) { 317 throw e; 318 } else if (event.isMarkedForRollBack()) { 319 // make sure the transaction is marked rollback-only, 320 // even if some code later incorrectly swallows the rethrown exception 321 TransactionHelper.setTransactionRollbackOnly(); 322 323 Exception ee; 324 if (event.getRollbackException() != null) { 325 ee = event.getRollbackException(); 326 } else { 327 ee = e; 328 } 329 330 if (ee instanceof NuxeoException) { 331 throw (NuxeoException) ee; 332 } else { 333 throw new NuxeoException(message, ee); 334 } 335 } else { 336 // swallow exception 337 } 338 } 339 } 340 341 if (!event.isInline()) { // record the event 342 // don't record the complete event, only a shallow copy 343 ShallowEvent shallowEvent = ShallowEvent.create(event); 344 if (event.isImmediate()) { 345 EventBundleImpl b = new EventBundleImpl(); 346 b.push(shallowEvent); 347 tracer.getCurrentSpan().addAnnotation("EventService#fireEvent firing immediate: " + event.getName()); 348 fireEventBundle(b); 349 } else { 350 recordEvent(shallowEvent); 351 } 352 } 353 } 354 355 protected void traceAddAnnotation(Event event, Tracer tracer, long elapsed, String listener) { 356 Map<String, AttributeValue> attributes = new HashMap<>(); 357 attributes.put("event", AttributeValue.stringAttributeValue(event.getName())); 358 attributes.put("listener", AttributeValue.stringAttributeValue(listener)); 359 attributes.put("duration_ms", AttributeValue.longAttributeValue(elapsed)); 360 EventContext eventContext = event.getContext(); 361 if (eventContext instanceof DocumentEventContext) { 362 DocumentEventContext docContext = (DocumentEventContext) eventContext; 363 if (docContext.getSourceDocument() != null) { 364 Path docPath = docContext.getSourceDocument().getPath(); 365 if (docPath != null) { 366 attributes.put("doc", AttributeValue.stringAttributeValue(docPath.toString())); 367 } 368 String id = docContext.getSourceDocument().getId(); 369 if (id != null) { 370 attributes.put("doc_id", AttributeValue.stringAttributeValue(id)); 371 } 372 } 373 } 374 tracer.getCurrentSpan().addAnnotation("EventService#fireEvent Event fired", attributes); 375 } 376 377 @Override 378 public void fireEventBundle(EventBundle event) { 379 Span span = Tracing.getTracer().getCurrentSpan(); 380 span.addAnnotation("EventService#fireEventBundle"); 381 try { 382 List<EventListenerDescriptor> postCommitSync = listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors(); 383 List<EventListenerDescriptor> postCommitAsync = listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors(); 384 385 if (bulkModeEnabled) { 386 // run all listeners synchronously in one transaction 387 List<EventListenerDescriptor> listeners = new ArrayList<>(); 388 if (!blockSyncPostCommitProcessing) { 389 listeners = postCommitSync; 390 } 391 if (!blockAsyncProcessing) { 392 listeners.addAll(postCommitAsync); 393 } 394 if (!listeners.isEmpty()) { 395 postCommitExec.runBulk(listeners, event); 396 } 397 return; 398 } 399 400 // run sync listeners 401 if (blockSyncPostCommitProcessing) { 402 log.debug("Dropping PostCommit handler execution"); 403 } else { 404 if (!postCommitSync.isEmpty()) { 405 postCommitExec.run(postCommitSync, event); 406 } 407 } 408 409 if (blockAsyncProcessing) { 410 log.debug("Dopping bundle"); 411 return; 412 } 413 414 // fire async listeners 415 if (pipeDispatcher == null) { 416 asyncExec.run(postCommitAsync, event); 417 } else { 418 // rather than sending to the WorkManager: send to the Pipe 419 pipeDispatcher.sendEventBundle(event); 420 } 421 } finally { 422 span.addAnnotation("EventService#fireEventBundle.done"); 423 } 424 } 425 426 @Override 427 public void fireEventBundleSync(EventBundle event) { 428 for (EventListenerDescriptor desc : listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors()) { 429 desc.asPostCommitListener().handleEvent(event); 430 } 431 for (EventListenerDescriptor desc : listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors()) { 432 desc.asPostCommitListener().handleEvent(event); 433 } 434 } 435 436 @Override 437 public List<EventListener> getEventListeners() { 438 return listenerDescriptors.getInLineListeners(); 439 } 440 441 @Override 442 public List<PostCommitEventListener> getPostCommitEventListeners() { 443 List<PostCommitEventListener> result = new ArrayList<>(); 444 445 result.addAll(listenerDescriptors.getSyncPostCommitListeners()); 446 result.addAll(listenerDescriptors.getAsyncPostCommitListeners()); 447 448 return result; 449 } 450 451 public EventListenerList getEventListenerList() { 452 return listenerDescriptors; 453 } 454 455 @Override 456 public EventListenerDescriptor getEventListener(String name) { 457 return listenerDescriptors.getDescriptor(name); 458 } 459 460 // methods for monitoring 461 462 @Override 463 public EventListenerList getListenerList() { 464 return listenerDescriptors; 465 } 466 467 @Override 468 public void setListenerEnabledFlag(String listenerName, boolean enabled) { 469 if (!listenerDescriptors.hasListener(listenerName)) { 470 return; 471 } 472 473 for (EventListenerDescriptor desc : listenerDescriptors.getAsyncPostCommitListenersDescriptors()) { 474 if (desc.getName().equals(listenerName)) { 475 desc.setEnabled(enabled); 476 synchronized (this) { 477 listenerDescriptors.recomputeEnabledListeners(); 478 } 479 return; 480 } 481 } 482 483 for (EventListenerDescriptor desc : listenerDescriptors.getSyncPostCommitListenersDescriptors()) { 484 if (desc.getName().equals(listenerName)) { 485 desc.setEnabled(enabled); 486 synchronized (this) { 487 listenerDescriptors.recomputeEnabledListeners(); 488 } 489 return; 490 } 491 } 492 493 for (EventListenerDescriptor desc : listenerDescriptors.getInlineListenersDescriptors()) { 494 if (desc.getName().equals(listenerName)) { 495 desc.setEnabled(enabled); 496 synchronized (this) { 497 listenerDescriptors.recomputeEnabledListeners(); 498 } 499 return; 500 } 501 } 502 } 503 504 @Override 505 public int getActiveThreadsCount() { 506 return asyncExec.getActiveCount(); 507 } 508 509 @Override 510 public int getEventsInQueueCount() { 511 return asyncExec.getUnfinishedCount(); 512 } 513 514 @Override 515 public boolean isBlockAsyncHandlers() { 516 return blockAsyncProcessing; 517 } 518 519 @Override 520 public boolean isBlockSyncPostCommitHandlers() { 521 return blockSyncPostCommitProcessing; 522 } 523 524 @Override 525 public void setBlockAsyncHandlers(boolean blockAsyncHandlers) { 526 blockAsyncProcessing = blockAsyncHandlers; 527 } 528 529 @Override 530 public void setBlockSyncPostCommitHandlers(boolean blockSyncPostComitHandlers) { 531 blockSyncPostCommitProcessing = blockSyncPostComitHandlers; 532 } 533 534 @Override 535 public boolean isBulkModeEnabled() { 536 return bulkModeEnabled; 537 } 538 539 @Override 540 public void setBulkModeEnabled(boolean bulkModeEnabled) { 541 this.bulkModeEnabled = bulkModeEnabled; 542 } 543 544 protected void recordEvent(Event event) { 545 CompositeEventBundle b = threadBundles.get(); 546 b.push(event); 547 if (TransactionHelper.isTransactionActive()) { 548 if (!b.registeredSynchronization) { 549 // register as synchronization 550 try { 551 TransactionHelper.lookupTransactionManager().getTransaction().registerSynchronization(this); 552 } catch (NamingException | SystemException | RollbackException e) { 553 throw new RuntimeException("Cannot register Synchronization", e); 554 } 555 b.registeredSynchronization = true; 556 } 557 } else if (event.isCommitEvent()) { 558 handleTxCommited(); 559 } 560 } 561 562 @Override 563 public void beforeCompletion() { 564 Span span = Tracing.getTracer().getCurrentSpan(); 565 span.addAnnotation("EventService#beforeCompletion"); 566 } 567 568 @Override 569 public void afterCompletion(int status) { 570 Span span = Tracing.getTracer().getCurrentSpan(); 571 if (status == Status.STATUS_COMMITTED) { 572 span.addAnnotation("EventService#afterCompletion committed"); 573 handleTxCommited(); 574 } else if (status == Status.STATUS_ROLLEDBACK) { 575 span.addAnnotation("EventService#afterCompletion ROLLBACK"); 576 handleTxRollbacked(); 577 } else { 578 log.error("Unexpected afterCompletion status: " + status); 579 } 580 span.addAnnotation("EventService#afterCompletion.done"); 581 } 582 583 protected void handleTxRollbacked() { 584 threadBundles.remove(); 585 } 586 587 protected void handleTxCommited() { 588 CompositeEventBundle b = threadBundles.get(); 589 threadBundles.remove(); 590 591 // notify post commit event listeners 592 for (EventBundle bundle : b.byRepository.values()) { 593 try { 594 fireEventBundle(bundle); 595 } catch (NuxeoException e) { 596 log.error("Error while processing " + bundle, e); 597 } 598 } 599 } 600 601 @Override 602 public List<DomainEventProducer> createDomainEventProducers() { 603 // TODO: optimize this by keeping an immutable list 604 List<DomainEventProducerDescriptor> descriptors = domainEventProducers.getDescriptors(REGISTRY_TARGET_NAME, 605 EventServiceComponent.DOMAIN_EVENT_PRODUCER_XP); 606 return descriptors.stream().map(DomainEventProducerDescriptor::newInstance).collect(Collectors.toList()); 607 } 608 609 protected void initDomainEventStreams() { 610 List<DomainEventProducerDescriptor> descriptors = domainEventProducers.getDescriptors(REGISTRY_TARGET_NAME, 611 EventServiceComponent.DOMAIN_EVENT_PRODUCER_XP); 612 Settings settings = new Settings(1, 1); 613 List<String> streams = new ArrayList<>(); 614 CodecService codecService = Framework.getService(CodecService.class); 615 descriptors.forEach(descriptor -> { 616 String streamName = descriptor.getStream().name; 617 streams.add(streamName); 618 settings.setPartitions(streamName, descriptor.getStream().partitions); 619 String codec = descriptor.getStream().codec; 620 if (codec != null) { 621 settings.setCodec(streamName, codecService.getCodec(codec, Record.class)); 622 } 623 descriptor.getStream().filters.forEach(filter -> settings.addFilter(streamName, filter.getFilter())); 624 }); 625 StreamService streamService = Framework.getService(StreamService.class); 626 if (streamService == null) { 627 log.warn("StreamService is not available, you might want to use RuntimeStreamFeature.class test Feature"); 628 } else { 629 streamService.getStreamManager().register(streams, settings); 630 } 631 } 632 633}