001/*
002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.redis.contribs;
020
021import java.io.ByteArrayInputStream;
022import java.io.ByteArrayOutputStream;
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.ObjectInputStream;
026import java.io.ObjectOutputStream;
027import java.io.UnsupportedEncodingException;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collection;
031import java.util.Collections;
032import java.util.HashMap;
033import java.util.HashSet;
034import java.util.Iterator;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041import org.nuxeo.ecm.core.api.NuxeoException;
042import org.nuxeo.ecm.core.redis.RedisAdmin;
043import org.nuxeo.ecm.core.redis.RedisCallable;
044import org.nuxeo.ecm.core.redis.RedisExecutor;
045import org.nuxeo.ecm.core.work.NuxeoBlockingQueue;
046import org.nuxeo.ecm.core.work.WorkHolder;
047import org.nuxeo.ecm.core.work.WorkQueuing;
048import org.nuxeo.ecm.core.work.api.Work;
049import org.nuxeo.ecm.core.work.api.Work.State;
050import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
051import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
052import org.nuxeo.runtime.api.Framework;
053
054import redis.clients.jedis.Jedis;
055import redis.clients.jedis.Protocol;
056import redis.clients.jedis.ScanParams;
057import redis.clients.jedis.ScanResult;
058import redis.clients.jedis.exceptions.JedisException;
059import redis.clients.util.SafeEncoder;
060
061/**
062 * Implementation of a {@link WorkQueuing} storing {@link Work} instances in Redis.
063 *
064 * @since 5.8
065 */
066public class RedisWorkQueuing implements WorkQueuing {
067
    private static final Log log = LogFactory.getLog(RedisWorkQueuing.class);

    /** Name of the charset used when converting between strings and the bytes exchanged with Redis. */
    protected static final String UTF_8 = "UTF-8";

    /**
     * Global hash of Work instance id -> serialized Work instance.
     */
    protected static final String KEY_DATA = "data";

    /**
     * Global hash of Work instance id -> Work state. The first byte of the stored value identifies the state (see
     * {@link #STATE_SCHEDULED_B} and {@link #STATE_RUNNING_B}).
     * <p>
     * NOTE(review): this comment used to say that the completed state is followed by a completion time in
     * milliseconds and pointed at a STATE_COMPLETED_B constant; no such constant is declared in this class, so the
     * claim could not be verified here.
     */
    protected static final String KEY_STATE = "state";

    /**
     * Per-queue list of suspended Work instance ids.
     */
    protected static final String KEY_SUSPENDED_PREFIX = "prev";

    protected static final byte[] KEY_SUSPENDED = KEY_SUSPENDED_PREFIX.getBytes();

    /**
     * Per-queue list of scheduled Work instance ids.
     */
    protected static final String KEY_QUEUE_PREFIX = "queue";

    protected static final byte[] KEY_QUEUE = KEY_QUEUE_PREFIX.getBytes();

    /**
     * Per-queue set of scheduled Work instance ids.
     */
    protected static final String KEY_SCHEDULED_PREFIX = "sched";

    protected static final byte[] KEY_SCHEDULED = KEY_SCHEDULED_PREFIX.getBytes();

    /**
     * Per-queue set of running Work instance ids.
     */
    protected static final String KEY_RUNNING_PREFIX = "run";

    protected static final byte[] KEY_RUNNING = KEY_RUNNING_PREFIX.getBytes();

    /**
     * Per-queue set of counters.
     * <p>
     * NOTE(review): this description sits on the "done" prefix but presumably also covers the "cancel" and "count"
     * prefixes declared below; the data stored under these keys is managed by the Lua scripts - confirm there.
     */
    protected static final String KEY_COMPLETED_PREFIX = "done";

    protected static final byte[] KEY_COMPLETED = KEY_COMPLETED_PREFIX.getBytes();

    protected static final String KEY_CANCELED_PREFIX = "cancel";

    protected static final byte[] KEY_CANCELED = KEY_CANCELED_PREFIX.getBytes();

    protected static final String KEY_COUNT_PREFIX = "count";

    /** State byte stored for a scheduled work. */
    protected static final byte STATE_SCHEDULED_B = 'Q';

    /** State byte stored for a running work. */
    protected static final byte STATE_RUNNING_B = 'R';

    // NOTE(review): despite its name this constant holds 'C' and is not referenced in this class; presumably it
    // is the marker of the completed state - confirm before relying on it.
    protected static final byte STATE_RUNNING_C = 'C';

    /** Single-byte value sent to the scripts for a scheduled work. */
    protected static final byte[] STATE_SCHEDULED = new byte[] { STATE_SCHEDULED_B };

    /** Single-byte value sent to the scripts for a running work. */
    protected static final byte[] STATE_RUNNING = new byte[] { STATE_RUNNING_B };

    /** Empty value used for any state other than scheduled or running. */
    protected static final byte[] STATE_UNKNOWN = new byte[0];

    /** Listener notified of queue changes, activations and deactivations. */
    protected Listener listener;

    /** Queues created by {@link #init}, keyed by queue id. */
    protected final Map<String, NuxeoBlockingQueue> allQueued = new HashMap<>();

    /** Namespace obtained from {@link RedisAdmin} for "work"; it is prepended to every key built by this class. */
    protected String redisNamespace;

    // lua scripts: identifiers returned by RedisAdmin.load(), used when invoking the scripts through evalsha
    protected byte[] initWorkQueueSha;

    protected byte[] metricsWorkQueueSha;

    protected byte[] schedulingWorkSha;

    protected byte[] popWorkSha;

    protected byte[] runningWorkSha;

    protected byte[] cancelledScheduledWorkSha;

    protected byte[] completedWorkSha;

    protected byte[] cancelledRunningWorkSha;
158
    /**
     * Creates the queuing, remembering the listener and loading the Redis namespace and Lua scripts.
     *
     * @param listener the listener notified of queue changes
     */
    public RedisWorkQueuing(Listener listener) {
        this.listener = listener;
        loadConfig();
    }
163
164    void loadConfig() {
165        RedisAdmin admin = Framework.getService(RedisAdmin.class);
166        redisNamespace = admin.namespace("work");
167        try {
168            initWorkQueueSha = admin.load("org.nuxeo.ecm.core.redis", "init-work-queue").getBytes();
169            metricsWorkQueueSha = admin.load("org.nuxeo.ecm.core.redis", "metrics-work-queue").getBytes();
170            schedulingWorkSha = admin.load("org.nuxeo.ecm.core.redis", "scheduling-work").getBytes();
171            popWorkSha = admin.load("org.nuxeo.ecm.core.redis", "pop-work").getBytes();
172            runningWorkSha = admin.load("org.nuxeo.ecm.core.redis", "running-work").getBytes();
173            cancelledScheduledWorkSha = admin.load("org.nuxeo.ecm.core.redis", "cancelled-scheduled-work").getBytes();
174            completedWorkSha = admin.load("org.nuxeo.ecm.core.redis", "completed-work").getBytes();
175            cancelledRunningWorkSha = admin.load("org.nuxeo.ecm.core.redis", "cancelled-running-work").getBytes();
176        } catch (IOException e) {
177            throw new RuntimeException("Cannot load LUA scripts", e);
178        }
179    }
180
181    @Override
182    public NuxeoBlockingQueue init(WorkQueueDescriptor config) {
183        evalSha(initWorkQueueSha, keys(config.id), Collections.emptyList());
184        RedisBlockingQueue queue = new RedisBlockingQueue(config.id, this);
185        allQueued.put(config.id, queue);
186        return queue;
187    }
188
    /**
     * Returns the queue registered by {@link #init} under the given id.
     *
     * @param queueId the queue id
     * @return the queue, or {@code null} if no queue was registered under that id
     */
    @Override
    public NuxeoBlockingQueue getQueue(String queueId) {
        return allQueued.get(queueId);
    }
193
194    @Override
195    public void workSchedule(String queueId, Work work) {
196        getQueue(queueId).offer(new WorkHolder(work));
197    }
198
199    @Override
200    public void workRunning(String queueId, Work work) {
201        try {
202            workSetRunning(queueId, work);
203        } catch (IOException e) {
204            throw new RuntimeException(e);
205        }
206    }
207
208    @Override
209    public void workCanceled(String queueId, Work work) {
210        try {
211            workSetCancelledScheduled(queueId, work);
212        } catch (IOException e) {
213            throw new RuntimeException(e);
214        }
215    }
216
217    @Override
218    public void workCompleted(String queueId, Work work) {
219        try {
220            workSetCompleted(queueId, work);
221        } catch (IOException e) {
222            throw new RuntimeException(e);
223        }
224    }
225
226    @Override
227    public void workReschedule(String queueId, Work work) {
228        try {
229            workSetReschedule(queueId, work);
230        } catch (IOException e) {
231            throw new RuntimeException(e);
232        }
233    }
234
235    @Override
236    public List<Work> listWork(String queueId, State state) {
237        switch (state) {
238        case SCHEDULED:
239            return listScheduled(queueId);
240        case RUNNING:
241            return listRunning(queueId);
242        default:
243            throw new IllegalArgumentException(String.valueOf(state));
244        }
245    }
246
247    @Override
248    public List<String> listWorkIds(String queueId, State state) {
249        if (state == null) {
250            return listNonCompletedIds(queueId);
251        }
252        switch (state) {
253        case SCHEDULED:
254            return listScheduledIds(queueId);
255        case RUNNING:
256            return listRunningIds(queueId);
257        default:
258            throw new IllegalArgumentException(String.valueOf(state));
259        }
260    }
261
262    protected List<Work> listScheduled(String queueId) {
263        try {
264            return listWorkList(queuedKey(queueId));
265        } catch (IOException e) {
266            throw new RuntimeException(e);
267        }
268    }
269
270    protected List<Work> listRunning(String queueId) {
271        try {
272            return listWorkSet(runningKey(queueId));
273        } catch (IOException e) {
274            throw new RuntimeException(e);
275        }
276    }
277
278    protected List<String> listScheduledIds(String queueId) {
279        try {
280            return listWorkIdsList(queuedKey(queueId));
281        } catch (IOException e) {
282            throw new RuntimeException(e);
283        }
284    }
285
286    protected List<String> listRunningIds(String queueId) {
287        try {
288            return listWorkIdsSet(runningKey(queueId));
289        } catch (IOException e) {
290            throw new RuntimeException(e);
291        }
292    }
293
294    protected List<String> listNonCompletedIds(String queueId) {
295        List<String> list = listScheduledIds(queueId);
296        list.addAll(listRunningIds(queueId));
297        return list;
298    }
299
300    @Override
301    public long count(String queueId, State state) {
302        switch (state) {
303        case SCHEDULED:
304            return metrics(queueId).scheduled.longValue();
305        case RUNNING:
306            return metrics(queueId).running.longValue();
307        default:
308            throw new IllegalArgumentException(String.valueOf(state));
309        }
310    }
311
312    @Override
313    public Work find(String workId, State state) {
314        if (isWorkInState(workId, state)) {
315            return getWork(bytes(workId));
316        }
317        return null;
318    }
319
320    @Override
321    public boolean isWorkInState(String workId, State state) {
322        State s = getWorkState(workId);
323        if (state == null) {
324            return s == State.SCHEDULED || s == State.RUNNING;
325        }
326        return s == state;
327    }
328
329    @Override
330    public void removeScheduled(String queueId, String workId) {
331        try {
332            removeScheduledWork(queueId, workId);
333        } catch (IOException cause) {
334            throw new RuntimeException("Cannot remove scheduled work " + workId + " from " + queueId, cause);
335        }
336    }
337
    /**
     * Returns the state recorded for the given work id; delegates to {@link #getWorkStateInfo}.
     *
     * @return the state, or {@code null} if none is found or the stored state is not recognized
     */
    @Override
    public State getWorkState(String workId) {
        return getWorkStateInfo(workId);
    }
342
343    @Override
344    public void setActive(String queueId, boolean value) {
345        WorkQueueMetrics metrics = getQueue(queueId).setActive(value);
346        if (value) {
347            listener.queueActivated(metrics);
348        } else {
349            listener.queueDeactivated(metrics);
350        }
351    }
352
353    /*
354     * ******************** Redis Interface ********************
355     */
356
357    protected String string(byte[] bytes) {
358        try {
359            return new String(bytes, UTF_8);
360        } catch (IOException e) {
361            throw new RuntimeException("Should not happen, cannot decode string in UTF-8", e);
362        }
363    }
364
365    protected byte[] bytes(String string) {
366        try {
367            return string.getBytes(UTF_8);
368        } catch (IOException e) {
369            throw new RuntimeException("Should not happen, cannot encode string in UTF-8", e);
370        }
371    }
372
373    protected byte[] bytes(Work.State state) {
374        switch (state) {
375        case SCHEDULED:
376            return STATE_SCHEDULED;
377        case RUNNING:
378            return STATE_RUNNING;
379        default:
380            return STATE_UNKNOWN;
381        }
382    }
383
384    protected static String key(String... names) {
385        return String.join(":", names);
386    }
387
388    protected byte[] keyBytes(String prefix, String queueId) {
389        return keyBytes(key(prefix, queueId));
390    }
391
392    protected byte[] keyBytes(String prefix) {
393        return bytes(redisNamespace.concat(prefix));
394    }
395
396    protected byte[] workId(Work work) {
397        return workId(work.getId());
398    }
399
400    protected byte[] workId(String id) {
401        return bytes(id);
402    }
403
404    protected byte[] suspendedKey(String queueId) {
405        return keyBytes(key(KEY_SUSPENDED_PREFIX, queueId));
406    }
407
408    protected byte[] queuedKey(String queueId) {
409        return keyBytes(key(KEY_QUEUE_PREFIX, queueId));
410    }
411
412    protected byte[] countKey(String queueId) {
413        return keyBytes(key(KEY_COUNT_PREFIX, queueId));
414    }
415
416    protected byte[] runningKey(String queueId) {
417        return keyBytes(key(KEY_RUNNING_PREFIX, queueId));
418    }
419
420    protected byte[] scheduledKey(String queueId) {
421        return keyBytes(key(KEY_SCHEDULED_PREFIX, queueId));
422    }
423
424    protected byte[] completedKey(String queueId) {
425        return keyBytes(key(KEY_COMPLETED_PREFIX, queueId));
426    }
427
428    protected byte[] canceledKey(String queueId) {
429        return keyBytes(key(KEY_CANCELED_PREFIX, queueId));
430    }
431
    /** Key of the global hash of work states (namespace followed by {@link #KEY_STATE}). */
    protected byte[] stateKey() {
        return keyBytes(KEY_STATE);
    }

    /** Key of the global hash of serialized works (namespace followed by {@link #KEY_DATA}). */
    protected byte[] dataKey() {
        return keyBytes(KEY_DATA);
    }
439
440    protected byte[] serializeWork(Work work) throws IOException {
441        ByteArrayOutputStream baout = new ByteArrayOutputStream();
442        ObjectOutputStream out = new ObjectOutputStream(baout);
443        out.writeObject(work);
444        out.flush();
445        out.close();
446        return baout.toByteArray();
447    }
448
449    protected Work deserializeWork(byte[] workBytes) {
450        if (workBytes == null) {
451            return null;
452        }
453        InputStream bain = new ByteArrayInputStream(workBytes);
454        try (ObjectInputStream in = new ObjectInputStream(bain)) {
455            return (Work) in.readObject();
456        } catch (RuntimeException cause) {
457            throw cause;
458        } catch (IOException | ClassNotFoundException cause) {
459            throw new RuntimeException("Cannot deserialize work", cause);
460        }
461    }
462
    /**
     * Finds which queues have suspended work, i.e. which queue ids have a key under the suspended prefix.
     *
     * @return a set of queue ids
     * @since 5.8
     */
    protected Set<String> getSuspendedQueueIds() throws IOException {
        return getQueueIds(KEY_SUSPENDED_PREFIX);
    }

    /**
     * Finds which queue ids have a key under the scheduled list prefix.
     *
     * @return a set of queue ids
     */
    protected Set<String> getScheduledQueueIds() {
        return getQueueIds(KEY_QUEUE_PREFIX);
    }

    /**
     * Finds which queue ids have a key under the running prefix.
     *
     * @return a set of queue ids
     */
    protected Set<String> getRunningQueueIds() {
        return getQueueIds(KEY_RUNNING_PREFIX);
    }
480
481    /**
482     * Finds which queues have work for a given state prefix.
483     *
484     * @return a set of queue ids
485     * @since 5.8
486     */
487    protected Set<String> getQueueIds(final String queuePrefix) {
488        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Set<String>>() {
489            @Override
490            public Set<String> call(Jedis jedis) {
491                int offset = keyBytes(queuePrefix).length;
492                Set<byte[]> keys = jedis.keys(keyBytes(key(queuePrefix, "*")));
493                Set<String> queueIds = new HashSet<String>(keys.size());
494                for (byte[] bytes : keys) {
495                    try {
496                        String queueId = new String(bytes, offset, bytes.length - offset, UTF_8);
497                        queueIds.add(queueId);
498                    } catch (IOException e) {
499                        throw new NuxeoException(e);
500                    }
501                }
502                return queueIds;
503            }
504
505        });
506    }
507
508    /**
509     * Resumes all suspended work instances by moving them to the scheduled queue.
510     *
511     * @param queueId the queue id
512     * @return the number of work instances scheduled
513     * @since 5.8
514     */
515    public int scheduleSuspendedWork(final String queueId) throws IOException {
516        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Integer>() {
517            @Override
518            public Integer call(Jedis jedis) {
519                for (int n = 0;; n++) {
520                    byte[] workIdBytes = jedis.rpoplpush(suspendedKey(queueId), queuedKey(queueId));
521                    if (workIdBytes == null) {
522                        return Integer.valueOf(n);
523                    }
524                }
525            }
526
527        }).intValue();
528    }
529
530    /**
531     * Suspends all scheduled work instances by moving them to the suspended queue.
532     *
533     * @param queueId the queue id
534     * @return the number of work instances suspended
535     * @since 5.8
536     */
537    public int suspendScheduledWork(final String queueId) throws IOException {
538        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Integer>() {
539
540            @Override
541            public Integer call(Jedis jedis) {
542                for (int n = 0;; n++) {
543                    byte[] workIdBytes = jedis.rpoplpush(queuedKey(queueId), suspendedKey(queueId));
544                    if (workIdBytes == null) {
545                        return n;
546                    }
547                }
548            }
549        }).intValue();
550    }
551
552    @Override
553    public WorkQueueMetrics metrics(String queueId) {
554        return metrics(queueId, evalSha(metricsWorkQueueSha, keys(queueId), Collections.emptyList()));
555    }
556
557    WorkQueueMetrics metrics(String queueId, Number[] counters) {
558        return new WorkQueueMetrics(queueId, counters[0], counters[1], counters[2], counters[3]);
559    }
560
561    /**
562     * Persists a work instance and adds it to the scheduled queue.
563     *
564     * @param queueId the queue id
565     * @param work the work instance
566     * @throws IOException
567     */
568    public void workSetScheduled(final String queueId, Work work) throws IOException {
569        listener.queueChanged(work, metrics(queueId, evalSha(schedulingWorkSha, keys(queueId), args(work, true))));
570    }
571
572    /**
573     * Switches a work to state completed, and saves its new state.
574     */
575    protected void workSetCancelledScheduled(final String queueId, final Work work) throws IOException {
576        listener.queueChanged(work,
577                metrics(queueId, evalSha(cancelledScheduledWorkSha, keys(queueId), args(work, true))));
578    }
579
580    /**
581     * Switches a work to state running.
582     *
583     * @param queueId the queue id
584     * @param work the work
585     */
586    protected void workSetRunning(final String queueId, Work work) throws IOException {
587        listener.queueChanged(work, metrics(queueId, evalSha(runningWorkSha, keys(queueId), args(work, true))));
588    }
589
590    /**
591     * Switches a work to state completed, and saves its new state.
592     */
593    protected void workSetCompleted(final String queueId, final Work work) throws IOException {
594        listener.queueChanged(work, metrics(queueId, evalSha(completedWorkSha, keys(queueId), args(work, false))));
595    }
596
597    /**
598     * Switches a work to state canceled, and saves its new state.
599     */
600    protected void workSetReschedule(final String queueId, final Work work) throws IOException {
601        listener.queueChanged(work,
602                metrics(queueId, evalSha(cancelledRunningWorkSha, keys(queueId), args(work, true))));
603    }
604
605    protected List<byte[]> keys(String queueid) {
606        return Arrays.asList(dataKey(), stateKey(), countKey(queueid), scheduledKey(queueid), queuedKey(queueid),
607                runningKey(queueid), completedKey(queueid), canceledKey(queueid));
608    }
609
610    protected List<byte[]> args(String workId) throws IOException {
611        return Arrays.asList(workId(workId));
612    }
613
614    protected List<byte[]> args(Work work, boolean serialize) throws IOException {
615        List<byte[]> args = Arrays.asList(workId(work), bytes(work.getWorkInstanceState()));
616        if (serialize) {
617            args = new ArrayList<>(args);
618            args.add(serializeWork(work));
619        }
620        return args;
621    }
622
623    /**
624     * Gets the work state.
625     *
626     * @param workId the work id
627     * @return the state, or {@code null} if not found
628     */
629    protected State getWorkStateInfo(final String workId) {
630        final byte[] workIdBytes = bytes(workId);
631        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<State>() {
632            @Override
633            public State call(Jedis jedis) {
634                // get state
635                byte[] bytes = jedis.hget(stateKey(), workIdBytes);
636                if (bytes == null || bytes.length == 0) {
637                    return null;
638                }
639                switch (bytes[0]) {
640                case STATE_SCHEDULED_B:
641                    return State.SCHEDULED;
642                case STATE_RUNNING_B:
643                    return State.RUNNING;
644                default:
645                    String msg;
646                    try {
647                        msg = new String(bytes, UTF_8);
648                    } catch (UnsupportedEncodingException e) {
649                        msg = Arrays.toString(bytes);
650                    }
651                    log.error("Unknown work state: " + msg + ", work: " + workId);
652                    return null;
653                }
654            }
655        });
656    }
657
658    protected List<String> listWorkIdsList(final byte[] queueBytes) throws IOException {
659        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<String>>() {
660
661            @Override
662            public List<String> call(Jedis jedis) {
663                List<byte[]> keys = jedis.lrange(queueBytes, 0, -1);
664                List<String> list = new ArrayList<String>(keys.size());
665                for (byte[] workIdBytes : keys) {
666                    list.add(string(workIdBytes));
667                }
668                return list;
669            }
670
671        });
672    }
673
674    protected List<String> listWorkIdsSet(final byte[] queueBytes) throws IOException {
675        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<String>>() {
676
677            @Override
678            public List<String> call(Jedis jedis) {
679
680                Set<byte[]> keys = jedis.smembers(queueBytes);
681                List<String> list = new ArrayList<String>(keys.size());
682                for (byte[] workIdBytes : keys) {
683                    list.add(string(workIdBytes));
684                }
685                return list;
686            }
687
688        });
689
690    }
691
692    protected List<Work> listWorkList(final byte[] queueBytes) throws IOException {
693        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<Work>>() {
694            @Override
695            public List<Work> call(Jedis jedis) {
696                List<byte[]> keys = jedis.lrange(queueBytes, 0, -1);
697                List<Work> list = new ArrayList<Work>(keys.size());
698                for (byte[] workIdBytes : keys) {
699                    // get data
700                    byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
701                    Work work = deserializeWork(workBytes);
702                    list.add(work);
703                }
704                return list;
705            }
706        });
707    }
708
709    protected List<Work> listWorkSet(final byte[] queueBytes) throws IOException {
710        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<Work>>() {
711            @Override
712            public List<Work> call(Jedis jedis) {
713                Set<byte[]> keys = jedis.smembers(queueBytes);
714                List<Work> list = new ArrayList<Work>(keys.size());
715                for (byte[] workIdBytes : keys) {
716                    // get data
717                    byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
718                    Work work = deserializeWork(workBytes);
719                    list.add(work);
720                }
721                return list;
722            }
723        });
724    }
725
726    protected Work getWork(byte[] workIdBytes) {
727        try {
728            return getWorkData(workIdBytes);
729        } catch (IOException e) {
730            throw new RuntimeException(e);
731        }
732    }
733
734    protected Work getWorkData(final byte[] workIdBytes) throws IOException {
735        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Work>() {
736
737            @Override
738            public Work call(Jedis jedis) {
739                byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
740                return deserializeWork(workBytes);
741            }
742
743        });
744    }
745
746    /**
747     * Removes first work from work queue.
748     *
749     * @param queueId the queue id
750     * @return the work, or {@code null} if the scheduled queue is empty
751     */
752    protected Work getWorkFromQueue(final String queueId) throws IOException {
753        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
754        List<byte[]> keys = keys(queueId);
755        List<byte[]> args = Collections.singletonList(STATE_RUNNING);
756        List<?> result = (List<?>) redisExecutor.evalsha(popWorkSha, keys, args);
757        if (result == null) {
758            return null;
759        }
760
761        List<Number> numbers = (List<Number>) result.get(0);
762        WorkQueueMetrics metrics = metrics(queueId, coerceNullToZero(numbers));
763        Object bytes = result.get(1);
764        if (bytes instanceof String) {
765            bytes = bytes((String) bytes);
766        }
767        Work work = deserializeWork((byte[]) bytes);
768
769        listener.queueChanged(work, metrics);
770
771        return work;
772    }
773
774    /**
775     * Removes a given work from queue, move the work from scheduled to completed set.
776     *
777     * @throws IOException
778     */
779    protected void removeScheduledWork(final String queueId, final String workId) throws IOException {
780        evalSha(cancelledScheduledWorkSha, keys(queueId), args(workId));
781    }
782
783    /**
784     * Helper to call SSCAN but fall back on a custom implementation based on SMEMBERS if the backend (embedded) does
785     * not support SSCAN.
786     *
787     * @since 7.3
788     */
789    public static class SScanner {
790
791        // results of SMEMBERS for last key, in embedded mode
792        protected List<String> smembers;
793
794        protected ScanResult<String> sscan(Jedis jedis, String key, String cursor, ScanParams params) {
795            ScanResult<String> scanResult;
796            try {
797                scanResult = jedis.sscan(key, cursor, params);
798            } catch (Exception e) {
799                // when testing with embedded fake redis, we may get an un-declared exception
800                if (!(e.getCause() instanceof NoSuchMethodException)) {
801                    throw e;
802                }
803                // no SSCAN available in embedded, do a full SMEMBERS
804                if (smembers == null) {
805                    Set<String> set = jedis.smembers(key);
806                    smembers = new ArrayList<>(set);
807                }
808                Collection<byte[]> bparams = params.getParams();
809                int count = 1000;
810                for (Iterator<byte[]> it = bparams.iterator(); it.hasNext();) {
811                    byte[] param = it.next();
812                    if (param.equals(Protocol.Keyword.MATCH.raw)) {
813                        throw new UnsupportedOperationException("MATCH not supported");
814                    }
815                    if (param.equals(Protocol.Keyword.COUNT.raw)) {
816                        count = Integer.parseInt(SafeEncoder.encode(it.next()));
817                    }
818                }
819                int pos = Integer.parseInt(cursor); // don't check range, callers are cool
820                int end = Math.min(pos + count, smembers.size());
821                int nextPos = end == smembers.size() ? 0 : end;
822                scanResult = new ScanResult<>(String.valueOf(nextPos), smembers.subList(pos, end));
823                if (nextPos == 0) {
824                    smembers = null;
825                }
826            }
827            return scanResult;
828        }
829    }
830
831    Number[] evalSha(byte[] sha, List<byte[]> keys, List<byte[]> args) throws JedisException {
832        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
833        List<Number> numbers = (List<Number>) redisExecutor.evalsha(sha, keys, args);
834        return coerceNullToZero(numbers);
835    }
836
837    protected static Number[] coerceNullToZero(List<Number> numbers) {
838        return coerceNullToZero(numbers.toArray(new Number[numbers.size()]));
839    }
840
841    protected static Number[] coerceNullToZero(Number[] counters) {
842        for (int i = 0; i < counters.length; ++i) {
843            if (counters[i] == null) {
844                counters[i] = 0;
845            }
846        }
847        return counters;
848    }
849
    /**
     * Replaces the listener notified of queue changes.
     *
     * @param listener the new listener
     */
    @Override
    public void listen(Listener listener) {
        this.listener = listener;

    }
855
856}