001/*
002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.redis.contribs;
020
021import java.io.ByteArrayInputStream;
022import java.io.ByteArrayOutputStream;
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.ObjectInputStream;
026import java.io.ObjectOutputStream;
027import java.io.UnsupportedEncodingException;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.List;
034import java.util.Map;
035import java.util.Set;
036
037import org.apache.commons.logging.Log;
038import org.apache.commons.logging.LogFactory;
039import org.nuxeo.ecm.core.api.NuxeoException;
040import org.nuxeo.ecm.core.redis.RedisAdmin;
041import org.nuxeo.ecm.core.redis.RedisCallable;
042import org.nuxeo.ecm.core.redis.RedisExecutor;
043import org.nuxeo.ecm.core.work.NuxeoBlockingQueue;
044import org.nuxeo.ecm.core.work.WorkHolder;
045import org.nuxeo.ecm.core.work.WorkQueuing;
046import org.nuxeo.ecm.core.work.api.Work;
047import org.nuxeo.ecm.core.work.api.Work.State;
048import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
049import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
050import org.nuxeo.runtime.api.Framework;
051
052import redis.clients.jedis.Jedis;
053import redis.clients.jedis.exceptions.JedisException;
054
055/**
056 * Implementation of a {@link WorkQueuing} storing {@link Work} instances in Redis.
057 *
058 * @since 5.8
059 */
060public class RedisWorkQueuing implements WorkQueuing {
061
062    private static final Log log = LogFactory.getLog(RedisWorkQueuing.class);
063
064    protected static final String UTF_8 = "UTF-8";
065
066    /**
067     * Global hash of Work instance id -> serialoized Work instance.
068     */
069    protected static final String KEY_DATA = "data";
070
071    /**
072     * Global hash of Work instance id -> Work state. The completed state ( {@value #STATE_COMPLETED_B}) is followed by
073     * a completion time in milliseconds.
074     */
075    protected static final String KEY_STATE = "state";
076
077    /**
078     * Per-queue list of suspended Work instance ids.
079     */
080    protected static final String KEY_SUSPENDED_PREFIX = "prev";
081
082    protected static final byte[] KEY_SUSPENDED = KEY_SUSPENDED_PREFIX.getBytes();
083
084    /**
085     * Per-queue list of scheduled Work instance ids.
086     */
087    protected static final String KEY_QUEUE_PREFIX = "queue";
088
089    protected static final byte[] KEY_QUEUE = KEY_QUEUE_PREFIX.getBytes();
090
091    /**
092     * Per-queue set of scheduled Work instance ids.
093     */
094    protected static final String KEY_SCHEDULED_PREFIX = "sched";
095
096    protected static final byte[] KEY_SCHEDULED = KEY_SCHEDULED_PREFIX.getBytes();
097
098    /**
099     * Per-queue set of running Work instance ids.
100     */
101    protected static final String KEY_RUNNING_PREFIX = "run";
102
103    protected static final byte[] KEY_RUNNING = KEY_RUNNING_PREFIX.getBytes();
104
105    /**
106     * Per-queue set of counters.
107     */
108    protected static final String KEY_COMPLETED_PREFIX = "done";
109
110    protected static final byte[] KEY_COMPLETED = KEY_COMPLETED_PREFIX.getBytes();
111
112    protected static final String KEY_CANCELED_PREFIX = "cancel";
113
114    protected static final byte[] KEY_CANCELED = KEY_CANCELED_PREFIX.getBytes();
115
116    protected static final String KEY_COUNT_PREFIX = "count";
117
118    protected static final byte STATE_SCHEDULED_B = 'Q';
119
120    protected static final byte STATE_RUNNING_B = 'R';
121
122    protected static final byte STATE_RUNNING_C = 'C';
123
124    protected static final byte[] STATE_SCHEDULED = new byte[] { STATE_SCHEDULED_B };
125
126    protected static final byte[] STATE_RUNNING = new byte[] { STATE_RUNNING_B };
127
128    protected static final byte[] STATE_UNKNOWN = new byte[0];
129
130    protected Listener listener;
131
132    protected final Map<String, NuxeoBlockingQueue> allQueued = new HashMap<>();
133
134    protected String redisNamespace;
135
136    // lua scripts
137    protected byte[] initWorkQueueSha;
138
139    protected byte[] metricsWorkQueueSha;
140
141    protected byte[] schedulingWorkSha;
142
143    protected byte[] popWorkSha;
144
145    protected byte[] runningWorkSha;
146
147    protected byte[] cancelledScheduledWorkSha;
148
149    protected byte[] completedWorkSha;
150
151    protected byte[] cancelledRunningWorkSha;
152
153    public RedisWorkQueuing(Listener listener) {
154        this.listener = listener;
155        loadConfig();
156    }
157
158    void loadConfig() {
159        RedisAdmin admin = Framework.getService(RedisAdmin.class);
160        redisNamespace = admin.namespace("work");
161        try {
162            initWorkQueueSha = admin.load("org.nuxeo.ecm.core.redis", "init-work-queue").getBytes();
163            metricsWorkQueueSha = admin.load("org.nuxeo.ecm.core.redis", "metrics-work-queue").getBytes();
164            schedulingWorkSha = admin.load("org.nuxeo.ecm.core.redis", "scheduling-work").getBytes();
165            popWorkSha = admin.load("org.nuxeo.ecm.core.redis", "pop-work").getBytes();
166            runningWorkSha = admin.load("org.nuxeo.ecm.core.redis", "running-work").getBytes();
167            cancelledScheduledWorkSha = admin.load("org.nuxeo.ecm.core.redis", "cancelled-scheduled-work").getBytes();
168            completedWorkSha = admin.load("org.nuxeo.ecm.core.redis", "completed-work").getBytes();
169            cancelledRunningWorkSha = admin.load("org.nuxeo.ecm.core.redis", "cancelled-running-work").getBytes();
170        } catch (IOException e) {
171            throw new RuntimeException("Cannot load LUA scripts", e);
172        }
173    }
174
175    @Override
176    public NuxeoBlockingQueue init(WorkQueueDescriptor config) {
177        evalSha(initWorkQueueSha, keys(config.id), Collections.emptyList());
178        RedisBlockingQueue queue = new RedisBlockingQueue(config.id, this);
179        allQueued.put(config.id, queue);
180        return queue;
181    }
182
183    @Override
184    public NuxeoBlockingQueue getQueue(String queueId) {
185        return allQueued.get(queueId);
186    }
187
188    @Override
189    public void workSchedule(String queueId, Work work) {
190        getQueue(queueId).offer(new WorkHolder(work));
191    }
192
193    @Override
194    public void workRunning(String queueId, Work work) {
195        try {
196            workSetRunning(queueId, work);
197        } catch (IOException e) {
198            throw new RuntimeException(e);
199        }
200    }
201
202    @Override
203    public void workCanceled(String queueId, Work work) {
204        try {
205            workSetCancelledScheduled(queueId, work);
206        } catch (IOException e) {
207            throw new RuntimeException(e);
208        }
209    }
210
211    @Override
212    public void workCompleted(String queueId, Work work) {
213        try {
214            workSetCompleted(queueId, work);
215        } catch (IOException e) {
216            throw new RuntimeException(e);
217        }
218    }
219
220    @Override
221    public void workReschedule(String queueId, Work work) {
222        try {
223            workSetReschedule(queueId, work);
224        } catch (IOException e) {
225            throw new RuntimeException(e);
226        }
227    }
228
229    @Override
230    public List<Work> listWork(String queueId, State state) {
231        switch (state) {
232        case SCHEDULED:
233            return listScheduled(queueId);
234        case RUNNING:
235            return listRunning(queueId);
236        default:
237            throw new IllegalArgumentException(String.valueOf(state));
238        }
239    }
240
241    @Override
242    public List<String> listWorkIds(String queueId, State state) {
243        if (state == null) {
244            return listNonCompletedIds(queueId);
245        }
246        switch (state) {
247        case SCHEDULED:
248            return listScheduledIds(queueId);
249        case RUNNING:
250            return listRunningIds(queueId);
251        default:
252            throw new IllegalArgumentException(String.valueOf(state));
253        }
254    }
255
256    protected List<Work> listScheduled(String queueId) {
257        try {
258            return listWorkList(queuedKey(queueId));
259        } catch (IOException e) {
260            throw new RuntimeException(e);
261        }
262    }
263
264    protected List<Work> listRunning(String queueId) {
265        try {
266            return listWorkSet(runningKey(queueId));
267        } catch (IOException e) {
268            throw new RuntimeException(e);
269        }
270    }
271
272    protected List<String> listScheduledIds(String queueId) {
273        try {
274            return listWorkIdsList(queuedKey(queueId));
275        } catch (IOException e) {
276            throw new RuntimeException(e);
277        }
278    }
279
280    protected List<String> listRunningIds(String queueId) {
281        try {
282            return listWorkIdsSet(runningKey(queueId));
283        } catch (IOException e) {
284            throw new RuntimeException(e);
285        }
286    }
287
288    protected List<String> listNonCompletedIds(String queueId) {
289        List<String> list = listScheduledIds(queueId);
290        list.addAll(listRunningIds(queueId));
291        return list;
292    }
293
294    @Override
295    public long count(String queueId, State state) {
296        switch (state) {
297        case SCHEDULED:
298            return metrics(queueId).scheduled.longValue();
299        case RUNNING:
300            return metrics(queueId).running.longValue();
301        default:
302            throw new IllegalArgumentException(String.valueOf(state));
303        }
304    }
305
306    @Override
307    public Work find(String workId, State state) {
308        if (isWorkInState(workId, state)) {
309            return getWork(bytes(workId));
310        }
311        return null;
312    }
313
314    @Override
315    public boolean isWorkInState(String workId, State state) {
316        State s = getWorkState(workId);
317        if (state == null) {
318            return s == State.SCHEDULED || s == State.RUNNING;
319        }
320        return s == state;
321    }
322
323    @Override
324    public void removeScheduled(String queueId, String workId) {
325        try {
326            removeScheduledWork(queueId, workId);
327        } catch (IOException cause) {
328            throw new RuntimeException("Cannot remove scheduled work " + workId + " from " + queueId, cause);
329        }
330    }
331
332    @Override
333    public State getWorkState(String workId) {
334        return getWorkStateInfo(workId);
335    }
336
337    @Override
338    public void setActive(String queueId, boolean value) {
339        WorkQueueMetrics metrics = getQueue(queueId).setActive(value);
340        if (value) {
341            listener.queueActivated(metrics);
342        } else {
343            listener.queueDeactivated(metrics);
344        }
345    }
346
347    /*
348     * ******************** Redis Interface ********************
349     */
350
351    protected String string(byte[] bytes) {
352        try {
353            return new String(bytes, UTF_8);
354        } catch (IOException e) {
355            throw new RuntimeException("Should not happen, cannot decode string in UTF-8", e);
356        }
357    }
358
359    protected byte[] bytes(String string) {
360        try {
361            return string.getBytes(UTF_8);
362        } catch (IOException e) {
363            throw new RuntimeException("Should not happen, cannot encode string in UTF-8", e);
364        }
365    }
366
367    protected byte[] bytes(Work.State state) {
368        switch (state) {
369        case SCHEDULED:
370            return STATE_SCHEDULED;
371        case RUNNING:
372            return STATE_RUNNING;
373        default:
374            return STATE_UNKNOWN;
375        }
376    }
377
378    protected static String key(String... names) {
379        return String.join(":", names);
380    }
381
382    protected byte[] keyBytes(String prefix, String queueId) {
383        return keyBytes(key(prefix, queueId));
384    }
385
386    protected byte[] keyBytes(String prefix) {
387        return bytes(redisNamespace.concat(prefix));
388    }
389
390    protected byte[] workId(Work work) {
391        return workId(work.getId());
392    }
393
394    protected byte[] workId(String id) {
395        return bytes(id);
396    }
397
398    protected byte[] suspendedKey(String queueId) {
399        return keyBytes(key(KEY_SUSPENDED_PREFIX, queueId));
400    }
401
402    protected byte[] queuedKey(String queueId) {
403        return keyBytes(key(KEY_QUEUE_PREFIX, queueId));
404    }
405
406    protected byte[] countKey(String queueId) {
407        return keyBytes(key(KEY_COUNT_PREFIX, queueId));
408    }
409
410    protected byte[] runningKey(String queueId) {
411        return keyBytes(key(KEY_RUNNING_PREFIX, queueId));
412    }
413
414    protected byte[] scheduledKey(String queueId) {
415        return keyBytes(key(KEY_SCHEDULED_PREFIX, queueId));
416    }
417
418    protected byte[] completedKey(String queueId) {
419        return keyBytes(key(KEY_COMPLETED_PREFIX, queueId));
420    }
421
422    protected byte[] canceledKey(String queueId) {
423        return keyBytes(key(KEY_CANCELED_PREFIX, queueId));
424    }
425
426    protected byte[] stateKey() {
427        return keyBytes(KEY_STATE);
428    }
429
430    protected byte[] dataKey() {
431        return keyBytes(KEY_DATA);
432    }
433
434    protected byte[] serializeWork(Work work) throws IOException {
435        ByteArrayOutputStream baout = new ByteArrayOutputStream();
436        ObjectOutputStream out = new ObjectOutputStream(baout);
437        out.writeObject(work);
438        out.flush();
439        out.close();
440        return baout.toByteArray();
441    }
442
443    protected Work deserializeWork(byte[] workBytes) {
444        if (workBytes == null) {
445            return null;
446        }
447        InputStream bain = new ByteArrayInputStream(workBytes);
448        try (ObjectInputStream in = new ObjectInputStream(bain)) {
449            return (Work) in.readObject();
450        } catch (RuntimeException cause) {
451            throw cause;
452        } catch (IOException | ClassNotFoundException cause) {
453            throw new RuntimeException("Cannot deserialize work", cause);
454        }
455    }
456
457    /**
458     * Finds which queues have suspended work.
459     *
460     * @return a set of queue ids
461     * @since 5.8
462     */
463    protected Set<String> getSuspendedQueueIds() throws IOException {
464        return getQueueIds(KEY_SUSPENDED_PREFIX);
465    }
466
467    protected Set<String> getScheduledQueueIds() {
468        return getQueueIds(KEY_QUEUE_PREFIX);
469    }
470
471    protected Set<String> getRunningQueueIds() {
472        return getQueueIds(KEY_RUNNING_PREFIX);
473    }
474
475    /**
476     * Finds which queues have work for a given state prefix.
477     *
478     * @return a set of queue ids
479     * @since 5.8
480     */
481    protected Set<String> getQueueIds(final String queuePrefix) {
482        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Set<String>>() {
483            @Override
484            public Set<String> call(Jedis jedis) {
485                int offset = keyBytes(queuePrefix).length;
486                Set<byte[]> keys = jedis.keys(keyBytes(key(queuePrefix, "*")));
487                Set<String> queueIds = new HashSet<String>(keys.size());
488                for (byte[] bytes : keys) {
489                    try {
490                        String queueId = new String(bytes, offset, bytes.length - offset, UTF_8);
491                        queueIds.add(queueId);
492                    } catch (IOException e) {
493                        throw new NuxeoException(e);
494                    }
495                }
496                return queueIds;
497            }
498
499        });
500    }
501
502    /**
503     * Resumes all suspended work instances by moving them to the scheduled queue.
504     *
505     * @param queueId the queue id
506     * @return the number of work instances scheduled
507     * @since 5.8
508     */
509    public int scheduleSuspendedWork(final String queueId) throws IOException {
510        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Integer>() {
511            @Override
512            public Integer call(Jedis jedis) {
513                for (int n = 0;; n++) {
514                    byte[] workIdBytes = jedis.rpoplpush(suspendedKey(queueId), queuedKey(queueId));
515                    if (workIdBytes == null) {
516                        return Integer.valueOf(n);
517                    }
518                }
519            }
520
521        }).intValue();
522    }
523
524    /**
525     * Suspends all scheduled work instances by moving them to the suspended queue.
526     *
527     * @param queueId the queue id
528     * @return the number of work instances suspended
529     * @since 5.8
530     */
531    public int suspendScheduledWork(final String queueId) throws IOException {
532        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Integer>() {
533
534            @Override
535            public Integer call(Jedis jedis) {
536                for (int n = 0;; n++) {
537                    byte[] workIdBytes = jedis.rpoplpush(queuedKey(queueId), suspendedKey(queueId));
538                    if (workIdBytes == null) {
539                        return n;
540                    }
541                }
542            }
543        }).intValue();
544    }
545
546    @Override
547    public WorkQueueMetrics metrics(String queueId) {
548        return metrics(queueId, evalSha(metricsWorkQueueSha, keys(queueId), Collections.emptyList()));
549    }
550
551    WorkQueueMetrics metrics(String queueId, Number[] counters) {
552        return new WorkQueueMetrics(queueId, counters[0], counters[1], counters[2], counters[3]);
553    }
554
555    /**
556     * Persists a work instance and adds it to the scheduled queue.
557     *
558     * @param queueId the queue id
559     * @param work the work instance
560     * @throws IOException
561     */
562    public void workSetScheduled(final String queueId, Work work) throws IOException {
563        listener.queueChanged(work, metrics(queueId, evalSha(schedulingWorkSha, keys(queueId), args(work, true))));
564    }
565
566    /**
567     * Switches a work to state completed, and saves its new state.
568     */
569    protected void workSetCancelledScheduled(final String queueId, final Work work) throws IOException {
570        listener.queueChanged(work,
571                metrics(queueId, evalSha(cancelledScheduledWorkSha, keys(queueId), args(work, true))));
572    }
573
574    /**
575     * Switches a work to state running.
576     *
577     * @param queueId the queue id
578     * @param work the work
579     */
580    protected void workSetRunning(final String queueId, Work work) throws IOException {
581        listener.queueChanged(work, metrics(queueId, evalSha(runningWorkSha, keys(queueId), args(work, true))));
582    }
583
584    /**
585     * Switches a work to state completed, and saves its new state.
586     */
587    protected void workSetCompleted(final String queueId, final Work work) throws IOException {
588        listener.queueChanged(work, metrics(queueId, evalSha(completedWorkSha, keys(queueId), args(work, false))));
589    }
590
591    /**
592     * Switches a work to state canceled, and saves its new state.
593     */
594    protected void workSetReschedule(final String queueId, final Work work) throws IOException {
595        listener.queueChanged(work,
596                metrics(queueId, evalSha(cancelledRunningWorkSha, keys(queueId), args(work, true))));
597    }
598
599    protected List<byte[]> keys(String queueid) {
600        return Arrays.asList(dataKey(), stateKey(), countKey(queueid), scheduledKey(queueid), queuedKey(queueid),
601                runningKey(queueid), completedKey(queueid), canceledKey(queueid));
602    }
603
604    protected List<byte[]> args(String workId) throws IOException {
605        return Arrays.asList(workId(workId));
606    }
607
608    protected List<byte[]> args(Work work, boolean serialize) throws IOException {
609        List<byte[]> args = Arrays.asList(workId(work), bytes(work.getWorkInstanceState()));
610        if (serialize) {
611            args = new ArrayList<>(args);
612            args.add(serializeWork(work));
613        }
614        return args;
615    }
616
617    /**
618     * Gets the work state.
619     *
620     * @param workId the work id
621     * @return the state, or {@code null} if not found
622     */
623    protected State getWorkStateInfo(final String workId) {
624        final byte[] workIdBytes = bytes(workId);
625        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<State>() {
626            @Override
627            public State call(Jedis jedis) {
628                // get state
629                byte[] bytes = jedis.hget(stateKey(), workIdBytes);
630                if (bytes == null || bytes.length == 0) {
631                    return null;
632                }
633                switch (bytes[0]) {
634                case STATE_SCHEDULED_B:
635                    return State.SCHEDULED;
636                case STATE_RUNNING_B:
637                    return State.RUNNING;
638                default:
639                    String msg;
640                    try {
641                        msg = new String(bytes, UTF_8);
642                    } catch (UnsupportedEncodingException e) {
643                        msg = Arrays.toString(bytes);
644                    }
645                    log.error("Unknown work state: " + msg + ", work: " + workId);
646                    return null;
647                }
648            }
649        });
650    }
651
652    protected List<String> listWorkIdsList(final byte[] queueBytes) throws IOException {
653        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<String>>() {
654
655            @Override
656            public List<String> call(Jedis jedis) {
657                List<byte[]> keys = jedis.lrange(queueBytes, 0, -1);
658                List<String> list = new ArrayList<String>(keys.size());
659                for (byte[] workIdBytes : keys) {
660                    list.add(string(workIdBytes));
661                }
662                return list;
663            }
664
665        });
666    }
667
668    protected List<String> listWorkIdsSet(final byte[] queueBytes) throws IOException {
669        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<String>>() {
670
671            @Override
672            public List<String> call(Jedis jedis) {
673
674                Set<byte[]> keys = jedis.smembers(queueBytes);
675                List<String> list = new ArrayList<String>(keys.size());
676                for (byte[] workIdBytes : keys) {
677                    list.add(string(workIdBytes));
678                }
679                return list;
680            }
681
682        });
683
684    }
685
686    protected List<Work> listWorkList(final byte[] queueBytes) throws IOException {
687        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<Work>>() {
688            @Override
689            public List<Work> call(Jedis jedis) {
690                List<byte[]> keys = jedis.lrange(queueBytes, 0, -1);
691                List<Work> list = new ArrayList<Work>(keys.size());
692                for (byte[] workIdBytes : keys) {
693                    // get data
694                    byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
695                    Work work = deserializeWork(workBytes);
696                    list.add(work);
697                }
698                return list;
699            }
700        });
701    }
702
703    protected List<Work> listWorkSet(final byte[] queueBytes) throws IOException {
704        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<Work>>() {
705            @Override
706            public List<Work> call(Jedis jedis) {
707                Set<byte[]> keys = jedis.smembers(queueBytes);
708                List<Work> list = new ArrayList<Work>(keys.size());
709                for (byte[] workIdBytes : keys) {
710                    // get data
711                    byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
712                    Work work = deserializeWork(workBytes);
713                    list.add(work);
714                }
715                return list;
716            }
717        });
718    }
719
720    protected Work getWork(byte[] workIdBytes) {
721        try {
722            return getWorkData(workIdBytes);
723        } catch (IOException e) {
724            throw new RuntimeException(e);
725        }
726    }
727
728    protected Work getWorkData(final byte[] workIdBytes) throws IOException {
729        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Work>() {
730
731            @Override
732            public Work call(Jedis jedis) {
733                byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
734                return deserializeWork(workBytes);
735            }
736
737        });
738    }
739
740    /**
741     * Removes first work from work queue.
742     *
743     * @param queueId the queue id
744     * @return the work, or {@code null} if the scheduled queue is empty
745     */
746    protected Work getWorkFromQueue(final String queueId) throws IOException {
747        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
748        List<byte[]> keys = keys(queueId);
749        List<byte[]> args = Collections.singletonList(STATE_RUNNING);
750        List<?> result = (List<?>) redisExecutor.evalsha(popWorkSha, keys, args);
751        if (result == null) {
752            return null;
753        }
754
755        List<Number> numbers = (List<Number>) result.get(0);
756        WorkQueueMetrics metrics = metrics(queueId, coerceNullToZero(numbers));
757        Object bytes = result.get(1);
758        if (bytes instanceof String) {
759            bytes = bytes((String) bytes);
760        }
761        Work work = deserializeWork((byte[]) bytes);
762
763        listener.queueChanged(work, metrics);
764
765        return work;
766    }
767
768    /**
769     * Removes a given work from queue, move the work from scheduled to completed set.
770     *
771     * @throws IOException
772     */
773    protected void removeScheduledWork(final String queueId, final String workId) throws IOException {
774        evalSha(cancelledScheduledWorkSha, keys(queueId), args(workId));
775    }
776
777    Number[] evalSha(byte[] sha, List<byte[]> keys, List<byte[]> args) throws JedisException {
778        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
779        List<Number> numbers = (List<Number>) redisExecutor.evalsha(sha, keys, args);
780        return coerceNullToZero(numbers);
781    }
782
783    protected static Number[] coerceNullToZero(List<Number> numbers) {
784        return coerceNullToZero(numbers.toArray(new Number[numbers.size()]));
785    }
786
787    protected static Number[] coerceNullToZero(Number[] counters) {
788        for (int i = 0; i < counters.length; ++i) {
789            if (counters[i] == null) {
790                counters[i] = 0;
791            }
792        }
793        return counters;
794    }
795
796    @Override
797    public void listen(Listener listener) {
798        this.listener = listener;
799
800    }
801
802}