/*
 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.redis.contribs;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.redis.RedisAdmin;
import org.nuxeo.ecm.core.redis.RedisCallable;
import org.nuxeo.ecm.core.redis.RedisExecutor;
import org.nuxeo.ecm.core.work.NuxeoBlockingQueue;
import org.nuxeo.ecm.core.work.WorkHolder;
import org.nuxeo.ecm.core.work.WorkQueuing;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.ecm.core.work.api.Work.State;
import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
import org.nuxeo.runtime.api.Framework;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.util.SafeEncoder;

/**
 * Implementation of a {@link WorkQueuing} storing {@link Work} instances in Redis.
 * <p>
 * All Redis keys are built as {@code <namespace><prefix>} for global keys and {@code <namespace><prefix>:<queueId>}
 * for per-queue keys, where the namespace is obtained from {@link RedisAdmin#namespace(String...)}. State transitions
 * are delegated to Lua scripts loaded once in {@link #loadConfig()}.
 *
 * @since 5.8
 */
public class RedisWorkQueuing implements WorkQueuing {

    private static final Log log = LogFactory.getLog(RedisWorkQueuing.class);

    /** Name of the charset used for every string stored in Redis. */
    protected static final String UTF_8 = "UTF-8";

    /** Id of the bundle holding the Lua scripts. */
    private static final String SCRIPT_BUNDLE = "org.nuxeo.ecm.core.redis";

    /**
     * Global hash of Work instance id -> serialized Work instance.
     */
    protected static final String KEY_DATA = "data";

    /**
     * Global hash of Work instance id -> Work state. The first byte of the value identifies the state (see
     * {@link #STATE_SCHEDULED_B} and {@link #STATE_RUNNING_B}).
     */
    protected static final String KEY_STATE = "state";

    /**
     * Per-queue list of suspended Work instance ids.
     */
    protected static final String KEY_SUSPENDED_PREFIX = "prev";

    protected static final byte[] KEY_SUSPENDED = KEY_SUSPENDED_PREFIX.getBytes(StandardCharsets.UTF_8);

    /**
     * Per-queue list of scheduled Work instance ids.
     */
    protected static final String KEY_QUEUE_PREFIX = "queue";

    protected static final byte[] KEY_QUEUE = KEY_QUEUE_PREFIX.getBytes(StandardCharsets.UTF_8);

    /**
     * Per-queue set of scheduled Work instance ids.
     */
    protected static final String KEY_SCHEDULED_PREFIX = "sched";

    protected static final byte[] KEY_SCHEDULED = KEY_SCHEDULED_PREFIX.getBytes(StandardCharsets.UTF_8);

    /**
     * Per-queue set of running Work instance ids.
     */
    protected static final String KEY_RUNNING_PREFIX = "run";

    protected static final byte[] KEY_RUNNING = KEY_RUNNING_PREFIX.getBytes(StandardCharsets.UTF_8);

    /**
     * Per-queue key prefix for completed work.
     */
    protected static final String KEY_COMPLETED_PREFIX = "done";

    protected static final byte[] KEY_COMPLETED = KEY_COMPLETED_PREFIX.getBytes(StandardCharsets.UTF_8);

    /**
     * Per-queue key prefix for canceled work.
     */
    protected static final String KEY_CANCELED_PREFIX = "cancel";

    protected static final byte[] KEY_CANCELED = KEY_CANCELED_PREFIX.getBytes(StandardCharsets.UTF_8);

    /**
     * Per-queue key prefix for counters.
     */
    protected static final String KEY_COUNT_PREFIX = "count";

    protected static final byte STATE_SCHEDULED_B = 'Q';

    protected static final byte STATE_RUNNING_B = 'R';

    // NOTE(review): the name says "running" but the value is 'C' and nothing in this class reads it;
    // kept unchanged because it is part of the protected API.
    protected static final byte STATE_RUNNING_C = 'C';

    protected static final byte[] STATE_SCHEDULED = new byte[] { STATE_SCHEDULED_B };

    protected static final byte[] STATE_RUNNING = new byte[] { STATE_RUNNING_B };

