/*
 * (C) Copyright 2006-2016 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Bogdan Stefanescu
 *     Thierry Delprat
 *     Florent Guillaume
 *     Andrei Nechaev
 */
package org.nuxeo.ecm.core.event.impl;

import java.rmi.dgc.VMID;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

import javax.naming.NamingException;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.common.logging.SequenceTracer;
import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.RecoverableClientException;
import org.nuxeo.ecm.core.event.Event;
import org.nuxeo.ecm.core.event.EventBundle;
import org.nuxeo.ecm.core.event.EventContext;
import org.nuxeo.ecm.core.event.EventListener;
import org.nuxeo.ecm.core.event.EventService;
import org.nuxeo.ecm.core.event.EventServiceAdmin;
import org.nuxeo.ecm.core.event.EventStats;
import org.nuxeo.ecm.core.event.PostCommitEventListener;
import org.nuxeo.ecm.core.event.ReconnectedEventBundle;
import org.nuxeo.ecm.core.event.pipe.EventPipeDescriptor;
import org.nuxeo.ecm.core.event.pipe.EventPipeRegistry;
import org.nuxeo.ecm.core.event.pipe.dispatch.EventBundleDispatcher;
import org.nuxeo.ecm.core.event.pipe.dispatch.EventDispatcherDescriptor;
import org.nuxeo.ecm.core.event.pipe.dispatch.EventDispatcherRegistry;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.transaction.TransactionHelper;

/**
 * Implementation of the event service.
 * <p>
 * Inline listeners are run directly by {@link #fireEvent(Event)}. Events that are not inline are either fired
 * immediately as a one-event bundle or recorded in a per-thread bundle that is fired when the current transaction
 * commits (this class registers itself as a transaction {@link Synchronization} for that purpose).
 */
public class EventServiceImpl implements EventService, EventServiceAdmin, Synchronization {

    public static final VMID VMID = new VMID();

    private static final Log log = LogFactory.getLog(EventServiceImpl.class);

    /** Events recorded by the current thread and not yet fired, grouped by repository. */
    protected static final ThreadLocal<CompositeEventBundle> threadBundles = ThreadLocal.withInitial(
            CompositeEventBundle::new);

    /**
     * Per-thread holder of the recorded events, keeping one {@link EventBundle} per repository name.
     */
    private static class CompositeEventBundle {

        /** Whether the service has already been registered as a synchronization for this bundle. */
        boolean registeredSynchronization;

        final Map<String, EventBundle> byRepository = new HashMap<>();

        /**
         * Appends the event to the bundle of its repository, creating that bundle on first use.
         */
        void push(Event event) {
            String repositoryName = event.getContext().getRepositoryName();
            byRepository.computeIfAbsent(repositoryName, name -> new EventBundleImpl()).push(event);
        }

    }

    protected final EventListenerList listenerDescriptors;

    protected PostCommitEventExecutor postCommitExec;

    protected volatile AsyncEventExecutor asyncExec;

    protected final List<AsyncWaitHook> asyncWaitHooks = new CopyOnWriteArrayList<>();

    protected boolean blockAsyncProcessing = false;

    protected boolean blockSyncPostCommitProcessing = false;

    protected boolean bulkModeEnabled = false;

    protected EventPipeRegistry registeredPipes = new EventPipeRegistry();

    protected EventDispatcherRegistry dispatchers = new EventDispatcherRegistry();

    /** Dispatcher sending bundles to the pipes; stays {@code null} when no dispatcher or no pipe is registered. */
    protected EventBundleDispatcher pipeDispatcher;

    public EventServiceImpl() {
        listenerDescriptors = new EventListenerList();
        postCommitExec = new PostCommitEventExecutor();
        asyncExec = new AsyncEventExecutor();
    }

    /**
     * Initializes the async executor and, when both a dispatcher descriptor and at least one pipe are registered,
     * instantiates and initializes the pipe dispatcher.
     */
    public void init() {
        asyncExec.init();

        EventDispatcherDescriptor dispatcherDescriptor = dispatchers.getDispatcherDescriptor();
        if (dispatcherDescriptor == null) {
            return;
        }
        List<EventPipeDescriptor> pipes = registeredPipes.getPipes();
        if (pipes.isEmpty()) {
            return;
        }
        pipeDispatcher = dispatcherDescriptor.getInstance();
        pipeDispatcher.init(pipes, dispatcherDescriptor.getParameters());
    }

