001/* 002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.work; 020 021import java.util.List; 022import java.util.concurrent.ThreadPoolExecutor; 023 024import org.apache.commons.logging.Log; 025import org.apache.commons.logging.LogFactory; 026import org.nuxeo.ecm.core.work.api.Work; 027import org.nuxeo.ecm.core.work.api.Work.State; 028import org.nuxeo.ecm.core.work.api.WorkManager; 029import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor; 030import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 031 032/** 033 * Interface describing how the {@link WorkManager} implements queuing. 034 * <p> 035 * There are 4 structures maintained per-queue: 036 * <ul> 037 * <li>the work queue managed by the {@link ThreadPoolExecutor},</li> 038 * <li>the set of scheduled work, this enables to list a work as being scheduled while it has been removed from the 039 * queue by the {@link ThreadPoolExecutor} and not yet executed (not yet running).</li> 040 * <li>the set of running work,</li> 041 * <li>the set of completed work.</li> 042 * 043 * @since 5.8 044 */ 045public interface WorkQueuing { 046 047 /** 048 * Starts up this {@link WorkQueuing} and attempts to resume work previously suspended and saved at shutdown time. 049 * @return 050 */ 051 NuxeoBlockingQueue init(WorkQueueDescriptor config); 052 053 /** 054 * Enable/disable this {@code queueId} processing 055 * @since 8.3 056 */ 057 void setActive(String queueId, boolean value); 058 059 /** 060 * @return true if the implementation supports processing disabling 061 * @since 10.3 062 */ 063 boolean supportsProcessingDisabling(); 064 065 /** 066 * Gets the blocking queue of work used by the {@link ThreadPoolExecutor}. 067 * 068 * @since 8.1 069 * @param queueId 070 * @return 071 */ 072 NuxeoBlockingQueue getQueue(String queueId); 073 074 /** 075 * Submit a work to the {@link ThreadPoolExecutor} and put it in the scheduled set. 076 * 077 * @param queueId the queue id 078 * @param work the work instance 079 * @since 8.1 080 */ 081 void workSchedule(String queueId, Work work); 082 083 /** 084 * Removes a work instance from scheduled set. 085 * 086 * @since 8.3 087 **/ 088 void workCanceled(String queueId, Work work); 089 090 /** 091 * Put the work instance into the running set. 092 * 093 * @param queueId the queue id 094 * @param work the work instance 095 * @since 5.8 096 */ 097 void workRunning(String queueId, Work work); 098 099 /** 100 * Moves a work instance from the running set to the completed set. 101 * 102 * @param queueId the queue id 103 * @param work the work instance 104 * @since 5.8 105 */ 106 void workCompleted(String queueId, Work work); 107 108 /** 109 * Moves back a work instance from running set to the scheduled set. 110 * 111 * @since 8.3 112 **/ 113 void workReschedule(String queueId, Work work); 114 115 /** 116 * Finds a work instance in the scheduled or running or completed sets. 117 * 118 * @param workId the id of the work to find 119 * @param state the state defining the state to look into, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING 120 * RUNNING}, {@link State#COMPLETED COMPLETED}, or {@code null} for SCHEDULED or RUNNING 121 * @return the found work instance, or {@code null} if not found 122 */ 123 Work find(String workId, State state); 124 125 /** 126 * Finds a scheduled work instance and removes it from the scheduled queue. 127 * 128 * @param queueId the queue id 129 * @param workId the id of the work to find 130 * @since 5.8 131 */ 132 void removeScheduled(String queueId, String workId); 133 134 /** 135 * Checks if a work instance with the given id is in the given state. 136 * 137 * @param workId the work id 138 * @param state the state, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING RUNNING}, {@link State#COMPLETED 139 * COMPLETED}, or {@code null} for non-completed 140 * @return {@code true} if a work instance with the given id is in the given state 141 * @since 5.8 142 */ 143 boolean isWorkInState(String workId, State state); 144 145 /** 146 * Gets the state in which a work instance is. 147 * <p> 148 * This can be {@link State#SCHEDULED}, {@link State#RUNNING}, {@link State#COMPLETED}, {@link State#FAILED}, or 149 * {@link State#CANCELED}. 150 * 151 * @param workId the id of the work to find 152 * @return the work state, or {@code null} if not found 153 * @since 5.8 154 */ 155 State getWorkState(String workId); 156 157 /** 158 * Lists the work instances in a given queue in a defined state. 159 * <p> 160 * Note that an instance requested as RUNNING could be found SUSPENDING or SUSPENDED, and an instance requested as 161 * COMPLETED could be found FAILED. 162 * 163 * @param queueId the queue id 164 * @param state the state defining the state to look into, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING 165 * RUNNING}, {@link State#COMPLETED COMPLETED}, or {@code null} for non-completed 166 * @return the list of work instances in the given state 167 */ 168 List<Work> listWork(String queueId, State state); 169 170 /** 171 * Lists the work ids in a given queue in a defined state. 172 * 173 * @param queueId the queue id 174 * @param state the state defining the state to look into, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING 175 * RUNNING}, or {@code null} for non-completed 176 * @return the list of work ids in the given state 177 */ 178 List<String> listWorkIds(String queueId, State state); 179 180 /** 181 * Gets the number of work instances in the given state in a given queue. 182 * 183 * @param queueId the queue id 184 * @param state the state, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING RUNNING} or 185 * {@link State#COMPLETED COMPLETED} 186 * @return the number of scheduled work instances in the queue 187 * @since 5.8 188 */ 189 long count(String queueId, State state); 190 191 /** 192 * Returns current metrics of queue identified by the {@code queueId} 193 * 194 * @since 8.3 195 */ 196 WorkQueueMetrics metrics(String queueId); 197 198 /** 199 * Set the callback for debugging purpose 200 * 201 * @since 8.3 202 */ 203 void listen(Listener listener); 204 205 public interface Listener { 206 207 void queueActivated(WorkQueueMetrics metric); 208 209 void queueDeactivated(WorkQueueMetrics metric); 210 211 void queueChanged(Work work, WorkQueueMetrics metric); 212 213 static Listener lookupListener() { 214 final Log log = LogFactory.getLog(WorkQueuing.class); 215 if (log.isTraceEnabled()) { 216 class Tracing implements Listener { 217 private final Log log; 218 219 protected Tracing(Log log) { 220 this.log = log; 221 } 222 223 @Override 224 public void queueChanged(Work work, WorkQueueMetrics metrics) { 225 log.trace(String.format("%s -> changed on %s %s", 226 metrics, 227 work.getWorkInstanceState(), 228 work.getSchedulePath())); 229 } 230 231 @Override 232 public void queueActivated(WorkQueueMetrics metrics) { 233 log.trace(String.format("%s -> activated", metrics)); 234 } 235 236 @Override 237 public void queueDeactivated(WorkQueueMetrics metrics) { 238 log.trace(String.format("%s -> deactivated", metrics)); 239 } 240 } 241 242 return new Tracing(log); 243 } else { 244 class Null implements Listener { 245 @Override 246 public void queueActivated(WorkQueueMetrics metric) { 247 248 } 249 250 @Override 251 public void queueDeactivated(WorkQueueMetrics metric) { 252 253 } 254 255 @Override 256 public void queueChanged(Work work, WorkQueueMetrics metric) { 257 258 } 259 } 260 return new Null(); 261 } 262 } 263 } 264}