    protected static final byte[] STATE_UNKNOWN = new byte[0];

    protected Listener listener;

    /** Queues created by {@link #init(WorkQueueDescriptor)}, by queue id. */
    protected final Map<String, NuxeoBlockingQueue> allQueued = new HashMap<>();

    protected String redisNamespace;

    // lua scripts
    protected byte[] initWorkQueueSha;

    protected byte[] metricsWorkQueueSha;

    protected byte[] schedulingWorkSha;

    protected byte[] popWorkSha;

    protected byte[] runningWorkSha;

    protected byte[] cancelledScheduledWorkSha;

    protected byte[] completedWorkSha;

    protected byte[] cancelledRunningWorkSha;

    /**
     * Creates the queuing and immediately loads the Redis namespace and Lua scripts.
     *
     * @param listener the listener notified of queue changes
     */
    public RedisWorkQueuing(Listener listener) {
        this.listener = listener;
        loadConfig();
    }

    /**
     * Reads the Redis namespace and registers every Lua script, keeping their SHA for later {@code EVALSHA} calls.
     *
     * @throws RuntimeException if a script cannot be loaded
     */
    void loadConfig() {
        RedisAdmin admin = Framework.getService(RedisAdmin.class);
        redisNamespace = admin.namespace("work");
        try {
            initWorkQueueSha = loadScript(admin, "init-work-queue");
            metricsWorkQueueSha = loadScript(admin, "metrics-work-queue");
            schedulingWorkSha = loadScript(admin, "scheduling-work");
            popWorkSha = loadScript(admin, "pop-work");
            runningWorkSha = loadScript(admin, "running-work");
            cancelledScheduledWorkSha = loadScript(admin, "cancelled-scheduled-work");
            completedWorkSha = loadScript(admin, "completed-work");
            cancelledRunningWorkSha = loadScript(admin, "cancelled-running-work");
        } catch (IOException e) {
            throw new RuntimeException("Cannot load LUA scripts", e);
        }
    }

