001/* 002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.work; 020 021import java.util.List; 022import java.util.concurrent.ThreadPoolExecutor; 023 024import org.apache.commons.logging.Log; 025import org.apache.commons.logging.LogFactory; 026import org.nuxeo.ecm.core.work.api.Work; 027import org.nuxeo.ecm.core.work.api.Work.State; 028import org.nuxeo.ecm.core.work.api.WorkManager; 029import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor; 030import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 031 032/** 033 * Interface describing how the {@link WorkManager} implements queuing. 034 * <p> 035 * There are 4 structures maintained per-queue: 036 * <ul> 037 * <li>the work queue managed by the {@link ThreadPoolExecutor},</li> 038 * <li>the set of scheduled work, this enables to list a work as being scheduled while it has been removed from the 039 * queue by the {@link ThreadPoolExecutor} and not yet executed (not yet running).</li> 040 * <li>the set of running work,</li> 041 * <li>the set of completed work.</li> 042 * </ul> 043 * 044 * @since 5.8 045 */ 046public interface WorkQueuing { 047 048 /** 049 * Starts up this {@link WorkQueuing} and attempts to resume work previously suspended and saved at shutdown time. 050 */ 051 NuxeoBlockingQueue init(WorkQueueDescriptor config); 052 053 /** 054 * Enable/disable this {@code queueId} processing 055 * 056 * @since 8.3 057 */ 058 void setActive(String queueId, boolean value); 059 060 /** 061 * @return true if the implementation supports processing disabling 062 * @since 10.3 063 */ 064 boolean supportsProcessingDisabling(); 065 066 /** 067 * Gets the blocking queue of work used by the {@link ThreadPoolExecutor}. 068 * 069 * @since 8.1 070 */ 071 NuxeoBlockingQueue getQueue(String queueId); 072 073 /** 074 * Submit a work to the {@link ThreadPoolExecutor} and put it in the scheduled set. 075 * 076 * @param queueId the queue id 077 * @param work the work instance 078 * @since 8.1 079 */ 080 void workSchedule(String queueId, Work work); 081 082 /** 083 * Removes a work instance from scheduled set. 084 * 085 * @since 8.3 086 **/ 087 void workCanceled(String queueId, Work work); 088 089 /** 090 * Put the work instance into the running set. 091 * 092 * @param queueId the queue id 093 * @param work the work instance 094 * @since 5.8 095 */ 096 void workRunning(String queueId, Work work); 097 098 /** 099 * Moves a work instance from the running set to the completed set. 100 * 101 * @param queueId the queue id 102 * @param work the work instance 103 * @since 5.8 104 */ 105 void workCompleted(String queueId, Work work); 106 107 /** 108 * Moves back a work instance from running set to the scheduled set. 109 * 110 * @since 8.3 111 **/ 112 void workReschedule(String queueId, Work work); 113 114 /** 115 * Finds a work instance in the scheduled or running or completed sets. 116 * 117 * @param workId the id of the work to find 118 * @param state the state defining the state to look into, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING 119 * RUNNING}, or {@code null} for SCHEDULED or RUNNING 120 * @return the found work instance, or {@code null} if not found 121 */ 122 Work find(String workId, State state); 123 124 /** 125 * Finds a scheduled work instance and removes it from the scheduled queue. 126 * 127 * @param queueId the queue id 128 * @param workId the id of the work to find 129 * @since 5.8 130 */ 131 void removeScheduled(String queueId, String workId); 132 133 /** 134 * Checks if a work instance with the given id is in the given state. 135 * 136 * @param workId the work id 137 * @param state the state, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING RUNNING}, or {@code null} for 138 * non-completed 139 * @return {@code true} if a work instance with the given id is in the given state 140 * @since 5.8 141 */ 142 boolean isWorkInState(String workId, State state); 143 144 /** 145 * Gets the state in which a work instance is. 146 * <p> 147 * This can be {@link State#SCHEDULED} or {@link State#RUNNING}. 148 * 149 * @param workId the id of the work to find 150 * @return the work state, or {@code null} if not found 151 * @since 5.8 152 */ 153 State getWorkState(String workId); 154 155 /** 156 * Lists the work instances in a given queue in a defined state. 157 * <p> 158 * Note that an instance requested as RUNNING could be found SUSPENDING or SUSPENDED, and an instance requested as 159 * COMPLETED could be found FAILED. 160 * 161 * @param queueId the queue id 162 * @param state the state defining the state to look into, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING 163 * RUNNING}, or {@code null} for non-completed 164 * @return the list of work instances in the given state 165 */ 166 List<Work> listWork(String queueId, State state); 167 168 /** 169 * Lists the work ids in a given queue in a defined state. 170 * 171 * @param queueId the queue id 172 * @param state the state defining the state to look into, {@link State#SCHEDULED SCHEDULED}, {@link State#RUNNING 173 * RUNNING}, or {@code null} for non-completed 174 * @return the list of work ids in the given state 175 */ 176 List<String> listWorkIds(String queueId, State state); 177 178 /** 179 * Gets the number of work instances in the given state in a given queue. 180 * 181 * @param queueId the queue id 182 * @param state the state, {@link State#SCHEDULED SCHEDULED} or {@link State#RUNNING RUNNING} 183 * @return the number of scheduled work instances in the queue 184 * @since 5.8 185 */ 186 long count(String queueId, State state); 187 188 /** 189 * Returns current metrics of queue identified by the {@code queueId} 190 * 191 * @since 8.3 192 */ 193 WorkQueueMetrics metrics(String queueId); 194 195 /** 196 * Set the callback for debugging purpose 197 * 198 * @since 8.3 199 */ 200 void listen(Listener listener); 201 202 interface Listener { 203 204 void queueActivated(WorkQueueMetrics metric); 205 206 void queueDeactivated(WorkQueueMetrics metric); 207 208 void queueChanged(Work work, WorkQueueMetrics metric); 209 210 static Listener lookupListener() { 211 final Log log = LogFactory.getLog(WorkQueuing.class); 212 if (log.isTraceEnabled()) { 213 class Tracing implements Listener { 214 private final Log log; 215 216 protected Tracing(Log log) { 217 this.log = log; 218 } 219 220 @Override 221 public void queueChanged(Work work, WorkQueueMetrics metrics) { 222 log.trace(String.format("%s -> changed on %s %s", metrics, work.getWorkInstanceState(), 223 work.getSchedulePath())); 224 } 225 226 @Override 227 public void queueActivated(WorkQueueMetrics metrics) { 228 log.trace(String.format("%s -> activated", metrics)); 229 } 230 231 @Override 232 public void queueDeactivated(WorkQueueMetrics metrics) { 233 log.trace(String.format("%s -> deactivated", metrics)); 234 } 235 } 236 237 return new Tracing(log); 238 } else { 239 class Null implements Listener { 240 @Override 241 public void queueActivated(WorkQueueMetrics metric) { 242 243 } 244 245 @Override 246 public void queueDeactivated(WorkQueueMetrics metric) { 247 248 } 249 250 @Override 251 public void queueChanged(Work work, WorkQueueMetrics metric) { 252 253 } 254 } 255 return new Null(); 256 } 257 } 258 } 259}