001/* 002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.redis.contribs; 020 021import java.io.ByteArrayInputStream; 022import java.io.ByteArrayOutputStream; 023import java.io.IOException; 024import java.io.InputStream; 025import java.io.ObjectInputStream; 026import java.io.ObjectOutputStream; 027import java.io.UnsupportedEncodingException; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collections; 031import java.util.HashMap; 032import java.util.HashSet; 033import java.util.List; 034import java.util.Map; 035import java.util.Set; 036 037import org.apache.commons.logging.Log; 038import org.apache.commons.logging.LogFactory; 039import org.nuxeo.ecm.core.api.NuxeoException; 040import org.nuxeo.ecm.core.redis.RedisAdmin; 041import org.nuxeo.ecm.core.redis.RedisExecutor; 042import org.nuxeo.ecm.core.work.NuxeoBlockingQueue; 043import org.nuxeo.ecm.core.work.WorkHolder; 044import org.nuxeo.ecm.core.work.WorkQueuing; 045import org.nuxeo.ecm.core.work.api.Work; 046import org.nuxeo.ecm.core.work.api.Work.State; 047import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor; 048import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 049import org.nuxeo.runtime.api.Framework; 050 051import redis.clients.jedis.exceptions.JedisException; 052 053/** 054 * Implementation of a {@link WorkQueuing} storing {@link Work} instances in Redis. 055 * 056 * @since 5.8 057 */ 058public class RedisWorkQueuing implements WorkQueuing { 059 060 private static final Log log = LogFactory.getLog(RedisWorkQueuing.class); 061 062 protected static final String UTF_8 = "UTF-8"; 063 064 /** 065 * Global hash of Work instance id -> serialoized Work instance. 066 */ 067 protected static final String KEY_DATA = "data"; 068 069 /** 070 * Global hash of Work instance id -> Work state. The completed state ( {@value #STATE_COMPLETED_B}) is followed by 071 * a completion time in milliseconds. 072 */ 073 protected static final String KEY_STATE = "state"; 074 075 /** 076 * Per-queue list of suspended Work instance ids. 077 */ 078 protected static final String KEY_SUSPENDED_PREFIX = "prev"; 079 080 protected static final byte[] KEY_SUSPENDED = KEY_SUSPENDED_PREFIX.getBytes(); 081 082 /** 083 * Per-queue list of scheduled Work instance ids. 084 */ 085 protected static final String KEY_QUEUE_PREFIX = "queue"; 086 087 protected static final byte[] KEY_QUEUE = KEY_QUEUE_PREFIX.getBytes(); 088 089 /** 090 * Per-queue set of scheduled Work instance ids. 091 */ 092 protected static final String KEY_SCHEDULED_PREFIX = "sched"; 093 094 protected static final byte[] KEY_SCHEDULED = KEY_SCHEDULED_PREFIX.getBytes(); 095 096 /** 097 * Per-queue set of running Work instance ids. 098 */ 099 protected static final String KEY_RUNNING_PREFIX = "run"; 100 101 protected static final byte[] KEY_RUNNING = KEY_RUNNING_PREFIX.getBytes(); 102 103 /** 104 * Per-queue set of counters. 105 */ 106 protected static final String KEY_COMPLETED_PREFIX = "done"; 107 108 protected static final byte[] KEY_COMPLETED = KEY_COMPLETED_PREFIX.getBytes(); 109 110 protected static final String KEY_CANCELED_PREFIX = "cancel"; 111 112 protected static final byte[] KEY_CANCELED = KEY_CANCELED_PREFIX.getBytes(); 113 114 protected static final String KEY_COUNT_PREFIX = "count"; 115 116 protected static final byte STATE_SCHEDULED_B = 'Q'; 117 118 protected static final byte STATE_RUNNING_B = 'R'; 119 120 protected static final byte STATE_RUNNING_C = 'C'; 121 122 protected static final byte[] STATE_SCHEDULED = new byte[] { STATE_SCHEDULED_B }; 123 124 protected static final byte[] STATE_RUNNING = new byte[] { STATE_RUNNING_B }; 125 126 protected static final byte[] STATE_UNKNOWN = new byte[0]; 127 128 protected Listener listener; 129 130 protected final Map<String, NuxeoBlockingQueue> allQueued = new HashMap<>(); 131 132 protected String redisNamespace; 133 134 // lua scripts 135 protected byte[] initWorkQueueSha; 136 137 protected byte[] metricsWorkQueueSha; 138 139 protected byte[] schedulingWorkSha; 140 141 protected byte[] popWorkSha; 142 143 protected byte[] runningWorkSha; 144 145 protected byte[] cancelledScheduledWorkSha; 146 147 protected byte[] completedWorkSha; 148 149 protected byte[] cancelledRunningWorkSha; 150 151 public RedisWorkQueuing(Listener listener) { 152 this.listener = listener; 153 loadConfig(); 154 } 155 156 void loadConfig() { 157 RedisAdmin admin = Framework.getService(RedisAdmin.class); 158 redisNamespace = admin.namespace("work"); 159 try { 160 initWorkQueueSha = admin.load("org.nuxeo.ecm.core.redis", "init-work-queue").getBytes(); 161 metricsWorkQueueSha = admin.load("org.nuxeo.ecm.core.redis", "metrics-work-queue").getBytes(); 162 schedulingWorkSha = admin.load("org.nuxeo.ecm.core.redis", "scheduling-work").getBytes(); 163 popWorkSha = admin.load("org.nuxeo.ecm.core.redis", "pop-work").getBytes(); 164 runningWorkSha = admin.load("org.nuxeo.ecm.core.redis", "running-work").getBytes(); 165 cancelledScheduledWorkSha = admin.load("org.nuxeo.ecm.core.redis", "cancelled-scheduled-work").getBytes(); 166 completedWorkSha = admin.load("org.nuxeo.ecm.core.redis", "completed-work").getBytes(); 167 cancelledRunningWorkSha = admin.load("org.nuxeo.ecm.core.redis", "cancelled-running-work").getBytes(); 168 } catch (IOException e) { 169 throw new RuntimeException("Cannot load LUA scripts", e); 170 } 171 } 172 173 @Override 174 public NuxeoBlockingQueue init(WorkQueueDescriptor config) { 175 evalSha(initWorkQueueSha, keys(config.id), Collections.emptyList()); 176 RedisBlockingQueue queue = new RedisBlockingQueue(config.id, this); 177 allQueued.put(config.id, queue); 178 return queue; 179 } 180 181 @Override 182 public NuxeoBlockingQueue getQueue(String queueId) { 183 return allQueued.get(queueId); 184 } 185 186 @Override 187 public void workSchedule(String queueId, Work work) { 188 getQueue(queueId).offer(new WorkHolder(work)); 189 } 190 191 @Override 192 public void workRunning(String queueId, Work work) { 193 try { 194 workSetRunning(queueId, work); 195 } catch (IOException e) { 196 throw new RuntimeException(e); 197 } 198 } 199 200 @Override 201 public void workCanceled(String queueId, Work work) { 202 try { 203 workSetCancelledScheduled(queueId, work); 204 } catch (IOException e) { 205 throw new RuntimeException(e); 206 } 207 } 208 209 @Override 210 public void workCompleted(String queueId, Work work) { 211 try { 212 workSetCompleted(queueId, work); 213 } catch (IOException e) { 214 throw new RuntimeException(e); 215 } 216 } 217 218 @Override 219 public void workReschedule(String queueId, Work work) { 220 try { 221 workSetReschedule(queueId, work); 222 } catch (IOException e) { 223 throw new RuntimeException(e); 224 } 225 } 226 227 @Override 228 public List<Work> listWork(String queueId, State state) { 229 switch (state) { 230 case SCHEDULED: 231 return listScheduled(queueId); 232 case RUNNING: 233 return listRunning(queueId); 234 default: 235 throw new IllegalArgumentException(String.valueOf(state)); 236 } 237 } 238 239 @Override 240 public List<String> listWorkIds(String queueId, State state) { 241 if (state == null) { 242 return listNonCompletedIds(queueId); 243 } 244 switch (state) { 245 case SCHEDULED: 246 return listScheduledIds(queueId); 247 case RUNNING: 248 return listRunningIds(queueId); 249 default: 250 throw new IllegalArgumentException(String.valueOf(state)); 251 } 252 } 253 254 protected List<Work> listScheduled(String queueId) { 255 try { 256 return listWorkList(queuedKey(queueId)); 257 } catch (IOException e) { 258 throw new RuntimeException(e); 259 } 260 } 261 262 protected List<Work> listRunning(String queueId) { 263 try { 264 return listWorkSet(runningKey(queueId)); 265 } catch (IOException e) { 266 throw new RuntimeException(e); 267 } 268 } 269 270 protected List<String> listScheduledIds(String queueId) { 271 try { 272 return listWorkIdsList(queuedKey(queueId)); 273 } catch (IOException e) { 274 throw new RuntimeException(e); 275 } 276 } 277 278 protected List<String> listRunningIds(String queueId) { 279 try { 280 return listWorkIdsSet(runningKey(queueId)); 281 } catch (IOException e) { 282 throw new RuntimeException(e); 283 } 284 } 285 286 protected List<String> listNonCompletedIds(String queueId) { 287 List<String> list = listScheduledIds(queueId); 288 list.addAll(listRunningIds(queueId)); 289 return list; 290 } 291 292 @Override 293 public long count(String queueId, State state) { 294 switch (state) { 295 case SCHEDULED: 296 return metrics(queueId).scheduled.longValue(); 297 case RUNNING: 298 return metrics(queueId).running.longValue(); 299 default: 300 throw new IllegalArgumentException(String.valueOf(state)); 301 } 302 } 303 304 @Override 305 public Work find(String workId, State state) { 306 if (isWorkInState(workId, state)) { 307 return getWork(bytes(workId)); 308 } 309 return null; 310 } 311 312 @Override 313 public boolean isWorkInState(String workId, State state) { 314 State s = getWorkState(workId); 315 if (state == null) { 316 return s == State.SCHEDULED || s == State.RUNNING; 317 } 318 return s == state; 319 } 320 321 @Override 322 public void removeScheduled(String queueId, String workId) { 323 try { 324 removeScheduledWork(queueId, workId); 325 } catch (IOException cause) { 326 throw new RuntimeException("Cannot remove scheduled work " + workId + " from " + queueId, cause); 327 } 328 } 329 330 @Override 331 public State getWorkState(String workId) { 332 return getWorkStateInfo(workId); 333 } 334 335 @Override 336 public void setActive(String queueId, boolean value) { 337 WorkQueueMetrics metrics = getQueue(queueId).setActive(value); 338 if (value) { 339 listener.queueActivated(metrics); 340 } else { 341 listener.queueDeactivated(metrics); 342 } 343 } 344 345 /* 346 * ******************** Redis Interface ******************** 347 */ 348 349 protected String string(byte[] bytes) { 350 try { 351 return new String(bytes, UTF_8); 352 } catch (IOException e) { 353 throw new RuntimeException("Should not happen, cannot decode string in UTF-8", e); 354 } 355 } 356 357 protected byte[] bytes(String string) { 358 try { 359 return string.getBytes(UTF_8); 360 } catch (IOException e) { 361 throw new RuntimeException("Should not happen, cannot encode string in UTF-8", e); 362 } 363 } 364 365 protected byte[] bytes(Work.State state) { 366 switch (state) { 367 case SCHEDULED: 368 return STATE_SCHEDULED; 369 case RUNNING: 370 return STATE_RUNNING; 371 default: 372 return STATE_UNKNOWN; 373 } 374 } 375 376 protected static String key(String... names) { 377 return String.join(":", names); 378 } 379 380 protected byte[] keyBytes(String prefix, String queueId) { 381 return keyBytes(key(prefix, queueId)); 382 } 383 384 protected byte[] keyBytes(String prefix) { 385 return bytes(redisNamespace.concat(prefix)); 386 } 387 388 protected byte[] workId(Work work) { 389 return workId(work.getId()); 390 } 391 392 protected byte[] workId(String id) { 393 return bytes(id); 394 } 395 396 protected byte[] suspendedKey(String queueId) { 397 return keyBytes(key(KEY_SUSPENDED_PREFIX, queueId)); 398 } 399 400 protected byte[] queuedKey(String queueId) { 401 return keyBytes(key(KEY_QUEUE_PREFIX, queueId)); 402 } 403 404 protected byte[] countKey(String queueId) { 405 return keyBytes(key(KEY_COUNT_PREFIX, queueId)); 406 } 407 408 protected byte[] runningKey(String queueId) { 409 return keyBytes(key(KEY_RUNNING_PREFIX, queueId)); 410 } 411 412 protected byte[] scheduledKey(String queueId) { 413 return keyBytes(key(KEY_SCHEDULED_PREFIX, queueId)); 414 } 415 416 protected byte[] completedKey(String queueId) { 417 return keyBytes(key(KEY_COMPLETED_PREFIX, queueId)); 418 } 419 420 protected byte[] canceledKey(String queueId) { 421 return keyBytes(key(KEY_CANCELED_PREFIX, queueId)); 422 } 423 424 protected byte[] stateKey() { 425 return keyBytes(KEY_STATE); 426 } 427 428 protected byte[] dataKey() { 429 return keyBytes(KEY_DATA); 430 } 431 432 protected byte[] serializeWork(Work work) throws IOException { 433 ByteArrayOutputStream baout = new ByteArrayOutputStream(); 434 ObjectOutputStream out = new ObjectOutputStream(baout); 435 out.writeObject(work); 436 out.flush(); 437 out.close(); 438 return baout.toByteArray(); 439 } 440 441 protected Work deserializeWork(byte[] workBytes) { 442 if (workBytes == null) { 443 return null; 444 } 445 InputStream bain = new ByteArrayInputStream(workBytes); 446 try (ObjectInputStream in = new ObjectInputStream(bain)) { 447 return (Work) in.readObject(); 448 } catch (RuntimeException cause) { 449 throw cause; 450 } catch (IOException | ClassNotFoundException cause) { 451 throw new RuntimeException("Cannot deserialize work", cause); 452 } 453 } 454 455 /** 456 * Finds which queues have suspended work. 457 * 458 * @return a set of queue ids 459 * @since 5.8 460 */ 461 protected Set<String> getSuspendedQueueIds() throws IOException { 462 return getQueueIds(KEY_SUSPENDED_PREFIX); 463 } 464 465 protected Set<String> getScheduledQueueIds() { 466 return getQueueIds(KEY_QUEUE_PREFIX); 467 } 468 469 protected Set<String> getRunningQueueIds() { 470 return getQueueIds(KEY_RUNNING_PREFIX); 471 } 472 473 /** 474 * Finds which queues have work for a given state prefix. 475 * 476 * @return a set of queue ids 477 * @since 5.8 478 */ 479 protected Set<String> getQueueIds(final String queuePrefix) { 480 return Framework.getService(RedisExecutor.class).execute(jedis -> { 481 int offset = keyBytes(queuePrefix).length; 482 Set<byte[]> keys = jedis.keys(keyBytes(key(queuePrefix, "*"))); 483 Set<String> queueIds = new HashSet<>(keys.size()); 484 for (byte[] bytes : keys) { 485 try { 486 String queueId = new String(bytes, offset, bytes.length - offset, UTF_8); 487 queueIds.add(queueId); 488 } catch (IOException e) { 489 throw new NuxeoException(e); 490 } 491 } 492 return queueIds; 493 }); 494 } 495 496 /** 497 * Resumes all suspended work instances by moving them to the scheduled queue. 498 * 499 * @param queueId the queue id 500 * @return the number of work instances scheduled 501 * @since 5.8 502 */ 503 public int scheduleSuspendedWork(final String queueId) throws IOException { 504 return Framework.getService(RedisExecutor.class).execute(jedis -> { 505 for (int n = 0;; n++) { 506 byte[] workIdBytes = jedis.rpoplpush(suspendedKey(queueId), queuedKey(queueId)); 507 if (workIdBytes == null) { 508 return Integer.valueOf(n); 509 } 510 } 511 }).intValue(); 512 } 513 514 /** 515 * Suspends all scheduled work instances by moving them to the suspended queue. 516 * 517 * @param queueId the queue id 518 * @return the number of work instances suspended 519 * @since 5.8 520 */ 521 public int suspendScheduledWork(final String queueId) throws IOException { 522 return Framework.getService(RedisExecutor.class).execute(jedis -> { 523 for (int n = 0;; n++) { 524 byte[] workIdBytes = jedis.rpoplpush(queuedKey(queueId), suspendedKey(queueId)); 525 if (workIdBytes == null) { 526 return n; 527 } 528 } 529 }).intValue(); 530 } 531 532 @Override 533 public WorkQueueMetrics metrics(String queueId) { 534 return metrics(queueId, evalSha(metricsWorkQueueSha, keys(queueId), Collections.emptyList())); 535 } 536 537 WorkQueueMetrics metrics(String queueId, Number[] counters) { 538 return new WorkQueueMetrics(queueId, counters[0], counters[1], counters[2], counters[3]); 539 } 540 541 /** 542 * Persists a work instance and adds it to the scheduled queue. 543 * 544 * @param queueId the queue id 545 * @param work the work instance 546 * @throws IOException 547 */ 548 public void workSetScheduled(final String queueId, Work work) throws IOException { 549 listener.queueChanged(work, metrics(queueId, evalSha(schedulingWorkSha, keys(queueId), args(work, true)))); 550 } 551 552 /** 553 * Switches a work to state completed, and saves its new state. 554 */ 555 protected void workSetCancelledScheduled(final String queueId, final Work work) throws IOException { 556 listener.queueChanged(work, 557 metrics(queueId, evalSha(cancelledScheduledWorkSha, keys(queueId), args(work, true)))); 558 } 559 560 /** 561 * Switches a work to state running. 562 * 563 * @param queueId the queue id 564 * @param work the work 565 */ 566 protected void workSetRunning(final String queueId, Work work) throws IOException { 567 listener.queueChanged(work, metrics(queueId, evalSha(runningWorkSha, keys(queueId), args(work, true)))); 568 } 569 570 /** 571 * Switches a work to state completed, and saves its new state. 572 */ 573 protected void workSetCompleted(final String queueId, final Work work) throws IOException { 574 listener.queueChanged(work, metrics(queueId, evalSha(completedWorkSha, keys(queueId), args(work, false)))); 575 } 576 577 /** 578 * Switches a work to state canceled, and saves its new state. 579 */ 580 protected void workSetReschedule(final String queueId, final Work work) throws IOException { 581 listener.queueChanged(work, 582 metrics(queueId, evalSha(cancelledRunningWorkSha, keys(queueId), args(work, true)))); 583 } 584 585 protected List<byte[]> keys(String queueid) { 586 return Arrays.asList(dataKey(), stateKey(), countKey(queueid), scheduledKey(queueid), queuedKey(queueid), 587 runningKey(queueid), completedKey(queueid), canceledKey(queueid)); 588 } 589 590 protected List<byte[]> args(String workId) throws IOException { 591 return Arrays.asList(workId(workId)); 592 } 593 594 protected List<byte[]> args(Work work, boolean serialize) throws IOException { 595 List<byte[]> args = Arrays.asList(workId(work), bytes(work.getWorkInstanceState())); 596 if (serialize) { 597 args = new ArrayList<>(args); 598 args.add(serializeWork(work)); 599 } 600 return args; 601 } 602 603 /** 604 * Gets the work state. 605 * 606 * @param workId the work id 607 * @return the state, or {@code null} if not found 608 */ 609 protected State getWorkStateInfo(final String workId) { 610 final byte[] workIdBytes = bytes(workId); 611 return Framework.getService(RedisExecutor.class).execute(jedis -> { 612 // get state 613 byte[] bytes = jedis.hget(stateKey(), workIdBytes); 614 if (bytes == null || bytes.length == 0) { 615 return null; 616 } 617 switch (bytes[0]) { 618 case STATE_SCHEDULED_B: 619 return State.SCHEDULED; 620 case STATE_RUNNING_B: 621 return State.RUNNING; 622 default: 623 String msg; 624 try { 625 msg = new String(bytes, UTF_8); 626 } catch (UnsupportedEncodingException e) { 627 msg = Arrays.toString(bytes); 628 } 629 log.error("Unknown work state: " + msg + ", work: " + workId); 630 return null; 631 } 632 }); 633 } 634 635 protected List<String> listWorkIdsList(final byte[] queueBytes) throws IOException { 636 return Framework.getService(RedisExecutor.class).execute(jedis -> { 637 List<byte[]> keys = jedis.lrange(queueBytes, 0, -1); 638 List<String> list = new ArrayList<>(keys.size()); 639 for (byte[] workIdBytes : keys) { 640 list.add(string(workIdBytes)); 641 } 642 return list; 643 }); 644 } 645 646 protected List<String> listWorkIdsSet(final byte[] queueBytes) throws IOException { 647 return Framework.getService(RedisExecutor.class).execute(jedis -> { 648 649 Set<byte[]> keys = jedis.smembers(queueBytes); 650 List<String> list = new ArrayList<>(keys.size()); 651 for (byte[] workIdBytes : keys) { 652 list.add(string(workIdBytes)); 653 } 654 return list; 655 }); 656 657 } 658 659 protected List<Work> listWorkList(final byte[] queueBytes) throws IOException { 660 return Framework.getService(RedisExecutor.class).execute(jedis -> { 661 List<byte[]> keys = jedis.lrange(queueBytes, 0, -1); 662 List<Work> list = new ArrayList<>(keys.size()); 663 for (byte[] workIdBytes : keys) { 664 // get data 665 byte[] workBytes = jedis.hget(dataKey(), workIdBytes); 666 Work work = deserializeWork(workBytes); 667 list.add(work); 668 } 669 return list; 670 }); 671 } 672 673 protected List<Work> listWorkSet(final byte[] queueBytes) throws IOException { 674 return Framework.getService(RedisExecutor.class).execute(jedis -> { 675 Set<byte[]> keys = jedis.smembers(queueBytes); 676 List<Work> list = new ArrayList<>(keys.size()); 677 for (byte[] workIdBytes : keys) { 678 // get data 679 byte[] workBytes = jedis.hget(dataKey(), workIdBytes); 680 Work work = deserializeWork(workBytes); 681 list.add(work); 682 } 683 return list; 684 }); 685 } 686 687 protected Work getWork(byte[] workIdBytes) { 688 try { 689 return getWorkData(workIdBytes); 690 } catch (IOException e) { 691 throw new RuntimeException(e); 692 } 693 } 694 695 protected Work getWorkData(final byte[] workIdBytes) throws IOException { 696 return Framework.getService(RedisExecutor.class).execute(jedis -> { 697 byte[] workBytes = jedis.hget(dataKey(), workIdBytes); 698 return deserializeWork(workBytes); 699 }); 700 } 701 702 /** 703 * Removes first work from work queue. 704 * 705 * @param queueId the queue id 706 * @return the work, or {@code null} if the scheduled queue is empty 707 */ 708 protected Work getWorkFromQueue(final String queueId) throws IOException { 709 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 710 List<byte[]> keys = keys(queueId); 711 List<byte[]> args = Collections.singletonList(STATE_RUNNING); 712 List<?> result = (List<?>) redisExecutor.evalsha(popWorkSha, keys, args); 713 if (result == null) { 714 return null; 715 } 716 717 List<Number> numbers = (List<Number>) result.get(0); 718 WorkQueueMetrics metrics = metrics(queueId, coerceNullToZero(numbers)); 719 Object bytes = result.get(1); 720 if (bytes instanceof String) { 721 bytes = bytes((String) bytes); 722 } 723 Work work = deserializeWork((byte[]) bytes); 724 725 listener.queueChanged(work, metrics); 726 727 return work; 728 } 729 730 /** 731 * Removes a given work from queue, move the work from scheduled to completed set. 732 * 733 * @throws IOException 734 */ 735 protected void removeScheduledWork(final String queueId, final String workId) throws IOException { 736 evalSha(cancelledScheduledWorkSha, keys(queueId), args(workId)); 737 } 738 739 Number[] evalSha(byte[] sha, List<byte[]> keys, List<byte[]> args) throws JedisException { 740 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 741 List<Number> numbers = (List<Number>) redisExecutor.evalsha(sha, keys, args); 742 return coerceNullToZero(numbers); 743 } 744 745 protected static Number[] coerceNullToZero(List<Number> numbers) { 746 return coerceNullToZero(numbers.toArray(new Number[numbers.size()])); 747 } 748 749 protected static Number[] coerceNullToZero(Number[] counters) { 750 for (int i = 0; i < counters.length; ++i) { 751 if (counters[i] == null) { 752 counters[i] = 0; 753 } 754 } 755 return counters; 756 } 757 758 @Override 759 public void listen(Listener listener) { 760 this.listener = listener; 761 762 } 763 764 @Override 765 public boolean supportsProcessingDisabling() { 766 return true; 767 } 768 769}