001/* 002 * Copyright (c) 2006-2013 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the Eclipse Public License v1.0 006 * which accompanies this distribution, and is available at 007 * http://www.eclipse.org/legal/epl-v10.html 008 * 009 * Contributors: 010 * Bogdan Stefanescu 011 * Thierry Delprat 012 * Florent Guillaume 013 */ 014package org.nuxeo.ecm.core.event.impl; 015 016import java.util.LinkedList; 017import java.util.List; 018import java.util.concurrent.TimeUnit; 019 020import org.apache.commons.logging.Log; 021import org.apache.commons.logging.LogFactory; 022import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 023import org.nuxeo.ecm.core.api.DocumentModel; 024import org.nuxeo.ecm.core.event.Event; 025import org.nuxeo.ecm.core.event.EventBundle; 026import org.nuxeo.ecm.core.event.EventContext; 027import org.nuxeo.ecm.core.event.EventService; 028import org.nuxeo.ecm.core.event.EventStats; 029import org.nuxeo.ecm.core.event.ReconnectedEventBundle; 030import org.nuxeo.ecm.core.work.AbstractWork; 031import org.nuxeo.ecm.core.work.api.Work.State; 032import org.nuxeo.ecm.core.work.api.WorkManager; 033import org.nuxeo.runtime.api.Framework; 034 035/** 036 * Executor of async listeners passing them to the WorkManager. 037 */ 038public class AsyncEventExecutor { 039 040 private static final Log log = LogFactory.getLog(AsyncEventExecutor.class); 041 042 public AsyncEventExecutor() { 043 } 044 045 public WorkManager getWorkManager() { 046 return Framework.getLocalService(WorkManager.class); 047 } 048 049 public void init() { 050 WorkManager workManager = getWorkManager(); 051 if (workManager != null) { 052 workManager.init(); 053 } 054 } 055 056 public boolean shutdown(long timeoutMillis) throws InterruptedException { 057 WorkManager workManager = getWorkManager(); 058 if (workManager == null) { 059 return true; 060 } 061 return workManager.shutdown(timeoutMillis, TimeUnit.MILLISECONDS); 062 } 063 064 public boolean waitForCompletion(long timeoutMillis) throws InterruptedException { 065 WorkManager workManager = getWorkManager(); 066 return workManager.awaitCompletion(timeoutMillis, TimeUnit.MILLISECONDS); 067 } 068 069 public void run(final List<EventListenerDescriptor> listeners, EventBundle bundle) { 070 for (EventListenerDescriptor listener : listeners) { 071 EventBundle filtered = listener.filterBundle(bundle); 072 if (filtered.isEmpty()) { 073 continue; 074 } 075 getWorkManager().schedule(new ListenerWork(listener, filtered)); 076 } 077 } 078 079 public int getUnfinishedCount() { 080 WorkManager workManager = getWorkManager(); 081 int n = 0; 082 for (String queueId : workManager.getWorkQueueIds()) { 083 n += workManager.getQueueSize(queueId, State.SCHEDULED) + workManager.getQueueSize(queueId, State.RUNNING); 084 } 085 return n; 086 } 087 088 public int getActiveCount() { 089 WorkManager workManager = getWorkManager(); 090 int n = 0; 091 for (String queueId : workManager.getWorkQueueIds()) { 092 n += workManager.getQueueSize(queueId, State.RUNNING); 093 } 094 return n; 095 } 096 097 protected static class ListenerWork extends AbstractWork { 098 099 private static final long serialVersionUID = 1L; 100 101 private static final int DEFAULT_RETRY_COUNT = 2; 102 103 protected final String title; 104 105 protected ReconnectedEventBundle bundle; 106 107 protected String listenerName; 108 109 protected int retryCount; 110 111 protected transient EventListenerDescriptor listener; 112 113 public ListenerWork(EventListenerDescriptor listener, EventBundle bundle) { 114 super(); // random id, for unique job 115 listenerName = listener.getName(); 116 if (bundle instanceof ReconnectedEventBundle) { 117 this.bundle = (ReconnectedEventBundle) bundle; 118 } else { 119 this.bundle = new ReconnectedEventBundleImpl(bundle, listenerName); 120 } 121 List<String> l = new LinkedList<String>(); 122 List<String> docIds = new LinkedList<String>(); 123 String repositoryName = null; 124 for (Event event : bundle) { 125 String s = event.getName(); 126 EventContext ctx = event.getContext(); 127 if (ctx instanceof DocumentEventContext) { 128 DocumentModel source = ((DocumentEventContext) ctx).getSourceDocument(); 129 if (source != null) { 130 s += "/" + source.getRef(); 131 docIds.add(source.getId()); 132 repositoryName = source.getRepositoryName(); 133 } 134 } 135 l.add(s); 136 } 137 title = "Listener " + listenerName + " " + l; 138 if (!docIds.isEmpty()) { 139 setDocuments(repositoryName, docIds); 140 } 141 Integer count = listener.getRetryCount(); 142 retryCount = count == null ? DEFAULT_RETRY_COUNT : count.intValue(); 143 if (retryCount < 0) { 144 retryCount = DEFAULT_RETRY_COUNT; 145 } 146 } 147 148 @Override 149 public String getCategory() { 150 return listenerName; 151 } 152 153 @Override 154 public String getTitle() { 155 return title; 156 } 157 158 @Override 159 public int getRetryCount() { 160 return retryCount; 161 } 162 163 @Override 164 public void work() { 165 EventService eventService = Framework.getLocalService(EventService.class); 166 listener = eventService.getEventListener(listenerName); 167 if (listener == null) { 168 throw new RuntimeException("Cannot find listener: " + listenerName); 169 } 170 listener.asPostCommitListener().handleEvent(bundle); 171 } 172 173 @Override 174 public void cleanUp(boolean ok, Exception e) { 175 super.cleanUp(ok, e); 176 bundle.disconnect(); 177 if (e != null && !(e instanceof InterruptedException) && !(e instanceof ConcurrentUpdateException)) { 178 log.error("Failed to execute async event " + bundle.getName() + " on listener " + listenerName, e); 179 } 180 if (listener != null) { 181 EventStats stats = Framework.getLocalService(EventStats.class); 182 if (stats != null) { 183 stats.logAsyncExec(listener, System.currentTimeMillis() - getStartTime()); 184 } 185 listener = null; 186 } 187 } 188 189 @Override 190 public String toString() { 191 StringBuilder buf = new StringBuilder(); 192 buf.append(getClass().getSimpleName()); 193 buf.append('('); 194 buf.append(title); 195 buf.append(", "); 196 buf.append(getProgress()); 197 buf.append(", "); 198 buf.append(getStatus()); 199 buf.append(')'); 200 return buf.toString(); 201 } 202 } 203 204}