    /**
     * @return the pipe dispatcher, or {@code null} if none was set up by {@link #init()}
     */
    public EventBundleDispatcher getEventBundleDispatcher() {
        return pipeDispatcher;
    }

    /**
     * Shuts down the post-commit executor, the registered async wait hooks, the async executor and the pipe
     * dispatcher, in that order.
     *
     * @param timeoutMillis the timeout handed to the post-commit and async executors
     * @throws InterruptedException if declared by one of the executors' shutdown
     * @throws RuntimeException if a hook reports it is not shut down, or if the async executor did not shut down
     *             within the timeout; in both cases the remaining steps are not executed
     */
    public void shutdown(long timeoutMillis) throws InterruptedException {
        postCommitExec.shutdown(timeoutMillis);
        Set<AsyncWaitHook> notTerminated = asyncWaitHooks.stream()
                                                         .filter(hook -> !hook.shutdown())
                                                         .collect(Collectors.toSet());
        if (!notTerminated.isEmpty()) {
            throw new RuntimeException("Asynch services are still running : " + notTerminated);
        }

        if (!asyncExec.shutdown(timeoutMillis)) {
            throw new RuntimeException("Async executor is still running, timeout expired");
        }
        if (pipeDispatcher != null) {
            pipeDispatcher.shutdown();
        }
    }

    /**
     * Registers a hook that is consulted by {@link #waitForAsyncCompletion(long)} and {@link #shutdown(long)}.
     */
    public void registerForAsyncWait(AsyncWaitHook callback) {
        asyncWaitHooks.add(callback);
    }

    /**
     * Removes a hook previously added with {@link #registerForAsyncWait(AsyncWaitHook)}.
     */
    public void unregisterForAsyncWait(AsyncWaitHook callback) {
        asyncWaitHooks.remove(callback);
    }

    /**
     * Same as {@link #waitForAsyncCompletion(long)} with a timeout of {@link Long#MAX_VALUE}.
     */
    @Override
    public void waitForAsyncCompletion() {
        waitForAsyncCompletion(Long.MAX_VALUE);
    }

