001/* 002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.work; 020 021import java.util.List; 022import java.util.concurrent.ThreadPoolExecutor; 023 024import org.apache.commons.logging.Log; 025import org.apache.commons.logging.LogFactory; 026import org.nuxeo.ecm.core.work.api.Work; 027import org.nuxeo.ecm.core.work.api.Work.State; 028import org.nuxeo.ecm.core.work.api.WorkManager; 029import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor; 030import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 031 032/** 033 * Interface describing how the {@link WorkManager} implements queuing. 034 * <p> 035 * There are 4 structures maintained per-queue: 036 * <ul> 037 * <li>the work queue managed by the {@link ThreadPoolExecutor},</li> 038 * <li>the set of scheduled work, this enables to list a work as being scheduled while it has been removed from the 039 * queue by the {@link ThreadPoolExecutor} and not yet executed (not yet running).</li> 040 * <li>the set of running work,</li> 041 * <li>the set of completed work.</li> 042 * 043 * @since 5.8 044 */ 045public interface WorkQueuing { 046 047 /** 048 * Starts up this {@link WorkQueuing} and attempts to resume work previously suspended and saved at shutdown time. 049 * @return 050 */ 051 NuxeoBlockingQueue init(WorkQueueDescriptor config); 052 053 /** 054 * Enable/disable this {@code queueId} processing 055 * @since 8.3 056 */ 057 void setActive(String queueId, boolean value); 058 059 /** 060 * Gets the blocking queue of work used by the {@link ThreadPoolExecutor}. 061 * 062 * @since 8.1 063 * @param queueId 064 * @return 065 */ 066 NuxeoBlockingQueue getQueue(String queueId); 067 068 /** 069 * Submit a work to the {@link ThreadPoolExecutor} and put it in the scheduled set. 070 * 071 * @param queueId the queue id 072 * @param work the work instance 073 * @since 8.1 074 */ 075 void workSchedule(String queueId, Work work); 076 077 /** 078 * Removes a work instance from scheduled set. 079 * 080 * @since 8.3 081 **/ 082 void workCanceled(String queueId, Work work); 083 084 /** 085 * Put the work instance into the running set. 086 * 087 * @param queueId the queue id 088 * @param work the work instance 089 * @since 5.8 090 */ 091 void workRunning(String queueId, Work work); 092 093 /** 094 * Moves a work instance from the running set to the completed set. 095 * 096 * @param queueId the queue id 097 * @param work the work instance 098 * @since 5.8 099 */ 100 void workCompleted(String queueId, Work work); 101 102 /** 103 * Moves back a work instance from running set to the scheduled set. 104 * 105 * @since 8.3 106 **/ 107 void workReschedule(String queueId, Work work); 108 109 /** 110 * Finds a work instance in the scheduled or running or completed sets. 111 * 112 * @param workId the id of the work to find 113 * @param state the state defining the state to look into, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING 114 * RUNNING}, {@link State#COMPLETED COMPLETED}, or {@code null} for SCHEDULED or RUNNING 115 * @return the found work instance, or {@code null} if not found 116 */ 117 Work find(String workId, State state); 118 119 /** 120 * Finds a scheduled work instance and removes it from the scheduled queue. 121 * 122 * @param queueId the queue id 123 * @param workId the id of the work to find 124 * @since 5.8 125 */ 126 void removeScheduled(String queueId, String workId); 127 128 /** 129 * Checks if a work instance with the given id is in the given state. 130 * 131 * @param workId the work id 132 * @param state the state, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING RUNNING}, {@link State#COMPLETED 133 * COMPLETED}, or {@code null} for non-completed 134 * @return {@code true} if a work instance with the given id is in the given state 135 * @since 5.8 136 */ 137 boolean isWorkInState(String workId, State state); 138 139 /** 140 * Gets the state in which a work instance is. 141 * <p> 142 * This can be {@link State#SCHEDULED}, {@link State#RUNNING}, {@link State#COMPLETED}, {@link State#FAILED}, or 143 * {@link State#CANCELED}. 144 * 145 * @param workId the id of the work to find 146 * @return the work state, or {@code null} if not found 147 * @since 5.8 148 */ 149 State getWorkState(String workId); 150 151 /** 152 * Lists the work instances in a given queue in a defined state. 153 * <p> 154 * Note that an instance requested as RUNNING could be found SUSPENDING or SUSPENDED, and an instance requested as 155 * COMPLETED could be found FAILED. 156 * 157 * @param queueId the queue id 158 * @param state the state defining the state to look into, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING 159 * RUNNING}, {@link State#COMPLETED COMPLETED}, or {@code null} for non-completed 160 * @return the list of work instances in the given state 161 */ 162 List<Work> listWork(String queueId, State state); 163 164 /** 165 * Lists the work ids in a given queue in a defined state. 166 * 167 * @param queueId the queue id 168 * @param state the state defining the state to look into, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING 169 * RUNNING}, or {@code null} for non-completed 170 * @return the list of work ids in the given state 171 */ 172 List<String> listWorkIds(String queueId, State state); 173 174 /** 175 * Gets the number of work instances in the given state in a given queue. 176 * 177 * @param queueId the queue id 178 * @param state the state, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING RUNNING} or 179 * {@link State#COMPLETED COMPLETED} 180 * @return the number of scheduled work instances in the queue 181 * @since 5.8 182 */ 183 long count(String queueId, State state); 184 185 /** 186 * Returns current metrics of queue identified by the {@code queueId} 187 * 188 * @since 8.3 189 */ 190 WorkQueueMetrics metrics(String queueId); 191 192 /** 193 * Set the callback for debugging purpose 194 * 195 * @since 8.3 196 */ 197 void listen(Listener listener); 198 199 public interface Listener { 200 201 void queueActivated(WorkQueueMetrics metric); 202 203 void queueDeactivated(WorkQueueMetrics metric); 204 205 void queueChanged(Work work, WorkQueueMetrics metric); 206 207 static Listener lookupListener() { 208 final Log log = LogFactory.getLog(WorkQueuing.class); 209 if (log.isTraceEnabled()) { 210 class Tracing implements Listener { 211 private final Log log; 212 213 protected Tracing(Log log) { 214 this.log = log; 215 } 216 217 @Override 218 public void queueChanged(Work work, WorkQueueMetrics metrics) { 219 log.trace(String.format("%s -> changed on %s %s", 220 metrics, 221 work.getWorkInstanceState(), 222 work.getSchedulePath())); 223 } 224 225 @Override 226 public void queueActivated(WorkQueueMetrics metrics) { 227 log.trace(String.format("%s -> activated", metrics)); 228 } 229 230 @Override 231 public void queueDeactivated(WorkQueueMetrics metrics) { 232 log.trace(String.format("%s -> deactivated", metrics)); 233 } 234 } 235 236 return new Tracing(log); 237 } else { 238 class Null implements Listener { 239 @Override 240 public void queueActivated(WorkQueueMetrics metric) { 241 242 } 243 244 @Override 245 public void queueDeactivated(WorkQueueMetrics metric) { 246 247 } 248 249 @Override 250 public void queueChanged(Work work, WorkQueueMetrics metric) { 251 252 } 253 } 254 return new Null(); 255 } 256 } 257 258 } 259 260 261}