001/*
002 * Copyright (c) 2006-2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the Eclipse Public License v1.0
006 * which accompanies this distribution, and is available at
007 * http://www.eclipse.org/legal/epl-v10.html
008 *
009 * Contributors:
010 *     Bogdan Stefanescu
011 *     Thierry Delprat
012 *     Florent Guillaume
013 */
014package org.nuxeo.ecm.core.event.impl;
015
016import java.util.LinkedList;
017import java.util.List;
018import java.util.concurrent.TimeUnit;
019
020import org.apache.commons.logging.Log;
021import org.apache.commons.logging.LogFactory;
022import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
023import org.nuxeo.ecm.core.api.DocumentModel;
024import org.nuxeo.ecm.core.event.Event;
025import org.nuxeo.ecm.core.event.EventBundle;
026import org.nuxeo.ecm.core.event.EventContext;
027import org.nuxeo.ecm.core.event.EventService;
028import org.nuxeo.ecm.core.event.EventStats;
029import org.nuxeo.ecm.core.event.ReconnectedEventBundle;
030import org.nuxeo.ecm.core.work.AbstractWork;
031import org.nuxeo.ecm.core.work.api.Work.State;
032import org.nuxeo.ecm.core.work.api.WorkManager;
033import org.nuxeo.runtime.api.Framework;
034
035/**
036 * Executor of async listeners passing them to the WorkManager.
037 */
038public class AsyncEventExecutor {
039
040    private static final Log log = LogFactory.getLog(AsyncEventExecutor.class);
041
042    public AsyncEventExecutor() {
043    }
044
045    public WorkManager getWorkManager() {
046        return Framework.getLocalService(WorkManager.class);
047    }
048
049    public void init() {
050        WorkManager workManager = getWorkManager();
051        if (workManager != null) {
052            workManager.init();
053        }
054    }
055
056    public boolean shutdown(long timeoutMillis) throws InterruptedException {
057        WorkManager workManager = getWorkManager();
058        if (workManager == null) {
059            return true;
060        }
061        return workManager.shutdown(timeoutMillis, TimeUnit.MILLISECONDS);
062    }
063
064    public boolean waitForCompletion(long timeoutMillis) throws InterruptedException {
065        WorkManager workManager = getWorkManager();
066        return workManager.awaitCompletion(timeoutMillis, TimeUnit.MILLISECONDS);
067    }
068
069    public void run(final List<EventListenerDescriptor> listeners, EventBundle bundle) {
070        for (EventListenerDescriptor listener : listeners) {
071            EventBundle filtered = listener.filterBundle(bundle);
072            if (filtered.isEmpty()) {
073                continue;
074            }
075            getWorkManager().schedule(new ListenerWork(listener, filtered));
076        }
077    }
078
079    public int getUnfinishedCount() {
080        WorkManager workManager = getWorkManager();
081        int n = 0;
082        for (String queueId : workManager.getWorkQueueIds()) {
083            n += workManager.getQueueSize(queueId, State.SCHEDULED) + workManager.getQueueSize(queueId, State.RUNNING);
084        }
085        return n;
086    }
087
088    public int getActiveCount() {
089        WorkManager workManager = getWorkManager();
090        int n = 0;
091        for (String queueId : workManager.getWorkQueueIds()) {
092            n += workManager.getQueueSize(queueId, State.RUNNING);
093        }
094        return n;
095    }
096
097    protected static class ListenerWork extends AbstractWork {
098
099        private static final long serialVersionUID = 1L;
100
101        private static final int DEFAULT_RETRY_COUNT = 2;
102
103        protected final String title;
104
105        protected ReconnectedEventBundle bundle;
106
107        protected String listenerName;
108
109        protected int retryCount;
110
111        protected transient EventListenerDescriptor listener;
112
113        public ListenerWork(EventListenerDescriptor listener, EventBundle bundle) {
114            super(); // random id, for unique job
115            listenerName = listener.getName();
116            if (bundle instanceof ReconnectedEventBundle) {
117                this.bundle = (ReconnectedEventBundle) bundle;
118            } else {
119                this.bundle = new ReconnectedEventBundleImpl(bundle, listenerName);
120            }
121            List<String> l = new LinkedList<String>();
122            List<String> docIds = new LinkedList<String>();
123            String repositoryName = null;
124            for (Event event : bundle) {
125                String s = event.getName();
126                EventContext ctx = event.getContext();
127                if (ctx instanceof DocumentEventContext) {
128                    DocumentModel source = ((DocumentEventContext) ctx).getSourceDocument();
129                    if (source != null) {
130                        s += "/" + source.getRef();
131                        docIds.add(source.getId());
132                        repositoryName = source.getRepositoryName();
133                    }
134                }
135                l.add(s);
136            }
137            title = "Listener " + listenerName + " " + l;
138            if (!docIds.isEmpty()) {
139                setDocuments(repositoryName, docIds);
140            }
141            Integer count = listener.getRetryCount();
142            retryCount = count == null ? DEFAULT_RETRY_COUNT : count.intValue();
143            if (retryCount < 0) {
144                retryCount = DEFAULT_RETRY_COUNT;
145            }
146        }
147
148        @Override
149        public String getCategory() {
150            return listenerName;
151        }
152
153        @Override
154        public String getTitle() {
155            return title;
156        }
157
158        @Override
159        public int getRetryCount() {
160            return retryCount;
161        }
162
163        @Override
164        public void work() {
165            EventService eventService = Framework.getLocalService(EventService.class);
166            listener = eventService.getEventListener(listenerName);
167            if (listener == null) {
168                throw new RuntimeException("Cannot find listener: " + listenerName);
169            }
170            listener.asPostCommitListener().handleEvent(bundle);
171        }
172
173        @Override
174        public void cleanUp(boolean ok, Exception e) {
175            super.cleanUp(ok, e);
176            bundle.disconnect();
177            if (e != null && !(e instanceof InterruptedException) && !(e instanceof ConcurrentUpdateException)) {
178                log.error("Failed to execute async event " + bundle.getName() + " on listener " + listenerName, e);
179            }
180            if (listener != null) {
181                EventStats stats = Framework.getLocalService(EventStats.class);
182                if (stats != null) {
183                    stats.logAsyncExec(listener, System.currentTimeMillis() - getStartTime());
184                }
185                listener = null;
186            }
187        }
188
189        @Override
190        public String toString() {
191            StringBuilder buf = new StringBuilder();
192            buf.append(getClass().getSimpleName());
193            buf.append('(');
194            buf.append(title);
195            buf.append(", ");
196            buf.append(getProgress());
197            buf.append(", ");
198            buf.append(getStatus());
199            buf.append(')');
200            return buf.toString();
201        }
202    }
203
204}