001/*
002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.redis.contribs;
020
021import java.io.ByteArrayInputStream;
022import java.io.ByteArrayOutputStream;
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.ObjectInputStream;
026import java.io.ObjectOutputStream;
027import java.io.UnsupportedEncodingException;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collection;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.Iterator;
034import java.util.List;
035import java.util.Map;
036import java.util.Set;
037import java.util.concurrent.BlockingQueue;
038
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041import org.nuxeo.ecm.core.api.NuxeoException;
042import org.nuxeo.ecm.core.redis.RedisAdmin;
043import org.nuxeo.ecm.core.redis.RedisCallable;
044import org.nuxeo.ecm.core.redis.RedisExecutor;
045import org.nuxeo.ecm.core.work.WorkHolder;
046import org.nuxeo.ecm.core.work.WorkManagerImpl;
047import org.nuxeo.ecm.core.work.WorkQueueDescriptorRegistry;
048import org.nuxeo.ecm.core.work.WorkQueuing;
049import org.nuxeo.ecm.core.work.api.Work;
050import org.nuxeo.ecm.core.work.api.Work.State;
051import org.nuxeo.runtime.api.Framework;
052
053import redis.clients.jedis.Jedis;
054import redis.clients.jedis.Pipeline;
055import redis.clients.jedis.Protocol;
056import redis.clients.jedis.ScanParams;
057import redis.clients.jedis.ScanResult;
058import redis.clients.util.SafeEncoder;
059
060/**
061 * Implementation of a {@link WorkQueuing} storing {@link Work} instances in Redis.
062 *
063 * @since 5.8
064 */
065public class RedisWorkQueuing implements WorkQueuing {
066
067    private static final Log log = LogFactory.getLog(RedisWorkQueuing.class);
068
069    protected static final String UTF_8 = "UTF-8";
070
071    /**
072     * Global hash of Work instance id -> serialized Work instance.
073     */
074    protected static final String KEY_DATA = "data";
075
076    /**
077     * Global hash of Work instance id -> Work state. The completed state ( {@value #STATE_COMPLETED_B}) is followed by
078     * a completion time in milliseconds.
079     */
080    protected static final String KEY_STATE = "state";
081
082    /**
083     * Per-queue list of suspended Work instance ids.
084     */
085    protected static final String KEY_SUSPENDED_PREFIX = "prev:";
086
087    /**
088     * Per-queue list of scheduled Work instance ids.
089     */
090    protected static final String KEY_QUEUE_PREFIX = "queue:";
091
092    /**
093     * Per-queue set of scheduled Work instance ids.
094     */
095    protected static final String KEY_SCHEDULED_PREFIX = "sched:";
096
097    /**
098     * Per-queue set of running Work instance ids.
099     */
100    protected static final String KEY_RUNNING_PREFIX = "run:";
101
102    /**
103     * Per-queue set of completed Work instance ids.
104     */
105    protected static final String KEY_COMPLETED_PREFIX = "done:";
106
107    protected static final byte STATE_SCHEDULED_B = 'Q';
108
109    protected static final byte STATE_CANCELED_B = 'X';
110
111    protected static final byte STATE_RUNNING_B = 'R';
112
113    protected static final byte STATE_COMPLETED_B = 'C';
114
115    protected static final byte[] STATE_SCHEDULED = new byte[] { STATE_SCHEDULED_B };
116
117    protected static final byte[] STATE_CANCELED = new byte[] { STATE_CANCELED_B };
118
119    protected static final byte[] STATE_RUNNING = new byte[] { STATE_RUNNING_B };
120
121    protected static final byte[] STATE_COMPLETED = new byte[] { STATE_COMPLETED_B };
122
123    protected final WorkManagerImpl mgr;
124
125    // @GuardedBy("this")
126    protected Map<String, BlockingQueue<Runnable>> allQueued = new HashMap<String, BlockingQueue<Runnable>>();
127
128    protected RedisExecutor redisExecutor;
129
130    protected RedisAdmin redisAdmin;
131
132    protected String redisNamespace;
133
134    protected String delCompletedSha;
135
136    public RedisWorkQueuing(WorkManagerImpl mgr, WorkQueueDescriptorRegistry workQueueDescriptors) {
137        this.mgr = mgr;
138    }
139
140    @Override
141    public void init() {
142        redisExecutor = Framework.getLocalService(RedisExecutor.class);
143        redisAdmin = Framework.getService(RedisAdmin.class);
144        redisNamespace = redisAdmin.namespace("work");
145        try {
146            for (String queueId : getSuspendedQueueIds()) {
147                int n = scheduleSuspendedWork(queueId);
148                log.info("Re-scheduling " + n + " work instances suspended from queue: " + queueId);
149            }
150        } catch (IOException e) {
151            throw new RuntimeException(e);
152        }
153        try {
154            delCompletedSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "del-completed");
155        } catch (IOException e) {
156            throw new RuntimeException(e);
157        }
158    }
159
160    @Override
161    public BlockingQueue<Runnable> initWorkQueue(String queueId) {
162        if (allQueued.containsKey(queueId)) {
163            throw new IllegalStateException(queueId + " is already configured");
164        }
165        final BlockingQueue<Runnable> queue = newBlockingQueue(queueId);
166        allQueued.put(queueId, queue);
167        return queue;
168    }
169
170    @Override
171    public boolean workSchedule(String queueId, Work work) {
172        return getWorkQueue(queueId).offer(new WorkHolder(work));
173    }
174
175    public BlockingQueue<Runnable> getWorkQueue(String queueId) {
176        if (!allQueued.containsKey(queueId)) {
177            throw new IllegalStateException(queueId + " was not configured yet");
178        }
179        return allQueued.get(queueId);
180    }
181
182
183    @Override
184    public void workRunning(String queueId, Work work) {
185        try {
186            workSetRunning(queueId, work);
187        } catch (IOException e) {
188            throw new RuntimeException(e);
189        }
190    }
191
192    @Override
193    public void workCompleted(String queueId, Work work) {
194        try {
195            workSetCompleted(queueId, work);
196        } catch (IOException e) {
197            throw new RuntimeException(e);
198        }
199    }
200
201    protected BlockingQueue<Runnable> newBlockingQueue(String queueId) {
202        return new RedisBlockingQueue(queueId, this);
203    }
204
205    @Override
206    public List<Work> listWork(String queueId, State state) {
207        switch (state) {
208        case SCHEDULED:
209            return listScheduled(queueId);
210        case RUNNING:
211            return listRunning(queueId);
212        case COMPLETED:
213            return listCompleted(queueId);
214        default:
215            throw new IllegalArgumentException(String.valueOf(state));
216        }
217    }
218
219    @Override
220    public List<String> listWorkIds(String queueId, State state) {
221        if (state == null) {
222            return listNonCompletedIds(queueId);
223        }
224        switch (state) {
225        case SCHEDULED:
226            return listScheduledIds(queueId);
227        case RUNNING:
228            return listRunningIds(queueId);
229        case COMPLETED:
230            return listCompletedIds(queueId);
231        default:
232            throw new IllegalArgumentException(String.valueOf(state));
233        }
234    }
235
236    protected List<Work> listScheduled(String queueId) {
237        try {
238            return listWorkList(queuedKey(queueId));
239        } catch (IOException e) {
240            throw new RuntimeException(e);
241        }
242    }
243
244    protected List<Work> listRunning(String queueId) {
245        try {
246            return listWorkSet(runningKey(queueId));
247        } catch (IOException e) {
248            throw new RuntimeException(e);
249        }
250    }
251
252    protected List<Work> listCompleted(String queueId) {
253        try {
254            return listWorkSet(completedKey(queueId));
255        } catch (IOException e) {
256            throw new RuntimeException(e);
257        }
258    }
259
260    protected List<String> listScheduledIds(String queueId) {
261        try {
262            return listWorkIdsList(queuedKey(queueId));
263        } catch (IOException e) {
264            throw new RuntimeException(e);
265        }
266    }
267
268    protected List<String> listRunningIds(String queueId) {
269        try {
270            return listWorkIdsSet(runningKey(queueId));
271        } catch (IOException e) {
272            throw new RuntimeException(e);
273        }
274    }
275
276    protected List<String> listNonCompletedIds(String queueId) {
277        List<String> list = listScheduledIds(queueId);
278        list.addAll(listRunningIds(queueId));
279        return list;
280    }
281
282    protected List<String> listCompletedIds(String queueId) {
283        try {
284            return listWorkIdsSet(completedKey(queueId));
285        } catch (IOException e) {
286            throw new RuntimeException(e);
287        }
288    }
289
290    @Override
291    public int count(String queueId, State state) {
292        switch (state) {
293        case SCHEDULED:
294            return getScheduledSize(queueId);
295        case RUNNING:
296            return getRunningSize(queueId);
297        case COMPLETED:
298            return getCompletedSize(queueId);
299        default:
300            throw new IllegalArgumentException(String.valueOf(state));
301        }
302    }
303
304    protected int getScheduledSize(String queueId) {
305        try {
306            return getRedisSetSize(scheduledKey(queueId));
307        } catch (IOException e) {
308            throw new RuntimeException(e);
309        }
310    }
311
312    protected int getRunningSize(String queueId) {
313        try {
314            return getRedisSetSize(runningKey(queueId));
315        } catch (IOException e) {
316            throw new RuntimeException(e);
317        }
318    }
319
320    protected int getCompletedSize(String queueId) {
321        try {
322            return getRedisSetSize(completedKey(queueId));
323        } catch (IOException e) {
324            throw new RuntimeException(e);
325        }
326    }
327
328    @Override
329    public Work find(String workId, State state) {
330        if (isWorkInState(workId, state)) {
331            return getWork(bytes(workId));
332        }
333        return null;
334    }
335
336    @Override
337    public boolean isWorkInState(String workId, State state) {
338        State s = getWorkState(workId);
339        if (state == null) {
340            return s == State.SCHEDULED || s == State.RUNNING;
341        }
342        return s == state;
343    }
344
345    @Override
346    public Work removeScheduled(String queueId, String workId) {
347        try {
348            return removeScheduledWork(queueId, workId);
349        } catch (IOException cause) {
350            throw new RuntimeException("Cannot remove scheduled work " + workId + " from " + queueId, cause);
351        }
352    }
353
354    @Override
355    public State getWorkState(String workId) {
356        return getWorkStateInfo(workId);
357    }
358
359    @Override
360    public int setSuspending(String queueId) {
361        try {
362            int n = suspendScheduledWork(queueId);
363            log.info("Suspending " + n + " work instances from queue: " + queueId);
364            allQueued.remove(queueId);
365            return n;
366        } catch (IOException e) {
367            throw new RuntimeException(e);
368        }
369    }
370
371    @Override
372    public void clearCompletedWork(String queueId, long completionTime) {
373        try {
374            if (completionTime <= 0) {
375                removeAllCompletedWork(queueId);
376            } else {
377                removeCompletedWork(queueId, completionTime);
378            }
379        } catch (IOException e) {
380            throw new RuntimeException(e);
381        }
382    }
383
384    /*
385     * ******************** Redis Interface ********************
386     */
387
388    protected String string(byte[] bytes) {
389        try {
390            return new String(bytes, UTF_8);
391        } catch (IOException e) {
392            throw new RuntimeException(e);
393        }
394    }
395
396    protected byte[] bytes(String string) {
397        try {
398            return string.getBytes(UTF_8);
399        } catch (IOException e) {
400            // cannot happen for UTF-8
401            throw new RuntimeException(e);
402        }
403    }
404
405    protected byte[] keyBytes(String prefix, String queueId) {
406        return keyBytes(prefix + queueId);
407    }
408
409    protected byte[] keyBytes(String prefix) {
410        return bytes(redisNamespace + prefix);
411    }
412
413    protected byte[] suspendedKey(String queueId) {
414        return keyBytes(KEY_SUSPENDED_PREFIX, queueId);
415    }
416
417    protected byte[] queuedKey(String queueId) {
418        return keyBytes(KEY_QUEUE_PREFIX, queueId);
419    }
420
421    protected byte[] runningKey(String queueId) {
422        return keyBytes(KEY_RUNNING_PREFIX, queueId);
423    }
424
425    protected byte[] scheduledKey(String queueId) {
426        return keyBytes(KEY_SCHEDULED_PREFIX, queueId);
427    }
428
429    protected byte[] completedKey(String queueId) {
430        return keyBytes(KEY_COMPLETED_PREFIX, queueId);
431    }
432
433    protected String completedKeyString(String queueId) {
434        return redisNamespace + KEY_COMPLETED_PREFIX + queueId;
435    }
436
437    protected byte[] stateKey() {
438        return keyBytes(KEY_STATE);
439    }
440
441    protected String stateKeyString() {
442        return redisNamespace + KEY_STATE;
443    }
444
445    protected byte[] dataKey() {
446        return keyBytes(KEY_DATA);
447    }
448
449    protected String dataKeyString() {
450        return redisNamespace + KEY_DATA;
451    }
452
453    protected byte[] serializeWork(Work work) throws IOException {
454        ByteArrayOutputStream baout = new ByteArrayOutputStream();
455        ObjectOutputStream out = new ObjectOutputStream(baout);
456        out.writeObject(work);
457        out.flush();
458        out.close();
459        return baout.toByteArray();
460    }
461
462    protected Work deserializeWork(byte[] workBytes) {
463        if (workBytes == null) {
464            return null;
465        }
466        InputStream bain = new ByteArrayInputStream(workBytes);
467        try (ObjectInputStream in = new ObjectInputStream(bain)) {
468            return (Work) in.readObject();
469        } catch (RuntimeException cause) {
470            throw cause;
471        } catch (IOException | ClassNotFoundException cause) {
472            throw new RuntimeException("Cannot deserialize work", cause);
473        }
474    }
475
476    protected int getRedisListSize(final byte[] key) throws IOException {
477        return redisExecutor.execute(new RedisCallable<Long>() {
478
479            @Override
480            public Long call(Jedis jedis) {
481                return jedis.llen(key);
482            }
483
484        }).intValue();
485    }
486
487    protected int getRedisSetSize(final byte[] key) throws IOException {
488        return redisExecutor.execute(new RedisCallable<Long>() {
489
490            @Override
491            public Long call(Jedis jedis) {
492                return jedis.scard(key);
493            }
494
495        }).intValue();
496    }
497
498
499    /**
500     * Persists a work instance and adds it to the scheduled queue.
501     *
502     * @param queueId the queue id
503     * @param work the work instance
504     * @throws IOException
505     */
506    public void addScheduledWork(final String queueId, Work work) throws IOException {
507        log.debug("Add scheduled " + work);
508        final byte[] workIdBytes = bytes(work.getId());
509
510        // serialize Work
511        final byte[] workBytes = serializeWork(work);
512
513        redisExecutor.execute(new RedisCallable<Void>() {
514
515            @Override
516            public Void call(Jedis jedis) {
517                if (redisExecutor.supportPipelined()) {
518                    Pipeline pipe = jedis.pipelined();
519                    pipe.hset(dataKey(), workIdBytes, workBytes);
520                    pipe.hset(stateKey(), workIdBytes, STATE_SCHEDULED);
521                    pipe.lpush(queuedKey(queueId), workIdBytes);
522                    pipe.sadd(scheduledKey(queueId), workIdBytes);
523                    pipe.sync();
524                } else {
525                    jedis.hset(dataKey(), workIdBytes, workBytes);
526                    jedis.hset(stateKey(), workIdBytes, STATE_SCHEDULED);
527                    jedis.lpush(queuedKey(queueId), workIdBytes);
528                    jedis.sadd(scheduledKey(queueId), workIdBytes);
529                }
530                return null;
531            }
532
533        });
534    }
535
536    /**
537     * Finds which queues have suspended work.
538     *
539     * @return a set of queue ids
540     * @since 5.8
541     */
542    protected Set<String> getSuspendedQueueIds() throws IOException {
543        return getQueueIds(KEY_SUSPENDED_PREFIX);
544    }
545
546    protected Set<String> getScheduledQueueIds() {
547        return getQueueIds(KEY_QUEUE_PREFIX);
548    }
549
550    protected Set<String> getRunningQueueIds() {
551        return getQueueIds(KEY_RUNNING_PREFIX);
552    }
553
554    @Override
555    public Set<String> getCompletedQueueIds() {
556        return getQueueIds(KEY_COMPLETED_PREFIX);
557    }
558
559    /**
560     * Finds which queues have work for a given state prefix.
561     *
562     * @return a set of queue ids
563     * @since 5.8
564     */
565    protected Set<String> getQueueIds(final String queuePrefix) {
566        return redisExecutor.execute(new RedisCallable<Set<String>>() {
567            @Override
568            public Set<String> call(Jedis jedis) {
569                int offset = keyBytes(queuePrefix).length;
570                Set<byte[]> keys = jedis.keys(keyBytes(queuePrefix, "*"));
571                Set<String> queueIds = new HashSet<String>(keys.size());
572                for (byte[] bytes : keys) {
573                    try {
574                        String queueId = new String(bytes, offset, bytes.length - offset, UTF_8);
575                        queueIds.add(queueId);
576                    } catch (IOException e) {
577                        throw new NuxeoException(e);
578                    }
579                }
580                return queueIds;
581            }
582
583        });
584    }
585
586    /**
587     * Resumes all suspended work instances by moving them to the scheduled queue.
588     *
589     * @param queueId the queue id
590     * @return the number of work instances scheduled
591     * @since 5.8
592     */
593    public int scheduleSuspendedWork(final String queueId) throws IOException {
594        return redisExecutor.execute(new RedisCallable<Integer>() {
595            @Override
596            public Integer call(Jedis jedis) {
597                for (int n = 0;; n++) {
598                    byte[] workIdBytes = jedis.rpoplpush(suspendedKey(queueId), queuedKey(queueId));
599                    if (workIdBytes == null) {
600                        return Integer.valueOf(n);
601                    }
602                }
603            }
604
605        }).intValue();
606    }
607
608    /**
609     * Suspends all scheduled work instances by moving them to the suspended queue.
610     *
611     * @param queueId the queue id
612     * @return the number of work instances suspended
613     * @since 5.8
614     */
615    public int suspendScheduledWork(final String queueId) throws IOException {
616        return redisExecutor.execute(new RedisCallable<Integer>() {
617
618            @Override
619            public Integer call(Jedis jedis) {
620                for (int n = 0;; n++) {
621                    byte[] workIdBytes = jedis.rpoplpush(queuedKey(queueId), suspendedKey(queueId));
622                    if (workIdBytes == null) {
623                        return n;
624                    }
625                }
626            }
627        }).intValue();
628    }
629
630    /**
631     * Switches a work to state running.
632     *
633     * @param queueId the queue id
634     * @param work the work
635     */
636    protected void workSetRunning(final String queueId, Work work) throws IOException {
637        final byte[] workIdBytes = bytes(work.getId());
638        redisExecutor.execute(new RedisCallable<Void>() {
639
640            @Override
641            public Void call(Jedis jedis) {
642                if (redisExecutor.supportPipelined()) {
643                    Pipeline pipe = jedis.pipelined();
644                    pipe.sadd(runningKey(queueId), workIdBytes);
645                    pipe.srem(scheduledKey(queueId), workIdBytes);
646                    pipe.hset(stateKey(), workIdBytes, STATE_RUNNING);
647                    pipe.sync();
648                } else {
649                    jedis.sadd(runningKey(queueId), workIdBytes);
650                    jedis.srem(scheduledKey(queueId), workIdBytes);
651                    jedis.hset(stateKey(), workIdBytes, STATE_RUNNING);
652                }
653                return null;
654            }
655        });
656    }
657
658    /**
659     * Switches a work to state completed, and saves its new state.
660     */
661    protected void workSetCompleted(final String queueId, final Work work) throws IOException {
662        final byte[] workIdBytes = bytes(work.getId());
663        final byte[] workBytes = serializeWork(work);
664        redisExecutor.execute(new RedisCallable<Void>() {
665
666            @Override
667            public Void call(Jedis jedis) {
668                byte[] completedBytes = bytes(((char) STATE_COMPLETED_B) + String.valueOf(work.getCompletionTime()));
669                if (redisExecutor.supportPipelined()) {
670                    Pipeline pipe = jedis.pipelined();
671                    // store (updated) content in hash
672                    pipe.hset(dataKey(), workIdBytes, workBytes);
673                    // remove key from running set
674                    pipe.srem(runningKey(queueId), workIdBytes);
675                    // put key in completed set
676                    pipe.sadd(completedKey(queueId), workIdBytes);
677                    // set state to completed
678                    pipe.hset(stateKey(), workIdBytes, completedBytes);
679                    pipe.sync();
680                } else {
681                    jedis.hset(dataKey(), workIdBytes, workBytes);
682                    jedis.srem(runningKey(queueId), workIdBytes);
683                    jedis.sadd(completedKey(queueId), workIdBytes);
684                    jedis.hset(stateKey(), workIdBytes, completedBytes);
685                }
686                return null;
687            }
688        });
689    }
690
691    /**
692     * Gets the work state.
693     *
694     * @param workId the work id
695     * @return the state, or {@code null} if not found
696     */
697    protected State getWorkStateInfo(final String workId) {
698        final byte[] workIdBytes = bytes(workId);
699        return redisExecutor.execute(new RedisCallable<State>() {
700            @Override
701            public State call(Jedis jedis) {
702                // get state
703                byte[] bytes = jedis.hget(stateKey(), workIdBytes);
704                if (bytes == null || bytes.length == 0) {
705                    return null;
706                }
707                switch (bytes[0]) {
708                case STATE_SCHEDULED_B:
709                    return State.SCHEDULED;
710                case STATE_CANCELED_B:
711                    return State.CANCELED;
712                case STATE_RUNNING_B:
713                    return State.RUNNING;
714                case STATE_COMPLETED_B:
715                    return State.COMPLETED;
716                default:
717                    String msg;
718                    try {
719                        msg = new String(bytes, UTF_8);
720                    } catch (UnsupportedEncodingException e) {
721                        msg = Arrays.toString(bytes);
722                    }
723                    log.error("Unknown work state: " + msg + ", work: " + workId);
724                    return null;
725                }
726            }
727        });
728    }
729
730    protected List<String> listWorkIdsList(final byte[] queueBytes) throws IOException {
731        return redisExecutor.execute(new RedisCallable<List<String>>() {
732
733            @Override
734            public List<String> call(Jedis jedis) {
735                List<byte[]> keys = jedis.lrange(queueBytes, 0, -1);
736                List<String> list = new ArrayList<String>(keys.size());
737                for (byte[] workIdBytes : keys) {
738                    list.add(string(workIdBytes));
739                }
740                return list;
741            }
742
743        });
744    }
745
746    protected List<String> listWorkIdsSet(final byte[] queueBytes) throws IOException {
747        return redisExecutor.execute(new RedisCallable<List<String>>() {
748
749            @Override
750            public List<String> call(Jedis jedis) {
751
752                Set<byte[]> keys = jedis.smembers(queueBytes);
753                List<String> list = new ArrayList<String>(keys.size());
754                for (byte[] workIdBytes : keys) {
755                    list.add(string(workIdBytes));
756                }
757                return list;
758            }
759
760        });
761
762    }
763
764    protected List<Work> listWorkList(final byte[] queueBytes) throws IOException {
765        return redisExecutor.execute(new RedisCallable<List<Work>>() {
766            @Override
767            public List<Work> call(Jedis jedis) {
768                List<byte[]> keys = jedis.lrange(queueBytes, 0, -1);
769                List<Work> list = new ArrayList<Work>(keys.size());
770                for (byte[] workIdBytes : keys) {
771                    // get data
772                    byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
773                    Work work = deserializeWork(workBytes);
774                    list.add(work);
775                }
776                return list;
777            }
778        });
779    }
780
781    protected List<Work> listWorkSet(final byte[] queueBytes) throws IOException {
782        return redisExecutor.execute(new RedisCallable<List<Work>>() {
783            @Override
784            public List<Work> call(Jedis jedis) {
785                Set<byte[]> keys = jedis.smembers(queueBytes);
786                List<Work> list = new ArrayList<Work>(keys.size());
787                for (byte[] workIdBytes : keys) {
788                    // get data
789                    byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
790                    Work work = deserializeWork(workBytes);
791                    list.add(work);
792                }
793                return list;
794            }
795        });
796    }
797
798    protected Work getWork(byte[] workIdBytes) {
799        try {
800            return getWorkData(workIdBytes);
801        } catch (IOException e) {
802            throw new RuntimeException(e);
803        }
804    }
805
806    protected Work getWorkData(final byte[] workIdBytes) throws IOException {
807        return redisExecutor.execute(new RedisCallable<Work>() {
808
809            @Override
810            public Work call(Jedis jedis) {
811                byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
812                return deserializeWork(workBytes);
813            }
814
815        });
816    }
817
818    /**
819     * Removes first work from work queue.
820     *
821     * @param queueId the queue id
822     * @return the work, or {@code null} if the scheduled queue is empty
823     */
824    protected Work getWorkFromQueue(final String queueId) throws IOException {
825        return redisExecutor.execute(new RedisCallable<Work>() {
826
827            @Override
828            public Work call(Jedis jedis) {
829                // pop from queue
830                byte[] workIdBytes = jedis.rpop(queuedKey(queueId));
831                if (workIdBytes == null) {
832                    return null;
833                }
834                // get data
835                byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
836                return deserializeWork(workBytes);
837            }
838
839        });
840    }
841
842    /**
843     * Removes a given work from queue, move the work from scheduled to completed set.
844     *
845     * @throws IOException
846     */
847    protected Work removeScheduledWork(final String queueId, final String workId) throws IOException {
848        final byte[] workIdBytes = bytes(workId);
849        return redisExecutor.execute(new RedisCallable<Work>() {
850
851            @Override
852            public Work call(Jedis jedis) {
853                // remove from queue
854                Long n = jedis.lrem(queuedKey(queueId), 0, workIdBytes);
855                if (n == null || n.intValue() == 0) {
856                    return null;
857                }
858                // remove from set
859                jedis.srem(scheduledKey(queueId), workIdBytes);
860                // set state to completed at current time
861                byte[] completedBytes = bytes(String.valueOf(System.currentTimeMillis()));
862                jedis.hset(stateKey(), workIdBytes, completedBytes);
863                // get data
864                byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
865                return deserializeWork(workBytes);
866            }
867
868        });
869    }
870
871    /**
872     * Helper to call SSCAN but fall back on a custom implementation based on SMEMBERS if the backend (embedded) does
873     * not support SSCAN.
874     *
875     * @since 7.3
876     */
877    public static class SScanner {
878
879        // results of SMEMBERS for last key, in embedded mode
880        protected List<String> smembers;
881
882        protected ScanResult<String> sscan(Jedis jedis, String key, String cursor, ScanParams params) {
883            ScanResult<String> scanResult;
884            try {
885                scanResult = jedis.sscan(key, cursor, params);
886            } catch (Exception e) {
887                // when testing with embedded fake redis, we may get an un-declared exception
888                if (!(e.getCause() instanceof NoSuchMethodException)) {
889                    throw e;
890                }
891                // no SSCAN available in embedded, do a full SMEMBERS
892                if (smembers == null) {
893                    Set<String> set = jedis.smembers(key);
894                    smembers = new ArrayList<>(set);
895                }
896                Collection<byte[]> bparams = params.getParams();
897                int count = 1000;
898                for (Iterator<byte[]> it = bparams.iterator(); it.hasNext();) {
899                    byte[] param = it.next();
900                    if (param.equals(Protocol.Keyword.MATCH.raw)) {
901                        throw new UnsupportedOperationException("MATCH not supported");
902                    }
903                    if (param.equals(Protocol.Keyword.COUNT.raw)) {
904                        count = Integer.parseInt(SafeEncoder.encode(it.next()));
905                    }
906                }
907                int pos = Integer.parseInt(cursor); // don't check range, callers are cool
908                int end = Math.min(pos + count, smembers.size());
909                int nextPos = end == smembers.size() ? 0 : end;
910                scanResult = new ScanResult<>(String.valueOf(nextPos), smembers.subList(pos, end));
911                if (nextPos == 0) {
912                    smembers = null;
913                }
914            }
915            return scanResult;
916        }
917    }
918
919    /**
920     * Don't pass more than approx. this number of parameters to Lua calls.
921     */
922    private static final int BATCH_SIZE = 5000;
923
924    protected void removeAllCompletedWork(final String queueId) throws IOException {
925        removeCompletedWork(queueId, 0);
926    }
927
928    protected void removeCompletedWork(final String queueId, final long completionTime) throws IOException {
929        redisExecutor.execute(new RedisCallable<Void>() {
930
931            @Override
932            public Void call(Jedis jedis) {
933                String completedKey = completedKeyString(queueId);
934                String stateKey = stateKeyString();
935                String dataKey = dataKeyString();
936                SScanner sscanner = new SScanner();
937                ScanParams scanParams = new ScanParams().count(BATCH_SIZE);
938                String cursor = "0";
939                do {
940                    ScanResult<String> scanResult = sscanner.sscan(jedis, completedKey, cursor, scanParams);
941                    cursor = scanResult.getStringCursor();
942                    List<String> workIds = scanResult.getResult();
943                    if (!workIds.isEmpty()) {
944                        // delete these works if before the completion time
945                        List<String> keys = Arrays.asList(completedKey, stateKey, dataKey);
946                        List<String> args = new ArrayList<String>(1 + workIds.size());
947                        args.add(String.valueOf(completionTime));
948                        args.addAll(workIds);
949                        jedis.evalsha(delCompletedSha, keys, args);
950                    }
951                } while (!"0".equals(cursor));
952                return null;
953            }
954        });
955    }
956
957}