001/*
002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.work;
020
021import java.util.List;
022import java.util.concurrent.ThreadPoolExecutor;
023
024import org.apache.commons.logging.Log;
025import org.apache.commons.logging.LogFactory;
026import org.nuxeo.ecm.core.work.api.Work;
027import org.nuxeo.ecm.core.work.api.Work.State;
028import org.nuxeo.ecm.core.work.api.WorkManager;
029import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
030import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
031
032/**
033 * Interface describing how the {@link WorkManager} implements queuing.
034 * <p>
035 * There are 4 structures maintained per-queue:
036 * <ul>
037 * <li>the work queue managed by the {@link ThreadPoolExecutor},</li>
038 * <li>the set of scheduled work, this enables to list a work as being scheduled while it has been removed from the
039 * queue by the {@link ThreadPoolExecutor} and not yet executed (not yet running).</li>
040 * <li>the set of running work,</li>
041 * <li>the set of completed work.</li>
042 *
043 * @since 5.8
044 */
045public interface WorkQueuing {
046
047    /**
048     * Starts up this {@link WorkQueuing} and attempts to resume work previously suspended and saved at shutdown time.
049     * @return
050     */
051    NuxeoBlockingQueue init(WorkQueueDescriptor config);
052
053    /**
054     * Enable/disable this {@code queueId} processing
055     * @since 8.3
056     */
057    void setActive(String queueId, boolean value);
058
059    /**
060     * @return true if the implementation supports processing disabling
061     * @since 10.3
062     */
063    boolean supportsProcessingDisabling();
064
065    /**
066     * Gets the blocking queue of work used by the {@link ThreadPoolExecutor}.
067     *
068     * @since 8.1
069     * @param queueId
070     * @return
071     */
072    NuxeoBlockingQueue getQueue(String queueId);
073
074    /**
075     * Submit a work to the {@link ThreadPoolExecutor} and put it in the scheduled set.
076     *
077     * @param queueId the queue id
078     * @param work the work instance
079     * @since 8.1
080     */
081    void workSchedule(String queueId, Work work);
082
083    /**
084     * Removes a work instance from scheduled set.
085     *
086     * @since 8.3
087     **/
088    void workCanceled(String queueId, Work work);
089
090    /**
091     * Put the work instance into the running set.
092     *
093     * @param queueId the queue id
094     * @param work the work instance
095     * @since 5.8
096     */
097    void workRunning(String queueId, Work work);
098
099    /**
100     * Moves a work instance from the running set to the completed set.
101     *
102     * @param queueId the queue id
103     * @param work the work instance
104     * @since 5.8
105     */
106    void workCompleted(String queueId, Work work);
107
108    /**
109     * Moves back a work instance from running set to the scheduled set.
110     *
111     * @since 8.3
112     **/
113    void workReschedule(String queueId, Work work);
114
115    /**
116     * Finds a work instance in the scheduled or running or completed sets.
117     *
118     * @param workId the id of the work to find
119     * @param state the state defining the state to look into, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING
120     *        RUNNING}, {@link State#COMPLETED COMPLETED}, or {@code null} for SCHEDULED or RUNNING
121     * @return the found work instance, or {@code null} if not found
122     */
123    Work find(String workId, State state);
124
125    /**
126     * Finds a scheduled work instance and removes it from the scheduled queue.
127     *
128     * @param queueId the queue id
129     * @param workId the id of the work to find
130     * @since 5.8
131     */
132    void removeScheduled(String queueId, String workId);
133
134    /**
135     * Checks if a work instance with the given id is in the given state.
136     *
137     * @param workId the work id
138     * @param state the state, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING RUNNING}, {@link State#COMPLETED
139     *        COMPLETED}, or {@code null} for non-completed
140     * @return {@code true} if a work instance with the given id is in the given state
141     * @since 5.8
142     */
143    boolean isWorkInState(String workId, State state);
144
145    /**
146     * Gets the state in which a work instance is.
147     * <p>
148     * This can be {@link State#SCHEDULED}, {@link State#RUNNING}, {@link State#COMPLETED}, {@link State#FAILED}, or
149     * {@link State#CANCELED}.
150     *
151     * @param workId the id of the work to find
152     * @return the work state, or {@code null} if not found
153     * @since 5.8
154     */
155    State getWorkState(String workId);
156
157    /**
158     * Lists the work instances in a given queue in a defined state.
159     * <p>
160     * Note that an instance requested as RUNNING could be found SUSPENDING or SUSPENDED, and an instance requested as
161     * COMPLETED could be found FAILED.
162     *
163     * @param queueId the queue id
164     * @param state the state defining the state to look into, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING
165     *        RUNNING}, {@link State#COMPLETED COMPLETED}, or {@code null} for non-completed
166     * @return the list of work instances in the given state
167     */
168    List<Work> listWork(String queueId, State state);
169
170    /**
171     * Lists the work ids in a given queue in a defined state.
172     *
173     * @param queueId the queue id
174     * @param state the state defining the state to look into, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING
175     *        RUNNING},  or {@code null} for non-completed
176     * @return the list of work ids in the given state
177     */
178    List<String> listWorkIds(String queueId, State state);
179
180    /**
181     * Gets the number of work instances in the given state in a given queue.
182     *
183     * @param queueId the queue id
184     * @param state the state, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING RUNNING} or
185     *        {@link State#COMPLETED COMPLETED}
186     * @return the number of scheduled work instances in the queue
187     * @since 5.8
188     */
189    long count(String queueId, State state);
190
191    /**
192     * Returns current metrics of queue identified by the {@code queueId}
193     *
194     * @since 8.3
195     */
196    WorkQueueMetrics metrics(String queueId);
197
198    /**
199     * Set the callback for debugging purpose
200     *
201     * @since 8.3
202     */
203    void listen(Listener listener);
204
205    public interface Listener {
206
207        void queueActivated(WorkQueueMetrics metric);
208
209        void queueDeactivated(WorkQueueMetrics metric);
210
211        void queueChanged(Work work, WorkQueueMetrics metric);
212
213        static Listener lookupListener() {
214            final Log log = LogFactory.getLog(WorkQueuing.class);
215            if (log.isTraceEnabled()) {
216                class Tracing implements Listener {
217                    private final Log log;
218
219                    protected Tracing(Log log) {
220                        this.log = log;
221                    }
222
223                    @Override
224                    public void queueChanged(Work work, WorkQueueMetrics metrics) {
225                        log.trace(String.format("%s -> changed on %s %s",
226                                metrics,
227                                work.getWorkInstanceState(),
228                                work.getSchedulePath()));
229                    }
230
231                    @Override
232                    public void queueActivated(WorkQueueMetrics metrics) {
233                        log.trace(String.format("%s -> activated", metrics));
234                    }
235
236                    @Override
237                    public void queueDeactivated(WorkQueueMetrics metrics) {
238                        log.trace(String.format("%s -> deactivated", metrics));
239                    }
240                }
241
242                return new Tracing(log);
243            } else {
244                class Null implements Listener {
245                    @Override
246                    public void queueActivated(WorkQueueMetrics metric) {
247
248                    }
249
250                    @Override
251                    public void queueDeactivated(WorkQueueMetrics metric) {
252
253                    }
254
255                    @Override
256                    public void queueChanged(Work work, WorkQueueMetrics metric) {
257
258                    }
259                }
260                return new Null();
261            }
262        }
263    }
264}