001/*
002 * (C) Copyright 2012-2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Benoit Delbosc
019 */
020package org.nuxeo.ecm.core.work;
021
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.List;
025import java.util.Map;
026import java.util.Optional;
027
028import org.nuxeo.ecm.core.work.api.Work;
029import org.nuxeo.ecm.core.work.api.Work.State;
030import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
031import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
032
033/**
034 * Implementation of a {@link WorkQueuing} using in-memory queuing.
035 *
036 * @since 5.8
037 */
038public class MemoryWorkQueuing implements WorkQueuing {
039
040    protected final Map<String, MemoryBlockingQueue> allQueued = new HashMap<>();
041
042    protected Listener listener;
043
044    public MemoryWorkQueuing(Listener listener) {
045        this.listener = listener;
046    }
047
048    @Override
049    public MemoryBlockingQueue init(WorkQueueDescriptor config) {
050        int capacity = config.getCapacity();
051        if (capacity <= 0) {
052            capacity = -1; // unbounded
053        }
054        MemoryBlockingQueue queue = new MemoryBlockingQueue(config.id, this, capacity);
055        allQueued.put(queue.queueId, queue);
056        return queue;
057    }
058
059    @Override
060    public MemoryBlockingQueue getQueue(String queueId) {
061        return allQueued.get(queueId);
062    }
063
064    @Override
065    public void workSchedule(String queueId, Work work) {
066        listener.queueChanged(work, getQueue(queueId).workSchedule(work));
067    }
068
069    @Override
070    public void workCanceled(String queueId, Work work) {
071        listener.queueChanged(work, getQueue(queueId).workCanceled(work));
072    }
073
074    @Override
075    public void workRunning(String queueId, Work work) {
076        listener.queueChanged(work, getQueue(queueId).workRunning(work));
077    }
078
079    @Override
080    public void workCompleted(String queueId, Work work) {
081        listener.queueChanged(work, getQueue(queueId).workCompleted(work));
082    }
083
084    @Override
085    public void workReschedule(String queueId, Work work) {
086        listener.queueChanged(work, getQueue(queueId).workRescheduleRunning(work));
087    }
088
089    Optional<Work> lookup(String workId) {
090        return allQueued.values()
091                .stream()
092                .map(queue -> queue.lookup(workId))
093                .filter(work -> work != null)
094                .findAny();
095    }
096
097    @Override
098    public Work find(String workId, State state) {
099        return lookup(workId)
100                .filter(work -> work.getWorkInstanceState() == state)
101                .orElse(null);
102    }
103
104    @Override
105    public boolean isWorkInState(String workId, State state) {
106        Optional<Work> option = lookup(workId);
107        if (!option.isPresent()) {
108            return false;
109        }
110        if (state == null) {
111            return true;
112        }
113        return option.get()
114                .getWorkInstanceState()
115                .equals(state);
116    }
117
118    @Override
119    public State getWorkState(String workId) {
120        return lookup(workId).map(Work::getWorkInstanceState)
121                .orElse(null);
122    }
123
124    @Override
125    public List<Work> listWork(String queueId, State state) {
126        MemoryBlockingQueue queue = getQueue(queueId);
127        if (state == null) {
128            return queue.list();
129        }
130        switch (state) {
131        case SCHEDULED:
132            return queue.listScheduled();
133        case RUNNING:
134            return queue.listRunning();
135        default:
136            throw new IllegalArgumentException(String.valueOf(state));
137        }
138    }
139
140    @Override
141    public List<String> listWorkIds(String queueId, State state) {
142        MemoryBlockingQueue queue = getQueue(queueId);
143        if (state == null) {
144            return queue.keys();
145        }
146        switch (state) {
147        case SCHEDULED:
148            return queue.scheduledKeys();
149        case RUNNING:
150            return queue.runningKeys();
151        default:
152            throw new IllegalArgumentException(String.valueOf(state));
153        }
154    }
155
156    @Override
157    public long count(String queueId, State state) {
158        switch (state) {
159        case SCHEDULED:
160            return metrics(queueId).scheduled.longValue();
161        case RUNNING:
162            return metrics(queueId).running.longValue();
163        default:
164            throw new IllegalArgumentException(String.valueOf(state));
165        }
166    }
167
168    @Override
169    public synchronized void removeScheduled(String queueId, String workId) {
170        final MemoryBlockingQueue queue = getQueue(queueId);
171        Work work = queue.lookup(workId);
172        if (work == null) {
173            return;
174        }
175        work.setWorkInstanceState(State.UNKNOWN);
176        listener.queueChanged(work, queue.workCanceled(work));
177    }
178
179    @Override
180    public int setSuspending(String queueId) {
181        MemoryBlockingQueue queue = getQueue(queueId);
182        List<Runnable> scheduled = new ArrayList<>();
183        queue.drainTo(scheduled);
184        for (Runnable r : scheduled) {
185            Work work = WorkHolder.getWork(r);
186            listener.queueChanged(work, queue.workCanceled(work));
187        }
188        return scheduled.size();
189    }
190
191    @Override
192    public void setActive(String queueId, boolean value) {
193        WorkQueueMetrics metrics = getQueue(queueId).setActive(value);
194        if (value) {
195            listener.queueActivated(metrics);
196        } else {
197            listener.queueDeactivated(metrics);
198        }
199    }
200
201    @Override
202    public void listen(Listener listener) {
203        this.listener = listener;
204    }
205
206    @Override
207    public WorkQueueMetrics metrics(String queueId) {
208        return getQueue(queueId).metrics();
209    }
210
211}