001/*
002 * (C) Copyright 2012-2013 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Florent Guillaume
016 */
017package org.nuxeo.ecm.core.work;
018
019import java.util.ArrayList;
020import java.util.HashMap;
021import java.util.HashSet;
022import java.util.Iterator;
023import java.util.LinkedHashMap;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import java.util.concurrent.BlockingQueue;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.nuxeo.ecm.core.work.api.Work;
032import org.nuxeo.ecm.core.work.api.Work.State;
033import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
034
035/**
036 * Implementation of a {@link WorkQueuing} using in-memory queuing.
037 *
038 * @since 5.8
039 */
040public class MemoryWorkQueuing implements WorkQueuing {
041
042    private static final Log log = LogFactory.getLog(MemoryWorkQueuing.class);
043
044    protected final WorkManagerImpl mgr;
045
046    // @GuardedBy("this")
047    protected final WorkQueueDescriptorRegistry workQueueDescriptors;
048
049    // @GuardedBy("this")
050    protected final Map<String, BlockingQueue<Runnable>> allScheduled = new HashMap<String, BlockingQueue<Runnable>>();
051
052    // @GuardedBy("this")
053    // queueId -> workId -> work
054    protected final Map<String, Map<String, Work>> allRunning = new HashMap<String, Map<String, Work>>();
055
056    // @GuardedBy("this")
057    // queueId -> workId -> work
058    protected final Map<String, Map<String, Work>> allCompleted = new HashMap<String, Map<String, Work>>();
059
060    public MemoryWorkQueuing(WorkManagerImpl mgr, WorkQueueDescriptorRegistry workQueueDescriptors) {
061        this.mgr = mgr;
062        this.workQueueDescriptors = workQueueDescriptors;
063    }
064
065    @Override
066    public synchronized void init() {
067        allScheduled.clear();
068        allRunning.clear();
069        allCompleted.clear();
070    }
071
072    // called synchronized
073    protected WorkQueueDescriptor getDescriptor(String queueId) {
074        WorkQueueDescriptor descriptor = workQueueDescriptors.get(queueId);
075        if (descriptor == null) {
076            throw new IllegalArgumentException("No such work queue: " + queueId);
077        }
078        return descriptor;
079    }
080
081    @Override
082    public BlockingQueue<Runnable> initScheduleQueue(String queueId) {
083        if (allScheduled.containsKey(queueId)) {
084            throw new IllegalStateException(queueId + " was already configured");
085        }
086        final BlockingQueue<Runnable> queue = newBlockingQueue(getDescriptor(queueId));
087        allScheduled.put(queueId, queue);
088        return queue;
089    }
090
091    @Override
092    public BlockingQueue<Runnable> getScheduledQueue(String queueId) {
093        if (!allScheduled.containsKey(queueId)) {
094            throw new IllegalStateException(queueId + " was not configured yet");
095        }
096        return allScheduled.get(queueId);
097    }
098
099    // called synchronized
100    protected Map<String, Work> getRunning(String queueId) {
101        Map<String, Work> running = allRunning.get(queueId);
102        if (running == null) {
103            allRunning.put(queueId, running = newRunningMap());
104        }
105        return running;
106    }
107
108    // called synchronized
109    protected Map<String, Work> getCompleted(String queueId) {
110        Map<String, Work> completed = allCompleted.get(queueId);
111        if (completed == null) {
112            allCompleted.put(queueId, completed = newCompletedMap());
113        }
114        return completed;
115    }
116
117    protected BlockingQueue<Runnable> newBlockingQueue(WorkQueueDescriptor workQueueDescriptor) {
118        int capacity = workQueueDescriptor.getCapacity();
119        if (capacity <= 0) {
120            capacity = -1; // unbounded
121        }
122        return new MemoryBlockingQueue(this, capacity);
123    }
124
125    protected Map<String, Work> newRunningMap() {
126        return new HashMap<String, Work>();
127    }
128
129    protected Map<String, Work> newCompletedMap() {
130        return new LinkedHashMap<String, Work>();
131    }
132
133    @Override
134    public synchronized void workRunning(String queueId, Work work) {
135        // work is already taken from the scheduled queue
136        // by the thread pool executor
137        getRunning(queueId).put(work.getId(), work);
138    }
139
140    @Override
141    public synchronized void workCompleted(String queueId, Work work) {
142        getRunning(queueId).remove(work.getId());
143        getCompleted(queueId).put(work.getId(), work);
144    }
145
146    @Override
147    public Work find(String workId, State state) {
148        if (state == null) {
149            Work w = findScheduled(workId);
150            if (w == null) {
151                w = findRunning(workId);
152            }
153            return w;
154        }
155        switch (state) {
156        case SCHEDULED:
157            return findScheduled(workId);
158        case RUNNING:
159            return findRunning(workId);
160        case COMPLETED:
161            return findCompleted(workId);
162        default:
163            return null;
164        }
165    }
166
167    @Override
168    public boolean isWorkInState(String workId, State state) {
169        if (state == null) {
170            return isScheduled(workId) || isRunning(workId);
171        }
172        switch (state) {
173        case SCHEDULED:
174            return isScheduled(workId);
175        case RUNNING:
176            return isRunning(workId);
177        case COMPLETED:
178            return isCompleted(workId);
179        default:
180            return false;
181        }
182    }
183
184    @Override
185    public State getWorkState(String workId) {
186        // TODO this is linear, but isScheduled is buggy
187        if (findScheduled(workId) != null) {
188            return State.SCHEDULED;
189        }
190        if (isRunning(workId)) {
191            return State.RUNNING;
192        }
193        if (isCompleted(workId)) {
194            return State.COMPLETED;
195        }
196        return null;
197    }
198
199    @Override
200    public synchronized List<Work> listWork(String queueId, State state) {
201        switch (state) {
202        case SCHEDULED:
203            return listScheduled(queueId);
204        case RUNNING:
205            return listRunning(queueId);
206        case COMPLETED:
207            return listCompleted(queueId);
208        default:
209            throw new IllegalArgumentException(String.valueOf(state));
210        }
211    }
212
213    @Override
214    public synchronized List<String> listWorkIds(String queueId, State state) {
215        if (state == null) {
216            return listNonCompletedIds(queueId);
217        }
218        switch (state) {
219        case SCHEDULED:
220            return listScheduledIds(queueId);
221        case RUNNING:
222            return listRunningIds(queueId);
223        case COMPLETED:
224            return listCompletedIds(queueId);
225        default:
226            throw new IllegalArgumentException(String.valueOf(state));
227        }
228    }
229
230    @Override
231    public int getQueueSize(String queueId, State state) {
232        switch (state) {
233        case SCHEDULED:
234            return getScheduledSize(queueId);
235        case RUNNING:
236            return getRunningSize(queueId);
237        case COMPLETED:
238            return getCompletedSize(queueId);
239        default:
240            throw new IllegalArgumentException(String.valueOf(state));
241        }
242    }
243
244    protected synchronized int getScheduledSize(String queueId) {
245        BlockingQueue<Runnable> scheduled = allScheduled.get(queueId);
246        return scheduled == null ? 0 : scheduled.size();
247    }
248
249    protected synchronized int getRunningSize(String queueId) {
250        Map<String, Work> running = allRunning.get(queueId);
251        return running == null ? 0 : running.size();
252    }
253
254    protected synchronized int getCompletedSize(String queueId) {
255        Map<String, Work> completed = allCompleted.get(queueId);
256        return completed == null ? 0 : completed.size();
257    }
258
259    protected synchronized boolean isScheduled(String workId) {
260        for (BlockingQueue<Runnable> scheduled : allScheduled.values()) {
261            MemoryBlockingQueue q = (MemoryBlockingQueue) scheduled;
262            if (q.containsWorkId(workId)) {
263                return true;
264            }
265        }
266        return false;
267    }
268
269    protected synchronized boolean isRunning(String workId) {
270        for (Map<String, Work> running : allRunning.values()) {
271            if (running.containsKey(workId)) {
272                return true;
273            }
274        }
275        return false;
276    }
277
278    protected synchronized boolean isCompleted(String workId) {
279        for (Map<String, Work> completed : allCompleted.values()) {
280            if (completed.containsKey(workId)) {
281                return true;
282            }
283        }
284        return false;
285    }
286
287    protected synchronized Work findScheduled(String workId) {
288        for (BlockingQueue<Runnable> scheduled : allScheduled.values()) {
289            for (Runnable r : scheduled) {
290                Work w = WorkHolder.getWork(r);
291                if (w.getId().equals(workId)) {
292                    return w;
293                }
294            }
295        }
296        return null;
297    }
298
299    protected synchronized Work findRunning(String workId) {
300        for (Map<String, Work> running : allRunning.values()) {
301            Work w = running.get(workId);
302            if (w != null) {
303                return w;
304            }
305        }
306        return null;
307    }
308
309    protected synchronized Work findCompleted(String workId) {
310        for (Map<String, Work> completed : allCompleted.values()) {
311            Work w = completed.get(workId);
312            if (w != null) {
313                return w;
314            }
315        }
316        return null;
317    }
318
319    // no synchronized as scheduled queue is thread-safe
320    protected List<Work> listScheduled(String queueId) {
321        BlockingQueue<Runnable> scheduled = getScheduledQueue(queueId);
322        List<Work> list = new ArrayList<Work>(scheduled.size());
323        for (Runnable r : scheduled) {
324            Work w = WorkHolder.getWork(r);
325            list.add(w);
326        }
327        return list;
328    }
329
330    // called synchronized
331    protected List<Work> listRunning(String queueId) {
332        return new ArrayList<Work>(getRunning(queueId).values());
333    }
334
335    // called synchronized
336    protected List<Work> listCompleted(String queueId) {
337        return new ArrayList<Work>(getCompleted(queueId).values());
338    }
339
340    // no synchronized as scheduled queue is thread-safe
341    protected List<String> listScheduledIds(String queueId) {
342        BlockingQueue<Runnable> scheduled = getScheduledQueue(queueId);
343        List<String> list = new ArrayList<String>(scheduled.size());
344        for (Runnable r : scheduled) {
345            Work w = WorkHolder.getWork(r);
346            list.add(w.getId());
347        }
348        return list;
349    }
350
351    // called synchronized
352    protected List<String> listRunningIds(String queueId) {
353        return new ArrayList<String>(getRunning(queueId).keySet());
354    }
355
356    // called synchronized
357    protected List<String> listNonCompletedIds(String queueId) {
358        List<String> list = listScheduledIds(queueId);
359        list.addAll(listRunningIds(queueId));
360        return list;
361    }
362
363    // called synchronized
364    protected List<String> listCompletedIds(String queueId) {
365        return new ArrayList<String>(getCompleted(queueId).keySet());
366    }
367
368    @Override
369    public Work removeScheduled(String queueId, String workId) {
370        for (Iterator<Runnable> it = getScheduledQueue(queueId).iterator(); it.hasNext();) {
371            Runnable r = it.next();
372            Work w = WorkHolder.getWork(r);
373            if (w.getId().equals(workId)) {
374                it.remove();
375                return w;
376            }
377        }
378        return null;
379    }
380
381    @Override
382    public int setSuspending(String queueId) {
383        // for in-memory queuing, there's no suspend
384        // drain scheduled queue and mark work canceled
385        List<Runnable> scheduled = new ArrayList<Runnable>();
386        getScheduledQueue(queueId).drainTo(scheduled);
387        for (Runnable r : scheduled) {
388            Work work = WorkHolder.getWork(r);
389            work.setWorkInstanceState(State.CANCELED);
390        }
391        return scheduled.size();
392    }
393
394    @Override
395    public Set<String> getCompletedQueueIds() {
396        return new HashSet<String>(allCompleted.keySet());
397    }
398
399    @Override
400    public synchronized void clearCompletedWork(String queueId, long completionTime) {
401        Map<String, Work> completed = getCompleted(queueId);
402        if (completionTime <= 0) {
403            completed.clear();
404        } else {
405            for (Iterator<Work> it = completed.values().iterator(); it.hasNext();) {
406                Work w = it.next();
407                if (w.getCompletionTime() < completionTime) {
408                    it.remove();
409                }
410            }
411        }
412    }
413
414}