/*
 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.redis.contribs;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.redis.RedisAdmin;
import org.nuxeo.ecm.core.redis.RedisCallable;
import org.nuxeo.ecm.core.redis.RedisExecutor;
import org.nuxeo.ecm.core.work.NuxeoBlockingQueue;
import org.nuxeo.ecm.core.work.WorkHolder;
import org.nuxeo.ecm.core.work.WorkQueuing;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.ecm.core.work.api.Work.State;
import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
import org.nuxeo.runtime.api.Framework;

import redis.clients.jedis.Jedis;
053import redis.clients.jedis.exceptions.JedisException; 054 055/** 056 * Implementation of a {@link WorkQueuing} storing {@link Work} instances in Redis. 057 * 058 * @since 5.8 059 */ 060public class RedisWorkQueuing implements WorkQueuing { 061 062 private static final Log log = LogFactory.getLog(RedisWorkQueuing.class); 063 064 protected static final String UTF_8 = "UTF-8"; 065 066 /** 067 * Global hash of Work instance id -> serialoized Work instance. 068 */ 069 protected static final String KEY_DATA = "data"; 070 071 /** 072 * Global hash of Work instance id -> Work state. The completed state ( {@value #STATE_COMPLETED_B}) is followed by 073 * a completion time in milliseconds. 074 */ 075 protected static final String KEY_STATE = "state"; 076 077 /** 078 * Per-queue list of suspended Work instance ids. 079 */ 080 protected static final String KEY_SUSPENDED_PREFIX = "prev"; 081 082 protected static final byte[] KEY_SUSPENDED = KEY_SUSPENDED_PREFIX.getBytes(); 083 084 /** 085 * Per-queue list of scheduled Work instance ids. 086 */ 087 protected static final String KEY_QUEUE_PREFIX = "queue"; 088 089 protected static final byte[] KEY_QUEUE = KEY_QUEUE_PREFIX.getBytes(); 090 091 /** 092 * Per-queue set of scheduled Work instance ids. 093 */ 094 protected static final String KEY_SCHEDULED_PREFIX = "sched"; 095 096 protected static final byte[] KEY_SCHEDULED = KEY_SCHEDULED_PREFIX.getBytes(); 097 098 /** 099 * Per-queue set of running Work instance ids. 100 */ 101 protected static final String KEY_RUNNING_PREFIX = "run"; 102 103 protected static final byte[] KEY_RUNNING = KEY_RUNNING_PREFIX.getBytes(); 104 105 /** 106 * Per-queue set of counters. 
107 */ 108 protected static final String KEY_COMPLETED_PREFIX = "done"; 109 110 protected static final byte[] KEY_COMPLETED = KEY_COMPLETED_PREFIX.getBytes(); 111 112 protected static final String KEY_CANCELED_PREFIX = "cancel"; 113 114 protected static final byte[] KEY_CANCELED = KEY_CANCELED_PREFIX.getBytes(); 115 116 protected static final String KEY_COUNT_PREFIX = "count"; 117 118 protected static final byte STATE_SCHEDULED_B = 'Q'; 119 120 protected static final byte STATE_RUNNING_B = 'R'; 121 122 protected static final byte STATE_RUNNING_C = 'C'; 123 124 protected static final byte[] STATE_SCHEDULED = new byte[] { STATE_SCHEDULED_B }; 125 126 protected static final byte[] STATE_RUNNING = new byte[] { STATE_RUNNING_B }; 127 128 protected static final byte[] STATE_UNKNOWN = new byte[0]; 129 130 protected Listener listener; 131 132 protected final Map<String, NuxeoBlockingQueue> allQueued = new HashMap<>(); 133 134 protected String redisNamespace; 135 136 // lua scripts 137 protected byte[] initWorkQueueSha; 138 139 protected byte[] metricsWorkQueueSha; 140 141 protected byte[] schedulingWorkSha; 142 143 protected byte[] popWorkSha; 144 145 protected byte[] runningWorkSha; 146 147 protected byte[] cancelledScheduledWorkSha; 148 149 protected byte[] completedWorkSha; 150 151 protected byte[] cancelledRunningWorkSha; 152 153 public RedisWorkQueuing(Listener listener) { 154 this.listener = listener; 155 loadConfig(); 156 } 157 158 void loadConfig() { 159 RedisAdmin admin = Framework.getService(RedisAdmin.class); 160 redisNamespace = admin.namespace("work"); 161 try { 162 initWorkQueueSha = admin.load("org.nuxeo.ecm.core.redis", "init-work-queue").getBytes(); 163 metricsWorkQueueSha = admin.load("org.nuxeo.ecm.core.redis", "metrics-work-queue").getBytes(); 164 schedulingWorkSha = admin.load("org.nuxeo.ecm.core.redis", "scheduling-work").getBytes(); 165 popWorkSha = admin.load("org.nuxeo.ecm.core.redis", "pop-work").getBytes(); 166 runningWorkSha = 
admin.load("org.nuxeo.ecm.core.redis", "running-work").getBytes(); 167 cancelledScheduledWorkSha = admin.load("org.nuxeo.ecm.core.redis", "cancelled-scheduled-work").getBytes(); 168 completedWorkSha = admin.load("org.nuxeo.ecm.core.redis", "completed-work").getBytes(); 169 cancelledRunningWorkSha = admin.load("org.nuxeo.ecm.core.redis", "cancelled-running-work").getBytes(); 170 } catch (IOException e) { 171 throw new RuntimeException("Cannot load LUA scripts", e); 172 } 173 } 174 175 @Override 176 public NuxeoBlockingQueue init(WorkQueueDescriptor config) { 177 evalSha(initWorkQueueSha, keys(config.id), Collections.emptyList()); 178 RedisBlockingQueue queue = new RedisBlockingQueue(config.id, this); 179 allQueued.put(config.id, queue); 180 return queue; 181 } 182 183 @Override 184 public NuxeoBlockingQueue getQueue(String queueId) { 185 return allQueued.get(queueId); 186 } 187 188 @Override 189 public void workSchedule(String queueId, Work work) { 190 getQueue(queueId).offer(new WorkHolder(work)); 191 } 192 193 @Override 194 public void workRunning(String queueId, Work work) { 195 try { 196 workSetRunning(queueId, work); 197 } catch (IOException e) { 198 throw new RuntimeException(e); 199 } 200 } 201 202 @Override 203 public void workCanceled(String queueId, Work work) { 204 try { 205 workSetCancelledScheduled(queueId, work); 206 } catch (IOException e) { 207 throw new RuntimeException(e); 208 } 209 } 210 211 @Override 212 public void workCompleted(String queueId, Work work) { 213 try { 214 workSetCompleted(queueId, work); 215 } catch (IOException e) { 216 throw new RuntimeException(e); 217 } 218 } 219 220 @Override 221 public void workReschedule(String queueId, Work work) { 222 try { 223 workSetReschedule(queueId, work); 224 } catch (IOException e) { 225 throw new RuntimeException(e); 226 } 227 } 228 229 @Override 230 public List<Work> listWork(String queueId, State state) { 231 switch (state) { 232 case SCHEDULED: 233 return listScheduled(queueId); 234 case 
RUNNING: 235 return listRunning(queueId); 236 default: 237 throw new IllegalArgumentException(String.valueOf(state)); 238 } 239 } 240 241 @Override 242 public List<String> listWorkIds(String queueId, State state) { 243 if (state == null) { 244 return listNonCompletedIds(queueId); 245 } 246 switch (state) { 247 case SCHEDULED: 248 return listScheduledIds(queueId); 249 case RUNNING: 250 return listRunningIds(queueId); 251 default: 252 throw new IllegalArgumentException(String.valueOf(state)); 253 } 254 } 255 256 protected List<Work> listScheduled(String queueId) { 257 try { 258 return listWorkList(queuedKey(queueId)); 259 } catch (IOException e) { 260 throw new RuntimeException(e); 261 } 262 } 263 264 protected List<Work> listRunning(String queueId) { 265 try { 266 return listWorkSet(runningKey(queueId)); 267 } catch (IOException e) { 268 throw new RuntimeException(e); 269 } 270 } 271 272 protected List<String> listScheduledIds(String queueId) { 273 try { 274 return listWorkIdsList(queuedKey(queueId)); 275 } catch (IOException e) { 276 throw new RuntimeException(e); 277 } 278 } 279 280 protected List<String> listRunningIds(String queueId) { 281 try { 282 return listWorkIdsSet(runningKey(queueId)); 283 } catch (IOException e) { 284 throw new RuntimeException(e); 285 } 286 } 287 288 protected List<String> listNonCompletedIds(String queueId) { 289 List<String> list = listScheduledIds(queueId); 290 list.addAll(listRunningIds(queueId)); 291 return list; 292 } 293 294 @Override 295 public long count(String queueId, State state) { 296 switch (state) { 297 case SCHEDULED: 298 return metrics(queueId).scheduled.longValue(); 299 case RUNNING: 300 return metrics(queueId).running.longValue(); 301 default: 302 throw new IllegalArgumentException(String.valueOf(state)); 303 } 304 } 305 306 @Override 307 public Work find(String workId, State state) { 308 if (isWorkInState(workId, state)) { 309 return getWork(bytes(workId)); 310 } 311 return null; 312 } 313 314 @Override 315 public 
boolean isWorkInState(String workId, State state) { 316 State s = getWorkState(workId); 317 if (state == null) { 318 return s == State.SCHEDULED || s == State.RUNNING; 319 } 320 return s == state; 321 } 322 323 @Override 324 public void removeScheduled(String queueId, String workId) { 325 try { 326 removeScheduledWork(queueId, workId); 327 } catch (IOException cause) { 328 throw new RuntimeException("Cannot remove scheduled work " + workId + " from " + queueId, cause); 329 } 330 } 331 332 @Override 333 public State getWorkState(String workId) { 334 return getWorkStateInfo(workId); 335 } 336 337 @Override 338 public void setActive(String queueId, boolean value) { 339 WorkQueueMetrics metrics = getQueue(queueId).setActive(value); 340 if (value) { 341 listener.queueActivated(metrics); 342 } else { 343 listener.queueDeactivated(metrics); 344 } 345 } 346 347 /* 348 * ******************** Redis Interface ******************** 349 */ 350 351 protected String string(byte[] bytes) { 352 try { 353 return new String(bytes, UTF_8); 354 } catch (IOException e) { 355 throw new RuntimeException("Should not happen, cannot decode string in UTF-8", e); 356 } 357 } 358 359 protected byte[] bytes(String string) { 360 try { 361 return string.getBytes(UTF_8); 362 } catch (IOException e) { 363 throw new RuntimeException("Should not happen, cannot encode string in UTF-8", e); 364 } 365 } 366 367 protected byte[] bytes(Work.State state) { 368 switch (state) { 369 case SCHEDULED: 370 return STATE_SCHEDULED; 371 case RUNNING: 372 return STATE_RUNNING; 373 default: 374 return STATE_UNKNOWN; 375 } 376 } 377 378 protected static String key(String... 
names) { 379 return String.join(":", names); 380 } 381 382 protected byte[] keyBytes(String prefix, String queueId) { 383 return keyBytes(key(prefix, queueId)); 384 } 385 386 protected byte[] keyBytes(String prefix) { 387 return bytes(redisNamespace.concat(prefix)); 388 } 389 390 protected byte[] workId(Work work) { 391 return workId(work.getId()); 392 } 393 394 protected byte[] workId(String id) { 395 return bytes(id); 396 } 397 398 protected byte[] suspendedKey(String queueId) { 399 return keyBytes(key(KEY_SUSPENDED_PREFIX, queueId)); 400 } 401 402 protected byte[] queuedKey(String queueId) { 403 return keyBytes(key(KEY_QUEUE_PREFIX, queueId)); 404 } 405 406 protected byte[] countKey(String queueId) { 407 return keyBytes(key(KEY_COUNT_PREFIX, queueId)); 408 } 409 410 protected byte[] runningKey(String queueId) { 411 return keyBytes(key(KEY_RUNNING_PREFIX, queueId)); 412 } 413 414 protected byte[] scheduledKey(String queueId) { 415 return keyBytes(key(KEY_SCHEDULED_PREFIX, queueId)); 416 } 417 418 protected byte[] completedKey(String queueId) { 419 return keyBytes(key(KEY_COMPLETED_PREFIX, queueId)); 420 } 421 422 protected byte[] canceledKey(String queueId) { 423 return keyBytes(key(KEY_CANCELED_PREFIX, queueId)); 424 } 425 426 protected byte[] stateKey() { 427 return keyBytes(KEY_STATE); 428 } 429 430 protected byte[] dataKey() { 431 return keyBytes(KEY_DATA); 432 } 433 434 protected byte[] serializeWork(Work work) throws IOException { 435 ByteArrayOutputStream baout = new ByteArrayOutputStream(); 436 ObjectOutputStream out = new ObjectOutputStream(baout); 437 out.writeObject(work); 438 out.flush(); 439 out.close(); 440 return baout.toByteArray(); 441 } 442 443 protected Work deserializeWork(byte[] workBytes) { 444 if (workBytes == null) { 445 return null; 446 } 447 InputStream bain = new ByteArrayInputStream(workBytes); 448 try (ObjectInputStream in = new ObjectInputStream(bain)) { 449 return (Work) in.readObject(); 450 } catch (RuntimeException cause) { 451 
throw cause; 452 } catch (IOException | ClassNotFoundException cause) { 453 throw new RuntimeException("Cannot deserialize work", cause); 454 } 455 } 456 457 /** 458 * Finds which queues have suspended work. 459 * 460 * @return a set of queue ids 461 * @since 5.8 462 */ 463 protected Set<String> getSuspendedQueueIds() throws IOException { 464 return getQueueIds(KEY_SUSPENDED_PREFIX); 465 } 466 467 protected Set<String> getScheduledQueueIds() { 468 return getQueueIds(KEY_QUEUE_PREFIX); 469 } 470 471 protected Set<String> getRunningQueueIds() { 472 return getQueueIds(KEY_RUNNING_PREFIX); 473 } 474 475 /** 476 * Finds which queues have work for a given state prefix. 477 * 478 * @return a set of queue ids 479 * @since 5.8 480 */ 481 protected Set<String> getQueueIds(final String queuePrefix) { 482 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Set<String>>() { 483 @Override 484 public Set<String> call(Jedis jedis) { 485 int offset = keyBytes(queuePrefix).length; 486 Set<byte[]> keys = jedis.keys(keyBytes(key(queuePrefix, "*"))); 487 Set<String> queueIds = new HashSet<String>(keys.size()); 488 for (byte[] bytes : keys) { 489 try { 490 String queueId = new String(bytes, offset, bytes.length - offset, UTF_8); 491 queueIds.add(queueId); 492 } catch (IOException e) { 493 throw new NuxeoException(e); 494 } 495 } 496 return queueIds; 497 } 498 499 }); 500 } 501 502 /** 503 * Resumes all suspended work instances by moving them to the scheduled queue. 
504 * 505 * @param queueId the queue id 506 * @return the number of work instances scheduled 507 * @since 5.8 508 */ 509 public int scheduleSuspendedWork(final String queueId) throws IOException { 510 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Integer>() { 511 @Override 512 public Integer call(Jedis jedis) { 513 for (int n = 0;; n++) { 514 byte[] workIdBytes = jedis.rpoplpush(suspendedKey(queueId), queuedKey(queueId)); 515 if (workIdBytes == null) { 516 return Integer.valueOf(n); 517 } 518 } 519 } 520 521 }).intValue(); 522 } 523 524 /** 525 * Suspends all scheduled work instances by moving them to the suspended queue. 526 * 527 * @param queueId the queue id 528 * @return the number of work instances suspended 529 * @since 5.8 530 */ 531 public int suspendScheduledWork(final String queueId) throws IOException { 532 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Integer>() { 533 534 @Override 535 public Integer call(Jedis jedis) { 536 for (int n = 0;; n++) { 537 byte[] workIdBytes = jedis.rpoplpush(queuedKey(queueId), suspendedKey(queueId)); 538 if (workIdBytes == null) { 539 return n; 540 } 541 } 542 } 543 }).intValue(); 544 } 545 546 @Override 547 public WorkQueueMetrics metrics(String queueId) { 548 return metrics(queueId, evalSha(metricsWorkQueueSha, keys(queueId), Collections.emptyList())); 549 } 550 551 WorkQueueMetrics metrics(String queueId, Number[] counters) { 552 return new WorkQueueMetrics(queueId, counters[0], counters[1], counters[2], counters[3]); 553 } 554 555 /** 556 * Persists a work instance and adds it to the scheduled queue. 
557 * 558 * @param queueId the queue id 559 * @param work the work instance 560 * @throws IOException 561 */ 562 public void workSetScheduled(final String queueId, Work work) throws IOException { 563 listener.queueChanged(work, metrics(queueId, evalSha(schedulingWorkSha, keys(queueId), args(work, true)))); 564 } 565 566 /** 567 * Switches a work to state completed, and saves its new state. 568 */ 569 protected void workSetCancelledScheduled(final String queueId, final Work work) throws IOException { 570 listener.queueChanged(work, 571 metrics(queueId, evalSha(cancelledScheduledWorkSha, keys(queueId), args(work, true)))); 572 } 573 574 /** 575 * Switches a work to state running. 576 * 577 * @param queueId the queue id 578 * @param work the work 579 */ 580 protected void workSetRunning(final String queueId, Work work) throws IOException { 581 listener.queueChanged(work, metrics(queueId, evalSha(runningWorkSha, keys(queueId), args(work, true)))); 582 } 583 584 /** 585 * Switches a work to state completed, and saves its new state. 586 */ 587 protected void workSetCompleted(final String queueId, final Work work) throws IOException { 588 listener.queueChanged(work, metrics(queueId, evalSha(completedWorkSha, keys(queueId), args(work, false)))); 589 } 590 591 /** 592 * Switches a work to state canceled, and saves its new state. 
593 */ 594 protected void workSetReschedule(final String queueId, final Work work) throws IOException { 595 listener.queueChanged(work, 596 metrics(queueId, evalSha(cancelledRunningWorkSha, keys(queueId), args(work, true)))); 597 } 598 599 protected List<byte[]> keys(String queueid) { 600 return Arrays.asList(dataKey(), stateKey(), countKey(queueid), scheduledKey(queueid), queuedKey(queueid), 601 runningKey(queueid), completedKey(queueid), canceledKey(queueid)); 602 } 603 604 protected List<byte[]> args(String workId) throws IOException { 605 return Arrays.asList(workId(workId)); 606 } 607 608 protected List<byte[]> args(Work work, boolean serialize) throws IOException { 609 List<byte[]> args = Arrays.asList(workId(work), bytes(work.getWorkInstanceState())); 610 if (serialize) { 611 args = new ArrayList<>(args); 612 args.add(serializeWork(work)); 613 } 614 return args; 615 } 616 617 /** 618 * Gets the work state. 619 * 620 * @param workId the work id 621 * @return the state, or {@code null} if not found 622 */ 623 protected State getWorkStateInfo(final String workId) { 624 final byte[] workIdBytes = bytes(workId); 625 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<State>() { 626 @Override 627 public State call(Jedis jedis) { 628 // get state 629 byte[] bytes = jedis.hget(stateKey(), workIdBytes); 630 if (bytes == null || bytes.length == 0) { 631 return null; 632 } 633 switch (bytes[0]) { 634 case STATE_SCHEDULED_B: 635 return State.SCHEDULED; 636 case STATE_RUNNING_B: 637 return State.RUNNING; 638 default: 639 String msg; 640 try { 641 msg = new String(bytes, UTF_8); 642 } catch (UnsupportedEncodingException e) { 643 msg = Arrays.toString(bytes); 644 } 645 log.error("Unknown work state: " + msg + ", work: " + workId); 646 return null; 647 } 648 } 649 }); 650 } 651 652 protected List<String> listWorkIdsList(final byte[] queueBytes) throws IOException { 653 return Framework.getService(RedisExecutor.class).execute(new 
RedisCallable<List<String>>() { 654 655 @Override 656 public List<String> call(Jedis jedis) { 657 List<byte[]> keys = jedis.lrange(queueBytes, 0, -1); 658 List<String> list = new ArrayList<String>(keys.size()); 659 for (byte[] workIdBytes : keys) { 660 list.add(string(workIdBytes)); 661 } 662 return list; 663 } 664 665 }); 666 } 667 668 protected List<String> listWorkIdsSet(final byte[] queueBytes) throws IOException { 669 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<String>>() { 670 671 @Override 672 public List<String> call(Jedis jedis) { 673 674 Set<byte[]> keys = jedis.smembers(queueBytes); 675 List<String> list = new ArrayList<String>(keys.size()); 676 for (byte[] workIdBytes : keys) { 677 list.add(string(workIdBytes)); 678 } 679 return list; 680 } 681 682 }); 683 684 } 685 686 protected List<Work> listWorkList(final byte[] queueBytes) throws IOException { 687 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<Work>>() { 688 @Override 689 public List<Work> call(Jedis jedis) { 690 List<byte[]> keys = jedis.lrange(queueBytes, 0, -1); 691 List<Work> list = new ArrayList<Work>(keys.size()); 692 for (byte[] workIdBytes : keys) { 693 // get data 694 byte[] workBytes = jedis.hget(dataKey(), workIdBytes); 695 Work work = deserializeWork(workBytes); 696 list.add(work); 697 } 698 return list; 699 } 700 }); 701 } 702 703 protected List<Work> listWorkSet(final byte[] queueBytes) throws IOException { 704 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<Work>>() { 705 @Override 706 public List<Work> call(Jedis jedis) { 707 Set<byte[]> keys = jedis.smembers(queueBytes); 708 List<Work> list = new ArrayList<Work>(keys.size()); 709 for (byte[] workIdBytes : keys) { 710 // get data 711 byte[] workBytes = jedis.hget(dataKey(), workIdBytes); 712 Work work = deserializeWork(workBytes); 713 list.add(work); 714 } 715 return list; 716 } 717 }); 718 } 719 720 protected Work getWork(byte[] 
workIdBytes) { 721 try { 722 return getWorkData(workIdBytes); 723 } catch (IOException e) { 724 throw new RuntimeException(e); 725 } 726 } 727 728 protected Work getWorkData(final byte[] workIdBytes) throws IOException { 729 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Work>() { 730 731 @Override 732 public Work call(Jedis jedis) { 733 byte[] workBytes = jedis.hget(dataKey(), workIdBytes); 734 return deserializeWork(workBytes); 735 } 736 737 }); 738 } 739 740 /** 741 * Removes first work from work queue. 742 * 743 * @param queueId the queue id 744 * @return the work, or {@code null} if the scheduled queue is empty 745 */ 746 protected Work getWorkFromQueue(final String queueId) throws IOException { 747 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 748 List<byte[]> keys = keys(queueId); 749 List<byte[]> args = Collections.singletonList(STATE_RUNNING); 750 List<?> result = (List<?>) redisExecutor.evalsha(popWorkSha, keys, args); 751 if (result == null) { 752 return null; 753 } 754 755 List<Number> numbers = (List<Number>) result.get(0); 756 WorkQueueMetrics metrics = metrics(queueId, coerceNullToZero(numbers)); 757 Object bytes = result.get(1); 758 if (bytes instanceof String) { 759 bytes = bytes((String) bytes); 760 } 761 Work work = deserializeWork((byte[]) bytes); 762 763 listener.queueChanged(work, metrics); 764 765 return work; 766 } 767 768 /** 769 * Removes a given work from queue, move the work from scheduled to completed set. 
770 * 771 * @throws IOException 772 */ 773 protected void removeScheduledWork(final String queueId, final String workId) throws IOException { 774 evalSha(cancelledScheduledWorkSha, keys(queueId), args(workId)); 775 } 776 777 Number[] evalSha(byte[] sha, List<byte[]> keys, List<byte[]> args) throws JedisException { 778 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 779 List<Number> numbers = (List<Number>) redisExecutor.evalsha(sha, keys, args); 780 return coerceNullToZero(numbers); 781 } 782 783 protected static Number[] coerceNullToZero(List<Number> numbers) { 784 return coerceNullToZero(numbers.toArray(new Number[numbers.size()])); 785 } 786 787 protected static Number[] coerceNullToZero(Number[] counters) { 788 for (int i = 0; i < counters.length; ++i) { 789 if (counters[i] == null) { 790 counters[i] = 0; 791 } 792 } 793 return counters; 794 } 795 796 @Override 797 public void listen(Listener listener) { 798 this.listener = listener; 799 800 } 801 802}