001/* 002 * (C) Copyright 2006-2013 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Bogdan Stefanescu 018 * Thierry Delprat 019 * Florent Guillaume 020 * Andrei Nechaev 021 */ 022package org.nuxeo.ecm.core.event.impl; 023 024import org.apache.commons.logging.Log; 025import org.apache.commons.logging.LogFactory; 026import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 027import org.nuxeo.ecm.core.api.CoreSession; 028import org.nuxeo.ecm.core.api.DocumentModel; 029import org.nuxeo.ecm.core.event.Event; 030import org.nuxeo.ecm.core.event.EventBundle; 031import org.nuxeo.ecm.core.event.EventContext; 032import org.nuxeo.ecm.core.event.EventService; 033import org.nuxeo.ecm.core.event.EventStats; 034import org.nuxeo.ecm.core.event.ReconnectedEventBundle; 035import org.nuxeo.ecm.core.work.AbstractWork; 036import org.nuxeo.ecm.core.work.api.Work.State; 037import org.nuxeo.ecm.core.work.api.WorkManager; 038import org.nuxeo.runtime.api.Framework; 039import org.nuxeo.runtime.transaction.TransactionHelper; 040 041import java.util.HashMap; 042import java.util.LinkedList; 043import java.util.List; 044import java.util.Map; 045import java.util.concurrent.TimeUnit; 046 047/** 048 * Executor of async listeners passing them to the WorkManager. 049 */ 050@SuppressWarnings("PackageAccessibility") 051public class AsyncEventExecutor { 052 053 private static final Log log = LogFactory.getLog(AsyncEventExecutor.class); 054 055 public AsyncEventExecutor() { 056 } 057 058 public WorkManager getWorkManager() { 059 return Framework.getLocalService(WorkManager.class); 060 } 061 062 public void init() { 063 WorkManager workManager = getWorkManager(); 064 if (workManager != null) { 065 workManager.init(); 066 } 067 } 068 069 public boolean shutdown(long timeoutMillis) throws InterruptedException { 070 WorkManager workManager = getWorkManager(); 071 if (workManager == null) { 072 return true; 073 } 074 return workManager.shutdown(timeoutMillis, TimeUnit.MILLISECONDS); 075 } 076 077 public boolean waitForCompletion(long timeoutMillis) throws InterruptedException { 078 WorkManager workManager = getWorkManager(); 079 if (workManager == null) { 080 return false; 081 } 082 083 return workManager.awaitCompletion(timeoutMillis, TimeUnit.MILLISECONDS); 084 } 085 086 public void run(final List<EventListenerDescriptor> listeners, EventBundle bundle) { 087 // EventBundle that have gone through bus have been serialized 088 // we need to reconnect them before filtering 089 // this means we need a valid transaction ! 090 if (!(bundle instanceof ReconnectedEventBundleImpl)) { 091 scheduleListeners(listeners, bundle); 092 } else { 093 final EventBundle tmpBundle = bundle; 094 095 TransactionHelper.runInTransaction(() -> { 096 EventBundle connectedBundle = new EventBundleImpl(); 097 Map<String, CoreSession> sessions = new HashMap<>(); 098 099 List<Event> events = ((ReconnectedEventBundleImpl)tmpBundle).getReconnectedEvents(); 100 for (Event event : events) { 101 connectedBundle.push(event); 102 CoreSession session = event.getContext().getCoreSession(); 103 if (!(sessions.keySet().contains(session.getRepositoryName()))) { 104 sessions.put(session.getRepositoryName(), session); 105 } 106 } 107 108 sessions.values().forEach(CoreSession::close); 109 scheduleListeners(listeners, connectedBundle); 110 }); 111 } 112 } 113 114 private void scheduleListeners(final List<EventListenerDescriptor> listeners, EventBundle bundle) { 115 for (EventListenerDescriptor listener : listeners) { 116 EventBundle filtered = listener.filterBundle(bundle); 117 if (filtered.isEmpty()) { 118 continue; 119 } 120 // This may be called in a transaction if event.isCommitEvent() is true or at transaction commit 121 // in other cases. If the transaction has been marked rollback-only, then scheduling must discard 122 // so we schedule "after commit" 123 getWorkManager().schedule(new ListenerWork(listener, filtered), true); 124 } 125 } 126 127 public int getUnfinishedCount() { 128 WorkManager workManager = getWorkManager(); 129 int n = 0; 130 for (String queueId : workManager.getWorkQueueIds()) { 131 n += workManager.getQueueSize(queueId, State.SCHEDULED) + workManager.getQueueSize(queueId, State.RUNNING); 132 } 133 return n; 134 } 135 136 public int getActiveCount() { 137 WorkManager workManager = getWorkManager(); 138 int n = 0; 139 for (String queueId : workManager.getWorkQueueIds()) { 140 n += workManager.getQueueSize(queueId, State.RUNNING); 141 } 142 return n; 143 } 144 145 protected static class ListenerWork extends AbstractWork { 146 147 private static final long serialVersionUID = 1L; 148 149 private static final int DEFAULT_RETRY_COUNT = 2; 150 151 protected final String title; 152 153 protected ReconnectedEventBundle bundle; 154 155 protected String listenerName; 156 157 protected int retryCount; 158 159 protected transient EventListenerDescriptor listener; 160 161 public ListenerWork(EventListenerDescriptor listener, EventBundle bundle) { 162 super(); // random id, for unique job 163 listenerName = listener.getName(); 164 if (bundle instanceof ReconnectedEventBundle) { 165 this.bundle = (ReconnectedEventBundle) bundle; 166 } else { 167 this.bundle = new ReconnectedEventBundleImpl(bundle, listenerName); 168 } 169 List<String> l = new LinkedList<String>(); 170 List<String> docIds = new LinkedList<String>(); 171 String repositoryName = null; 172 for (Event event : bundle) { 173 String s = event.getName(); 174 EventContext ctx = event.getContext(); 175 if (ctx instanceof DocumentEventContext) { 176 DocumentModel source = ((DocumentEventContext) ctx).getSourceDocument(); 177 if (source != null) { 178 s += "/" + source.getRef(); 179 docIds.add(source.getId()); 180 repositoryName = source.getRepositoryName(); 181 } 182 } 183 l.add(s); 184 } 185 title = "Listener " + listenerName + " " + l; 186 if (!docIds.isEmpty()) { 187 setDocuments(repositoryName, docIds); 188 } 189 Integer count = listener.getRetryCount(); 190 retryCount = count == null ? DEFAULT_RETRY_COUNT : count; 191 if (retryCount < 0) { 192 retryCount = DEFAULT_RETRY_COUNT; 193 } 194 } 195 196 @Override 197 public String getCategory() { 198 return listenerName; 199 } 200 201 @Override 202 public String getTitle() { 203 return title; 204 } 205 206 @Override 207 public int getRetryCount() { 208 return retryCount; 209 } 210 211 @Override 212 public void work() { 213 EventService eventService = Framework.getLocalService(EventService.class); 214 listener = eventService.getEventListener(listenerName); 215 if (listener == null) { 216 throw new RuntimeException("Cannot find listener: " + listenerName); 217 } 218 listener.asPostCommitListener().handleEvent(bundle); 219 } 220 221 @Override 222 public void cleanUp(boolean ok, Exception e) { 223 super.cleanUp(ok, e); 224 bundle.disconnect(); 225 if (e != null && !(e instanceof InterruptedException) && !(e instanceof ConcurrentUpdateException)) { 226 log.error("Failed to execute async event " + bundle.getName() + " on listener " + listenerName, e); 227 } 228 if (listener != null) { 229 EventStats stats = Framework.getLocalService(EventStats.class); 230 if (stats != null) { 231 stats.logAsyncExec(listener, System.currentTimeMillis() - getStartTime()); 232 } 233 listener = null; 234 } 235 } 236 237 @Override 238 public String toString() { 239 StringBuilder buf = new StringBuilder(); 240 buf.append(getClass().getSimpleName()); 241 buf.append('('); 242 buf.append(title); 243 buf.append(", "); 244 buf.append(getProgress()); 245 buf.append(", "); 246 buf.append(getStatus()); 247 buf.append(')'); 248 return buf.toString(); 249 } 250 } 251 252}