001/*
002 * (C) Copyright 2006-2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Bogdan Stefanescu
018 *     Thierry Delprat
019 *     Florent Guillaume
020 *     Andrei Nechaev
021 */
022package org.nuxeo.ecm.core.event.impl;
023
024import java.util.LinkedList;
025import java.util.List;
026import java.util.concurrent.TimeUnit;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.nuxeo.common.utils.ExceptionUtils;
031import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
032import org.nuxeo.ecm.core.api.DocumentModel;
033import org.nuxeo.ecm.core.event.Event;
034import org.nuxeo.ecm.core.event.EventBundle;
035import org.nuxeo.ecm.core.event.EventContext;
036import org.nuxeo.ecm.core.event.EventService;
037import org.nuxeo.ecm.core.event.EventStats;
038import org.nuxeo.ecm.core.event.ReconnectedEventBundle;
039import org.nuxeo.ecm.core.work.AbstractWork;
040import org.nuxeo.ecm.core.work.api.Work.State;
041import org.nuxeo.ecm.core.work.api.WorkManager;
042import org.nuxeo.runtime.api.Framework;
043import org.nuxeo.runtime.transaction.TransactionHelper;
044
045/**
046 * Executor of async listeners passing them to the WorkManager.
047 */
048@SuppressWarnings("PackageAccessibility")
049public class AsyncEventExecutor {
050
051    private static final Log log = LogFactory.getLog(AsyncEventExecutor.class);
052
053    public AsyncEventExecutor() {
054    }
055
056    public WorkManager getWorkManager() {
057        return Framework.getService(WorkManager.class);
058    }
059
060    public void init() {
061        WorkManager workManager = getWorkManager();
062        if (workManager != null) {
063            workManager.init();
064        }
065    }
066
067    public boolean shutdown(long timeoutMillis) throws InterruptedException {
068        WorkManager workManager = getWorkManager();
069        if (workManager == null) {
070            return true;
071        }
072        return workManager.shutdown(timeoutMillis, TimeUnit.MILLISECONDS);
073    }
074
075    public boolean waitForCompletion(long timeoutMillis) throws InterruptedException {
076        WorkManager workManager = getWorkManager();
077        if (workManager == null) {
078            return false;
079        }
080
081        return workManager.awaitCompletion(timeoutMillis, TimeUnit.MILLISECONDS);
082    }
083
084    public void run(final List<EventListenerDescriptor> listeners, EventBundle bundle) {
085        // EventBundle that have gone through bus have been serialized
086        // we need to reconnect them before filtering
087        // this means we need a valid transaction !
088        if (!(bundle instanceof ReconnectedEventBundleImpl)) {
089            scheduleListeners(listeners, bundle);
090        } else {
091            final EventBundle tmpBundle = bundle;
092
093            TransactionHelper.runInTransaction(() -> {
094                EventBundle connectedBundle = new EventBundleImpl();
095
096                List<Event> events = ((ReconnectedEventBundleImpl)tmpBundle).getReconnectedEvents();
097                for (Event event : events) {
098                    connectedBundle.push(event);
099                }
100                scheduleListeners(listeners, connectedBundle);
101            });
102        }
103    }
104
105    private void scheduleListeners(final List<EventListenerDescriptor> listeners, EventBundle bundle) {
106        for (EventListenerDescriptor listener : listeners) {
107            EventBundle filtered = listener.filterBundle(bundle);
108            if (filtered.isEmpty()) {
109                continue;
110            }
111            // This may be called in a transaction if event.isCommitEvent() is true or at transaction commit
112            // in other cases. If the transaction has been marked rollback-only, then scheduling must discard
113            // so we schedule "after commit"
114            getWorkManager().schedule(new ListenerWork(listener, filtered), true);
115        }
116    }
117
118    public int getUnfinishedCount() {
119        WorkManager workManager = getWorkManager();
120        int n = 0;
121        for (String queueId : workManager.getWorkQueueIds()) {
122            n += workManager.getQueueSize(queueId, State.SCHEDULED) + workManager.getQueueSize(queueId, State.RUNNING);
123        }
124        return n;
125    }
126
127    public int getActiveCount() {
128        WorkManager workManager = getWorkManager();
129        int n = 0;
130        for (String queueId : workManager.getWorkQueueIds()) {
131            n += workManager.getQueueSize(queueId, State.RUNNING);
132        }
133        return n;
134    }
135
136    protected static class ListenerWork extends AbstractWork {
137
138        private static final long serialVersionUID = 1L;
139
140        private static final int DEFAULT_RETRY_COUNT = 2;
141
142        protected final String title;
143
144        protected ReconnectedEventBundle bundle;
145
146        protected String listenerName;
147
148        protected int retryCount;
149
150        protected transient EventListenerDescriptor listener;
151
152        public ListenerWork(EventListenerDescriptor listener, EventBundle bundle) {
153            super(); // random id, for unique job
154            listenerName = listener.getName();
155            if (bundle instanceof ReconnectedEventBundle) {
156                this.bundle = (ReconnectedEventBundle) bundle;
157            } else {
158                this.bundle = new ReconnectedEventBundleImpl(bundle, listenerName);
159            }
160            List<String> l = new LinkedList<>();
161            List<String> docIds = new LinkedList<>();
162            String repositoryName = null;
163            for (Event event : bundle) {
164                String s = event.getName();
165                EventContext ctx = event.getContext();
166                if (ctx instanceof DocumentEventContext) {
167                    DocumentModel source = ((DocumentEventContext) ctx).getSourceDocument();
168                    if (source != null) {
169                        s += "/" + source.getRef();
170                        docIds.add(source.getId());
171                        repositoryName = source.getRepositoryName();
172                    }
173                }
174                l.add(s);
175            }
176            title = "Listener " + listenerName + " " + l;
177            if (!docIds.isEmpty()) {
178                setDocuments(repositoryName, docIds);
179            }
180            Integer count = listener.getRetryCount();
181            retryCount = count == null ? DEFAULT_RETRY_COUNT : count;
182            if (retryCount < 0) {
183                retryCount = DEFAULT_RETRY_COUNT;
184            }
185        }
186
187        @Override
188        public String getCategory() {
189            return listenerName;
190        }
191
192        @Override
193        public String getTitle() {
194            return title;
195        }
196
197        @Override
198        public int getRetryCount() {
199            return retryCount;
200        }
201
202        @Override
203        public void work() {
204            EventService eventService = Framework.getService(EventService.class);
205            listener = eventService.getEventListener(listenerName);
206            if (listener == null) {
207                throw new RuntimeException("Cannot find listener: " + listenerName);
208            }
209            listener.asPostCommitListener().handleEvent(bundle);
210        }
211
212        @Override
213        public void cleanUp(boolean ok, Exception e) {
214            super.cleanUp(ok, e);
215            bundle.disconnect();
216            if (e != null && !ExceptionUtils.hasInterruptedCause(e) && !(e instanceof ConcurrentUpdateException)) {
217                log.error("Failed to execute async event " + bundle.getName() + " on listener " + listenerName, e);
218            }
219            if (listener != null) {
220                EventStats stats = Framework.getService(EventStats.class);
221                if (stats != null) {
222                    stats.logAsyncExec(listener, System.currentTimeMillis() - getStartTime());
223                }
224                listener = null;
225            }
226        }
227
228        @Override
229        public String toString() {
230            StringBuilder sb = new StringBuilder();
231            sb.append(getClass().getSimpleName());
232            sb.append('(');
233            sb.append(title);
234            sb.append(", ");
235            sb.append(getProgress());
236            sb.append(", ");
237            sb.append(getStatus());
238            sb.append(')');
239            return sb.toString();
240        }
241    }
242
243}