/*
 * (C) Copyright 2006-2013 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Bogdan Stefanescu
 *     Thierry Delprat
 *     Florent Guillaume
 *     Andrei Nechaev
 */
package org.nuxeo.ecm.core.event.impl;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.common.utils.ExceptionUtils;
import org.nuxeo.ecm.core.api.CloseableCoreSession;
import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.event.Event;
import org.nuxeo.ecm.core.event.EventBundle;
import org.nuxeo.ecm.core.event.EventContext;
import org.nuxeo.ecm.core.event.EventService;
import org.nuxeo.ecm.core.event.EventStats;
import org.nuxeo.ecm.core.event.ReconnectedEventBundle;
import org.nuxeo.ecm.core.work.AbstractWork;
import org.nuxeo.ecm.core.work.api.Work.State;
import org.nuxeo.ecm.core.work.api.WorkManager;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.transaction.TransactionHelper;

/**
 * Executor of async listeners passing them to the WorkManager.
 * <p>
 * Each listener that accepts at least one event of a bundle gets its own {@link ListenerWork} scheduled on the
 * {@link WorkManager}.
 */
@SuppressWarnings("PackageAccessibility")
public class AsyncEventExecutor {

    private static final Log log = LogFactory.getLog(AsyncEventExecutor.class);

    public AsyncEventExecutor() {
    }

    /**
     * Looks up the {@link WorkManager} service through {@link Framework#getService(Class)}.
     *
     * @return the service, or {@code null} when none is available (the methods of this class that can do something
     *         sensible without it check for that)
     */
    public WorkManager getWorkManager() {
        return Framework.getService(WorkManager.class);
    }

    /**
     * Initializes the {@link WorkManager}; does nothing when the service is not available.
     */
    public void init() {
        WorkManager workManager = getWorkManager();
        if (workManager != null) {
            workManager.init();
        }
    }

