/*
 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.work;

import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.ecm.core.work.api.Work.State;
import org.nuxeo.ecm.core.work.api.WorkManager;
import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;

/**
 * Contract through which the {@link WorkManager} performs its queuing.
 * <p>
 * Four structures are maintained for each queue:
 * <ul>
 * <li>the work queue handled by the {@link ThreadPoolExecutor},</li>
 * <li>the set of scheduled work, which allows a work to still be listed as scheduled after the
 * {@link ThreadPoolExecutor} has taken it out of the queue but before it is executed (not yet running),</li>
 * <li>the set of running work,</li>
 * <li>the set of completed work.</li>
 * </ul>
 *
 * @since 5.8
 */
public interface WorkQueuing {

    /**
     * Starts up this {@link WorkQueuing} and attempts to resume work previously suspended and saved at shutdown
     * time.
     *
     * @param config the descriptor of the work queue to start
     * @return the blocking queue for that work queue (NOTE(review): inferred from the return type, confirm against
     *         the implementations)
     */
    NuxeoBlockingQueue init(WorkQueueDescriptor config);

    /**
     * Enables or disables processing of the queue identified by {@code queueId}.
     *
     * @param queueId the queue id
     * @param value the activation flag (NOTE(review): presumably {@code true} enables processing, confirm against
     *            the implementations)
     * @since 8.3
     */
    void setActive(String queueId, boolean value);

    /**
     * Gets the blocking queue of work used by the {@link ThreadPoolExecutor}.
     *
     * @param queueId the queue id
     * @return the blocking queue of work for that queue
     * @since 8.1
     */
    NuxeoBlockingQueue getQueue(String queueId);

    /**
     * Submits a work to the {@link ThreadPoolExecutor} and adds it to the scheduled set.
     *
     * @param queueId the queue id
     * @param work the work instance
     * @since 8.1
     */
    void workSchedule(String queueId, Work work);

    /**
     * Takes a work instance out of the scheduled set.
     *
     * @param queueId the queue id
     * @param work the work instance
     * @since 8.3
     */
    void workCanceled(String queueId, Work work);

    /**
     * Adds the work instance to the running set.
     *
     * @param queueId the queue id
     * @param work the work instance
     * @since 5.8
     */
    void workRunning(String queueId, Work work);

    /**
     * Transfers a work instance from the running set to the completed set.
     *
     * @param queueId the queue id
     * @param work the work instance
     * @since 5.8
     */
    void workCompleted(String queueId, Work work);

    /**
     * Transfers a work instance back from the running set to the scheduled set.
     *
     * @param queueId the queue id
     * @param work the work instance
     * @since 8.3
     */
    void workReschedule(String queueId, Work work);

    /**
     * Looks up a work instance in the scheduled, running or completed sets.
     *
     * @param workId the id of the work to find
     * @param state the state to look into: {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING RUNNING},
     *            {@link State#COMPLETED COMPLETED}, or {@code null} for SCHEDULED or RUNNING
     * @return the found work instance, or {@code null} if not found
     */
    Work find(String workId, State state);

    /**
     * Looks up a scheduled work instance and removes it from the scheduled queue.
     *
     * @param queueId the queue id
     * @param workId the id of the work to find
     * @since 5.8
     */
    void removeScheduled(String queueId, String workId);

    /**
     * Tells whether a work instance with the given id is in the given state.
     *
     * @param workId the work id
     * @param state the state: {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING RUNNING},
     *            {@link State#COMPLETED COMPLETED}, or {@code null} for non-completed
     * @return {@code true} if a work instance with the given id is in the given state
     * @since 5.8
     */
    boolean isWorkInState(String workId, State state);

    /**
     * Gets the state a work instance is currently in.
     * <p>
     * The result can be {@link State#SCHEDULED}, {@link State#RUNNING}, {@link State#COMPLETED},
     * {@link State#FAILED}, or {@link State#CANCELED}.
     *
     * @param workId the id of the work to find
     * @return the work state, or {@code null} if not found
     * @since 5.8
     */
    State getWorkState(String workId);

