001/*
002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.work;
020
021import java.util.List;
022import java.util.concurrent.ThreadPoolExecutor;
023
024import org.apache.commons.logging.Log;
025import org.apache.commons.logging.LogFactory;
026import org.nuxeo.ecm.core.work.api.Work;
027import org.nuxeo.ecm.core.work.api.Work.State;
028import org.nuxeo.ecm.core.work.api.WorkManager;
029import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
030import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
031
032/**
033 * Interface describing how the {@link WorkManager} implements queuing.
034 * <p>
035 * There are 4 structures maintained per-queue:
036 * <ul>
037 * <li>the work queue managed by the {@link ThreadPoolExecutor},</li>
038 * <li>the set of scheduled work, this enables to list a work as being scheduled while it has been removed from the
039 * queue by the {@link ThreadPoolExecutor} and not yet executed (not yet running).</li>
040 * <li>the set of running work,</li>
041 * <li>the set of completed work.</li>
042 *
043 * @since 5.8
044 */
045public interface WorkQueuing {
046
047    /**
048     * Starts up this {@link WorkQueuing} and attempts to resume work previously suspended and saved at shutdown time.
049     * @return
050     */
051    NuxeoBlockingQueue init(WorkQueueDescriptor config);
052
053    /**
054     * Enable/disable this {@code queueId} processing
055     * @since 8.3
056     */
057    void setActive(String queueId, boolean value);
058
059    /**
060     * Gets the blocking queue of work used by the {@link ThreadPoolExecutor}.
061     *
062     * @since 8.1
063     * @param queueId
064     * @return
065     */
066    NuxeoBlockingQueue getQueue(String queueId);
067
068    /**
069     * Submit a work to the {@link ThreadPoolExecutor} and put it in the scheduled set.
070     *
071     * @param queueId the queue id
072     * @param work the work instance
073     * @since 8.1
074     */
075    void workSchedule(String queueId, Work work);
076
077    /**
078     * Removes a work instance from scheduled set.
079     *
080     * @since 8.3
081     **/
082    void workCanceled(String queueId, Work work);
083
084    /**
085     * Put the work instance into the running set.
086     *
087     * @param queueId the queue id
088     * @param work the work instance
089     * @since 5.8
090     */
091    void workRunning(String queueId, Work work);
092
093    /**
094     * Moves a work instance from the running set to the completed set.
095     *
096     * @param queueId the queue id
097     * @param work the work instance
098     * @since 5.8
099     */
100    void workCompleted(String queueId, Work work);
101
102    /**
103     * Moves back a work instance from running set to the scheduled set.
104     *
105     * @since 8.3
106     **/
107    void workReschedule(String queueId, Work work);
108
109    /**
110     * Finds a work instance in the scheduled or running or completed sets.
111     *
112     * @param workId the id of the work to find
113     * @param state the state defining the state to look into, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING
114     *        RUNNING}, {@link State#COMPLETED COMPLETED}, or {@code null} for SCHEDULED or RUNNING
115     * @return the found work instance, or {@code null} if not found
116     */
117    Work find(String workId, State state);
118
119    /**
120     * Finds a scheduled work instance and removes it from the scheduled queue.
121     *
122     * @param queueId the queue id
123     * @param workId the id of the work to find
124     * @since 5.8
125     */
126    void removeScheduled(String queueId, String workId);
127
128    /**
129     * Checks if a work instance with the given id is in the given state.
130     *
131     * @param workId the work id
132     * @param state the state, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING RUNNING}, {@link State#COMPLETED
133     *        COMPLETED}, or {@code null} for non-completed
134     * @return {@code true} if a work instance with the given id is in the given state
135     * @since 5.8
136     */
137    boolean isWorkInState(String workId, State state);
138
139    /**
140     * Gets the state in which a work instance is.
141     * <p>
142     * This can be {@link State#SCHEDULED}, {@link State#RUNNING}, {@link State#COMPLETED}, {@link State#FAILED}, or
143     * {@link State#CANCELED}.
144     *
145     * @param workId the id of the work to find
146     * @return the work state, or {@code null} if not found
147     * @since 5.8
148     */
149    State getWorkState(String workId);
150
151    /**
152     * Lists the work instances in a given queue in a defined state.
153     * <p>
154     * Note that an instance requested as RUNNING could be found SUSPENDING or SUSPENDED, and an instance requested as
155     * COMPLETED could be found FAILED.
156     *
157     * @param queueId the queue id
158     * @param state the state defining the state to look into, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING
159     *        RUNNING}, {@link State#COMPLETED COMPLETED}, or {@code null} for non-completed
160     * @return the list of work instances in the given state
161     */
162    List<Work> listWork(String queueId, State state);
163
164    /**
165     * Lists the work ids in a given queue in a defined state.
166     *
167     * @param queueId the queue id
168     * @param state the state defining the state to look into, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING
169     *        RUNNING},  or {@code null} for non-completed
170     * @return the list of work ids in the given state
171     */
172    List<String> listWorkIds(String queueId, State state);
173
174    /**
175     * Gets the number of work instances in the given state in a given queue.
176     *
177     * @param queueId the queue id
178     * @param state the state, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING RUNNING} or
179     *        {@link State#COMPLETED COMPLETED}
180     * @return the number of scheduled work instances in the queue
181     * @since 5.8
182     */
183    long count(String queueId, State state);
184
185    /**
186     * Returns current metrics of queue identified by the {@code queueId}
187     *
188     * @since 8.3
189     */
190    WorkQueueMetrics metrics(String queueId);
191
192    /**
193     * Set the callback for debugging purpose
194     *
195     * @since 8.3
196     */
197    void listen(Listener listener);
198
199    public interface Listener {
200
201        void queueActivated(WorkQueueMetrics metric);
202
203        void queueDeactivated(WorkQueueMetrics metric);
204
205        void queueChanged(Work work, WorkQueueMetrics metric);
206
207        static Listener lookupListener() {
208            final Log log = LogFactory.getLog(WorkQueuing.class);
209            if (log.isTraceEnabled()) {
210                class Tracing implements Listener {
211                    private final Log log;
212
213                    protected Tracing(Log log) {
214                        this.log = log;
215                    }
216
217                    @Override
218                    public void queueChanged(Work work, WorkQueueMetrics metrics) {
219                        log.trace(String.format("%s -> changed on %s %s",
220                                metrics,
221                                work.getWorkInstanceState(),
222                                work.getSchedulePath()));
223                    }
224
225                    @Override
226                    public void queueActivated(WorkQueueMetrics metrics) {
227                        log.trace(String.format("%s -> activated", metrics));
228                    }
229
230                    @Override
231                    public void queueDeactivated(WorkQueueMetrics metrics) {
232                        log.trace(String.format("%s -> deactivated", metrics));
233                    }
234                }
235
236                return new Tracing(log);
237            } else {
238                class Null implements Listener {
239                    @Override
240                    public void queueActivated(WorkQueueMetrics metric) {
241
242                    }
243
244                    @Override
245                    public void queueDeactivated(WorkQueueMetrics metric) {
246
247                    }
248
249                    @Override
250                    public void queueChanged(Work work, WorkQueueMetrics metric) {
251
252                    }
253                }
254                return new Null();
255            }
256        }
257
258    }
259
260
261}