001/* 002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.redis.contribs; 020 021import java.io.ByteArrayInputStream; 022import java.io.ByteArrayOutputStream; 023import java.io.IOException; 024import java.io.InputStream; 025import java.io.ObjectInputStream; 026import java.io.ObjectOutputStream; 027import java.io.UnsupportedEncodingException; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collection; 031import java.util.Collections; 032import java.util.HashMap; 033import java.util.HashSet; 034import java.util.Iterator; 035import java.util.List; 036import java.util.Map; 037import java.util.Set; 038import org.apache.commons.logging.Log; 039import org.apache.commons.logging.LogFactory; 040import org.nuxeo.ecm.core.api.NuxeoException; 041import org.nuxeo.ecm.core.redis.RedisAdmin; 042import org.nuxeo.ecm.core.redis.RedisCallable; 043import org.nuxeo.ecm.core.redis.RedisExecutor; 044import org.nuxeo.ecm.core.work.NuxeoBlockingQueue; 045import org.nuxeo.ecm.core.work.WorkHolder; 046import org.nuxeo.ecm.core.work.WorkQueuing; 047import org.nuxeo.ecm.core.work.api.Work; 048import org.nuxeo.ecm.core.work.api.Work.State; 049import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor; 050import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 051import org.nuxeo.runtime.api.Framework; 052 053import redis.clients.jedis.Jedis; 054import redis.clients.jedis.Protocol; 055import redis.clients.jedis.ScanParams; 056import redis.clients.jedis.ScanResult; 057import redis.clients.jedis.exceptions.JedisException; 058import redis.clients.util.SafeEncoder; 059 060/** 061 * Implementation of a {@link WorkQueuing} storing {@link Work} instances in Redis. 062 * 063 * @since 5.8 064 */ 065public class RedisWorkQueuing implements WorkQueuing { 066 067 private static final Log log = LogFactory.getLog(RedisWorkQueuing.class); 068 069 protected static final String UTF_8 = "UTF-8"; 070 071 /** 072 * Global hash of Work instance id -> serialoized Work instance. 073 */ 074 protected static final String KEY_DATA = "data"; 075 076 /** 077 * Global hash of Work instance id -> Work state. The completed state ( {@value #STATE_COMPLETED_B}) is followed by 078 * a completion time in milliseconds. 079 */ 080 protected static final String KEY_STATE = "state"; 081 082 /** 083 * Per-queue list of suspended Work instance ids. 084 */ 085 protected static final String KEY_SUSPENDED_PREFIX = "prev"; 086 087 protected static final byte[] KEY_SUSPENDED = KEY_SUSPENDED_PREFIX.getBytes(); 088 089 /** 090 * Per-queue list of scheduled Work instance ids. 091 */ 092 protected static final String KEY_QUEUE_PREFIX = "queue"; 093 094 protected static final byte[] KEY_QUEUE = KEY_QUEUE_PREFIX.getBytes(); 095 096 /** 097 * Per-queue set of scheduled Work instance ids. 098 */ 099 protected static final String KEY_SCHEDULED_PREFIX = "sched"; 100 101 protected static final byte[] KEY_SCHEDULED = KEY_SCHEDULED_PREFIX.getBytes(); 102 103 /** 104 * Per-queue set of running Work instance ids. 105 */ 106 protected static final String KEY_RUNNING_PREFIX = "run"; 107 108 protected static final byte[] KEY_RUNNING = KEY_RUNNING_PREFIX.getBytes(); 109 110 /** 111 * Per-queue set of counters. 112 */ 113 protected static final String KEY_COMPLETED_PREFIX = "done"; 114 115 protected static final byte[] KEY_COMPLETED = KEY_COMPLETED_PREFIX.getBytes(); 116 117 protected static final String KEY_CANCELED_PREFIX = "cancel"; 118 119 protected static final byte[] KEY_CANCELED = KEY_CANCELED_PREFIX.getBytes(); 120 121 protected static final String KEY_COUNT_PREFIX = "count"; 122 123 protected static final byte STATE_SCHEDULED_B = 'Q'; 124 125 protected static final byte STATE_RUNNING_B = 'R'; 126 127 protected static final byte STATE_RUNNING_C = 'C'; 128 129 protected static final byte[] STATE_SCHEDULED = new byte[] { STATE_SCHEDULED_B }; 130 131 protected static final byte[] STATE_RUNNING = new byte[] { STATE_RUNNING_B }; 132 133 protected static final byte[] STATE_UNKNOWN = new byte[0]; 134 135 protected Listener listener; 136 137 protected final Map<String, NuxeoBlockingQueue> allQueued = new HashMap<>(); 138 139 protected String redisNamespace; 140 141 // lua scripts 142 protected byte[] initWorkQueueSha; 143 144 protected byte[] metricsWorkQueueSha; 145 146 protected byte[] schedulingWorkSha; 147 148 protected byte[] runningWorkSha; 149 150 protected byte[] cancelledScheduledWorkSha; 151 152 protected byte[] completedWorkSha; 153 154 protected byte[] cancelledRunningWorkSha; 155 156 public RedisWorkQueuing(Listener listener) { 157 this.listener = listener; 158 loadConfig(); 159 } 160 161 void loadConfig() { 162 RedisAdmin admin = Framework.getService(RedisAdmin.class); 163 redisNamespace = admin.namespace("work"); 164 try { 165 initWorkQueueSha = admin.load("org.nuxeo.ecm.core.redis", "init-work-queue") 166 .getBytes(); 167 metricsWorkQueueSha = admin.load("org.nuxeo.ecm.core.redis", "metrics-work-queue") 168 .getBytes(); 169 schedulingWorkSha = admin.load("org.nuxeo.ecm.core.redis", "scheduling-work") 170 .getBytes(); 171 runningWorkSha = admin.load("org.nuxeo.ecm.core.redis", "running-work") 172 .getBytes(); 173 cancelledScheduledWorkSha = admin.load("org.nuxeo.ecm.core.redis", "cancelled-scheduled-work") 174 .getBytes(); 175 completedWorkSha = admin.load("org.nuxeo.ecm.core.redis", "completed-work") 176 .getBytes(); 177 cancelledRunningWorkSha = admin.load("org.nuxeo.ecm.core.redis", "cancelled-running-work") 178 .getBytes(); 179 } catch (IOException e) { 180 throw new RuntimeException("Cannot load LUA scripts", e); 181 } 182 } 183 184 @Override 185 public NuxeoBlockingQueue init(WorkQueueDescriptor config) { 186 evalSha(metricsWorkQueueSha, keys(config.id), Collections.emptyList()); 187 RedisBlockingQueue queue = new RedisBlockingQueue(config.id, this); 188 allQueued.put(config.id, queue); 189 return queue; 190 } 191 192 @Override 193 public NuxeoBlockingQueue getQueue(String queueId) { 194 return allQueued.get(queueId); 195 } 196 197 198 @Override 199 public void workSchedule(String queueId, Work work) { 200 getQueue(queueId).offer(new WorkHolder(work)); 201 } 202 203 @Override 204 public void workRunning(String queueId, Work work) { 205 try { 206 workSetRunning(queueId, work); 207 } catch (IOException e) { 208 throw new RuntimeException(e); 209 } 210 } 211 212 @Override 213 public void workCanceled(String queueId, Work work) { 214 try { 215 workSetCancelledScheduled(queueId, work); 216 } catch (IOException e) { 217 throw new RuntimeException(e); 218 } 219 } 220 221 @Override 222 public void workCompleted(String queueId, Work work) { 223 try { 224 workSetCompleted(queueId, work); 225 } catch (IOException e) { 226 throw new RuntimeException(e); 227 } 228 } 229 230 @Override 231 public void workReschedule(String queueId, Work work) { 232 try { 233 workSetReschedule(queueId, work); 234 } catch (IOException e) { 235 throw new RuntimeException(e); 236 } 237 } 238 239 @Override 240 public List<Work> listWork(String queueId, State state) { 241 switch (state) { 242 case SCHEDULED: 243 return listScheduled(queueId); 244 case RUNNING: 245 return listRunning(queueId); 246 default: 247 throw new IllegalArgumentException(String.valueOf(state)); 248 } 249 } 250 251 @Override 252 public List<String> listWorkIds(String queueId, State state) { 253 if (state == null) { 254 return listNonCompletedIds(queueId); 255 } 256 switch (state) { 257 case SCHEDULED: 258 return listScheduledIds(queueId); 259 case RUNNING: 260 return listRunningIds(queueId); 261 default: 262 throw new IllegalArgumentException(String.valueOf(state)); 263 } 264 } 265 266 protected List<Work> listScheduled(String queueId) { 267 try { 268 return listWorkList(queuedKey(queueId)); 269 } catch (IOException e) { 270 throw new RuntimeException(e); 271 } 272 } 273 274 protected List<Work> listRunning(String queueId) { 275 try { 276 return listWorkSet(runningKey(queueId)); 277 } catch (IOException e) { 278 throw new RuntimeException(e); 279 } 280 } 281 282 protected List<String> listScheduledIds(String queueId) { 283 try { 284 return listWorkIdsList(queuedKey(queueId)); 285 } catch (IOException e) { 286 throw new RuntimeException(e); 287 } 288 } 289 290 protected List<String> listRunningIds(String queueId) { 291 try { 292 return listWorkIdsSet(runningKey(queueId)); 293 } catch (IOException e) { 294 throw new RuntimeException(e); 295 } 296 } 297 298 protected List<String> listNonCompletedIds(String queueId) { 299 List<String> list = listScheduledIds(queueId); 300 list.addAll(listRunningIds(queueId)); 301 return list; 302 } 303 304 @Override 305 public long count(String queueId, State state) { 306 switch (state) { 307 case SCHEDULED: 308 return metrics(queueId).scheduled.longValue(); 309 case RUNNING: 310 return metrics(queueId).running.longValue(); 311 default: 312 throw new IllegalArgumentException(String.valueOf(state)); 313 } 314 } 315 316 @Override 317 public Work find(String workId, State state) { 318 if (isWorkInState(workId, state)) { 319 return getWork(bytes(workId)); 320 } 321 return null; 322 } 323 324 @Override 325 public boolean isWorkInState(String workId, State state) { 326 State s = getWorkState(workId); 327 if (state == null) { 328 return s == State.SCHEDULED || s == State.RUNNING; 329 } 330 return s == state; 331 } 332 333 @Override 334 public void removeScheduled(String queueId, String workId) { 335 try { 336 removeScheduledWork(queueId, workId); 337 } catch (IOException cause) { 338 throw new RuntimeException("Cannot remove scheduled work " + workId + " from " + queueId, cause); 339 } 340 } 341 342 @Override 343 public State getWorkState(String workId) { 344 return getWorkStateInfo(workId); 345 } 346 347 @Override 348 public void setActive(String queueId, boolean value) { 349 WorkQueueMetrics metrics = getQueue(queueId).setActive(value); 350 if (value) { 351 listener.queueActivated(metrics); 352 } else { 353 listener.queueDeactivated(metrics); 354 } 355 } 356 357 @Override 358 public int setSuspending(String queueId) { 359 try { 360 int n = suspendScheduledWork(queueId); 361 log.info("Suspending " + n + " work instances from queue: " + queueId); 362 allQueued.remove(queueId); 363 return n; 364 } catch (IOException e) { 365 throw new RuntimeException(e); 366 } 367 } 368 369 /* 370 * ******************** Redis Interface ******************** 371 */ 372 373 protected String string(byte[] bytes) { 374 try { 375 return new String(bytes, UTF_8); 376 } catch (IOException e) { 377 throw new RuntimeException("Should not happen, cannot decode string in UTF-8", e); 378 } 379 } 380 381 protected byte[] bytes(String string) { 382 try { 383 return string.getBytes(UTF_8); 384 } catch (IOException e) { 385 throw new RuntimeException("Should not happen, cannot encode string in UTF-8", e); 386 } 387 } 388 389 protected byte[] bytes(Work.State state) { 390 switch (state) { 391 case SCHEDULED: 392 return STATE_SCHEDULED; 393 case RUNNING: 394 return STATE_RUNNING; 395 default: 396 return STATE_UNKNOWN; 397 } 398 } 399 400 protected static String key(String... names) { 401 return String.join(":", names); 402 } 403 404 protected byte[] keyBytes(String prefix, String queueId) { 405 return keyBytes(key(prefix, queueId)); 406 } 407 408 protected byte[] keyBytes(String prefix) { 409 return bytes(redisNamespace.concat(prefix)); 410 } 411 412 protected byte[] workId(Work work) { 413 return workId(work.getId()); 414 } 415 416 protected byte[] workId(String id) { 417 return bytes(id); 418 } 419 420 protected byte[] suspendedKey(String queueId) { 421 return keyBytes(key(KEY_SUSPENDED_PREFIX, queueId)); 422 } 423 424 protected byte[] queuedKey(String queueId) { 425 return keyBytes(key(KEY_QUEUE_PREFIX, queueId)); 426 } 427 428 protected byte[] countKey(String queueId) { 429 return keyBytes(key(KEY_COUNT_PREFIX, queueId)); 430 } 431 432 protected byte[] runningKey(String queueId) { 433 return keyBytes(key(KEY_RUNNING_PREFIX, queueId)); 434 } 435 436 protected byte[] scheduledKey(String queueId) { 437 return keyBytes(key(KEY_SCHEDULED_PREFIX, queueId)); 438 } 439 440 protected byte[] completedKey(String queueId) { 441 return keyBytes(key(KEY_COMPLETED_PREFIX, queueId)); 442 } 443 444 protected byte[] canceledKey(String queueId) { 445 return keyBytes(key(KEY_CANCELED_PREFIX, queueId)); 446 } 447 448 protected byte[] stateKey() { 449 return keyBytes(KEY_STATE); 450 } 451 452 protected byte[] dataKey() { 453 return keyBytes(KEY_DATA); 454 } 455 456 protected byte[] serializeWork(Work work) throws IOException { 457 ByteArrayOutputStream baout = new ByteArrayOutputStream(); 458 ObjectOutputStream out = new ObjectOutputStream(baout); 459 out.writeObject(work); 460 out.flush(); 461 out.close(); 462 return baout.toByteArray(); 463 } 464 465 protected Work deserializeWork(byte[] workBytes) { 466 if (workBytes == null) { 467 return null; 468 } 469 InputStream bain = new ByteArrayInputStream(workBytes); 470 try (ObjectInputStream in = new ObjectInputStream(bain)) { 471 return (Work) in.readObject(); 472 } catch (RuntimeException cause) { 473 throw cause; 474 } catch (IOException | ClassNotFoundException cause) { 475 throw new RuntimeException("Cannot deserialize work", cause); 476 } 477 } 478 479 /** 480 * Finds which queues have suspended work. 481 * 482 * @return a set of queue ids 483 * @since 5.8 484 */ 485 protected Set<String> getSuspendedQueueIds() throws IOException { 486 return getQueueIds(KEY_SUSPENDED_PREFIX); 487 } 488 489 protected Set<String> getScheduledQueueIds() { 490 return getQueueIds(KEY_QUEUE_PREFIX); 491 } 492 493 protected Set<String> getRunningQueueIds() { 494 return getQueueIds(KEY_RUNNING_PREFIX); 495 } 496 497 /** 498 * Finds which queues have work for a given state prefix. 499 * 500 * @return a set of queue ids 501 * @since 5.8 502 */ 503 protected Set<String> getQueueIds(final String queuePrefix) { 504 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Set<String>>() { 505 @Override 506 public Set<String> call(Jedis jedis) { 507 int offset = keyBytes(queuePrefix).length; 508 Set<byte[]> keys = jedis.keys(keyBytes(key(queuePrefix, "*"))); 509 Set<String> queueIds = new HashSet<String>(keys.size()); 510 for (byte[] bytes : keys) { 511 try { 512 String queueId = new String(bytes, offset, bytes.length - offset, UTF_8); 513 queueIds.add(queueId); 514 } catch (IOException e) { 515 throw new NuxeoException(e); 516 } 517 } 518 return queueIds; 519 } 520 521 }); 522 } 523 524 /** 525 * Resumes all suspended work instances by moving them to the scheduled queue. 526 * 527 * @param queueId the queue id 528 * @return the number of work instances scheduled 529 * @since 5.8 530 */ 531 public int scheduleSuspendedWork(final String queueId) throws IOException { 532 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Integer>() { 533 @Override 534 public Integer call(Jedis jedis) { 535 for (int n = 0;; n++) { 536 byte[] workIdBytes = jedis.rpoplpush(suspendedKey(queueId), queuedKey(queueId)); 537 if (workIdBytes == null) { 538 return Integer.valueOf(n); 539 } 540 } 541 } 542 543 }).intValue(); 544 } 545 546 /** 547 * Suspends all scheduled work instances by moving them to the suspended queue. 548 * 549 * @param queueId the queue id 550 * @return the number of work instances suspended 551 * @since 5.8 552 */ 553 public int suspendScheduledWork(final String queueId) throws IOException { 554 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Integer>() { 555 556 @Override 557 public Integer call(Jedis jedis) { 558 for (int n = 0;; n++) { 559 byte[] workIdBytes = jedis.rpoplpush(queuedKey(queueId), suspendedKey(queueId)); 560 if (workIdBytes == null) { 561 return n; 562 } 563 } 564 } 565 }).intValue(); 566 } 567 568 @Override 569 public WorkQueueMetrics metrics(String queueId) { 570 return metrics(queueId, evalSha(metricsWorkQueueSha, keys(queueId), Collections.emptyList())); 571 } 572 573 WorkQueueMetrics metrics(String queueId, Number[] counters) { 574 return new WorkQueueMetrics(queueId, counters[0], counters[1], counters[2], counters[3]); 575 } 576 577 /** 578 * Persists a work instance and adds it to the scheduled queue. 579 * 580 * @param queueId the queue id 581 * @param work the work instance 582 * @throws IOException 583 */ 584 public void workSetScheduled(final String queueId, Work work) throws IOException { 585 listener.queueChanged(work, metrics(queueId, evalSha(schedulingWorkSha, keys(queueId), args(work, true)))); 586 } 587 588 /** 589 * Switches a work to state completed, and saves its new state. 590 */ 591 protected void workSetCancelledScheduled(final String queueId, final Work work) throws IOException { 592 listener.queueChanged(work, 593 metrics(queueId, evalSha(cancelledScheduledWorkSha, keys(queueId), args(work, true)))); 594 } 595 596 /** 597 * Switches a work to state running. 598 * 599 * @param queueId the queue id 600 * @param work the work 601 */ 602 protected void workSetRunning(final String queueId, Work work) throws IOException { 603 listener.queueChanged(work, metrics(queueId, evalSha(runningWorkSha, keys(queueId), args(work, true)))); 604 } 605 606 /** 607 * Switches a work to state completed, and saves its new state. 608 */ 609 protected void workSetCompleted(final String queueId, final Work work) throws IOException { 610 listener.queueChanged(work, metrics(queueId, evalSha(completedWorkSha, keys(queueId), args(work, false)))); 611 } 612 613 /** 614 * Switches a work to state canceled, and saves its new state. 615 */ 616 protected void workSetReschedule(final String queueId, final Work work) throws IOException { 617 listener.queueChanged(work, 618 metrics(queueId, evalSha(cancelledRunningWorkSha, keys(queueId), args(work, true)))); 619 } 620 621 protected List<byte[]> keys(String queueid) { 622 return Arrays.asList(dataKey(), 623 stateKey(), 624 countKey(queueid), 625 scheduledKey(queueid), 626 queuedKey(queueid), 627 runningKey(queueid), 628 completedKey(queueid), 629 canceledKey(queueid)); 630 } 631 632 protected List<byte[]> args(String workId) throws IOException { 633 return Arrays.asList(workId(workId)); 634 } 635 636 protected List<byte[]> args(Work work, boolean serialize) throws IOException { 637 List<byte[]> args = Arrays.asList(workId(work), bytes(work.getWorkInstanceState())); 638 if (serialize) { 639 args = new ArrayList<>(args); 640 args.add(serializeWork(work)); 641 } 642 return args; 643 } 644 645 /** 646 * Gets the work state. 647 * 648 * @param workId the work id 649 * @return the state, or {@code null} if not found 650 */ 651 protected State getWorkStateInfo(final String workId) { 652 final byte[] workIdBytes = bytes(workId); 653 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<State>() { 654 @Override 655 public State call(Jedis jedis) { 656 // get state 657 byte[] bytes = jedis.hget(stateKey(), workIdBytes); 658 if (bytes == null || bytes.length == 0) { 659 return null; 660 } 661 switch (bytes[0]) { 662 case STATE_SCHEDULED_B: 663 return State.SCHEDULED; 664 case STATE_RUNNING_B: 665 return State.RUNNING; 666 default: 667 String msg; 668 try { 669 msg = new String(bytes, UTF_8); 670 } catch (UnsupportedEncodingException e) { 671 msg = Arrays.toString(bytes); 672 } 673 log.error("Unknown work state: " + msg + ", work: " + workId); 674 return null; 675 } 676 } 677 }); 678 } 679 680 protected List<String> listWorkIdsList(final byte[] queueBytes) throws IOException { 681 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<String>>() { 682 683 @Override 684 public List<String> call(Jedis jedis) { 685 List<byte[]> keys = jedis.lrange(queueBytes, 0, -1); 686 List<String> list = new ArrayList<String>(keys.size()); 687 for (byte[] workIdBytes : keys) { 688 list.add(string(workIdBytes)); 689 } 690 return list; 691 } 692 693 }); 694 } 695 696 protected List<String> listWorkIdsSet(final byte[] queueBytes) throws IOException { 697 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<String>>() { 698 699 @Override 700 public List<String> call(Jedis jedis) { 701 702 Set<byte[]> keys = jedis.smembers(queueBytes); 703 List<String> list = new ArrayList<String>(keys.size()); 704 for (byte[] workIdBytes : keys) { 705 list.add(string(workIdBytes)); 706 } 707 return list; 708 } 709 710 }); 711 712 } 713 714 protected List<Work> listWorkList(final byte[] queueBytes) throws IOException { 715 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<Work>>() { 716 @Override 717 public List<Work> call(Jedis jedis) { 718 List<byte[]> keys = jedis.lrange(queueBytes, 0, -1); 719 List<Work> list = new ArrayList<Work>(keys.size()); 720 for (byte[] workIdBytes : keys) { 721 // get data 722 byte[] workBytes = jedis.hget(dataKey(), workIdBytes); 723 Work work = deserializeWork(workBytes); 724 list.add(work); 725 } 726 return list; 727 } 728 }); 729 } 730 731 protected List<Work> listWorkSet(final byte[] queueBytes) throws IOException { 732 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<List<Work>>() { 733 @Override 734 public List<Work> call(Jedis jedis) { 735 Set<byte[]> keys = jedis.smembers(queueBytes); 736 List<Work> list = new ArrayList<Work>(keys.size()); 737 for (byte[] workIdBytes : keys) { 738 // get data 739 byte[] workBytes = jedis.hget(dataKey(), workIdBytes); 740 Work work = deserializeWork(workBytes); 741 list.add(work); 742 } 743 return list; 744 } 745 }); 746 } 747 748 protected Work getWork(byte[] workIdBytes) { 749 try { 750 return getWorkData(workIdBytes); 751 } catch (IOException e) { 752 throw new RuntimeException(e); 753 } 754 } 755 756 protected Work getWorkData(final byte[] workIdBytes) throws IOException { 757 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Work>() { 758 759 @Override 760 public Work call(Jedis jedis) { 761 byte[] workBytes = jedis.hget(dataKey(), workIdBytes); 762 return deserializeWork(workBytes); 763 } 764 765 }); 766 } 767 768 /** 769 * Removes first work from work queue. 770 * 771 * @param queueId the queue id 772 * @return the work, or {@code null} if the scheduled queue is empty 773 */ 774 protected Work getWorkFromQueue(final String queueId) throws IOException { 775 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Work>() { 776 777 @Override 778 public Work call(Jedis jedis) { 779 // pop from queue 780 byte[] workIdBytes = jedis.rpop(queuedKey(queueId)); 781 if (workIdBytes == null) { 782 return null; 783 } 784 // get data 785 byte[] workBytes = jedis.hget(dataKey(), workIdBytes); 786 return deserializeWork(workBytes); 787 } 788 789 }); 790 } 791 792 /** 793 * Removes a given work from queue, move the work from scheduled to completed set. 794 * 795 * @throws IOException 796 */ 797 protected void removeScheduledWork(final String queueId, final String workId) throws IOException { 798 evalSha(cancelledScheduledWorkSha, keys(queueId), args(workId)); 799 } 800 801 /** 802 * Helper to call SSCAN but fall back on a custom implementation based on SMEMBERS if the backend (embedded) does 803 * not support SSCAN. 804 * 805 * @since 7.3 806 */ 807 public static class SScanner { 808 809 // results of SMEMBERS for last key, in embedded mode 810 protected List<String> smembers; 811 812 protected ScanResult<String> sscan(Jedis jedis, String key, String cursor, ScanParams params) { 813 ScanResult<String> scanResult; 814 try { 815 scanResult = jedis.sscan(key, cursor, params); 816 } catch (Exception e) { 817 // when testing with embedded fake redis, we may get an un-declared exception 818 if (!(e.getCause() instanceof NoSuchMethodException)) { 819 throw e; 820 } 821 // no SSCAN available in embedded, do a full SMEMBERS 822 if (smembers == null) { 823 Set<String> set = jedis.smembers(key); 824 smembers = new ArrayList<>(set); 825 } 826 Collection<byte[]> bparams = params.getParams(); 827 int count = 1000; 828 for (Iterator<byte[]> it = bparams.iterator(); it.hasNext();) { 829 byte[] param = it.next(); 830 if (param.equals(Protocol.Keyword.MATCH.raw)) { 831 throw new UnsupportedOperationException("MATCH not supported"); 832 } 833 if (param.equals(Protocol.Keyword.COUNT.raw)) { 834 count = Integer.parseInt(SafeEncoder.encode(it.next())); 835 } 836 } 837 int pos = Integer.parseInt(cursor); // don't check range, callers are cool 838 int end = Math.min(pos + count, smembers.size()); 839 int nextPos = end == smembers.size() ? 0 : end; 840 scanResult = new ScanResult<>(String.valueOf(nextPos), smembers.subList(pos, end)); 841 if (nextPos == 0) { 842 smembers = null; 843 } 844 } 845 return scanResult; 846 } 847 } 848 849 Number[] evalSha(byte[] sha, List<byte[]> keys, List<byte[]> args) throws JedisException { 850 return Framework.getService(RedisExecutor.class).execute(new RedisCallable<Number[]>() { 851 852 @Override 853 public Number[] call(Jedis jedis) { 854 return coerce((List<Number>) jedis.evalsha(sha, keys, args)); 855 } 856 857 Number[] coerce(List<Number> numbers) { 858 Number[] counter = numbers.toArray(new Number[numbers.size()]); 859 for (int i = 0; i < counter.length; ++i) { 860 if (counter[i] == null) { 861 counter[i] = 0; 862 } 863 } 864 return counter; 865 } 866 }); 867 } 868 869 @Override 870 public void listen(Listener listener) { 871 this.listener =listener; 872 873 } 874 875}