001/*
002 * (C) Copyright 2012-2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Benoit Delbosc
019 */
020package org.nuxeo.ecm.core.work;
021
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.Optional;
026
027import org.nuxeo.ecm.core.work.api.Work;
028import org.nuxeo.ecm.core.work.api.Work.State;
029import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
030import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
031
032/**
033 * Implementation of a {@link WorkQueuing} using in-memory queuing.
034 *
035 * @since 5.8
036 */
037public class MemoryWorkQueuing implements WorkQueuing {
038
039    protected final Map<String, MemoryBlockingQueue> allQueued = new HashMap<>();
040
041    protected Listener listener;
042
043    public MemoryWorkQueuing(Listener listener) {
044        this.listener = listener;
045    }
046
047    @Override
048    public MemoryBlockingQueue init(WorkQueueDescriptor config) {
049        int capacity = config.getCapacity();
050        if (capacity <= 0) {
051            capacity = -1; // unbounded
052        }
053        MemoryBlockingQueue queue = new MemoryBlockingQueue(config.id, this, capacity);
054        allQueued.put(queue.queueId, queue);
055        return queue;
056    }
057
058    @Override
059    public MemoryBlockingQueue getQueue(String queueId) {
060        return allQueued.get(queueId);
061    }
062
063    @Override
064    public void workSchedule(String queueId, Work work) {
065        listener.queueChanged(work, getQueue(queueId).workSchedule(work));
066    }
067
068    @Override
069    public void workCanceled(String queueId, Work work) {
070        listener.queueChanged(work, getQueue(queueId).workCanceled(work));
071    }
072
073    @Override
074    public void workRunning(String queueId, Work work) {
075        listener.queueChanged(work, getQueue(queueId).workRunning(work));
076    }
077
078    @Override
079    public void workCompleted(String queueId, Work work) {
080        listener.queueChanged(work, getQueue(queueId).workCompleted(work));
081    }
082
083    @Override
084    public void workReschedule(String queueId, Work work) {
085        listener.queueChanged(work, getQueue(queueId).workRescheduleRunning(work));
086    }
087
088    Optional<Work> lookup(String workId) {
089        return allQueued.values()
090                .stream()
091                .map(queue -> queue.lookup(workId))
092                .filter(work -> work != null)
093                .findAny();
094    }
095
096    @Override
097    public Work find(String workId, State state) {
098        return lookup(workId)
099                .filter(work -> work.getWorkInstanceState() == state)
100                .orElse(null);
101    }
102
103    @Override
104    public boolean isWorkInState(String workId, State state) {
105        Optional<Work> option = lookup(workId);
106        if (!option.isPresent()) {
107            return false;
108        }
109        if (state == null) {
110            return true;
111        }
112        return option.get()
113                .getWorkInstanceState()
114                .equals(state);
115    }
116
117    @Override
118    public State getWorkState(String workId) {
119        return lookup(workId).map(Work::getWorkInstanceState)
120                .orElse(null);
121    }
122
123    @Override
124    public List<Work> listWork(String queueId, State state) {
125        MemoryBlockingQueue queue = getQueue(queueId);
126        if (state == null) {
127            return queue.list();
128        }
129        switch (state) {
130        case SCHEDULED:
131            return queue.listScheduled();
132        case RUNNING:
133            return queue.listRunning();
134        default:
135            throw new IllegalArgumentException(String.valueOf(state));
136        }
137    }
138
139    @Override
140    public List<String> listWorkIds(String queueId, State state) {
141        MemoryBlockingQueue queue = getQueue(queueId);
142        if (state == null) {
143            return queue.keys();
144        }
145        switch (state) {
146        case SCHEDULED:
147            return queue.scheduledKeys();
148        case RUNNING:
149            return queue.runningKeys();
150        default:
151            throw new IllegalArgumentException(String.valueOf(state));
152        }
153    }
154
155    @Override
156    public long count(String queueId, State state) {
157        switch (state) {
158        case SCHEDULED:
159            return metrics(queueId).scheduled.longValue();
160        case RUNNING:
161            return metrics(queueId).running.longValue();
162        default:
163            throw new IllegalArgumentException(String.valueOf(state));
164        }
165    }
166
167    @Override
168    public synchronized void removeScheduled(String queueId, String workId) {
169        final MemoryBlockingQueue queue = getQueue(queueId);
170        Work work = queue.lookup(workId);
171        if (work == null) {
172            return;
173        }
174        work.setWorkInstanceState(State.UNKNOWN);
175        listener.queueChanged(work, queue.workCanceled(work));
176    }
177
178    @Override
179    public void setActive(String queueId, boolean value) {
180        WorkQueueMetrics metrics = getQueue(queueId).setActive(value);
181        if (value) {
182            listener.queueActivated(metrics);
183        } else {
184            listener.queueDeactivated(metrics);
185        }
186    }
187
188    @Override
189    public void listen(Listener listener) {
190        this.listener = listener;
191    }
192
193    @Override
194    public WorkQueueMetrics metrics(String queueId) {
195        return getQueue(queueId).metrics();
196    }
197
198}