    /**
     * Lists the work instances of a given queue that are in a given state.
     * <p>
     * Note that an instance requested as RUNNING could be found SUSPENDING or SUSPENDED, and an instance requested
     * as COMPLETED could be found FAILED.
     *
     * @param queueId the queue id
     * @param state the state to look into: {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING RUNNING},
     *            {@link State#COMPLETED COMPLETED}, or {@code null} for non-completed
     * @return the list of work instances in the given state
     */
    List<Work> listWork(String queueId, State state);

    /**
     * Lists the ids of the work instances of a given queue that are in a given state.
     *
     * @param queueId the queue id
     * @param state the state to look into: {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING RUNNING}, or
     *            {@code null} for non-completed
     * @return the list of work ids in the given state
     */
    List<String> listWorkIds(String queueId, State state);

    /**
     * Counts the work instances of a given queue that are in the given state.
     *
     * @param queueId the queue id
     * @param state the state: {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING RUNNING} or
     *            {@link State#COMPLETED COMPLETED}
     * @return the number of work instances of the queue in the given state
     * @since 5.8
     */
    long count(String queueId, State state);

    /**
     * Notifies this queuing that all work should be suspending.
     *
     * @param queueId the queue id
     * @return the number of scheduled instances removed from queue
     */
    int setSuspending(String queueId);

    /**
     * Returns the current metrics of the queue identified by {@code queueId}.
     *
     * @param queueId the queue id
     * @return the current metrics of that queue
     * @since 8.3
     */
    WorkQueueMetrics metrics(String queueId);

    /**
     * Sets the callback, for debugging purposes.
     *
     * @param listener the callback to set
     * @since 8.3
     */
    void listen(Listener listener);

    /**
     * Callback notified of queue events, each time along with queue metrics.
     */
    interface Listener {

        /**
         * Signals that a queue was activated.
         *
         * @param metric the queue metrics passed with the notification
         */
        void queueActivated(WorkQueueMetrics metric);

        /**
         * Signals that a queue was deactivated.
         *
         * @param metric the queue metrics passed with the notification
         */
        void queueDeactivated(WorkQueueMetrics metric);

        /**
         * Signals that a queue changed because of the given work.
         *
         * @param work the work instance passed with the notification
         * @param metric the queue metrics passed with the notification
         */
        void queueChanged(Work work, WorkQueueMetrics metric);

        /**
         * Picks the listener implementation according to the logging configuration.
         * <p>
         * The trace level of the {@link WorkQueuing} log is checked once, when this method is called: if it is
         * enabled, the returned listener writes every notification to that log at trace level, otherwise the
         * returned listener ignores every notification.
         *
         * @return a tracing listener when trace logging is enabled, a do-nothing listener otherwise
         */
        static Listener lookupListener() {
            final Log traceLog = LogFactory.getLog(WorkQueuing.class);

            if (!traceLog.isTraceEnabled()) {
                // Tracing is off: hand out a listener that swallows every notification.
                class Null implements Listener {
                    @Override
                    public void queueActivated(WorkQueueMetrics metric) {
                        // intentionally empty
                    }

                    @Override
                    public void queueDeactivated(WorkQueueMetrics metric) {
                        // intentionally empty
                    }

                    @Override
                    public void queueChanged(Work work, WorkQueueMetrics metric) {
                        // intentionally empty
                    }
                }
                return new Null();
            }

            // Tracing is on: every notification is written to the captured log at trace level.
            class Tracing implements Listener {
                @Override
                public void queueActivated(WorkQueueMetrics metrics) {
                    String message = String.format("%s -> activated", metrics);
                    traceLog.trace(message);
                }

                @Override
                public void queueDeactivated(WorkQueueMetrics metrics) {
                    String message = String.format("%s -> deactivated", metrics);
                    traceLog.trace(message);
                }

                @Override
                public void queueChanged(Work work, WorkQueueMetrics metrics) {
                    String message = String.format("%s -> changed on %s %s",
                            metrics,
                            work.getWorkInstanceState(),
                            work.getSchedulePath());
                    traceLog.trace(message);
                }
            }
            return new Tracing();
        }

    }

}