001/*
002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.redis.contribs;
020
021import java.io.ByteArrayInputStream;
022import java.io.ByteArrayOutputStream;
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.ObjectInputStream;
026import java.io.ObjectOutputStream;
027import java.io.UnsupportedEncodingException;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collection;
031import java.util.Collections;
032import java.util.HashMap;
033import java.util.HashSet;
034import java.util.Iterator;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import org.apache.commons.logging.Log;
039import org.apache.commons.logging.LogFactory;
040import org.nuxeo.ecm.core.api.NuxeoException;
041import org.nuxeo.ecm.core.redis.RedisAdmin;
042import org.nuxeo.ecm.core.redis.RedisCallable;
043import org.nuxeo.ecm.core.redis.RedisExecutor;
044import org.nuxeo.ecm.core.work.NuxeoBlockingQueue;
045import org.nuxeo.ecm.core.work.WorkHolder;
046import org.nuxeo.ecm.core.work.WorkQueuing;
047import org.nuxeo.ecm.core.work.api.Work;
048import org.nuxeo.ecm.core.work.api.Work.State;
049import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
050import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
051import org.nuxeo.runtime.api.Framework;
052
053import redis.clients.jedis.Jedis;
054import redis.clients.jedis.Protocol;
055import redis.clients.jedis.ScanParams;
056import redis.clients.jedis.ScanResult;
057import redis.clients.jedis.exceptions.JedisException;
058import redis.clients.util.SafeEncoder;
059
060/**
061 * Implementation of a {@link WorkQueuing} storing {@link Work} instances in Redis.
062 *
063 * @since 5.8
064 */
065public class RedisWorkQueuing implements WorkQueuing {
066
067    private static final Log log = LogFactory.getLog(RedisWorkQueuing.class);
068
069    protected static final String UTF_8 = "UTF-8";
070
071    /**
072     * Global hash of Work instance id -> serialoized Work instance.
073     */
074    protected static final String KEY_DATA = "data";
075
076    /**
077     * Global hash of Work instance id -> Work state. The completed state ( {@value #STATE_COMPLETED_B}) is followed by
078     * a completion time in milliseconds.
079     */
080    protected static final String KEY_STATE = "state";
081
082    /**
083     * Per-queue list of suspended Work instance ids.
084     */
085    protected static final String KEY_SUSPENDED_PREFIX = "prev";
086
087    protected static final byte[] KEY_SUSPENDED = KEY_SUSPENDED_PREFIX.getBytes();
088
089    /**
090     * Per-queue list of scheduled Work instance ids.
091     */
092    protected static final String KEY_QUEUE_PREFIX = "queue";
093
094    protected static final byte[] KEY_QUEUE = KEY_QUEUE_PREFIX.getBytes();
095
096    /**
097     * Per-queue set of scheduled Work instance ids.
098     */
099    protected static final String KEY_SCHEDULED_PREFIX = "sched";
100
101    protected static final byte[] KEY_SCHEDULED = KEY_SCHEDULED_PREFIX.getBytes();
102
103    /**
104     * Per-queue set of running Work instance ids.
105     */
106    protected static final String KEY_RUNNING_PREFIX = "run";
107
108    protected static final byte[] KEY_RUNNING = KEY_RUNNING_PREFIX.getBytes();
109
110    /**
111     * Per-queue set of counters.
112     */
113    protected static final String KEY_COMPLETED_PREFIX = "done";
114
115    protected static final byte[] KEY_COMPLETED = KEY_COMPLETED_PREFIX.getBytes();
116
117    protected static final String KEY_CANCELED_PREFIX = "cancel";
118
119    protected static final byte[] KEY_CANCELED = KEY_CANCELED_PREFIX.getBytes();
120
121    protected static final String KEY_COUNT_PREFIX = "count";
122
123    protected static final byte STATE_SCHEDULED_B = 'Q';
124
125    protected static final byte STATE_RUNNING_B = 'R';
126
127    protected static final byte STATE_RUNNING_C = 'C';
128
129    protected static final byte[] STATE_SCHEDULED = new byte[] { STATE_SCHEDULED_B };
130
131    protected static final byte[] STATE_RUNNING = new byte[] { STATE_RUNNING_B };
132
133    protected static final byte[] STATE_UNKNOWN = new byte[0];
134
135    protected Listener listener;
136
137    protected final Map<String, NuxeoBlockingQueue> allQueued = new HashMap<>();
138
139    protected String redisNamespace;
140
141    // lua scripts
142    protected byte[] initWorkQueueSha;
143
144    protected byte[] metricsWorkQueueSha;
145
146    protected byte[] schedulingWorkSha;
147
148    protected byte[] runningWorkSha;
149
150    protected byte[] cancelledScheduledWorkSha;
151
152    protected byte[] completedWorkSha;
153
154    protected byte[] cancelledRunningWorkSha;
155
156    public RedisWorkQueuing(Listener listener) {
157        this.listener = listener;
158        loadConfig();
159    }
160
161    void loadConfig() {
162        RedisAdmin admin = Framework.getService(RedisAdmin.class);
163        redisNamespace = admin.namespace("work");
164        try {
165            initWorkQueueSha = admin.load("org.nuxeo.ecm.core.redis", "init-work-queue")
166                    .getBytes();
167            metricsWorkQueueSha = admin.load("org.nuxeo.ecm.core.redis", "metrics-work-queue")
168                    .getBytes();
169            schedulingWorkSha = admin.load("org.nuxeo.ecm.core.redis", "scheduling-work")
170                    .getBytes();
171            runningWorkSha = admin.load("org.nuxeo.ecm.core.redis", "running-work")
172                    .getBytes();
173            cancelledScheduledWorkSha = admin.load("org.nuxeo.ecm.core.redis", "cancelled-scheduled-work")
174                    .getBytes();
175            completedWorkSha = admin.load("org.nuxeo.ecm.core.redis", "completed-work")
176                    .getBytes();
177            cancelledRunningWorkSha = admin.load("org.nuxeo.ecm.core.redis", "cancelled-running-work")
178                    .getBytes();
179        } catch (IOException e) {
180            throw new RuntimeException("Cannot load LUA scripts", e);
181        }
182    }
183
184    @Override
185    public NuxeoBlockingQueue init(WorkQueueDescriptor config) {
186        evalSha(metricsWorkQueueSha, keys(config.id), Collections.emptyList());
187        RedisBlockingQueue queue = new RedisBlockingQueue(config.id, this);
188        allQueued.put(config.id, queue);
189        return queue;
190    }
191
192    @Override
193    public NuxeoBlockingQueue getQueue(String queueId) {
194        return allQueued.get(queueId);
195    }
196
197
198    @Override
199    public void workSchedule(String queueId, Work work) {
200        getQueue(queueId).offer(new WorkHolder(work));
201    }
202
203    @Override
204    public void workRunning(String queueId, Work work) {
205        try {
206            workSetRunning(queueId, work);
207        } catch (IOException e) {
208            throw new RuntimeException(e);
209        }
210    }
211
212    @Override
213    public void workCanceled(String queueId, Work work) {
214        try {
215            workSetCancelledScheduled(queueId, work);
216        } catch (IOException e) {
217            throw new RuntimeException(e);
218        }
219    }
220
221    @Override
222    public void workCompleted(String queueId, Work work) {
223        try {
224            workSetCompleted(queueId, work);
225        } catch (IOException e) {
226            throw new RuntimeException(e);
227        }
228    }
229
230    @Override
231    public void workReschedule(String queueId, Work work) {
232        try {
233            workSetReschedule(queueId, work);
234        } catch (IOException e) {
235            throw new RuntimeException(e);
236        }
237    }
238
239    @Override
240    public List<Work> listWork(String queueId, State state) {
241        switch (state) {
242        case SCHEDULED:
243            return listScheduled(queueId);
244        case RUNNING:
245            return listRunning(queueId);
246        default:
247            throw new IllegalArgumentException(String.valueOf(state));
248        }
249    }
250
251    @Override
252    public List<String> listWorkIds(String queueId, State state) {
253        if (state == null) {
254            return listNonCompletedIds(queueId);
255        }
256        switch (state) {
257        case SCHEDULED:
258            return listScheduledIds(queueId);
259        case RUNNING:
260            return listRunningIds(queueId);
261        default:
262            throw new IllegalArgumentException(String.valueOf(state));
263        }
264    }
265
266    protected List<Work> listScheduled(String queueId) {
267        try {
268            return listWorkList(queuedKey(queueId));
269        } catch (IOException e) {
270            throw new RuntimeException(e);
271        }
272    }
273
274    protected List<Work> listRunning(String queueId) {
275        try {
276            return listWorkSet(runningKey(queueId));
277        } catch (IOException e) {
278            throw new RuntimeException(e);
279        }
280    }
281
282    protected List<String> listScheduledIds(String queueId) {
283        try {
284            return listWorkIdsList(queuedKey(queueId));
285        } catch (IOException e) {
286            throw new RuntimeException(e);
287        }
288    }
289
290    protected List<String> listRunningIds(String queueId) {
291        try {
292            return listWorkIdsSet(runningKey(queueId));
293        } catch (IOException e) {
294            throw new RuntimeException(e);
295        }
296    }
297
298    protected List<String> listNonCompletedIds(String queueId) {
299        List<String> list = listScheduledIds(queueId);
300        list.addAll(listRunningIds(queueId));
301        return list;
302    }
303
304    @Override
305    public long count(String queueId, State state) {
306        switch (state) {
307        case SCHEDULED:
308            return metrics(queueId).scheduled.longValue();
309        case RUNNING:
310            return metrics(queueId).running.longValue();
311        default:
312            throw new IllegalArgumentException(String.valueOf(state));
313        }
314    }
315
316    @Override
317    public Work find(String workId, State state) {
318        if (isWorkInState(workId, state)) {
319            return getWork(bytes(workId));
320        }
321        return null;
322    }
323
324    @Override
325    public boolean isWorkInState(String workId, State state) {
326        State s = getWorkState(workId);
327        if (state == null) {
328            return s == State.SCHEDULED || s == State.RUNNING;
329        }
330        return s == state;
331    }
332
333    @Override
334    public void removeScheduled(String queueId, String workId) {
335        try {
336            removeScheduledWork(queueId, workId);
337        } catch (IOException cause) {
338            throw new RuntimeException("Cannot remove scheduled work " + workId + " from " + queueId, cause);
339        }
340    }
341
342    @Override
343    public State getWorkState(String workId) {
344        return getWorkStateInfo(workId);
345    }
346
347    @Override
348    public void setActive(String queueId, boolean value) {
349        WorkQueueMetrics metrics = getQueue(queueId).setActive(value);
350        if (value) {
351            listener.queueActivated(metrics);
352        } else {
353            listener.queueDeactivated(metrics);
354        }
355    }
356
357    @Override
358    public int setSuspending(String queueId) {
359        try {
360            int n = suspendScheduledWork(queueId);
361            log.info("Suspending " + n + " work instances from queue: " + queueId);
362            allQueued.remove(queueId);
363            return n;
364        } catch (IOException e) {
365            throw new RuntimeException(e);
366        }
367    }
368
369    /*
370     * ******************** Redis Interface ********************
371     */
372
373    protected String string(byte[] bytes) {
374        try {
375            return new String(bytes, UTF_8);
376        } catch (IOException e) {
377            throw new RuntimeException("Should not happen, cannot decode string in UTF-8", e);
378        }
379    }
380
381    protected byte[] bytes(String string) {
382        try {
383            return string.getBytes(UTF_8);
384        } catch (IOException e) {
385            throw new RuntimeException("Should not happen, cannot encode string in UTF-8", e);
386        }
387    }
388
389    protected byte[] bytes(Work.State state) {
390        switch (state) {
391        case SCHEDULED:
392            return STATE_SCHEDULED;
393        case RUNNING:
394            return STATE_RUNNING;
395        default:
396            return STATE_UNKNOWN;
397        }
398    }
399
400    protected static String key(String... names) {
401        return String.join(":", names);
402    }
403
404    protected byte[] keyBytes(String prefix, String queueId) {
405        return keyBytes(key(prefix, queueId));
406    }
407
408    protected byte[] keyBytes(String prefix) {
409        return bytes(redisNamespace.concat(prefix));
410    }
411
412    protected byte[] workId(Work work) {
413        return workId(work.getId());
414    }
415
416    protected byte[] workId(String id) {
417        return bytes(id);
418    }
419
420    protected byte[] suspendedKey(String queueId) {
421        return keyBytes(key(KEY_SUSPENDED_PREFIX, queueId));
422    }
423
424    protected byte[] queuedKey(String queueId) {
425        return keyBytes(key(KEY_QUEUE_PREFIX, queueId));
426    }
427
428    protected byte[] countKey(String queueId) {
429        return keyBytes(key(KEY_COUNT_PREFIX, queueId));
430    }
431
432    protected byte[] runningKey(String queueId) {
433        return keyBytes(key(KEY_RUNNING_PREFIX, queueId));
434    }
435
436    protected byte[] scheduledKey(String queueId) {
437        return keyBytes(key(KEY_SCHEDULED_PREFIX, queueId));
438    }
439
440    protected byte[] completedKey(String queueId) {
441        return keyBytes(key(KEY_COMPLETED_PREFIX, queueId));
442    }
443
444    protected byte[] canceledKey(String queueId) {
445        return keyBytes(key(KEY_CANCELED_PREFIX, queueId));
446    }
447
448    protected byte[] stateKey() {
449        return keyBytes(KEY_STATE);
450    }
451
452    protected byte[] dataKey() {
453        return keyBytes(KEY_DATA);
454    }
455
456    protected byte[] serializeWork(Work work) throws IOException {
457        ByteArrayOutputStream baout = new ByteArrayOutputStream();
458        ObjectOutputStream out = new ObjectOutputStream(baout);
459        out.writeObject(work);
460        out.flush();
461        out.close();
462        return baout.toByteArray();
463    }
464
465    protected Work deserializeWork(byte[] workBytes) {
466        if (workBytes == null) {
467            return null;
468        }
469        InputStream bain = new ByteArrayInputStream(workBytes);
470        try (ObjectInputStream in = new ObjectInputStream(bain)) {
471            return (Work) in.readObject();
472        } catch (RuntimeException cause) {
473            throw cause;
474        } catch (IOException | ClassNotFoundException cause) {
475            throw new RuntimeException("Cannot deserialize work", cause);
476        }
477    }
478
479    /**
480     * Finds which queues have suspended work.
481     *
482     * @return a set of queue ids
483     * @since 5.8
484     */
485    protected Set<String> getSuspendedQueueIds() throws IOException {
486        return getQueueIds(KEY_SUSPENDED_PREFIX);
487    }
488
489    protected Set<String> getScheduledQueueIds() {
490        return getQueueIds(KEY_QUEUE_PREFIX);
491    }
492
493    protected Set<String> getRunningQueueIds() {
494        return getQueueIds(KEY_RUNNING_PREFIX);
495    }
496
497    /**
498     * Finds which queues have work for a given state prefix.
499     *
500     * @return a set of queue ids
501     * @since 5.8
502     */
503    protected Set<String> getQueueIds(final String queuePrefix) {
504        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Set<String>>() {
505            @Override
506            public Set<String> call(Jedis jedis) {
507                int offset = keyBytes(queuePrefix).length;
508                Set<byte[]> keys = jedis.keys(keyBytes(key(queuePrefix, "*")));
509                Set<String> queueIds = new HashSet<String>(keys.size());
510                for (byte[] bytes : keys) {
511                    try {
512                        String queueId = new String(bytes, offset, bytes.length - offset, UTF_8);
513                        queueIds.add(queueId);
514                    } catch (IOException e) {
515                        throw new NuxeoException(e);
516                    }
517                }
518                return queueIds;
519            }
520
521        });
522    }
523
524    /**
525     * Resumes all suspended work instances by moving them to the scheduled queue.
526     *
527     * @param queueId the queue id
528     * @return the number of work instances scheduled
529     * @since 5.8
530     */
531    public int scheduleSuspendedWork(final String queueId) throws IOException {
532        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Integer>() {
533            @Override
534            public Integer call(Jedis jedis) {
535                for (int n = 0;; n++) {
536                    byte[] workIdBytes = jedis.rpoplpush(suspendedKey(queueId), queuedKey(queueId));
537                    if (workIdBytes == null) {
538                        return Integer.valueOf(n);
539                    }
540                }
541            }
542
543        }).intValue();
544    }
545
546    /**
547     * Suspends all scheduled work instances by moving them to the suspended queue.
548     *
549     * @param queueId the queue id
550     * @return the number of work instances suspended
551     * @since 5.8
552     */
553    public int suspendScheduledWork(final String queueId) throws IOException {
554        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Integer>() {
555
556            @Override
557            public Integer call(Jedis jedis) {
558                for (int n = 0;; n++) {
559                    byte[] workIdBytes = jedis.rpoplpush(queuedKey(queueId), suspendedKey(queueId));
560                    if (workIdBytes == null) {
561                        return n;
562                    }
563                }
564            }
565        }).intValue();
566    }
567
568    @Override
569    public WorkQueueMetrics metrics(String queueId) {
570        return metrics(queueId, evalSha(metricsWorkQueueSha, keys(queueId), Collections.emptyList()));
571    }
572
573    WorkQueueMetrics metrics(String queueId, Number[] counters) {
574        return new WorkQueueMetrics(queueId, counters[0], counters[1], counters[2], counters[3]);
575    }
576
577    /**
578     * Persists a work instance and adds it to the scheduled queue.
579     *
580     * @param queueId the queue id
581     * @param work the work instance
582     * @throws IOException
583     */
584    public void workSetScheduled(final String queueId, Work work) throws IOException {
585        listener.queueChanged(work, metrics(queueId, evalSha(schedulingWorkSha, keys(queueId), args(work, true))));
586    }
587
588    /**
589     * Switches a work to state completed, and saves its new state.
590     */
591    protected void workSetCancelledScheduled(final String queueId, final Work work) throws IOException {
592        listener.queueChanged(work,
593                metrics(queueId, evalSha(cancelledScheduledWorkSha, keys(queueId), args(work, true))));
594    }
595
596    /**
597     * Switches a work to state running.
598     *
599     * @param queueId the queue id
600     * @param work the work
601     */
602    protected void workSetRunning(final String queueId, Work work) throws IOException {
603        listener.queueChanged(work, metrics(queueId, evalSha(runningWorkSha, keys(queueId), args(work, true))));
604    }
605
606    /**
607     * Switches a work to state completed, and saves its new state.
608     */
609    protected void workSetCompleted(final String queueId, final Work work) throws IOException {
610        listener.queueChanged(work, metrics(queueId, evalSha(completedWorkSha, keys(queueId), args(work, false))));
611    }
612
613    /**
614     * Switches a work to state canceled, and saves its new state.
615     */
616    protected void workSetReschedule(final String queueId, final Work work) throws IOException {
617        listener.queueChanged(work,
618                metrics(queueId, evalSha(cancelledRunningWorkSha, keys(queueId), args(work, true))));
619    }
620
621    protected List<byte[]> keys(String queueid) {
622        return Arrays.asList(dataKey(),
623                stateKey(),
624                countKey(queueid),
625                scheduledKey(queueid),
626                queuedKey(queueid),
627                runningKey(queueid),
628                completedKey(queueid),
629                canceledKey(queueid));
630    }
631
632    protected List<byte[]> args(String workId) throws IOException {
633        return Arrays.asList(workId(workId));
634    }
635
636    protected List<byte[]> args(Work work, boolean serialize) throws IOException {
637        List<byte[]> args = Arrays.asList(workId(work), bytes(work.getWorkInstanceState()));
638        if (serialize) {
639            args = new ArrayList<>(args);
640            args.add(serializeWork(work));
641        }
642        return args;
643    }
644
645    /**
646     * Gets the work state.
647     *
648     * @param workId the work id
649     * @return the state, or {@code null} if not found
650     */
651    protected State getWorkStateInfo(final String workId) {
652        final byte[] workIdBytes = bytes(workId);
653        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<State>() {
654            @Override
655            public State call(Jedis jedis) {
656                // get state
657                byte[] bytes = jedis.hget(stateKey(), workIdBytes);
658                if (bytes == null || bytes.length == 0) {
659                    return null;
660                }
661                switch (bytes[0]) {
662                case STATE_SCHEDULED_B:
663                    return State.SCHEDULED;
664                case STATE_RUNNING_B:
665                    return State.RUNNING;
666                default:
667                    String msg;
668                    try {
669                        msg = new String(bytes, UTF_8);
670                    } catch (UnsupportedEncodingException e) {
671                        msg = Arrays.toString(bytes);
672                    }
673                    log.error("Unknown work state: " + msg + ", work: " + workId);
674                    return null;
675                }
676            }
677        });
678    }
679
680    protected List<String> listWorkIdsList(final byte[] queueBytes) throws IOException {
681        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<String>>() {
682
683            @Override
684            public List<String> call(Jedis jedis) {
685                List<byte[]> keys = jedis.lrange(queueBytes, 0, -1);
686                List<String> list = new ArrayList<String>(keys.size());
687                for (byte[] workIdBytes : keys) {
688                    list.add(string(workIdBytes));
689                }
690                return list;
691            }
692
693        });
694    }
695
696    protected List<String> listWorkIdsSet(final byte[] queueBytes) throws IOException {
697        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<String>>() {
698
699            @Override
700            public List<String> call(Jedis jedis) {
701
702                Set<byte[]> keys = jedis.smembers(queueBytes);
703                List<String> list = new ArrayList<String>(keys.size());
704                for (byte[] workIdBytes : keys) {
705                    list.add(string(workIdBytes));
706                }
707                return list;
708            }
709
710        });
711
712    }
713
714    protected List<Work> listWorkList(final byte[] queueBytes) throws IOException {
715        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<Work>>() {
716            @Override
717            public List<Work> call(Jedis jedis) {
718                List<byte[]> keys = jedis.lrange(queueBytes, 0, -1);
719                List<Work> list = new ArrayList<Work>(keys.size());
720                for (byte[] workIdBytes : keys) {
721                    // get data
722                    byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
723                    Work work = deserializeWork(workBytes);
724                    list.add(work);
725                }
726                return list;
727            }
728        });
729    }
730
731    protected List<Work> listWorkSet(final byte[] queueBytes) throws IOException {
732        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<Work>>() {
733            @Override
734            public List<Work> call(Jedis jedis) {
735                Set<byte[]> keys = jedis.smembers(queueBytes);
736                List<Work> list = new ArrayList<Work>(keys.size());
737                for (byte[] workIdBytes : keys) {
738                    // get data
739                    byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
740                    Work work = deserializeWork(workBytes);
741                    list.add(work);
742                }
743                return list;
744            }
745        });
746    }
747
748    protected Work getWork(byte[] workIdBytes) {
749        try {
750            return getWorkData(workIdBytes);
751        } catch (IOException e) {
752            throw new RuntimeException(e);
753        }
754    }
755
756    protected Work getWorkData(final byte[] workIdBytes) throws IOException {
757        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Work>() {
758
759            @Override
760            public Work call(Jedis jedis) {
761                byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
762                return deserializeWork(workBytes);
763            }
764
765        });
766    }
767
768    /**
769     * Removes first work from work queue.
770     *
771     * @param queueId the queue id
772     * @return the work, or {@code null} if the scheduled queue is empty
773     */
774    protected Work getWorkFromQueue(final String queueId) throws IOException {
775        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Work>() {
776
777            @Override
778            public Work call(Jedis jedis) {
779                // pop from queue
780                byte[] workIdBytes = jedis.rpop(queuedKey(queueId));
781                if (workIdBytes == null) {
782                    return null;
783                }
784                // get data
785                byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
786                return deserializeWork(workBytes);
787            }
788
789        });
790    }
791
792    /**
793     * Removes a given work from queue, move the work from scheduled to completed set.
794     *
795     * @throws IOException
796     */
797    protected void removeScheduledWork(final String queueId, final String workId) throws IOException {
798        evalSha(cancelledScheduledWorkSha, keys(queueId), args(workId));
799    }
800
801    /**
802     * Helper to call SSCAN but fall back on a custom implementation based on SMEMBERS if the backend (embedded) does
803     * not support SSCAN.
804     *
805     * @since 7.3
806     */
807    public static class SScanner {
808
809        // results of SMEMBERS for last key, in embedded mode
810        protected List<String> smembers;
811
812        protected ScanResult<String> sscan(Jedis jedis, String key, String cursor, ScanParams params) {
813            ScanResult<String> scanResult;
814            try {
815                scanResult = jedis.sscan(key, cursor, params);
816            } catch (Exception e) {
817                // when testing with embedded fake redis, we may get an un-declared exception
818                if (!(e.getCause() instanceof NoSuchMethodException)) {
819                    throw e;
820                }
821                // no SSCAN available in embedded, do a full SMEMBERS
822                if (smembers == null) {
823                    Set<String> set = jedis.smembers(key);
824                    smembers = new ArrayList<>(set);
825                }
826                Collection<byte[]> bparams = params.getParams();
827                int count = 1000;
828                for (Iterator<byte[]> it = bparams.iterator(); it.hasNext();) {
829                    byte[] param = it.next();
830                    if (param.equals(Protocol.Keyword.MATCH.raw)) {
831                        throw new UnsupportedOperationException("MATCH not supported");
832                    }
833                    if (param.equals(Protocol.Keyword.COUNT.raw)) {
834                        count = Integer.parseInt(SafeEncoder.encode(it.next()));
835                    }
836                }
837                int pos = Integer.parseInt(cursor); // don't check range, callers are cool
838                int end = Math.min(pos + count, smembers.size());
839                int nextPos = end == smembers.size() ? 0 : end;
840                scanResult = new ScanResult<>(String.valueOf(nextPos), smembers.subList(pos, end));
841                if (nextPos == 0) {
842                    smembers = null;
843                }
844            }
845            return scanResult;
846        }
847    }
848
849    Number[] evalSha(byte[] sha, List<byte[]> keys, List<byte[]> args) throws JedisException {
850        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Number[]>() {
851
852            @Override
853            public Number[] call(Jedis jedis) {
854                return coerce((List<Number>) jedis.evalsha(sha, keys, args));
855            }
856
857            Number[] coerce(List<Number> numbers) {
858                Number[] counter = numbers.toArray(new Number[numbers.size()]);
859                for (int i = 0; i < counter.length; ++i) {
860                    if (counter[i] == null) {
861                        counter[i] = 0;
862                    }
863                }
864                return counter;
865            }
866        });
867    }
868
869    @Override
870    public void listen(Listener listener) {
871        this.listener =listener;
872
873    }
874
875}