001/*
002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.work;
020
021import java.util.List;
022import java.util.concurrent.ThreadPoolExecutor;
023
024import org.apache.commons.logging.Log;
025import org.apache.commons.logging.LogFactory;
026import org.nuxeo.ecm.core.work.api.Work;
027import org.nuxeo.ecm.core.work.api.Work.State;
028import org.nuxeo.ecm.core.work.api.WorkManager;
029import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
030import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
031
032/**
033 * Interface describing how the {@link WorkManager} implements queuing.
034 * <p>
035 * There are 4 structures maintained per-queue:
036 * <ul>
037 * <li>the work queue managed by the {@link ThreadPoolExecutor},</li>
038 * <li>the set of scheduled work, this enables to list a work as being scheduled while it has been removed from the
039 * queue by the {@link ThreadPoolExecutor} and not yet executed (not yet running).</li>
040 * <li>the set of running work,</li>
041 * <li>the set of completed work.</li>
042 * </ul>
043 *
044 * @since 5.8
045 */
046public interface WorkQueuing {
047
048    /**
049     * Starts up this {@link WorkQueuing} and attempts to resume work previously suspended and saved at shutdown time.
050     */
051    NuxeoBlockingQueue init(WorkQueueDescriptor config);
052
053    /**
054     * Enable/disable this {@code queueId} processing
055     *
056     * @since 8.3
057     */
058    void setActive(String queueId, boolean value);
059
060    /**
061     * @return true if the implementation supports processing disabling
062     * @since 10.3
063     */
064    boolean supportsProcessingDisabling();
065
066    /**
067     * Gets the blocking queue of work used by the {@link ThreadPoolExecutor}.
068     *
069     * @since 8.1
070     */
071    NuxeoBlockingQueue getQueue(String queueId);
072
073    /**
074     * Submit a work to the {@link ThreadPoolExecutor} and put it in the scheduled set.
075     *
076     * @param queueId the queue id
077     * @param work the work instance
078     * @since 8.1
079     */
080    void workSchedule(String queueId, Work work);
081
082    /**
083     * Removes a work instance from scheduled set.
084     *
085     * @since 8.3
086     **/
087    void workCanceled(String queueId, Work work);
088
089    /**
090     * Put the work instance into the running set.
091     *
092     * @param queueId the queue id
093     * @param work the work instance
094     * @since 5.8
095     */
096    void workRunning(String queueId, Work work);
097
098    /**
099     * Moves a work instance from the running set to the completed set.
100     *
101     * @param queueId the queue id
102     * @param work the work instance
103     * @since 5.8
104     */
105    void workCompleted(String queueId, Work work);
106
107    /**
108     * Moves back a work instance from running set to the scheduled set.
109     *
110     * @since 8.3
111     **/
112    void workReschedule(String queueId, Work work);
113
114    /**
115     * Finds a work instance in the scheduled or running or completed sets.
116     *
117     * @param workId the id of the work to find
118     * @param state the state defining the state to look into, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING
119     *            RUNNING}, or {@code null} for SCHEDULED or RUNNING
120     * @return the found work instance, or {@code null} if not found
121     */
122    Work find(String workId, State state);
123
124    /**
125     * Finds a scheduled work instance and removes it from the scheduled queue.
126     *
127     * @param queueId the queue id
128     * @param workId the id of the work to find
129     * @since 5.8
130     */
131    void removeScheduled(String queueId, String workId);
132
133    /**
134     * Checks if a work instance with the given id is in the given state.
135     *
136     * @param workId the work id
137     * @param state the state, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING RUNNING}, or {@code null} for
138     *            non-completed
139     * @return {@code true} if a work instance with the given id is in the given state
140     * @since 5.8
141     */
142    boolean isWorkInState(String workId, State state);
143
144    /**
145     * Gets the state in which a work instance is.
146     * <p>
147     * This can be {@link State#SCHEDULED} or {@link State#RUNNING}.
148     *
149     * @param workId the id of the work to find
150     * @return the work state, or {@code null} if not found
151     * @since 5.8
152     */
153    State getWorkState(String workId);
154
155    /**
156     * Lists the work instances in a given queue in a defined state.
157     * <p>
158     * Note that an instance requested as RUNNING could be found SUSPENDING or SUSPENDED, and an instance requested as
159     * COMPLETED could be found FAILED.
160     *
161     * @param queueId the queue id
162     * @param state the state defining the state to look into, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING
163     *            RUNNING}, or {@code null} for non-completed
164     * @return the list of work instances in the given state
165     */
166    List<Work> listWork(String queueId, State state);
167
168    /**
169     * Lists the work ids in a given queue in a defined state.
170     *
171     * @param queueId the queue id
172     * @param state the state defining the state to look into, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING
173     *            RUNNING}, or {@code null} for non-completed
174     * @return the list of work ids in the given state
175     */
176    List<String> listWorkIds(String queueId, State state);
177
178    /**
179     * Gets the number of work instances in the given state in a given queue.
180     *
181     * @param queueId the queue id
182     * @param state the state, {@link State#SCHEDULED SCHEDULED} or {@link State#RUNNING RUNNING}
183     * @return the number of scheduled work instances in the queue
184     * @since 5.8
185     */
186    long count(String queueId, State state);
187
188    /**
189     * Returns current metrics of queue identified by the {@code queueId}
190     *
191     * @since 8.3
192     */
193    WorkQueueMetrics metrics(String queueId);
194
195    /**
196     * Set the callback for debugging purpose
197     *
198     * @since 8.3
199     */
200    void listen(Listener listener);
201
202    interface Listener {
203
204        void queueActivated(WorkQueueMetrics metric);
205
206        void queueDeactivated(WorkQueueMetrics metric);
207
208        void queueChanged(Work work, WorkQueueMetrics metric);
209
210        static Listener lookupListener() {
211            final Log log = LogFactory.getLog(WorkQueuing.class);
212            if (log.isTraceEnabled()) {
213                class Tracing implements Listener {
214                    private final Log log;
215
216                    protected Tracing(Log log) {
217                        this.log = log;
218                    }
219
220                    @Override
221                    public void queueChanged(Work work, WorkQueueMetrics metrics) {
222                        log.trace(String.format("%s -> changed on %s %s", metrics, work.getWorkInstanceState(),
223                                work.getSchedulePath()));
224                    }
225
226                    @Override
227                    public void queueActivated(WorkQueueMetrics metrics) {
228                        log.trace(String.format("%s -> activated", metrics));
229                    }
230
231                    @Override
232                    public void queueDeactivated(WorkQueueMetrics metrics) {
233                        log.trace(String.format("%s -> deactivated", metrics));
234                    }
235                }
236
237                return new Tracing(log);
238            } else {
239                class Null implements Listener {
240                    @Override
241                    public void queueActivated(WorkQueueMetrics metric) {
242
243                    }
244
245                    @Override
246                    public void queueDeactivated(WorkQueueMetrics metric) {
247
248                    }
249
250                    @Override
251                    public void queueChanged(Work work, WorkQueueMetrics metric) {
252
253                    }
254                }
255                return new Null();
256            }
257        }
258    }
259}