001/*
002 * (C) Copyright 2012-2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Benoit Delbosc
019 */
020package org.nuxeo.ecm.core.work;
021
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.Objects;
026import java.util.Optional;
027
028import org.nuxeo.ecm.core.work.api.Work;
029import org.nuxeo.ecm.core.work.api.Work.State;
030import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
031import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
032
033/**
034 * Implementation of a {@link WorkQueuing} using in-memory queuing.
035 *
036 * @since 5.8
037 */
038public class MemoryWorkQueuing implements WorkQueuing {
039
040    protected final Map<String, MemoryBlockingQueue> allQueued = new HashMap<>();
041
042    protected Listener listener;
043
044    public MemoryWorkQueuing(Listener listener) {
045        this.listener = listener;
046    }
047
048    @Override
049    public MemoryBlockingQueue init(WorkQueueDescriptor config) {
050        int capacity = config.getCapacity();
051        if (capacity <= 0) {
052            capacity = -1; // unbounded
053        }
054        MemoryBlockingQueue queue = new MemoryBlockingQueue(config.id, this, capacity);
055        allQueued.put(queue.queueId, queue);
056        return queue;
057    }
058
059    @Override
060    public MemoryBlockingQueue getQueue(String queueId) {
061        return allQueued.get(queueId);
062    }
063
064    @Override
065    public void workSchedule(String queueId, Work work) {
066        listener.queueChanged(work, getQueue(queueId).workSchedule(work));
067    }
068
069    @Override
070    public void workCanceled(String queueId, Work work) {
071        listener.queueChanged(work, getQueue(queueId).workCanceled(work));
072    }
073
074    @Override
075    public void workRunning(String queueId, Work work) {
076        listener.queueChanged(work, getQueue(queueId).workRunning(work));
077    }
078
079    @Override
080    public void workCompleted(String queueId, Work work) {
081        listener.queueChanged(work, getQueue(queueId).workCompleted(work));
082    }
083
084    @Override
085    public void workReschedule(String queueId, Work work) {
086        listener.queueChanged(work, getQueue(queueId).workRescheduleRunning(work));
087    }
088
089    Optional<Work> lookup(String workId) {
090        return allQueued.values().stream().map(queue -> queue.lookup(workId)).filter(Objects::nonNull).findAny();
091    }
092
093    @Override
094    public Work find(String workId, State state) {
095        return lookup(workId).filter(work -> workHasState(work, state)).orElse(null);
096    }
097
098    @Override
099    public boolean isWorkInState(String workId, State state) {
100        return lookup(workId).filter(work -> workHasState(work, state)).isPresent();
101    }
102
103    @Override
104    public State getWorkState(String workId) {
105        return lookup(workId).map(Work::getWorkInstanceState).orElse(null);
106    }
107
108    @Override
109    public List<Work> listWork(String queueId, State state) {
110        MemoryBlockingQueue queue = getQueue(queueId);
111        if (state == null) {
112            return queue.list();
113        }
114        switch (state) {
115        case SCHEDULED:
116            return queue.listScheduled();
117        case RUNNING:
118            return queue.listRunning();
119        default:
120            throw new IllegalArgumentException(String.valueOf(state));
121        }
122    }
123
124    @Override
125    public List<String> listWorkIds(String queueId, State state) {
126        MemoryBlockingQueue queue = getQueue(queueId);
127        if (state == null) {
128            return queue.keys();
129        }
130        switch (state) {
131        case SCHEDULED:
132            return queue.scheduledKeys();
133        case RUNNING:
134            return queue.runningKeys();
135        default:
136            throw new IllegalArgumentException(String.valueOf(state));
137        }
138    }
139
140    @Override
141    public long count(String queueId, State state) {
142        switch (state) {
143        case SCHEDULED:
144            return metrics(queueId).scheduled.longValue();
145        case RUNNING:
146            return metrics(queueId).running.longValue();
147        default:
148            throw new IllegalArgumentException(String.valueOf(state));
149        }
150    }
151
152    @Override
153    public synchronized void removeScheduled(String queueId, String workId) {
154        final MemoryBlockingQueue queue = getQueue(queueId);
155        Work work = queue.lookup(workId);
156        if (work == null) {
157            return;
158        }
159        work.setWorkInstanceState(State.UNKNOWN);
160        listener.queueChanged(work, queue.workCanceled(work));
161    }
162
163    @Override
164    public void setActive(String queueId, boolean value) {
165        WorkQueueMetrics metrics = getQueue(queueId).setActive(value);
166        if (value) {
167            listener.queueActivated(metrics);
168        } else {
169            listener.queueDeactivated(metrics);
170        }
171    }
172
173    @Override
174    public void listen(Listener listener) {
175        this.listener = listener;
176    }
177
178    @Override
179    public WorkQueueMetrics metrics(String queueId) {
180        return getQueue(queueId).metrics();
181    }
182
183    /**
184     * Returns {@code true} if the given state is not {@code null} and matches the state of the given work or if the
185     * state is {@code null} and the work's state is either {@link State#SCHEDULED} or {@link State#RUNNING},
186     * {@code false} otherwise.
187     *
188     * @since 9.3
189     */
190    protected static boolean workHasState(Work work, State state) {
191        State workState = work.getWorkInstanceState();
192        return state == null ? (workState == State.SCHEDULED || workState == State.RUNNING) : workState == state;
193    }
194
195    @Override
196    public boolean supportsProcessingDisabling() {
197        return false;
198    }
199
200}