001/* 002 * (C) Copyright 2006-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Bogdan Stefanescu 018 * Thierry Delprat 019 * Florent Guillaume 020 * Andrei Nechaev 021 */ 022package org.nuxeo.ecm.core.event.impl; 023 024import java.rmi.dgc.VMID; 025import java.util.ArrayList; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.concurrent.CopyOnWriteArrayList; 031import java.util.stream.Collectors; 032 033import javax.naming.NamingException; 034import javax.transaction.RollbackException; 035import javax.transaction.Status; 036import javax.transaction.Synchronization; 037import javax.transaction.SystemException; 038 039import org.apache.commons.logging.Log; 040import org.apache.commons.logging.LogFactory; 041import org.nuxeo.common.logging.SequenceTracer; 042import org.nuxeo.ecm.core.api.NuxeoException; 043import org.nuxeo.ecm.core.api.RecoverableClientException; 044import org.nuxeo.ecm.core.event.Event; 045import org.nuxeo.ecm.core.event.EventBundle; 046import org.nuxeo.ecm.core.event.EventContext; 047import org.nuxeo.ecm.core.event.EventListener; 048import org.nuxeo.ecm.core.event.EventService; 049import org.nuxeo.ecm.core.event.EventServiceAdmin; 050import org.nuxeo.ecm.core.event.EventStats; 051import org.nuxeo.ecm.core.event.PostCommitEventListener; 052import org.nuxeo.ecm.core.event.ReconnectedEventBundle; 053import org.nuxeo.ecm.core.event.jms.AsyncProcessorConfig; 054import org.nuxeo.ecm.core.event.pipe.EventPipeDescriptor; 055import org.nuxeo.ecm.core.event.pipe.EventPipeRegistry; 056import org.nuxeo.ecm.core.event.pipe.dispatch.EventBundleDispatcher; 057import org.nuxeo.ecm.core.event.pipe.dispatch.EventDispatcherDescriptor; 058import org.nuxeo.ecm.core.event.pipe.dispatch.EventDispatcherRegistry; 059import org.nuxeo.runtime.api.Framework; 060import org.nuxeo.runtime.transaction.TransactionHelper; 061 062/** 063 * Implementation of the event service. 064 */ 065public class EventServiceImpl implements EventService, EventServiceAdmin, Synchronization { 066 067 public static final VMID VMID = new VMID(); 068 069 private static final Log log = LogFactory.getLog(EventServiceImpl.class); 070 071 protected static final ThreadLocal<CompositeEventBundle> threadBundles = new ThreadLocal<CompositeEventBundle>() { 072 @Override 073 protected CompositeEventBundle initialValue() { 074 return new CompositeEventBundle(); 075 } 076 }; 077 078 private static class CompositeEventBundle { 079 080 boolean registeredSynchronization; 081 082 final Map<String, EventBundle> byRepository = new HashMap<>(); 083 084 void push(Event event) { 085 String repositoryName = event.getContext().getRepositoryName(); 086 if (!byRepository.containsKey(repositoryName)) { 087 byRepository.put(repositoryName, new EventBundleImpl()); 088 } 089 byRepository.get(repositoryName).push(event); 090 } 091 092 } 093 094 protected final EventListenerList listenerDescriptors; 095 096 protected PostCommitEventExecutor postCommitExec; 097 098 protected volatile AsyncEventExecutor asyncExec; 099 100 protected final List<AsyncWaitHook> asyncWaitHooks = new CopyOnWriteArrayList<>(); 101 102 protected boolean blockAsyncProcessing = false; 103 104 protected boolean blockSyncPostCommitProcessing = false; 105 106 protected boolean bulkModeEnabled = false; 107 108 protected EventPipeRegistry registeredPipes = new EventPipeRegistry(); 109 110 protected EventDispatcherRegistry dispatchers = new EventDispatcherRegistry(); 111 112 protected EventBundleDispatcher pipeDispatcher; 113 114 public EventServiceImpl() { 115 listenerDescriptors = new EventListenerList(); 116 postCommitExec = new PostCommitEventExecutor(); 117 asyncExec = new AsyncEventExecutor(); 118 } 119 120 public void init() { 121 asyncExec.init(); 122 123 EventDispatcherDescriptor dispatcherDescriptor = dispatchers.getDispatcherDescriptor(); 124 if (dispatcherDescriptor != null) { 125 List<EventPipeDescriptor> pipes = registeredPipes.getPipes(); 126 if (!pipes.isEmpty()) { 127 pipeDispatcher = dispatcherDescriptor.getInstance(); 128 pipeDispatcher.init(pipes, dispatcherDescriptor.getParameters()); 129 } 130 } 131 } 132 133 public EventBundleDispatcher getEventBundleDispatcher() { 134 return pipeDispatcher; 135 } 136 137 public void shutdown(long timeoutMillis) throws InterruptedException { 138 postCommitExec.shutdown(timeoutMillis); 139 Set<AsyncWaitHook> notTerminated = asyncWaitHooks.stream().filter(hook -> !hook.shutdown()).collect( 140 Collectors.toSet()); 141 if (!notTerminated.isEmpty()) { 142 throw new RuntimeException("Asynch services are still running : " + notTerminated); 143 } 144 145 if (!asyncExec.shutdown(timeoutMillis)) { 146 throw new RuntimeException("Async executor is still running, timeout expired"); 147 } 148 if (pipeDispatcher != null) { 149 pipeDispatcher.shutdown(); 150 } 151 } 152 153 public void registerForAsyncWait(AsyncWaitHook callback) { 154 asyncWaitHooks.add(callback); 155 } 156 157 public void unregisterForAsyncWait(AsyncWaitHook callback) { 158 asyncWaitHooks.remove(callback); 159 } 160 161 @Override 162 public void waitForAsyncCompletion() { 163 waitForAsyncCompletion(Long.MAX_VALUE); 164 } 165 166 @Override 167 public void waitForAsyncCompletion(long timeout) { 168 Set<AsyncWaitHook> notCompleted = asyncWaitHooks.stream() 169 .filter(hook -> !hook.waitForAsyncCompletion()) 170 .collect(Collectors.toSet()); 171 if (!notCompleted.isEmpty()) { 172 throw new RuntimeException("Async tasks are still running : " + notCompleted); 173 } 174 try { 175 if (!asyncExec.waitForCompletion(timeout)) { 176 throw new RuntimeException("Async event listeners thread pool is not terminated"); 177 } 178 } catch (InterruptedException e) { 179 Thread.currentThread().interrupt(); 180 // TODO change signature 181 throw new RuntimeException(e); 182 } 183 if (pipeDispatcher != null) { 184 try { 185 pipeDispatcher.waitForCompletion(timeout); 186 } catch (InterruptedException e) { 187 Thread.currentThread().interrupt(); 188 throw new RuntimeException(e); 189 } 190 } 191 } 192 193 @Override 194 public void addEventListener(EventListenerDescriptor listener) { 195 listenerDescriptors.add(listener); 196 log.debug("Registered event listener: " + listener.getName()); 197 } 198 199 public void addEventPipe(EventPipeDescriptor pipeDescriptor) { 200 registeredPipes.addContribution(pipeDescriptor); 201 log.debug("Registered event pipe: " + pipeDescriptor.getName()); 202 } 203 204 public void addEventDispatcher(EventDispatcherDescriptor dispatcherDescriptor) { 205 dispatchers.addContrib(dispatcherDescriptor); 206 log.debug("Registered event dispatcher: " + dispatcherDescriptor.getName()); 207 } 208 209 @Override 210 public void removeEventListener(EventListenerDescriptor listener) { 211 listenerDescriptors.removeDescriptor(listener); 212 log.debug("Unregistered event listener: " + listener.getName()); 213 } 214 215 public void removeEventPipe(EventPipeDescriptor pipeDescriptor) { 216 registeredPipes.removeContribution(pipeDescriptor); 217 log.debug("Unregistered event pipe: " + pipeDescriptor.getName()); 218 } 219 220 public void removeEventDispatcher(EventDispatcherDescriptor dispatcherDescriptor) { 221 dispatchers.removeContrib(dispatcherDescriptor); 222 log.debug("Unregistered event dispatcher: " + dispatcherDescriptor.getName()); 223 } 224 225 @Override 226 public void fireEvent(String name, EventContext context) { 227 fireEvent(new EventImpl(name, context)); 228 } 229 230 @Override 231 public void fireEvent(Event event) { 232 233 String ename = event.getName(); 234 EventStats stats = Framework.getService(EventStats.class); 235 for (EventListenerDescriptor desc : listenerDescriptors.getEnabledInlineListenersDescriptors()) { 236 if (!desc.acceptEvent(ename)) { 237 continue; 238 } 239 try { 240 long t0 = System.currentTimeMillis(); 241 SequenceTracer.start("Fire sync event " + event.getName()); 242 desc.asEventListener().handleEvent(event); 243 long elapsed = System.currentTimeMillis() - t0; 244 SequenceTracer.stop("done in " + elapsed + " ms"); 245 if (stats != null) { 246 stats.logSyncExec(desc, elapsed); 247 } 248 if (event.isCanceled()) { 249 // break loop 250 return; 251 } 252 } catch (RuntimeException e) { 253 // get message 254 SequenceTracer.destroy("failure"); 255 String message = "Exception during " + desc.getName() + " sync listener execution, "; 256 if (event.isBubbleException()) { 257 message += "other listeners will be ignored"; 258 } else if (event.isMarkedForRollBack()) { 259 message += "transaction will be rolled back"; 260 if (event.getRollbackMessage() != null) { 261 message += " (" + event.getRollbackMessage() + ")"; 262 } 263 } else { 264 message += "continuing to run other listeners"; 265 } 266 // log 267 if (e instanceof RecoverableClientException) { 268 log.info(message + "\n" + e.getMessage()); 269 log.debug(message, e); 270 } else { 271 log.error(message, e); 272 } 273 // rethrow or swallow 274 if (event.isBubbleException()) { 275 throw e; 276 } else if (event.isMarkedForRollBack()) { 277 Exception ee; 278 if (event.getRollbackException() != null) { 279 ee = event.getRollbackException(); 280 } else { 281 ee = e; 282 } 283 // when marked for rollback, throw a generic 284 // RuntimeException to make sure nobody catches it 285 throw new RuntimeException(message, ee); 286 } else { 287 // swallow exception 288 } 289 } 290 } 291 292 if (!event.isInline()) { // record the event 293 // don't record the complete event, only a shallow copy 294 ShallowEvent shallowEvent = ShallowEvent.create(event); 295 if (event.isImmediate()) { 296 EventBundleImpl b = new EventBundleImpl(); 297 b.push(shallowEvent); 298 fireEventBundle(b); 299 } else { 300 recordEvent(shallowEvent); 301 } 302 } 303 } 304 305 @Override 306 public void fireEventBundle(EventBundle event) { 307 boolean comesFromJMS = false; 308 309 if (event instanceof ReconnectedEventBundle) { 310 if (((ReconnectedEventBundle) event).comesFromJMS()) { 311 comesFromJMS = true; 312 } 313 } 314 315 List<EventListenerDescriptor> postCommitSync = listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors(); 316 List<EventListenerDescriptor> postCommitAsync = listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors(); 317 318 if (bulkModeEnabled) { 319 // run all listeners synchronously in one transaction 320 List<EventListenerDescriptor> listeners = new ArrayList<>(); 321 if (!blockSyncPostCommitProcessing) { 322 listeners = postCommitSync; 323 } 324 if (!blockAsyncProcessing) { 325 listeners.addAll(postCommitAsync); 326 } 327 if (!listeners.isEmpty()) { 328 postCommitExec.runBulk(listeners, event); 329 } 330 return; 331 } 332 333 // run sync listeners 334 if (blockSyncPostCommitProcessing) { 335 log.debug("Dropping PostCommit handler execution"); 336 } else if (comesFromJMS) { 337 // when called from JMS we must skip sync listeners 338 // - postComit listeners should be on the core 339 // - there is no transaction started by JMS listener 340 log.debug("Deactivating sync post-commit listener since we are called from JMS"); 341 } else { 342 if (!postCommitSync.isEmpty()) { 343 postCommitExec.run(postCommitSync, event); 344 } 345 } 346 347 if (blockAsyncProcessing) { 348 log.debug("Dopping bundle"); 349 return; 350 } 351 352 // fire async listeners 353 if (AsyncProcessorConfig.forceJMSUsage() && !comesFromJMS) { 354 log.debug("Skipping async exec, this will be triggered via JMS"); 355 } else { 356 if (pipeDispatcher == null) { 357 asyncExec.run(postCommitAsync, event); 358 } else { 359 // rather than sending to the WorkManager: send to the Pipe 360 pipeDispatcher.sendEventBundle(event); 361 } 362 } 363 } 364 365 @Override 366 public void fireEventBundleSync(EventBundle event) { 367 for (EventListenerDescriptor desc : listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors()) { 368 desc.asPostCommitListener().handleEvent(event); 369 } 370 for (EventListenerDescriptor desc : listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors()) { 371 desc.asPostCommitListener().handleEvent(event); 372 } 373 } 374 375 @Override 376 public List<EventListener> getEventListeners() { 377 return listenerDescriptors.getInLineListeners(); 378 } 379 380 @Override 381 public List<PostCommitEventListener> getPostCommitEventListeners() { 382 List<PostCommitEventListener> result = new ArrayList<>(); 383 384 result.addAll(listenerDescriptors.getSyncPostCommitListeners()); 385 result.addAll(listenerDescriptors.getAsyncPostCommitListeners()); 386 387 return result; 388 } 389 390 public EventListenerList getEventListenerList() { 391 return listenerDescriptors; 392 } 393 394 @Override 395 public EventListenerDescriptor getEventListener(String name) { 396 return listenerDescriptors.getDescriptor(name); 397 } 398 399 // methods for monitoring 400 401 @Override 402 public EventListenerList getListenerList() { 403 return listenerDescriptors; 404 } 405 406 @Override 407 public void setListenerEnabledFlag(String listenerName, boolean enabled) { 408 if (!listenerDescriptors.hasListener(listenerName)) { 409 return; 410 } 411 412 for (EventListenerDescriptor desc : listenerDescriptors.getAsyncPostCommitListenersDescriptors()) { 413 if (desc.getName().equals(listenerName)) { 414 desc.setEnabled(enabled); 415 synchronized (this) { 416 listenerDescriptors.recomputeEnabledListeners(); 417 } 418 return; 419 } 420 } 421 422 for (EventListenerDescriptor desc : listenerDescriptors.getSyncPostCommitListenersDescriptors()) { 423 if (desc.getName().equals(listenerName)) { 424 desc.setEnabled(enabled); 425 synchronized (this) { 426 listenerDescriptors.recomputeEnabledListeners(); 427 } 428 return; 429 } 430 } 431 432 for (EventListenerDescriptor desc : listenerDescriptors.getInlineListenersDescriptors()) { 433 if (desc.getName().equals(listenerName)) { 434 desc.setEnabled(enabled); 435 synchronized (this) { 436 listenerDescriptors.recomputeEnabledListeners(); 437 } 438 return; 439 } 440 } 441 } 442 443 @Override 444 public int getActiveThreadsCount() { 445 return asyncExec.getActiveCount(); 446 } 447 448 @Override 449 public int getEventsInQueueCount() { 450 return asyncExec.getUnfinishedCount(); 451 } 452 453 @Override 454 public boolean isBlockAsyncHandlers() { 455 return blockAsyncProcessing; 456 } 457 458 @Override 459 public boolean isBlockSyncPostCommitHandlers() { 460 return blockSyncPostCommitProcessing; 461 } 462 463 @Override 464 public void setBlockAsyncHandlers(boolean blockAsyncHandlers) { 465 blockAsyncProcessing = blockAsyncHandlers; 466 } 467 468 @Override 469 public void setBlockSyncPostCommitHandlers(boolean blockSyncPostComitHandlers) { 470 blockSyncPostCommitProcessing = blockSyncPostComitHandlers; 471 } 472 473 @Override 474 public boolean isBulkModeEnabled() { 475 return bulkModeEnabled; 476 } 477 478 @Override 479 public void setBulkModeEnabled(boolean bulkModeEnabled) { 480 this.bulkModeEnabled = bulkModeEnabled; 481 } 482 483 protected void recordEvent(Event event) { 484 CompositeEventBundle b = threadBundles.get(); 485 b.push(event); 486 if (TransactionHelper.isTransactionActive()) { 487 if (!b.registeredSynchronization) { 488 // register as synchronization 489 try { 490 TransactionHelper.lookupTransactionManager().getTransaction().registerSynchronization(this); 491 } catch (NamingException | SystemException | RollbackException e) { 492 throw new RuntimeException("Cannot register Synchronization", e); 493 } 494 b.registeredSynchronization = true; 495 } 496 } else if (event.isCommitEvent()) { 497 handleTxCommited(); 498 } 499 } 500 501 @Override 502 public void beforeCompletion() { 503 } 504 505 @Override 506 public void afterCompletion(int status) { 507 if (status == Status.STATUS_COMMITTED) { 508 handleTxCommited(); 509 } else if (status == Status.STATUS_ROLLEDBACK) { 510 handleTxRollbacked(); 511 } else { 512 log.error("Unexpected afterCompletion status: " + status); 513 } 514 } 515 516 protected void handleTxRollbacked() { 517 threadBundles.remove(); 518 } 519 520 protected void handleTxCommited() { 521 CompositeEventBundle b = threadBundles.get(); 522 threadBundles.remove(); 523 524 // notify post commit event listeners 525 for (EventBundle bundle : b.byRepository.values()) { 526 try { 527 fireEventBundle(bundle); 528 } catch (NuxeoException e) { 529 log.error("Error while processing " + bundle, e); 530 } 531 } 532 } 533 534}