001/*
002 * (C) Copyright 2013-2019 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.redis.contribs;
020
021import static java.nio.charset.StandardCharsets.UTF_8;
022
023import java.io.ByteArrayInputStream;
024import java.io.ByteArrayOutputStream;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.ObjectInputStream;
028import java.io.ObjectOutputStream;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.Collections;
032import java.util.HashMap;
033import java.util.HashSet;
034import java.util.List;
035import java.util.Map;
036import java.util.Set;
037
038import org.apache.commons.logging.Log;
039import org.apache.commons.logging.LogFactory;
040import org.nuxeo.ecm.core.redis.RedisAdmin;
041import org.nuxeo.ecm.core.redis.RedisExecutor;
042import org.nuxeo.ecm.core.work.NuxeoBlockingQueue;
043import org.nuxeo.ecm.core.work.WorkHolder;
044import org.nuxeo.ecm.core.work.WorkQueuing;
045import org.nuxeo.ecm.core.work.api.Work;
046import org.nuxeo.ecm.core.work.api.Work.State;
047import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
048import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
049import org.nuxeo.runtime.api.Framework;
050
051import redis.clients.jedis.exceptions.JedisException;
052
053/**
054 * Implementation of a {@link WorkQueuing} storing {@link Work} instances in Redis.
055 *
056 * @since 5.8
057 */
058public class RedisWorkQueuing implements WorkQueuing {
059
060    private static final Log log = LogFactory.getLog(RedisWorkQueuing.class);
061
062    /**
063     * Global hash of Work instance id -> serialized Work instance.
064     */
065    protected static final String KEY_DATA = "data";
066
067    /**
068     * Global hash of Work instance id -> Work state. The completed state is followed by a completion time in
069     * milliseconds.
070     */
071    protected static final String KEY_STATE = "state";
072
073    /**
074     * Per-queue list of suspended Work instance ids.
075     */
076    protected static final String KEY_SUSPENDED_PREFIX = "prev";
077
078    protected static final byte[] KEY_SUSPENDED = KEY_SUSPENDED_PREFIX.getBytes();
079
080    /**
081     * Per-queue list of scheduled Work instance ids.
082     */
083    protected static final String KEY_QUEUE_PREFIX = "queue";
084
085    protected static final byte[] KEY_QUEUE = KEY_QUEUE_PREFIX.getBytes();
086
087    /**
088     * Per-queue set of scheduled Work instance ids.
089     */
090    protected static final String KEY_SCHEDULED_PREFIX = "sched";
091
092    protected static final byte[] KEY_SCHEDULED = KEY_SCHEDULED_PREFIX.getBytes();
093
094    /**
095     * Per-queue set of running Work instance ids.
096     */
097    protected static final String KEY_RUNNING_PREFIX = "run";
098
099    protected static final byte[] KEY_RUNNING = KEY_RUNNING_PREFIX.getBytes();
100
101    /**
102     * Per-queue set of counters.
103     */
104    protected static final String KEY_COMPLETED_PREFIX = "done";
105
106    protected static final byte[] KEY_COMPLETED = KEY_COMPLETED_PREFIX.getBytes();
107
108    protected static final String KEY_CANCELED_PREFIX = "cancel";
109
110    protected static final byte[] KEY_CANCELED = KEY_CANCELED_PREFIX.getBytes();
111
112    protected static final String KEY_COUNT_PREFIX = "count";
113
114    protected static final byte STATE_SCHEDULED_B = 'Q';
115
116    protected static final byte STATE_RUNNING_B = 'R';
117
118    protected static final byte STATE_RUNNING_C = 'C';
119
120    protected static final byte[] STATE_SCHEDULED = new byte[] { STATE_SCHEDULED_B };
121
122    protected static final byte[] STATE_RUNNING = new byte[] { STATE_RUNNING_B };
123
124    protected static final byte[] STATE_UNKNOWN = new byte[0];
125
126    protected Listener listener;
127
128    protected final Map<String, NuxeoBlockingQueue> allQueued = new HashMap<>();
129
130    protected String redisNamespace;
131
132    // lua scripts
133    protected byte[] initWorkQueueSha;
134
135    protected byte[] metricsWorkQueueSha;
136
137    protected byte[] schedulingWorkSha;
138
139    protected byte[] popWorkSha;
140
141    protected byte[] runningWorkSha;
142
143    protected byte[] cancelledScheduledWorkSha;
144
145    protected byte[] completedWorkSha;
146
147    protected byte[] cancelledRunningWorkSha;
148
149    public RedisWorkQueuing(Listener listener) {
150        this.listener = listener;
151        loadConfig();
152    }
153
154    void loadConfig() {
155        RedisAdmin admin = Framework.getService(RedisAdmin.class);
156        redisNamespace = admin.namespace("work");
157        try {
158            initWorkQueueSha = admin.load("org.nuxeo.ecm.core.redis", "init-work-queue").getBytes();
159            metricsWorkQueueSha = admin.load("org.nuxeo.ecm.core.redis", "metrics-work-queue").getBytes();
160            schedulingWorkSha = admin.load("org.nuxeo.ecm.core.redis", "scheduling-work").getBytes();
161            popWorkSha = admin.load("org.nuxeo.ecm.core.redis", "pop-work").getBytes();
162            runningWorkSha = admin.load("org.nuxeo.ecm.core.redis", "running-work").getBytes();
163            cancelledScheduledWorkSha = admin.load("org.nuxeo.ecm.core.redis", "cancelled-scheduled-work").getBytes();
164            completedWorkSha = admin.load("org.nuxeo.ecm.core.redis", "completed-work").getBytes();
165            cancelledRunningWorkSha = admin.load("org.nuxeo.ecm.core.redis", "cancelled-running-work").getBytes();
166        } catch (IOException e) {
167            throw new RuntimeException("Cannot load LUA scripts", e);
168        }
169    }
170
171    @Override
172    public NuxeoBlockingQueue init(WorkQueueDescriptor config) {
173        evalSha(initWorkQueueSha, keys(config.id), Collections.emptyList());
174        RedisBlockingQueue queue = new RedisBlockingQueue(config.id, this);
175        allQueued.put(config.id, queue);
176        return queue;
177    }
178
179    @Override
180    public NuxeoBlockingQueue getQueue(String queueId) {
181        return allQueued.get(queueId);
182    }
183
184    @Override
185    public void workSchedule(String queueId, Work work) {
186        getQueue(queueId).offer(new WorkHolder(work));
187    }
188
189    @Override
190    public void workRunning(String queueId, Work work) {
191        try {
192            workSetRunning(queueId, work);
193        } catch (IOException e) {
194            throw new RuntimeException(e);
195        }
196    }
197
198    @Override
199    public void workCanceled(String queueId, Work work) {
200        try {
201            workSetCancelledScheduled(queueId, work);
202        } catch (IOException e) {
203            throw new RuntimeException(e);
204        }
205    }
206
207    @Override
208    public void workCompleted(String queueId, Work work) {
209        try {
210            workSetCompleted(queueId, work);
211        } catch (IOException e) {
212            throw new RuntimeException(e);
213        }
214    }
215
216    @Override
217    public void workReschedule(String queueId, Work work) {
218        try {
219            workSetReschedule(queueId, work);
220        } catch (IOException e) {
221            throw new RuntimeException(e);
222        }
223    }
224
225    @Override
226    public List<Work> listWork(String queueId, State state) {
227        switch (state) {
228        case SCHEDULED:
229            return listScheduled(queueId);
230        case RUNNING:
231            return listRunning(queueId);
232        default:
233            throw new IllegalArgumentException(String.valueOf(state));
234        }
235    }
236
237    @Override
238    public List<String> listWorkIds(String queueId, State state) {
239        if (state == null) {
240            return listNonCompletedIds(queueId);
241        }
242        switch (state) {
243        case SCHEDULED:
244            return listScheduledIds(queueId);
245        case RUNNING:
246            return listRunningIds(queueId);
247        default:
248            throw new IllegalArgumentException(String.valueOf(state));
249        }
250    }
251
252    protected List<Work> listScheduled(String queueId) {
253        try {
254            return listWorkList(queuedKey(queueId));
255        } catch (IOException e) {
256            throw new RuntimeException(e);
257        }
258    }
259
260    protected List<Work> listRunning(String queueId) {
261        try {
262            return listWorkSet(runningKey(queueId));
263        } catch (IOException e) {
264            throw new RuntimeException(e);
265        }
266    }
267
268    protected List<String> listScheduledIds(String queueId) {
269        try {
270            return listWorkIdsList(queuedKey(queueId));
271        } catch (IOException e) {
272            throw new RuntimeException(e);
273        }
274    }
275
276    protected List<String> listRunningIds(String queueId) {
277        try {
278            return listWorkIdsSet(runningKey(queueId));
279        } catch (IOException e) {
280            throw new RuntimeException(e);
281        }
282    }
283
284    protected List<String> listNonCompletedIds(String queueId) {
285        List<String> list = listScheduledIds(queueId);
286        list.addAll(listRunningIds(queueId));
287        return list;
288    }
289
290    @Override
291    public long count(String queueId, State state) {
292        switch (state) {
293        case SCHEDULED:
294            return metrics(queueId).scheduled.longValue();
295        case RUNNING:
296            return metrics(queueId).running.longValue();
297        default:
298            throw new IllegalArgumentException(String.valueOf(state));
299        }
300    }
301
302    @Override
303    public Work find(String workId, State state) {
304        if (isWorkInState(workId, state)) {
305            return getWork(bytes(workId));
306        }
307        return null;
308    }
309
310    @Override
311    public boolean isWorkInState(String workId, State state) {
312        State s = getWorkState(workId);
313        if (state == null) {
314            return s == State.SCHEDULED || s == State.RUNNING;
315        }
316        return s == state;
317    }
318
319    @Override
320    public void removeScheduled(String queueId, String workId) {
321        try {
322            removeScheduledWork(queueId, workId);
323        } catch (IOException cause) {
324            throw new RuntimeException("Cannot remove scheduled work " + workId + " from " + queueId, cause);
325        }
326    }
327
328    @Override
329    public State getWorkState(String workId) {
330        return getWorkStateInfo(workId);
331    }
332
333    @Override
334    public void setActive(String queueId, boolean value) {
335        WorkQueueMetrics metrics = getQueue(queueId).setActive(value);
336        if (value) {
337            listener.queueActivated(metrics);
338        } else {
339            listener.queueDeactivated(metrics);
340        }
341    }
342
343    /*
344     * ******************** Redis Interface ********************
345     */
346
347    protected String string(byte[] bytes) {
348        return new String(bytes, UTF_8);
349    }
350
351    protected byte[] bytes(String string) {
352        return string.getBytes(UTF_8);
353    }
354
355    protected byte[] bytes(Work.State state) {
356        switch (state) {
357        case SCHEDULED:
358            return STATE_SCHEDULED;
359        case RUNNING:
360            return STATE_RUNNING;
361        default:
362            return STATE_UNKNOWN;
363        }
364    }
365
366    protected static String key(String... names) {
367        return String.join(":", names);
368    }
369
370    protected byte[] keyBytes(String prefix, String queueId) {
371        return keyBytes(key(prefix, queueId));
372    }
373
374    protected byte[] keyBytes(String prefix) {
375        return bytes(redisNamespace.concat(prefix));
376    }
377
378    protected byte[] workId(Work work) {
379        return workId(work.getId());
380    }
381
382    protected byte[] workId(String id) {
383        return bytes(id);
384    }
385
386    protected byte[] suspendedKey(String queueId) {
387        return keyBytes(key(KEY_SUSPENDED_PREFIX, queueId));
388    }
389
390    protected byte[] queuedKey(String queueId) {
391        return keyBytes(key(KEY_QUEUE_PREFIX, queueId));
392    }
393
394    protected byte[] countKey(String queueId) {
395        return keyBytes(key(KEY_COUNT_PREFIX, queueId));
396    }
397
398    protected byte[] runningKey(String queueId) {
399        return keyBytes(key(KEY_RUNNING_PREFIX, queueId));
400    }
401
402    protected byte[] scheduledKey(String queueId) {
403        return keyBytes(key(KEY_SCHEDULED_PREFIX, queueId));
404    }
405
406    protected byte[] completedKey(String queueId) {
407        return keyBytes(key(KEY_COMPLETED_PREFIX, queueId));
408    }
409
410    protected byte[] canceledKey(String queueId) {
411        return keyBytes(key(KEY_CANCELED_PREFIX, queueId));
412    }
413
414    protected byte[] stateKey() {
415        return keyBytes(KEY_STATE);
416    }
417
418    protected byte[] dataKey() {
419        return keyBytes(KEY_DATA);
420    }
421
422    protected byte[] serializeWork(Work work) throws IOException {
423        ByteArrayOutputStream baout = new ByteArrayOutputStream();
424        ObjectOutputStream out = new ObjectOutputStream(baout);
425        out.writeObject(work);
426        out.flush();
427        out.close();
428        return baout.toByteArray();
429    }
430
431    protected Work deserializeWork(byte[] workBytes) {
432        if (workBytes == null) {
433            return null;
434        }
435        InputStream bain = new ByteArrayInputStream(workBytes);
436        try (ObjectInputStream in = new ObjectInputStream(bain)) {
437            return (Work) in.readObject();
438        } catch (RuntimeException cause) {
439            throw cause;
440        } catch (IOException | ClassNotFoundException cause) {
441            throw new RuntimeException("Cannot deserialize work", cause);
442        }
443    }
444
445    /**
446     * Finds which queues have suspended work.
447     *
448     * @return a set of queue ids
449     * @since 5.8
450     */
451    protected Set<String> getSuspendedQueueIds() throws IOException {
452        return getQueueIds(KEY_SUSPENDED_PREFIX);
453    }
454
455    protected Set<String> getScheduledQueueIds() {
456        return getQueueIds(KEY_QUEUE_PREFIX);
457    }
458
459    protected Set<String> getRunningQueueIds() {
460        return getQueueIds(KEY_RUNNING_PREFIX);
461    }
462
463    /**
464     * Finds which queues have work for a given state prefix.
465     *
466     * @return a set of queue ids
467     * @since 5.8
468     */
469    protected Set<String> getQueueIds(final String queuePrefix) {
470        return Framework.getService(RedisExecutor.class).execute(jedis -> {
471            int offset = keyBytes(queuePrefix).length;
472            Set<byte[]> keys = jedis.keys(keyBytes(key(queuePrefix, "*")));
473            Set<String> queueIds = new HashSet<>(keys.size());
474            for (byte[] bytes : keys) {
475                String queueId = new String(bytes, offset, bytes.length - offset, UTF_8);
476                queueIds.add(queueId);
477            }
478            return queueIds;
479        });
480    }
481
482    /**
483     * Resumes all suspended work instances by moving them to the scheduled queue.
484     *
485     * @param queueId the queue id
486     * @return the number of work instances scheduled
487     * @since 5.8
488     */
489    public int scheduleSuspendedWork(final String queueId) throws IOException {
490        return Framework.getService(RedisExecutor.class).execute(jedis -> {
491            for (int n = 0;; n++) {
492                byte[] workIdBytes = jedis.rpoplpush(suspendedKey(queueId), queuedKey(queueId));
493                if (workIdBytes == null) {
494                    return Integer.valueOf(n);
495                }
496            }
497        }).intValue();
498    }
499
500    /**
501     * Suspends all scheduled work instances by moving them to the suspended queue.
502     *
503     * @param queueId the queue id
504     * @return the number of work instances suspended
505     * @since 5.8
506     */
507    public int suspendScheduledWork(final String queueId) throws IOException {
508        return Framework.getService(RedisExecutor.class).execute(jedis -> {
509            for (int n = 0;; n++) {
510                byte[] workIdBytes = jedis.rpoplpush(queuedKey(queueId), suspendedKey(queueId));
511                if (workIdBytes == null) {
512                    return n;
513                }
514            }
515        }).intValue();
516    }
517
518    @Override
519    public WorkQueueMetrics metrics(String queueId) {
520        return metrics(queueId, evalSha(metricsWorkQueueSha, keys(queueId), Collections.emptyList()));
521    }
522
523    WorkQueueMetrics metrics(String queueId, Number[] counters) {
524        return new WorkQueueMetrics(queueId, counters[0], counters[1], counters[2], counters[3]);
525    }
526
527    /**
528     * Persists a work instance and adds it to the scheduled queue.
529     *
530     * @param queueId the queue id
531     * @param work the work instance
532     */
533    public void workSetScheduled(final String queueId, Work work) throws IOException {
534        listener.queueChanged(work, metrics(queueId, evalSha(schedulingWorkSha, keys(queueId), args(work, true))));
535    }
536
537    /**
538     * Switches a work to state completed, and saves its new state.
539     */
540    protected void workSetCancelledScheduled(final String queueId, final Work work) throws IOException {
541        listener.queueChanged(work,
542                metrics(queueId, evalSha(cancelledScheduledWorkSha, keys(queueId), args(work, true))));
543    }
544
545    /**
546     * Switches a work to state running.
547     *
548     * @param queueId the queue id
549     * @param work the work
550     */
551    protected void workSetRunning(final String queueId, Work work) throws IOException {
552        listener.queueChanged(work, metrics(queueId, evalSha(runningWorkSha, keys(queueId), args(work, true))));
553    }
554
555    /**
556     * Switches a work to state completed, and saves its new state.
557     */
558    protected void workSetCompleted(final String queueId, final Work work) throws IOException {
559        listener.queueChanged(work, metrics(queueId, evalSha(completedWorkSha, keys(queueId), args(work, false))));
560    }
561
562    /**
563     * Switches a work to state canceled, and saves its new state.
564     */
565    protected void workSetReschedule(final String queueId, final Work work) throws IOException {
566        listener.queueChanged(work,
567                metrics(queueId, evalSha(cancelledRunningWorkSha, keys(queueId), args(work, true))));
568    }
569
570    protected List<byte[]> keys(String queueid) {
571        return Arrays.asList(dataKey(), stateKey(), countKey(queueid), scheduledKey(queueid), queuedKey(queueid),
572                runningKey(queueid), completedKey(queueid), canceledKey(queueid));
573    }
574
575    protected List<byte[]> args(String workId) throws IOException {
576        return Arrays.asList(workId(workId));
577    }
578
579    protected List<byte[]> args(Work work, boolean serialize) throws IOException {
580        List<byte[]> args = Arrays.asList(workId(work), bytes(work.getWorkInstanceState()));
581        if (serialize) {
582            args = new ArrayList<>(args);
583            args.add(serializeWork(work));
584        }
585        return args;
586    }
587
588    /**
589     * Gets the work state.
590     *
591     * @param workId the work id
592     * @return the state, or {@code null} if not found
593     */
594    protected State getWorkStateInfo(final String workId) {
595        final byte[] workIdBytes = bytes(workId);
596        return Framework.getService(RedisExecutor.class).execute(jedis -> {
597            // get state
598            byte[] bytes = jedis.hget(stateKey(), workIdBytes);
599            if (bytes == null || bytes.length == 0) {
600                return null;
601            }
602            switch (bytes[0]) {
603            case STATE_SCHEDULED_B:
604                return State.SCHEDULED;
605            case STATE_RUNNING_B:
606                return State.RUNNING;
607            default:
608                String msg = new String(bytes, UTF_8);
609                log.error("Unknown work state: " + msg + ", work: " + workId);
610                return null;
611            }
612        });
613    }
614
615    protected List<String> listWorkIdsList(final byte[] queueBytes) throws IOException {
616        return Framework.getService(RedisExecutor.class).execute(jedis -> {
617            List<byte[]> keys = jedis.lrange(queueBytes, 0, -1);
618            List<String> list = new ArrayList<>(keys.size());
619            for (byte[] workIdBytes : keys) {
620                list.add(string(workIdBytes));
621            }
622            return list;
623        });
624    }
625
626    protected List<String> listWorkIdsSet(final byte[] queueBytes) throws IOException {
627        return Framework.getService(RedisExecutor.class).execute(jedis -> {
628
629            Set<byte[]> keys = jedis.smembers(queueBytes);
630            List<String> list = new ArrayList<>(keys.size());
631            for (byte[] workIdBytes : keys) {
632                list.add(string(workIdBytes));
633            }
634            return list;
635        });
636
637    }
638
639    protected List<Work> listWorkList(final byte[] queueBytes) throws IOException {
640        return Framework.getService(RedisExecutor.class).execute(jedis -> {
641            List<byte[]> keys = jedis.lrange(queueBytes, 0, -1);
642            List<Work> list = new ArrayList<>(keys.size());
643            for (byte[] workIdBytes : keys) {
644                // get data
645                byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
646                Work work = deserializeWork(workBytes);
647                list.add(work);
648            }
649            return list;
650        });
651    }
652
653    protected List<Work> listWorkSet(final byte[] queueBytes) throws IOException {
654        return Framework.getService(RedisExecutor.class).execute(jedis -> {
655            Set<byte[]> keys = jedis.smembers(queueBytes);
656            List<Work> list = new ArrayList<>(keys.size());
657            for (byte[] workIdBytes : keys) {
658                // get data
659                byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
660                Work work = deserializeWork(workBytes);
661                list.add(work);
662            }
663            return list;
664        });
665    }
666
667    protected Work getWork(byte[] workIdBytes) {
668        try {
669            return getWorkData(workIdBytes);
670        } catch (IOException e) {
671            throw new RuntimeException(e);
672        }
673    }
674
675    protected Work getWorkData(final byte[] workIdBytes) throws IOException {
676        return Framework.getService(RedisExecutor.class).execute(jedis -> {
677            byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
678            return deserializeWork(workBytes);
679        });
680    }
681
682    /**
683     * Removes first work from work queue.
684     *
685     * @param queueId the queue id
686     * @return the work, or {@code null} if the scheduled queue is empty
687     */
688    protected Work getWorkFromQueue(final String queueId) throws IOException {
689        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
690        List<byte[]> keys = keys(queueId);
691        List<byte[]> args = Collections.singletonList(STATE_RUNNING);
692        List<?> result = (List<?>) redisExecutor.evalsha(popWorkSha, keys, args);
693        if (result == null) {
694            return null;
695        }
696
697        List<Number> numbers = (List<Number>) result.get(0);
698        WorkQueueMetrics metrics = metrics(queueId, coerceNullToZero(numbers));
699        Object bytes = result.get(1);
700        if (bytes instanceof String) {
701            bytes = bytes((String) bytes);
702        }
703        Work work = deserializeWork((byte[]) bytes);
704
705        listener.queueChanged(work, metrics);
706
707        return work;
708    }
709
710    /**
711     * Removes a given work from queue, move the work from scheduled to completed set.
712     */
713    protected void removeScheduledWork(final String queueId, final String workId) throws IOException {
714        evalSha(cancelledScheduledWorkSha, keys(queueId), args(workId));
715    }
716
717    Number[] evalSha(byte[] sha, List<byte[]> keys, List<byte[]> args) throws JedisException {
718        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
719        List<Number> numbers = (List<Number>) redisExecutor.evalsha(sha, keys, args);
720        return coerceNullToZero(numbers);
721    }
722
723    protected static Number[] coerceNullToZero(List<Number> numbers) {
724        return coerceNullToZero(numbers.toArray(new Number[numbers.size()]));
725    }
726
727    protected static Number[] coerceNullToZero(Number[] counters) {
728        for (int i = 0; i < counters.length; ++i) {
729            if (counters[i] == null) {
730                counters[i] = 0;
731            }
732        }
733        return counters;
734    }
735
736    @Override
737    public void listen(Listener listener) {
738        this.listener = listener;
739
740    }
741
742    @Override
743    public boolean supportsProcessingDisabling() {
744        return true;
745    }
746
747}