/*
 * (C) Copyright 2006-2013 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Bogdan Stefanescu
 *     Thierry Delprat
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.event.impl;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.event.Event;
import org.nuxeo.ecm.core.event.EventBundle;
import org.nuxeo.ecm.core.event.EventContext;
import org.nuxeo.ecm.core.event.EventService;
import org.nuxeo.ecm.core.event.EventStats;
import org.nuxeo.ecm.core.event.ReconnectedEventBundle;
import org.nuxeo.ecm.core.work.AbstractWork;
import org.nuxeo.ecm.core.work.api.Work.State;
import org.nuxeo.ecm.core.work.api.WorkManager;
import org.nuxeo.runtime.api.Framework;

/**
 * Executor of async listeners passing them to the WorkManager.
 * <p>
 * Every operation looks the {@link WorkManager} up through {@link #getWorkManager()}. The lifecycle and monitoring
 * methods ({@link #init()}, {@link #shutdown(long)}, {@link #waitForCompletion(long)}, {@link #getUnfinishedCount()},
 * {@link #getActiveCount()}) treat a {@code null} lookup as "nothing to do / nothing pending".
 */
public class AsyncEventExecutor {

    private static final Log log = LogFactory.getLog(AsyncEventExecutor.class);

    public AsyncEventExecutor() {
    }

    /**
     * Looks up the {@link WorkManager} service.
     *
     * @return the result of the service lookup; callers in this class handle a {@code null} result
     */
    public WorkManager getWorkManager() {
        return Framework.getLocalService(WorkManager.class);
    }

    /**
     * Initializes the {@link WorkManager}, if one is available.
     */
    public void init() {
        WorkManager workManager = getWorkManager();
        if (workManager != null) {
            workManager.init();
        }
    }

