001/*
002 * (C) Copyright 2006-2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Bogdan Stefanescu
018 *     Thierry Delprat
019 *     Florent Guillaume
020 */
021package org.nuxeo.ecm.core.event.impl;
022
023import java.util.LinkedList;
024import java.util.List;
025import java.util.concurrent.TimeUnit;
026
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
030import org.nuxeo.ecm.core.api.DocumentModel;
031import org.nuxeo.ecm.core.event.Event;
032import org.nuxeo.ecm.core.event.EventBundle;
033import org.nuxeo.ecm.core.event.EventContext;
034import org.nuxeo.ecm.core.event.EventService;
035import org.nuxeo.ecm.core.event.EventStats;
036import org.nuxeo.ecm.core.event.ReconnectedEventBundle;
037import org.nuxeo.ecm.core.work.AbstractWork;
038import org.nuxeo.ecm.core.work.api.Work.State;
039import org.nuxeo.ecm.core.work.api.WorkManager;
040import org.nuxeo.runtime.api.Framework;
041
042/**
043 * Executor of async listeners passing them to the WorkManager.
044 */
045public class AsyncEventExecutor {
046
047    private static final Log log = LogFactory.getLog(AsyncEventExecutor.class);
048
049    public AsyncEventExecutor() {
050    }
051
052    public WorkManager getWorkManager() {
053        return Framework.getLocalService(WorkManager.class);
054    }
055
056    public void init() {
057        WorkManager workManager = getWorkManager();
058        if (workManager != null) {
059            workManager.init();
060        }
061    }
062
063    public boolean shutdown(long timeoutMillis) throws InterruptedException {
064        WorkManager workManager = getWorkManager();
065        if (workManager == null) {
066            return true;
067        }
068        return workManager.shutdown(timeoutMillis, TimeUnit.MILLISECONDS);
069    }
070
071    public boolean waitForCompletion(long timeoutMillis) throws InterruptedException {
072        WorkManager workManager = getWorkManager();
073        return workManager.awaitCompletion(timeoutMillis, TimeUnit.MILLISECONDS);
074    }
075
076    public void run(final List<EventListenerDescriptor> listeners, EventBundle bundle) {
077        for (EventListenerDescriptor listener : listeners) {
078            EventBundle filtered = listener.filterBundle(bundle);
079            if (filtered.isEmpty()) {
080                continue;
081            }
082            // This may be called in a transaction if event.isCommitEvent() is true or at transaction commit
083            // in other cases. If the transaction has been marked rollback-only, then scheduling must discard
084            // so we schedule "after commit"
085            getWorkManager().schedule(new ListenerWork(listener, filtered), true);
086        }
087    }
088
089    public int getUnfinishedCount() {
090        WorkManager workManager = getWorkManager();
091        int n = 0;
092        for (String queueId : workManager.getWorkQueueIds()) {
093            n += workManager.getQueueSize(queueId, State.SCHEDULED) + workManager.getQueueSize(queueId, State.RUNNING);
094        }
095        return n;
096    }
097
098    public int getActiveCount() {
099        WorkManager workManager = getWorkManager();
100        int n = 0;
101        for (String queueId : workManager.getWorkQueueIds()) {
102            n += workManager.getQueueSize(queueId, State.RUNNING);
103        }
104        return n;
105    }
106
107    protected static class ListenerWork extends AbstractWork {
108
109        private static final long serialVersionUID = 1L;
110
111        private static final int DEFAULT_RETRY_COUNT = 2;
112
113        protected final String title;
114
115        protected ReconnectedEventBundle bundle;
116
117        protected String listenerName;
118
119        protected int retryCount;
120
121        protected transient EventListenerDescriptor listener;
122
123        public ListenerWork(EventListenerDescriptor listener, EventBundle bundle) {
124            super(); // random id, for unique job
125            listenerName = listener.getName();
126            if (bundle instanceof ReconnectedEventBundle) {
127                this.bundle = (ReconnectedEventBundle) bundle;
128            } else {
129                this.bundle = new ReconnectedEventBundleImpl(bundle, listenerName);
130            }
131            List<String> l = new LinkedList<String>();
132            List<String> docIds = new LinkedList<String>();
133            String repositoryName = null;
134            for (Event event : bundle) {
135                String s = event.getName();
136                EventContext ctx = event.getContext();
137                if (ctx instanceof DocumentEventContext) {
138                    DocumentModel source = ((DocumentEventContext) ctx).getSourceDocument();
139                    if (source != null) {
140                        s += "/" + source.getRef();
141                        docIds.add(source.getId());
142                        repositoryName = source.getRepositoryName();
143                    }
144                }
145                l.add(s);
146            }
147            title = "Listener " + listenerName + " " + l;
148            if (!docIds.isEmpty()) {
149                setDocuments(repositoryName, docIds);
150            }
151            Integer count = listener.getRetryCount();
152            retryCount = count == null ? DEFAULT_RETRY_COUNT : count.intValue();
153            if (retryCount < 0) {
154                retryCount = DEFAULT_RETRY_COUNT;
155            }
156        }
157
158        @Override
159        public String getCategory() {
160            return listenerName;
161        }
162
163        @Override
164        public String getTitle() {
165            return title;
166        }
167
168        @Override
169        public int getRetryCount() {
170            return retryCount;
171        }
172
173        @Override
174        public void work() {
175            EventService eventService = Framework.getLocalService(EventService.class);
176            listener = eventService.getEventListener(listenerName);
177            if (listener == null) {
178                throw new RuntimeException("Cannot find listener: " + listenerName);
179            }
180            listener.asPostCommitListener().handleEvent(bundle);
181        }
182
183        @Override
184        public void cleanUp(boolean ok, Exception e) {
185            super.cleanUp(ok, e);
186            bundle.disconnect();
187            if (e != null && !(e instanceof InterruptedException) && !(e instanceof ConcurrentUpdateException)) {
188                log.error("Failed to execute async event " + bundle.getName() + " on listener " + listenerName, e);
189            }
190            if (listener != null) {
191                EventStats stats = Framework.getLocalService(EventStats.class);
192                if (stats != null) {
193                    stats.logAsyncExec(listener, System.currentTimeMillis() - getStartTime());
194                }
195                listener = null;
196            }
197        }
198
199        @Override
200        public String toString() {
201            StringBuilder buf = new StringBuilder();
202            buf.append(getClass().getSimpleName());
203            buf.append('(');
204            buf.append(title);
205            buf.append(", ");
206            buf.append(getProgress());
207            buf.append(", ");
208            buf.append(getStatus());
209            buf.append(')');
210            return buf.toString();
211        }
212    }
213
214}