public class RedisWorkQueuing extends Object implements WorkQueuing
A WorkQueuing implementation storing Work instances in Redis.

Modifier and Type | Class and Description |
---|---|
static class |
RedisWorkQueuing.SScanner
Helper to call SSCAN but fall back on a custom implementation based on SMEMBERS if the backend (embedded) does
not support SSCAN.
|
Modifier and Type | Field and Description |
---|---|
protected Map<String,BlockingQueue<Runnable>> |
allQueued |
protected String |
delCompletedSha |
protected static String |
KEY_COMPLETED_PREFIX
Per-queue set of completed Work instance ids.
|
protected static String |
KEY_DATA
Global hash of Work instance id -> serialized Work instance.
|
protected static String |
KEY_QUEUE_PREFIX
Per-queue list of scheduled Work instance ids.
|
protected static String |
KEY_RUNNING_PREFIX
Per-queue set of running Work instance ids.
|
protected static String |
KEY_SCHEDULED_PREFIX
Per-queue set of scheduled Work instance ids.
|
protected static String |
KEY_STATE
Global hash of Work instance id -> Work state.
|
protected static String |
KEY_SUSPENDED_PREFIX
Per-queue list of suspended Work instance ids.
|
protected WorkManagerImpl |
mgr |
protected RedisAdmin |
redisAdmin |
protected RedisExecutor |
redisExecutor |
protected String |
redisNamespace |
protected static byte[] |
STATE_CANCELED |
protected static byte |
STATE_CANCELED_B |
protected static byte[] |
STATE_COMPLETED |
protected static byte |
STATE_COMPLETED_B |
protected static byte[] |
STATE_RUNNING |
protected static byte |
STATE_RUNNING_B |
protected static byte[] |
STATE_SCHEDULED |
protected static byte |
STATE_SCHEDULED_B |
protected static String |
UTF_8 |
Constructor and Description |
---|
RedisWorkQueuing(WorkManagerImpl mgr,
WorkQueueDescriptorRegistry workQueueDescriptors) |
Modifier and Type | Method and Description |
---|---|
void |
addScheduledWork(String queueId,
Work work)
Persists a work instance and adds it to the scheduled queue.
|
protected byte[] |
bytes(String string) |
void |
clearCompletedWork(String queueId,
long completionTime)
Clears the list of completed work instances older than the given time in the given queue.
|
protected byte[] |
completedKey(String queueId) |
protected String |
completedKeyString(String queueId) |
int |
count(String queueId,
Work.State state)
Gets the number of work instances in the given state in a given queue.
|
protected byte[] |
dataKey() |
protected String |
dataKeyString() |
protected Work |
deserializeWork(byte[] workBytes) |
Work |
find(String workId,
Work.State state)
Finds a work instance in the scheduled or running or completed sets.
|
Set<String> |
getCompletedQueueIds()
Finds which queues have completed work.
|
protected int |
getCompletedSize(String queueId) |
protected Set<String> |
getQueueIds(String queuePrefix)
Finds which queues have work for a given state prefix.
|
protected int |
getRedisListSize(byte[] key) |
protected int |
getRedisSetSize(byte[] key) |
protected Set<String> |
getRunningQueueIds() |
protected int |
getRunningSize(String queueId) |
protected Set<String> |
getScheduledQueueIds() |
protected int |
getScheduledSize(String queueId) |
protected Set<String> |
getSuspendedQueueIds()
Finds which queues have suspended work.
|
protected Work |
getWork(byte[] workIdBytes) |
protected Work |
getWorkData(byte[] workIdBytes) |
protected Work |
getWorkFromQueue(String queueId)
Removes first work from work queue.
|
BlockingQueue<Runnable> |
getWorkQueue(String queueId) |
Work.State |
getWorkState(String workId)
Gets the state in which a work instance is.
|
protected Work.State |
getWorkStateInfo(String workId)
Gets the work state.
|
void |
init()
Starts up this
WorkQueuing and attempts to resume work previously suspended and saved at
shutdown time. |
BlockingQueue<Runnable> |
initWorkQueue(String queueId)
Creates a blocking queue of work used by the ThreadPoolExecutor. |
boolean |
isWorkInState(String workId,
Work.State state)
Checks if a work instance with the given id is in the given state.
|
protected byte[] |
keyBytes(String prefix) |
protected byte[] |
keyBytes(String prefix,
String queueId) |
protected List<Work> |
listCompleted(String queueId) |
protected List<String> |
listCompletedIds(String queueId) |
protected List<String> |
listNonCompletedIds(String queueId) |
protected List<Work> |
listRunning(String queueId) |
protected List<String> |
listRunningIds(String queueId) |
protected List<Work> |
listScheduled(String queueId) |
protected List<String> |
listScheduledIds(String queueId) |
List<Work> |
listWork(String queueId,
Work.State state)
Lists the work instances in a given queue in a defined state.
|
List<String> |
listWorkIds(String queueId,
Work.State state)
Lists the work ids in a given queue in a defined state.
|
protected List<String> |
listWorkIdsList(byte[] queueBytes) |
protected List<String> |
listWorkIdsSet(byte[] queueBytes) |
protected List<Work> |
listWorkList(byte[] queueBytes) |
protected List<Work> |
listWorkSet(byte[] queueBytes) |
protected BlockingQueue<Runnable> |
newBlockingQueue(String queueId) |
protected byte[] |
queuedKey(String queueId) |
protected void |
removeAllCompletedWork(String queueId) |
protected void |
removeCompletedWork(String queueId,
long completionTime) |
Work |
removeScheduled(String queueId,
String workId)
Finds a scheduled work instance and removes it from the scheduled queue.
|
protected Work |
removeScheduledWork(String queueId,
String workId)
Removes a given work from the queue and moves it from the scheduled set to the completed set.
|
protected byte[] |
runningKey(String queueId) |
protected byte[] |
scheduledKey(String queueId) |
int |
scheduleSuspendedWork(String queueId)
Resumes all suspended work instances by moving them to the scheduled queue.
|
protected byte[] |
serializeWork(Work work) |
int |
setSuspending(String queueId)
Notifies this queuing that all work should be suspending.
|
protected byte[] |
stateKey() |
protected String |
stateKeyString() |
protected String |
string(byte[] bytes) |
protected byte[] |
suspendedKey(String queueId) |
int |
suspendScheduledWork(String queueId)
Suspends all scheduled work instances by moving them to the suspended queue.
|
void |
workCompleted(String queueId,
Work work)
Moves a work instance from the running set to the completed set.
|
void |
workRunning(String queueId,
Work work)
Puts the work instance into the running set.
|
boolean |
workSchedule(String queueId,
Work work)
Submits a work to the ThreadPoolExecutor and puts it in the scheduled set. |
protected void |
workSetCompleted(String queueId,
Work work)
Switches a work to state completed, and saves its new state.
|
protected void |
workSetRunning(String queueId,
Work work)
Switches a work to state running.
|
protected static final String UTF_8
protected static final String KEY_DATA
protected static final String KEY_STATE
protected static final String KEY_SUSPENDED_PREFIX
protected static final String KEY_QUEUE_PREFIX
protected static final String KEY_SCHEDULED_PREFIX
protected static final String KEY_RUNNING_PREFIX
protected static final String KEY_COMPLETED_PREFIX
protected static final byte STATE_SCHEDULED_B
protected static final byte STATE_CANCELED_B
protected static final byte STATE_RUNNING_B
protected static final byte STATE_COMPLETED_B
protected static final byte[] STATE_SCHEDULED
protected static final byte[] STATE_CANCELED
protected static final byte[] STATE_RUNNING
protected static final byte[] STATE_COMPLETED
protected final WorkManagerImpl mgr
protected Map<String,BlockingQueue<Runnable>> allQueued
protected RedisExecutor redisExecutor
protected RedisAdmin redisAdmin
protected String redisNamespace
protected String delCompletedSha
public RedisWorkQueuing(WorkManagerImpl mgr, WorkQueueDescriptorRegistry workQueueDescriptors)
public void init()
Description copied from interface: WorkQueuing
Starts up this WorkQueuing and attempts to resume work previously suspended and saved at shutdown time.
Specified by: init in interface WorkQueuing
public BlockingQueue<Runnable> initWorkQueue(String queueId)
Description copied from interface: WorkQueuing
Creates a blocking queue of work used by the ThreadPoolExecutor.
Specified by: initWorkQueue in interface WorkQueuing
public boolean workSchedule(String queueId, Work work)
Description copied from interface: WorkQueuing
Submits a work to the ThreadPoolExecutor and puts it in the scheduled set.
Specified by: workSchedule in interface WorkQueuing
Parameters:
queueId - the queue id
work - the work instance
public BlockingQueue<Runnable> getWorkQueue(String queueId)
public void workRunning(String queueId, Work work)
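Description copied from interface: WorkQueuing
Puts the work instance into the running set.
Specified by: workRunning in interface WorkQueuing
Parameters:
queueId - the queue id
work - the work instance
public void workCompleted(String queueId, Work work)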
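Description copied from interface: WorkQueuing
Moves a work instance from the running set to the completed set.
Specified by: workCompleted in interface WorkQueuing
Parameters:
queueId - the queue id
work - the work instance
protected BlockingQueue<Runnable> newBlockingQueue(String queueId)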
public List<Work> listWork(String queueId, Work.State state)
Description copied from interface: WorkQueuing
Lists the work instances in a given queue in a defined state.
Note that an instance requested as RUNNING could be found SUSPENDING or SUSPENDED, and an instance requested as COMPLETED could be found FAILED.
Specified by: listWork in interface WorkQueuing
Parameters:
queueId - the queue id
state - the state defining the state to look into, SCHEDULED, RUNNING, COMPLETED, or null for non-completed
public List<String> listWorkIds(String queueId, Work.State state)
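Description copied from interface: WorkQueuing
Lists the work ids in a given queue in a defined state.
Specified by: listWorkIds in interface WorkQueuing
Parameters:
queueId - the queue id
state - the state defining the state to look into, SCHEDULED, RUNNING, COMPLETED, or null for non-completed
protected List<Work> listScheduled(String queueId)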
protected List<Work> listRunning(String queueId)
protected List<Work> listCompleted(String queueId)
protected List<String> listScheduledIds(String queueId)
protected List<String> listRunningIds(String queueId)
protected List<String> listNonCompletedIds(String queueId)
protected List<String> listCompletedIds(String queueId)
public int count(String queueId, Work.State state)
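Description copied from interface: WorkQueuing
Gets the number of work instances in the given state in a given queue.
Specified by: count in interface WorkQueuing
Parameters:
queueId - the queue id
state - the state, SCHEDULED, RUNNING or COMPLETED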
protected int getScheduledSize(String queueId)
protected int getRunningSize(String queueId)
protected int getCompletedSize(String queueId)
public Work find(String workId, Work.State state)
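Description copied from interface: WorkQueuing
Finds a work instance in the scheduled or running or completed sets.
Specified by: find in interface WorkQueuing
Parameters:
workId - the id of the work to find
state - the state defining the state to look into, SCHEDULED, RUNNING, COMPLETED, or null for SCHEDULED or RUNNING
Returns:
the work instance, or null if not found
public boolean isWorkInState(String workId, Work.State state)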
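Description copied from interface: WorkQueuing
Checks if a work instance with the given id is in the given state.
Specified by: isWorkInState in interface WorkQueuing
Parameters:
workId - the work id
state - the state, SCHEDULED, RUNNING, COMPLETED, or null for non-completed
Returns:
true if a work instance with the given id is in the given state
public Work removeScheduled(String queueId, String workId)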
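Description copied from interface: WorkQueuing
Finds a scheduled work instance and removes it from the scheduled queue.
Specified by: removeScheduled in interface WorkQueuing
Parameters:
queueId - the queue id
workId - the id of the work to find
Returns:
the work instance, or null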
public Work.State getWorkState(String workId)
Description copied from interface: WorkQueuing
Gets the state in which a work instance is.
This can be Work.State.SCHEDULED, Work.State.RUNNING, Work.State.COMPLETED, Work.State.FAILED, or Work.State.CANCELED.
Specified by: getWorkState in interface WorkQueuing
Parameters:
workId - the id of the work to find
Returns:
the work state, or null if not found
public int setSuspending(String queueId)
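Description copied from interface: WorkQueuing
Notifies this queuing that all work should be suspending.
Specified by: setSuspending in interface WorkQueuing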
public void clearCompletedWork(String queueId, long completionTime)
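Description copied from interface: WorkQueuing
Clears the list of completed work instances older than the given time in the given queue.
Specified by: clearCompletedWork in interface WorkQueuing
Parameters:
queueId - the queue id
completionTime - the completion time (milliseconds since epoch) before which completed work instances are cleared, or 0 for all
protected byte[] suspendedKey(String queueId)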
protected byte[] runningKey(String queueId)
protected byte[] scheduledKey(String queueId)
protected byte[] completedKey(String queueId)
protected String completedKeyString(String queueId)
protected byte[] stateKey()
protected String stateKeyString()
protected byte[] dataKey()
protected String dataKeyString()
protected byte[] serializeWork(Work work) throws IOException
IOException
protected Work deserializeWork(byte[] workBytes)
protected int getRedisListSize(byte[] key) throws IOException
IOException
protected int getRedisSetSize(byte[] key) throws IOException
IOException
public void addScheduledWork(String queueId, Work work) throws IOException
Persists a work instance and adds it to the scheduled queue.
Parameters:
queueId - the queue id
work - the work instance
Throws:
IOException
protected Set<String> getSuspendedQueueIds() throws IOException
IOException
protected Set<String> getScheduledQueueIds()
protected Set<String> getRunningQueueIds()
public Set<String> getCompletedQueueIds()
Description copied from interface: WorkQueuing
Finds which queues have completed work.
Specified by: getCompletedQueueIds in interface WorkQueuing
protected Set<String> getQueueIds(String queuePrefix)
public int scheduleSuspendedWork(String queueId) throws IOException
Resumes all suspended work instances by moving them to the scheduled queue.
Parameters:
queueId - the queue id
Throws:
IOException
public int suspendScheduledWork(String queueId) throws IOException
Suspends all scheduled work instances by moving them to the suspended queue.
Parameters:
queueId - the queue id
Throws:
IOException
protected void workSetRunning(String queueId, Work work) throws IOException
Switches a work to state running.
Parameters:
queueId - the queue id
work - the work
Throws:
IOException
protected void workSetCompleted(String queueId, Work work) throws IOException
IOException
protected Work.State getWorkStateInfo(String workId)
Gets the work state.
Parameters:
workId - the work id
Returns:
the work state, or null if not found
protected List<String> listWorkIdsList(byte[] queueBytes) throws IOException
IOException
protected List<String> listWorkIdsSet(byte[] queueBytes) throws IOException
IOException
protected List<Work> listWorkList(byte[] queueBytes) throws IOException
IOException
protected List<Work> listWorkSet(byte[] queueBytes) throws IOException
IOException
protected Work getWorkData(byte[] workIdBytes) throws IOException
IOException
protected Work getWorkFromQueue(String queueId) throws IOException
Removes first work from work queue.
Parameters:
queueId - the queue id
Returns:
the work, or null if the scheduled queue is empty
Throws:
IOException
protected Work removeScheduledWork(String queueId, String workId) throws IOException
IOException
protected void removeAllCompletedWork(String queueId) throws IOException
IOException
protected void removeCompletedWork(String queueId, long completionTime) throws IOException
IOException
Copyright © 2016 Nuxeo SA. All rights reserved.