001/*
002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.redis.contribs;
020
021import java.io.ByteArrayInputStream;
022import java.io.ByteArrayOutputStream;
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.ObjectInputStream;
026import java.io.ObjectOutputStream;
027import java.io.UnsupportedEncodingException;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.List;
034import java.util.Map;
035import java.util.Set;
036
037import org.apache.commons.logging.Log;
038import org.apache.commons.logging.LogFactory;
039import org.nuxeo.ecm.core.api.NuxeoException;
040import org.nuxeo.ecm.core.redis.RedisAdmin;
041import org.nuxeo.ecm.core.redis.RedisExecutor;
042import org.nuxeo.ecm.core.work.NuxeoBlockingQueue;
043import org.nuxeo.ecm.core.work.WorkHolder;
044import org.nuxeo.ecm.core.work.WorkQueuing;
045import org.nuxeo.ecm.core.work.api.Work;
046import org.nuxeo.ecm.core.work.api.Work.State;
047import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
048import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
049import org.nuxeo.runtime.api.Framework;
050
051import redis.clients.jedis.exceptions.JedisException;
052
053/**
054 * Implementation of a {@link WorkQueuing} storing {@link Work} instances in Redis.
055 *
056 * @since 5.8
057 */
058public class RedisWorkQueuing implements WorkQueuing {
059
060    private static final Log log = LogFactory.getLog(RedisWorkQueuing.class);
061
062    protected static final String UTF_8 = "UTF-8";
063
064    /**
065     * Global hash of Work instance id -> serialoized Work instance.
066     */
067    protected static final String KEY_DATA = "data";
068
069    /**
070     * Global hash of Work instance id -> Work state. The completed state ( {@value #STATE_COMPLETED_B}) is followed by
071     * a completion time in milliseconds.
072     */
073    protected static final String KEY_STATE = "state";
074
075    /**
076     * Per-queue list of suspended Work instance ids.
077     */
078    protected static final String KEY_SUSPENDED_PREFIX = "prev";
079
080    protected static final byte[] KEY_SUSPENDED = KEY_SUSPENDED_PREFIX.getBytes();
081
082    /**
083     * Per-queue list of scheduled Work instance ids.
084     */
085    protected static final String KEY_QUEUE_PREFIX = "queue";
086
087    protected static final byte[] KEY_QUEUE = KEY_QUEUE_PREFIX.getBytes();
088
089    /**
090     * Per-queue set of scheduled Work instance ids.
091     */
092    protected static final String KEY_SCHEDULED_PREFIX = "sched";
093
094    protected static final byte[] KEY_SCHEDULED = KEY_SCHEDULED_PREFIX.getBytes();
095
096    /**
097     * Per-queue set of running Work instance ids.
098     */
099    protected static final String KEY_RUNNING_PREFIX = "run";
100
101    protected static final byte[] KEY_RUNNING = KEY_RUNNING_PREFIX.getBytes();
102
103    /**
104     * Per-queue set of counters.
105     */
106    protected static final String KEY_COMPLETED_PREFIX = "done";
107
108    protected static final byte[] KEY_COMPLETED = KEY_COMPLETED_PREFIX.getBytes();
109
110    protected static final String KEY_CANCELED_PREFIX = "cancel";
111
112    protected static final byte[] KEY_CANCELED = KEY_CANCELED_PREFIX.getBytes();
113
114    protected static final String KEY_COUNT_PREFIX = "count";
115
116    protected static final byte STATE_SCHEDULED_B = 'Q';
117
118    protected static final byte STATE_RUNNING_B = 'R';
119
120    protected static final byte STATE_RUNNING_C = 'C';
121
122    protected static final byte[] STATE_SCHEDULED = new byte[] { STATE_SCHEDULED_B };
123
124    protected static final byte[] STATE_RUNNING = new byte[] { STATE_RUNNING_B };
125
126    protected static final byte[] STATE_UNKNOWN = new byte[0];
127
128    protected Listener listener;
129
130    protected final Map<String, NuxeoBlockingQueue> allQueued = new HashMap<>();
131
132    protected String redisNamespace;
133
134    // lua scripts
135    protected byte[] initWorkQueueSha;
136
137    protected byte[] metricsWorkQueueSha;
138
139    protected byte[] schedulingWorkSha;
140
141    protected byte[] popWorkSha;
142
143    protected byte[] runningWorkSha;
144
145    protected byte[] cancelledScheduledWorkSha;
146
147    protected byte[] completedWorkSha;
148
149    protected byte[] cancelledRunningWorkSha;
150
151    public RedisWorkQueuing(Listener listener) {
152        this.listener = listener;
153        loadConfig();
154    }
155
156    void loadConfig() {
157        RedisAdmin admin = Framework.getService(RedisAdmin.class);
158        redisNamespace = admin.namespace("work");
159        try {
160            initWorkQueueSha = admin.load("org.nuxeo.ecm.core.redis", "init-work-queue").getBytes();
161            metricsWorkQueueSha = admin.load("org.nuxeo.ecm.core.redis", "metrics-work-queue").getBytes();
162            schedulingWorkSha = admin.load("org.nuxeo.ecm.core.redis", "scheduling-work").getBytes();
163            popWorkSha = admin.load("org.nuxeo.ecm.core.redis", "pop-work").getBytes();
164            runningWorkSha = admin.load("org.nuxeo.ecm.core.redis", "running-work").getBytes();
165            cancelledScheduledWorkSha = admin.load("org.nuxeo.ecm.core.redis", "cancelled-scheduled-work").getBytes();
166            completedWorkSha = admin.load("org.nuxeo.ecm.core.redis", "completed-work").getBytes();
167            cancelledRunningWorkSha = admin.load("org.nuxeo.ecm.core.redis", "cancelled-running-work").getBytes();
168        } catch (IOException e) {
169            throw new RuntimeException("Cannot load LUA scripts", e);
170        }
171    }
172
173    @Override
174    public NuxeoBlockingQueue init(WorkQueueDescriptor config) {
175        evalSha(initWorkQueueSha, keys(config.id), Collections.emptyList());
176        RedisBlockingQueue queue = new RedisBlockingQueue(config.id, this);
177        allQueued.put(config.id, queue);
178        return queue;
179    }
180
181    @Override
182    public NuxeoBlockingQueue getQueue(String queueId) {
183        return allQueued.get(queueId);
184    }
185
186    @Override
187    public void workSchedule(String queueId, Work work) {
188        getQueue(queueId).offer(new WorkHolder(work));
189    }
190
191    @Override
192    public void workRunning(String queueId, Work work) {
193        try {
194            workSetRunning(queueId, work);
195        } catch (IOException e) {
196            throw new RuntimeException(e);
197        }
198    }
199
200    @Override
201    public void workCanceled(String queueId, Work work) {
202        try {
203            workSetCancelledScheduled(queueId, work);
204        } catch (IOException e) {
205            throw new RuntimeException(e);
206        }
207    }
208
209    @Override
210    public void workCompleted(String queueId, Work work) {
211        try {
212            workSetCompleted(queueId, work);
213        } catch (IOException e) {
214            throw new RuntimeException(e);
215        }
216    }
217
218    @Override
219    public void workReschedule(String queueId, Work work) {
220        try {
221            workSetReschedule(queueId, work);
222        } catch (IOException e) {
223            throw new RuntimeException(e);
224        }
225    }
226
227    @Override
228    public List<Work> listWork(String queueId, State state) {
229        switch (state) {
230        case SCHEDULED:
231            return listScheduled(queueId);
232        case RUNNING:
233            return listRunning(queueId);
234        default:
235            throw new IllegalArgumentException(String.valueOf(state));
236        }
237    }
238
239    @Override
240    public List<String> listWorkIds(String queueId, State state) {
241        if (state == null) {
242            return listNonCompletedIds(queueId);
243        }
244        switch (state) {
245        case SCHEDULED:
246            return listScheduledIds(queueId);
247        case RUNNING:
248            return listRunningIds(queueId);
249        default:
250            throw new IllegalArgumentException(String.valueOf(state));
251        }
252    }
253
254    protected List<Work> listScheduled(String queueId) {
255        try {
256            return listWorkList(queuedKey(queueId));
257        } catch (IOException e) {
258            throw new RuntimeException(e);
259        }
260    }
261
262    protected List<Work> listRunning(String queueId) {
263        try {
264            return listWorkSet(runningKey(queueId));
265        } catch (IOException e) {
266            throw new RuntimeException(e);
267        }
268    }
269
270    protected List<String> listScheduledIds(String queueId) {
271        try {
272            return listWorkIdsList(queuedKey(queueId));
273        } catch (IOException e) {
274            throw new RuntimeException(e);
275        }
276    }
277
278    protected List<String> listRunningIds(String queueId) {
279        try {
280            return listWorkIdsSet(runningKey(queueId));
281        } catch (IOException e) {
282            throw new RuntimeException(e);
283        }
284    }
285
286    protected List<String> listNonCompletedIds(String queueId) {
287        List<String> list = listScheduledIds(queueId);
288        list.addAll(listRunningIds(queueId));
289        return list;
290    }
291
292    @Override
293    public long count(String queueId, State state) {
294        switch (state) {
295        case SCHEDULED:
296            return metrics(queueId).scheduled.longValue();
297        case RUNNING:
298            return metrics(queueId).running.longValue();
299        default:
300            throw new IllegalArgumentException(String.valueOf(state));
301        }
302    }
303
304    @Override
305    public Work find(String workId, State state) {
306        if (isWorkInState(workId, state)) {
307            return getWork(bytes(workId));
308        }
309        return null;
310    }
311
312    @Override
313    public boolean isWorkInState(String workId, State state) {
314        State s = getWorkState(workId);
315        if (state == null) {
316            return s == State.SCHEDULED || s == State.RUNNING;
317        }
318        return s == state;
319    }
320
321    @Override
322    public void removeScheduled(String queueId, String workId) {
323        try {
324            removeScheduledWork(queueId, workId);
325        } catch (IOException cause) {
326            throw new RuntimeException("Cannot remove scheduled work " + workId + " from " + queueId, cause);
327        }
328    }
329
330    @Override
331    public State getWorkState(String workId) {
332        return getWorkStateInfo(workId);
333    }
334
335    @Override
336    public void setActive(String queueId, boolean value) {
337        WorkQueueMetrics metrics = getQueue(queueId).setActive(value);
338        if (value) {
339            listener.queueActivated(metrics);
340        } else {
341            listener.queueDeactivated(metrics);
342        }
343    }
344
345    /*
346     * ******************** Redis Interface ********************
347     */
348
349    protected String string(byte[] bytes) {
350        try {
351            return new String(bytes, UTF_8);
352        } catch (IOException e) {
353            throw new RuntimeException("Should not happen, cannot decode string in UTF-8", e);
354        }
355    }
356
357    protected byte[] bytes(String string) {
358        try {
359            return string.getBytes(UTF_8);
360        } catch (IOException e) {
361            throw new RuntimeException("Should not happen, cannot encode string in UTF-8", e);
362        }
363    }
364
365    protected byte[] bytes(Work.State state) {
366        switch (state) {
367        case SCHEDULED:
368            return STATE_SCHEDULED;
369        case RUNNING:
370            return STATE_RUNNING;
371        default:
372            return STATE_UNKNOWN;
373        }
374    }
375
376    protected static String key(String... names) {
377        return String.join(":", names);
378    }
379
380    protected byte[] keyBytes(String prefix, String queueId) {
381        return keyBytes(key(prefix, queueId));
382    }
383
384    protected byte[] keyBytes(String prefix) {
385        return bytes(redisNamespace.concat(prefix));
386    }
387
388    protected byte[] workId(Work work) {
389        return workId(work.getId());
390    }
391
392    protected byte[] workId(String id) {
393        return bytes(id);
394    }
395
396    protected byte[] suspendedKey(String queueId) {
397        return keyBytes(key(KEY_SUSPENDED_PREFIX, queueId));
398    }
399
400    protected byte[] queuedKey(String queueId) {
401        return keyBytes(key(KEY_QUEUE_PREFIX, queueId));
402    }
403
404    protected byte[] countKey(String queueId) {
405        return keyBytes(key(KEY_COUNT_PREFIX, queueId));
406    }
407
408    protected byte[] runningKey(String queueId) {
409        return keyBytes(key(KEY_RUNNING_PREFIX, queueId));
410    }
411
412    protected byte[] scheduledKey(String queueId) {
413        return keyBytes(key(KEY_SCHEDULED_PREFIX, queueId));
414    }
415
416    protected byte[] completedKey(String queueId) {
417        return keyBytes(key(KEY_COMPLETED_PREFIX, queueId));
418    }
419
420    protected byte[] canceledKey(String queueId) {
421        return keyBytes(key(KEY_CANCELED_PREFIX, queueId));
422    }
423
424    protected byte[] stateKey() {
425        return keyBytes(KEY_STATE);
426    }
427
428    protected byte[] dataKey() {
429        return keyBytes(KEY_DATA);
430    }
431
432    protected byte[] serializeWork(Work work) throws IOException {
433        ByteArrayOutputStream baout = new ByteArrayOutputStream();
434        ObjectOutputStream out = new ObjectOutputStream(baout);
435        out.writeObject(work);
436        out.flush();
437        out.close();
438        return baout.toByteArray();
439    }
440
441    protected Work deserializeWork(byte[] workBytes) {
442        if (workBytes == null) {
443            return null;
444        }
445        InputStream bain = new ByteArrayInputStream(workBytes);
446        try (ObjectInputStream in = new ObjectInputStream(bain)) {
447            return (Work) in.readObject();
448        } catch (RuntimeException cause) {
449            throw cause;
450        } catch (IOException | ClassNotFoundException cause) {
451            throw new RuntimeException("Cannot deserialize work", cause);
452        }
453    }
454
455    /**
456     * Finds which queues have suspended work.
457     *
458     * @return a set of queue ids
459     * @since 5.8
460     */
461    protected Set<String> getSuspendedQueueIds() throws IOException {
462        return getQueueIds(KEY_SUSPENDED_PREFIX);
463    }
464
465    protected Set<String> getScheduledQueueIds() {
466        return getQueueIds(KEY_QUEUE_PREFIX);
467    }
468
469    protected Set<String> getRunningQueueIds() {
470        return getQueueIds(KEY_RUNNING_PREFIX);
471    }
472
473    /**
474     * Finds which queues have work for a given state prefix.
475     *
476     * @return a set of queue ids
477     * @since 5.8
478     */
479    protected Set<String> getQueueIds(final String queuePrefix) {
480        return Framework.getService(RedisExecutor.class).execute(jedis -> {
481            int offset = keyBytes(queuePrefix).length;
482            Set<byte[]> keys = jedis.keys(keyBytes(key(queuePrefix, "*")));
483            Set<String> queueIds = new HashSet<>(keys.size());
484            for (byte[] bytes : keys) {
485                try {
486                    String queueId = new String(bytes, offset, bytes.length - offset, UTF_8);
487                    queueIds.add(queueId);
488                } catch (IOException e) {
489                    throw new NuxeoException(e);
490                }
491            }
492            return queueIds;
493        });
494    }
495
496    /**
497     * Resumes all suspended work instances by moving them to the scheduled queue.
498     *
499     * @param queueId the queue id
500     * @return the number of work instances scheduled
501     * @since 5.8
502     */
503    public int scheduleSuspendedWork(final String queueId) throws IOException {
504        return Framework.getService(RedisExecutor.class).execute(jedis -> {
505            for (int n = 0;; n++) {
506                byte[] workIdBytes = jedis.rpoplpush(suspendedKey(queueId), queuedKey(queueId));
507                if (workIdBytes == null) {
508                    return Integer.valueOf(n);
509                }
510            }
511        }).intValue();
512    }
513
514    /**
515     * Suspends all scheduled work instances by moving them to the suspended queue.
516     *
517     * @param queueId the queue id
518     * @return the number of work instances suspended
519     * @since 5.8
520     */
521    public int suspendScheduledWork(final String queueId) throws IOException {
522        return Framework.getService(RedisExecutor.class).execute(jedis -> {
523            for (int n = 0;; n++) {
524                byte[] workIdBytes = jedis.rpoplpush(queuedKey(queueId), suspendedKey(queueId));
525                if (workIdBytes == null) {
526                    return n;
527                }
528            }
529        }).intValue();
530    }
531
532    @Override
533    public WorkQueueMetrics metrics(String queueId) {
534        return metrics(queueId, evalSha(metricsWorkQueueSha, keys(queueId), Collections.emptyList()));
535    }
536
537    WorkQueueMetrics metrics(String queueId, Number[] counters) {
538        return new WorkQueueMetrics(queueId, counters[0], counters[1], counters[2], counters[3]);
539    }
540
541    /**
542     * Persists a work instance and adds it to the scheduled queue.
543     *
544     * @param queueId the queue id
545     * @param work the work instance
546     * @throws IOException
547     */
548    public void workSetScheduled(final String queueId, Work work) throws IOException {
549        listener.queueChanged(work, metrics(queueId, evalSha(schedulingWorkSha, keys(queueId), args(work, true))));
550    }
551
552    /**
553     * Switches a work to state completed, and saves its new state.
554     */
555    protected void workSetCancelledScheduled(final String queueId, final Work work) throws IOException {
556        listener.queueChanged(work,
557                metrics(queueId, evalSha(cancelledScheduledWorkSha, keys(queueId), args(work, true))));
558    }
559
560    /**
561     * Switches a work to state running.
562     *
563     * @param queueId the queue id
564     * @param work the work
565     */
566    protected void workSetRunning(final String queueId, Work work) throws IOException {
567        listener.queueChanged(work, metrics(queueId, evalSha(runningWorkSha, keys(queueId), args(work, true))));
568    }
569
570    /**
571     * Switches a work to state completed, and saves its new state.
572     */
573    protected void workSetCompleted(final String queueId, final Work work) throws IOException {
574        listener.queueChanged(work, metrics(queueId, evalSha(completedWorkSha, keys(queueId), args(work, false))));
575    }
576
577    /**
578     * Switches a work to state canceled, and saves its new state.
579     */
580    protected void workSetReschedule(final String queueId, final Work work) throws IOException {
581        listener.queueChanged(work,
582                metrics(queueId, evalSha(cancelledRunningWorkSha, keys(queueId), args(work, true))));
583    }
584
585    protected List<byte[]> keys(String queueid) {
586        return Arrays.asList(dataKey(), stateKey(), countKey(queueid), scheduledKey(queueid), queuedKey(queueid),
587                runningKey(queueid), completedKey(queueid), canceledKey(queueid));
588    }
589
590    protected List<byte[]> args(String workId) throws IOException {
591        return Arrays.asList(workId(workId));
592    }
593
594    protected List<byte[]> args(Work work, boolean serialize) throws IOException {
595        List<byte[]> args = Arrays.asList(workId(work), bytes(work.getWorkInstanceState()));
596        if (serialize) {
597            args = new ArrayList<>(args);
598            args.add(serializeWork(work));
599        }
600        return args;
601    }
602
603    /**
604     * Gets the work state.
605     *
606     * @param workId the work id
607     * @return the state, or {@code null} if not found
608     */
609    protected State getWorkStateInfo(final String workId) {
610        final byte[] workIdBytes = bytes(workId);
611        return Framework.getService(RedisExecutor.class).execute(jedis -> {
612            // get state
613            byte[] bytes = jedis.hget(stateKey(), workIdBytes);
614            if (bytes == null || bytes.length == 0) {
615                return null;
616            }
617            switch (bytes[0]) {
618            case STATE_SCHEDULED_B:
619                return State.SCHEDULED;
620            case STATE_RUNNING_B:
621                return State.RUNNING;
622            default:
623                String msg;
624                try {
625                    msg = new String(bytes, UTF_8);
626                } catch (UnsupportedEncodingException e) {
627                    msg = Arrays.toString(bytes);
628                }
629                log.error("Unknown work state: " + msg + ", work: " + workId);
630                return null;
631            }
632        });
633    }
634
635    protected List<String> listWorkIdsList(final byte[] queueBytes) throws IOException {
636        return Framework.getService(RedisExecutor.class).execute(jedis -> {
637            List<byte[]> keys = jedis.lrange(queueBytes, 0, -1);
638            List<String> list = new ArrayList<>(keys.size());
639            for (byte[] workIdBytes : keys) {
640                list.add(string(workIdBytes));
641            }
642            return list;
643        });
644    }
645
646    protected List<String> listWorkIdsSet(final byte[] queueBytes) throws IOException {
647        return Framework.getService(RedisExecutor.class).execute(jedis -> {
648
649            Set<byte[]> keys = jedis.smembers(queueBytes);
650            List<String> list = new ArrayList<>(keys.size());
651            for (byte[] workIdBytes : keys) {
652                list.add(string(workIdBytes));
653            }
654            return list;
655        });
656
657    }
658
659    protected List<Work> listWorkList(final byte[] queueBytes) throws IOException {
660        return Framework.getService(RedisExecutor.class).execute(jedis -> {
661            List<byte[]> keys = jedis.lrange(queueBytes, 0, -1);
662            List<Work> list = new ArrayList<>(keys.size());
663            for (byte[] workIdBytes : keys) {
664                // get data
665                byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
666                Work work = deserializeWork(workBytes);
667                list.add(work);
668            }
669            return list;
670        });
671    }
672
673    protected List<Work> listWorkSet(final byte[] queueBytes) throws IOException {
674        return Framework.getService(RedisExecutor.class).execute(jedis -> {
675            Set<byte[]> keys = jedis.smembers(queueBytes);
676            List<Work> list = new ArrayList<>(keys.size());
677            for (byte[] workIdBytes : keys) {
678                // get data
679                byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
680                Work work = deserializeWork(workBytes);
681                list.add(work);
682            }
683            return list;
684        });
685    }
686
687    protected Work getWork(byte[] workIdBytes) {
688        try {
689            return getWorkData(workIdBytes);
690        } catch (IOException e) {
691            throw new RuntimeException(e);
692        }
693    }
694
695    protected Work getWorkData(final byte[] workIdBytes) throws IOException {
696        return Framework.getService(RedisExecutor.class).execute(jedis -> {
697            byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
698            return deserializeWork(workBytes);
699        });
700    }
701
702    /**
703     * Removes first work from work queue.
704     *
705     * @param queueId the queue id
706     * @return the work, or {@code null} if the scheduled queue is empty
707     */
708    protected Work getWorkFromQueue(final String queueId) throws IOException {
709        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
710        List<byte[]> keys = keys(queueId);
711        List<byte[]> args = Collections.singletonList(STATE_RUNNING);
712        List<?> result = (List<?>) redisExecutor.evalsha(popWorkSha, keys, args);
713        if (result == null) {
714            return null;
715        }
716
717        List<Number> numbers = (List<Number>) result.get(0);
718        WorkQueueMetrics metrics = metrics(queueId, coerceNullToZero(numbers));
719        Object bytes = result.get(1);
720        if (bytes instanceof String) {
721            bytes = bytes((String) bytes);
722        }
723        Work work = deserializeWork((byte[]) bytes);
724
725        listener.queueChanged(work, metrics);
726
727        return work;
728    }
729
730    /**
731     * Removes a given work from queue, move the work from scheduled to completed set.
732     *
733     * @throws IOException
734     */
735    protected void removeScheduledWork(final String queueId, final String workId) throws IOException {
736        evalSha(cancelledScheduledWorkSha, keys(queueId), args(workId));
737    }
738
739    Number[] evalSha(byte[] sha, List<byte[]> keys, List<byte[]> args) throws JedisException {
740        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
741        List<Number> numbers = (List<Number>) redisExecutor.evalsha(sha, keys, args);
742        return coerceNullToZero(numbers);
743    }
744
745    protected static Number[] coerceNullToZero(List<Number> numbers) {
746        return coerceNullToZero(numbers.toArray(new Number[numbers.size()]));
747    }
748
749    protected static Number[] coerceNullToZero(Number[] counters) {
750        for (int i = 0; i < counters.length; ++i) {
751            if (counters[i] == null) {
752                counters[i] = 0;
753            }
754        }
755        return counters;
756    }
757
758    @Override
759    public void listen(Listener listener) {
760        this.listener = listener;
761
762    }
763
764    @Override
765    public boolean supportsProcessingDisabling() {
766        return true;
767    }
768
769}