001/* 002 * (C) Copyright 2006-2013 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Bogdan Stefanescu 018 * Thierry Delprat 019 * Florent Guillaume 020 * Andrei Nechaev 021 */ 022package org.nuxeo.ecm.core.event.impl; 023 024import java.util.HashMap; 025import java.util.LinkedList; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.TimeUnit; 029 030import org.apache.commons.logging.Log; 031import org.apache.commons.logging.LogFactory; 032import org.nuxeo.common.utils.ExceptionUtils; 033import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 034import org.nuxeo.ecm.core.api.CoreSession; 035import org.nuxeo.ecm.core.api.DocumentModel; 036import org.nuxeo.ecm.core.event.Event; 037import org.nuxeo.ecm.core.event.EventBundle; 038import org.nuxeo.ecm.core.event.EventContext; 039import org.nuxeo.ecm.core.event.EventService; 040import org.nuxeo.ecm.core.event.EventStats; 041import org.nuxeo.ecm.core.event.ReconnectedEventBundle; 042import org.nuxeo.ecm.core.work.AbstractWork; 043import org.nuxeo.ecm.core.work.api.Work.State; 044import org.nuxeo.ecm.core.work.api.WorkManager; 045import org.nuxeo.runtime.api.Framework; 046import org.nuxeo.runtime.transaction.TransactionHelper; 047 048/** 049 * Executor of async listeners passing them to the WorkManager. 050 */ 051@SuppressWarnings("PackageAccessibility") 052public class AsyncEventExecutor { 053 054 private static final Log log = LogFactory.getLog(AsyncEventExecutor.class); 055 056 public AsyncEventExecutor() { 057 } 058 059 public WorkManager getWorkManager() { 060 return Framework.getService(WorkManager.class); 061 } 062 063 public void init() { 064 WorkManager workManager = getWorkManager(); 065 if (workManager != null) { 066 workManager.init(); 067 } 068 } 069 070 public boolean shutdown(long timeoutMillis) throws InterruptedException { 071 WorkManager workManager = getWorkManager(); 072 if (workManager == null) { 073 return true; 074 } 075 return workManager.shutdown(timeoutMillis, TimeUnit.MILLISECONDS); 076 } 077 078 public boolean waitForCompletion(long timeoutMillis) throws InterruptedException { 079 WorkManager workManager = getWorkManager(); 080 if (workManager == null) { 081 return false; 082 } 083 084 return workManager.awaitCompletion(timeoutMillis, TimeUnit.MILLISECONDS); 085 } 086 087 public void run(final List<EventListenerDescriptor> listeners, EventBundle bundle) { 088 // EventBundle that have gone through bus have been serialized 089 // we need to reconnect them before filtering 090 // this means we need a valid transaction ! 091 if (!(bundle instanceof ReconnectedEventBundleImpl)) { 092 scheduleListeners(listeners, bundle); 093 } else { 094 final EventBundle tmpBundle = bundle; 095 096 TransactionHelper.runInTransaction(() -> { 097 EventBundle connectedBundle = new EventBundleImpl(); 098 Map<String, CoreSession> sessions = new HashMap<>(); 099 100 List<Event> events = ((ReconnectedEventBundleImpl)tmpBundle).getReconnectedEvents(); 101 for (Event event : events) { 102 connectedBundle.push(event); 103 CoreSession session = event.getContext().getCoreSession(); 104 if (!(sessions.keySet().contains(session.getRepositoryName()))) { 105 sessions.put(session.getRepositoryName(), session); 106 } 107 } 108 109 sessions.values().forEach(CoreSession::close); 110 scheduleListeners(listeners, connectedBundle); 111 }); 112 } 113 } 114 115 private void scheduleListeners(final List<EventListenerDescriptor> listeners, EventBundle bundle) { 116 for (EventListenerDescriptor listener : listeners) { 117 EventBundle filtered = listener.filterBundle(bundle); 118 if (filtered.isEmpty()) { 119 continue; 120 } 121 // This may be called in a transaction if event.isCommitEvent() is true or at transaction commit 122 // in other cases. If the transaction has been marked rollback-only, then scheduling must discard 123 // so we schedule "after commit" 124 getWorkManager().schedule(new ListenerWork(listener, filtered), true); 125 } 126 } 127 128 public int getUnfinishedCount() { 129 WorkManager workManager = getWorkManager(); 130 int n = 0; 131 for (String queueId : workManager.getWorkQueueIds()) { 132 n += workManager.getQueueSize(queueId, State.SCHEDULED) + workManager.getQueueSize(queueId, State.RUNNING); 133 } 134 return n; 135 } 136 137 public int getActiveCount() { 138 WorkManager workManager = getWorkManager(); 139 int n = 0; 140 for (String queueId : workManager.getWorkQueueIds()) { 141 n += workManager.getQueueSize(queueId, State.RUNNING); 142 } 143 return n; 144 } 145 146 protected static class ListenerWork extends AbstractWork { 147 148 private static final long serialVersionUID = 1L; 149 150 private static final int DEFAULT_RETRY_COUNT = 2; 151 152 protected final String title; 153 154 protected ReconnectedEventBundle bundle; 155 156 protected String listenerName; 157 158 protected int retryCount; 159 160 protected transient EventListenerDescriptor listener; 161 162 public ListenerWork(EventListenerDescriptor listener, EventBundle bundle) { 163 super(); // random id, for unique job 164 listenerName = listener.getName(); 165 if (bundle instanceof ReconnectedEventBundle) { 166 this.bundle = (ReconnectedEventBundle) bundle; 167 } else { 168 this.bundle = new ReconnectedEventBundleImpl(bundle, listenerName); 169 } 170 List<String> l = new LinkedList<String>(); 171 List<String> docIds = new LinkedList<String>(); 172 String repositoryName = null; 173 for (Event event : bundle) { 174 String s = event.getName(); 175 EventContext ctx = event.getContext(); 176 if (ctx instanceof DocumentEventContext) { 177 DocumentModel source = ((DocumentEventContext) ctx).getSourceDocument(); 178 if (source != null) { 179 s += "/" + source.getRef(); 180 docIds.add(source.getId()); 181 repositoryName = source.getRepositoryName(); 182 } 183 } 184 l.add(s); 185 } 186 title = "Listener " + listenerName + " " + l; 187 if (!docIds.isEmpty()) { 188 setDocuments(repositoryName, docIds); 189 } 190 Integer count = listener.getRetryCount(); 191 retryCount = count == null ? DEFAULT_RETRY_COUNT : count; 192 if (retryCount < 0) { 193 retryCount = DEFAULT_RETRY_COUNT; 194 } 195 } 196 197 @Override 198 public String getCategory() { 199 return listenerName; 200 } 201 202 @Override 203 public String getTitle() { 204 return title; 205 } 206 207 @Override 208 public int getRetryCount() { 209 return retryCount; 210 } 211 212 @Override 213 public void work() { 214 EventService eventService = Framework.getService(EventService.class); 215 listener = eventService.getEventListener(listenerName); 216 if (listener == null) { 217 throw new RuntimeException("Cannot find listener: " + listenerName); 218 } 219 listener.asPostCommitListener().handleEvent(bundle); 220 } 221 222 @Override 223 public void cleanUp(boolean ok, Exception e) { 224 super.cleanUp(ok, e); 225 bundle.disconnect(); 226 if (e != null && !ExceptionUtils.hasInterruptedCause(e) && !(e instanceof ConcurrentUpdateException)) { 227 log.error("Failed to execute async event " + bundle.getName() + " on listener " + listenerName, e); 228 } 229 if (listener != null) { 230 EventStats stats = Framework.getService(EventStats.class); 231 if (stats != null) { 232 stats.logAsyncExec(listener, System.currentTimeMillis() - getStartTime()); 233 } 234 listener = null; 235 } 236 } 237 238 @Override 239 public String toString() { 240 StringBuilder buf = new StringBuilder(); 241 buf.append(getClass().getSimpleName()); 242 buf.append('('); 243 buf.append(title); 244 buf.append(", "); 245 buf.append(getProgress()); 246 buf.append(", "); 247 buf.append(getStatus()); 248 buf.append(')'); 249 return buf.toString(); 250 } 251 } 252 253}