001/*
002 * (C) Copyright 2006-2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Bogdan Stefanescu
018 *     Thierry Delprat
019 *     Florent Guillaume
020 *     Andrei Nechaev
021 */
022package org.nuxeo.ecm.core.event.impl;
023
024import org.apache.commons.logging.Log;
025import org.apache.commons.logging.LogFactory;
026import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
027import org.nuxeo.ecm.core.api.CoreSession;
028import org.nuxeo.ecm.core.api.DocumentModel;
029import org.nuxeo.ecm.core.event.Event;
030import org.nuxeo.ecm.core.event.EventBundle;
031import org.nuxeo.ecm.core.event.EventContext;
032import org.nuxeo.ecm.core.event.EventService;
033import org.nuxeo.ecm.core.event.EventStats;
034import org.nuxeo.ecm.core.event.ReconnectedEventBundle;
035import org.nuxeo.ecm.core.work.AbstractWork;
036import org.nuxeo.ecm.core.work.api.Work.State;
037import org.nuxeo.ecm.core.work.api.WorkManager;
038import org.nuxeo.runtime.api.Framework;
039import org.nuxeo.runtime.transaction.TransactionHelper;
040
041import java.util.HashMap;
042import java.util.LinkedList;
043import java.util.List;
044import java.util.Map;
045import java.util.concurrent.TimeUnit;
046
047/**
048 * Executor of async listeners passing them to the WorkManager.
049 */
050@SuppressWarnings("PackageAccessibility")
051public class AsyncEventExecutor {
052
053    private static final Log log = LogFactory.getLog(AsyncEventExecutor.class);
054
055    public AsyncEventExecutor() {
056    }
057
058    public WorkManager getWorkManager() {
059        return Framework.getLocalService(WorkManager.class);
060    }
061
062    public void init() {
063        WorkManager workManager = getWorkManager();
064        if (workManager != null) {
065            workManager.init();
066        }
067    }
068
069    public boolean shutdown(long timeoutMillis) throws InterruptedException {
070        WorkManager workManager = getWorkManager();
071        if (workManager == null) {
072            return true;
073        }
074        return workManager.shutdown(timeoutMillis, TimeUnit.MILLISECONDS);
075    }
076
077    public boolean waitForCompletion(long timeoutMillis) throws InterruptedException {
078        WorkManager workManager = getWorkManager();
079        if (workManager == null) {
080            return false;
081        }
082
083        return workManager.awaitCompletion(timeoutMillis, TimeUnit.MILLISECONDS);
084    }
085
086    public void run(final List<EventListenerDescriptor> listeners, EventBundle bundle) {
087        // EventBundle that have gone through bus have been serialized
088        // we need to reconnect them before filtering
089        // this means we need a valid transaction !
090        if (!(bundle instanceof ReconnectedEventBundleImpl)) {
091            scheduleListeners(listeners, bundle);
092        } else {
093            final EventBundle tmpBundle = bundle;
094
095            TransactionHelper.runInTransaction(() -> {
096                EventBundle connectedBundle = new EventBundleImpl();
097                Map<String, CoreSession> sessions = new HashMap<>();
098
099                List<Event> events = ((ReconnectedEventBundleImpl)tmpBundle).getReconnectedEvents();
100                for (Event event : events) {
101                    connectedBundle.push(event);
102                    CoreSession session = event.getContext().getCoreSession();
103                    if (!(sessions.keySet().contains(session.getRepositoryName()))) {
104                        sessions.put(session.getRepositoryName(), session);
105                    }
106                }
107
108                sessions.values().forEach(CoreSession::close);
109                scheduleListeners(listeners, connectedBundle);
110            });
111        }
112    }
113
114    private void scheduleListeners(final List<EventListenerDescriptor> listeners, EventBundle bundle) {
115        for (EventListenerDescriptor listener : listeners) {
116            EventBundle filtered = listener.filterBundle(bundle);
117            if (filtered.isEmpty()) {
118                continue;
119            }
120            // This may be called in a transaction if event.isCommitEvent() is true or at transaction commit
121            // in other cases. If the transaction has been marked rollback-only, then scheduling must discard
122            // so we schedule "after commit"
123            getWorkManager().schedule(new ListenerWork(listener, filtered), true);
124        }
125    }
126
127    public int getUnfinishedCount() {
128        WorkManager workManager = getWorkManager();
129        int n = 0;
130        for (String queueId : workManager.getWorkQueueIds()) {
131            n += workManager.getQueueSize(queueId, State.SCHEDULED) + workManager.getQueueSize(queueId, State.RUNNING);
132        }
133        return n;
134    }
135
136    public int getActiveCount() {
137        WorkManager workManager = getWorkManager();
138        int n = 0;
139        for (String queueId : workManager.getWorkQueueIds()) {
140            n += workManager.getQueueSize(queueId, State.RUNNING);
141        }
142        return n;
143    }
144
145    protected static class ListenerWork extends AbstractWork {
146
147        private static final long serialVersionUID = 1L;
148
149        private static final int DEFAULT_RETRY_COUNT = 2;
150
151        protected final String title;
152
153        protected ReconnectedEventBundle bundle;
154
155        protected String listenerName;
156
157        protected int retryCount;
158
159        protected transient EventListenerDescriptor listener;
160
161        public ListenerWork(EventListenerDescriptor listener, EventBundle bundle) {
162            super(); // random id, for unique job
163            listenerName = listener.getName();
164            if (bundle instanceof ReconnectedEventBundle) {
165                this.bundle = (ReconnectedEventBundle) bundle;
166            } else {
167                this.bundle = new ReconnectedEventBundleImpl(bundle, listenerName);
168            }
169            List<String> l = new LinkedList<String>();
170            List<String> docIds = new LinkedList<String>();
171            String repositoryName = null;
172            for (Event event : bundle) {
173                String s = event.getName();
174                EventContext ctx = event.getContext();
175                if (ctx instanceof DocumentEventContext) {
176                    DocumentModel source = ((DocumentEventContext) ctx).getSourceDocument();
177                    if (source != null) {
178                        s += "/" + source.getRef();
179                        docIds.add(source.getId());
180                        repositoryName = source.getRepositoryName();
181                    }
182                }
183                l.add(s);
184            }
185            title = "Listener " + listenerName + " " + l;
186            if (!docIds.isEmpty()) {
187                setDocuments(repositoryName, docIds);
188            }
189            Integer count = listener.getRetryCount();
190            retryCount = count == null ? DEFAULT_RETRY_COUNT : count;
191            if (retryCount < 0) {
192                retryCount = DEFAULT_RETRY_COUNT;
193            }
194        }
195
196        @Override
197        public String getCategory() {
198            return listenerName;
199        }
200
201        @Override
202        public String getTitle() {
203            return title;
204        }
205
206        @Override
207        public int getRetryCount() {
208            return retryCount;
209        }
210
211        @Override
212        public void work() {
213            EventService eventService = Framework.getLocalService(EventService.class);
214            listener = eventService.getEventListener(listenerName);
215            if (listener == null) {
216                throw new RuntimeException("Cannot find listener: " + listenerName);
217            }
218            listener.asPostCommitListener().handleEvent(bundle);
219        }
220
221        @Override
222        public void cleanUp(boolean ok, Exception e) {
223            super.cleanUp(ok, e);
224            bundle.disconnect();
225            if (e != null && !(e instanceof InterruptedException) && !(e instanceof ConcurrentUpdateException)) {
226                log.error("Failed to execute async event " + bundle.getName() + " on listener " + listenerName, e);
227            }
228            if (listener != null) {
229                EventStats stats = Framework.getLocalService(EventStats.class);
230                if (stats != null) {
231                    stats.logAsyncExec(listener, System.currentTimeMillis() - getStartTime());
232                }
233                listener = null;
234            }
235        }
236
237        @Override
238        public String toString() {
239            StringBuilder buf = new StringBuilder();
240            buf.append(getClass().getSimpleName());
241            buf.append('(');
242            buf.append(title);
243            buf.append(", ");
244            buf.append(getProgress());
245            buf.append(", ");
246            buf.append(getStatus());
247            buf.append(')');
248            return buf.toString();
249        }
250    }
251
252}