001/*
002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.redis.contribs;
020
021import java.io.ByteArrayInputStream;
022import java.io.ByteArrayOutputStream;
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.ObjectInputStream;
026import java.io.ObjectOutputStream;
027import java.io.UnsupportedEncodingException;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collection;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.Iterator;
034import java.util.List;
035import java.util.Map;
036import java.util.Set;
037import java.util.concurrent.BlockingQueue;
038
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041import org.nuxeo.ecm.core.api.NuxeoException;
042import org.nuxeo.ecm.core.redis.RedisAdmin;
043import org.nuxeo.ecm.core.redis.RedisCallable;
044import org.nuxeo.ecm.core.redis.RedisExecutor;
045import org.nuxeo.ecm.core.work.WorkHolder;
046import org.nuxeo.ecm.core.work.WorkManagerImpl;
047import org.nuxeo.ecm.core.work.WorkQueueDescriptorRegistry;
048import org.nuxeo.ecm.core.work.WorkQueuing;
049import org.nuxeo.ecm.core.work.api.Work;
050import org.nuxeo.ecm.core.work.api.Work.State;
051import org.nuxeo.runtime.api.Framework;
052
053import redis.clients.jedis.Jedis;
054import redis.clients.jedis.Pipeline;
055import redis.clients.jedis.Protocol;
056import redis.clients.jedis.ScanParams;
057import redis.clients.jedis.ScanResult;
058import redis.clients.util.SafeEncoder;
059
060/**
061 * Implementation of a {@link WorkQueuing} storing {@link Work} instances in Redis.
062 *
063 * @since 5.8
064 */
065public class RedisWorkQueuing implements WorkQueuing {
066
067    private static final Log log = LogFactory.getLog(RedisWorkQueuing.class);
068
069    protected static final String UTF_8 = "UTF-8";
070
071    /**
072     * Global hash of Work instance id -> serialized Work instance.
073     */
074    protected static final String KEY_DATA = "data";
075
076    /**
077     * Global hash of Work instance id -> Work state. The completed state ( {@value #STATE_COMPLETED_B}) is followed by
078     * a completion time in milliseconds.
079     */
080    protected static final String KEY_STATE = "state";
081
082    /**
083     * Per-queue list of suspended Work instance ids.
084     */
085    protected static final String KEY_SUSPENDED_PREFIX = "prev:";
086
087    /**
088     * Per-queue list of scheduled Work instance ids.
089     */
090    protected static final String KEY_QUEUE_PREFIX = "queue:";
091
092    /**
093     * Per-queue set of scheduled Work instance ids.
094     */
095    protected static final String KEY_SCHEDULED_PREFIX = "sched:";
096
097    /**
098     * Per-queue set of running Work instance ids.
099     */
100    protected static final String KEY_RUNNING_PREFIX = "run:";
101
102    /**
103     * Per-queue set of completed Work instance ids.
104     */
105    protected static final String KEY_COMPLETED_PREFIX = "done:";
106
107    protected static final byte STATE_SCHEDULED_B = 'Q';
108
109    protected static final byte STATE_CANCELED_B = 'X';
110
111    protected static final byte STATE_RUNNING_B = 'R';
112
113    protected static final byte STATE_COMPLETED_B = 'C';
114
115    protected static final byte[] STATE_SCHEDULED = new byte[] { STATE_SCHEDULED_B };
116
117    protected static final byte[] STATE_CANCELED = new byte[] { STATE_CANCELED_B };
118
119    protected static final byte[] STATE_RUNNING = new byte[] { STATE_RUNNING_B };
120
121    protected static final byte[] STATE_COMPLETED = new byte[] { STATE_COMPLETED_B };
122
123    protected final WorkManagerImpl mgr;
124
125    // @GuardedBy("this")
126    protected Map<String, BlockingQueue<Runnable>> allQueued = new HashMap<String, BlockingQueue<Runnable>>();
127
128    protected RedisExecutor redisExecutor;
129
130    protected RedisAdmin redisAdmin;
131
132    protected String redisNamespace;
133
134    // lua scripts
135    protected String schedulingWorkSha;
136    protected String runningWorkSha;
137    protected String completedWorkSha;
138    protected String cleanCompletedWorkSha;
139
140    public RedisWorkQueuing(WorkManagerImpl mgr, WorkQueueDescriptorRegistry workQueueDescriptors) {
141        this.mgr = mgr;
142    }
143
144    @Override
145    public void init() {
146        redisExecutor = Framework.getLocalService(RedisExecutor.class);
147        redisAdmin = Framework.getService(RedisAdmin.class);
148        redisNamespace = redisAdmin.namespace("work");
149        try {
150            for (String queueId : getSuspendedQueueIds()) {
151                int n = scheduleSuspendedWork(queueId);
152                log.info("Re-scheduling " + n + " work instances suspended from queue: " + queueId);
153            }
154        } catch (IOException e) {
155            throw new RuntimeException(e);
156        }
157        try {
158            schedulingWorkSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "scheduling-work");
159            runningWorkSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "running-work");
160            completedWorkSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "completed-work");
161            cleanCompletedWorkSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "clean-completed-work");
162        } catch (IOException e) {
163            throw new RuntimeException(e);
164        }
165    }
166
167    @Override
168    public BlockingQueue<Runnable> initWorkQueue(String queueId) {
169        if (allQueued.containsKey(queueId)) {
170            throw new IllegalStateException(queueId + " is already configured");
171        }
172        final BlockingQueue<Runnable> queue = newBlockingQueue(queueId);
173        allQueued.put(queueId, queue);
174        return queue;
175    }
176
177    @Override
178    public boolean workSchedule(String queueId, Work work) {
179        return getWorkQueue(queueId).offer(new WorkHolder(work));
180    }
181
182    public BlockingQueue<Runnable> getWorkQueue(String queueId) {
183        if (!allQueued.containsKey(queueId)) {
184            throw new IllegalStateException(queueId + " was not configured yet");
185        }
186        return allQueued.get(queueId);
187    }
188
189
190    @Override
191    public void workRunning(String queueId, Work work) {
192        try {
193            workSetRunning(queueId, work);
194        } catch (IOException e) {
195            throw new RuntimeException(e);
196        }
197    }
198
199    @Override
200    public void workCompleted(String queueId, Work work) {
201        try {
202            workSetCompleted(queueId, work);
203        } catch (IOException e) {
204            throw new RuntimeException(e);
205        }
206    }
207
208    protected BlockingQueue<Runnable> newBlockingQueue(String queueId) {
209        return new RedisBlockingQueue(queueId, this);
210    }
211
212    @Override
213    public List<Work> listWork(String queueId, State state) {
214        switch (state) {
215        case SCHEDULED:
216            return listScheduled(queueId);
217        case RUNNING:
218            return listRunning(queueId);
219        case COMPLETED:
220            return listCompleted(queueId);
221        default:
222            throw new IllegalArgumentException(String.valueOf(state));
223        }
224    }
225
226    @Override
227    public List<String> listWorkIds(String queueId, State state) {
228        if (state == null) {
229            return listNonCompletedIds(queueId);
230        }
231        switch (state) {
232        case SCHEDULED:
233            return listScheduledIds(queueId);
234        case RUNNING:
235            return listRunningIds(queueId);
236        case COMPLETED:
237            return listCompletedIds(queueId);
238        default:
239            throw new IllegalArgumentException(String.valueOf(state));
240        }
241    }
242
243    protected List<Work> listScheduled(String queueId) {
244        try {
245            return listWorkList(queuedKey(queueId));
246        } catch (IOException e) {
247            throw new RuntimeException(e);
248        }
249    }
250
251    protected List<Work> listRunning(String queueId) {
252        try {
253            return listWorkSet(runningKey(queueId));
254        } catch (IOException e) {
255            throw new RuntimeException(e);
256        }
257    }
258
259    protected List<Work> listCompleted(String queueId) {
260        try {
261            return listWorkSet(completedKey(queueId));
262        } catch (IOException e) {
263            throw new RuntimeException(e);
264        }
265    }
266
267    protected List<String> listScheduledIds(String queueId) {
268        try {
269            return listWorkIdsList(queuedKey(queueId));
270        } catch (IOException e) {
271            throw new RuntimeException(e);
272        }
273    }
274
275    protected List<String> listRunningIds(String queueId) {
276        try {
277            return listWorkIdsSet(runningKey(queueId));
278        } catch (IOException e) {
279            throw new RuntimeException(e);
280        }
281    }
282
283    protected List<String> listNonCompletedIds(String queueId) {
284        List<String> list = listScheduledIds(queueId);
285        list.addAll(listRunningIds(queueId));
286        return list;
287    }
288
289    protected List<String> listCompletedIds(String queueId) {
290        try {
291            return listWorkIdsSet(completedKey(queueId));
292        } catch (IOException e) {
293            throw new RuntimeException(e);
294        }
295    }
296
297    @Override
298    public int count(String queueId, State state) {
299        switch (state) {
300        case SCHEDULED:
301            return getScheduledSize(queueId);
302        case RUNNING:
303            return getRunningSize(queueId);
304        case COMPLETED:
305            return getCompletedSize(queueId);
306        default:
307            throw new IllegalArgumentException(String.valueOf(state));
308        }
309    }
310
311    protected int getScheduledSize(String queueId) {
312        try {
313            return getRedisSetSize(scheduledKey(queueId));
314        } catch (IOException e) {
315            throw new RuntimeException(e);
316        }
317    }
318
319    protected int getRunningSize(String queueId) {
320        try {
321            return getRedisSetSize(runningKey(queueId));
322        } catch (IOException e) {
323            throw new RuntimeException(e);
324        }
325    }
326
327    protected int getCompletedSize(String queueId) {
328        try {
329            return getRedisSetSize(completedKey(queueId));
330        } catch (IOException e) {
331            throw new RuntimeException(e);
332        }
333    }
334
335    @Override
336    public Work find(String workId, State state) {
337        if (isWorkInState(workId, state)) {
338            return getWork(bytes(workId));
339        }
340        return null;
341    }
342
343    @Override
344    public boolean isWorkInState(String workId, State state) {
345        State s = getWorkState(workId);
346        if (state == null) {
347            return s == State.SCHEDULED || s == State.RUNNING;
348        }
349        return s == state;
350    }
351
352    @Override
353    public Work removeScheduled(String queueId, String workId) {
354        try {
355            return removeScheduledWork(queueId, workId);
356        } catch (IOException cause) {
357            throw new RuntimeException("Cannot remove scheduled work " + workId + " from " + queueId, cause);
358        }
359    }
360
361    @Override
362    public State getWorkState(String workId) {
363        return getWorkStateInfo(workId);
364    }
365
366    @Override
367    public int setSuspending(String queueId) {
368        try {
369            int n = suspendScheduledWork(queueId);
370            log.info("Suspending " + n + " work instances from queue: " + queueId);
371            allQueued.remove(queueId);
372            return n;
373        } catch (IOException e) {
374            throw new RuntimeException(e);
375        }
376    }
377
378    @Override
379    public void clearCompletedWork(String queueId, long completionTime) {
380        try {
381            if (completionTime <= 0) {
382                removeAllCompletedWork(queueId);
383            } else {
384                removeCompletedWork(queueId, completionTime);
385            }
386        } catch (IOException e) {
387            throw new RuntimeException(e);
388        }
389    }
390
391    /*
392     * ******************** Redis Interface ********************
393     */
394
395    protected String string(byte[] bytes) {
396        try {
397            return new String(bytes, UTF_8);
398        } catch (IOException e) {
399            throw new RuntimeException(e);
400        }
401    }
402
403    protected byte[] bytes(String string) {
404        try {
405            return string.getBytes(UTF_8);
406        } catch (IOException e) {
407            // cannot happen for UTF-8
408            throw new RuntimeException(e);
409        }
410    }
411
412    protected byte[] keyBytes(String prefix, String queueId) {
413        return keyBytes(prefix + queueId);
414    }
415
416    protected byte[] keyBytes(String prefix) {
417        return bytes(redisNamespace + prefix);
418    }
419
420    protected byte[] suspendedKey(String queueId) {
421        return keyBytes(KEY_SUSPENDED_PREFIX, queueId);
422    }
423
424    protected byte[] queuedKey(String queueId) {
425        return keyBytes(KEY_QUEUE_PREFIX, queueId);
426    }
427
428    protected byte[] runningKey(String queueId) {
429        return keyBytes(KEY_RUNNING_PREFIX, queueId);
430    }
431
432    protected byte[] scheduledKey(String queueId) {
433        return keyBytes(KEY_SCHEDULED_PREFIX, queueId);
434    }
435
436    protected byte[] completedKey(String queueId) {
437        return keyBytes(KEY_COMPLETED_PREFIX, queueId);
438    }
439
440    protected String completedKeyString(String queueId) {
441        return redisNamespace + KEY_COMPLETED_PREFIX + queueId;
442    }
443
444    protected byte[] stateKey() {
445        return keyBytes(KEY_STATE);
446    }
447
448    protected String stateKeyString() {
449        return redisNamespace + KEY_STATE;
450    }
451
452    protected byte[] dataKey() {
453        return keyBytes(KEY_DATA);
454    }
455
456    protected String dataKeyString() {
457        return redisNamespace + KEY_DATA;
458    }
459
460    protected byte[] serializeWork(Work work) throws IOException {
461        ByteArrayOutputStream baout = new ByteArrayOutputStream();
462        ObjectOutputStream out = new ObjectOutputStream(baout);
463        out.writeObject(work);
464        out.flush();
465        out.close();
466        return baout.toByteArray();
467    }
468
469    protected Work deserializeWork(byte[] workBytes) {
470        if (workBytes == null) {
471            return null;
472        }
473        InputStream bain = new ByteArrayInputStream(workBytes);
474        try (ObjectInputStream in = new ObjectInputStream(bain)) {
475            return (Work) in.readObject();
476        } catch (RuntimeException cause) {
477            throw cause;
478        } catch (IOException | ClassNotFoundException cause) {
479            throw new RuntimeException("Cannot deserialize work", cause);
480        }
481    }
482
483    protected int getRedisListSize(final byte[] key) throws IOException {
484        return redisExecutor.execute(new RedisCallable<Long>() {
485
486            @Override
487            public Long call(Jedis jedis) {
488                return jedis.llen(key);
489            }
490
491        }).intValue();
492    }
493
494    protected int getRedisSetSize(final byte[] key) throws IOException {
495        return redisExecutor.execute(new RedisCallable<Long>() {
496
497            @Override
498            public Long call(Jedis jedis) {
499                return jedis.scard(key);
500            }
501
502        }).intValue();
503    }
504
505
506    /**
507     * Persists a work instance and adds it to the scheduled queue.
508     *
509     * @param queueId the queue id
510     * @param work the work instance
511     * @throws IOException
512     */
513    public void addScheduledWork(final String queueId, Work work) throws IOException {
514        final byte[] workId = bytes(work.getId());
515        final byte[] workData = serializeWork(work);
516        final List<byte[]> keys = Arrays.asList(dataKey(), stateKey(), scheduledKey(queueId),queuedKey(queueId));
517        final List<byte[]> args = Arrays.asList(workId, workData, STATE_SCHEDULED);
518        redisExecutor.execute(new RedisCallable<Void>() {
519
520            @Override
521            public Void call(Jedis jedis) {
522                jedis.evalsha(schedulingWorkSha.getBytes(), keys, args);
523                if (log.isDebugEnabled()) {
524                    log.debug("Add scheduled " + work);
525                }
526                return null;
527            }
528
529        });
530    }
531
532    /**
533     * Finds which queues have suspended work.
534     *
535     * @return a set of queue ids
536     * @since 5.8
537     */
538    protected Set<String> getSuspendedQueueIds() throws IOException {
539        return getQueueIds(KEY_SUSPENDED_PREFIX);
540    }
541
542    protected Set<String> getScheduledQueueIds() {
543        return getQueueIds(KEY_QUEUE_PREFIX);
544    }
545
546    protected Set<String> getRunningQueueIds() {
547        return getQueueIds(KEY_RUNNING_PREFIX);
548    }
549
550    @Override
551    public Set<String> getCompletedQueueIds() {
552        return getQueueIds(KEY_COMPLETED_PREFIX);
553    }
554
555    /**
556     * Finds which queues have work for a given state prefix.
557     *
558     * @return a set of queue ids
559     * @since 5.8
560     */
561    protected Set<String> getQueueIds(final String queuePrefix) {
562        return redisExecutor.execute(new RedisCallable<Set<String>>() {
563            @Override
564            public Set<String> call(Jedis jedis) {
565                int offset = keyBytes(queuePrefix).length;
566                Set<byte[]> keys = jedis.keys(keyBytes(queuePrefix, "*"));
567                Set<String> queueIds = new HashSet<String>(keys.size());
568                for (byte[] bytes : keys) {
569                    try {
570                        String queueId = new String(bytes, offset, bytes.length - offset, UTF_8);
571                        queueIds.add(queueId);
572                    } catch (IOException e) {
573                        throw new NuxeoException(e);
574                    }
575                }
576                return queueIds;
577            }
578
579        });
580    }
581
582    /**
583     * Resumes all suspended work instances by moving them to the scheduled queue.
584     *
585     * @param queueId the queue id
586     * @return the number of work instances scheduled
587     * @since 5.8
588     */
589    public int scheduleSuspendedWork(final String queueId) throws IOException {
590        return redisExecutor.execute(new RedisCallable<Integer>() {
591            @Override
592            public Integer call(Jedis jedis) {
593                for (int n = 0;; n++) {
594                    byte[] workIdBytes = jedis.rpoplpush(suspendedKey(queueId), queuedKey(queueId));
595                    if (workIdBytes == null) {
596                        return Integer.valueOf(n);
597                    }
598                }
599            }
600
601        }).intValue();
602    }
603
604    /**
605     * Suspends all scheduled work instances by moving them to the suspended queue.
606     *
607     * @param queueId the queue id
608     * @return the number of work instances suspended
609     * @since 5.8
610     */
611    public int suspendScheduledWork(final String queueId) throws IOException {
612        return redisExecutor.execute(new RedisCallable<Integer>() {
613
614            @Override
615            public Integer call(Jedis jedis) {
616                for (int n = 0;; n++) {
617                    byte[] workIdBytes = jedis.rpoplpush(queuedKey(queueId), suspendedKey(queueId));
618                    if (workIdBytes == null) {
619                        return n;
620                    }
621                }
622            }
623        }).intValue();
624    }
625
626    /**
627     * Switches a work to state running.
628     *
629     * @param queueId the queue id
630     * @param work the work
631     */
632    protected void workSetRunning(final String queueId, Work work) throws IOException {
633        final byte[] workId = bytes(work.getId());
634        final List<byte[]> keys = Arrays.asList(stateKey(), scheduledKey(queueId), runningKey(queueId));
635        final List<byte[]> args = Arrays.asList(workId, STATE_RUNNING);
636        redisExecutor.execute(new RedisCallable<Void>() {
637
638            @Override
639            public Void call(Jedis jedis) {
640                jedis.evalsha(runningWorkSha.getBytes(), keys, args);
641                if (log.isDebugEnabled()) {
642                    log.debug("Mark work as running " + work);
643                }
644                return null;
645            }
646        });
647    }
648
649    /**
650     * Switches a work to state completed, and saves its new state.
651     */
652    protected void workSetCompleted(final String queueId, final Work work) throws IOException {
653        final byte[] workId = bytes(work.getId());
654        final byte[] workData = serializeWork(work);
655        final byte[] state = bytes(((char) STATE_COMPLETED_B) + String.valueOf(work.getCompletionTime()));
656        final List<byte[]> keys = Arrays.asList(dataKey(), stateKey(), runningKey(queueId), completedKey(queueId));
657        final List<byte[]> args = Arrays.asList(workId, workData, state);
658        redisExecutor.execute(new RedisCallable<Void>() {
659
660            @Override
661            public Void call(Jedis jedis) {
662                jedis.evalsha(completedWorkSha.getBytes(), keys, args);
663                if (log.isDebugEnabled()) {
664                    log.debug("Mark work as completed " + work);
665                }
666                return null;
667            }
668        });
669    }
670
671    /**
672     * Gets the work state.
673     *
674     * @param workId the work id
675     * @return the state, or {@code null} if not found
676     */
677    protected State getWorkStateInfo(final String workId) {
678        final byte[] workIdBytes = bytes(workId);
679        return redisExecutor.execute(new RedisCallable<State>() {
680            @Override
681            public State call(Jedis jedis) {
682                // get state
683                byte[] bytes = jedis.hget(stateKey(), workIdBytes);
684                if (bytes == null || bytes.length == 0) {
685                    return null;
686                }
687                switch (bytes[0]) {
688                case STATE_SCHEDULED_B:
689                    return State.SCHEDULED;
690                case STATE_CANCELED_B:
691                    return State.CANCELED;
692                case STATE_RUNNING_B:
693                    return State.RUNNING;
694                case STATE_COMPLETED_B:
695                    return State.COMPLETED;
696                default:
697                    String msg;
698                    try {
699                        msg = new String(bytes, UTF_8);
700                    } catch (UnsupportedEncodingException e) {
701                        msg = Arrays.toString(bytes);
702                    }
703                    log.error("Unknown work state: " + msg + ", work: " + workId);
704                    return null;
705                }
706            }
707        });
708    }
709
710    protected List<String> listWorkIdsList(final byte[] queueBytes) throws IOException {
711        return redisExecutor.execute(new RedisCallable<List<String>>() {
712
713            @Override
714            public List<String> call(Jedis jedis) {
715                List<byte[]> keys = jedis.lrange(queueBytes, 0, -1);
716                List<String> list = new ArrayList<String>(keys.size());
717                for (byte[] workIdBytes : keys) {
718                    list.add(string(workIdBytes));
719                }
720                return list;
721            }
722
723        });
724    }
725
726    protected List<String> listWorkIdsSet(final byte[] queueBytes) throws IOException {
727        return redisExecutor.execute(new RedisCallable<List<String>>() {
728
729            @Override
730            public List<String> call(Jedis jedis) {
731
732                Set<byte[]> keys = jedis.smembers(queueBytes);
733                List<String> list = new ArrayList<String>(keys.size());
734                for (byte[] workIdBytes : keys) {
735                    list.add(string(workIdBytes));
736                }
737                return list;
738            }
739
740        });
741
742    }
743
744    protected List<Work> listWorkList(final byte[] queueBytes) throws IOException {
745        return redisExecutor.execute(new RedisCallable<List<Work>>() {
746            @Override
747            public List<Work> call(Jedis jedis) {
748                List<byte[]> keys = jedis.lrange(queueBytes, 0, -1);
749                List<Work> list = new ArrayList<Work>(keys.size());
750                for (byte[] workIdBytes : keys) {
751                    // get data
752                    byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
753                    Work work = deserializeWork(workBytes);
754                    list.add(work);
755                }
756                return list;
757            }
758        });
759    }
760
761    protected List<Work> listWorkSet(final byte[] queueBytes) throws IOException {
762        return redisExecutor.execute(new RedisCallable<List<Work>>() {
763            @Override
764            public List<Work> call(Jedis jedis) {
765                Set<byte[]> keys = jedis.smembers(queueBytes);
766                List<Work> list = new ArrayList<Work>(keys.size());
767                for (byte[] workIdBytes : keys) {
768                    // get data
769                    byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
770                    Work work = deserializeWork(workBytes);
771                    list.add(work);
772                }
773                return list;
774            }
775        });
776    }
777
778    protected Work getWork(byte[] workIdBytes) {
779        try {
780            return getWorkData(workIdBytes);
781        } catch (IOException e) {
782            throw new RuntimeException(e);
783        }
784    }
785
786    protected Work getWorkData(final byte[] workIdBytes) throws IOException {
787        return redisExecutor.execute(new RedisCallable<Work>() {
788
789            @Override
790            public Work call(Jedis jedis) {
791                byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
792                return deserializeWork(workBytes);
793            }
794
795        });
796    }
797
798    /**
799     * Removes first work from work queue.
800     *
801     * @param queueId the queue id
802     * @return the work, or {@code null} if the scheduled queue is empty
803     */
804    protected Work getWorkFromQueue(final String queueId) throws IOException {
805        return redisExecutor.execute(new RedisCallable<Work>() {
806
807            @Override
808            public Work call(Jedis jedis) {
809                // pop from queue
810                byte[] workIdBytes = jedis.rpop(queuedKey(queueId));
811                if (workIdBytes == null) {
812                    return null;
813                }
814                // get data
815                byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
816                return deserializeWork(workBytes);
817            }
818
819        });
820    }
821
822    /**
823     * Removes a given work from queue, move the work from scheduled to completed set.
824     *
825     * @throws IOException
826     */
827    protected Work removeScheduledWork(final String queueId, final String workId) throws IOException {
828        final byte[] workIdBytes = bytes(workId);
829        return redisExecutor.execute(new RedisCallable<Work>() {
830
831            @Override
832            public Work call(Jedis jedis) {
833                // remove from queue
834                Long n = jedis.lrem(queuedKey(queueId), 0, workIdBytes);
835                if (n == null || n.intValue() == 0) {
836                    return null;
837                }
838                // remove from set
839                jedis.srem(scheduledKey(queueId), workIdBytes);
840                // set state to completed at current time
841                byte[] completedBytes = bytes(String.valueOf(System.currentTimeMillis()));
842                jedis.hset(stateKey(), workIdBytes, completedBytes);
843                // get data
844                byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
845                Work work = deserializeWork(workBytes);
846                log.debug("Remove scheduled " + work);
847                return work;
848            }
849
850        });
851    }
852
853    /**
854     * Helper to call SSCAN but fall back on a custom implementation based on SMEMBERS if the backend (embedded) does
855     * not support SSCAN.
856     *
857     * @since 7.3
858     */
859    public static class SScanner {
860
861        // results of SMEMBERS for last key, in embedded mode
862        protected List<String> smembers;
863
864        protected ScanResult<String> sscan(Jedis jedis, String key, String cursor, ScanParams params) {
865            ScanResult<String> scanResult;
866            try {
867                scanResult = jedis.sscan(key, cursor, params);
868            } catch (Exception e) {
869                // when testing with embedded fake redis, we may get an un-declared exception
870                if (!(e.getCause() instanceof NoSuchMethodException)) {
871                    throw e;
872                }
873                // no SSCAN available in embedded, do a full SMEMBERS
874                if (smembers == null) {
875                    Set<String> set = jedis.smembers(key);
876                    smembers = new ArrayList<>(set);
877                }
878                Collection<byte[]> bparams = params.getParams();
879                int count = 1000;
880                for (Iterator<byte[]> it = bparams.iterator(); it.hasNext();) {
881                    byte[] param = it.next();
882                    if (param.equals(Protocol.Keyword.MATCH.raw)) {
883                        throw new UnsupportedOperationException("MATCH not supported");
884                    }
885                    if (param.equals(Protocol.Keyword.COUNT.raw)) {
886                        count = Integer.parseInt(SafeEncoder.encode(it.next()));
887                    }
888                }
889                int pos = Integer.parseInt(cursor); // don't check range, callers are cool
890                int end = Math.min(pos + count, smembers.size());
891                int nextPos = end == smembers.size() ? 0 : end;
892                scanResult = new ScanResult<>(String.valueOf(nextPos), smembers.subList(pos, end));
893                if (nextPos == 0) {
894                    smembers = null;
895                }
896            }
897            return scanResult;
898        }
899    }
900
901    /**
902     * Don't pass more than approx. this number of parameters to Lua calls.
903     */
904    private static final int BATCH_SIZE = 5000;
905
906    protected void removeAllCompletedWork(final String queueId) throws IOException {
907        removeCompletedWork(queueId, 0);
908    }
909
910    protected void removeCompletedWork(final String queueId, final long completionTime) throws IOException {
911        redisExecutor.execute(new RedisCallable<Void>() {
912
913            @Override
914            public Void call(Jedis jedis) {
915                String completedKey = completedKeyString(queueId);
916                String stateKey = stateKeyString();
917                String dataKey = dataKeyString();
918                SScanner sscanner = new SScanner();
919                ScanParams scanParams = new ScanParams().count(BATCH_SIZE);
920                String cursor = "0";
921                do {
922                    ScanResult<String> scanResult = sscanner.sscan(jedis, completedKey, cursor, scanParams);
923                    cursor = scanResult.getStringCursor();
924                    List<String> workIds = scanResult.getResult();
925                    if (!workIds.isEmpty()) {
926                        // delete these works if before the completion time
927                        List<String> keys = Arrays.asList(completedKey, stateKey, dataKey);
928                        List<String> args = new ArrayList<String>(1 + workIds.size());
929                        args.add(String.valueOf(completionTime));
930                        args.addAll(workIds);
931                        jedis.evalsha(cleanCompletedWorkSha, keys, args);
932                    }
933                } while (!"0".equals(cursor));
934                return null;
935            }
936        });
937    }
938
939}