/*
 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.redis.contribs;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.redis.RedisAdmin;
import org.nuxeo.ecm.core.redis.RedisCallable;
import org.nuxeo.ecm.core.redis.RedisExecutor;
import org.nuxeo.ecm.core.work.WorkHolder;
import org.nuxeo.ecm.core.work.WorkManagerImpl;
import org.nuxeo.ecm.core.work.WorkQueueDescriptorRegistry;
import org.nuxeo.ecm.core.work.WorkQueuing;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.ecm.core.work.api.Work.State;
import org.nuxeo.runtime.api.Framework;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;
import redis.clients.util.SafeEncoder;

/**
 * Implementation of a {@link WorkQueuing} storing {@link Work} instances in Redis.
 * <p>
 * All Redis keys are built from the namespace returned by {@link RedisAdmin#namespace(String...)} followed by one of
 * the {@code KEY_*} constants below and, for per-queue keys, the queue id.
 *
 * @since 5.8
 */
public class RedisWorkQueuing implements WorkQueuing {

    private static final Log log = LogFactory.getLog(RedisWorkQueuing.class);

    protected static final String UTF_8 = "UTF-8";

    /**
     * Global hash of Work instance id -> serialized Work instance.
     */
    protected static final String KEY_DATA = "data";

    /**
     * Global hash of Work instance id -> Work state. The completed state ( {@value #STATE_COMPLETED_B}) is followed by
     * a completion time in milliseconds.
     */
    protected static final String KEY_STATE = "state";

    /**
     * Per-queue list of suspended Work instance ids.
     */
    protected static final String KEY_SUSPENDED_PREFIX = "prev:";

    /**
     * Per-queue list of scheduled Work instance ids.
     */
    protected static final String KEY_QUEUE_PREFIX = "queue:";

    /**
     * Per-queue set of scheduled Work instance ids.
     */
    protected static final String KEY_SCHEDULED_PREFIX = "sched:";

    /**
     * Per-queue set of running Work instance ids.
     */
    protected static final String KEY_RUNNING_PREFIX = "run:";

    /**
     * Per-queue set of completed Work instance ids.
     */
    protected static final String KEY_COMPLETED_PREFIX = "done:";

    protected static final byte STATE_SCHEDULED_B = 'Q';

    protected static final byte STATE_CANCELED_B = 'X';

    protected static final byte STATE_RUNNING_B = 'R';

    protected static final byte STATE_COMPLETED_B = 'C';

    protected static final byte[] STATE_SCHEDULED = new byte[] { STATE_SCHEDULED_B };

    protected static final byte[] STATE_CANCELED = new byte[] { STATE_CANCELED_B };

    protected static final byte[] STATE_RUNNING = new byte[] { STATE_RUNNING_B };

    protected static final byte[] STATE_COMPLETED = new byte[] { STATE_COMPLETED_B };

    protected final WorkManagerImpl mgr;

    // @GuardedBy("this")
    // NOTE(review): the annotation above is commented out and no method of this class synchronizes on "this";
    // confirm whether callers serialize access to this map.
    protected Map<String, BlockingQueue<Runnable>> allQueued = new HashMap<>();

    protected RedisExecutor redisExecutor;

    protected RedisAdmin redisAdmin;

    protected String redisNamespace;

    /** SHA of the "del-completed" script loaded in {@link #init()}, used by {@link #removeCompletedWork}. */
    protected String delCompletedSha;

    /**
     * Creates the queuing for the given manager.
     *
     * @param mgr the work manager
     * @param workQueueDescriptors the queue descriptors (not used by this implementation)
     */
    public RedisWorkQueuing(WorkManagerImpl mgr, WorkQueueDescriptorRegistry workQueueDescriptors) {
        this.mgr = mgr;
    }

