001/*
002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Florent Guillaume
016 */
017package org.nuxeo.ecm.core.redis.contribs;
018
019import java.io.ByteArrayInputStream;
020import java.io.ByteArrayOutputStream;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.ObjectInputStream;
024import java.io.ObjectOutputStream;
025import java.io.UnsupportedEncodingException;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Collection;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.Iterator;
032import java.util.List;
033import java.util.Map;
034import java.util.Set;
035import java.util.concurrent.BlockingQueue;
036
037import org.apache.commons.logging.Log;
038import org.apache.commons.logging.LogFactory;
039import org.nuxeo.ecm.core.api.NuxeoException;
040import org.nuxeo.ecm.core.redis.RedisAdmin;
041import org.nuxeo.ecm.core.redis.RedisCallable;
042import org.nuxeo.ecm.core.redis.RedisExecutor;
043import org.nuxeo.ecm.core.work.WorkManagerImpl;
044import org.nuxeo.ecm.core.work.WorkQueueDescriptorRegistry;
045import org.nuxeo.ecm.core.work.WorkQueuing;
046import org.nuxeo.ecm.core.work.api.Work;
047import org.nuxeo.ecm.core.work.api.Work.State;
048import org.nuxeo.runtime.api.Framework;
049
050import redis.clients.jedis.Jedis;
051import redis.clients.jedis.Pipeline;
052import redis.clients.jedis.Protocol;
053import redis.clients.jedis.ScanParams;
054import redis.clients.jedis.ScanResult;
055import redis.clients.util.SafeEncoder;
056
057/**
058 * Implementation of a {@link WorkQueuing} storing {@link Work} instances in Redis.
059 *
060 * @since 5.8
061 */
062public class RedisWorkQueuing implements WorkQueuing {
063
064    private static final Log log = LogFactory.getLog(RedisWorkQueuing.class);
065
066    protected static final String UTF_8 = "UTF-8";
067
068    /**
069     * Global hash of Work instance id -> serialized Work instance.
070     */
071    protected static final String KEY_DATA = "data";
072
073    /**
074     * Global hash of Work instance id -> Work state. The completed state ( {@value #STATE_COMPLETED_B}) is followed by
075     * a completion time in milliseconds.
076     */
077    protected static final String KEY_STATE = "state";
078
079    /**
080     * Per-queue list of suspended Work instance ids.
081     */
082    protected static final String KEY_SUSPENDED_PREFIX = "prev:";
083
084    /**
085     * Per-queue list of scheduled Work instance ids.
086     */
087    protected static final String KEY_SCHEDULED_PREFIX = "queue:";
088
089    /**
090     * Per-queue set of running Work instance ids.
091     */
092    protected static final String KEY_RUNNING_PREFIX = "run:";
093
094    /**
095     * Per-queue set of completed Work instance ids.
096     */
097    protected static final String KEY_COMPLETED_PREFIX = "done:";
098
099    protected static final byte STATE_SCHEDULED_B = 'Q';
100
101    protected static final byte STATE_CANCELED_B = 'X';
102
103    protected static final byte STATE_RUNNING_B = 'R';
104
105    protected static final byte STATE_COMPLETED_B = 'C';
106
107    protected static final byte[] STATE_SCHEDULED = new byte[] { STATE_SCHEDULED_B };
108
109    protected static final byte[] STATE_CANCELED = new byte[] { STATE_CANCELED_B };
110
111    protected static final byte[] STATE_RUNNING = new byte[] { STATE_RUNNING_B };
112
113    protected static final byte[] STATE_COMPLETED = new byte[] { STATE_COMPLETED_B };
114
115    protected final WorkManagerImpl mgr;
116
117    // @GuardedBy("this")
118    protected Map<String, BlockingQueue<Runnable>> allScheduled = new HashMap<String, BlockingQueue<Runnable>>();
119
120    protected RedisExecutor redisExecutor;
121
122    protected RedisAdmin redisAdmin;
123
124    protected String redisNamespace;
125
126    protected String delCompletedSha;
127
128    public RedisWorkQueuing(WorkManagerImpl mgr, WorkQueueDescriptorRegistry workQueueDescriptors) {
129        this.mgr = mgr;
130    }
131
132    @Override
133    public void init() {
134        redisExecutor = Framework.getLocalService(RedisExecutor.class);
135        redisAdmin = Framework.getService(RedisAdmin.class);
136        redisNamespace = redisAdmin.namespace("work");
137        try {
138            for (String queueId : getSuspendedQueueIds()) {
139                int n = scheduleSuspendedWork(queueId);
140                log.info("Re-scheduling " + n + " work instances suspended from queue: " + queueId);
141            }
142        } catch (IOException e) {
143            throw new RuntimeException(e);
144        }
145        try {
146            delCompletedSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "del-completed");
147        } catch (IOException e) {
148            throw new RuntimeException(e);
149        }
150    }
151
152    @Override
153    public BlockingQueue<Runnable> initScheduleQueue(String queueId) {
154        if (allScheduled.containsKey(queueId)) {
155            throw new IllegalStateException(queueId + " is already configured");
156        }
157        final BlockingQueue<Runnable> scheduled = newBlockingQueue(queueId);
158        allScheduled.put(queueId, scheduled);
159        return scheduled;
160    }
161
162    @Override
163    public BlockingQueue<Runnable> getScheduledQueue(String queueId) {
164        if (!allScheduled.containsKey(queueId)) {
165            throw new IllegalStateException(queueId + " was not configured yet");
166        }
167        return allScheduled.get(queueId);
168    }
169
170    @Override
171    public void workRunning(String queueId, Work work) {
172        try {
173            workSetRunning(queueId, work);
174        } catch (IOException e) {
175            throw new RuntimeException(e);
176        }
177    }
178
179    @Override
180    public void workCompleted(String queueId, Work work) {
181        try {
182            workSetCompleted(queueId, work);
183        } catch (IOException e) {
184            throw new RuntimeException(e);
185        }
186    }
187
188    protected BlockingQueue<Runnable> newBlockingQueue(String queueId) {
189        return new RedisBlockingQueue(queueId, this);
190    }
191
192    @Override
193    public List<Work> listWork(String queueId, State state) {
194        switch (state) {
195        case SCHEDULED:
196            return listScheduled(queueId);
197        case RUNNING:
198            return listRunning(queueId);
199        case COMPLETED:
200            return listCompleted(queueId);
201        default:
202            throw new IllegalArgumentException(String.valueOf(state));
203        }
204    }
205
206    @Override
207    public List<String> listWorkIds(String queueId, State state) {
208        if (state == null) {
209            return listNonCompletedIds(queueId);
210        }
211        switch (state) {
212        case SCHEDULED:
213            return listScheduledIds(queueId);
214        case RUNNING:
215            return listRunningIds(queueId);
216        case COMPLETED:
217            return listCompletedIds(queueId);
218        default:
219            throw new IllegalArgumentException(String.valueOf(state));
220        }
221    }
222
223    protected List<Work> listScheduled(String queueId) {
224        try {
225            return listWorkList(scheduledKey(queueId));
226        } catch (IOException e) {
227            throw new RuntimeException(e);
228        }
229    }
230
231    protected List<Work> listRunning(String queueId) {
232        try {
233            return listWorkSet(runningKey(queueId));
234        } catch (IOException e) {
235            throw new RuntimeException(e);
236        }
237    }
238
239    protected List<Work> listCompleted(String queueId) {
240        try {
241            return listWorkSet(completedKey(queueId));
242        } catch (IOException e) {
243            throw new RuntimeException(e);
244        }
245    }
246
247    protected List<String> listScheduledIds(String queueId) {
248        try {
249            return listWorkIdsList(scheduledKey(queueId));
250        } catch (IOException e) {
251            throw new RuntimeException(e);
252        }
253    }
254
255    protected List<String> listRunningIds(String queueId) {
256        try {
257            return listWorkIdsSet(runningKey(queueId));
258        } catch (IOException e) {
259            throw new RuntimeException(e);
260        }
261    }
262
263    protected List<String> listNonCompletedIds(String queueId) {
264        List<String> list = listScheduledIds(queueId);
265        list.addAll(listRunningIds(queueId));
266        return list;
267    }
268
269    protected List<String> listCompletedIds(String queueId) {
270        try {
271            return listWorkIdsSet(completedKey(queueId));
272        } catch (IOException e) {
273            throw new RuntimeException(e);
274        }
275    }
276
277    @Override
278    public int getQueueSize(String queueId, State state) {
279        switch (state) {
280        case SCHEDULED:
281            return getScheduledSize(queueId);
282        case RUNNING:
283            return getRunningSize(queueId);
284        case COMPLETED:
285            return getCompletedSize(queueId);
286        default:
287            throw new IllegalArgumentException(String.valueOf(state));
288        }
289    }
290
291    protected int getScheduledSize(String queueId) {
292        try {
293            return getScheduledQueueSize(queueId);
294        } catch (IOException e) {
295            throw new RuntimeException(e);
296        }
297    }
298
299    protected int getRunningSize(String queueId) {
300        try {
301            return getRunningQueueSize(queueId);
302        } catch (IOException e) {
303            throw new RuntimeException(e);
304        }
305    }
306
307    protected int getCompletedSize(String queueId) {
308        try {
309            return getCompletedQueueSize(queueId);
310        } catch (IOException e) {
311            throw new RuntimeException(e);
312        }
313    }
314
315    @Override
316    public Work find(String workId, State state) {
317        if (isWorkInState(workId, state)) {
318            return getWork(bytes(workId));
319        }
320        return null;
321    }
322
323    @Override
324    public boolean isWorkInState(String workId, State state) {
325        State s = getWorkState(workId);
326        if (state == null) {
327            return s == State.SCHEDULED || s == State.RUNNING;
328        }
329        return s == state;
330    }
331
332    @Override
333    public Work removeScheduled(String queueId, String workId) {
334        try {
335            return removeScheduledWork(queueId, workId);
336        } catch (IOException cause) {
337            throw new RuntimeException("Cannot remove scheduled work " + workId + " from " + queueId, cause);
338        }
339    }
340
341    @Override
342    public State getWorkState(String workId) {
343        return getWorkStateInfo(workId);
344    }
345
346    @Override
347    public int setSuspending(String queueId) {
348        try {
349            int n = suspendScheduledWork(queueId);
350            log.info("Suspending " + n + " work instances from queue: " + queueId);
351            return n;
352        } catch (IOException e) {
353            throw new RuntimeException(e);
354        }
355    }
356
357    @Override
358    public void clearCompletedWork(String queueId, long completionTime) {
359        try {
360            if (completionTime <= 0) {
361                removeAllCompletedWork(queueId);
362            } else {
363                removeCompletedWork(queueId, completionTime);
364            }
365        } catch (IOException e) {
366            throw new RuntimeException(e);
367        }
368    }
369
370    /*
371     * ******************** Redis Interface ********************
372     */
373
374    protected String string(byte[] bytes) {
375        try {
376            return new String(bytes, UTF_8);
377        } catch (IOException e) {
378            throw new RuntimeException(e);
379        }
380    }
381
382    protected byte[] bytes(String string) {
383        try {
384            return string.getBytes(UTF_8);
385        } catch (IOException e) {
386            // cannot happen for UTF-8
387            throw new RuntimeException(e);
388        }
389    }
390
391    protected byte[] keyBytes(String prefix, String queueId) {
392        return keyBytes(prefix + queueId);
393    }
394
395    protected byte[] keyBytes(String prefix) {
396        return bytes(redisNamespace + prefix);
397    }
398
399    protected byte[] suspendedKey(String queueId) {
400        return keyBytes(KEY_SUSPENDED_PREFIX, queueId);
401    }
402
403    protected byte[] scheduledKey(String queueId) {
404        return keyBytes(KEY_SCHEDULED_PREFIX, queueId);
405    }
406
407    protected byte[] runningKey(String queueId) {
408        return keyBytes(KEY_RUNNING_PREFIX, queueId);
409    }
410
411    protected byte[] completedKey(String queueId) {
412        return keyBytes(KEY_COMPLETED_PREFIX, queueId);
413    }
414
415    protected String completedKeyString(String queueId) {
416        return redisNamespace + KEY_COMPLETED_PREFIX + queueId;
417    }
418
419    protected byte[] stateKey() {
420        return keyBytes(KEY_STATE);
421    }
422
423    protected String stateKeyString() {
424        return redisNamespace + KEY_STATE;
425    }
426
427    protected byte[] dataKey() {
428        return keyBytes(KEY_DATA);
429    }
430
431    protected String dataKeyString() {
432        return redisNamespace + KEY_DATA;
433    }
434
435    protected byte[] serializeWork(Work work) throws IOException {
436        ByteArrayOutputStream baout = new ByteArrayOutputStream();
437        ObjectOutputStream out = new ObjectOutputStream(baout);
438        out.writeObject(work);
439        out.flush();
440        out.close();
441        return baout.toByteArray();
442    }
443
444    protected Work deserializeWork(byte[] workBytes) {
445        if (workBytes == null) {
446            return null;
447        }
448        InputStream bain = new ByteArrayInputStream(workBytes);
449        try (ObjectInputStream in = new ObjectInputStream(bain)) {
450            return (Work) in.readObject();
451        } catch (RuntimeException cause) {
452            throw cause;
453        } catch (IOException | ClassNotFoundException cause) {
454            throw new RuntimeException("Cannot deserialize work", cause);
455        }
456    }
457
458    protected int getScheduledQueueSize(final String queueId) throws IOException {
459        return redisExecutor.execute(new RedisCallable<Long>() {
460
461            @Override
462            public Long call(Jedis jedis) {
463                return jedis.llen(scheduledKey(queueId));
464            }
465
466        }).intValue();
467    }
468
469    protected int getRunningQueueSize(final String queueId) throws IOException {
470        return redisExecutor.execute(new RedisCallable<Long>() {
471
472            @Override
473            public Long call(Jedis jedis) {
474                return jedis.scard(runningKey(queueId));
475            }
476
477        }).intValue();
478    }
479
480    protected int getCompletedQueueSize(final String queueId) throws IOException {
481        return redisExecutor.execute(new RedisCallable<Long>() {
482
483            @Override
484            public Long call(Jedis jedis) {
485                return jedis.scard(completedKey(queueId));
486            }
487
488        }).intValue();
489    }
490
491    /**
492     * Persists a work instance and adds it to the scheduled queue.
493     *
494     * @param queueId the queue id
495     * @param work the work instance
496     * @throws IOException
497     */
498    public void addScheduledWork(final String queueId, Work work) throws IOException {
499        log.debug("Add scheduled " + work);
500        final byte[] workIdBytes = bytes(work.getId());
501
502        // serialize Work
503        final byte[] workBytes = serializeWork(work);
504
505        redisExecutor.execute(new RedisCallable<Void>() {
506
507            @Override
508            public Void call(Jedis jedis) {
509                if (redisExecutor.supportPipelined()) {
510                    Pipeline pipe = jedis.pipelined();
511                    pipe.hset(dataKey(), workIdBytes, workBytes);
512                    pipe.hset(stateKey(), workIdBytes, STATE_SCHEDULED);
513                    pipe.lpush(scheduledKey(queueId), workIdBytes);
514                    pipe.sync();
515                } else {
516                    jedis.hset(dataKey(), workIdBytes, workBytes);
517                    jedis.hset(stateKey(), workIdBytes, STATE_SCHEDULED);
518                    jedis.lpush(scheduledKey(queueId), workIdBytes);
519                }
520                return null;
521            }
522
523        });
524    }
525
526    /**
527     * Finds which queues have suspended work.
528     *
529     * @return a set of queue ids
530     * @since 5.8
531     */
532    protected Set<String> getSuspendedQueueIds() throws IOException {
533        return getQueueIds(KEY_SUSPENDED_PREFIX);
534    }
535
536    protected Set<String> getScheduledQueueIds() {
537        return getQueueIds(KEY_SCHEDULED_PREFIX);
538    }
539
540    protected Set<String> getRunningQueueIds() {
541        return getQueueIds(KEY_RUNNING_PREFIX);
542    }
543
544    @Override
545    public Set<String> getCompletedQueueIds() {
546        return getQueueIds(KEY_COMPLETED_PREFIX);
547    }
548
549    /**
550     * Finds which queues have work for a given state prefix.
551     *
552     * @return a set of queue ids
553     * @since 5.8
554     */
555    protected Set<String> getQueueIds(final String queuePrefix) {
556        return redisExecutor.execute(new RedisCallable<Set<String>>() {
557            @Override
558            public Set<String> call(Jedis jedis) {
559                int offset = keyBytes(queuePrefix).length;
560                Set<byte[]> keys = jedis.keys(keyBytes(queuePrefix, "*"));
561                Set<String> queueIds = new HashSet<String>(keys.size());
562                for (byte[] bytes : keys) {
563                    try {
564                        String queueId = new String(bytes, offset, bytes.length - offset, UTF_8);
565                        queueIds.add(queueId);
566                    } catch (IOException e) {
567                        throw new NuxeoException(e);
568                    }
569                }
570                return queueIds;
571            }
572
573        });
574    }
575
576    /**
577     * Resumes all suspended work instances by moving them to the scheduled queue.
578     *
579     * @param queueId the queue id
580     * @return the number of work instances scheduled
581     * @since 5.8
582     */
583    public int scheduleSuspendedWork(final String queueId) throws IOException {
584        return redisExecutor.execute(new RedisCallable<Integer>() {
585            @Override
586            public Integer call(Jedis jedis) {
587                for (int n = 0;; n++) {
588                    byte[] workIdBytes = jedis.rpoplpush(suspendedKey(queueId), scheduledKey(queueId));
589                    if (workIdBytes == null) {
590                        return Integer.valueOf(n);
591                    }
592                }
593            }
594
595        }).intValue();
596    }
597
598    /**
599     * Suspends all scheduled work instances by moving them to the suspended queue.
600     *
601     * @param queueId the queue id
602     * @return the number of work instances suspended
603     * @since 5.8
604     */
605    public int suspendScheduledWork(final String queueId) throws IOException {
606        return redisExecutor.execute(new RedisCallable<Integer>() {
607
608            @Override
609            public Integer call(Jedis jedis) {
610                for (int n = 0;; n++) {
611                    byte[] workIdBytes = jedis.rpoplpush(scheduledKey(queueId), suspendedKey(queueId));
612                    if (workIdBytes == null) {
613                        return n;
614                    }
615                }
616            }
617        }).intValue();
618    }
619
620    /**
621     * Switches a work to state running.
622     *
623     * @param queueId the queue id
624     * @param work the work
625     */
626    protected void workSetRunning(final String queueId, Work work) throws IOException {
627        final byte[] workIdBytes = bytes(work.getId());
628        redisExecutor.execute(new RedisCallable<Void>() {
629
630            @Override
631            public Void call(Jedis jedis) {
632                if (redisExecutor.supportPipelined()) {
633                    Pipeline pipe = jedis.pipelined();
634                    pipe.sadd(runningKey(queueId), workIdBytes);
635                    pipe.hset(stateKey(), workIdBytes, STATE_RUNNING);
636                    pipe.sync();
637                } else {
638                    jedis.sadd(runningKey(queueId), workIdBytes);
639                    jedis.hset(stateKey(), workIdBytes, STATE_RUNNING);
640                }
641                return null;
642            }
643        });
644    }
645
646    /**
647     * Switches a work to state completed, and saves its new state.
648     */
649    protected void workSetCompleted(final String queueId, final Work work) throws IOException {
650        final byte[] workIdBytes = bytes(work.getId());
651        final byte[] workBytes = serializeWork(work);
652        redisExecutor.execute(new RedisCallable<Void>() {
653
654            @Override
655            public Void call(Jedis jedis) {
656                byte[] completedBytes = bytes(((char) STATE_COMPLETED_B) + String.valueOf(work.getCompletionTime()));
657                if (redisExecutor.supportPipelined()) {
658                    Pipeline pipe = jedis.pipelined();
659                    // store (updated) content in hash
660                    pipe.hset(dataKey(), workIdBytes, workBytes);
661                    // remove key from running set
662                    pipe.srem(runningKey(queueId), workIdBytes);
663                    // put key in completed set
664                    pipe.sadd(completedKey(queueId), workIdBytes);
665                    // set state to completed
666                    pipe.hset(stateKey(), workIdBytes, completedBytes);
667                    pipe.sync();
668                } else {
669                    jedis.hset(dataKey(), workIdBytes, workBytes);
670                    jedis.srem(runningKey(queueId), workIdBytes);
671                    jedis.sadd(completedKey(queueId), workIdBytes);
672                    jedis.hset(stateKey(), workIdBytes, completedBytes);
673                }
674                return null;
675            }
676        });
677    }
678
679    /**
680     * Gets the work state.
681     *
682     * @param workId the work id
683     * @return the state, or {@code null} if not found
684     */
685    protected State getWorkStateInfo(final String workId) {
686        final byte[] workIdBytes = bytes(workId);
687        return redisExecutor.execute(new RedisCallable<State>() {
688            @Override
689            public State call(Jedis jedis) {
690                // get state
691                byte[] bytes = jedis.hget(stateKey(), workIdBytes);
692                if (bytes == null || bytes.length == 0) {
693                    return null;
694                }
695                switch (bytes[0]) {
696                case STATE_SCHEDULED_B:
697                    return State.SCHEDULED;
698                case STATE_CANCELED_B:
699                    return State.CANCELED;
700                case STATE_RUNNING_B:
701                    return State.RUNNING;
702                case STATE_COMPLETED_B:
703                    return State.COMPLETED;
704                default:
705                    String msg;
706                    try {
707                        msg = new String(bytes, UTF_8);
708                    } catch (UnsupportedEncodingException e) {
709                        msg = Arrays.toString(bytes);
710                    }
711                    log.error("Unknown work state: " + msg + ", work: " + workId);
712                    return null;
713                }
714            }
715        });
716    }
717
718    protected List<String> listWorkIdsList(final byte[] queueBytes) throws IOException {
719        return redisExecutor.execute(new RedisCallable<List<String>>() {
720
721            @Override
722            public List<String> call(Jedis jedis) {
723                List<byte[]> keys = jedis.lrange(queueBytes, 0, -1);
724                List<String> list = new ArrayList<String>(keys.size());
725                for (byte[] workIdBytes : keys) {
726                    list.add(string(workIdBytes));
727                }
728                return list;
729            }
730
731        });
732    }
733
734    protected List<String> listWorkIdsSet(final byte[] queueBytes) throws IOException {
735        return redisExecutor.execute(new RedisCallable<List<String>>() {
736
737            @Override
738            public List<String> call(Jedis jedis) {
739
740                Set<byte[]> keys = jedis.smembers(queueBytes);
741                List<String> list = new ArrayList<String>(keys.size());
742                for (byte[] workIdBytes : keys) {
743                    list.add(string(workIdBytes));
744                }
745                return list;
746            }
747
748        });
749
750    }
751
752    protected List<Work> listWorkList(final byte[] queueBytes) throws IOException {
753        return redisExecutor.execute(new RedisCallable<List<Work>>() {
754            @Override
755            public List<Work> call(Jedis jedis) {
756                List<byte[]> keys = jedis.lrange(queueBytes, 0, -1);
757                List<Work> list = new ArrayList<Work>(keys.size());
758                for (byte[] workIdBytes : keys) {
759                    // get data
760                    byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
761                    Work work = deserializeWork(workBytes);
762                    list.add(work);
763                }
764                return list;
765            }
766        });
767    }
768
769    protected List<Work> listWorkSet(final byte[] queueBytes) throws IOException {
770        return redisExecutor.execute(new RedisCallable<List<Work>>() {
771            @Override
772            public List<Work> call(Jedis jedis) {
773                Set<byte[]> keys = jedis.smembers(queueBytes);
774                List<Work> list = new ArrayList<Work>(keys.size());
775                for (byte[] workIdBytes : keys) {
776                    // get data
777                    byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
778                    Work work = deserializeWork(workBytes);
779                    list.add(work);
780                }
781                return list;
782            }
783        });
784    }
785
786    protected Work getWork(byte[] workIdBytes) {
787        try {
788            return getWorkData(workIdBytes);
789        } catch (IOException e) {
790            throw new RuntimeException(e);
791        }
792    }
793
794    protected Work getWorkData(final byte[] workIdBytes) throws IOException {
795        return redisExecutor.execute(new RedisCallable<Work>() {
796
797            @Override
798            public Work call(Jedis jedis) {
799                byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
800                return deserializeWork(workBytes);
801            }
802
803        });
804    }
805
806    /**
807     * Removes first work from scheduled queue.
808     *
809     * @param queueId the queue id
810     * @return the work, or {@code null} if the scheduled queue is empty
811     */
812    protected Work removeScheduledWork(final String queueId) throws IOException {
813        return redisExecutor.execute(new RedisCallable<Work>() {
814
815            @Override
816            public Work call(Jedis jedis) {
817                // pop from queue
818                byte[] workIdBytes = jedis.rpop(scheduledKey(queueId));
819                if (workIdBytes == null) {
820                    return null;
821                }
822                // get data
823                byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
824                return deserializeWork(workBytes);
825            }
826
827        });
828    }
829
830    /**
831     * Removes a given work from scheduled queue and set state to completed.
832     *
833     * @throws IOException
834     */
835    protected Work removeScheduledWork(final String queueId, final String workId) throws IOException {
836        final byte[] workIdBytes = bytes(workId);
837        return redisExecutor.execute(new RedisCallable<Work>() {
838
839            @Override
840            public Work call(Jedis jedis) {
841                // remove from queue
842                Long n = jedis.lrem(scheduledKey(queueId), 0, workIdBytes);
843                if (n == null || n.intValue() == 0) {
844                    return null;
845                }
846                // set state to completed at current time
847                byte[] completedBytes = bytes(String.valueOf(System.currentTimeMillis()));
848                jedis.hset(stateKey(), workIdBytes, completedBytes);
849                // get data
850                byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
851                return deserializeWork(workBytes);
852            }
853
854        });
855    }
856
857    /**
858     * Helper to call SSCAN but fall back on a custom implementation based on SMEMBERS if the backend (embedded) does
859     * not support SSCAN.
860     *
861     * @since 7.3
862     */
863    public static class SScanner {
864
865        // results of SMEMBERS for last key, in embedded mode
866        protected List<String> smembers;
867
868        protected ScanResult<String> sscan(Jedis jedis, String key, String cursor, ScanParams params) {
869            ScanResult<String> scanResult;
870            try {
871                scanResult = jedis.sscan(key, cursor, params);
872            } catch (Exception e) {
873                // when testing with embedded fake redis, we may get an un-declared exception
874                if (!(e.getCause() instanceof NoSuchMethodException)) {
875                    throw e;
876                }
877                // no SSCAN available in embedded, do a full SMEMBERS
878                if (smembers == null) {
879                    Set<String> set = jedis.smembers(key);
880                    smembers = new ArrayList<>(set);
881                }
882                Collection<byte[]> bparams = params.getParams();
883                int count = 1000;
884                for (Iterator<byte[]> it = bparams.iterator(); it.hasNext();) {
885                    byte[] param = it.next();
886                    if (param.equals(Protocol.Keyword.MATCH.raw)) {
887                        throw new UnsupportedOperationException("MATCH not supported");
888                    }
889                    if (param.equals(Protocol.Keyword.COUNT.raw)) {
890                        count = Integer.parseInt(SafeEncoder.encode(it.next()));
891                    }
892                }
893                int pos = Integer.parseInt(cursor); // don't check range, callers are cool
894                int end = Math.min(pos + count, smembers.size());
895                int nextPos = end == smembers.size() ? 0 : end;
896                scanResult = new ScanResult<>(String.valueOf(nextPos), smembers.subList(pos, end));
897                if (nextPos == 0) {
898                    smembers = null;
899                }
900            }
901            return scanResult;
902        }
903    }
904
905    /**
906     * Don't pass more than approx. this number of parameters to Lua calls.
907     */
908    private static final int BATCH_SIZE = 5000;
909
910    protected void removeAllCompletedWork(final String queueId) throws IOException {
911        removeCompletedWork(queueId, 0);
912    }
913
914    protected void removeCompletedWork(final String queueId, final long completionTime) throws IOException {
915        redisExecutor.execute(new RedisCallable<Void>() {
916
917            @Override
918            public Void call(Jedis jedis) {
919                String completedKey = completedKeyString(queueId);
920                String stateKey = stateKeyString();
921                String dataKey = dataKeyString();
922                SScanner sscanner = new SScanner();
923                ScanParams scanParams = new ScanParams().count(BATCH_SIZE);
924                String cursor = "0";
925                do {
926                    ScanResult<String> scanResult = sscanner.sscan(jedis, completedKey, cursor, scanParams);
927                    cursor = scanResult.getStringCursor();
928                    List<String> workIds = scanResult.getResult();
929                    if (!workIds.isEmpty()) {
930                        // delete these works if before the completion time
931                        List<String> keys = Arrays.asList(completedKey, stateKey, dataKey);
932                        List<String> args = new ArrayList<String>(1 + workIds.size());
933                        args.add(String.valueOf(completionTime));
934                        args.addAll(workIds);
935                        jedis.evalsha(delCompletedSha, keys, args);
936                    }
937                } while (!"0".equals(cursor));
938                return null;
939            }
940        });
941    }
942
943}