001/* 002 * (C) Copyright 2006-2013 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Bogdan Stefanescu 018 * Thierry Delprat 019 * Florent Guillaume 020 * Andrei Nechaev 021 */ 022package org.nuxeo.ecm.core.event.impl; 023 024import java.util.LinkedList; 025import java.util.List; 026import java.util.concurrent.TimeUnit; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.nuxeo.common.utils.ExceptionUtils; 031import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 032import org.nuxeo.ecm.core.api.DocumentModel; 033import org.nuxeo.ecm.core.event.Event; 034import org.nuxeo.ecm.core.event.EventBundle; 035import org.nuxeo.ecm.core.event.EventContext; 036import org.nuxeo.ecm.core.event.EventService; 037import org.nuxeo.ecm.core.event.EventStats; 038import org.nuxeo.ecm.core.event.ReconnectedEventBundle; 039import org.nuxeo.ecm.core.work.AbstractWork; 040import org.nuxeo.ecm.core.work.api.Work.State; 041import org.nuxeo.ecm.core.work.api.WorkManager; 042import org.nuxeo.runtime.api.Framework; 043import org.nuxeo.runtime.transaction.TransactionHelper; 044 045/** 046 * Executor of async listeners passing them to the WorkManager. 047 */ 048@SuppressWarnings("PackageAccessibility") 049public class AsyncEventExecutor { 050 051 private static final Log log = LogFactory.getLog(AsyncEventExecutor.class); 052 053 public AsyncEventExecutor() { 054 } 055 056 public WorkManager getWorkManager() { 057 return Framework.getService(WorkManager.class); 058 } 059 060 public void init() { 061 WorkManager workManager = getWorkManager(); 062 if (workManager != null) { 063 workManager.init(); 064 } 065 } 066 067 public boolean shutdown(long timeoutMillis) throws InterruptedException { 068 WorkManager workManager = getWorkManager(); 069 if (workManager == null) { 070 return true; 071 } 072 return workManager.shutdown(timeoutMillis, TimeUnit.MILLISECONDS); 073 } 074 075 public boolean waitForCompletion(long timeoutMillis) throws InterruptedException { 076 WorkManager workManager = getWorkManager(); 077 if (workManager == null) { 078 return false; 079 } 080 081 return workManager.awaitCompletion(timeoutMillis, TimeUnit.MILLISECONDS); 082 } 083 084 public void run(final List<EventListenerDescriptor> listeners, EventBundle bundle) { 085 // EventBundle that have gone through bus have been serialized 086 // we need to reconnect them before filtering 087 // this means we need a valid transaction ! 088 if (!(bundle instanceof ReconnectedEventBundleImpl)) { 089 scheduleListeners(listeners, bundle); 090 } else { 091 final EventBundle tmpBundle = bundle; 092 093 TransactionHelper.runInTransaction(() -> { 094 EventBundle connectedBundle = new EventBundleImpl(); 095 096 List<Event> events = ((ReconnectedEventBundleImpl)tmpBundle).getReconnectedEvents(); 097 for (Event event : events) { 098 connectedBundle.push(event); 099 } 100 scheduleListeners(listeners, connectedBundle); 101 }); 102 } 103 } 104 105 private void scheduleListeners(final List<EventListenerDescriptor> listeners, EventBundle bundle) { 106 for (EventListenerDescriptor listener : listeners) { 107 EventBundle filtered = listener.filterBundle(bundle); 108 if (filtered.isEmpty()) { 109 continue; 110 } 111 // This may be called in a transaction if event.isCommitEvent() is true or at transaction commit 112 // in other cases. If the transaction has been marked rollback-only, then scheduling must discard 113 // so we schedule "after commit" 114 getWorkManager().schedule(new ListenerWork(listener, filtered), true); 115 } 116 } 117 118 public int getUnfinishedCount() { 119 WorkManager workManager = getWorkManager(); 120 int n = 0; 121 for (String queueId : workManager.getWorkQueueIds()) { 122 n += workManager.getQueueSize(queueId, State.SCHEDULED) + workManager.getQueueSize(queueId, State.RUNNING); 123 } 124 return n; 125 } 126 127 public int getActiveCount() { 128 WorkManager workManager = getWorkManager(); 129 int n = 0; 130 for (String queueId : workManager.getWorkQueueIds()) { 131 n += workManager.getQueueSize(queueId, State.RUNNING); 132 } 133 return n; 134 } 135 136 protected static class ListenerWork extends AbstractWork { 137 138 private static final long serialVersionUID = 1L; 139 140 private static final int DEFAULT_RETRY_COUNT = 2; 141 142 protected final String title; 143 144 protected ReconnectedEventBundle bundle; 145 146 protected String listenerName; 147 148 protected int retryCount; 149 150 protected transient EventListenerDescriptor listener; 151 152 public ListenerWork(EventListenerDescriptor listener, EventBundle bundle) { 153 super(); // random id, for unique job 154 listenerName = listener.getName(); 155 if (bundle instanceof ReconnectedEventBundle) { 156 this.bundle = (ReconnectedEventBundle) bundle; 157 } else { 158 this.bundle = new ReconnectedEventBundleImpl(bundle, listenerName); 159 } 160 List<String> l = new LinkedList<>(); 161 List<String> docIds = new LinkedList<>(); 162 String repositoryName = null; 163 for (Event event : bundle) { 164 String s = event.getName(); 165 EventContext ctx = event.getContext(); 166 if (ctx instanceof DocumentEventContext) { 167 DocumentModel source = ((DocumentEventContext) ctx).getSourceDocument(); 168 if (source != null) { 169 s += "/" + source.getRef(); 170 docIds.add(source.getId()); 171 repositoryName = source.getRepositoryName(); 172 } 173 } 174 l.add(s); 175 } 176 title = "Listener " + listenerName + " " + l; 177 if (!docIds.isEmpty()) { 178 setDocuments(repositoryName, docIds); 179 } 180 Integer count = listener.getRetryCount(); 181 retryCount = count == null ? DEFAULT_RETRY_COUNT : count; 182 if (retryCount < 0) { 183 retryCount = DEFAULT_RETRY_COUNT; 184 } 185 } 186 187 @Override 188 public String getCategory() { 189 return listenerName; 190 } 191 192 @Override 193 public String getTitle() { 194 return title; 195 } 196 197 @Override 198 public int getRetryCount() { 199 return retryCount; 200 } 201 202 @Override 203 public void work() { 204 EventService eventService = Framework.getService(EventService.class); 205 listener = eventService.getEventListener(listenerName); 206 if (listener == null) { 207 throw new RuntimeException("Cannot find listener: " + listenerName); 208 } 209 listener.asPostCommitListener().handleEvent(bundle); 210 } 211 212 @Override 213 public void cleanUp(boolean ok, Exception e) { 214 super.cleanUp(ok, e); 215 bundle.disconnect(); 216 if (e != null && !ExceptionUtils.hasInterruptedCause(e) && !(e instanceof ConcurrentUpdateException)) { 217 log.error("Failed to execute async event " + bundle.getName() + " on listener " + listenerName, e); 218 } 219 if (listener != null) { 220 EventStats stats = Framework.getService(EventStats.class); 221 if (stats != null) { 222 stats.logAsyncExec(listener, System.currentTimeMillis() - getStartTime()); 223 } 224 listener = null; 225 } 226 } 227 228 @Override 229 public String toString() { 230 StringBuilder sb = new StringBuilder(); 231 sb.append(getClass().getSimpleName()); 232 sb.append('('); 233 sb.append(title); 234 sb.append(", "); 235 sb.append(getProgress()); 236 sb.append(", "); 237 sb.append(getStatus()); 238 sb.append(')'); 239 return sb.toString(); 240 } 241 } 242 243}