001/*
002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.redis.contribs;
020
021import java.io.ByteArrayInputStream;
022import java.io.ByteArrayOutputStream;
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.ObjectInputStream;
026import java.io.ObjectOutputStream;
027import java.io.UnsupportedEncodingException;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collection;
031import java.util.Collections;
032import java.util.HashMap;
033import java.util.HashSet;
034import java.util.Iterator;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import org.apache.commons.logging.Log;
039import org.apache.commons.logging.LogFactory;
040import org.nuxeo.ecm.core.api.NuxeoException;
041import org.nuxeo.ecm.core.redis.RedisAdmin;
042import org.nuxeo.ecm.core.redis.RedisCallable;
043import org.nuxeo.ecm.core.redis.RedisExecutor;
044import org.nuxeo.ecm.core.work.NuxeoBlockingQueue;
045import org.nuxeo.ecm.core.work.WorkHolder;
046import org.nuxeo.ecm.core.work.WorkQueuing;
047import org.nuxeo.ecm.core.work.api.Work;
048import org.nuxeo.ecm.core.work.api.Work.State;
049import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
050import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
051import org.nuxeo.runtime.api.Framework;
052
053import redis.clients.jedis.Jedis;
054import redis.clients.jedis.Protocol;
055import redis.clients.jedis.ScanParams;
056import redis.clients.jedis.ScanResult;
057import redis.clients.jedis.exceptions.JedisException;
058import redis.clients.util.SafeEncoder;
059
060/**
061 * Implementation of a {@link WorkQueuing} storing {@link Work} instances in Redis.
062 *
063 * @since 5.8
064 */
065public class RedisWorkQueuing implements WorkQueuing {
066
067    private static final Log log = LogFactory.getLog(RedisWorkQueuing.class);
068
069    protected static final String UTF_8 = "UTF-8";
070
071    /**
072     * Global hash of Work instance id -> serialoized Work instance.
073     */
074    protected static final String KEY_DATA = "data";
075
076    /**
077     * Global hash of Work instance id -> Work state. The completed state ( {@value #STATE_COMPLETED_B}) is followed by
078     * a completion time in milliseconds.
079     */
080    protected static final String KEY_STATE = "state";
081
082    /**
083     * Per-queue list of suspended Work instance ids.
084     */
085    protected static final String KEY_SUSPENDED_PREFIX = "prev";
086
087    protected static final byte[] KEY_SUSPENDED = KEY_SUSPENDED_PREFIX.getBytes();
088
089    /**
090     * Per-queue list of scheduled Work instance ids.
091     */
092    protected static final String KEY_QUEUE_PREFIX = "queue";
093
094    protected static final byte[] KEY_QUEUE = KEY_QUEUE_PREFIX.getBytes();
095
096    /**
097     * Per-queue set of scheduled Work instance ids.
098     */
099    protected static final String KEY_SCHEDULED_PREFIX = "sched";
100
101    protected static final byte[] KEY_SCHEDULED = KEY_SCHEDULED_PREFIX.getBytes();
102
103    /**
104     * Per-queue set of running Work instance ids.
105     */
106    protected static final String KEY_RUNNING_PREFIX = "run";
107
108    protected static final byte[] KEY_RUNNING = KEY_RUNNING_PREFIX.getBytes();
109
110    /**
111     * Per-queue set of counters.
112     */
113    protected static final String KEY_COMPLETED_PREFIX = "done";
114
115    protected static final byte[] KEY_COMPLETED = KEY_COMPLETED_PREFIX.getBytes();
116
117    protected static final String KEY_CANCELED_PREFIX = "cancel";
118
119    protected static final byte[] KEY_CANCELED = KEY_CANCELED_PREFIX.getBytes();
120
121    protected static final String KEY_COUNT_PREFIX = "count";
122
123    protected static final byte STATE_SCHEDULED_B = 'Q';
124
125    protected static final byte STATE_RUNNING_B = 'R';
126
127    protected static final byte STATE_RUNNING_C = 'C';
128
129    protected static final byte[] STATE_SCHEDULED = new byte[] { STATE_SCHEDULED_B };
130
131    protected static final byte[] STATE_RUNNING = new byte[] { STATE_RUNNING_B };
132
133    protected static final byte[] STATE_UNKNOWN = new byte[0];
134
135    protected Listener listener;
136
137    protected final Map<String, NuxeoBlockingQueue> allQueued = new HashMap<>();
138
139    protected String redisNamespace;
140
141    // lua scripts
142    protected byte[] initWorkQueueSha;
143
144    protected byte[] metricsWorkQueueSha;
145
146    protected byte[] schedulingWorkSha;
147
148    protected byte[] popWorkSha;
149
150    protected byte[] runningWorkSha;
151
152    protected byte[] cancelledScheduledWorkSha;
153
154    protected byte[] completedWorkSha;
155
156    protected byte[] cancelledRunningWorkSha;
157
158    public RedisWorkQueuing(Listener listener) {
159        this.listener = listener;
160        loadConfig();
161    }
162
163    void loadConfig() {
164        RedisAdmin admin = Framework.getService(RedisAdmin.class);
165        redisNamespace = admin.namespace("work");
166        try {
167            initWorkQueueSha = admin.load("org.nuxeo.ecm.core.redis", "init-work-queue")
168                    .getBytes();
169            metricsWorkQueueSha = admin.load("org.nuxeo.ecm.core.redis", "metrics-work-queue")
170                    .getBytes();
171            schedulingWorkSha = admin.load("org.nuxeo.ecm.core.redis", "scheduling-work")
172                    .getBytes();
173            popWorkSha = admin.load("org.nuxeo.ecm.core.redis", "pop-work")
174                    .getBytes();
175            runningWorkSha = admin.load("org.nuxeo.ecm.core.redis", "running-work")
176                    .getBytes();
177            cancelledScheduledWorkSha = admin.load("org.nuxeo.ecm.core.redis", "cancelled-scheduled-work")
178                    .getBytes();
179            completedWorkSha = admin.load("org.nuxeo.ecm.core.redis", "completed-work")
180                    .getBytes();
181            cancelledRunningWorkSha = admin.load("org.nuxeo.ecm.core.redis", "cancelled-running-work")
182                    .getBytes();
183        } catch (IOException e) {
184            throw new RuntimeException("Cannot load LUA scripts", e);
185        }
186    }
187
188    @Override
189    public NuxeoBlockingQueue init(WorkQueueDescriptor config) {
190        evalSha(initWorkQueueSha, keys(config.id), Collections.emptyList());
191        RedisBlockingQueue queue = new RedisBlockingQueue(config.id, this);
192        allQueued.put(config.id, queue);
193        return queue;
194    }
195
196    @Override
197    public NuxeoBlockingQueue getQueue(String queueId) {
198        return allQueued.get(queueId);
199    }
200
201
202    @Override
203    public void workSchedule(String queueId, Work work) {
204        getQueue(queueId).offer(new WorkHolder(work));
205    }
206
207    @Override
208    public void workRunning(String queueId, Work work) {
209        try {
210            workSetRunning(queueId, work);
211        } catch (IOException e) {
212            throw new RuntimeException(e);
213        }
214    }
215
216    @Override
217    public void workCanceled(String queueId, Work work) {
218        try {
219            workSetCancelledScheduled(queueId, work);
220        } catch (IOException e) {
221            throw new RuntimeException(e);
222        }
223    }
224
225    @Override
226    public void workCompleted(String queueId, Work work) {
227        try {
228            workSetCompleted(queueId, work);
229        } catch (IOException e) {
230            throw new RuntimeException(e);
231        }
232    }
233
234    @Override
235    public void workReschedule(String queueId, Work work) {
236        try {
237            workSetReschedule(queueId, work);
238        } catch (IOException e) {
239            throw new RuntimeException(e);
240        }
241    }
242
243    @Override
244    public List<Work> listWork(String queueId, State state) {
245        switch (state) {
246        case SCHEDULED:
247            return listScheduled(queueId);
248        case RUNNING:
249            return listRunning(queueId);
250        default:
251            throw new IllegalArgumentException(String.valueOf(state));
252        }
253    }
254
255    @Override
256    public List<String> listWorkIds(String queueId, State state) {
257        if (state == null) {
258            return listNonCompletedIds(queueId);
259        }
260        switch (state) {
261        case SCHEDULED:
262            return listScheduledIds(queueId);
263        case RUNNING:
264            return listRunningIds(queueId);
265        default:
266            throw new IllegalArgumentException(String.valueOf(state));
267        }
268    }
269
270    protected List<Work> listScheduled(String queueId) {
271        try {
272            return listWorkList(queuedKey(queueId));
273        } catch (IOException e) {
274            throw new RuntimeException(e);
275        }
276    }
277
278    protected List<Work> listRunning(String queueId) {
279        try {
280            return listWorkSet(runningKey(queueId));
281        } catch (IOException e) {
282            throw new RuntimeException(e);
283        }
284    }
285
286    protected List<String> listScheduledIds(String queueId) {
287        try {
288            return listWorkIdsList(queuedKey(queueId));
289        } catch (IOException e) {
290            throw new RuntimeException(e);
291        }
292    }
293
294    protected List<String> listRunningIds(String queueId) {
295        try {
296            return listWorkIdsSet(runningKey(queueId));
297        } catch (IOException e) {
298            throw new RuntimeException(e);
299        }
300    }
301
302    protected List<String> listNonCompletedIds(String queueId) {
303        List<String> list = listScheduledIds(queueId);
304        list.addAll(listRunningIds(queueId));
305        return list;
306    }
307
308    @Override
309    public long count(String queueId, State state) {
310        switch (state) {
311        case SCHEDULED:
312            return metrics(queueId).scheduled.longValue();
313        case RUNNING:
314            return metrics(queueId).running.longValue();
315        default:
316            throw new IllegalArgumentException(String.valueOf(state));
317        }
318    }
319
320    @Override
321    public Work find(String workId, State state) {
322        if (isWorkInState(workId, state)) {
323            return getWork(bytes(workId));
324        }
325        return null;
326    }
327
328    @Override
329    public boolean isWorkInState(String workId, State state) {
330        State s = getWorkState(workId);
331        if (state == null) {
332            return s == State.SCHEDULED || s == State.RUNNING;
333        }
334        return s == state;
335    }
336
337    @Override
338    public void removeScheduled(String queueId, String workId) {
339        try {
340            removeScheduledWork(queueId, workId);
341        } catch (IOException cause) {
342            throw new RuntimeException("Cannot remove scheduled work " + workId + " from " + queueId, cause);
343        }
344    }
345
346    @Override
347    public State getWorkState(String workId) {
348        return getWorkStateInfo(workId);
349    }
350
351    @Override
352    public void setActive(String queueId, boolean value) {
353        WorkQueueMetrics metrics = getQueue(queueId).setActive(value);
354        if (value) {
355            listener.queueActivated(metrics);
356        } else {
357            listener.queueDeactivated(metrics);
358        }
359    }
360
361    /*
362     * ******************** Redis Interface ********************
363     */
364
365    protected String string(byte[] bytes) {
366        try {
367            return new String(bytes, UTF_8);
368        } catch (IOException e) {
369            throw new RuntimeException("Should not happen, cannot decode string in UTF-8", e);
370        }
371    }
372
373    protected byte[] bytes(String string) {
374        try {
375            return string.getBytes(UTF_8);
376        } catch (IOException e) {
377            throw new RuntimeException("Should not happen, cannot encode string in UTF-8", e);
378        }
379    }
380
381    protected byte[] bytes(Work.State state) {
382        switch (state) {
383        case SCHEDULED:
384            return STATE_SCHEDULED;
385        case RUNNING:
386            return STATE_RUNNING;
387        default:
388            return STATE_UNKNOWN;
389        }
390    }
391
392    protected static String key(String... names) {
393        return String.join(":", names);
394    }
395
396    protected byte[] keyBytes(String prefix, String queueId) {
397        return keyBytes(key(prefix, queueId));
398    }
399
400    protected byte[] keyBytes(String prefix) {
401        return bytes(redisNamespace.concat(prefix));
402    }
403
404    protected byte[] workId(Work work) {
405        return workId(work.getId());
406    }
407
408    protected byte[] workId(String id) {
409        return bytes(id);
410    }
411
412    protected byte[] suspendedKey(String queueId) {
413        return keyBytes(key(KEY_SUSPENDED_PREFIX, queueId));
414    }
415
416    protected byte[] queuedKey(String queueId) {
417        return keyBytes(key(KEY_QUEUE_PREFIX, queueId));
418    }
419
420    protected byte[] countKey(String queueId) {
421        return keyBytes(key(KEY_COUNT_PREFIX, queueId));
422    }
423
424    protected byte[] runningKey(String queueId) {
425        return keyBytes(key(KEY_RUNNING_PREFIX, queueId));
426    }
427
428    protected byte[] scheduledKey(String queueId) {
429        return keyBytes(key(KEY_SCHEDULED_PREFIX, queueId));
430    }
431
432    protected byte[] completedKey(String queueId) {
433        return keyBytes(key(KEY_COMPLETED_PREFIX, queueId));
434    }
435
436    protected byte[] canceledKey(String queueId) {
437        return keyBytes(key(KEY_CANCELED_PREFIX, queueId));
438    }
439
440    protected byte[] stateKey() {
441        return keyBytes(KEY_STATE);
442    }
443
444    protected byte[] dataKey() {
445        return keyBytes(KEY_DATA);
446    }
447
448    protected byte[] serializeWork(Work work) throws IOException {
449        ByteArrayOutputStream baout = new ByteArrayOutputStream();
450        ObjectOutputStream out = new ObjectOutputStream(baout);
451        out.writeObject(work);
452        out.flush();
453        out.close();
454        return baout.toByteArray();
455    }
456
457    protected Work deserializeWork(byte[] workBytes) {
458        if (workBytes == null) {
459            return null;
460        }
461        InputStream bain = new ByteArrayInputStream(workBytes);
462        try (ObjectInputStream in = new ObjectInputStream(bain)) {
463            return (Work) in.readObject();
464        } catch (RuntimeException cause) {
465            throw cause;
466        } catch (IOException | ClassNotFoundException cause) {
467            throw new RuntimeException("Cannot deserialize work", cause);
468        }
469    }
470
471    /**
472     * Finds which queues have suspended work.
473     *
474     * @return a set of queue ids
475     * @since 5.8
476     */
477    protected Set<String> getSuspendedQueueIds() throws IOException {
478        return getQueueIds(KEY_SUSPENDED_PREFIX);
479    }
480
481    protected Set<String> getScheduledQueueIds() {
482        return getQueueIds(KEY_QUEUE_PREFIX);
483    }
484
485    protected Set<String> getRunningQueueIds() {
486        return getQueueIds(KEY_RUNNING_PREFIX);
487    }
488
489    /**
490     * Finds which queues have work for a given state prefix.
491     *
492     * @return a set of queue ids
493     * @since 5.8
494     */
495    protected Set<String> getQueueIds(final String queuePrefix) {
496        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Set<String>>() {
497            @Override
498            public Set<String> call(Jedis jedis) {
499                int offset = keyBytes(queuePrefix).length;
500                Set<byte[]> keys = jedis.keys(keyBytes(key(queuePrefix, "*")));
501                Set<String> queueIds = new HashSet<String>(keys.size());
502                for (byte[] bytes : keys) {
503                    try {
504                        String queueId = new String(bytes, offset, bytes.length - offset, UTF_8);
505                        queueIds.add(queueId);
506                    } catch (IOException e) {
507                        throw new NuxeoException(e);
508                    }
509                }
510                return queueIds;
511            }
512
513        });
514    }
515
516    /**
517     * Resumes all suspended work instances by moving them to the scheduled queue.
518     *
519     * @param queueId the queue id
520     * @return the number of work instances scheduled
521     * @since 5.8
522     */
523    public int scheduleSuspendedWork(final String queueId) throws IOException {
524        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Integer>() {
525            @Override
526            public Integer call(Jedis jedis) {
527                for (int n = 0;; n++) {
528                    byte[] workIdBytes = jedis.rpoplpush(suspendedKey(queueId), queuedKey(queueId));
529                    if (workIdBytes == null) {
530                        return Integer.valueOf(n);
531                    }
532                }
533            }
534
535        }).intValue();
536    }
537
538    /**
539     * Suspends all scheduled work instances by moving them to the suspended queue.
540     *
541     * @param queueId the queue id
542     * @return the number of work instances suspended
543     * @since 5.8
544     */
545    public int suspendScheduledWork(final String queueId) throws IOException {
546        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Integer>() {
547
548            @Override
549            public Integer call(Jedis jedis) {
550                for (int n = 0;; n++) {
551                    byte[] workIdBytes = jedis.rpoplpush(queuedKey(queueId), suspendedKey(queueId));
552                    if (workIdBytes == null) {
553                        return n;
554                    }
555                }
556            }
557        }).intValue();
558    }
559
560    @Override
561    public WorkQueueMetrics metrics(String queueId) {
562        return metrics(queueId, evalSha(metricsWorkQueueSha, keys(queueId), Collections.emptyList()));
563    }
564
565    WorkQueueMetrics metrics(String queueId, Number[] counters) {
566        return new WorkQueueMetrics(queueId, counters[0], counters[1], counters[2], counters[3]);
567    }
568
569    /**
570     * Persists a work instance and adds it to the scheduled queue.
571     *
572     * @param queueId the queue id
573     * @param work the work instance
574     * @throws IOException
575     */
576    public void workSetScheduled(final String queueId, Work work) throws IOException {
577        listener.queueChanged(work, metrics(queueId, evalSha(schedulingWorkSha, keys(queueId), args(work, true))));
578    }
579
580    /**
581     * Switches a work to state completed, and saves its new state.
582     */
583    protected void workSetCancelledScheduled(final String queueId, final Work work) throws IOException {
584        listener.queueChanged(work,
585                metrics(queueId, evalSha(cancelledScheduledWorkSha, keys(queueId), args(work, true))));
586    }
587
588    /**
589     * Switches a work to state running.
590     *
591     * @param queueId the queue id
592     * @param work the work
593     */
594    protected void workSetRunning(final String queueId, Work work) throws IOException {
595        listener.queueChanged(work, metrics(queueId, evalSha(runningWorkSha, keys(queueId), args(work, true))));
596    }
597
598    /**
599     * Switches a work to state completed, and saves its new state.
600     */
601    protected void workSetCompleted(final String queueId, final Work work) throws IOException {
602        listener.queueChanged(work, metrics(queueId, evalSha(completedWorkSha, keys(queueId), args(work, false))));
603    }
604
605    /**
606     * Switches a work to state canceled, and saves its new state.
607     */
608    protected void workSetReschedule(final String queueId, final Work work) throws IOException {
609        listener.queueChanged(work,
610                metrics(queueId, evalSha(cancelledRunningWorkSha, keys(queueId), args(work, true))));
611    }
612
613    protected List<byte[]> keys(String queueid) {
614        return Arrays.asList(dataKey(),
615                stateKey(),
616                countKey(queueid),
617                scheduledKey(queueid),
618                queuedKey(queueid),
619                runningKey(queueid),
620                completedKey(queueid),
621                canceledKey(queueid));
622    }
623
624    protected List<byte[]> args(String workId) throws IOException {
625        return Arrays.asList(workId(workId));
626    }
627
628    protected List<byte[]> args(Work work, boolean serialize) throws IOException {
629        List<byte[]> args = Arrays.asList(workId(work), bytes(work.getWorkInstanceState()));
630        if (serialize) {
631            args = new ArrayList<>(args);
632            args.add(serializeWork(work));
633        }
634        return args;
635    }
636
637    /**
638     * Gets the work state.
639     *
640     * @param workId the work id
641     * @return the state, or {@code null} if not found
642     */
643    protected State getWorkStateInfo(final String workId) {
644        final byte[] workIdBytes = bytes(workId);
645        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<State>() {
646            @Override
647            public State call(Jedis jedis) {
648                // get state
649                byte[] bytes = jedis.hget(stateKey(), workIdBytes);
650                if (bytes == null || bytes.length == 0) {
651                    return null;
652                }
653                switch (bytes[0]) {
654                case STATE_SCHEDULED_B:
655                    return State.SCHEDULED;
656                case STATE_RUNNING_B:
657                    return State.RUNNING;
658                default:
659                    String msg;
660                    try {
661                        msg = new String(bytes, UTF_8);
662                    } catch (UnsupportedEncodingException e) {
663                        msg = Arrays.toString(bytes);
664                    }
665                    log.error("Unknown work state: " + msg + ", work: " + workId);
666                    return null;
667                }
668            }
669        });
670    }
671
672    protected List<String> listWorkIdsList(final byte[] queueBytes) throws IOException {
673        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<String>>() {
674
675            @Override
676            public List<String> call(Jedis jedis) {
677                List<byte[]> keys = jedis.lrange(queueBytes, 0, -1);
678                List<String> list = new ArrayList<String>(keys.size());
679                for (byte[] workIdBytes : keys) {
680                    list.add(string(workIdBytes));
681                }
682                return list;
683            }
684
685        });
686    }
687
688    protected List<String> listWorkIdsSet(final byte[] queueBytes) throws IOException {
689        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<String>>() {
690
691            @Override
692            public List<String> call(Jedis jedis) {
693
694                Set<byte[]> keys = jedis.smembers(queueBytes);
695                List<String> list = new ArrayList<String>(keys.size());
696                for (byte[] workIdBytes : keys) {
697                    list.add(string(workIdBytes));
698                }
699                return list;
700            }
701
702        });
703
704    }
705
706    protected List<Work> listWorkList(final byte[] queueBytes) throws IOException {
707        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<Work>>() {
708            @Override
709            public List<Work> call(Jedis jedis) {
710                List<byte[]> keys = jedis.lrange(queueBytes, 0, -1);
711                List<Work> list = new ArrayList<Work>(keys.size());
712                for (byte[] workIdBytes : keys) {
713                    // get data
714                    byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
715                    Work work = deserializeWork(workBytes);
716                    list.add(work);
717                }
718                return list;
719            }
720        });
721    }
722
723    protected List<Work> listWorkSet(final byte[] queueBytes) throws IOException {
724        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<Work>>() {
725            @Override
726            public List<Work> call(Jedis jedis) {
727                Set<byte[]> keys = jedis.smembers(queueBytes);
728                List<Work> list = new ArrayList<Work>(keys.size());
729                for (byte[] workIdBytes : keys) {
730                    // get data
731                    byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
732                    Work work = deserializeWork(workBytes);
733                    list.add(work);
734                }
735                return list;
736            }
737        });
738    }
739
740    protected Work getWork(byte[] workIdBytes) {
741        try {
742            return getWorkData(workIdBytes);
743        } catch (IOException e) {
744            throw new RuntimeException(e);
745        }
746    }
747
748    protected Work getWorkData(final byte[] workIdBytes) throws IOException {
749        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Work>() {
750
751            @Override
752            public Work call(Jedis jedis) {
753                byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
754                return deserializeWork(workBytes);
755            }
756
757        });
758    }
759
760    /**
761     * Removes first work from work queue.
762     *
763     * @param queueId the queue id
764     * @return the work, or {@code null} if the scheduled queue is empty
765     */
766    protected Work getWorkFromQueue(final String queueId) throws IOException {
767        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
768        List<byte[]> keys = keys(queueId);
769        List<byte[]> args = Collections.singletonList(STATE_RUNNING);
770        Object id = redisExecutor.evalsha(popWorkSha, keys, args);
771        if (id == null) {
772            return null;
773        }
774        if (id instanceof String) {
775            id = bytes((String) id);
776        }
777        byte[] workIdBytes = (byte[]) id;
778        byte[] workBytes = redisExecutor.execute(jedis -> jedis.hget(dataKey(), workIdBytes));
779        return deserializeWork(workBytes);
780    }
781
782    /**
783     * Removes a given work from queue, move the work from scheduled to completed set.
784     *
785     * @throws IOException
786     */
787    protected void removeScheduledWork(final String queueId, final String workId) throws IOException {
788        evalSha(cancelledScheduledWorkSha, keys(queueId), args(workId));
789    }
790
791    /**
792     * Helper to call SSCAN but fall back on a custom implementation based on SMEMBERS if the backend (embedded) does
793     * not support SSCAN.
794     *
795     * @since 7.3
796     */
797    public static class SScanner {
798
799        // results of SMEMBERS for last key, in embedded mode
800        protected List<String> smembers;
801
802        protected ScanResult<String> sscan(Jedis jedis, String key, String cursor, ScanParams params) {
803            ScanResult<String> scanResult;
804            try {
805                scanResult = jedis.sscan(key, cursor, params);
806            } catch (Exception e) {
807                // when testing with embedded fake redis, we may get an un-declared exception
808                if (!(e.getCause() instanceof NoSuchMethodException)) {
809                    throw e;
810                }
811                // no SSCAN available in embedded, do a full SMEMBERS
812                if (smembers == null) {
813                    Set<String> set = jedis.smembers(key);
814                    smembers = new ArrayList<>(set);
815                }
816                Collection<byte[]> bparams = params.getParams();
817                int count = 1000;
818                for (Iterator<byte[]> it = bparams.iterator(); it.hasNext();) {
819                    byte[] param = it.next();
820                    if (param.equals(Protocol.Keyword.MATCH.raw)) {
821                        throw new UnsupportedOperationException("MATCH not supported");
822                    }
823                    if (param.equals(Protocol.Keyword.COUNT.raw)) {
824                        count = Integer.parseInt(SafeEncoder.encode(it.next()));
825                    }
826                }
827                int pos = Integer.parseInt(cursor); // don't check range, callers are cool
828                int end = Math.min(pos + count, smembers.size());
829                int nextPos = end == smembers.size() ? 0 : end;
830                scanResult = new ScanResult<>(String.valueOf(nextPos), smembers.subList(pos, end));
831                if (nextPos == 0) {
832                    smembers = null;
833                }
834            }
835            return scanResult;
836        }
837    }
838
839    Number[] evalSha(byte[] sha, List<byte[]> keys, List<byte[]> args) throws JedisException {
840        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
841        List<Number> numbers = (List<Number>) redisExecutor.evalsha(sha, keys, args);
842        return coerceNullToZero(numbers);
843    }
844
845    protected static Number[] coerceNullToZero(List<Number> numbers) {
846        Number[] counters = numbers.toArray(new Number[numbers.size()]);
847        for (int i = 0; i < counters.length; ++i) {
848            if (counters[i] == null) {
849                counters[i] = 0;
850            }
851        }
852        return counters;
853    }
854
855    @Override
856    public void listen(Listener listener) {
857        this.listener = listener;
858
859    }
860
861}