001/* 002 * (C) Copyright 2013 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.redis.contribs; 020 021import java.io.ByteArrayInputStream; 022import java.io.ByteArrayOutputStream; 023import java.io.IOException; 024import java.io.InputStream; 025import java.io.ObjectInputStream; 026import java.io.ObjectOutputStream; 027import java.io.UnsupportedEncodingException; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collection; 031import java.util.HashMap; 032import java.util.HashSet; 033import java.util.Iterator; 034import java.util.List; 035import java.util.Map; 036import java.util.Set; 037import java.util.concurrent.BlockingQueue; 038 039import org.apache.commons.logging.Log; 040import org.apache.commons.logging.LogFactory; 041import org.nuxeo.ecm.core.api.NuxeoException; 042import org.nuxeo.ecm.core.redis.RedisAdmin; 043import org.nuxeo.ecm.core.redis.RedisCallable; 044import org.nuxeo.ecm.core.redis.RedisExecutor; 045import org.nuxeo.ecm.core.work.WorkHolder; 046import org.nuxeo.ecm.core.work.WorkManagerImpl; 047import org.nuxeo.ecm.core.work.WorkQueueDescriptorRegistry; 048import org.nuxeo.ecm.core.work.WorkQueuing; 049import org.nuxeo.ecm.core.work.api.Work; 050import org.nuxeo.ecm.core.work.api.Work.State; 051import org.nuxeo.runtime.api.Framework; 052 053import redis.clients.jedis.Jedis; 054import redis.clients.jedis.Pipeline; 055import redis.clients.jedis.Protocol; 056import redis.clients.jedis.ScanParams; 057import redis.clients.jedis.ScanResult; 058import redis.clients.util.SafeEncoder; 059 060/** 061 * Implementation of a {@link WorkQueuing} storing {@link Work} instances in Redis. 062 * 063 * @since 5.8 064 */ 065public class RedisWorkQueuing implements WorkQueuing { 066 067 private static final Log log = LogFactory.getLog(RedisWorkQueuing.class); 068 069 protected static final String UTF_8 = "UTF-8"; 070 071 /** 072 * Global hash of Work instance id -> serialized Work instance. 073 */ 074 protected static final String KEY_DATA = "data"; 075 076 /** 077 * Global hash of Work instance id -> Work state. The completed state ( {@value #STATE_COMPLETED_B}) is followed by 078 * a completion time in milliseconds. 079 */ 080 protected static final String KEY_STATE = "state"; 081 082 /** 083 * Per-queue list of suspended Work instance ids. 084 */ 085 protected static final String KEY_SUSPENDED_PREFIX = "prev:"; 086 087 /** 088 * Per-queue list of scheduled Work instance ids. 089 */ 090 protected static final String KEY_QUEUE_PREFIX = "queue:"; 091 092 /** 093 * Per-queue set of scheduled Work instance ids. 094 */ 095 protected static final String KEY_SCHEDULED_PREFIX = "sched:"; 096 097 /** 098 * Per-queue set of running Work instance ids. 099 */ 100 protected static final String KEY_RUNNING_PREFIX = "run:"; 101 102 /** 103 * Per-queue set of completed Work instance ids. 104 */ 105 protected static final String KEY_COMPLETED_PREFIX = "done:"; 106 107 protected static final byte STATE_SCHEDULED_B = 'Q'; 108 109 protected static final byte STATE_CANCELED_B = 'X'; 110 111 protected static final byte STATE_RUNNING_B = 'R'; 112 113 protected static final byte STATE_COMPLETED_B = 'C'; 114 115 protected static final byte[] STATE_SCHEDULED = new byte[] { STATE_SCHEDULED_B }; 116 117 protected static final byte[] STATE_CANCELED = new byte[] { STATE_CANCELED_B }; 118 119 protected static final byte[] STATE_RUNNING = new byte[] { STATE_RUNNING_B }; 120 121 protected static final byte[] STATE_COMPLETED = new byte[] { STATE_COMPLETED_B }; 122 123 protected final WorkManagerImpl mgr; 124 125 // @GuardedBy("this") 126 protected Map<String, BlockingQueue<Runnable>> allQueued = new HashMap<String, BlockingQueue<Runnable>>(); 127 128 protected RedisExecutor redisExecutor; 129 130 protected RedisAdmin redisAdmin; 131 132 protected String redisNamespace; 133 134 // lua scripts 135 protected String schedulingWorkSha; 136 protected String runningWorkSha; 137 protected String completedWorkSha; 138 protected String cleanCompletedWorkSha; 139 140 public RedisWorkQueuing(WorkManagerImpl mgr, WorkQueueDescriptorRegistry workQueueDescriptors) { 141 this.mgr = mgr; 142 } 143 144 @Override 145 public void init() { 146 redisExecutor = Framework.getLocalService(RedisExecutor.class); 147 redisAdmin = Framework.getService(RedisAdmin.class); 148 redisNamespace = redisAdmin.namespace("work"); 149 try { 150 for (String queueId : getSuspendedQueueIds()) { 151 int n = scheduleSuspendedWork(queueId); 152 log.info("Re-scheduling " + n + " work instances suspended from queue: " + queueId); 153 } 154 } catch (IOException e) { 155 throw new RuntimeException(e); 156 } 157 try { 158 schedulingWorkSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "scheduling-work"); 159 runningWorkSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "running-work"); 160 completedWorkSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "completed-work"); 161 cleanCompletedWorkSha = redisAdmin.load("org.nuxeo.ecm.core.redis", "clean-completed-work"); 162 } catch (IOException e) { 163 throw new RuntimeException(e); 164 } 165 } 166 167 @Override 168 public BlockingQueue<Runnable> initWorkQueue(String queueId) { 169 if (allQueued.containsKey(queueId)) { 170 throw new IllegalStateException(queueId + " is already configured"); 171 } 172 final BlockingQueue<Runnable> queue = newBlockingQueue(queueId); 173 allQueued.put(queueId, queue); 174 return queue; 175 } 176 177 @Override 178 public boolean workSchedule(String queueId, Work work) { 179 return getWorkQueue(queueId).offer(new WorkHolder(work)); 180 } 181 182 public BlockingQueue<Runnable> getWorkQueue(String queueId) { 183 if (!allQueued.containsKey(queueId)) { 184 throw new IllegalStateException(queueId + " was not configured yet"); 185 } 186 return allQueued.get(queueId); 187 } 188 189 190 @Override 191 public void workRunning(String queueId, Work work) { 192 try { 193 workSetRunning(queueId, work); 194 } catch (IOException e) { 195 throw new RuntimeException(e); 196 } 197 } 198 199 @Override 200 public void workCompleted(String queueId, Work work) { 201 try { 202 workSetCompleted(queueId, work); 203 } catch (IOException e) { 204 throw new RuntimeException(e); 205 } 206 } 207 208 protected BlockingQueue<Runnable> newBlockingQueue(String queueId) { 209 return new RedisBlockingQueue(queueId, this); 210 } 211 212 @Override 213 public List<Work> listWork(String queueId, State state) { 214 switch (state) { 215 case SCHEDULED: 216 return listScheduled(queueId); 217 case RUNNING: 218 return listRunning(queueId); 219 case COMPLETED: 220 return listCompleted(queueId); 221 default: 222 throw new IllegalArgumentException(String.valueOf(state)); 223 } 224 } 225 226 @Override 227 public List<String> listWorkIds(String queueId, State state) { 228 if (state == null) { 229 return listNonCompletedIds(queueId); 230 } 231 switch (state) { 232 case SCHEDULED: 233 return listScheduledIds(queueId); 234 case RUNNING: 235 return listRunningIds(queueId); 236 case COMPLETED: 237 return listCompletedIds(queueId); 238 default: 239 throw new IllegalArgumentException(String.valueOf(state)); 240 } 241 } 242 243 protected List<Work> listScheduled(String queueId) { 244 try { 245 return listWorkList(queuedKey(queueId)); 246 } catch (IOException e) { 247 throw new RuntimeException(e); 248 } 249 } 250 251 protected List<Work> listRunning(String queueId) { 252 try { 253 return listWorkSet(runningKey(queueId)); 254 } catch (IOException e) { 255 throw new RuntimeException(e); 256 } 257 } 258 259 protected List<Work> listCompleted(String queueId) { 260 try { 261 return listWorkSet(completedKey(queueId)); 262 } catch (IOException e) { 263 throw new RuntimeException(e); 264 } 265 } 266 267 protected List<String> listScheduledIds(String queueId) { 268 try { 269 return listWorkIdsList(queuedKey(queueId)); 270 } catch (IOException e) { 271 throw new RuntimeException(e); 272 } 273 } 274 275 protected List<String> listRunningIds(String queueId) { 276 try { 277 return listWorkIdsSet(runningKey(queueId)); 278 } catch (IOException e) { 279 throw new RuntimeException(e); 280 } 281 } 282 283 protected List<String> listNonCompletedIds(String queueId) { 284 List<String> list = listScheduledIds(queueId); 285 list.addAll(listRunningIds(queueId)); 286 return list; 287 } 288 289 protected List<String> listCompletedIds(String queueId) { 290 try { 291 return listWorkIdsSet(completedKey(queueId)); 292 } catch (IOException e) { 293 throw new RuntimeException(e); 294 } 295 } 296 297 @Override 298 public int count(String queueId, State state) { 299 switch (state) { 300 case SCHEDULED: 301 return getScheduledSize(queueId); 302 case RUNNING: 303 return getRunningSize(queueId); 304 case COMPLETED: 305 return getCompletedSize(queueId); 306 default: 307 throw new IllegalArgumentException(String.valueOf(state)); 308 } 309 } 310 311 protected int getScheduledSize(String queueId) { 312 try { 313 return getRedisSetSize(scheduledKey(queueId)); 314 } catch (IOException e) { 315 throw new RuntimeException(e); 316 } 317 } 318 319 protected int getRunningSize(String queueId) { 320 try { 321 return getRedisSetSize(runningKey(queueId)); 322 } catch (IOException e) { 323 throw new RuntimeException(e); 324 } 325 } 326 327 protected int getCompletedSize(String queueId) { 328 try { 329 return getRedisSetSize(completedKey(queueId)); 330 } catch (IOException e) { 331 throw new RuntimeException(e); 332 } 333 } 334 335 @Override 336 public Work find(String workId, State state) { 337 if (isWorkInState(workId, state)) { 338 return getWork(bytes(workId)); 339 } 340 return null; 341 } 342 343 @Override 344 public boolean isWorkInState(String workId, State state) { 345 State s = getWorkState(workId); 346 if (state == null) { 347 return s == State.SCHEDULED || s == State.RUNNING; 348 } 349 return s == state; 350 } 351 352 @Override 353 public Work removeScheduled(String queueId, String workId) { 354 try { 355 return removeScheduledWork(queueId, workId); 356 } catch (IOException cause) { 357 throw new RuntimeException("Cannot remove scheduled work " + workId + " from " + queueId, cause); 358 } 359 } 360 361 @Override 362 public State getWorkState(String workId) { 363 return getWorkStateInfo(workId); 364 } 365 366 @Override 367 public int setSuspending(String queueId) { 368 try { 369 int n = suspendScheduledWork(queueId); 370 log.info("Suspending " + n + " work instances from queue: " + queueId); 371 allQueued.remove(queueId); 372 return n; 373 } catch (IOException e) { 374 throw new RuntimeException(e); 375 } 376 } 377 378 @Override 379 public void clearCompletedWork(String queueId, long completionTime) { 380 try { 381 if (completionTime <= 0) { 382 removeAllCompletedWork(queueId); 383 } else { 384 removeCompletedWork(queueId, completionTime); 385 } 386 } catch (IOException e) { 387 throw new RuntimeException(e); 388 } 389 } 390 391 /* 392 * ******************** Redis Interface ******************** 393 */ 394 395 protected String string(byte[] bytes) { 396 try { 397 return new String(bytes, UTF_8); 398 } catch (IOException e) { 399 throw new RuntimeException(e); 400 } 401 } 402 403 protected byte[] bytes(String string) { 404 try { 405 return string.getBytes(UTF_8); 406 } catch (IOException e) { 407 // cannot happen for UTF-8 408 throw new RuntimeException(e); 409 } 410 } 411 412 protected byte[] keyBytes(String prefix, String queueId) { 413 return keyBytes(prefix + queueId); 414 } 415 416 protected byte[] keyBytes(String prefix) { 417 return bytes(redisNamespace + prefix); 418 } 419 420 protected byte[] suspendedKey(String queueId) { 421 return keyBytes(KEY_SUSPENDED_PREFIX, queueId); 422 } 423 424 protected byte[] queuedKey(String queueId) { 425 return keyBytes(KEY_QUEUE_PREFIX, queueId); 426 } 427 428 protected byte[] runningKey(String queueId) { 429 return keyBytes(KEY_RUNNING_PREFIX, queueId); 430 } 431 432 protected byte[] scheduledKey(String queueId) { 433 return keyBytes(KEY_SCHEDULED_PREFIX, queueId); 434 } 435 436 protected byte[] completedKey(String queueId) { 437 return keyBytes(KEY_COMPLETED_PREFIX, queueId); 438 } 439 440 protected String completedKeyString(String queueId) { 441 return redisNamespace + KEY_COMPLETED_PREFIX + queueId; 442 } 443 444 protected byte[] stateKey() { 445 return keyBytes(KEY_STATE); 446 } 447 448 protected String stateKeyString() { 449 return redisNamespace + KEY_STATE; 450 } 451 452 protected byte[] dataKey() { 453 return keyBytes(KEY_DATA); 454 } 455 456 protected String dataKeyString() { 457 return redisNamespace + KEY_DATA; 458 } 459 460 protected byte[] serializeWork(Work work) throws IOException { 461 ByteArrayOutputStream baout = new ByteArrayOutputStream(); 462 ObjectOutputStream out = new ObjectOutputStream(baout); 463 out.writeObject(work); 464 out.flush(); 465 out.close(); 466 return baout.toByteArray(); 467 } 468 469 protected Work deserializeWork(byte[] workBytes) { 470 if (workBytes == null) { 471 return null; 472 } 473 InputStream bain = new ByteArrayInputStream(workBytes); 474 try (ObjectInputStream in = new ObjectInputStream(bain)) { 475 return (Work) in.readObject(); 476 } catch (RuntimeException cause) { 477 throw cause; 478 } catch (IOException | ClassNotFoundException cause) { 479 throw new RuntimeException("Cannot deserialize work", cause); 480 } 481 } 482 483 protected int getRedisListSize(final byte[] key) throws IOException { 484 return redisExecutor.execute(new RedisCallable<Long>() { 485 486 @Override 487 public Long call(Jedis jedis) { 488 return jedis.llen(key); 489 } 490 491 }).intValue(); 492 } 493 494 protected int getRedisSetSize(final byte[] key) throws IOException { 495 return redisExecutor.execute(new RedisCallable<Long>() { 496 497 @Override 498 public Long call(Jedis jedis) { 499 return jedis.scard(key); 500 } 501 502 }).intValue(); 503 } 504 505 506 /** 507 * Persists a work instance and adds it to the scheduled queue. 508 * 509 * @param queueId the queue id 510 * @param work the work instance 511 * @throws IOException 512 */ 513 public void addScheduledWork(final String queueId, Work work) throws IOException { 514 final byte[] workId = bytes(work.getId()); 515 final byte[] workData = serializeWork(work); 516 final List<byte[]> keys = Arrays.asList(dataKey(), stateKey(), scheduledKey(queueId),queuedKey(queueId)); 517 final List<byte[]> args = Arrays.asList(workId, workData, STATE_SCHEDULED); 518 redisExecutor.execute(new RedisCallable<Void>() { 519 520 @Override 521 public Void call(Jedis jedis) { 522 jedis.evalsha(schedulingWorkSha.getBytes(), keys, args); 523 if (log.isDebugEnabled()) { 524 log.debug("Add scheduled " + work); 525 } 526 return null; 527 } 528 529 }); 530 } 531 532 /** 533 * Finds which queues have suspended work. 534 * 535 * @return a set of queue ids 536 * @since 5.8 537 */ 538 protected Set<String> getSuspendedQueueIds() throws IOException { 539 return getQueueIds(KEY_SUSPENDED_PREFIX); 540 } 541 542 protected Set<String> getScheduledQueueIds() { 543 return getQueueIds(KEY_QUEUE_PREFIX); 544 } 545 546 protected Set<String> getRunningQueueIds() { 547 return getQueueIds(KEY_RUNNING_PREFIX); 548 } 549 550 @Override 551 public Set<String> getCompletedQueueIds() { 552 return getQueueIds(KEY_COMPLETED_PREFIX); 553 } 554 555 /** 556 * Finds which queues have work for a given state prefix. 557 * 558 * @return a set of queue ids 559 * @since 5.8 560 */ 561 protected Set<String> getQueueIds(final String queuePrefix) { 562 return redisExecutor.execute(new RedisCallable<Set<String>>() { 563 @Override 564 public Set<String> call(Jedis jedis) { 565 int offset = keyBytes(queuePrefix).length; 566 Set<byte[]> keys = jedis.keys(keyBytes(queuePrefix, "*")); 567 Set<String> queueIds = new HashSet<String>(keys.size()); 568 for (byte[] bytes : keys) { 569 try { 570 String queueId = new String(bytes, offset, bytes.length - offset, UTF_8); 571 queueIds.add(queueId); 572 } catch (IOException e) { 573 throw new NuxeoException(e); 574 } 575 } 576 return queueIds; 577 } 578 579 }); 580 } 581 582 /** 583 * Resumes all suspended work instances by moving them to the scheduled queue. 584 * 585 * @param queueId the queue id 586 * @return the number of work instances scheduled 587 * @since 5.8 588 */ 589 public int scheduleSuspendedWork(final String queueId) throws IOException { 590 return redisExecutor.execute(new RedisCallable<Integer>() { 591 @Override 592 public Integer call(Jedis jedis) { 593 for (int n = 0;; n++) { 594 byte[] workIdBytes = jedis.rpoplpush(suspendedKey(queueId), queuedKey(queueId)); 595 if (workIdBytes == null) { 596 return Integer.valueOf(n); 597 } 598 } 599 } 600 601 }).intValue(); 602 } 603 604 /** 605 * Suspends all scheduled work instances by moving them to the suspended queue. 606 * 607 * @param queueId the queue id 608 * @return the number of work instances suspended 609 * @since 5.8 610 */ 611 public int suspendScheduledWork(final String queueId) throws IOException { 612 return redisExecutor.execute(new RedisCallable<Integer>() { 613 614 @Override 615 public Integer call(Jedis jedis) { 616 for (int n = 0;; n++) { 617 byte[] workIdBytes = jedis.rpoplpush(queuedKey(queueId), suspendedKey(queueId)); 618 if (workIdBytes == null) { 619 return n; 620 } 621 } 622 } 623 }).intValue(); 624 } 625 626 /** 627 * Switches a work to state running. 628 * 629 * @param queueId the queue id 630 * @param work the work 631 */ 632 protected void workSetRunning(final String queueId, Work work) throws IOException { 633 final byte[] workId = bytes(work.getId()); 634 final List<byte[]> keys = Arrays.asList(stateKey(), scheduledKey(queueId), runningKey(queueId)); 635 final List<byte[]> args = Arrays.asList(workId, STATE_RUNNING); 636 redisExecutor.execute(new RedisCallable<Void>() { 637 638 @Override 639 public Void call(Jedis jedis) { 640 jedis.evalsha(runningWorkSha.getBytes(), keys, args); 641 if (log.isDebugEnabled()) { 642 log.debug("Mark work as running " + work); 643 } 644 return null; 645 } 646 }); 647 } 648 649 /** 650 * Switches a work to state completed, and saves its new state. 651 */ 652 protected void workSetCompleted(final String queueId, final Work work) throws IOException { 653 final byte[] workId = bytes(work.getId()); 654 final byte[] workData = serializeWork(work); 655 final byte[] state = bytes(((char) STATE_COMPLETED_B) + String.valueOf(work.getCompletionTime())); 656 final List<byte[]> keys = Arrays.asList(dataKey(), stateKey(), runningKey(queueId), completedKey(queueId)); 657 final List<byte[]> args = Arrays.asList(workId, workData, state); 658 redisExecutor.execute(new RedisCallable<Void>() { 659 660 @Override 661 public Void call(Jedis jedis) { 662 jedis.evalsha(completedWorkSha.getBytes(), keys, args); 663 if (log.isDebugEnabled()) { 664 log.debug("Mark work as completed " + work); 665 } 666 return null; 667 } 668 }); 669 } 670 671 /** 672 * Gets the work state. 673 * 674 * @param workId the work id 675 * @return the state, or {@code null} if not found 676 */ 677 protected State getWorkStateInfo(final String workId) { 678 final byte[] workIdBytes = bytes(workId); 679 return redisExecutor.execute(new RedisCallable<State>() { 680 @Override 681 public State call(Jedis jedis) { 682 // get state 683 byte[] bytes = jedis.hget(stateKey(), workIdBytes); 684 if (bytes == null || bytes.length == 0) { 685 return null; 686 } 687 switch (bytes[0]) { 688 case STATE_SCHEDULED_B: 689 return State.SCHEDULED; 690 case STATE_CANCELED_B: 691 return State.CANCELED; 692 case STATE_RUNNING_B: 693 return State.RUNNING; 694 case STATE_COMPLETED_B: 695 return State.COMPLETED; 696 default: 697 String msg; 698 try { 699 msg = new String(bytes, UTF_8); 700 } catch (UnsupportedEncodingException e) { 701 msg = Arrays.toString(bytes); 702 } 703 log.error("Unknown work state: " + msg + ", work: " + workId); 704 return null; 705 } 706 } 707 }); 708 } 709 710 protected List<String> listWorkIdsList(final byte[] queueBytes) throws IOException { 711 return redisExecutor.execute(new RedisCallable<List<String>>() { 712 713 @Override 714 public List<String> call(Jedis jedis) { 715 List<byte[]> keys = jedis.lrange(queueBytes, 0, -1); 716 List<String> list = new ArrayList<String>(keys.size()); 717 for (byte[] workIdBytes : keys) { 718 list.add(string(workIdBytes)); 719 } 720 return list; 721 } 722 723 }); 724 } 725 726 protected List<String> listWorkIdsSet(final byte[] queueBytes) throws IOException { 727 return redisExecutor.execute(new RedisCallable<List<String>>() { 728 729 @Override 730 public List<String> call(Jedis jedis) { 731 732 Set<byte[]> keys = jedis.smembers(queueBytes); 733 List<String> list = new ArrayList<String>(keys.size()); 734 for (byte[] workIdBytes : keys) { 735 list.add(string(workIdBytes)); 736 } 737 return list; 738 } 739 740 }); 741 742 } 743 744 protected List<Work> listWorkList(final byte[] queueBytes) throws IOException { 745 return redisExecutor.execute(new RedisCallable<List<Work>>() { 746 @Override 747 public List<Work> call(Jedis jedis) { 748 List<byte[]> keys = jedis.lrange(queueBytes, 0, -1); 749 List<Work> list = new ArrayList<Work>(keys.size()); 750 for (byte[] workIdBytes : keys) { 751 // get data 752 byte[] workBytes = jedis.hget(dataKey(), workIdBytes); 753 Work work = deserializeWork(workBytes); 754 list.add(work); 755 } 756 return list; 757 } 758 }); 759 } 760 761 protected List<Work> listWorkSet(final byte[] queueBytes) throws IOException { 762 return redisExecutor.execute(new RedisCallable<List<Work>>() { 763 @Override 764 public List<Work> call(Jedis jedis) { 765 Set<byte[]> keys = jedis.smembers(queueBytes); 766 List<Work> list = new ArrayList<Work>(keys.size()); 767 for (byte[] workIdBytes : keys) { 768 // get data 769 byte[] workBytes = jedis.hget(dataKey(), workIdBytes); 770 Work work = deserializeWork(workBytes); 771 list.add(work); 772 } 773 return list; 774 } 775 }); 776 } 777 778 protected Work getWork(byte[] workIdBytes) { 779 try { 780 return getWorkData(workIdBytes); 781 } catch (IOException e) { 782 throw new RuntimeException(e); 783 } 784 } 785 786 protected Work getWorkData(final byte[] workIdBytes) throws IOException { 787 return redisExecutor.execute(new RedisCallable<Work>() { 788 789 @Override 790 public Work call(Jedis jedis) { 791 byte[] workBytes = jedis.hget(dataKey(), workIdBytes); 792 return deserializeWork(workBytes); 793 } 794 795 }); 796 } 797 798 /** 799 * Removes first work from work queue. 800 * 801 * @param queueId the queue id 802 * @return the work, or {@code null} if the scheduled queue is empty 803 */ 804 protected Work getWorkFromQueue(final String queueId) throws IOException { 805 return redisExecutor.execute(new RedisCallable<Work>() { 806 807 @Override 808 public Work call(Jedis jedis) { 809 // pop from queue 810 byte[] workIdBytes = jedis.rpop(queuedKey(queueId)); 811 if (workIdBytes == null) { 812 return null; 813 } 814 // get data 815 byte[] workBytes = jedis.hget(dataKey(), workIdBytes); 816 return deserializeWork(workBytes); 817 } 818 819 }); 820 } 821 822 /** 823 * Removes a given work from queue, move the work from scheduled to completed set. 824 * 825 * @throws IOException 826 */ 827 protected Work removeScheduledWork(final String queueId, final String workId) throws IOException { 828 final byte[] workIdBytes = bytes(workId); 829 return redisExecutor.execute(new RedisCallable<Work>() { 830 831 @Override 832 public Work call(Jedis jedis) { 833 // remove from queue 834 Long n = jedis.lrem(queuedKey(queueId), 0, workIdBytes); 835 if (n == null || n.intValue() == 0) { 836 return null; 837 } 838 // remove from set 839 jedis.srem(scheduledKey(queueId), workIdBytes); 840 // set state to completed at current time 841 byte[] completedBytes = bytes(String.valueOf(System.currentTimeMillis())); 842 jedis.hset(stateKey(), workIdBytes, completedBytes); 843 // get data 844 byte[] workBytes = jedis.hget(dataKey(), workIdBytes); 845 Work work = deserializeWork(workBytes); 846 log.debug("Remove scheduled " + work); 847 return work; 848 } 849 850 }); 851 } 852 853 /** 854 * Helper to call SSCAN but fall back on a custom implementation based on SMEMBERS if the backend (embedded) does 855 * not support SSCAN. 856 * 857 * @since 7.3 858 */ 859 public static class SScanner { 860 861 // results of SMEMBERS for last key, in embedded mode 862 protected List<String> smembers; 863 864 protected ScanResult<String> sscan(Jedis jedis, String key, String cursor, ScanParams params) { 865 ScanResult<String> scanResult; 866 try { 867 scanResult = jedis.sscan(key, cursor, params); 868 } catch (Exception e) { 869 // when testing with embedded fake redis, we may get an un-declared exception 870 if (!(e.getCause() instanceof NoSuchMethodException)) { 871 throw e; 872 } 873 // no SSCAN available in embedded, do a full SMEMBERS 874 if (smembers == null) { 875 Set<String> set = jedis.smembers(key); 876 smembers = new ArrayList<>(set); 877 } 878 Collection<byte[]> bparams = params.getParams(); 879 int count = 1000; 880 for (Iterator<byte[]> it = bparams.iterator(); it.hasNext();) { 881 byte[] param = it.next(); 882 if (param.equals(Protocol.Keyword.MATCH.raw)) { 883 throw new UnsupportedOperationException("MATCH not supported"); 884 } 885 if (param.equals(Protocol.Keyword.COUNT.raw)) { 886 count = Integer.parseInt(SafeEncoder.encode(it.next())); 887 } 888 } 889 int pos = Integer.parseInt(cursor); // don't check range, callers are cool 890 int end = Math.min(pos + count, smembers.size()); 891 int nextPos = end == smembers.size() ? 0 : end; 892 scanResult = new ScanResult<>(String.valueOf(nextPos), smembers.subList(pos, end)); 893 if (nextPos == 0) { 894 smembers = null; 895 } 896 } 897 return scanResult; 898 } 899 } 900 901 /** 902 * Don't pass more than approx. this number of parameters to Lua calls. 903 */ 904 private static final int BATCH_SIZE = 5000; 905 906 protected void removeAllCompletedWork(final String queueId) throws IOException { 907 removeCompletedWork(queueId, 0); 908 } 909 910 protected void removeCompletedWork(final String queueId, final long completionTime) throws IOException { 911 redisExecutor.execute(new RedisCallable<Void>() { 912 913 @Override 914 public Void call(Jedis jedis) { 915 String completedKey = completedKeyString(queueId); 916 String stateKey = stateKeyString(); 917 String dataKey = dataKeyString(); 918 SScanner sscanner = new SScanner(); 919 ScanParams scanParams = new ScanParams().count(BATCH_SIZE); 920 String cursor = "0"; 921 do { 922 ScanResult<String> scanResult = sscanner.sscan(jedis, completedKey, cursor, scanParams); 923 cursor = scanResult.getStringCursor(); 924 List<String> workIds = scanResult.getResult(); 925 if (!workIds.isEmpty()) { 926 // delete these works if before the completion time 927 List<String> keys = Arrays.asList(completedKey, stateKey, dataKey); 928 List<String> args = new ArrayList<String>(1 + workIds.size()); 929 args.add(String.valueOf(completionTime)); 930 args.addAll(workIds); 931 jedis.evalsha(cleanCompletedWorkSha, keys, args); 932 } 933 } while (!"0".equals(cursor)); 934 return null; 935 } 936 }); 937 } 938 939}