001/*
002 * (C) Copyright 2006-2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Bogdan Stefanescu
018 *     Thierry Delprat
019 *     Florent Guillaume
020 *     Andrei Nechaev
021 */
022package org.nuxeo.ecm.core.event.impl;
023
024import java.util.HashMap;
025import java.util.LinkedList;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.TimeUnit;
029
030import org.apache.commons.logging.Log;
031import org.apache.commons.logging.LogFactory;
032import org.nuxeo.common.utils.ExceptionUtils;
033import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
034import org.nuxeo.ecm.core.api.CoreSession;
035import org.nuxeo.ecm.core.api.DocumentModel;
036import org.nuxeo.ecm.core.event.Event;
037import org.nuxeo.ecm.core.event.EventBundle;
038import org.nuxeo.ecm.core.event.EventContext;
039import org.nuxeo.ecm.core.event.EventService;
040import org.nuxeo.ecm.core.event.EventStats;
041import org.nuxeo.ecm.core.event.ReconnectedEventBundle;
042import org.nuxeo.ecm.core.work.AbstractWork;
043import org.nuxeo.ecm.core.work.api.Work.State;
044import org.nuxeo.ecm.core.work.api.WorkManager;
045import org.nuxeo.runtime.api.Framework;
046import org.nuxeo.runtime.transaction.TransactionHelper;
047
048/**
049 * Executor of async listeners passing them to the WorkManager.
050 */
051@SuppressWarnings("PackageAccessibility")
052public class AsyncEventExecutor {
053
054    private static final Log log = LogFactory.getLog(AsyncEventExecutor.class);
055
056    public AsyncEventExecutor() {
057    }
058
059    public WorkManager getWorkManager() {
060        return Framework.getService(WorkManager.class);
061    }
062
063    public void init() {
064        WorkManager workManager = getWorkManager();
065        if (workManager != null) {
066            workManager.init();
067        }
068    }
069
070    public boolean shutdown(long timeoutMillis) throws InterruptedException {
071        WorkManager workManager = getWorkManager();
072        if (workManager == null) {
073            return true;
074        }
075        return workManager.shutdown(timeoutMillis, TimeUnit.MILLISECONDS);
076    }
077
078    public boolean waitForCompletion(long timeoutMillis) throws InterruptedException {
079        WorkManager workManager = getWorkManager();
080        if (workManager == null) {
081            return false;
082        }
083
084        return workManager.awaitCompletion(timeoutMillis, TimeUnit.MILLISECONDS);
085    }
086
087    public void run(final List<EventListenerDescriptor> listeners, EventBundle bundle) {
088        // EventBundle that have gone through bus have been serialized
089        // we need to reconnect them before filtering
090        // this means we need a valid transaction !
091        if (!(bundle instanceof ReconnectedEventBundleImpl)) {
092            scheduleListeners(listeners, bundle);
093        } else {
094            final EventBundle tmpBundle = bundle;
095
096            TransactionHelper.runInTransaction(() -> {
097                EventBundle connectedBundle = new EventBundleImpl();
098                Map<String, CoreSession> sessions = new HashMap<>();
099
100                List<Event> events = ((ReconnectedEventBundleImpl)tmpBundle).getReconnectedEvents();
101                for (Event event : events) {
102                    connectedBundle.push(event);
103                    CoreSession session = event.getContext().getCoreSession();
104                    if (!(sessions.keySet().contains(session.getRepositoryName()))) {
105                        sessions.put(session.getRepositoryName(), session);
106                    }
107                }
108
109                sessions.values().forEach(CoreSession::close);
110                scheduleListeners(listeners, connectedBundle);
111            });
112        }
113    }
114
115    private void scheduleListeners(final List<EventListenerDescriptor> listeners, EventBundle bundle) {
116        for (EventListenerDescriptor listener : listeners) {
117            EventBundle filtered = listener.filterBundle(bundle);
118            if (filtered.isEmpty()) {
119                continue;
120            }
121            // This may be called in a transaction if event.isCommitEvent() is true or at transaction commit
122            // in other cases. If the transaction has been marked rollback-only, then scheduling must discard
123            // so we schedule "after commit"
124            getWorkManager().schedule(new ListenerWork(listener, filtered), true);
125        }
126    }
127
128    public int getUnfinishedCount() {
129        WorkManager workManager = getWorkManager();
130        int n = 0;
131        for (String queueId : workManager.getWorkQueueIds()) {
132            n += workManager.getQueueSize(queueId, State.SCHEDULED) + workManager.getQueueSize(queueId, State.RUNNING);
133        }
134        return n;
135    }
136
137    public int getActiveCount() {
138        WorkManager workManager = getWorkManager();
139        int n = 0;
140        for (String queueId : workManager.getWorkQueueIds()) {
141            n += workManager.getQueueSize(queueId, State.RUNNING);
142        }
143        return n;
144    }
145
146    protected static class ListenerWork extends AbstractWork {
147
148        private static final long serialVersionUID = 1L;
149
150        private static final int DEFAULT_RETRY_COUNT = 2;
151
152        protected final String title;
153
154        protected ReconnectedEventBundle bundle;
155
156        protected String listenerName;
157
158        protected int retryCount;
159
160        protected transient EventListenerDescriptor listener;
161
162        public ListenerWork(EventListenerDescriptor listener, EventBundle bundle) {
163            super(); // random id, for unique job
164            listenerName = listener.getName();
165            if (bundle instanceof ReconnectedEventBundle) {
166                this.bundle = (ReconnectedEventBundle) bundle;
167            } else {
168                this.bundle = new ReconnectedEventBundleImpl(bundle, listenerName);
169            }
170            List<String> l = new LinkedList<String>();
171            List<String> docIds = new LinkedList<String>();
172            String repositoryName = null;
173            for (Event event : bundle) {
174                String s = event.getName();
175                EventContext ctx = event.getContext();
176                if (ctx instanceof DocumentEventContext) {
177                    DocumentModel source = ((DocumentEventContext) ctx).getSourceDocument();
178                    if (source != null) {
179                        s += "/" + source.getRef();
180                        docIds.add(source.getId());
181                        repositoryName = source.getRepositoryName();
182                    }
183                }
184                l.add(s);
185            }
186            title = "Listener " + listenerName + " " + l;
187            if (!docIds.isEmpty()) {
188                setDocuments(repositoryName, docIds);
189            }
190            Integer count = listener.getRetryCount();
191            retryCount = count == null ? DEFAULT_RETRY_COUNT : count;
192            if (retryCount < 0) {
193                retryCount = DEFAULT_RETRY_COUNT;
194            }
195        }
196
197        @Override
198        public String getCategory() {
199            return listenerName;
200        }
201
202        @Override
203        public String getTitle() {
204            return title;
205        }
206
207        @Override
208        public int getRetryCount() {
209            return retryCount;
210        }
211
212        @Override
213        public void work() {
214            EventService eventService = Framework.getService(EventService.class);
215            listener = eventService.getEventListener(listenerName);
216            if (listener == null) {
217                throw new RuntimeException("Cannot find listener: " + listenerName);
218            }
219            listener.asPostCommitListener().handleEvent(bundle);
220        }
221
222        @Override
223        public void cleanUp(boolean ok, Exception e) {
224            super.cleanUp(ok, e);
225            bundle.disconnect();
226            if (e != null && !ExceptionUtils.hasInterruptedCause(e) && !(e instanceof ConcurrentUpdateException)) {
227                log.error("Failed to execute async event " + bundle.getName() + " on listener " + listenerName, e);
228            }
229            if (listener != null) {
230                EventStats stats = Framework.getService(EventStats.class);
231                if (stats != null) {
232                    stats.logAsyncExec(listener, System.currentTimeMillis() - getStartTime());
233                }
234                listener = null;
235            }
236        }
237
238        @Override
239        public String toString() {
240            StringBuilder buf = new StringBuilder();
241            buf.append(getClass().getSimpleName());
242            buf.append('(');
243            buf.append(title);
244            buf.append(", ");
245            buf.append(getProgress());
246            buf.append(", ");
247            buf.append(getStatus());
248            buf.append(')');
249            return buf.toString();
250        }
251    }
252
253}