    /**
     * Shuts down the {@link WorkManager}.
     *
     * @param timeoutMillis the timeout, in milliseconds, handed to the work manager
     * @return {@code true} if no work manager is available, otherwise the result of the work manager shutdown
     * @throws InterruptedException if the work manager shutdown is interrupted
     */
    public boolean shutdown(long timeoutMillis) throws InterruptedException {
        WorkManager workManager = getWorkManager();
        if (workManager == null) {
            return true;
        }
        return workManager.shutdown(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Waits for the {@link WorkManager} to complete its work.
     *
     * @param timeoutMillis the timeout, in milliseconds, handed to the work manager
     * @return {@code true} if no work manager is available (consistent with {@link #shutdown(long)}), otherwise the
     *         result of {@code WorkManager.awaitCompletion}
     * @throws InterruptedException if the wait is interrupted
     */
    public boolean waitForCompletion(long timeoutMillis) throws InterruptedException {
        WorkManager workManager = getWorkManager();
        if (workManager == null) {
            // same convention as shutdown(): without a work manager there is nothing to wait for
            return true;
        }
        return workManager.awaitCompletion(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Schedules one {@link ListenerWork} per listener whose filtered view of the bundle is not empty.
     * <p>
     * The work manager is only looked up when there is actually something to schedule. Unlike the lifecycle methods,
     * this method does not tolerate a missing work manager: it fails rather than silently dropping events.
     *
     * @param listeners the async listeners to run
     * @param bundle the event bundle to dispatch
     */
    public void run(final List<EventListenerDescriptor> listeners, EventBundle bundle) {
        for (EventListenerDescriptor listener : listeners) {
            EventBundle filtered = listener.filterBundle(bundle);
            if (!filtered.isEmpty()) {
                getWorkManager().schedule(new ListenerWork(listener, filtered));
            }
        }
    }

    /**
     * Counts the work instances that are scheduled or running, over all work queues.
     *
     * @return the sum of the {@code SCHEDULED} and {@code RUNNING} queue sizes, or {@code 0} if no work manager is
     *         available
     */
    public int getUnfinishedCount() {
        return countWork(getWorkManager(), State.SCHEDULED, State.RUNNING);
    }

    /**
     * Counts the work instances that are running, over all work queues.
     *
     * @return the sum of the {@code RUNNING} queue sizes, or {@code 0} if no work manager is available
     */
    public int getActiveCount() {
        return countWork(getWorkManager(), State.RUNNING);
    }

    /**
     * Sums, over every work queue, the queue sizes reported for each of the given states.
     *
     * @param workManager the work manager to query, may be {@code null}
     * @param states the states to count, queried in the given order for each queue
     * @return the total, or {@code 0} when {@code workManager} is {@code null}
     */
    private static int countWork(WorkManager workManager, State... states) {
        if (workManager == null) {
            return 0;
        }
        int total = 0;
        for (String queueId : workManager.getWorkQueueIds()) {
            for (State state : states) {
                total += workManager.getQueueSize(queueId, state);
            }
        }
        return total;
    }

    /**
     * Work that hands an event bundle to a single async listener.
     * <p>
     * Only the listener name is kept as regular state; the descriptor itself is {@code transient} and is resolved again
     * by name in {@link #work()}.
     */
    protected static class ListenerWork extends AbstractWork {

        private static final long serialVersionUID = 1L;

        /** Retry count used when the listener descriptor gives none, or gives a negative one. */
        private static final int DEFAULT_RETRY_COUNT = 2;

        protected final String title;

        protected ReconnectedEventBundle bundle;

        protected String listenerName;

        protected int retryCount;

        protected transient EventListenerDescriptor listener;

        /**
         * Builds the work for the given listener and bundle.
         * <p>
         * The bundle is wrapped in a {@link ReconnectedEventBundleImpl} unless it already is a
         * {@link ReconnectedEventBundle}. The title lists the event names, each suffixed with the source document
         * reference when the event has a document context with a source document.
         *
         * @param listener the descriptor of the listener to run
         * @param bundle the events to hand to the listener
         */
        public ListenerWork(EventListenerDescriptor listener, EventBundle bundle) {
            super(); // random id, for unique job
            listenerName = listener.getName();
            if (bundle instanceof ReconnectedEventBundle) {
                this.bundle = (ReconnectedEventBundle) bundle;
            } else {
                this.bundle = new ReconnectedEventBundleImpl(bundle, listenerName);
            }
            List<String> eventLabels = new LinkedList<String>();
            List<String> docIds = new LinkedList<String>();
            String repositoryName = null;
            for (Event event : bundle) {
                String label = event.getName();
                EventContext ctx = event.getContext();
                if (ctx instanceof DocumentEventContext) {
                    DocumentModel source = ((DocumentEventContext) ctx).getSourceDocument();
                    if (source != null) {
                        label += "/" + source.getRef();
                        docIds.add(source.getId());
                        // the repository of the last source document seen is the one recorded
                        repositoryName = source.getRepositoryName();
                    }
                }
                eventLabels.add(label);
            }
            title = "Listener " + listenerName + " " + eventLabels;
            if (!docIds.isEmpty()) {
                setDocuments(repositoryName, docIds);
            }
            Integer count = listener.getRetryCount();
            // a missing or negative configured value falls back to the default
            retryCount = (count == null || count.intValue() < 0) ? DEFAULT_RETRY_COUNT : count.intValue();
        }

        /**
         * @return the listener name, used as the work category
         */
        @Override
        public String getCategory() {
            return listenerName;
        }

        @Override
        public String getTitle() {
            return title;
        }

        @Override
        public int getRetryCount() {
            return retryCount;
        }

        /**
         * Resolves the listener by name through the {@link EventService} and passes it the bundle as a post-commit
         * listener.
         *
         * @throws RuntimeException if no listener is registered under the stored name
         */
        @Override
        public void work() {
            EventService eventService = Framework.getLocalService(EventService.class);
            listener = eventService.getEventListener(listenerName);
            if (listener == null) {
                throw new RuntimeException("Cannot find listener: " + listenerName);
            }
            listener.asPostCommitListener().handleEvent(bundle);
        }

        /**
         * Disconnects the bundle, logs the failure if any, and records the execution time of the listener.
         * <p>
         * {@link InterruptedException} and {@link ConcurrentUpdateException} are not logged as errors. Statistics are
         * only recorded when the listener was resolved by {@link #work()} and an {@link EventStats} service is
         * available.
         *
         * @param ok passed through to the superclass clean up
         * @param e the exception raised by the work, or {@code null}
         */
        @Override
        public void cleanUp(boolean ok, Exception e) {
            super.cleanUp(ok, e);
            bundle.disconnect();
            boolean expected = e instanceof InterruptedException || e instanceof ConcurrentUpdateException;
            if (e != null && !expected) {
                log.error("Failed to execute async event " + bundle.getName() + " on listener " + listenerName, e);
            }
            if (listener == null) {
                return;
            }
            EventStats stats = Framework.getLocalService(EventStats.class);
            if (stats != null) {
                stats.logAsyncExec(listener, System.currentTimeMillis() - getStartTime());
            }
            listener = null;
        }

        /**
         * @return the simple class name followed by the title, progress and status in parentheses
         */
        @Override
        public String toString() {
            return getClass().getSimpleName() + '(' + title + ", " + getProgress() + ", " + getStatus() + ')';
        }
    }

}