    /** Loads one Lua script from the scripts bundle and returns its SHA as UTF-8 bytes. */
    private static byte[] loadScript(RedisAdmin admin, String scriptName) throws IOException {
        return admin.load(SCRIPT_BUNDLE, scriptName).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public NuxeoBlockingQueue init(WorkQueueDescriptor config) {
        evalSha(initWorkQueueSha, keys(config.id), Collections.emptyList());
        RedisBlockingQueue queue = new RedisBlockingQueue(config.id, this);
        allQueued.put(config.id, queue);
        return queue;
    }

    @Override
    public NuxeoBlockingQueue getQueue(String queueId) {
        return allQueued.get(queueId);
    }

    @Override
    public void workSchedule(String queueId, Work work) {
        getQueue(queueId).offer(new WorkHolder(work));
    }

    @Override
    public void workRunning(String queueId, Work work) {
        try {
            workSetRunning(queueId, work);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void workCanceled(String queueId, Work work) {
        try {
            workSetCancelledScheduled(queueId, work);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void workCompleted(String queueId, Work work) {
        try {
            workSetCompleted(queueId, work);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void workReschedule(String queueId, Work work) {
        try {
            workSetReschedule(queueId, work);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Lists the work instances of a queue in the given state.
     *
     * @param queueId the queue id
     * @param state {@code SCHEDULED}, {@code RUNNING}, or {@code null} for both (same convention as
     *            {@link #listWorkIds(String, State)})
     * @throws IllegalArgumentException for any other state
     */
    @Override
    public List<Work> listWork(String queueId, State state) {
        if (state == null) {
            // mirror listWorkIds: null means "non completed" instead of failing on switch(null)
            List<Work> works = new ArrayList<>(listScheduled(queueId));
            works.addAll(listRunning(queueId));
            return works;
        }
        switch (state) {
        case SCHEDULED:
            return listScheduled(queueId);
        case RUNNING:
            return listRunning(queueId);
        default:
            throw new IllegalArgumentException(String.valueOf(state));
        }
    }

    /**
     * Lists the work ids of a queue in the given state.
     *
     * @param queueId the queue id
     * @param state {@code SCHEDULED}, {@code RUNNING}, or {@code null} for both
     * @throws IllegalArgumentException for any other state
     */
    @Override
    public List<String> listWorkIds(String queueId, State state) {
        if (state == null) {
            return listNonCompletedIds(queueId);
        }
        switch (state) {
        case SCHEDULED:
            return listScheduledIds(queueId);
        case RUNNING:
            return listRunningIds(queueId);
        default:
            throw new IllegalArgumentException(String.valueOf(state));
        }
    }

    protected List<Work> listScheduled(String queueId) {
        try {
            return listWorkList(queuedKey(queueId));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    protected List<Work> listRunning(String queueId) {
        try {
            return listWorkSet(runningKey(queueId));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    protected List<String> listScheduledIds(String queueId) {
        try {
            return listWorkIdsList(queuedKey(queueId));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    protected List<String> listRunningIds(String queueId) {
        try {
            return listWorkIdsSet(runningKey(queueId));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Returns the scheduled ids followed by the running ids. */
    protected List<String> listNonCompletedIds(String queueId) {
        List<String> list = listScheduledIds(queueId);
        list.addAll(listRunningIds(queueId));
        return list;
    }

    @Override
    public long count(String queueId, State state) {
        switch (state) {
        case SCHEDULED:
            return metrics(queueId).scheduled.longValue();
        case RUNNING:
            return metrics(queueId).running.longValue();
        default:
            throw new IllegalArgumentException(String.valueOf(state));
        }
    }

    @Override
    public Work find(String workId, State state) {
        if (isWorkInState(workId, state)) {
            return getWork(bytes(workId));
        }
        return null;
    }

    @Override
    public boolean isWorkInState(String workId, State state) {
        State s = getWorkState(workId);
        if (state == null) {
            return s == State.SCHEDULED || s == State.RUNNING;
        }
        return s == state;
    }

    @Override
    public void removeScheduled(String queueId, String workId) {
        try {
            removeScheduledWork(queueId, workId);
        } catch (IOException cause) {
            throw new RuntimeException("Cannot remove scheduled work " + workId + " from " + queueId, cause);
        }
    }

    @Override
    public State getWorkState(String workId) {
        return getWorkStateInfo(workId);
    }

    @Override
    public void setActive(String queueId, boolean value) {
        WorkQueueMetrics metrics = getQueue(queueId).setActive(value);
        if (value) {
            listener.queueActivated(metrics);
        } else {
            listener.queueDeactivated(metrics);
        }
    }

    /*
     * ******************** Redis Interface ********************
     */

    /** Decodes UTF-8 bytes into a string. */
    protected String string(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Encodes a string as UTF-8 bytes. */
    protected byte[] bytes(String string) {
        return string.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Returns the Redis encoding of a work state; states other than scheduled and running map to
     * {@link #STATE_UNKNOWN}.
     */
    protected byte[] bytes(Work.State state) {
        switch (state) {
        case SCHEDULED:
            return STATE_SCHEDULED;
        case RUNNING:
            return STATE_RUNNING;
        default:
            return STATE_UNKNOWN;
        }
    }

    /** Joins key segments with {@code ':'}. */
    protected static String key(String... names) {
        return String.join(":", names);
    }

    protected byte[] keyBytes(String prefix, String queueId) {
        return keyBytes(key(prefix, queueId));
    }

    /** Prepends the Redis namespace to the given key and encodes it. */
    protected byte[] keyBytes(String prefix) {
        return bytes(redisNamespace.concat(prefix));
    }

    protected byte[] workId(Work work) {
        return workId(work.getId());
    }

    protected byte[] workId(String id) {
        return bytes(id);
    }

    protected byte[] suspendedKey(String queueId) {
        return keyBytes(key(KEY_SUSPENDED_PREFIX, queueId));
    }

    protected byte[] queuedKey(String queueId) {
        return keyBytes(key(KEY_QUEUE_PREFIX, queueId));
    }

    protected byte[] countKey(String queueId) {
        return keyBytes(key(KEY_COUNT_PREFIX, queueId));
    }

    protected byte[] runningKey(String queueId) {
        return keyBytes(key(KEY_RUNNING_PREFIX, queueId));
    }

    protected byte[] scheduledKey(String queueId) {
        return keyBytes(key(KEY_SCHEDULED_PREFIX, queueId));
    }

    protected byte[] completedKey(String queueId) {
        return keyBytes(key(KEY_COMPLETED_PREFIX, queueId));
    }

    protected byte[] canceledKey(String queueId) {
        return keyBytes(key(KEY_CANCELED_PREFIX, queueId));
    }

    protected byte[] stateKey() {
        return keyBytes(KEY_STATE);
    }

    protected byte[] dataKey() {
        return keyBytes(KEY_DATA);
    }

    /**
     * Serializes a work instance with Java serialization.
     *
     * @throws IOException if the work cannot be written
     */
    protected byte[] serializeWork(Work work) throws IOException {
        ByteArrayOutputStream baout = new ByteArrayOutputStream();
        // closing the object stream flushes it into baout, also on failure
        try (ObjectOutputStream out = new ObjectOutputStream(baout)) {
            out.writeObject(work);
        }
        return baout.toByteArray();
    }

    /**
     * Deserializes a work instance previously written by {@link #serializeWork(Work)}.
     *
     * @param workBytes the serialized form, may be {@code null}
     * @return the work, or {@code null} if {@code workBytes} is {@code null}
     * @throws RuntimeException if the bytes cannot be read back as a {@link Work}
     */
    protected Work deserializeWork(byte[] workBytes) {
        if (workBytes == null) {
            return null;
        }
        // NOTE(review): native Java deserialization of whatever is stored in Redis; only safe as long as
        // the Redis instance is trusted.
        InputStream bain = new ByteArrayInputStream(workBytes);
        try (ObjectInputStream in = new ObjectInputStream(bain)) {
            return (Work) in.readObject();
        } catch (IOException | ClassNotFoundException cause) {
            throw new RuntimeException("Cannot deserialize work", cause);
        }
    }

    /**
     * Finds which queues have suspended work.
     *
     * @return a set of queue ids
     * @since 5.8
     */
    protected Set<String> getSuspendedQueueIds() throws IOException {
        return getQueueIds(KEY_SUSPENDED_PREFIX);
    }

    protected Set<String> getScheduledQueueIds() {
        return getQueueIds(KEY_QUEUE_PREFIX);
    }

    protected Set<String> getRunningQueueIds() {
        return getQueueIds(KEY_RUNNING_PREFIX);
    }

    /**
     * Finds which queues have work for a given state prefix.
     *
     * @return a set of queue ids
     * @since 5.8
     */
    protected Set<String> getQueueIds(final String queuePrefix) {
        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Set<String>>() {
            @Override
            public Set<String> call(Jedis jedis) {
                // keys look like <namespace><prefix>:<queueId>; the offset must skip the ':' separator too
                int offset = keyBytes(key(queuePrefix, "")).length;
                Set<byte[]> keys = jedis.keys(keyBytes(key(queuePrefix, "*")));
                Set<String> queueIds = new HashSet<>(keys.size());
                for (byte[] keyBytes : keys) {
                    queueIds.add(new String(keyBytes, offset, keyBytes.length - offset, StandardCharsets.UTF_8));
                }
                return queueIds;
            }

        });
    }

    /**
     * Resumes all suspended work instances by moving them to the scheduled queue.
     *
     * @param queueId the queue id
     * @return the number of work instances scheduled
     * @since 5.8
     */
    public int scheduleSuspendedWork(final String queueId) throws IOException {
        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Integer>() {
            @Override
            public Integer call(Jedis jedis) {
                // move ids one by one until the suspended list is empty
                for (int n = 0;; n++) {
                    byte[] workIdBytes = jedis.rpoplpush(suspendedKey(queueId), queuedKey(queueId));
                    if (workIdBytes == null) {
                        return Integer.valueOf(n);
                    }
                }
            }

        }).intValue();
    }

    /**
     * Suspends all scheduled work instances by moving them to the suspended queue.
     *
     * @param queueId the queue id
     * @return the number of work instances suspended
     * @since 5.8
     */
    public int suspendScheduledWork(final String queueId) throws IOException {
        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Integer>() {

            @Override
            public Integer call(Jedis jedis) {
                // move ids one by one until the scheduled list is empty
                for (int n = 0;; n++) {
                    byte[] workIdBytes = jedis.rpoplpush(queuedKey(queueId), suspendedKey(queueId));
                    if (workIdBytes == null) {
                        return Integer.valueOf(n);
                    }
                }
            }
        }).intValue();
    }

    @Override
    public WorkQueueMetrics metrics(String queueId) {
        return metrics(queueId, evalSha(metricsWorkQueueSha, keys(queueId), Collections.emptyList()));
    }

    /** Wraps the first four counters returned by a script into a {@link WorkQueueMetrics}. */
    WorkQueueMetrics metrics(String queueId, Number[] counters) {
        return new WorkQueueMetrics(queueId, counters[0], counters[1], counters[2], counters[3]);
    }

    /**
     * Persists a work instance and adds it to the scheduled queue, then notifies the listener with the new metrics.
     *
     * @param queueId the queue id
     * @param work the work instance
     * @throws IOException if the work cannot be serialized
     */
    public void workSetScheduled(final String queueId, Work work) throws IOException {
        listener.queueChanged(work, metrics(queueId, evalSha(schedulingWorkSha, keys(queueId), args(work, true))));
    }

    /**
     * Runs the "cancelled-scheduled-work" script for a work, saving its serialized form, then notifies the listener.
     *
     * @throws IOException if the work cannot be serialized
     */
    protected void workSetCancelledScheduled(final String queueId, final Work work) throws IOException {
        listener.queueChanged(work,
                metrics(queueId, evalSha(cancelledScheduledWorkSha, keys(queueId), args(work, true))));
    }

    /**
     * Switches a work to state running.
     *
     * @param queueId the queue id
     * @param work the work
     * @throws IOException if the work cannot be serialized
     */
    protected void workSetRunning(final String queueId, Work work) throws IOException {
        listener.queueChanged(work, metrics(queueId, evalSha(runningWorkSha, keys(queueId), args(work, true))));
    }

    /**
     * Switches a work to state completed; the serialized work is not sent to the script.
     */
    protected void workSetCompleted(final String queueId, final Work work) throws IOException {
        listener.queueChanged(work, metrics(queueId, evalSha(completedWorkSha, keys(queueId), args(work, false))));
    }

    /**
     * Runs the "cancelled-running-work" script for a work being rescheduled, saving its serialized form, then
     * notifies the listener.
     *
     * @throws IOException if the work cannot be serialized
     */
    protected void workSetReschedule(final String queueId, final Work work) throws IOException {
        listener.queueChanged(work,
                metrics(queueId, evalSha(cancelledRunningWorkSha, keys(queueId), args(work, true))));
    }

    /**
     * Builds the KEYS array passed to every script; the order is part of the contract with the Lua scripts.
     */
    protected List<byte[]> keys(String queueid) {
        return Arrays.asList(dataKey(), stateKey(), countKey(queueid), scheduledKey(queueid), queuedKey(queueid),
                runningKey(queueid), completedKey(queueid), canceledKey(queueid));
    }

    protected List<byte[]> args(String workId) throws IOException {
        return Arrays.asList(workId(workId));
    }

    /**
     * Builds the ARGV array for a work: id, state, and optionally the serialized work.
     *
     * @param serialize whether to append the serialized work
     * @throws IOException if the work cannot be serialized
     */
    protected List<byte[]> args(Work work, boolean serialize) throws IOException {
        List<byte[]> args = Arrays.asList(workId(work), bytes(work.getWorkInstanceState()));
        if (serialize) {
            // Arrays.asList is fixed-size, copy before appending
            args = new ArrayList<>(args);
            args.add(serializeWork(work));
        }
        return args;
    }

    /**
     * Gets the work state.
     *
     * @param workId the work id
     * @return the state, or {@code null} if not found or not recognized
     */
    protected State getWorkStateInfo(final String workId) {
        final byte[] workIdBytes = bytes(workId);
        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<State>() {
            @Override
            public State call(Jedis jedis) {
                // get state
                byte[] stateBytes = jedis.hget(stateKey(), workIdBytes);
                if (stateBytes == null || stateBytes.length == 0) {
                    return null;
                }
                switch (stateBytes[0]) {
                case STATE_SCHEDULED_B:
                    return State.SCHEDULED;
                case STATE_RUNNING_B:
                    return State.RUNNING;
                default:
                    log.error("Unknown work state: " + string(stateBytes) + ", work: " + workId);
                    return null;
                }
            }
        });
    }

    /** Returns the work ids stored in a Redis list, in list order. */
    protected List<String> listWorkIdsList(final byte[] queueBytes) throws IOException {
        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<String>>() {

            @Override
            public List<String> call(Jedis jedis) {
                List<byte[]> keys = jedis.lrange(queueBytes, 0, -1);
                List<String> list = new ArrayList<>(keys.size());
                for (byte[] workIdBytes : keys) {
                    list.add(string(workIdBytes));
                }
                return list;
            }

        });
    }

    /** Returns the work ids stored in a Redis set. */
    protected List<String> listWorkIdsSet(final byte[] queueBytes) throws IOException {
        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<String>>() {

            @Override
            public List<String> call(Jedis jedis) {
                Set<byte[]> keys = jedis.smembers(queueBytes);
                List<String> list = new ArrayList<>(keys.size());
                for (byte[] workIdBytes : keys) {
                    list.add(string(workIdBytes));
                }
                return list;
            }

        });
    }

    /**
     * Returns the work instances whose ids are stored in a Redis list; an id with no stored data yields a
     * {@code null} element.
     */
    protected List<Work> listWorkList(final byte[] queueBytes) throws IOException {
        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<Work>>() {
            @Override
            public List<Work> call(Jedis jedis) {
                List<byte[]> keys = jedis.lrange(queueBytes, 0, -1);
                List<Work> list = new ArrayList<>(keys.size());
                for (byte[] workIdBytes : keys) {
                    // get data
                    byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
                    list.add(deserializeWork(workBytes));
                }
                return list;
            }
        });
    }

    /**
     * Returns the work instances whose ids are stored in a Redis set; an id with no stored data yields a
     * {@code null} element.
     */
    protected List<Work> listWorkSet(final byte[] queueBytes) throws IOException {
        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<Work>>() {
            @Override
            public List<Work> call(Jedis jedis) {
                Set<byte[]> keys = jedis.smembers(queueBytes);
                List<Work> list = new ArrayList<>(keys.size());
                for (byte[] workIdBytes : keys) {
                    // get data
                    byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
                    list.add(deserializeWork(workBytes));
                }
                return list;
            }
        });
    }

    protected Work getWork(byte[] workIdBytes) {
        try {
            return getWorkData(workIdBytes);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads and deserializes the work stored under the given id.
     *
     * @return the work, or {@code null} if no data is stored for this id
     */
    protected Work getWorkData(final byte[] workIdBytes) throws IOException {
        return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Work>() {

            @Override
            public Work call(Jedis jedis) {
                byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
                return deserializeWork(workBytes);
            }

        });
    }

    /**
     * Removes first work from work queue.
     *
     * @param queueId the queue id
     * @return the work, or {@code null} if the scheduled queue is empty
     */
    protected Work getWorkFromQueue(final String queueId) throws IOException {
        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
        List<byte[]> keys = keys(queueId);
        List<byte[]> args = Collections.singletonList(STATE_RUNNING);
        List<?> result = (List<?>) redisExecutor.evalsha(popWorkSha, keys, args);
        if (result == null) {
            return null;
        }

        // result is [counters, serialized work]
        @SuppressWarnings("unchecked")
        List<Number> numbers = (List<Number>) result.get(0);
        WorkQueueMetrics metrics = metrics(queueId, coerceNullToZero(numbers));
        Object workData = result.get(1);
        if (workData instanceof String) {
            // some backends hand the payload back as a String instead of raw bytes
            workData = bytes((String) workData);
        }
        Work work = deserializeWork((byte[]) workData);

        listener.queueChanged(work, metrics);

        return work;
    }

    /**
     * Removes a given work from the queue by running the "cancelled-scheduled-work" script with only the work id.
     *
     * @throws IOException
     */
    protected void removeScheduledWork(final String queueId, final String workId) throws IOException {
        evalSha(cancelledScheduledWorkSha, keys(queueId), args(workId));
    }

    /**
     * Helper to call SSCAN but fall back on a custom implementation based on SMEMBERS if the backend (embedded) does
     * not support SSCAN.
     *
     * @since 7.3
     */
    public static class SScanner {

        /** Default page size of the fallback when no COUNT parameter is given. */
        private static final int DEFAULT_COUNT = 1000;

        // results of SMEMBERS for last key, in embedded mode
        protected List<String> smembers;

        /**
         * Scans a set page by page.
         *
         * @param cursor the cursor returned by the previous call, {@code "0"} to start
         * @return the next page; its cursor is {@code "0"} when the scan is finished
         * @throws UnsupportedOperationException in fallback mode if {@code params} contains a MATCH clause
         */
        protected ScanResult<String> sscan(Jedis jedis, String key, String cursor, ScanParams params) {
            ScanResult<String> scanResult;
            try {
                scanResult = jedis.sscan(key, cursor, params);
            } catch (Exception e) {
                // when testing with embedded fake redis, we may get an un-declared exception
                if (!(e.getCause() instanceof NoSuchMethodException)) {
                    throw e;
                }
                // no SSCAN available in embedded, do a full SMEMBERS
                if (smembers == null) {
                    Set<String> set = jedis.smembers(key);
                    smembers = new ArrayList<>(set);
                }
                Collection<byte[]> bparams = params.getParams();
                int count = DEFAULT_COUNT;
                for (Iterator<byte[]> it = bparams.iterator(); it.hasNext();) {
                    byte[] param = it.next();
                    // arrays must be compared by content: byte[].equals is reference identity
                    if (Arrays.equals(param, Protocol.Keyword.MATCH.raw)) {
                        throw new UnsupportedOperationException("MATCH not supported");
                    }
                    if (Arrays.equals(param, Protocol.Keyword.COUNT.raw)) {
                        count = Integer.parseInt(SafeEncoder.encode(it.next()));
                    }
                }
                int pos = Integer.parseInt(cursor); // don't check range, callers are cool
                int end = Math.min(pos + count, smembers.size());
                int nextPos = end == smembers.size() ? 0 : end;
                scanResult = new ScanResult<>(String.valueOf(nextPos), smembers.subList(pos, end));
                if (nextPos == 0) {
                    // scan finished, forget the cached members
                    smembers = null;
                }
            }
            return scanResult;
        }
    }

    /**
     * Runs a script by SHA and returns its counters, with {@code null} entries replaced by zero.
     */
    Number[] evalSha(byte[] sha, List<byte[]> keys, List<byte[]> args) throws JedisException {
        RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class);
        @SuppressWarnings("unchecked")
        List<Number> numbers = (List<Number>) redisExecutor.evalsha(sha, keys, args);
        return coerceNullToZero(numbers);
    }

    protected static Number[] coerceNullToZero(List<Number> numbers) {
        return coerceNullToZero(numbers.toArray(new Number[numbers.size()]));
    }

    /** Replaces, in place, every {@code null} counter by zero and returns the same array. */
    protected static Number[] coerceNullToZero(Number[] counters) {
        for (int i = 0; i < counters.length; ++i) {
            if (counters[i] == null) {
                counters[i] = 0;
            }
        }
        return counters;
    }

    @Override
    public void listen(Listener listener) {
        this.listener = listener;
    }

}