    /**
     * Waits for the registered hooks, then the async executor, then the pipe dispatcher (if any) to complete.
     * <p>
     * The timeout is passed to the async executor and to the pipe dispatcher; the hooks are called without it.
     *
     * @param timeout the timeout handed to the async executor and the pipe dispatcher
     * @throws RuntimeException if a hook or the async executor reports it did not complete, or if the calling
     *             thread is interrupted while waiting (the interrupt flag is restored first)
     */
    @Override
    public void waitForAsyncCompletion(long timeout) {
        Set<AsyncWaitHook> notCompleted = asyncWaitHooks.stream()
                                                        .filter(hook -> !hook.waitForAsyncCompletion())
                                                        .collect(Collectors.toSet());
        if (!notCompleted.isEmpty()) {
            throw new RuntimeException("Async tasks are still running : " + notCompleted);
        }
        try {
            if (!asyncExec.waitForCompletion(timeout)) {
                throw new RuntimeException("Async event listeners thread pool is not terminated");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // TODO change signature
            throw new RuntimeException(e);
        }
        if (pipeDispatcher != null) {
            try {
                pipeDispatcher.waitForCompletion(timeout);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void addEventListener(EventListenerDescriptor listener) {
        listenerDescriptors.add(listener);
        log.debug("Registered event listener: " + listener.getName());
    }

    public void addEventPipe(EventPipeDescriptor pipeDescriptor) {
        registeredPipes.addContribution(pipeDescriptor);
        log.debug("Registered event pipe: " + pipeDescriptor.getName());
    }

    public void addEventDispatcher(EventDispatcherDescriptor dispatcherDescriptor) {
        dispatchers.addContrib(dispatcherDescriptor);
        log.debug("Registered event dispatcher: " + dispatcherDescriptor.getName());
    }

    @Override
    public void removeEventListener(EventListenerDescriptor listener) {
        listenerDescriptors.removeDescriptor(listener);
        log.debug("Unregistered event listener: " + listener.getName());
    }

    public void removeEventPipe(EventPipeDescriptor pipeDescriptor) {
        registeredPipes.removeContribution(pipeDescriptor);
        log.debug("Unregistered event pipe: " + pipeDescriptor.getName());
    }

    public void removeEventDispatcher(EventDispatcherDescriptor dispatcherDescriptor) {
        dispatchers.removeContrib(dispatcherDescriptor);
        log.debug("Unregistered event dispatcher: " + dispatcherDescriptor.getName());
    }

    /**
     * Wraps the name and context in an {@link EventImpl} and fires it with {@link #fireEvent(Event)}.
     */
    @Override
    public void fireEvent(String name, EventContext context) {
        fireEvent(new EventImpl(name, context));
    }

    /**
     * Runs every enabled inline listener accepting the event, then records or fires the event for post-commit
     * processing.
     * <p>
     * Processing stops (and nothing is recorded) as soon as a listener cancels the event. When a listener throws a
     * {@link RuntimeException}:
     * <ul>
     * <li>a {@link ConcurrentUpdateException} is always rethrown untouched;</li>
     * <li>the exception is rethrown if the transaction is marked rollback or the event asks to bubble exceptions;
     * </li>
     * <li>if the event is marked for rollback, a generic {@link RuntimeException} wrapping the event's rollback
     * exception (or the original one) is thrown;</li>
     * <li>otherwise the exception is logged and the remaining listeners are run.</li>
     * </ul>
     * If the event is not inline, a shallow copy of it is fired immediately as a one-event bundle when the event
     * is immediate, and recorded for the end of the transaction otherwise.
     */
    @Override
    public void fireEvent(Event event) {

        String ename = event.getName();
        EventStats stats = Framework.getService(EventStats.class);
        for (EventListenerDescriptor desc : listenerDescriptors.getEnabledInlineListenersDescriptors()) {
            if (!desc.acceptEvent(ename)) {
                continue;
            }
            try {
                long t0 = System.currentTimeMillis();
                SequenceTracer.start("Fire sync event " + event.getName());
                desc.asEventListener().handleEvent(event);
                long elapsed = System.currentTimeMillis() - t0;
                SequenceTracer.stop("done in " + elapsed + " ms");
                if (stats != null) {
                    stats.logSyncExec(desc, elapsed);
                }
                if (event.isCanceled()) {
                    // a canceled event is neither passed to the remaining listeners nor recorded
                    return;
                }
            } catch (ConcurrentUpdateException e) {
                // never swallow ConcurrentUpdateException
                throw e;
            } catch (RuntimeException e) {
                SequenceTracer.destroy("failure");

                // build the log message describing what happens next
                String message = "Exception during " + desc.getName() + " sync listener execution, ";
                if (event.isBubbleException()) {
                    message += "other listeners will be ignored";
                } else if (event.isMarkedForRollBack()) {
                    message += "transaction will be rolled back";
                    if (event.getRollbackMessage() != null) {
                        message += " (" + event.getRollbackMessage() + ")";
                    }
                } else {
                    message += "continuing to run other listeners";
                }

                // recoverable client errors are expected: keep the stack trace at debug level only
                if (e instanceof RecoverableClientException) {
                    log.info(message + "\n" + e.getMessage());
                    log.debug(message, e);
                } else {
                    log.error(message, e);
                }

                // rethrow or swallow
                if (TransactionHelper.isTransactionMarkedRollback() || event.isBubbleException()) {
                    throw e;
                }
                if (event.isMarkedForRollBack()) {
                    Exception cause = event.getRollbackException() != null ? event.getRollbackException() : e;
                    // when marked for rollback, throw a generic
                    // RuntimeException to make sure nobody catches it
                    throw new RuntimeException(message, cause);
                }
                // otherwise swallow the exception and go on with the next listener
            }
        }

        if (!event.isInline()) { // record the event
            // don't record the complete event, only a shallow copy
            ShallowEvent shallowEvent = ShallowEvent.create(event);
            if (event.isImmediate()) {
                EventBundleImpl b = new EventBundleImpl();
                b.push(shallowEvent);
                fireEventBundle(b);
            } else {
                recordEvent(shallowEvent);
            }
        }
    }

    /**
     * Hands the bundle to the enabled post-commit listeners.
     * <p>
     * In bulk mode, the sync and async listeners that are not blocked are run together through
     * {@link PostCommitEventExecutor#runBulk}. Otherwise the sync listeners are run by the post-commit executor
     * (unless blocked), then the bundle is sent either to the async executor or, when a pipe dispatcher is
     * configured, to that dispatcher (unless async processing is blocked).
     */
    @Override
    public void fireEventBundle(EventBundle event) {
        List<EventListenerDescriptor> postCommitSync = listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors();
        List<EventListenerDescriptor> postCommitAsync = listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors();

        if (bulkModeEnabled) {
            // run all listeners synchronously in one transaction;
            // always copy into a fresh list so that the lists returned by the listener registry are never mutated
            List<EventListenerDescriptor> listeners = new ArrayList<>();
            if (!blockSyncPostCommitProcessing) {
                listeners.addAll(postCommitSync);
            }
            if (!blockAsyncProcessing) {
                listeners.addAll(postCommitAsync);
            }
            if (!listeners.isEmpty()) {
                postCommitExec.runBulk(listeners, event);
            }
            return;
        }

        // run sync listeners
        if (blockSyncPostCommitProcessing) {
            log.debug("Dropping PostCommit handler execution");
        } else if (!postCommitSync.isEmpty()) {
            postCommitExec.run(postCommitSync, event);
        }

        if (blockAsyncProcessing) {
            log.debug("Dopping bundle");
            return;
        }

        // fire async listeners
        if (pipeDispatcher == null) {
            asyncExec.run(postCommitAsync, event);
        } else {
            // rather than sending to the WorkManager: send to the Pipe
            pipeDispatcher.sendEventBundle(event);
        }
    }

    /**
     * Calls every enabled sync, then every enabled async post-commit listener with the bundle, directly from the
     * calling thread.
     */
    @Override
    public void fireEventBundleSync(EventBundle event) {
        for (EventListenerDescriptor desc : listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors()) {
            desc.asPostCommitListener().handleEvent(event);
        }
        for (EventListenerDescriptor desc : listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors()) {
            desc.asPostCommitListener().handleEvent(event);
        }
    }

    /**
     * @return the inline listeners known to the listener list
     */
    @Override
    public List<EventListener> getEventListeners() {
        return listenerDescriptors.getInLineListeners();
    }

    /**
     * @return a new list holding the sync post-commit listeners followed by the async ones
     */
    @Override
    public List<PostCommitEventListener> getPostCommitEventListeners() {
        List<PostCommitEventListener> result = new ArrayList<>();

        result.addAll(listenerDescriptors.getSyncPostCommitListeners());
        result.addAll(listenerDescriptors.getAsyncPostCommitListeners());

        return result;
    }

    public EventListenerList getEventListenerList() {
        return listenerDescriptors;
    }

    @Override
    public EventListenerDescriptor getEventListener(String name) {
        return listenerDescriptors.getDescriptor(name);
    }

    // methods for monitoring

    @Override
    public EventListenerList getListenerList() {
        return listenerDescriptors;
    }

    /**
     * Enables or disables the listener with the given name and recomputes the enabled listeners.
     * <p>
     * The listener is looked up among the async post-commit, then sync post-commit, then inline descriptors; only
     * the first match is updated. Nothing happens if the listener list does not know the name.
     */
    @Override
    public void setListenerEnabledFlag(String listenerName, boolean enabled) {
        if (!listenerDescriptors.hasListener(listenerName)) {
            return;
        }

        if (updateEnabledFlag(listenerDescriptors.getAsyncPostCommitListenersDescriptors(), listenerName, enabled)) {
            return;
        }
        if (updateEnabledFlag(listenerDescriptors.getSyncPostCommitListenersDescriptors(), listenerName, enabled)) {
            return;
        }
        updateEnabledFlag(listenerDescriptors.getInlineListenersDescriptors(), listenerName, enabled);
    }

    /**
     * Sets the enabled flag on the first descriptor with the given name and recomputes the enabled listeners.
     *
     * @return {@code true} if a descriptor with that name was found among the candidates
     */
    private boolean updateEnabledFlag(Iterable<? extends EventListenerDescriptor> candidates, String listenerName,
            boolean enabled) {
        for (EventListenerDescriptor desc : candidates) {
            if (desc.getName().equals(listenerName)) {
                desc.setEnabled(enabled);
                synchronized (this) {
                    listenerDescriptors.recomputeEnabledListeners();
                }
                return true;
            }
        }
        return false;
    }

    /**
     * @return the active count reported by the async executor
     */
    @Override
    public int getActiveThreadsCount() {
        return asyncExec.getActiveCount();
    }

    /**
     * @return the unfinished count reported by the async executor
     */
    @Override
    public int getEventsInQueueCount() {
        return asyncExec.getUnfinishedCount();
    }

    @Override
    public boolean isBlockAsyncHandlers() {
        return blockAsyncProcessing;
    }

    @Override
    public boolean isBlockSyncPostCommitHandlers() {
        return blockSyncPostCommitProcessing;
    }

    @Override
    public void setBlockAsyncHandlers(boolean blockAsyncHandlers) {
        blockAsyncProcessing = blockAsyncHandlers;
    }

    @Override
    public void setBlockSyncPostCommitHandlers(boolean blockSyncPostComitHandlers) {
        blockSyncPostCommitProcessing = blockSyncPostComitHandlers;
    }

    @Override
    public boolean isBulkModeEnabled() {
        return bulkModeEnabled;
    }

    @Override
    public void setBulkModeEnabled(boolean bulkModeEnabled) {
        this.bulkModeEnabled = bulkModeEnabled;
    }

    /**
     * Adds the event to the current thread's bundle.
     * <p>
     * If a transaction is active, this service is registered (once per thread bundle) as a synchronization of that
     * transaction so the bundle is fired on commit. Without an active transaction, the bundle is fired right away
     * when the event is a commit event.
     *
     * @throws RuntimeException if the synchronization cannot be registered
     */
    protected void recordEvent(Event event) {
        CompositeEventBundle b = threadBundles.get();
        b.push(event);
        if (TransactionHelper.isTransactionActive()) {
            if (!b.registeredSynchronization) {
                // register as synchronization
                try {
                    TransactionHelper.lookupTransactionManager().getTransaction().registerSynchronization(this);
                } catch (NamingException | SystemException | RollbackException e) {
                    throw new RuntimeException("Cannot register Synchronization", e);
                }
                b.registeredSynchronization = true;
            }
        } else if (event.isCommitEvent()) {
            handleTxCommited();
        }
    }

    /** Nothing to do before the transaction completes. */
    @Override
    public void beforeCompletion() {
    }

    /**
     * Fires the recorded events on commit and discards them on rollback; any other status is only logged.
     */
    @Override
    public void afterCompletion(int status) {
        if (status == Status.STATUS_COMMITTED) {
            handleTxCommited();
        } else if (status == Status.STATUS_ROLLEDBACK) {
            handleTxRollbacked();
        } else {
            // NOTE(review): the thread's bundle is not cleared on this path, so its recorded events and its
            // registeredSynchronization flag stay attached to the thread - confirm this is intended
            log.error("Unexpected afterCompletion status: " + status);
        }
    }

    /** Discards the events recorded by the current thread. */
    protected void handleTxRollbacked() {
        threadBundles.remove();
    }

    /**
     * Detaches the current thread's bundle and fires one event bundle per repository.
     * <p>
     * A {@link NuxeoException} thrown while firing a bundle is logged and does not prevent the other bundles from
     * being fired.
     */
    protected void handleTxCommited() {
        CompositeEventBundle b = threadBundles.get();
        threadBundles.remove();

        // notify post commit event listeners
        for (EventBundle bundle : b.byRepository.values()) {
            try {
                fireEventBundle(bundle);
            } catch (NuxeoException e) {
                log.error("Error while processing " + bundle, e);
            }
        }
    }

}