001/* 002 * (C) Copyright 2006-2013 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Bogdan Stefanescu 018 * Thierry Delprat 019 * Florent Guillaume 020 */ 021package org.nuxeo.ecm.core.event.impl; 022 023import java.util.LinkedList; 024import java.util.List; 025import java.util.concurrent.TimeUnit; 026 027import org.apache.commons.logging.Log; 028import org.apache.commons.logging.LogFactory; 029import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 030import org.nuxeo.ecm.core.api.DocumentModel; 031import org.nuxeo.ecm.core.event.Event; 032import org.nuxeo.ecm.core.event.EventBundle; 033import org.nuxeo.ecm.core.event.EventContext; 034import org.nuxeo.ecm.core.event.EventService; 035import org.nuxeo.ecm.core.event.EventStats; 036import org.nuxeo.ecm.core.event.ReconnectedEventBundle; 037import org.nuxeo.ecm.core.work.AbstractWork; 038import org.nuxeo.ecm.core.work.api.Work.State; 039import org.nuxeo.ecm.core.work.api.WorkManager; 040import org.nuxeo.runtime.api.Framework; 041 042/** 043 * Executor of async listeners passing them to the WorkManager. 044 */ 045public class AsyncEventExecutor { 046 047 private static final Log log = LogFactory.getLog(AsyncEventExecutor.class); 048 049 public AsyncEventExecutor() { 050 } 051 052 public WorkManager getWorkManager() { 053 return Framework.getLocalService(WorkManager.class); 054 } 055 056 public void init() { 057 WorkManager workManager = getWorkManager(); 058 if (workManager != null) { 059 workManager.init(); 060 } 061 } 062 063 public boolean shutdown(long timeoutMillis) throws InterruptedException { 064 WorkManager workManager = getWorkManager(); 065 if (workManager == null) { 066 return true; 067 } 068 return workManager.shutdown(timeoutMillis, TimeUnit.MILLISECONDS); 069 } 070 071 public boolean waitForCompletion(long timeoutMillis) throws InterruptedException { 072 WorkManager workManager = getWorkManager(); 073 return workManager.awaitCompletion(timeoutMillis, TimeUnit.MILLISECONDS); 074 } 075 076 public void run(final List<EventListenerDescriptor> listeners, EventBundle bundle) { 077 for (EventListenerDescriptor listener : listeners) { 078 EventBundle filtered = listener.filterBundle(bundle); 079 if (filtered.isEmpty()) { 080 continue; 081 } 082 // This may be called in a transaction if event.isCommitEvent() is true or at transaction commit 083 // in other cases. If the transaction has been marked rollback-only, then scheduling must discard 084 // so we schedule "after commit" 085 getWorkManager().schedule(new ListenerWork(listener, filtered), true); 086 } 087 } 088 089 public int getUnfinishedCount() { 090 WorkManager workManager = getWorkManager(); 091 int n = 0; 092 for (String queueId : workManager.getWorkQueueIds()) { 093 n += workManager.getQueueSize(queueId, State.SCHEDULED) + workManager.getQueueSize(queueId, State.RUNNING); 094 } 095 return n; 096 } 097 098 public int getActiveCount() { 099 WorkManager workManager = getWorkManager(); 100 int n = 0; 101 for (String queueId : workManager.getWorkQueueIds()) { 102 n += workManager.getQueueSize(queueId, State.RUNNING); 103 } 104 return n; 105 } 106 107 protected static class ListenerWork extends AbstractWork { 108 109 private static final long serialVersionUID = 1L; 110 111 private static final int DEFAULT_RETRY_COUNT = 2; 112 113 protected final String title; 114 115 protected ReconnectedEventBundle bundle; 116 117 protected String listenerName; 118 119 protected int retryCount; 120 121 protected transient EventListenerDescriptor listener; 122 123 public ListenerWork(EventListenerDescriptor listener, EventBundle bundle) { 124 super(); // random id, for unique job 125 listenerName = listener.getName(); 126 if (bundle instanceof ReconnectedEventBundle) { 127 this.bundle = (ReconnectedEventBundle) bundle; 128 } else { 129 this.bundle = new ReconnectedEventBundleImpl(bundle, listenerName); 130 } 131 List<String> l = new LinkedList<String>(); 132 List<String> docIds = new LinkedList<String>(); 133 String repositoryName = null; 134 for (Event event : bundle) { 135 String s = event.getName(); 136 EventContext ctx = event.getContext(); 137 if (ctx instanceof DocumentEventContext) { 138 DocumentModel source = ((DocumentEventContext) ctx).getSourceDocument(); 139 if (source != null) { 140 s += "/" + source.getRef(); 141 docIds.add(source.getId()); 142 repositoryName = source.getRepositoryName(); 143 } 144 } 145 l.add(s); 146 } 147 title = "Listener " + listenerName + " " + l; 148 if (!docIds.isEmpty()) { 149 setDocuments(repositoryName, docIds); 150 } 151 Integer count = listener.getRetryCount(); 152 retryCount = count == null ? DEFAULT_RETRY_COUNT : count.intValue(); 153 if (retryCount < 0) { 154 retryCount = DEFAULT_RETRY_COUNT; 155 } 156 } 157 158 @Override 159 public String getCategory() { 160 return listenerName; 161 } 162 163 @Override 164 public String getTitle() { 165 return title; 166 } 167 168 @Override 169 public int getRetryCount() { 170 return retryCount; 171 } 172 173 @Override 174 public void work() { 175 EventService eventService = Framework.getLocalService(EventService.class); 176 listener = eventService.getEventListener(listenerName); 177 if (listener == null) { 178 throw new RuntimeException("Cannot find listener: " + listenerName); 179 } 180 listener.asPostCommitListener().handleEvent(bundle); 181 } 182 183 @Override 184 public void cleanUp(boolean ok, Exception e) { 185 super.cleanUp(ok, e); 186 bundle.disconnect(); 187 if (e != null && !(e instanceof InterruptedException) && !(e instanceof ConcurrentUpdateException)) { 188 log.error("Failed to execute async event " + bundle.getName() + " on listener " + listenerName, e); 189 } 190 if (listener != null) { 191 EventStats stats = Framework.getLocalService(EventStats.class); 192 if (stats != null) { 193 stats.logAsyncExec(listener, System.currentTimeMillis() - getStartTime()); 194 } 195 listener = null; 196 } 197 } 198 199 @Override 200 public String toString() { 201 StringBuilder buf = new StringBuilder(); 202 buf.append(getClass().getSimpleName()); 203 buf.append('('); 204 buf.append(title); 205 buf.append(", "); 206 buf.append(getProgress()); 207 buf.append(", "); 208 buf.append(getStatus()); 209 buf.append(')'); 210 return buf.toString(); 211 } 212 } 213 214}