    /**
     * Looks up the Redis services, re-schedules every work id found in a suspended list, then loads the
     * "del-completed" script.
     *
     * @throws RuntimeException wrapping any {@link IOException} raised while doing so
     */
    @Override
    public void init() {
        redisExecutor = Framework.getLocalService(RedisExecutor.class);
        redisAdmin = Framework.getService(RedisAdmin.class);
        redisNamespace = redisAdmin.namespace("work");
        try {
            for (String queueId : getSuspendedQueueIds()) {
                int n = scheduleSuspendedWork(queueId);
                log.info("Re-scheduling " + n + " work instances suspended from queue: " + queueId);
            }
            delCompletedSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "del-completed");
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates and registers the blocking queue for a queue id.
     *
     * @throws IllegalStateException if a queue is already registered for this id
     */
    @Override
    public BlockingQueue<Runnable> initWorkQueue(String queueId) {
        if (allQueued.containsKey(queueId)) {
            throw new IllegalStateException(queueId + " is already configured");
        }
        final BlockingQueue<Runnable> queue = newBlockingQueue(queueId);
        allQueued.put(queueId, queue);
        return queue;
    }

    /**
     * Offers the work, wrapped in a {@link WorkHolder}, to the registered queue.
     *
     * @return the result of {@link BlockingQueue#offer(Object)}
     */
    @Override
    public boolean workSchedule(String queueId, Work work) {
        return getWorkQueue(queueId).offer(new WorkHolder(work));
    }

    /**
     * Gets the blocking queue registered for a queue id.
     *
     * @throws IllegalStateException if no queue is registered for this id
     */
    public BlockingQueue<Runnable> getWorkQueue(String queueId) {
        if (!allQueued.containsKey(queueId)) {
            throw new IllegalStateException(queueId + " was not configured yet");
        }
        return allQueued.get(queueId);
    }

    @Override
    public void workRunning(String queueId, Work work) {
        try {
            workSetRunning(queueId, work);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void workCompleted(String queueId, Work work) {
        try {
            workSetCompleted(queueId, work);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Factory for the per-queue blocking queue; overridable by subclasses.
     */
    protected BlockingQueue<Runnable> newBlockingQueue(String queueId) {
        return new RedisBlockingQueue(queueId, this);
    }

    /**
     * Lists the work instances of a queue in the given state.
     *
     * @throws IllegalArgumentException if the state is not SCHEDULED, RUNNING or COMPLETED
     */
    @Override
    public List<Work> listWork(String queueId, State state) {
        switch (state) {
        case SCHEDULED:
            return listScheduled(queueId);
        case RUNNING:
            return listRunning(queueId);
        case COMPLETED:
            return listCompleted(queueId);
        default:
            throw new IllegalArgumentException(String.valueOf(state));
        }
    }

    /**
     * Lists the work ids of a queue in the given state; a {@code null} state means scheduled or running.
     *
     * @throws IllegalArgumentException if the state is neither null nor SCHEDULED, RUNNING or COMPLETED
     */
    @Override
    public List<String> listWorkIds(String queueId, State state) {
        if (state == null) {
            return listNonCompletedIds(queueId);
        }
        switch (state) {
        case SCHEDULED:
            return listScheduledIds(queueId);
        case RUNNING:
            return listRunningIds(queueId);
        case COMPLETED:
            return listCompletedIds(queueId);
        default:
            throw new IllegalArgumentException(String.valueOf(state));
        }
    }

    protected List<Work> listScheduled(String queueId) {
        try {
            return listWorkList(queuedKey(queueId));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    protected List<Work> listRunning(String queueId) {
        try {
            return listWorkSet(runningKey(queueId));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    protected List<Work> listCompleted(String queueId) {
        try {
            return listWorkSet(completedKey(queueId));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    protected List<String> listScheduledIds(String queueId) {
        try {
            return listWorkIdsList(queuedKey(queueId));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    protected List<String> listRunningIds(String queueId) {
        try {
            return listWorkIdsSet(runningKey(queueId));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Lists the scheduled ids followed by the running ids.
     */
    protected List<String> listNonCompletedIds(String queueId) {
        List<String> list = listScheduledIds(queueId);
        list.addAll(listRunningIds(queueId));
        return list;
    }

    protected List<String> listCompletedIds(String queueId) {
        try {
            return listWorkIdsSet(completedKey(queueId));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Counts the work instances of a queue in the given state, using the size of the matching Redis set.
     *
     * @throws IllegalArgumentException if the state is not SCHEDULED, RUNNING or COMPLETED
     */
    @Override
    public int count(String queueId, State state) {
        switch (state) {
        case SCHEDULED:
            return getScheduledSize(queueId);
        case RUNNING:
            return getRunningSize(queueId);
        case COMPLETED:
            return getCompletedSize(queueId);
        default:
            throw new IllegalArgumentException(String.valueOf(state));
        }
    }

    protected int getScheduledSize(String queueId) {
        try {
            return getRedisSetSize(scheduledKey(queueId));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    protected int getRunningSize(String queueId) {
        try {
            return getRedisSetSize(runningKey(queueId));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    protected int getCompletedSize(String queueId) {
        try {
            return getRedisSetSize(completedKey(queueId));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the work with the given id if it is in the given state.
     *
     * @return the deserialized work, or {@code null} if the work is not in that state
     */
    @Override
    public Work find(String workId, State state) {
        if (isWorkInState(workId, state)) {
            return getWork(bytes(workId));
        }
        return null;
    }

    /**
     * Checks the stored state of a work; a {@code null} state means scheduled or running.
     */
    @Override
    public boolean isWorkInState(String workId, State state) {
        State s = getWorkState(workId);
        if (state == null) {
            return s == State.SCHEDULED || s == State.RUNNING;
        }
        return s == state;
    }

    @Override
    public Work removeScheduled(String queueId, String workId) {
        try {
            return removeScheduledWork(queueId, workId);
        } catch (IOException cause) {
            throw new RuntimeException("Cannot remove scheduled work " + workId + " from " + queueId, cause);
        }
    }

    @Override
    public State getWorkState(String workId) {
        return getWorkStateInfo(workId);
    }

    /**
     * Moves the scheduled ids of the queue to its suspended list and unregisters the queue.
     *
     * @return the number of work instances suspended
     */
    @Override
    public int setSuspending(String queueId) {
        try {
            int n = suspendScheduledWork(queueId);
            log.info("Suspending " + n + " work instances from queue: " + queueId);
            allQueued.remove(queueId);
            return n;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Clears completed work of a queue; a non-positive completion time clears all of it.
     */
    @Override
    public void clearCompletedWork(String queueId, long completionTime) {
        try {
            if (completionTime <= 0) {
                removeAllCompletedWork(queueId);
            } else {
                removeCompletedWork(queueId, completionTime);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /*
     * ******************** Redis Interface ********************
     */

    /** Decodes UTF-8 bytes to a string. */
    protected String string(byte[] bytes) {
        try {
            return new String(bytes, UTF_8);
        } catch (IOException e) {
            // cannot happen for UTF-8
            throw new RuntimeException(e);
        }
    }

    /** Encodes a string to UTF-8 bytes. */
    protected byte[] bytes(String string) {
        try {
            return string.getBytes(UTF_8);
        } catch (IOException e) {
            // cannot happen for UTF-8
            throw new RuntimeException(e);
        }
    }

    protected byte[] keyBytes(String prefix, String queueId) {
        return keyBytes(prefix + queueId);
    }

    /** Prepends the Redis namespace to the given key suffix and encodes the result. */
    protected byte[] keyBytes(String prefix) {
        return bytes(redisNamespace + prefix);
    }

    protected byte[] suspendedKey(String queueId) {
        return keyBytes(KEY_SUSPENDED_PREFIX, queueId);
    }

    protected byte[] queuedKey(String queueId) {
        return keyBytes(KEY_QUEUE_PREFIX, queueId);
    }

    protected byte[] runningKey(String queueId) {
        return keyBytes(KEY_RUNNING_PREFIX, queueId);
    }

    protected byte[] scheduledKey(String queueId) {
        return keyBytes(KEY_SCHEDULED_PREFIX, queueId);
    }

    protected byte[] completedKey(String queueId) {
        return keyBytes(KEY_COMPLETED_PREFIX, queueId);
    }

    protected String completedKeyString(String queueId) {
        return redisNamespace + KEY_COMPLETED_PREFIX + queueId;
    }

    protected byte[] stateKey() {
        return keyBytes(KEY_STATE);
    }

    protected String stateKeyString() {
        return redisNamespace + KEY_STATE;
    }

    protected byte[] dataKey() {
        return keyBytes(KEY_DATA);
    }

    protected String dataKeyString() {
        return redisNamespace + KEY_DATA;
    }

    /**
     * Serializes a work instance with Java serialization.
     *
     * @param work the work instance
     * @return the serialized bytes
     * @throws IOException if serialization fails
     */
    protected byte[] serializeWork(Work work) throws IOException {
        ByteArrayOutputStream baout = new ByteArrayOutputStream();
        // try-with-resources so the stream is closed (and flushed) even when writeObject fails
        try (ObjectOutputStream out = new ObjectOutputStream(baout)) {
            out.writeObject(work);
        }
        return baout.toByteArray();
    }

    /**
     * Deserializes a work instance written by {@link #serializeWork}.
     *
     * @param workBytes the serialized bytes, may be {@code null}
     * @return the work instance, or {@code null} if {@code workBytes} is {@code null}
     * @throws RuntimeException if the bytes cannot be deserialized
     */
    protected Work deserializeWork(byte[] workBytes) {
        if (workBytes == null) {
            return null;
        }
        InputStream bain = new ByteArrayInputStream(workBytes);
        try (ObjectInputStream in = new ObjectInputStream(bain)) {
            return (Work) in.readObject();
        } catch (IOException | ClassNotFoundException cause) {
            throw new RuntimeException("Cannot deserialize work", cause);
        }
    }

    /** Returns the length (LLEN) of the Redis list stored at the given key. */
    protected int getRedisListSize(final byte[] key) throws IOException {
        return redisExecutor.execute(new RedisCallable<Long>() {

            @Override
            public Long call(Jedis jedis) {
                return jedis.llen(key);
            }

        }).intValue();
    }

    /** Returns the cardinality (SCARD) of the Redis set stored at the given key. */
    protected int getRedisSetSize(final byte[] key) throws IOException {
        return redisExecutor.execute(new RedisCallable<Long>() {

            @Override
            public Long call(Jedis jedis) {
                return jedis.scard(key);
            }

        }).intValue();
    }

    /**
     * Persists a work instance and adds it to the scheduled queue.
     * <p>
     * Stores the serialized work in the data hash, sets its state to scheduled, pushes its id on the queue list and
     * adds it to the scheduled set. The commands are pipelined when the executor supports it.
     *
     * @param queueId the queue id
     * @param work the work instance
     * @throws IOException if the work cannot be serialized
     */
    public void addScheduledWork(final String queueId, Work work) throws IOException {
        log.debug("Add scheduled " + work);
        final byte[] workIdBytes = bytes(work.getId());

        // serialize Work
        final byte[] workBytes = serializeWork(work);

        redisExecutor.execute(new RedisCallable<Void>() {

            @Override
            public Void call(Jedis jedis) {
                if (redisExecutor.supportPipelined()) {
                    Pipeline pipe = jedis.pipelined();
                    pipe.hset(dataKey(), workIdBytes, workBytes);
                    pipe.hset(stateKey(), workIdBytes, STATE_SCHEDULED);
                    pipe.lpush(queuedKey(queueId), workIdBytes);
                    pipe.sadd(scheduledKey(queueId), workIdBytes);
                    pipe.sync();
                } else {
                    jedis.hset(dataKey(), workIdBytes, workBytes);
                    jedis.hset(stateKey(), workIdBytes, STATE_SCHEDULED);
                    jedis.lpush(queuedKey(queueId), workIdBytes);
                    jedis.sadd(scheduledKey(queueId), workIdBytes);
                }
                return null;
            }

        });
    }

    /**
     * Finds which queues have suspended work.
     *
     * @return a set of queue ids
     * @since 5.8
     */
    protected Set<String> getSuspendedQueueIds() throws IOException {
        return getQueueIds(KEY_SUSPENDED_PREFIX);
    }

    protected Set<String> getScheduledQueueIds() {
        return getQueueIds(KEY_QUEUE_PREFIX);
    }

    protected Set<String> getRunningQueueIds() {
        return getQueueIds(KEY_RUNNING_PREFIX);
    }

    @Override
    public Set<String> getCompletedQueueIds() {
        return getQueueIds(KEY_COMPLETED_PREFIX);
    }

    /**
     * Finds which queues have work for a given state prefix.
     * <p>
     * Runs a KEYS command on {@code <namespace><prefix>*} and strips the namespace and prefix from each key found.
     *
     * @param queuePrefix one of the per-queue {@code KEY_*_PREFIX} constants
     * @return a set of queue ids
     * @since 5.8
     */
    protected Set<String> getQueueIds(final String queuePrefix) {
        return redisExecutor.execute(new RedisCallable<Set<String>>() {
            @Override
            public Set<String> call(Jedis jedis) {
                // number of leading bytes (namespace + prefix) to skip in each returned key
                int offset = keyBytes(queuePrefix).length;
                Set<byte[]> keys = jedis.keys(keyBytes(queuePrefix, "*"));
                Set<String> queueIds = new HashSet<String>(keys.size());
                for (byte[] bytes : keys) {
                    try {
                        String queueId = new String(bytes, offset, bytes.length - offset, UTF_8);
                        queueIds.add(queueId);
                    } catch (IOException e) {
                        throw new NuxeoException(e);
                    }
                }
                return queueIds;
            }

        });
    }

    /**
     * Resumes all suspended work instances by moving them to the scheduled queue.
     *
     * @param queueId the queue id
     * @return the number of work instances scheduled
     * @since 5.8
     */
    public int scheduleSuspendedWork(final String queueId) throws IOException {
        return redisExecutor.execute(new RedisCallable<Integer>() {
            @Override
            public Integer call(Jedis jedis) {
                // move ids one at a time until the suspended list is empty
                for (int n = 0;; n++) {
                    byte[] workIdBytes = jedis.rpoplpush(suspendedKey(queueId), queuedKey(queueId));
                    if (workIdBytes == null) {
                        return Integer.valueOf(n);
                    }
                }
            }

        }).intValue();
    }

    /**
     * Suspends all scheduled work instances by moving them to the suspended queue.
     *
     * @param queueId the queue id
     * @return the number of work instances suspended
     * @since 5.8
     */
    public int suspendScheduledWork(final String queueId) throws IOException {
        return redisExecutor.execute(new RedisCallable<Integer>() {

            @Override
            public Integer call(Jedis jedis) {
                // move ids one at a time until the scheduled list is empty
                for (int n = 0;; n++) {
                    byte[] workIdBytes = jedis.rpoplpush(queuedKey(queueId), suspendedKey(queueId));
                    if (workIdBytes == null) {
                        return n;
                    }
                }
            }
        }).intValue();
    }

    /**
     * Switches a work to state running.
     * <p>
     * Adds the id to the running set, removes it from the scheduled set and updates the state hash.
     *
     * @param queueId the queue id
     * @param work the work
     */
    protected void workSetRunning(final String queueId, Work work) throws IOException {
        final byte[] workIdBytes = bytes(work.getId());
        redisExecutor.execute(new RedisCallable<Void>() {

            @Override
            public Void call(Jedis jedis) {
                if (redisExecutor.supportPipelined()) {
                    Pipeline pipe = jedis.pipelined();
                    pipe.sadd(runningKey(queueId), workIdBytes);
                    pipe.srem(scheduledKey(queueId), workIdBytes);
                    pipe.hset(stateKey(), workIdBytes, STATE_RUNNING);
                    pipe.sync();
                } else {
                    jedis.sadd(runningKey(queueId), workIdBytes);
                    jedis.srem(scheduledKey(queueId), workIdBytes);
                    jedis.hset(stateKey(), workIdBytes, STATE_RUNNING);
                }
                return null;
            }
        });
    }

    /**
     * Switches a work to state completed, and saves its new state.
     * <p>
     * The stored state is {@link #STATE_COMPLETED_B} followed by the work's completion time.
     *
     * @param queueId the queue id
     * @param work the work
     * @throws IOException if the work cannot be serialized
     */
    protected void workSetCompleted(final String queueId, final Work work) throws IOException {
        final byte[] workIdBytes = bytes(work.getId());
        final byte[] workBytes = serializeWork(work);
        redisExecutor.execute(new RedisCallable<Void>() {

            @Override
            public Void call(Jedis jedis) {
                byte[] completedBytes = bytes(((char) STATE_COMPLETED_B) + String.valueOf(work.getCompletionTime()));
                if (redisExecutor.supportPipelined()) {
                    Pipeline pipe = jedis.pipelined();
                    // store (updated) content in hash
                    pipe.hset(dataKey(), workIdBytes, workBytes);
                    // remove key from running set
                    pipe.srem(runningKey(queueId), workIdBytes);
                    // put key in completed set
                    pipe.sadd(completedKey(queueId), workIdBytes);
                    // set state to completed
                    pipe.hset(stateKey(), workIdBytes, completedBytes);
                    pipe.sync();
                } else {
                    jedis.hset(dataKey(), workIdBytes, workBytes);
                    jedis.srem(runningKey(queueId), workIdBytes);
                    jedis.sadd(completedKey(queueId), workIdBytes);
                    jedis.hset(stateKey(), workIdBytes, completedBytes);
                }
                return null;
            }
        });
    }

    /**
     * Gets the work state.
     * <p>
     * Only the first byte of the stored value is examined; an unrecognized byte is logged as an error.
     *
     * @param workId the work id
     * @return the state, or {@code null} if not found
     */
    protected State getWorkStateInfo(final String workId) {
        final byte[] workIdBytes = bytes(workId);
        return redisExecutor.execute(new RedisCallable<State>() {
            @Override
            public State call(Jedis jedis) {
                // get state
                byte[] bytes = jedis.hget(stateKey(), workIdBytes);
                if (bytes == null || bytes.length == 0) {
                    return null;
                }
                switch (bytes[0]) {
                case STATE_SCHEDULED_B:
                    return State.SCHEDULED;
                case STATE_CANCELED_B:
                    return State.CANCELED;
                case STATE_RUNNING_B:
                    return State.RUNNING;
                case STATE_COMPLETED_B:
                    return State.COMPLETED;
                default:
                    String msg;
                    try {
                        msg = new String(bytes, UTF_8);
                    } catch (UnsupportedEncodingException e) {
                        msg = Arrays.toString(bytes);
                    }
                    log.error("Unknown work state: " + msg + ", work: " + workId);
                    return null;
                }
            }
        });
    }

    /** Lists, as strings, all the ids stored in the Redis list at the given key. */
    protected List<String> listWorkIdsList(final byte[] queueBytes) throws IOException {
        return redisExecutor.execute(new RedisCallable<List<String>>() {

            @Override
            public List<String> call(Jedis jedis) {
                List<byte[]> keys = jedis.lrange(queueBytes, 0, -1);
                List<String> list = new ArrayList<String>(keys.size());
                for (byte[] workIdBytes : keys) {
                    list.add(string(workIdBytes));
                }
                return list;
            }

        });
    }

    /** Lists, as strings, all the ids stored in the Redis set at the given key. */
    protected List<String> listWorkIdsSet(final byte[] queueBytes) throws IOException {
        return redisExecutor.execute(new RedisCallable<List<String>>() {

            @Override
            public List<String> call(Jedis jedis) {
                Set<byte[]> keys = jedis.smembers(queueBytes);
                List<String> list = new ArrayList<String>(keys.size());
                for (byte[] workIdBytes : keys) {
                    list.add(string(workIdBytes));
                }
                return list;
            }

        });
    }

    /**
     * Lists the deserialized works whose ids are stored in the Redis list at the given key.
     * <p>
     * An id with no entry in the data hash yields a {@code null} element.
     */
    protected List<Work> listWorkList(final byte[] queueBytes) throws IOException {
        return redisExecutor.execute(new RedisCallable<List<Work>>() {
            @Override
            public List<Work> call(Jedis jedis) {
                List<byte[]> keys = jedis.lrange(queueBytes, 0, -1);
                List<Work> list = new ArrayList<Work>(keys.size());
                for (byte[] workIdBytes : keys) {
                    // get data
                    byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
                    Work work = deserializeWork(workBytes);
                    list.add(work);
                }
                return list;
            }
        });
    }

    /**
     * Lists the deserialized works whose ids are stored in the Redis set at the given key.
     * <p>
     * An id with no entry in the data hash yields a {@code null} element.
     */
    protected List<Work> listWorkSet(final byte[] queueBytes) throws IOException {
        return redisExecutor.execute(new RedisCallable<List<Work>>() {
            @Override
            public List<Work> call(Jedis jedis) {
                Set<byte[]> keys = jedis.smembers(queueBytes);
                List<Work> list = new ArrayList<Work>(keys.size());
                for (byte[] workIdBytes : keys) {
                    // get data
                    byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
                    Work work = deserializeWork(workBytes);
                    list.add(work);
                }
                return list;
            }
        });
    }

    protected Work getWork(byte[] workIdBytes) {
        try {
            return getWorkData(workIdBytes);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads and deserializes the work stored in the data hash under the given id.
     *
     * @return the work, or {@code null} if the data hash has no entry for this id
     */
    protected Work getWorkData(final byte[] workIdBytes) throws IOException {
        return redisExecutor.execute(new RedisCallable<Work>() {

            @Override
            public Work call(Jedis jedis) {
                byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
                return deserializeWork(workBytes);
            }

        });
    }

    /**
     * Removes first work from work queue.
     *
     * @param queueId the queue id
     * @return the work, or {@code null} if the scheduled queue is empty
     */
    protected Work getWorkFromQueue(final String queueId) throws IOException {
        return redisExecutor.execute(new RedisCallable<Work>() {

            @Override
            public Work call(Jedis jedis) {
                // pop from queue
                byte[] workIdBytes = jedis.rpop(queuedKey(queueId));
                if (workIdBytes == null) {
                    return null;
                }
                // get data
                byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
                return deserializeWork(workBytes);
            }

        });
    }

    /**
     * Removes a given work from the scheduled queue and scheduled set, and marks its state as completed at the
     * current time.
     *
     * @param queueId the queue id
     * @param workId the work id
     * @return the removed work, or {@code null} if the id was not in the scheduled queue
     * @throws IOException
     */
    protected Work removeScheduledWork(final String queueId, final String workId) throws IOException {
        final byte[] workIdBytes = bytes(workId);
        return redisExecutor.execute(new RedisCallable<Work>() {

            @Override
            public Work call(Jedis jedis) {
                // remove from queue
                Long n = jedis.lrem(queuedKey(queueId), 0, workIdBytes);
                if (n == null || n.intValue() == 0) {
                    return null;
                }
                // remove from set
                jedis.srem(scheduledKey(queueId), workIdBytes);
                // set state to completed at current time; the state marker must precede the timestamp,
                // otherwise getWorkStateInfo reads a digit as the state byte and reports an unknown state
                byte[] completedBytes = bytes(((char) STATE_COMPLETED_B)
                        + String.valueOf(System.currentTimeMillis()));
                jedis.hset(stateKey(), workIdBytes, completedBytes);
                // NOTE(review): the id is not added to the completed set here, so clearCompletedWork will not
                // visit its data/state entries - confirm whether that is intended.
                // get data
                byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
                return deserializeWork(workBytes);
            }

        });
    }

    /**
     * Helper to call SSCAN but fall back on a custom implementation based on SMEMBERS if the backend (embedded) does
     * not support SSCAN.
     * <p>
     * The fallback keeps the SMEMBERS result between calls, so an instance must be used for one scan of one key at a
     * time.
     *
     * @since 7.3
     */
    public static class SScanner {

        // results of SMEMBERS for last key, in embedded mode
        protected List<String> smembers;

        /**
         * Scans a set.
         *
         * @param jedis the connection
         * @param key the set key
         * @param cursor the cursor, {@code "0"} to start a scan
         * @param params the scan parameters; MATCH is not supported by the fallback
         * @return the scan result, whose cursor is {@code "0"} when the fallback scan is finished
         * @throws UnsupportedOperationException if the fallback is used and {@code params} contains MATCH
         */
        protected ScanResult<String> sscan(Jedis jedis, String key, String cursor, ScanParams params) {
            ScanResult<String> scanResult;
            try {
                scanResult = jedis.sscan(key, cursor, params);
            } catch (Exception e) {
                // when testing with embedded fake redis, we may get an un-declared exception
                if (!(e.getCause() instanceof NoSuchMethodException)) {
                    throw e;
                }
                // no SSCAN available in embedded, do a full SMEMBERS
                if (smembers == null) {
                    Set<String> set = jedis.smembers(key);
                    smembers = new ArrayList<>(set);
                }
                Collection<byte[]> bparams = params.getParams();
                int count = 1000;
                for (Iterator<byte[]> it = bparams.iterator(); it.hasNext();) {
                    byte[] param = it.next();
                    // arrays must be compared by content: byte[].equals() is an identity comparison
                    if (Arrays.equals(param, Protocol.Keyword.MATCH.raw)) {
                        throw new UnsupportedOperationException("MATCH not supported");
                    }
                    if (Arrays.equals(param, Protocol.Keyword.COUNT.raw)) {
                        // the value follows the COUNT keyword
                        count = Integer.parseInt(SafeEncoder.encode(it.next()));
                    }
                }
                int pos = Integer.parseInt(cursor); // don't check range, callers are cool
                int end = Math.min(pos + count, smembers.size());
                // cursor 0 signals the end of the scan
                int nextPos = end == smembers.size() ? 0 : end;
                scanResult = new ScanResult<>(String.valueOf(nextPos), smembers.subList(pos, end));
                if (nextPos == 0) {
                    smembers = null;
                }
            }
            return scanResult;
        }
    }

    /**
     * Don't pass more than approx. this number of parameters to Lua calls.
     */
    private static final int BATCH_SIZE = 5000;

    /**
     * Removes all completed work of a queue, by delegating to {@link #removeCompletedWork} with a completion time
     * of 0.
     */
    protected void removeAllCompletedWork(final String queueId) throws IOException {
        removeCompletedWork(queueId, 0);
    }

    /**
     * Scans the completed set of a queue in batches and passes each batch of work ids, together with the completion
     * time, to the "del-completed" script.
     *
     * @param queueId the queue id
     * @param completionTime the completion time passed as first script argument (0 is used for "all")
     */
    protected void removeCompletedWork(final String queueId, final long completionTime) throws IOException {
        redisExecutor.execute(new RedisCallable<Void>() {

            @Override
            public Void call(Jedis jedis) {
                String completedKey = completedKeyString(queueId);
                String stateKey = stateKeyString();
                String dataKey = dataKeyString();
                SScanner sscanner = new SScanner();
                ScanParams scanParams = new ScanParams().count(BATCH_SIZE);
                String cursor = "0";
                do {
                    ScanResult<String> scanResult = sscanner.sscan(jedis, completedKey, cursor, scanParams);
                    cursor = scanResult.getStringCursor();
                    List<String> workIds = scanResult.getResult();
                    if (!workIds.isEmpty()) {
                        // delete these works if before the completion time
                        List<String> keys = Arrays.asList(completedKey, stateKey, dataKey);
                        List<String> args = new ArrayList<String>(1 + workIds.size());
                        args.add(String.valueOf(completionTime));
                        args.addAll(workIds);
                        jedis.evalsha(delCompletedSha, keys, args);
                    }
                } while (!"0".equals(cursor));
                return null;
            }
        });
    }

}