001/* 002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.redis.contribs; 020 021import java.io.ByteArrayInputStream; 022import java.io.ByteArrayOutputStream; 023import java.io.IOException; 024import java.io.InputStream; 025import java.io.ObjectInputStream; 026import java.io.ObjectOutputStream; 027import java.io.UnsupportedEncodingException; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collection; 031import java.util.Collections; 032import java.util.HashMap; 033import java.util.HashSet; 034import java.util.Iterator; 035import java.util.List; 036import java.util.Map; 037import java.util.Set; 038import org.apache.commons.logging.Log; 039import org.apache.commons.logging.LogFactory; 040import org.nuxeo.ecm.core.api.NuxeoException; 041import org.nuxeo.ecm.core.redis.RedisAdmin; 042import org.nuxeo.ecm.core.redis.RedisCallable; 043import org.nuxeo.ecm.core.redis.RedisExecutor; 044import org.nuxeo.ecm.core.work.NuxeoBlockingQueue; 045import org.nuxeo.ecm.core.work.WorkHolder; 046import org.nuxeo.ecm.core.work.WorkQueuing; 047import org.nuxeo.ecm.core.work.api.Work; 048import org.nuxeo.ecm.core.work.api.Work.State; 049import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor; 050import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 051import org.nuxeo.runtime.api.Framework; 052 053import redis.clients.jedis.Jedis; 054import redis.clients.jedis.Protocol; 055import redis.clients.jedis.ScanParams; 056import redis.clients.jedis.ScanResult; 057import redis.clients.jedis.exceptions.JedisException; 058import redis.clients.util.SafeEncoder; 059 060/** 061 * Implementation of a {@link WorkQueuing} storing {@link Work} instances in Redis. 062 * 063 * @since 5.8 064 */ 065public class RedisWorkQueuing implements WorkQueuing { 066 067 private static final Log log = LogFactory.getLog(RedisWorkQueuing.class); 068 069 protected static final String UTF_8 = "UTF-8"; 070 071 /** 072 * Global hash of Work instance id -> serialoized Work instance. 073 */ 074 protected static final String KEY_DATA = "data"; 075 076 /** 077 * Global hash of Work instance id -> Work state. The completed state ( {@value #STATE_COMPLETED_B}) is followed by 078 * a completion time in milliseconds. 079 */ 080 protected static final String KEY_STATE = "state"; 081 082 /** 083 * Per-queue list of suspended Work instance ids. 084 */ 085 protected static final String KEY_SUSPENDED_PREFIX = "prev"; 086 087 protected static final byte[] KEY_SUSPENDED = KEY_SUSPENDED_PREFIX.getBytes(); 088 089 /** 090 * Per-queue list of scheduled Work instance ids. 091 */ 092 protected static final String KEY_QUEUE_PREFIX = "queue"; 093 094 protected static final byte[] KEY_QUEUE = KEY_QUEUE_PREFIX.getBytes(); 095 096 /** 097 * Per-queue set of scheduled Work instance ids. 098 */ 099 protected static final String KEY_SCHEDULED_PREFIX = "sched"; 100 101 protected static final byte[] KEY_SCHEDULED = KEY_SCHEDULED_PREFIX.getBytes(); 102 103 /** 104 * Per-queue set of running Work instance ids. 105 */ 106 protected static final String KEY_RUNNING_PREFIX = "run"; 107 108 protected static final byte[] KEY_RUNNING = KEY_RUNNING_PREFIX.getBytes(); 109 110 /** 111 * Per-queue set of counters. 112 */ 113 protected static final String KEY_COMPLETED_PREFIX = "done"; 114 115 protected static final byte[] KEY_COMPLETED = KEY_COMPLETED_PREFIX.getBytes(); 116 117 protected static final String KEY_CANCELED_PREFIX = "cancel"; 118 119 protected static final byte[] KEY_CANCELED = KEY_CANCELED_PREFIX.getBytes(); 120 121 protected static final String KEY_COUNT_PREFIX = "count"; 122 123 protected static final byte STATE_SCHEDULED_B = 'Q'; 124 125 protected static final byte STATE_RUNNING_B = 'R'; 126 127 protected static final byte STATE_RUNNING_C = 'C'; 128 129 protected static final byte[] STATE_SCHEDULED = new byte[] { STATE_SCHEDULED_B }; 130 131 protected static final byte[] STATE_RUNNING = new byte[] { STATE_RUNNING_B }; 132 133 protected static final byte[] STATE_UNKNOWN = new byte[0]; 134 135 protected Listener listener; 136 137 protected final Map<String, NuxeoBlockingQueue> allQueued = new HashMap<>(); 138 139 protected String redisNamespace; 140 141 // lua scripts 142 protected byte[] initWorkQueueSha; 143 144 protected byte[] metricsWorkQueueSha; 145 146 protected byte[] schedulingWorkSha; 147 148 protected byte[] popWorkSha; 149 150 protected byte[] runningWorkSha; 151 152 protected byte[] cancelledScheduledWorkSha; 153 154 protected byte[] completedWorkSha; 155 156 protected byte[] cancelledRunningWorkSha; 157 158 public RedisWorkQueuing(Listener listener) { 159 this.listener = listener; 160 loadConfig(); 161 } 162 163 void loadConfig() { 164 RedisAdmin admin = Framework.getService(RedisAdmin.class); 165 redisNamespace = admin.namespace("work"); 166 try { 167 initWorkQueueSha = admin.load("org.nuxeo.ecm.core.redis", "init-work-queue") 168 .getBytes(); 169 metricsWorkQueueSha = admin.load("org.nuxeo.ecm.core.redis", "metrics-work-queue") 170 .getBytes(); 171 schedulingWorkSha = admin.load("org.nuxeo.ecm.core.redis", "scheduling-work") 172 .getBytes(); 173 popWorkSha = admin.load("org.nuxeo.ecm.core.redis", "pop-work") 174 .getBytes(); 175 runningWorkSha = admin.load("org.nuxeo.ecm.core.redis", "running-work") 176 .getBytes(); 177 cancelledScheduledWorkSha = admin.load("org.nuxeo.ecm.core.redis", "cancelled-scheduled-work") 178 .getBytes(); 179 completedWorkSha = admin.load("org.nuxeo.ecm.core.redis", "completed-work") 180 .getBytes(); 181 cancelledRunningWorkSha = admin.load("org.nuxeo.ecm.core.redis", "cancelled-running-work") 182 .getBytes(); 183 } catch (IOException e) { 184 throw new RuntimeException("Cannot load LUA scripts", e); 185 } 186 } 187 188 @Override 189 public NuxeoBlockingQueue init(WorkQueueDescriptor config) { 190 evalSha(initWorkQueueSha, keys(config.id), Collections.emptyList()); 191 RedisBlockingQueue queue = new RedisBlockingQueue(config.id, this); 192 allQueued.put(config.id, queue); 193 return queue; 194 } 195 196 @Override 197 public NuxeoBlockingQueue getQueue(String queueId) { 198 return allQueued.get(queueId); 199 } 200 201 202 @Override 203 public void workSchedule(String queueId, Work work) { 204 getQueue(queueId).offer(new WorkHolder(work)); 205 } 206 207 @Override 208 public void workRunning(String queueId, Work work) { 209 try { 210 workSetRunning(queueId, work); 211 } catch (IOException e) { 212 throw new RuntimeException(e); 213 } 214 } 215 216 @Override 217 public void workCanceled(String queueId, Work work) { 218 try { 219 workSetCancelledScheduled(queueId, work); 220 } catch (IOException e) { 221 throw new RuntimeException(e); 222 } 223 } 224 225 @Override 226 public void workCompleted(String queueId, Work work) { 227 try { 228 workSetCompleted(queueId, work); 229 } catch (IOException e) { 230 throw new RuntimeException(e); 231 } 232 } 233 234 @Override 235 public void workReschedule(String queueId, Work work) { 236 try { 237 workSetReschedule(queueId, work); 238 } catch (IOException e) { 239 throw new RuntimeException(e); 240 } 241 } 242 243 @Override 244 public List<Work> listWork(String queueId, State state) { 245 switch (state) { 246 case SCHEDULED: 247 return listScheduled(queueId); 248 case RUNNING: 249 return listRunning(queueId); 250 default: 251 throw new IllegalArgumentException(String.valueOf(state)); 252 } 253 } 254 255 @Override 256 public List<String> listWorkIds(String queueId, State state) { 257 if (state == null) { 258 return listNonCompletedIds(queueId); 259 } 260 switch (state) { 261 case SCHEDULED: 262 return listScheduledIds(queueId); 263 case RUNNING: 264 return listRunningIds(queueId); 265 default: 266 throw new IllegalArgumentException(String.valueOf(state)); 267 } 268 } 269 270 protected List<Work> listScheduled(String queueId) { 271 try { 272 return listWorkList(queuedKey(queueId)); 273 } catch (IOException e) { 274 throw new RuntimeException(e); 275 } 276 } 277 278 protected List<Work> listRunning(String queueId) { 279 try { 280 return listWorkSet(runningKey(queueId)); 281 } catch (IOException e) { 282 throw new RuntimeException(e); 283 } 284 } 285 286 protected List<String> listScheduledIds(String queueId) { 287 try { 288 return listWorkIdsList(queuedKey(queueId)); 289 } catch (IOException e) { 290 throw new RuntimeException(e); 291 } 292 } 293 294 protected List<String> listRunningIds(String queueId) { 295 try { 296 return listWorkIdsSet(runningKey(queueId)); 297 } catch (IOException e) { 298 throw new RuntimeException(e); 299 } 300 } 301 302 protected List<String> listNonCompletedIds(String queueId) { 303 List<String> list = listScheduledIds(queueId); 304 list.addAll(listRunningIds(queueId)); 305 return list; 306 } 307 308 @Override 309 public long count(String queueId, State state) { 310 switch (state) { 311 case SCHEDULED: 312 return metrics(queueId).scheduled.longValue(); 313 case RUNNING: 314 return metrics(queueId).running.longValue(); 315 default: 316 throw new IllegalArgumentException(String.valueOf(state)); 317 } 318 } 319 320 @Override 321 public Work find(String workId, State state) { 322 if (isWorkInState(workId, state)) { 323 return getWork(bytes(workId)); 324 } 325 return null; 326 } 327 328 @Override 329 public boolean isWorkInState(String workId, State state) { 330 State s = getWorkState(workId); 331 if (state == null) { 332 return s == State.SCHEDULED || s == State.RUNNING; 333 } 334 return s == state; 335 } 336 337 @Override 338 public void removeScheduled(String queueId, String workId) { 339 try { 340 removeScheduledWork(queueId, workId); 341 } catch (IOException cause) { 342 throw new RuntimeException("Cannot remove scheduled work " + workId + " from " + queueId, cause); 343 } 344 } 345 346 @Override 347 public State getWorkState(String workId) { 348 return getWorkStateInfo(workId); 349 } 350 351 @Override 352 public void setActive(String queueId, boolean value) { 353 WorkQueueMetrics metrics = getQueue(queueId).setActive(value); 354 if (value) { 355 listener.queueActivated(metrics); 356 } else { 357 listener.queueDeactivated(metrics); 358 } 359 } 360 361 /* 362 * ******************** Redis Interface ******************** 363 */ 364 365 protected String string(byte[] bytes) { 366 try { 367 return new String(bytes, UTF_8); 368 } catch (IOException e) { 369 throw new RuntimeException("Should not happen, cannot decode string in UTF-8", e); 370 } 371 } 372 373 protected byte[] bytes(String string) { 374 try { 375 return string.getBytes(UTF_8); 376 } catch (IOException e) { 377 throw new RuntimeException("Should not happen, cannot encode string in UTF-8", e); 378 } 379 } 380 381 protected byte[] bytes(Work.State state) { 382 switch (state) { 383 case SCHEDULED: 384 return STATE_SCHEDULED; 385 case RUNNING: 386 return STATE_RUNNING; 387 default: 388 return STATE_UNKNOWN; 389 } 390 } 391 392 protected static String key(String... names) { 393 return String.join(":", names); 394 } 395 396 protected byte[] keyBytes(String prefix, String queueId) { 397 return keyBytes(key(prefix, queueId)); 398 } 399 400 protected byte[] keyBytes(String prefix) { 401 return bytes(redisNamespace.concat(prefix)); 402 } 403 404 protected byte[] workId(Work work) { 405 return workId(work.getId()); 406 } 407 408 protected byte[] workId(String id) { 409 return bytes(id); 410 } 411 412 protected byte[] suspendedKey(String queueId) { 413 return keyBytes(key(KEY_SUSPENDED_PREFIX, queueId)); 414 } 415 416 protected byte[] queuedKey(String queueId) { 417 return keyBytes(key(KEY_QUEUE_PREFIX, queueId)); 418 } 419 420 protected byte[] countKey(String queueId) { 421 return keyBytes(key(KEY_COUNT_PREFIX, queueId)); 422 } 423 424 protected byte[] runningKey(String queueId) { 425 return keyBytes(key(KEY_RUNNING_PREFIX, queueId)); 426 } 427 428 protected byte[] scheduledKey(String queueId) { 429 return keyBytes(key(KEY_SCHEDULED_PREFIX, queueId)); 430 } 431 432 protected byte[] completedKey(String queueId) { 433 return keyBytes(key(KEY_COMPLETED_PREFIX, queueId)); 434 } 435 436 protected byte[] canceledKey(String queueId) { 437 return keyBytes(key(KEY_CANCELED_PREFIX, queueId)); 438 } 439 440 protected byte[] stateKey() { 441 return keyBytes(KEY_STATE); 442 } 443 444 protected byte[] dataKey() { 445 return keyBytes(KEY_DATA); 446 } 447 448 protected byte[] serializeWork(Work work) throws IOException { 449 ByteArrayOutputStream baout = new ByteArrayOutputStream(); 450 ObjectOutputStream out = new ObjectOutputStream(baout); 451 out.writeObject(work); 452 out.flush(); 453 out.close(); 454 return baout.toByteArray(); 455 } 456 457 protected Work deserializeWork(byte[] workBytes) { 458 if (workBytes == null) { 459 return null; 460 } 461 InputStream bain = new ByteArrayInputStream(workBytes); 462 try (ObjectInputStream in = new ObjectInputStream(bain)) { 463 return (Work) in.readObject(); 464 } catch (RuntimeException cause) { 465 throw cause; 466 } catch (IOException | ClassNotFoundException cause) { 467 throw new RuntimeException("Cannot deserialize work", cause); 468 } 469 } 470 471 /** 472 * Finds which queues have suspended work. 473 * 474 * @return a set of queue ids 475 * @since 5.8 476 */ 477 protected Set<String> getSuspendedQueueIds() throws IOException { 478 return getQueueIds(KEY_SUSPENDED_PREFIX); 479 } 480 481 protected Set<String> getScheduledQueueIds() { 482 return getQueueIds(KEY_QUEUE_PREFIX); 483 } 484 485 protected Set<String> getRunningQueueIds() { 486 return getQueueIds(KEY_RUNNING_PREFIX); 487 } 488 489 /** 490 * Finds which queues have work for a given state prefix. 491 * 492 * @return a set of queue ids 493 * @since 5.8 494 */ 495 protected Set<String> getQueueIds(final String queuePrefix) { 496 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Set<String>>() { 497 @Override 498 public Set<String> call(Jedis jedis) { 499 int offset = keyBytes(queuePrefix).length; 500 Set<byte[]> keys = jedis.keys(keyBytes(key(queuePrefix, "*"))); 501 Set<String> queueIds = new HashSet<String>(keys.size()); 502 for (byte[] bytes : keys) { 503 try { 504 String queueId = new String(bytes, offset, bytes.length - offset, UTF_8); 505 queueIds.add(queueId); 506 } catch (IOException e) { 507 throw new NuxeoException(e); 508 } 509 } 510 return queueIds; 511 } 512 513 }); 514 } 515 516 /** 517 * Resumes all suspended work instances by moving them to the scheduled queue. 518 * 519 * @param queueId the queue id 520 * @return the number of work instances scheduled 521 * @since 5.8 522 */ 523 public int scheduleSuspendedWork(final String queueId) throws IOException { 524 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Integer>() { 525 @Override 526 public Integer call(Jedis jedis) { 527 for (int n = 0;; n++) { 528 byte[] workIdBytes = jedis.rpoplpush(suspendedKey(queueId), queuedKey(queueId)); 529 if (workIdBytes == null) { 530 return Integer.valueOf(n); 531 } 532 } 533 } 534 535 }).intValue(); 536 } 537 538 /** 539 * Suspends all scheduled work instances by moving them to the suspended queue. 540 * 541 * @param queueId the queue id 542 * @return the number of work instances suspended 543 * @since 5.8 544 */ 545 public int suspendScheduledWork(final String queueId) throws IOException { 546 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Integer>() { 547 548 @Override 549 public Integer call(Jedis jedis) { 550 for (int n = 0;; n++) { 551 byte[] workIdBytes = jedis.rpoplpush(queuedKey(queueId), suspendedKey(queueId)); 552 if (workIdBytes == null) { 553 return n; 554 } 555 } 556 } 557 }).intValue(); 558 } 559 560 @Override 561 public WorkQueueMetrics metrics(String queueId) { 562 return metrics(queueId, evalSha(metricsWorkQueueSha, keys(queueId), Collections.emptyList())); 563 } 564 565 WorkQueueMetrics metrics(String queueId, Number[] counters) { 566 return new WorkQueueMetrics(queueId, counters[0], counters[1], counters[2], counters[3]); 567 } 568 569 /** 570 * Persists a work instance and adds it to the scheduled queue. 571 * 572 * @param queueId the queue id 573 * @param work the work instance 574 * @throws IOException 575 */ 576 public void workSetScheduled(final String queueId, Work work) throws IOException { 577 listener.queueChanged(work, metrics(queueId, evalSha(schedulingWorkSha, keys(queueId), args(work, true)))); 578 } 579 580 /** 581 * Switches a work to state completed, and saves its new state. 582 */ 583 protected void workSetCancelledScheduled(final String queueId, final Work work) throws IOException { 584 listener.queueChanged(work, 585 metrics(queueId, evalSha(cancelledScheduledWorkSha, keys(queueId), args(work, true)))); 586 } 587 588 /** 589 * Switches a work to state running. 590 * 591 * @param queueId the queue id 592 * @param work the work 593 */ 594 protected void workSetRunning(final String queueId, Work work) throws IOException { 595 listener.queueChanged(work, metrics(queueId, evalSha(runningWorkSha, keys(queueId), args(work, true)))); 596 } 597 598 /** 599 * Switches a work to state completed, and saves its new state. 600 */ 601 protected void workSetCompleted(final String queueId, final Work work) throws IOException { 602 listener.queueChanged(work, metrics(queueId, evalSha(completedWorkSha, keys(queueId), args(work, false)))); 603 } 604 605 /** 606 * Switches a work to state canceled, and saves its new state. 607 */ 608 protected void workSetReschedule(final String queueId, final Work work) throws IOException { 609 listener.queueChanged(work, 610 metrics(queueId, evalSha(cancelledRunningWorkSha, keys(queueId), args(work, true)))); 611 } 612 613 protected List<byte[]> keys(String queueid) { 614 return Arrays.asList(dataKey(), 615 stateKey(), 616 countKey(queueid), 617 scheduledKey(queueid), 618 queuedKey(queueid), 619 runningKey(queueid), 620 completedKey(queueid), 621 canceledKey(queueid)); 622 } 623 624 protected List<byte[]> args(String workId) throws IOException { 625 return Arrays.asList(workId(workId)); 626 } 627 628 protected List<byte[]> args(Work work, boolean serialize) throws IOException { 629 List<byte[]> args = Arrays.asList(workId(work), bytes(work.getWorkInstanceState())); 630 if (serialize) { 631 args = new ArrayList<>(args); 632 args.add(serializeWork(work)); 633 } 634 return args; 635 } 636 637 /** 638 * Gets the work state. 639 * 640 * @param workId the work id 641 * @return the state, or {@code null} if not found 642 */ 643 protected State getWorkStateInfo(final String workId) { 644 final byte[] workIdBytes = bytes(workId); 645 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<State>() { 646 @Override 647 public State call(Jedis jedis) { 648 // get state 649 byte[] bytes = jedis.hget(stateKey(), workIdBytes); 650 if (bytes == null || bytes.length == 0) { 651 return null; 652 } 653 switch (bytes[0]) { 654 case STATE_SCHEDULED_B: 655 return State.SCHEDULED; 656 case STATE_RUNNING_B: 657 return State.RUNNING; 658 default: 659 String msg; 660 try { 661 msg = new String(bytes, UTF_8); 662 } catch (UnsupportedEncodingException e) { 663 msg = Arrays.toString(bytes); 664 } 665 log.error("Unknown work state: " + msg + ", work: " + workId); 666 return null; 667 } 668 } 669 }); 670 } 671 672 protected List<String> listWorkIdsList(final byte[] queueBytes) throws IOException { 673 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<String>>() { 674 675 @Override 676 public List<String> call(Jedis jedis) { 677 List<byte[]> keys = jedis.lrange(queueBytes, 0, -1); 678 List<String> list = new ArrayList<String>(keys.size()); 679 for (byte[] workIdBytes : keys) { 680 list.add(string(workIdBytes)); 681 } 682 return list; 683 } 684 685 }); 686 } 687 688 protected List<String> listWorkIdsSet(final byte[] queueBytes) throws IOException { 689 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<String>>() { 690 691 @Override 692 public List<String> call(Jedis jedis) { 693 694 Set<byte[]> keys = jedis.smembers(queueBytes); 695 List<String> list = new ArrayList<String>(keys.size()); 696 for (byte[] workIdBytes : keys) { 697 list.add(string(workIdBytes)); 698 } 699 return list; 700 } 701 702 }); 703 704 } 705 706 protected List<Work> listWorkList(final byte[] queueBytes) throws IOException { 707 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<Work>>() { 708 @Override 709 public List<Work> call(Jedis jedis) { 710 List<byte[]> keys = jedis.lrange(queueBytes, 0, -1); 711 List<Work> list = new ArrayList<Work>(keys.size()); 712 for (byte[] workIdBytes : keys) { 713 // get data 714 byte[] workBytes = jedis.hget(dataKey(), workIdBytes); 715 Work work = deserializeWork(workBytes); 716 list.add(work); 717 } 718 return list; 719 } 720 }); 721 } 722 723 protected List<Work> listWorkSet(final byte[] queueBytes) throws IOException { 724 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<Work>>() { 725 @Override 726 public List<Work> call(Jedis jedis) { 727 Set<byte[]> keys = jedis.smembers(queueBytes); 728 List<Work> list = new ArrayList<Work>(keys.size()); 729 for (byte[] workIdBytes : keys) { 730 // get data 731 byte[] workBytes = jedis.hget(dataKey(), workIdBytes); 732 Work work = deserializeWork(workBytes); 733 list.add(work); 734 } 735 return list; 736 } 737 }); 738 } 739 740 protected Work getWork(byte[] workIdBytes) { 741 try { 742 return getWorkData(workIdBytes); 743 } catch (IOException e) { 744 throw new RuntimeException(e); 745 } 746 } 747 748 protected Work getWorkData(final byte[] workIdBytes) throws IOException { 749 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Work>() { 750 751 @Override 752 public Work call(Jedis jedis) { 753 byte[] workBytes = jedis.hget(dataKey(), workIdBytes); 754 return deserializeWork(workBytes); 755 } 756 757 }); 758 } 759 760 /** 761 * Removes first work from work queue. 762 * 763 * @param queueId the queue id 764 * @return the work, or {@code null} if the scheduled queue is empty 765 */ 766 protected Work getWorkFromQueue(final String queueId) throws IOException { 767 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 768 List<byte[]> keys = keys(queueId); 769 List<byte[]> args = Collections.singletonList(STATE_RUNNING); 770 Object id = redisExecutor.evalsha(popWorkSha, keys, args); 771 if (id == null) { 772 return null; 773 } 774 if (id instanceof String) { 775 id = bytes((String) id); 776 } 777 byte[] workIdBytes = (byte[]) id; 778 byte[] workBytes = redisExecutor.execute(jedis -> jedis.hget(dataKey(), workIdBytes)); 779 return deserializeWork(workBytes); 780 } 781 782 /** 783 * Removes a given work from queue, move the work from scheduled to completed set. 784 * 785 * @throws IOException 786 */ 787 protected void removeScheduledWork(final String queueId, final String workId) throws IOException { 788 evalSha(cancelledScheduledWorkSha, keys(queueId), args(workId)); 789 } 790 791 /** 792 * Helper to call SSCAN but fall back on a custom implementation based on SMEMBERS if the backend (embedded) does 793 * not support SSCAN. 794 * 795 * @since 7.3 796 */ 797 public static class SScanner { 798 799 // results of SMEMBERS for last key, in embedded mode 800 protected List<String> smembers; 801 802 protected ScanResult<String> sscan(Jedis jedis, String key, String cursor, ScanParams params) { 803 ScanResult<String> scanResult; 804 try { 805 scanResult = jedis.sscan(key, cursor, params); 806 } catch (Exception e) { 807 // when testing with embedded fake redis, we may get an un-declared exception 808 if (!(e.getCause() instanceof NoSuchMethodException)) { 809 throw e; 810 } 811 // no SSCAN available in embedded, do a full SMEMBERS 812 if (smembers == null) { 813 Set<String> set = jedis.smembers(key); 814 smembers = new ArrayList<>(set); 815 } 816 Collection<byte[]> bparams = params.getParams(); 817 int count = 1000; 818 for (Iterator<byte[]> it = bparams.iterator(); it.hasNext();) { 819 byte[] param = it.next(); 820 if (param.equals(Protocol.Keyword.MATCH.raw)) { 821 throw new UnsupportedOperationException("MATCH not supported"); 822 } 823 if (param.equals(Protocol.Keyword.COUNT.raw)) { 824 count = Integer.parseInt(SafeEncoder.encode(it.next())); 825 } 826 } 827 int pos = Integer.parseInt(cursor); // don't check range, callers are cool 828 int end = Math.min(pos + count, smembers.size()); 829 int nextPos = end == smembers.size() ? 0 : end; 830 scanResult = new ScanResult<>(String.valueOf(nextPos), smembers.subList(pos, end)); 831 if (nextPos == 0) { 832 smembers = null; 833 } 834 } 835 return scanResult; 836 } 837 } 838 839 Number[] evalSha(byte[] sha, List<byte[]> keys, List<byte[]> args) throws JedisException { 840 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 841 List<Number> numbers = (List<Number>) redisExecutor.evalsha(sha, keys, args); 842 return coerceNullToZero(numbers); 843 } 844 845 protected static Number[] coerceNullToZero(List<Number> numbers) { 846 Number[] counters = numbers.toArray(new Number[numbers.size()]); 847 for (int i = 0; i < counters.length; ++i) { 848 if (counters[i] == null) { 849 counters[i] = 0; 850 } 851 } 852 return counters; 853 } 854 855 @Override 856 public void listen(Listener listener) { 857 this.listener = listener; 858 859 } 860 861}