/*
 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the GNU Lesser General Public License
 * (LGPL) version 2.1 which accompanies this distribution, and is available at
 * http://www.gnu.org/licenses/lgpl-2.1.html
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.redis.contribs;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.redis.RedisAdmin;
import org.nuxeo.ecm.core.redis.RedisCallable;
import org.nuxeo.ecm.core.redis.RedisExecutor;
import org.nuxeo.ecm.core.work.WorkManagerImpl;
import org.nuxeo.ecm.core.work.WorkQueueDescriptorRegistry;
import org.nuxeo.ecm.core.work.WorkQueuing;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.ecm.core.work.api.Work.State;
import org.nuxeo.runtime.api.Framework;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;
import redis.clients.util.SafeEncoder;

/**
 * Implementation of a {@link WorkQueuing} storing {@link Work} instances in Redis.
 *
 * @since 5.8
 */
public class RedisWorkQueuing implements WorkQueuing {

    private static final Log log = LogFactory.getLog(RedisWorkQueuing.class);

    protected static final String UTF_8 = "UTF-8";

    /**
     * Global hash of Work instance id -> serialized Work instance.
     */
    protected static final String KEY_DATA = "data";

    /**
     * Global hash of Work instance id -> Work state. The completed state ( {@value #STATE_COMPLETED_B}) is followed by
     * a completion time in milliseconds.
     */
    protected static final String KEY_STATE = "state";

    /**
     * Per-queue list of suspended Work instance ids.
     */
    protected static final String KEY_SUSPENDED_PREFIX = "prev:";

    /**
     * Per-queue list of scheduled Work instance ids.
     */
    protected static final String KEY_SCHEDULED_PREFIX = "queue:";

    /**
     * Per-queue set of running Work instance ids.
     */
    protected static final String KEY_RUNNING_PREFIX = "run:";

    /**
     * Per-queue set of completed Work instance ids.
     */
    protected static final String KEY_COMPLETED_PREFIX = "done:";

    // First byte of the value stored in the state hash for each state.
    protected static final byte STATE_SCHEDULED_B = 'Q';

    protected static final byte STATE_CANCELED_B = 'X';

    protected static final byte STATE_RUNNING_B = 'R';

    protected static final byte STATE_COMPLETED_B = 'C';

    protected static final byte[] STATE_SCHEDULED = new byte[] { STATE_SCHEDULED_B };

    protected static final byte[] STATE_CANCELED = new byte[] { STATE_CANCELED_B };

    protected static final byte[] STATE_RUNNING = new byte[] { STATE_RUNNING_B };

    protected static final byte[] STATE_COMPLETED = new byte[] { STATE_COMPLETED_B };

    protected final WorkManagerImpl mgr;

    // @GuardedBy("this")
    // NOTE(review): the accessors in this class (initScheduleQueue, getScheduledQueue) do not synchronize on
    // "this"; confirm whether callers provide the locking this comment implies.
    protected Map<String, BlockingQueue<Runnable>> allScheduled = new HashMap<String, BlockingQueue<Runnable>>();

    protected RedisExecutor redisExecutor;

    protected RedisAdmin redisAdmin;

    /** Prefix prepended to every Redis key built by this class; obtained from {@code redisAdmin.namespace("work")}. */
    protected String redisNamespace;

    /** Value returned by {@code redisAdmin.load} for the "del-completed" script, passed to EVALSHA. */
    protected String delCompletedSha;

    /**
     * Creates the queuing for the given work manager.
     *
     * @param mgr the owning work manager
     * @param workQueueDescriptors the queue descriptors (not used by this implementation)
     */
    public RedisWorkQueuing(WorkManagerImpl mgr, WorkQueueDescriptorRegistry workQueueDescriptors) {
        this.mgr = mgr;
    }

