/*
 * (C) Copyright 2012-2015 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 *     Benoit Delbosc
 */
020package org.nuxeo.ecm.core.work;
021
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.LinkedHashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.BlockingQueue;
031
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.nuxeo.ecm.core.work.api.Work;
035import org.nuxeo.ecm.core.work.api.Work.State;
036import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
037
038/**
039 * Implementation of a {@link WorkQueuing} using in-memory queuing.
040 *
041 * @since 5.8
042 */
043public class MemoryWorkQueuing implements WorkQueuing {
044
045    private static final Log log = LogFactory.getLog(MemoryWorkQueuing.class);
046
047    protected final WorkManagerImpl mgr;
048
049    // @GuardedBy("this")
050    protected final WorkQueueDescriptorRegistry workQueueDescriptors;
051
052    // @GuardedBy("this")
053    protected final Map<String, BlockingQueue<Runnable>> allQueued = new HashMap<>();
054
055    // @GuardedBy("this")
056    // queueId -> workId -> work
057    protected final Map<String, Map<String, Work>> allScheduled = new HashMap<>();
058
059    // @GuardedBy("this")
060    // queueId -> workId -> work
061    protected final Map<String, Map<String, Work>> allRunning = new HashMap<>();
062
063    // @GuardedBy("this")
064    // queueId -> workId -> work
065    protected final Map<String, Map<String, Work>> allCompleted = new HashMap<>();
066
067    public MemoryWorkQueuing(WorkManagerImpl mgr, WorkQueueDescriptorRegistry workQueueDescriptors) {
068        this.mgr = mgr;
069        this.workQueueDescriptors = workQueueDescriptors;
070    }
071
072    @Override
073    public synchronized void init() {
074        allQueued.clear();
075        allScheduled.clear();
076        allRunning.clear();
077        allCompleted.clear();
078    }
079
080    // called synchronized
081    protected WorkQueueDescriptor getDescriptor(String queueId) {
082        WorkQueueDescriptor descriptor = workQueueDescriptors.get(queueId);
083        if (descriptor == null) {
084            throw new IllegalArgumentException("No such work queue: " + queueId);
085        }
086        return descriptor;
087    }
088
089    @Override
090    public BlockingQueue<Runnable> initWorkQueue(String queueId) {
091        if (allQueued.containsKey(queueId)) {
092            throw new IllegalStateException(queueId + " was already configured");
093        }
094        final BlockingQueue<Runnable> queue = newBlockingQueue(getDescriptor(queueId));
095        allQueued.put(queueId, queue);
096        return queue;
097    }
098
099    public BlockingQueue<Runnable> getWorkQueue(String queueId) {
100        if (!allQueued.containsKey(queueId)) {
101            throw new IllegalStateException(queueId + " was not configured yet");
102        }
103        return allQueued.get(queueId);
104    }
105
106    @Override
107    public synchronized boolean workSchedule(String queueId, Work work) {
108        boolean ret = getWorkQueue(queueId).offer(new WorkHolder(work));
109        if (ret) {
110            getScheduled(queueId).put(work.getId(), work);
111        }
112        return ret;
113    }
114
115    // called synchronized
116    protected Map<String, Work> getScheduled(String queueId) {
117        Map<String, Work> scheduled = allScheduled.get(queueId);
118        if (scheduled == null) {
119            allScheduled.put(queueId, scheduled = newScheduledMap());
120        }
121        return scheduled;
122    }
123
124    // called synchronized
125    protected Map<String, Work> getRunning(String queueId) {
126        Map<String, Work> running = allRunning.get(queueId);
127        if (running == null) {
128            allRunning.put(queueId, running = newRunningMap());
129        }
130        return running;
131    }
132
133    // called synchronized
134    protected Map<String, Work> getCompleted(String queueId) {
135        Map<String, Work> completed = allCompleted.get(queueId);
136        if (completed == null) {
137            allCompleted.put(queueId, completed = newCompletedMap());
138        }
139        return completed;
140    }
141
142    protected BlockingQueue<Runnable> newBlockingQueue(WorkQueueDescriptor workQueueDescriptor) {
143        int capacity = workQueueDescriptor.getCapacity();
144        if (capacity <= 0) {
145            capacity = -1; // unbounded
146        }
147        return new MemoryBlockingQueue(capacity);
148    }
149
150    protected Map<String, Work> newScheduledMap() {
151        return new HashMap<>();
152    }
153
154    protected Map<String, Work> newRunningMap() {
155        return new HashMap<>();
156    }
157
158    protected Map<String, Work> newCompletedMap() {
159        return new LinkedHashMap<>();
160    }
161
162    @Override
163    public synchronized void workRunning(String queueId, Work work) {
164        // at this time the work is already taken from the queue by the thread pool
165        Work ret = getRunning(queueId).put(work.getId(), work);
166        if (ret != null) {
167            log.warn("Running a work with the same ID " + work.getId() + " multiple time, already running " + ret +
168                    ", new work: " + work);
169        }
170        getScheduled(queueId).remove(work.getId());
171        if (log.isTraceEnabled()) {
172            log.trace("Work running " + work.getId() + " on " + queueId + ", total running: " +
173                    count(queueId, State.RUNNING));
174        }
175    }
176
177    @Override
178    public synchronized void workCompleted(String queueId, Work work) {
179        if (log.isTraceEnabled()) {
180            log.trace("Work completed " + work.getId());
181        }
182        getCompleted(queueId).put(work.getId(), work);
183        getRunning(queueId).remove(work.getId());
184    }
185
186    @Override
187    public Work find(String workId, State state) {
188        if (state == null) {
189            Work w = findScheduled(workId);
190            if (w == null) {
191                w = findRunning(workId);
192            }
193            return w;
194        }
195        switch (state) {
196        case SCHEDULED:
197            return findScheduled(workId);
198        case RUNNING:
199            return findRunning(workId);
200        case COMPLETED:
201            return findCompleted(workId);
202        default:
203            return null;
204        }
205    }
206
207    @Override
208    public boolean isWorkInState(String workId, State state) {
209        if (state == null) {
210            return isScheduled(workId) || isRunning(workId);
211        }
212        switch (state) {
213        case SCHEDULED:
214            return isScheduled(workId);
215        case RUNNING:
216            return isRunning(workId);
217        case COMPLETED:
218            return isCompleted(workId);
219        default:
220            return false;
221        }
222    }
223
224    @Override
225    public State getWorkState(String workId) {
226        // TODO this is linear, but isScheduled is buggy
227        if (findScheduled(workId) != null) {
228            return State.SCHEDULED;
229        }
230        if (isRunning(workId)) {
231            return State.RUNNING;
232        }
233        if (isCompleted(workId)) {
234            return State.COMPLETED;
235        }
236        return null;
237    }
238
239    @Override
240    public synchronized List<Work> listWork(String queueId, State state) {
241        switch (state) {
242        case SCHEDULED:
243            return listScheduled(queueId);
244        case RUNNING:
245            return listRunning(queueId);
246        case COMPLETED:
247            return listCompleted(queueId);
248        default:
249            throw new IllegalArgumentException(String.valueOf(state));
250        }
251    }
252
253    @Override
254    public synchronized List<String> listWorkIds(String queueId, State state) {
255        if (state == null) {
256            return listNonCompletedIds(queueId);
257        }
258        switch (state) {
259        case SCHEDULED:
260            return listScheduledIds(queueId);
261        case RUNNING:
262            return listRunningIds(queueId);
263        case COMPLETED:
264            return listCompletedIds(queueId);
265        default:
266            throw new IllegalArgumentException(String.valueOf(state));
267        }
268    }
269
270    @Override
271    public int count(String queueId, State state) {
272        switch (state) {
273        case SCHEDULED:
274            return getScheduledSize(queueId);
275        case RUNNING:
276            return getRunningSize(queueId);
277        case COMPLETED:
278            return getCompletedSize(queueId);
279        default:
280            throw new IllegalArgumentException(String.valueOf(state));
281        }
282    }
283
284    protected synchronized int getScheduledSize(String queueId) {
285        Map<String, Work> scheduled = allScheduled.get(queueId);
286        return scheduled == null ? 0 : scheduled.size();
287    }
288
289    protected synchronized int getRunningSize(String queueId) {
290        Map<String, Work> running = allRunning.get(queueId);
291        return running == null ? 0 : running.size();
292    }
293
294    protected synchronized int getCompletedSize(String queueId) {
295        Map<String, Work> completed = allCompleted.get(queueId);
296        return completed == null ? 0 : completed.size();
297    }
298
299    protected synchronized boolean isScheduled(String workId) {
300        return find(workId, allScheduled) != null;
301    }
302
303    protected synchronized boolean isRunning(String workId) {
304        return find(workId, allRunning) != null;
305    }
306
307    protected synchronized boolean isCompleted(String workId) {
308        return find(workId, allCompleted) != null;
309    }
310
311    protected synchronized Work findScheduled(String workId) {
312        return find(workId, allScheduled);
313    }
314
315    protected synchronized Work findRunning(String workId) {
316        return find(workId, allRunning);
317    }
318
319    protected synchronized Work findCompleted(String workId) {
320        return find(workId, allCompleted);
321    }
322
323    private Work find(String workId, Map<String, Map<String, Work>> allWorks) {
324        for (Map<String, Work> works : allWorks.values()) {
325            Work ret = works.get(workId);
326            if (ret != null) {
327                return ret;
328            }
329        }
330        return null;
331    }
332
333    // called synchronized
334    protected List<Work> listScheduled(String queueId) {
335        return new ArrayList<>(getScheduled(queueId).values());
336    }
337
338    // called synchronized
339    protected List<Work> listRunning(String queueId) {
340        return new ArrayList<>(getRunning(queueId).values());
341    }
342
343    // called synchronized
344    protected List<Work> listCompleted(String queueId) {
345        return new ArrayList<>(getCompleted(queueId).values());
346    }
347
348    // called synchronized
349    protected List<String> listScheduledIds(String queueId) {
350        return new ArrayList<>(getScheduled(queueId).keySet());
351    }
352
353    // called synchronized
354    protected List<String> listRunningIds(String queueId) {
355        return new ArrayList<>(getRunning(queueId).keySet());
356    }
357
358    // called synchronized
359    protected List<String> listNonCompletedIds(String queueId) {
360        List<String> list = listScheduledIds(queueId);
361        list.addAll(listRunningIds(queueId));
362        return list;
363    }
364
365    // called synchronized
366    protected List<String> listCompletedIds(String queueId) {
367        return new ArrayList<>(getCompleted(queueId).keySet());
368    }
369
370    @Override
371    public Work removeScheduled(String queueId, String workId) {
372        removeFromScheduledSet(queueId, workId);
373        for (Iterator<Runnable> it = getWorkQueue(queueId).iterator(); it.hasNext();) {
374            Runnable r = it.next();
375            Work w = WorkHolder.getWork(r);
376            if (w.getId().equals(workId)) {
377                it.remove();
378                return w;
379            }
380        }
381        return null;
382    }
383
384    protected Work removeFromScheduledSet(String queueId, String workId) {
385        for (Iterator<Work> it = getScheduled(queueId).values().iterator(); it.hasNext();) {
386            Work w = it.next();
387            if (w.getId().equals(workId)) {
388                it.remove();
389                return w;
390            }
391        }
392        return null;
393    }
394
395    @Override
396    public int setSuspending(String queueId) {
397        // for in-memory queuing, there's no suspend
398        // drain scheduled queue and mark work canceled
399        List<Runnable> scheduled = new ArrayList<>();
400        getWorkQueue(queueId).drainTo(scheduled);
401        for (Runnable r : scheduled) {
402            Work work = WorkHolder.getWork(r);
403            work.setWorkInstanceState(State.CANCELED);
404        }
405        getScheduled(queueId).clear();
406        return scheduled.size();
407    }
408
409    @Override
410    public Set<String> getCompletedQueueIds() {
411        return new HashSet<>(allCompleted.keySet());
412    }
413
414    @Override
415    public synchronized void clearCompletedWork(String queueId, long completionTime) {
416        Map<String, Work> completed = getCompleted(queueId);
417        if (completionTime <= 0) {
418            completed.clear();
419        } else {
420            for (Iterator<Work> it = completed.values().iterator(); it.hasNext();) {
421                Work w = it.next();
422                if (w.getCompletionTime() < completionTime) {
423                    it.remove();
424                }
425            }
426        }
427    }
428
429}