public class RedisWorkQueuing extends Object implements WorkQueuing
WorkQueuing
storing Work
instances in Redis.WorkQueuing.Listener
Modifier and Type | Field and Description |
---|---|
protected Map<String,NuxeoBlockingQueue> |
allQueued |
protected byte[] |
cancelledRunningWorkSha |
protected byte[] |
cancelledScheduledWorkSha |
protected byte[] |
completedWorkSha |
protected byte[] |
initWorkQueueSha |
protected static byte[] |
KEY_CANCELED |
protected static String |
KEY_CANCELED_PREFIX |
protected static byte[] |
KEY_COMPLETED |
protected static String |
KEY_COMPLETED_PREFIX
Per-queue set of counters.
|
protected static String |
KEY_COUNT_PREFIX |
protected static String |
KEY_DATA
Global hash of Work instance id -> serialoized Work instance.
|
protected static byte[] |
KEY_QUEUE |
protected static String |
KEY_QUEUE_PREFIX
Per-queue list of scheduled Work instance ids.
|
protected static byte[] |
KEY_RUNNING |
protected static String |
KEY_RUNNING_PREFIX
Per-queue set of running Work instance ids.
|
protected static byte[] |
KEY_SCHEDULED |
protected static String |
KEY_SCHEDULED_PREFIX
Per-queue set of scheduled Work instance ids.
|
protected static String |
KEY_STATE
Global hash of Work instance id -> Work state.
|
protected static byte[] |
KEY_SUSPENDED |
protected static String |
KEY_SUSPENDED_PREFIX
Per-queue list of suspended Work instance ids.
|
protected WorkQueuing.Listener |
listener |
protected byte[] |
metricsWorkQueueSha |
protected byte[] |
popWorkSha |
protected String |
redisNamespace |
protected byte[] |
runningWorkSha |
protected byte[] |
schedulingWorkSha |
protected static byte[] |
STATE_RUNNING |
protected static byte |
STATE_RUNNING_B |
protected static byte |
STATE_RUNNING_C |
protected static byte[] |
STATE_SCHEDULED |
protected static byte |
STATE_SCHEDULED_B |
protected static byte[] |
STATE_UNKNOWN |
protected static String |
UTF_8 |
Constructor and Description |
---|
RedisWorkQueuing(WorkQueuing.Listener listener) |
Modifier and Type | Method and Description |
---|---|
protected List<byte[]> |
args(String workId) |
protected List<byte[]> |
args(Work work,
boolean serialize) |
protected byte[] |
bytes(String string) |
protected byte[] |
bytes(Work.State state) |
protected byte[] |
canceledKey(String queueId) |
protected static Number[] |
coerceNullToZero(List<Number> numbers) |
protected static Number[] |
coerceNullToZero(Number[] counters) |
protected byte[] |
completedKey(String queueId) |
long |
count(String queueId,
Work.State state)
Gets the number of work instances in the given state in a given queue.
|
protected byte[] |
countKey(String queueId) |
protected byte[] |
dataKey() |
protected Work |
deserializeWork(byte[] workBytes) |
Work |
find(String workId,
Work.State state)
Finds a work instance in the scheduled or running or completed sets.
|
NuxeoBlockingQueue |
getQueue(String queueId)
Gets the blocking queue of work used by the
ThreadPoolExecutor . |
protected Set<String> |
getQueueIds(String queuePrefix)
Finds which queues have work for a given state prefix.
|
protected Set<String> |
getRunningQueueIds() |
protected Set<String> |
getScheduledQueueIds() |
protected Set<String> |
getSuspendedQueueIds()
Finds which queues have suspended work.
|
protected Work |
getWork(byte[] workIdBytes) |
protected Work |
getWorkData(byte[] workIdBytes) |
protected Work |
getWorkFromQueue(String queueId)
Removes first work from work queue.
|
Work.State |
getWorkState(String workId)
Gets the state in which a work instance is.
|
protected Work.State |
getWorkStateInfo(String workId)
Gets the work state.
|
NuxeoBlockingQueue |
init(WorkQueueDescriptor config)
Starts up this
WorkQueuing and attempts to resume work previously suspended and saved at shutdown time. |
boolean |
isWorkInState(String workId,
Work.State state)
Checks if a work instance with the given id is in the given state.
|
protected static String |
key(String... names) |
protected byte[] |
keyBytes(String prefix) |
protected byte[] |
keyBytes(String prefix,
String queueId) |
protected List<byte[]> |
keys(String queueid) |
void |
listen(WorkQueuing.Listener listener)
Set the callback for debugging purpose
|
protected List<String> |
listNonCompletedIds(String queueId) |
protected List<Work> |
listRunning(String queueId) |
protected List<String> |
listRunningIds(String queueId) |
protected List<Work> |
listScheduled(String queueId) |
protected List<String> |
listScheduledIds(String queueId) |
List<Work> |
listWork(String queueId,
Work.State state)
Lists the work instances in a given queue in a defined state.
|
List<String> |
listWorkIds(String queueId,
Work.State state)
Lists the work ids in a given queue in a defined state.
|
protected List<String> |
listWorkIdsList(byte[] queueBytes) |
protected List<String> |
listWorkIdsSet(byte[] queueBytes) |
protected List<Work> |
listWorkList(byte[] queueBytes) |
protected List<Work> |
listWorkSet(byte[] queueBytes) |
WorkQueueMetrics |
metrics(String queueId)
Returns current metrics of queue identified by the
queueId |
protected byte[] |
queuedKey(String queueId) |
void |
removeScheduled(String queueId,
String workId)
Finds a scheduled work instance and removes it from the scheduled queue.
|
protected void |
removeScheduledWork(String queueId,
String workId)
Removes a given work from queue, move the work from scheduled to completed set.
|
protected byte[] |
runningKey(String queueId) |
protected byte[] |
scheduledKey(String queueId) |
int |
scheduleSuspendedWork(String queueId)
Resumes all suspended work instances by moving them to the scheduled queue.
|
protected byte[] |
serializeWork(Work work) |
void |
setActive(String queueId,
boolean value)
Enable/disable this
queueId processing |
protected byte[] |
stateKey() |
protected String |
string(byte[] bytes) |
protected byte[] |
suspendedKey(String queueId) |
int |
suspendScheduledWork(String queueId)
Suspends all scheduled work instances by moving them to the suspended queue.
|
void |
workCanceled(String queueId,
Work work)
Removes a work instance from scheduled set.
|
void |
workCompleted(String queueId,
Work work)
Moves a work instance from the running set to the completed set.
|
protected byte[] |
workId(String id) |
protected byte[] |
workId(Work work) |
void |
workReschedule(String queueId,
Work work)
Moves back a work instance from running set to the scheduled set.
|
void |
workRunning(String queueId,
Work work)
Put the work instance into the running set.
|
void |
workSchedule(String queueId,
Work work)
Submit a work to the
ThreadPoolExecutor and put it in the scheduled set. |
protected void |
workSetCancelledScheduled(String queueId,
Work work)
Switches a work to state completed, and saves its new state.
|
protected void |
workSetCompleted(String queueId,
Work work)
Switches a work to state completed, and saves its new state.
|
protected void |
workSetReschedule(String queueId,
Work work)
Switches a work to state canceled, and saves its new state.
|
protected void |
workSetRunning(String queueId,
Work work)
Switches a work to state running.
|
void |
workSetScheduled(String queueId,
Work work)
Persists a work instance and adds it to the scheduled queue.
|
protected static final String UTF_8
protected static final String KEY_DATA
protected static final String KEY_STATE
protected static final String KEY_SUSPENDED_PREFIX
protected static final byte[] KEY_SUSPENDED
protected static final String KEY_QUEUE_PREFIX
protected static final byte[] KEY_QUEUE
protected static final String KEY_SCHEDULED_PREFIX
protected static final byte[] KEY_SCHEDULED
protected static final String KEY_RUNNING_PREFIX
protected static final byte[] KEY_RUNNING
protected static final String KEY_COMPLETED_PREFIX
protected static final byte[] KEY_COMPLETED
protected static final String KEY_CANCELED_PREFIX
protected static final byte[] KEY_CANCELED
protected static final String KEY_COUNT_PREFIX
protected static final byte STATE_SCHEDULED_B
protected static final byte STATE_RUNNING_B
protected static final byte STATE_RUNNING_C
protected static final byte[] STATE_SCHEDULED
protected static final byte[] STATE_RUNNING
protected static final byte[] STATE_UNKNOWN
protected WorkQueuing.Listener listener
protected final Map<String,NuxeoBlockingQueue> allQueued
protected String redisNamespace
protected byte[] initWorkQueueSha
protected byte[] metricsWorkQueueSha
protected byte[] schedulingWorkSha
protected byte[] popWorkSha
protected byte[] runningWorkSha
protected byte[] cancelledScheduledWorkSha
protected byte[] completedWorkSha
protected byte[] cancelledRunningWorkSha
public RedisWorkQueuing(WorkQueuing.Listener listener)
public NuxeoBlockingQueue init(WorkQueueDescriptor config)
WorkQueuing
WorkQueuing
and attempts to resume work previously suspended and saved at shutdown time.init
in interface WorkQueuing
public NuxeoBlockingQueue getQueue(String queueId)
WorkQueuing
ThreadPoolExecutor
.getQueue
in interface WorkQueuing
public void workSchedule(String queueId, Work work)
WorkQueuing
ThreadPoolExecutor
and put it in the scheduled set.workSchedule
in interface WorkQueuing
queueId
- the queue idwork
- the work instancepublic void workRunning(String queueId, Work work)
WorkQueuing
workRunning
in interface WorkQueuing
queueId
- the queue idwork
- the work instancepublic void workCanceled(String queueId, Work work)
WorkQueuing
workCanceled
in interface WorkQueuing
public void workCompleted(String queueId, Work work)
WorkQueuing
workCompleted
in interface WorkQueuing
queueId
- the queue idwork
- the work instancepublic void workReschedule(String queueId, Work work)
WorkQueuing
workReschedule
in interface WorkQueuing
public List<Work> listWork(String queueId, Work.State state)
WorkQueuing
Note that an instance requested as RUNNING could be found SUSPENDING or SUSPENDED, and an instance requested as COMPLETED could be found FAILED.
listWork
in interface WorkQueuing
queueId
- the queue idstate
- the state defining the state to look into, SCHEDULED
, RUNNING
, COMPLETED
, or null
for non-completedpublic List<String> listWorkIds(String queueId, Work.State state)
WorkQueuing
listWorkIds
in interface WorkQueuing
queueId
- the queue idstate
- the state defining the state to look into, SCHEDULED
, RUNNING
, or null
for non-completedprotected List<Work> listScheduled(String queueId)
protected List<Work> listRunning(String queueId)
protected List<String> listScheduledIds(String queueId)
protected List<String> listRunningIds(String queueId)
protected List<String> listNonCompletedIds(String queueId)
public long count(String queueId, Work.State state)
WorkQueuing
count
in interface WorkQueuing
queueId
- the queue idstate
- the state, SCHEDULED
, RUNNING
or
COMPLETED
public Work find(String workId, Work.State state)
WorkQueuing
find
in interface WorkQueuing
workId
- the id of the work to findstate
- the state defining the state to look into, SCHEDULED
, RUNNING
, COMPLETED
, or null
for SCHEDULED or RUNNINGnull
if not foundpublic boolean isWorkInState(String workId, Work.State state)
WorkQueuing
isWorkInState
in interface WorkQueuing
workId
- the work idstate
- the state, SCHEDULED
, RUNNING
, COMPLETED
, or null
for non-completedtrue
if a work instance with the given id is in the given statepublic void removeScheduled(String queueId, String workId)
WorkQueuing
removeScheduled
in interface WorkQueuing
queueId
- the queue idworkId
- the id of the work to findpublic Work.State getWorkState(String workId)
WorkQueuing
This can be Work.State.SCHEDULED
, Work.State.RUNNING
, State#COMPLETED
, State#FAILED
, or
State#CANCELED
.
getWorkState
in interface WorkQueuing
workId
- the id of the work to findnull
if not foundpublic void setActive(String queueId, boolean value)
WorkQueuing
queueId
processingsetActive
in interface WorkQueuing
protected byte[] bytes(Work.State state)
protected byte[] suspendedKey(String queueId)
protected byte[] runningKey(String queueId)
protected byte[] scheduledKey(String queueId)
protected byte[] completedKey(String queueId)
protected byte[] canceledKey(String queueId)
protected byte[] stateKey()
protected byte[] dataKey()
protected byte[] serializeWork(Work work) throws IOException
IOException
protected Work deserializeWork(byte[] workBytes)
protected Set<String> getSuspendedQueueIds() throws IOException
IOException
protected Set<String> getScheduledQueueIds()
protected Set<String> getRunningQueueIds()
protected Set<String> getQueueIds(String queuePrefix)
public int scheduleSuspendedWork(String queueId) throws IOException
queueId
- the queue idIOException
public int suspendScheduledWork(String queueId) throws IOException
queueId
- the queue idIOException
public WorkQueueMetrics metrics(String queueId)
WorkQueuing
queueId
metrics
in interface WorkQueuing
public void workSetScheduled(String queueId, Work work) throws IOException
queueId
- the queue idwork
- the work instanceIOException
protected void workSetCancelledScheduled(String queueId, Work work) throws IOException
IOException
protected void workSetRunning(String queueId, Work work) throws IOException
queueId
- the queue idwork
- the workIOException
protected void workSetCompleted(String queueId, Work work) throws IOException
IOException
protected void workSetReschedule(String queueId, Work work) throws IOException
IOException
protected List<byte[]> args(String workId) throws IOException
IOException
protected List<byte[]> args(Work work, boolean serialize) throws IOException
IOException
protected Work.State getWorkStateInfo(String workId)
workId
- the work idnull
if not foundprotected List<String> listWorkIdsList(byte[] queueBytes) throws IOException
IOException
protected List<String> listWorkIdsSet(byte[] queueBytes) throws IOException
IOException
protected List<Work> listWorkList(byte[] queueBytes) throws IOException
IOException
protected List<Work> listWorkSet(byte[] queueBytes) throws IOException
IOException
protected Work getWorkData(byte[] workIdBytes) throws IOException
IOException
protected Work getWorkFromQueue(String queueId) throws IOException
queueId
- the queue idnull
if the scheduled queue is emptyIOException
protected void removeScheduledWork(String queueId, String workId) throws IOException
IOException
protected static Number[] coerceNullToZero(List<Number> numbers)
protected static Number[] coerceNullToZero(Number[] counters)
public void listen(WorkQueuing.Listener listener)
WorkQueuing
listen
in interface WorkQueuing
Copyright © 2018 Nuxeo. All rights reserved.