    /**
     * Looks up the Redis services, moves back to the scheduled list any work found in a suspended list, and loads the
     * "del-completed" script.
     */
    @Override
    public void init() {
        redisExecutor = Framework.getLocalService(RedisExecutor.class);
        redisAdmin = Framework.getService(RedisAdmin.class);
        redisNamespace = redisAdmin.namespace("work");
        try {
            for (String queueId : getSuspendedQueueIds()) {
                int n = scheduleSuspendedWork(queueId);
                log.info("Re-scheduling " + n + " work instances suspended from queue: " + queueId);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        try {
            delCompletedSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "del-completed");
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates and registers the scheduled queue for a queue id.
     *
     * @throws IllegalStateException if a queue is already registered for this id
     */
    @Override
    public BlockingQueue<Runnable> initScheduleQueue(String queueId) {
        if (allScheduled.containsKey(queueId)) {
            throw new IllegalStateException(queueId + " is already configured");
        }
        final BlockingQueue<Runnable> scheduled = newBlockingQueue(queueId);
        allScheduled.put(queueId, scheduled);
        return scheduled;
    }

    /**
     * Returns the scheduled queue previously registered for a queue id.
     *
     * @throws IllegalStateException if no queue was registered for this id
     */
    @Override
    public BlockingQueue<Runnable> getScheduledQueue(String queueId) {
        if (!allScheduled.containsKey(queueId)) {
            throw new IllegalStateException(queueId + " was not configured yet");
        }
        return allScheduled.get(queueId);
    }

    @Override
    public void workRunning(String queueId, Work work) {
        try {
            workSetRunning(queueId, work);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void workCompleted(String queueId, Work work) {
        try {
            workSetCompleted(queueId, work);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    protected BlockingQueue<Runnable> newBlockingQueue(String queueId) {
        return new RedisBlockingQueue(queueId, this);
    }

    /**
     * Lists the work instances of a queue in the given state.
     *
     * @param state one of SCHEDULED, RUNNING or COMPLETED; must not be {@code null}
     * @throws IllegalArgumentException for any other state
     */
    @Override
    public List<Work> listWork(String queueId, State state) {
        switch (state) {
        case SCHEDULED:
            return listScheduled(queueId);
        case RUNNING:
            return listRunning(queueId);
        case COMPLETED:
            return listCompleted(queueId);
        default:
            throw new IllegalArgumentException(String.valueOf(state));
        }
    }

    /**
     * Lists the work ids of a queue in the given state.
     *
     * @param state SCHEDULED, RUNNING or COMPLETED, or {@code null} for scheduled + running
     * @throws IllegalArgumentException for any other state
     */
    @Override
    public List<String> listWorkIds(String queueId, State state) {
        if (state == null) {
            return listNonCompletedIds(queueId);
        }
        switch (state) {
        case SCHEDULED:
            return listScheduledIds(queueId);
        case RUNNING:
            return listRunningIds(queueId);
        case COMPLETED:
            return listCompletedIds(queueId);
        default:
            throw new IllegalArgumentException(String.valueOf(state));
        }
    }

    protected List<Work> listScheduled(String queueId) {
        try {
            return listWorkList(scheduledKey(queueId));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    protected List<Work> listRunning(String queueId) {
        try {
            return listWorkSet(runningKey(queueId));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    protected List<Work> listCompleted(String queueId) {
        try {
            return listWorkSet(completedKey(queueId));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    protected List<String> listScheduledIds(String queueId) {
        try {
            return listWorkIdsList(scheduledKey(queueId));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    protected List<String> listRunningIds(String queueId) {
        try {
            return listWorkIdsSet(runningKey(queueId));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Lists scheduled ids followed by running ids.
     */
    protected List<String> listNonCompletedIds(String queueId) {
        List<String> list = listScheduledIds(queueId);
        list.addAll(listRunningIds(queueId));
        return list;
    }

    protected List<String> listCompletedIds(String queueId) {
        try {
            return listWorkIdsSet(completedKey(queueId));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the number of work ids of a queue in the given state.
     *
     * @param state one of SCHEDULED, RUNNING or COMPLETED; must not be {@code null}
     * @throws IllegalArgumentException for any other state
     */
    @Override
    public int getQueueSize(String queueId, State state) {
        switch (state) {
        case SCHEDULED:
            return getScheduledSize(queueId);
        case RUNNING:
            return getRunningSize(queueId);
        case COMPLETED:
            return getCompletedSize(queueId);
        default:
            throw new IllegalArgumentException(String.valueOf(state));
        }
    }

    protected int getScheduledSize(String queueId) {
        try {
            return getScheduledQueueSize(queueId);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    protected int getRunningSize(String queueId) {
        try {
            return getRunningQueueSize(queueId);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    protected int getCompletedSize(String queueId) {
        try {
            return getCompletedQueueSize(queueId);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the work with the given id if it is in the given state.
     *
     * @param state the expected state, or {@code null} for scheduled or running
     * @return the work, or {@code null} if the state does not match
     */
    @Override
    public Work find(String workId, State state) {
        if (isWorkInState(workId, state)) {
            return getWork(bytes(workId));
        }
        return null;
    }

    /**
     * Checks the stored state of a work.
     *
     * @param state the expected state, or {@code null} to accept scheduled or running
     */
    @Override
    public boolean isWorkInState(String workId, State state) {
        State s = getWorkState(workId);
        if (state == null) {
            return s == State.SCHEDULED || s == State.RUNNING;
        }
        return s == state;
    }

    @Override
    public Work removeScheduled(String queueId, String workId) {
        try {
            return removeScheduledWork(queueId, workId);
        } catch (IOException cause) {
            throw new RuntimeException("Cannot remove scheduled work " + workId + " from " + queueId, cause);
        }
    }

    @Override
    public State getWorkState(String workId) {
        return getWorkStateInfo(workId);
    }

    /**
     * Moves all scheduled work of a queue to its suspended list.
     *
     * @return the number of work instances moved
     */
    @Override
    public int setSuspending(String queueId) {
        try {
            int n = suspendScheduledWork(queueId);
            log.info("Suspending " + n + " work instances from queue: " + queueId);
            return n;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Clears completed work of a queue.
     *
     * @param completionTime when {@code <= 0} all completed work is cleared, otherwise the value is passed on as the
     *            completion time limit
     */
    @Override
    public void clearCompletedWork(String queueId, long completionTime) {
        try {
            if (completionTime <= 0) {
                removeAllCompletedWork(queueId);
            } else {
                removeCompletedWork(queueId, completionTime);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /*
     * ******************** Redis Interface ********************
     */

    /** Decodes UTF-8 bytes to a string. */
    protected String string(byte[] bytes) {
        try {
            return new String(bytes, UTF_8);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Encodes a string to UTF-8 bytes. */
    protected byte[] bytes(String string) {
        try {
            return string.getBytes(UTF_8);
        } catch (IOException e) {
            // cannot happen for UTF-8
            throw new RuntimeException(e);
        }
    }

    protected byte[] keyBytes(String prefix, String queueId) {
        return keyBytes(prefix + queueId);
    }

    /** Builds a namespaced Redis key: namespace + given suffix, UTF-8 encoded. */
    protected byte[] keyBytes(String prefix) {
        return bytes(redisNamespace + prefix);
    }

    protected byte[] suspendedKey(String queueId) {
        return keyBytes(KEY_SUSPENDED_PREFIX, queueId);
    }

    protected byte[] scheduledKey(String queueId) {
        return keyBytes(KEY_SCHEDULED_PREFIX, queueId);
    }

    protected byte[] runningKey(String queueId) {
        return keyBytes(KEY_RUNNING_PREFIX, queueId);
    }

    protected byte[] completedKey(String queueId) {
        return keyBytes(KEY_COMPLETED_PREFIX, queueId);
    }

    protected String completedKeyString(String queueId) {
        return redisNamespace + KEY_COMPLETED_PREFIX + queueId;
    }

    protected byte[] stateKey() {
        return keyBytes(KEY_STATE);
    }

    protected String stateKeyString() {
        return redisNamespace + KEY_STATE;
    }

    protected byte[] dataKey() {
        return keyBytes(KEY_DATA);
    }

    protected String dataKeyString() {
        return redisNamespace + KEY_DATA;
    }

    /**
     * Serializes a work instance using Java serialization.
     *
     * @throws IOException if the work cannot be written
     */
    protected byte[] serializeWork(Work work) throws IOException {
        ByteArrayOutputStream baout = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(baout)) {
            out.writeObject(work);
            out.flush();
        }
        return baout.toByteArray();
    }

    /**
     * Deserializes a work instance written by {@link #serializeWork}.
     *
     * @return the work, or {@code null} if {@code workBytes} is {@code null}
     * @throws RuntimeException wrapping the I/O or class loading failure
     */
    protected Work deserializeWork(byte[] workBytes) {
        if (workBytes == null) {
            return null;
        }
        InputStream bain = new ByteArrayInputStream(workBytes);
        try (ObjectInputStream in = new ObjectInputStream(bain)) {
            return (Work) in.readObject();
        } catch (RuntimeException cause) {
            throw cause;
        } catch (IOException | ClassNotFoundException cause) {
            throw new RuntimeException("Cannot deserialize work", cause);
        }
    }

    /** Length (LLEN) of the scheduled list of a queue. */
    protected int getScheduledQueueSize(final String queueId) throws IOException {
        return redisExecutor.execute(new RedisCallable<Long>() {

            @Override
            public Long call(Jedis jedis) {
                return jedis.llen(scheduledKey(queueId));
            }

        }).intValue();
    }

    /** Cardinality (SCARD) of the running set of a queue. */
    protected int getRunningQueueSize(final String queueId) throws IOException {
        return redisExecutor.execute(new RedisCallable<Long>() {

            @Override
            public Long call(Jedis jedis) {
                return jedis.scard(runningKey(queueId));
            }

        }).intValue();
    }

    /** Cardinality (SCARD) of the completed set of a queue. */
    protected int getCompletedQueueSize(final String queueId) throws IOException {
        return redisExecutor.execute(new RedisCallable<Long>() {

            @Override
            public Long call(Jedis jedis) {
                return jedis.scard(completedKey(queueId));
            }

        }).intValue();
    }

    /**
     * Persists a work instance and adds it to the scheduled queue.
     *
     * @param queueId the queue id
     * @param work the work instance
     * @throws IOException
     */
    public void addScheduledWork(final String queueId, Work work) throws IOException {
        log.debug("Add scheduled " + work);
        final byte[] workIdBytes = bytes(work.getId());

        // serialize Work
        final byte[] workBytes = serializeWork(work);

        redisExecutor.execute(new RedisCallable<Void>() {

            @Override
            public Void call(Jedis jedis) {
                if (redisExecutor.supportPipelined()) {
                    Pipeline pipe = jedis.pipelined();
                    pipe.hset(dataKey(), workIdBytes, workBytes);
                    pipe.hset(stateKey(), workIdBytes, STATE_SCHEDULED);
                    pipe.lpush(scheduledKey(queueId), workIdBytes);
                    pipe.sync();
                } else {
                    jedis.hset(dataKey(), workIdBytes, workBytes);
                    jedis.hset(stateKey(), workIdBytes, STATE_SCHEDULED);
                    jedis.lpush(scheduledKey(queueId), workIdBytes);
                }
                return null;
            }

        });
    }

    /**
     * Finds which queues have suspended work.
     *
     * @return a set of queue ids
     * @since 5.8
     */
    protected Set<String> getSuspendedQueueIds() throws IOException {
        return getQueueIds(KEY_SUSPENDED_PREFIX);
    }

    protected Set<String> getScheduledQueueIds() {
        return getQueueIds(KEY_SCHEDULED_PREFIX);
    }

    protected Set<String> getRunningQueueIds() {
        return getQueueIds(KEY_RUNNING_PREFIX);
    }

    @Override
    public Set<String> getCompletedQueueIds() {
        return getQueueIds(KEY_COMPLETED_PREFIX);
    }

    /**
     * Finds which queues have work for a given state prefix.
     *
     * @return a set of queue ids
     * @since 5.8
     */
    protected Set<String> getQueueIds(final String queuePrefix) {
        return redisExecutor.execute(new RedisCallable<Set<String>>() {
            @Override
            public Set<String> call(Jedis jedis) {
                // the queue id is whatever follows namespace + prefix in each matching key
                int offset = keyBytes(queuePrefix).length;
                Set<byte[]> keys = jedis.keys(keyBytes(queuePrefix, "*"));
                Set<String> queueIds = new HashSet<String>(keys.size());
                for (byte[] bytes : keys) {
                    try {
                        String queueId = new String(bytes, offset, bytes.length - offset, UTF_8);
                        queueIds.add(queueId);
                    } catch (IOException e) {
                        throw new NuxeoException(e);
                    }
                }
                return queueIds;
            }

        });
    }

    /**
     * Resumes all suspended work instances by moving them to the scheduled queue.
     *
     * @param queueId the queue id
     * @return the number of work instances scheduled
     * @since 5.8
     */
    public int scheduleSuspendedWork(final String queueId) throws IOException {
        return redisExecutor.execute(new RedisCallable<Integer>() {
            @Override
            public Integer call(Jedis jedis) {
                // move ids one by one until the suspended list is empty
                for (int n = 0;; n++) {
                    byte[] workIdBytes = jedis.rpoplpush(suspendedKey(queueId), scheduledKey(queueId));
                    if (workIdBytes == null) {
                        return Integer.valueOf(n);
                    }
                }
            }

        }).intValue();
    }

    /**
     * Suspends all scheduled work instances by moving them to the suspended queue.
     *
     * @param queueId the queue id
     * @return the number of work instances suspended
     * @since 5.8
     */
    public int suspendScheduledWork(final String queueId) throws IOException {
        return redisExecutor.execute(new RedisCallable<Integer>() {

            @Override
            public Integer call(Jedis jedis) {
                // move ids one by one until the scheduled list is empty
                for (int n = 0;; n++) {
                    byte[] workIdBytes = jedis.rpoplpush(scheduledKey(queueId), suspendedKey(queueId));
                    if (workIdBytes == null) {
                        return Integer.valueOf(n);
                    }
                }
            }
        }).intValue();
    }

    /**
     * Switches a work to state running.
     *
     * @param queueId the queue id
     * @param work the work
     */
    protected void workSetRunning(final String queueId, Work work) throws IOException {
        final byte[] workIdBytes = bytes(work.getId());
        redisExecutor.execute(new RedisCallable<Void>() {

            @Override
            public Void call(Jedis jedis) {
                if (redisExecutor.supportPipelined()) {
                    Pipeline pipe = jedis.pipelined();
                    pipe.sadd(runningKey(queueId), workIdBytes);
                    pipe.hset(stateKey(), workIdBytes, STATE_RUNNING);
                    pipe.sync();
                } else {
                    jedis.sadd(runningKey(queueId), workIdBytes);
                    jedis.hset(stateKey(), workIdBytes, STATE_RUNNING);
                }
                return null;
            }
        });
    }

    /**
     * Switches a work to state completed, and saves its new state.
     */
    protected void workSetCompleted(final String queueId, final Work work) throws IOException {
        final byte[] workIdBytes = bytes(work.getId());
        final byte[] workBytes = serializeWork(work);
        redisExecutor.execute(new RedisCallable<Void>() {

            @Override
            public Void call(Jedis jedis) {
                // state value is the completed marker followed by the completion time
                byte[] completedBytes = bytes(((char) STATE_COMPLETED_B) + String.valueOf(work.getCompletionTime()));
                if (redisExecutor.supportPipelined()) {
                    Pipeline pipe = jedis.pipelined();
                    // store (updated) content in hash
                    pipe.hset(dataKey(), workIdBytes, workBytes);
                    // remove key from running set
                    pipe.srem(runningKey(queueId), workIdBytes);
                    // put key in completed set
                    pipe.sadd(completedKey(queueId), workIdBytes);
                    // set state to completed
                    pipe.hset(stateKey(), workIdBytes, completedBytes);
                    pipe.sync();
                } else {
                    jedis.hset(dataKey(), workIdBytes, workBytes);
                    jedis.srem(runningKey(queueId), workIdBytes);
                    jedis.sadd(completedKey(queueId), workIdBytes);
                    jedis.hset(stateKey(), workIdBytes, completedBytes);
                }
                return null;
            }
        });
    }

    /**
     * Gets the work state.
     *
     * @param workId the work id
     * @return the state, or {@code null} if not found
     */
    protected State getWorkStateInfo(final String workId) {
        final byte[] workIdBytes = bytes(workId);
        return redisExecutor.execute(new RedisCallable<State>() {
            @Override
            public State call(Jedis jedis) {
                // get state
                byte[] bytes = jedis.hget(stateKey(), workIdBytes);
                if (bytes == null || bytes.length == 0) {
                    return null;
                }
                // only the first byte identifies the state; the rest (if any) is extra info
                switch (bytes[0]) {
                case STATE_SCHEDULED_B:
                    return State.SCHEDULED;
                case STATE_CANCELED_B:
                    return State.CANCELED;
                case STATE_RUNNING_B:
                    return State.RUNNING;
                case STATE_COMPLETED_B:
                    return State.COMPLETED;
                default:
                    String msg;
                    try {
                        msg = new String(bytes, UTF_8);
                    } catch (UnsupportedEncodingException e) {
                        msg = Arrays.toString(bytes);
                    }
                    log.error("Unknown work state: " + msg + ", work: " + workId);
                    return null;
                }
            }
        });
    }

    /** Reads all work ids from a Redis list (LRANGE 0 -1). */
    protected List<String> listWorkIdsList(final byte[] queueBytes) throws IOException {
        return redisExecutor.execute(new RedisCallable<List<String>>() {

            @Override
            public List<String> call(Jedis jedis) {
                List<byte[]> keys = jedis.lrange(queueBytes, 0, -1);
                List<String> list = new ArrayList<String>(keys.size());
                for (byte[] workIdBytes : keys) {
                    list.add(string(workIdBytes));
                }
                return list;
            }

        });
    }

    /** Reads all work ids from a Redis set (SMEMBERS). */
    protected List<String> listWorkIdsSet(final byte[] queueBytes) throws IOException {
        return redisExecutor.execute(new RedisCallable<List<String>>() {

            @Override
            public List<String> call(Jedis jedis) {

                Set<byte[]> keys = jedis.smembers(queueBytes);
                List<String> list = new ArrayList<String>(keys.size());
                for (byte[] workIdBytes : keys) {
                    list.add(string(workIdBytes));
                }
                return list;
            }

        });

    }

    /**
     * Reads all work instances whose ids are in a Redis list. An id with no entry in the data hash yields a
     * {@code null} element.
     */
    protected List<Work> listWorkList(final byte[] queueBytes) throws IOException {
        return redisExecutor.execute(new RedisCallable<List<Work>>() {
            @Override
            public List<Work> call(Jedis jedis) {
                List<byte[]> keys = jedis.lrange(queueBytes, 0, -1);
                List<Work> list = new ArrayList<Work>(keys.size());
                for (byte[] workIdBytes : keys) {
                    // get data
                    byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
                    Work work = deserializeWork(workBytes);
                    list.add(work);
                }
                return list;
            }
        });
    }

    /**
     * Reads all work instances whose ids are in a Redis set. An id with no entry in the data hash yields a
     * {@code null} element.
     */
    protected List<Work> listWorkSet(final byte[] queueBytes) throws IOException {
        return redisExecutor.execute(new RedisCallable<List<Work>>() {
            @Override
            public List<Work> call(Jedis jedis) {
                Set<byte[]> keys = jedis.smembers(queueBytes);
                List<Work> list = new ArrayList<Work>(keys.size());
                for (byte[] workIdBytes : keys) {
                    // get data
                    byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
                    Work work = deserializeWork(workBytes);
                    list.add(work);
                }
                return list;
            }
        });
    }

    protected Work getWork(byte[] workIdBytes) {
        try {
            return getWorkData(workIdBytes);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads and deserializes a work from the data hash.
     *
     * @return the work, or {@code null} if the data hash has no entry for this id
     */
    protected Work getWorkData(final byte[] workIdBytes) throws IOException {
        return redisExecutor.execute(new RedisCallable<Work>() {

            @Override
            public Work call(Jedis jedis) {
                byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
                return deserializeWork(workBytes);
            }

        });
    }

    /**
     * Removes first work from scheduled queue.
     *
     * @param queueId the queue id
     * @return the work, or {@code null} if the scheduled queue is empty
     */
    protected Work removeScheduledWork(final String queueId) throws IOException {
        return redisExecutor.execute(new RedisCallable<Work>() {

            @Override
            public Work call(Jedis jedis) {
                // pop from queue
                byte[] workIdBytes = jedis.rpop(scheduledKey(queueId));
                if (workIdBytes == null) {
                    return null;
                }
                // get data
                byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
                return deserializeWork(workBytes);
            }

        });
    }

    /**
     * Removes a given work from scheduled queue and set state to completed.
     *
     * @return the removed work, or {@code null} if the id was not in the scheduled queue
     * @throws IOException
     */
    protected Work removeScheduledWork(final String queueId, final String workId) throws IOException {
        final byte[] workIdBytes = bytes(workId);
        return redisExecutor.execute(new RedisCallable<Work>() {

            @Override
            public Work call(Jedis jedis) {
                // remove from queue
                Long n = jedis.lrem(scheduledKey(queueId), 0, workIdBytes);
                if (n == null || n.intValue() == 0) {
                    return null;
                }
                // set state to completed at current time; the completed marker must come first,
                // otherwise getWorkStateInfo cannot recognize the stored value
                byte[] completedBytes = bytes(((char) STATE_COMPLETED_B) + String.valueOf(System.currentTimeMillis()));
                jedis.hset(stateKey(), workIdBytes, completedBytes);
                // get data
                byte[] workBytes = jedis.hget(dataKey(), workIdBytes);
                return deserializeWork(workBytes);
            }

        });
    }

    /**
     * Helper to call SSCAN but fall back on a custom implementation based on SMEMBERS if the backend (embedded) does
     * not support SSCAN.
     *
     * @since 7.3
     */
    public static class SScanner {

        // results of SMEMBERS for last key, in embedded mode
        protected List<String> smembers;

        /**
         * Runs one SSCAN step, emulating it over a cached SMEMBERS result when SSCAN is unavailable.
         *
         * @param cursor the cursor; in fallback mode it is parsed as an index into the cached members
         * @param params the scan parameters; in fallback mode only COUNT is honored (default 1000)
         * @throws UnsupportedOperationException in fallback mode if MATCH is requested
         */
        protected ScanResult<String> sscan(Jedis jedis, String key, String cursor, ScanParams params) {
            ScanResult<String> scanResult;
            try {
                scanResult = jedis.sscan(key, cursor, params);
            } catch (Exception e) {
                // when testing with embedded fake redis, we may get an un-declared exception
                if (!(e.getCause() instanceof NoSuchMethodException)) {
                    throw e;
                }
                // no SSCAN available in embedded, do a full SMEMBERS
                if (smembers == null) {
                    Set<String> set = jedis.smembers(key);
                    smembers = new ArrayList<>(set);
                }
                Collection<byte[]> bparams = params.getParams();
                int count = 1000;
                for (Iterator<byte[]> it = bparams.iterator(); it.hasNext();) {
                    byte[] param = it.next();
                    // arrays must be compared by content: byte[].equals is an identity check
                    if (Arrays.equals(param, Protocol.Keyword.MATCH.raw)) {
                        throw new UnsupportedOperationException("MATCH not supported");
                    }
                    if (Arrays.equals(param, Protocol.Keyword.COUNT.raw)) {
                        // the value follows the keyword in the parameter list
                        count = Integer.parseInt(SafeEncoder.encode(it.next()));
                    }
                }
                int pos = Integer.parseInt(cursor); // don't check range, callers are cool
                int end = Math.min(pos + count, smembers.size());
                // cursor 0 signals the end of the scan, as with a real SSCAN
                int nextPos = end == smembers.size() ? 0 : end;
                scanResult = new ScanResult<>(String.valueOf(nextPos), smembers.subList(pos, end));
                if (nextPos == 0) {
                    smembers = null;
                }
            }
            return scanResult;
        }
    }

    /**
     * Don't pass more than approx. this number of parameters to Lua calls.
     */
    private static final int BATCH_SIZE = 5000;

    /** Removes completed work of a queue using a completion time of 0. */
    protected void removeAllCompletedWork(final String queueId) throws IOException {
        removeCompletedWork(queueId, 0);
    }

    /**
     * Scans the completed set of a queue in batches and hands each batch of work ids, together with the completion
     * time, to the "del-completed" script. The script is not part of this file; per the inline comment it deletes
     * the works completed before the given time.
     */
    protected void removeCompletedWork(final String queueId, final long completionTime) throws IOException {
        redisExecutor.execute(new RedisCallable<Void>() {

            @Override
            public Void call(Jedis jedis) {
                String completedKey = completedKeyString(queueId);
                String stateKey = stateKeyString();
                String dataKey = dataKeyString();
                SScanner sscanner = new SScanner();
                ScanParams scanParams = new ScanParams().count(BATCH_SIZE);
                String cursor = "0";
                do {
                    ScanResult<String> scanResult = sscanner.sscan(jedis, completedKey, cursor, scanParams);
                    cursor = scanResult.getStringCursor();
                    List<String> workIds = scanResult.getResult();
                    if (!workIds.isEmpty()) {
                        // delete these works if before the completion time
                        List<String> keys = Arrays.asList(completedKey, stateKey, dataKey);
                        List<String> args = new ArrayList<String>(1 + workIds.size());
                        args.add(String.valueOf(completionTime));
                        args.addAll(workIds);
                        jedis.evalsha(delCompletedSha, keys, args);
                    }
                } while (!"0".equals(cursor));
                return null;
            }
        });
    }

}