    /**
     * Shuts down the {@link WorkManager}.
     *
     * @param timeoutMillis the timeout, in milliseconds, handed to {@code WorkManager.shutdown}
     * @return {@code true} when no WorkManager is available, otherwise the value returned by
     *         {@code WorkManager.shutdown}
     * @throws InterruptedException if thrown by the WorkManager
     */
    public boolean shutdown(long timeoutMillis) throws InterruptedException {
        WorkManager workManager = getWorkManager();
        if (workManager == null) {
            return true;
        }
        return workManager.shutdown(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Waits for the {@link WorkManager} to complete its work.
     *
     * @param timeoutMillis the timeout, in milliseconds, handed to {@code WorkManager.awaitCompletion}
     * @return {@code false} when no WorkManager is available, otherwise the value returned by
     *         {@code WorkManager.awaitCompletion}
     * @throws InterruptedException if thrown by the WorkManager
     */
    public boolean waitForCompletion(long timeoutMillis) throws InterruptedException {
        WorkManager workManager = getWorkManager();
        if (workManager == null) {
            return false;
        }
        return workManager.awaitCompletion(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Schedules the given listeners for the events of the bundle.
     * <p>
     * A bundle that is not a {@link ReconnectedEventBundleImpl} is scheduled as is. Otherwise its events are first
     * reconnected inside a transaction and copied to a fresh bundle, which is the one that gets scheduled.
     *
     * @param listeners the async listeners to run
     * @param bundle the events to dispatch
     */
    public void run(final List<EventListenerDescriptor> listeners, EventBundle bundle) {
        if (!(bundle instanceof ReconnectedEventBundleImpl)) {
            scheduleListeners(listeners, bundle);
            return;
        }
        // EventBundle that have gone through bus have been serialized
        // we need to reconnect them before filtering
        // this means we need a valid transaction !
        final ReconnectedEventBundleImpl reconnectedBundle = (ReconnectedEventBundleImpl) bundle;
        TransactionHelper.runInTransaction(() -> {
            EventBundle connectedBundle = new EventBundleImpl();
            // first session seen for each repository name
            Map<String, CoreSession> sessions = new HashMap<>();
            for (Event event : reconnectedBundle.getReconnectedEvents()) {
                connectedBundle.push(event);
                CoreSession session = event.getContext().getCoreSession();
                sessions.putIfAbsent(session.getRepositoryName(), session);
            }
            // the sessions are closed before the listeners get scheduled
            for (CoreSession session : sessions.values()) {
                ((CloseableCoreSession) session).close();
            }
            scheduleListeners(listeners, connectedBundle);
        });
    }

    /**
     * Schedules one {@link ListenerWork} per listener whose filtered view of the bundle is not empty.
     *
     * @param listeners the candidate listeners
     * @param bundle the events to filter for each listener
     */
    private void scheduleListeners(final List<EventListenerDescriptor> listeners, EventBundle bundle) {
        for (EventListenerDescriptor listener : listeners) {
            EventBundle filtered = listener.filterBundle(bundle);
            if (filtered.isEmpty()) {
                // nothing of interest for this listener
                continue;
            }
            // This may be called in a transaction if event.isCommitEvent() is true or at transaction commit
            // in other cases. If the transaction has been marked rollback-only, then scheduling must discard
            // so we schedule "after commit"
            getWorkManager().schedule(new ListenerWork(listener, filtered), true);
        }
    }

    /**
     * Counts the works that are either scheduled or running, over all the work queues.
     *
     * @return the sum of the {@code SCHEDULED} and {@code RUNNING} queue sizes, or {@code 0} when no WorkManager is
     *         available
     */
    public int getUnfinishedCount() {
        WorkManager workManager = getWorkManager();
        if (workManager == null) {
            // same leniency as init(), shutdown() and waitForCompletion()
            return 0;
        }
        int n = 0;
        for (String queueId : workManager.getWorkQueueIds()) {
            n += workManager.getQueueSize(queueId, State.SCHEDULED) + workManager.getQueueSize(queueId, State.RUNNING);
        }
        return n;
    }

    /**
     * Counts the works that are running, over all the work queues.
     *
     * @return the sum of the {@code RUNNING} queue sizes, or {@code 0} when no WorkManager is available
     */
    public int getActiveCount() {
        WorkManager workManager = getWorkManager();
        if (workManager == null) {
            // same leniency as init(), shutdown() and waitForCompletion()
            return 0;
        }
        int n = 0;
        for (String queueId : workManager.getWorkQueueIds()) {
            n += workManager.getQueueSize(queueId, State.RUNNING);
        }
        return n;
    }

    /**
     * Work that hands an event bundle to a single listener.
     * <p>
     * Only the listener name is kept in a non-transient field; the descriptor itself is looked up again from the
     * {@link EventService} when the work runs.
     */
    protected static class ListenerWork extends AbstractWork {

        private static final long serialVersionUID = 1L;

        /** Retry count used when the listener does not define one, or defines a negative one. */
        private static final int DEFAULT_RETRY_COUNT = 2;

        protected final String title;

        protected ReconnectedEventBundle bundle;

        protected String listenerName;

        protected int retryCount;

        /** Resolved by {@link #work()} and reset by {@link #cleanUp(boolean, Exception)}. */
        protected transient EventListenerDescriptor listener;

        /**
         * Creates the work for one listener and one bundle.
         * <p>
         * The bundle is wrapped in a {@link ReconnectedEventBundleImpl} unless it already is a
         * {@link ReconnectedEventBundle}. The title lists the event names, each followed by the reference of its
         * source document when the event context is a {@link DocumentEventContext} with a source document.
         *
         * @param listener the descriptor of the listener to run
         * @param bundle the events to hand to the listener
         */
        public ListenerWork(EventListenerDescriptor listener, EventBundle bundle) {
            super(); // random id, for unique job
            listenerName = listener.getName();
            if (bundle instanceof ReconnectedEventBundle) {
                this.bundle = (ReconnectedEventBundle) bundle;
            } else {
                this.bundle = new ReconnectedEventBundleImpl(bundle, listenerName);
            }
            List<String> labels = new LinkedList<>();
            List<String> docIds = new LinkedList<>();
            String repositoryName = null;
            for (Event event : bundle) {
                String label = event.getName();
                EventContext ctx = event.getContext();
                if (ctx instanceof DocumentEventContext) {
                    DocumentModel source = ((DocumentEventContext) ctx).getSourceDocument();
                    if (source != null) {
                        label += "/" + source.getRef();
                        docIds.add(source.getId());
                        // the repository of the last source document wins
                        repositoryName = source.getRepositoryName();
                    }
                }
                labels.add(label);
            }
            title = "Listener " + listenerName + " " + labels;
            if (!docIds.isEmpty()) {
                setDocuments(repositoryName, docIds);
            }
            Integer count = listener.getRetryCount();
            retryCount = count == null ? DEFAULT_RETRY_COUNT : count;
            if (retryCount < 0) {
                retryCount = DEFAULT_RETRY_COUNT;
            }
        }

        /**
         * @return the listener name, used as the work category
         */
        @Override
        public String getCategory() {
            return listenerName;
        }

        /**
         * @return the title computed at construction time
         */
        @Override
        public String getTitle() {
            return title;
        }

        /**
         * @return the retry count of the listener, or the default one when it was unset or negative
         */
        @Override
        public int getRetryCount() {
            return retryCount;
        }

        /**
         * Resolves the listener by name and passes it the bundle as a post-commit listener.
         *
         * @throws RuntimeException if the {@link EventService} does not know the listener
         */
        @Override
        public void work() {
            EventService eventService = Framework.getService(EventService.class);
            listener = eventService.getEventListener(listenerName);
            if (listener == null) {
                throw new RuntimeException("Cannot find listener: " + listenerName);
            }
            listener.asPostCommitListener().handleEvent(bundle);
        }

        /**
         * Disconnects the bundle, logs the failure if any and records the execution statistics.
         * <p>
         * No error is logged when the exception has an interrupted cause or is a
         * {@link ConcurrentUpdateException}. Statistics are recorded only when {@link #work()} resolved the
         * listener and an {@link EventStats} service is available.
         *
         * @param ok forwarded to {@code AbstractWork.cleanUp}
         * @param e the exception to report, or {@code null}; also forwarded to {@code AbstractWork.cleanUp}
         */
        @Override
        public void cleanUp(boolean ok, Exception e) {
            super.cleanUp(ok, e);
            bundle.disconnect();
            if (e != null && !ExceptionUtils.hasInterruptedCause(e) && !(e instanceof ConcurrentUpdateException)) {
                log.error("Failed to execute async event " + bundle.getName() + " on listener " + listenerName, e);
            }
            if (listener != null) {
                EventStats stats = Framework.getService(EventStats.class);
                if (stats != null) {
                    // time elapsed since the start time of the work
                    stats.logAsyncExec(listener, System.currentTimeMillis() - getStartTime());
                }
                listener = null;
            }
        }

        /**
         * @return the simple class name followed by the title, progress and status in parentheses
         */
        @Override
        public String toString() {
            return getClass().getSimpleName() + '(' + title + ", " + getProgress() + ", " + getStatus() + ')';
        }
    }

}