001/*
002 * (C) Copyright 2006-2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Bogdan Stefanescu
018 *     Thierry Delprat
019 *     Florent Guillaume
020 *     Andrei Nechaev
021 */
022package org.nuxeo.ecm.core.event.impl;
023
024import java.util.HashMap;
025import java.util.LinkedList;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.TimeUnit;
029
030import org.apache.commons.logging.Log;
031import org.apache.commons.logging.LogFactory;
032import org.nuxeo.common.utils.ExceptionUtils;
033import org.nuxeo.ecm.core.api.CloseableCoreSession;
034import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
035import org.nuxeo.ecm.core.api.CoreSession;
036import org.nuxeo.ecm.core.api.DocumentModel;
037import org.nuxeo.ecm.core.event.Event;
038import org.nuxeo.ecm.core.event.EventBundle;
039import org.nuxeo.ecm.core.event.EventContext;
040import org.nuxeo.ecm.core.event.EventService;
041import org.nuxeo.ecm.core.event.EventStats;
042import org.nuxeo.ecm.core.event.ReconnectedEventBundle;
043import org.nuxeo.ecm.core.work.AbstractWork;
044import org.nuxeo.ecm.core.work.api.Work.State;
045import org.nuxeo.ecm.core.work.api.WorkManager;
046import org.nuxeo.runtime.api.Framework;
047import org.nuxeo.runtime.transaction.TransactionHelper;
048
049/**
050 * Executor of async listeners passing them to the WorkManager.
051 */
052@SuppressWarnings("PackageAccessibility")
053public class AsyncEventExecutor {
054
055    private static final Log log = LogFactory.getLog(AsyncEventExecutor.class);
056
057    public AsyncEventExecutor() {
058    }
059
060    public WorkManager getWorkManager() {
061        return Framework.getService(WorkManager.class);
062    }
063
064    public void init() {
065        WorkManager workManager = getWorkManager();
066        if (workManager != null) {
067            workManager.init();
068        }
069    }
070
071    public boolean shutdown(long timeoutMillis) throws InterruptedException {
072        WorkManager workManager = getWorkManager();
073        if (workManager == null) {
074            return true;
075        }
076        return workManager.shutdown(timeoutMillis, TimeUnit.MILLISECONDS);
077    }
078
079    public boolean waitForCompletion(long timeoutMillis) throws InterruptedException {
080        WorkManager workManager = getWorkManager();
081        if (workManager == null) {
082            return false;
083        }
084
085        return workManager.awaitCompletion(timeoutMillis, TimeUnit.MILLISECONDS);
086    }
087
088    public void run(final List<EventListenerDescriptor> listeners, EventBundle bundle) {
089        // EventBundle that have gone through bus have been serialized
090        // we need to reconnect them before filtering
091        // this means we need a valid transaction !
092        if (!(bundle instanceof ReconnectedEventBundleImpl)) {
093            scheduleListeners(listeners, bundle);
094        } else {
095            final EventBundle tmpBundle = bundle;
096
097            TransactionHelper.runInTransaction(() -> {
098                EventBundle connectedBundle = new EventBundleImpl();
099                Map<String, CoreSession> sessions = new HashMap<>();
100
101                List<Event> events = ((ReconnectedEventBundleImpl)tmpBundle).getReconnectedEvents();
102                for (Event event : events) {
103                    connectedBundle.push(event);
104                    CoreSession session = event.getContext().getCoreSession();
105                    if (!(sessions.keySet().contains(session.getRepositoryName()))) {
106                        sessions.put(session.getRepositoryName(), session);
107                    }
108                }
109                for (CoreSession session : sessions.values()) {
110                    ((CloseableCoreSession) session).close();
111                }
112                scheduleListeners(listeners, connectedBundle);
113            });
114        }
115    }
116
117    private void scheduleListeners(final List<EventListenerDescriptor> listeners, EventBundle bundle) {
118        for (EventListenerDescriptor listener : listeners) {
119            EventBundle filtered = listener.filterBundle(bundle);
120            if (filtered.isEmpty()) {
121                continue;
122            }
123            // This may be called in a transaction if event.isCommitEvent() is true or at transaction commit
124            // in other cases. If the transaction has been marked rollback-only, then scheduling must discard
125            // so we schedule "after commit"
126            getWorkManager().schedule(new ListenerWork(listener, filtered), true);
127        }
128    }
129
130    public int getUnfinishedCount() {
131        WorkManager workManager = getWorkManager();
132        int n = 0;
133        for (String queueId : workManager.getWorkQueueIds()) {
134            n += workManager.getQueueSize(queueId, State.SCHEDULED) + workManager.getQueueSize(queueId, State.RUNNING);
135        }
136        return n;
137    }
138
139    public int getActiveCount() {
140        WorkManager workManager = getWorkManager();
141        int n = 0;
142        for (String queueId : workManager.getWorkQueueIds()) {
143            n += workManager.getQueueSize(queueId, State.RUNNING);
144        }
145        return n;
146    }
147
148    protected static class ListenerWork extends AbstractWork {
149
150        private static final long serialVersionUID = 1L;
151
152        private static final int DEFAULT_RETRY_COUNT = 2;
153
154        protected final String title;
155
156        protected ReconnectedEventBundle bundle;
157
158        protected String listenerName;
159
160        protected int retryCount;
161
162        protected transient EventListenerDescriptor listener;
163
164        public ListenerWork(EventListenerDescriptor listener, EventBundle bundle) {
165            super(); // random id, for unique job
166            listenerName = listener.getName();
167            if (bundle instanceof ReconnectedEventBundle) {
168                this.bundle = (ReconnectedEventBundle) bundle;
169            } else {
170                this.bundle = new ReconnectedEventBundleImpl(bundle, listenerName);
171            }
172            List<String> l = new LinkedList<String>();
173            List<String> docIds = new LinkedList<String>();
174            String repositoryName = null;
175            for (Event event : bundle) {
176                String s = event.getName();
177                EventContext ctx = event.getContext();
178                if (ctx instanceof DocumentEventContext) {
179                    DocumentModel source = ((DocumentEventContext) ctx).getSourceDocument();
180                    if (source != null) {
181                        s += "/" + source.getRef();
182                        docIds.add(source.getId());
183                        repositoryName = source.getRepositoryName();
184                    }
185                }
186                l.add(s);
187            }
188            title = "Listener " + listenerName + " " + l;
189            if (!docIds.isEmpty()) {
190                setDocuments(repositoryName, docIds);
191            }
192            Integer count = listener.getRetryCount();
193            retryCount = count == null ? DEFAULT_RETRY_COUNT : count;
194            if (retryCount < 0) {
195                retryCount = DEFAULT_RETRY_COUNT;
196            }
197        }
198
199        @Override
200        public String getCategory() {
201            return listenerName;
202        }
203
204        @Override
205        public String getTitle() {
206            return title;
207        }
208
209        @Override
210        public int getRetryCount() {
211            return retryCount;
212        }
213
214        @Override
215        public void work() {
216            EventService eventService = Framework.getService(EventService.class);
217            listener = eventService.getEventListener(listenerName);
218            if (listener == null) {
219                throw new RuntimeException("Cannot find listener: " + listenerName);
220            }
221            listener.asPostCommitListener().handleEvent(bundle);
222        }
223
224        @Override
225        public void cleanUp(boolean ok, Exception e) {
226            super.cleanUp(ok, e);
227            bundle.disconnect();
228            if (e != null && !ExceptionUtils.hasInterruptedCause(e) && !(e instanceof ConcurrentUpdateException)) {
229                log.error("Failed to execute async event " + bundle.getName() + " on listener " + listenerName, e);
230            }
231            if (listener != null) {
232                EventStats stats = Framework.getService(EventStats.class);
233                if (stats != null) {
234                    stats.logAsyncExec(listener, System.currentTimeMillis() - getStartTime());
235                }
236                listener = null;
237            }
238        }
239
240        @Override
241        public String toString() {
242            StringBuilder buf = new StringBuilder();
243            buf.append(getClass().getSimpleName());
244            buf.append('(');
245            buf.append(title);
246            buf.append(", ");
247            buf.append(getProgress());
248            buf.append(", ");
249            buf.append(getStatus());
250            buf.append(')');
251            return buf.toString();
252        }
253    }
254
255}