001/* 002 * (C) Copyright 2013-2019 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.redis.contribs; 020 021import static java.nio.charset.StandardCharsets.UTF_8; 022 023import java.io.ByteArrayInputStream; 024import java.io.ByteArrayOutputStream; 025import java.io.IOException; 026import java.io.InputStream; 027import java.io.ObjectInputStream; 028import java.io.ObjectOutputStream; 029import java.util.ArrayList; 030import java.util.Arrays; 031import java.util.Collections; 032import java.util.HashMap; 033import java.util.HashSet; 034import java.util.List; 035import java.util.Map; 036import java.util.Set; 037 038import org.apache.commons.logging.Log; 039import org.apache.commons.logging.LogFactory; 040import org.nuxeo.ecm.core.redis.RedisAdmin; 041import org.nuxeo.ecm.core.redis.RedisExecutor; 042import org.nuxeo.ecm.core.work.NuxeoBlockingQueue; 043import org.nuxeo.ecm.core.work.WorkHolder; 044import org.nuxeo.ecm.core.work.WorkQueuing; 045import org.nuxeo.ecm.core.work.api.Work; 046import org.nuxeo.ecm.core.work.api.Work.State; 047import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor; 048import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 049import org.nuxeo.runtime.api.Framework; 050 051import redis.clients.jedis.exceptions.JedisException; 052 053/** 054 * Implementation of a {@link WorkQueuing} storing {@link Work} instances in Redis. 055 * 056 * @since 5.8 057 */ 058public class RedisWorkQueuing implements WorkQueuing { 059 060 private static final Log log = LogFactory.getLog(RedisWorkQueuing.class); 061 062 /** 063 * Global hash of Work instance id -> serialized Work instance. 064 */ 065 protected static final String KEY_DATA = "data"; 066 067 /** 068 * Global hash of Work instance id -> Work state. The completed state is followed by a completion time in 069 * milliseconds. 070 */ 071 protected static final String KEY_STATE = "state"; 072 073 /** 074 * Per-queue list of suspended Work instance ids. 075 */ 076 protected static final String KEY_SUSPENDED_PREFIX = "prev"; 077 078 protected static final byte[] KEY_SUSPENDED = KEY_SUSPENDED_PREFIX.getBytes(); 079 080 /** 081 * Per-queue list of scheduled Work instance ids. 082 */ 083 protected static final String KEY_QUEUE_PREFIX = "queue"; 084 085 protected static final byte[] KEY_QUEUE = KEY_QUEUE_PREFIX.getBytes(); 086 087 /** 088 * Per-queue set of scheduled Work instance ids. 089 */ 090 protected static final String KEY_SCHEDULED_PREFIX = "sched"; 091 092 protected static final byte[] KEY_SCHEDULED = KEY_SCHEDULED_PREFIX.getBytes(); 093 094 /** 095 * Per-queue set of running Work instance ids. 096 */ 097 protected static final String KEY_RUNNING_PREFIX = "run"; 098 099 protected static final byte[] KEY_RUNNING = KEY_RUNNING_PREFIX.getBytes(); 100 101 /** 102 * Per-queue set of counters. 103 */ 104 protected static final String KEY_COMPLETED_PREFIX = "done"; 105 106 protected static final byte[] KEY_COMPLETED = KEY_COMPLETED_PREFIX.getBytes(); 107 108 protected static final String KEY_CANCELED_PREFIX = "cancel"; 109 110 protected static final byte[] KEY_CANCELED = KEY_CANCELED_PREFIX.getBytes(); 111 112 protected static final String KEY_COUNT_PREFIX = "count"; 113 114 protected static final byte STATE_SCHEDULED_B = 'Q'; 115 116 protected static final byte STATE_RUNNING_B = 'R'; 117 118 protected static final byte STATE_RUNNING_C = 'C'; 119 120 protected static final byte[] STATE_SCHEDULED = new byte[] { STATE_SCHEDULED_B }; 121 122 protected static final byte[] STATE_RUNNING = new byte[] { STATE_RUNNING_B }; 123 124 protected static final byte[] STATE_UNKNOWN = new byte[0]; 125 126 protected Listener listener; 127 128 protected final Map<String, NuxeoBlockingQueue> allQueued = new HashMap<>(); 129 130 protected String redisNamespace; 131 132 // lua scripts 133 protected byte[] initWorkQueueSha; 134 135 protected byte[] metricsWorkQueueSha; 136 137 protected byte[] schedulingWorkSha; 138 139 protected byte[] popWorkSha; 140 141 protected byte[] runningWorkSha; 142 143 protected byte[] cancelledScheduledWorkSha; 144 145 protected byte[] completedWorkSha; 146 147 protected byte[] cancelledRunningWorkSha; 148 149 public RedisWorkQueuing(Listener listener) { 150 this.listener = listener; 151 loadConfig(); 152 } 153 154 void loadConfig() { 155 RedisAdmin admin = Framework.getService(RedisAdmin.class); 156 redisNamespace = admin.namespace("work"); 157 try { 158 initWorkQueueSha = admin.load("org.nuxeo.ecm.core.redis", "init-work-queue").getBytes(); 159 metricsWorkQueueSha = admin.load("org.nuxeo.ecm.core.redis", "metrics-work-queue").getBytes(); 160 schedulingWorkSha = admin.load("org.nuxeo.ecm.core.redis", "scheduling-work").getBytes(); 161 popWorkSha = admin.load("org.nuxeo.ecm.core.redis", "pop-work").getBytes(); 162 runningWorkSha = admin.load("org.nuxeo.ecm.core.redis", "running-work").getBytes(); 163 cancelledScheduledWorkSha = admin.load("org.nuxeo.ecm.core.redis", "cancelled-scheduled-work").getBytes(); 164 completedWorkSha = admin.load("org.nuxeo.ecm.core.redis", "completed-work").getBytes(); 165 cancelledRunningWorkSha = admin.load("org.nuxeo.ecm.core.redis", "cancelled-running-work").getBytes(); 166 } catch (IOException e) { 167 throw new RuntimeException("Cannot load LUA scripts", e); 168 } 169 } 170 171 @Override 172 public NuxeoBlockingQueue init(WorkQueueDescriptor config) { 173 evalSha(initWorkQueueSha, keys(config.id), Collections.emptyList()); 174 RedisBlockingQueue queue = new RedisBlockingQueue(config.id, this); 175 allQueued.put(config.id, queue); 176 return queue; 177 } 178 179 @Override 180 public NuxeoBlockingQueue getQueue(String queueId) { 181 return allQueued.get(queueId); 182 } 183 184 @Override 185 public void workSchedule(String queueId, Work work) { 186 getQueue(queueId).offer(new WorkHolder(work)); 187 } 188 189 @Override 190 public void workRunning(String queueId, Work work) { 191 try { 192 workSetRunning(queueId, work); 193 } catch (IOException e) { 194 throw new RuntimeException(e); 195 } 196 } 197 198 @Override 199 public void workCanceled(String queueId, Work work) { 200 try { 201 workSetCancelledScheduled(queueId, work); 202 } catch (IOException e) { 203 throw new RuntimeException(e); 204 } 205 } 206 207 @Override 208 public void workCompleted(String queueId, Work work) { 209 try { 210 workSetCompleted(queueId, work); 211 } catch (IOException e) { 212 throw new RuntimeException(e); 213 } 214 } 215 216 @Override 217 public void workReschedule(String queueId, Work work) { 218 try { 219 workSetReschedule(queueId, work); 220 } catch (IOException e) { 221 throw new RuntimeException(e); 222 } 223 } 224 225 @Override 226 public List<Work> listWork(String queueId, State state) { 227 switch (state) { 228 case SCHEDULED: 229 return listScheduled(queueId); 230 case RUNNING: 231 return listRunning(queueId); 232 default: 233 throw new IllegalArgumentException(String.valueOf(state)); 234 } 235 } 236 237 @Override 238 public List<String> listWorkIds(String queueId, State state) { 239 if (state == null) { 240 return listNonCompletedIds(queueId); 241 } 242 switch (state) { 243 case SCHEDULED: 244 return listScheduledIds(queueId); 245 case RUNNING: 246 return listRunningIds(queueId); 247 default: 248 throw new IllegalArgumentException(String.valueOf(state)); 249 } 250 } 251 252 protected List<Work> listScheduled(String queueId) { 253 try { 254 return listWorkList(queuedKey(queueId)); 255 } catch (IOException e) { 256 throw new RuntimeException(e); 257 } 258 } 259 260 protected List<Work> listRunning(String queueId) { 261 try { 262 return listWorkSet(runningKey(queueId)); 263 } catch (IOException e) { 264 throw new RuntimeException(e); 265 } 266 } 267 268 protected List<String> listScheduledIds(String queueId) { 269 try { 270 return listWorkIdsList(queuedKey(queueId)); 271 } catch (IOException e) { 272 throw new RuntimeException(e); 273 } 274 } 275 276 protected List<String> listRunningIds(String queueId) { 277 try { 278 return listWorkIdsSet(runningKey(queueId)); 279 } catch (IOException e) { 280 throw new RuntimeException(e); 281 } 282 } 283 284 protected List<String> listNonCompletedIds(String queueId) { 285 List<String> list = listScheduledIds(queueId); 286 list.addAll(listRunningIds(queueId)); 287 return list; 288 } 289 290 @Override 291 public long count(String queueId, State state) { 292 switch (state) { 293 case SCHEDULED: 294 return metrics(queueId).scheduled.longValue(); 295 case RUNNING: 296 return metrics(queueId).running.longValue(); 297 default: 298 throw new IllegalArgumentException(String.valueOf(state)); 299 } 300 } 301 302 @Override 303 public Work find(String workId, State state) { 304 if (isWorkInState(workId, state)) { 305 return getWork(bytes(workId)); 306 } 307 return null; 308 } 309 310 @Override 311 public boolean isWorkInState(String workId, State state) { 312 State s = getWorkState(workId); 313 if (state == null) { 314 return s == State.SCHEDULED || s == State.RUNNING; 315 } 316 return s == state; 317 } 318 319 @Override 320 public void removeScheduled(String queueId, String workId) { 321 try { 322 removeScheduledWork(queueId, workId); 323 } catch (IOException cause) { 324 throw new RuntimeException("Cannot remove scheduled work " + workId + " from " + queueId, cause); 325 } 326 } 327 328 @Override 329 public State getWorkState(String workId) { 330 return getWorkStateInfo(workId); 331 } 332 333 @Override 334 public void setActive(String queueId, boolean value) { 335 WorkQueueMetrics metrics = getQueue(queueId).setActive(value); 336 if (value) { 337 listener.queueActivated(metrics); 338 } else { 339 listener.queueDeactivated(metrics); 340 } 341 } 342 343 /* 344 * ******************** Redis Interface ******************** 345 */ 346 347 protected String string(byte[] bytes) { 348 return new String(bytes, UTF_8); 349 } 350 351 protected byte[] bytes(String string) { 352 return string.getBytes(UTF_8); 353 } 354 355 protected byte[] bytes(Work.State state) { 356 switch (state) { 357 case SCHEDULED: 358 return STATE_SCHEDULED; 359 case RUNNING: 360 return STATE_RUNNING; 361 default: 362 return STATE_UNKNOWN; 363 } 364 } 365 366 protected static String key(String... names) { 367 return String.join(":", names); 368 } 369 370 protected byte[] keyBytes(String prefix, String queueId) { 371 return keyBytes(key(prefix, queueId)); 372 } 373 374 protected byte[] keyBytes(String prefix) { 375 return bytes(redisNamespace.concat(prefix)); 376 } 377 378 protected byte[] workId(Work work) { 379 return workId(work.getId()); 380 } 381 382 protected byte[] workId(String id) { 383 return bytes(id); 384 } 385 386 protected byte[] suspendedKey(String queueId) { 387 return keyBytes(key(KEY_SUSPENDED_PREFIX, queueId)); 388 } 389 390 protected byte[] queuedKey(String queueId) { 391 return keyBytes(key(KEY_QUEUE_PREFIX, queueId)); 392 } 393 394 protected byte[] countKey(String queueId) { 395 return keyBytes(key(KEY_COUNT_PREFIX, queueId)); 396 } 397 398 protected byte[] runningKey(String queueId) { 399 return keyBytes(key(KEY_RUNNING_PREFIX, queueId)); 400 } 401 402 protected byte[] scheduledKey(String queueId) { 403 return keyBytes(key(KEY_SCHEDULED_PREFIX, queueId)); 404 } 405 406 protected byte[] completedKey(String queueId) { 407 return keyBytes(key(KEY_COMPLETED_PREFIX, queueId)); 408 } 409 410 protected byte[] canceledKey(String queueId) { 411 return keyBytes(key(KEY_CANCELED_PREFIX, queueId)); 412 } 413 414 protected byte[] stateKey() { 415 return keyBytes(KEY_STATE); 416 } 417 418 protected byte[] dataKey() { 419 return keyBytes(KEY_DATA); 420 } 421 422 protected byte[] serializeWork(Work work) throws IOException { 423 ByteArrayOutputStream baout = new ByteArrayOutputStream(); 424 ObjectOutputStream out = new ObjectOutputStream(baout); 425 out.writeObject(work); 426 out.flush(); 427 out.close(); 428 return baout.toByteArray(); 429 } 430 431 protected Work deserializeWork(byte[] workBytes) { 432 if (workBytes == null) { 433 return null; 434 } 435 InputStream bain = new ByteArrayInputStream(workBytes); 436 try (ObjectInputStream in = new ObjectInputStream(bain)) { 437 return (Work) in.readObject(); 438 } catch (RuntimeException cause) { 439 throw cause; 440 } catch (IOException | ClassNotFoundException cause) { 441 throw new RuntimeException("Cannot deserialize work", cause); 442 } 443 } 444 445 /** 446 * Finds which queues have suspended work. 447 * 448 * @return a set of queue ids 449 * @since 5.8 450 */ 451 protected Set<String> getSuspendedQueueIds() throws IOException { 452 return getQueueIds(KEY_SUSPENDED_PREFIX); 453 } 454 455 protected Set<String> getScheduledQueueIds() { 456 return getQueueIds(KEY_QUEUE_PREFIX); 457 } 458 459 protected Set<String> getRunningQueueIds() { 460 return getQueueIds(KEY_RUNNING_PREFIX); 461 } 462 463 /** 464 * Finds which queues have work for a given state prefix. 465 * 466 * @return a set of queue ids 467 * @since 5.8 468 */ 469 protected Set<String> getQueueIds(final String queuePrefix) { 470 return Framework.getService(RedisExecutor.class).execute(jedis -> { 471 int offset = keyBytes(queuePrefix).length; 472 Set<byte[]> keys = jedis.keys(keyBytes(key(queuePrefix, "*"))); 473 Set<String> queueIds = new HashSet<>(keys.size()); 474 for (byte[] bytes : keys) { 475 String queueId = new String(bytes, offset, bytes.length - offset, UTF_8); 476 queueIds.add(queueId); 477 } 478 return queueIds; 479 }); 480 } 481 482 /** 483 * Resumes all suspended work instances by moving them to the scheduled queue. 484 * 485 * @param queueId the queue id 486 * @return the number of work instances scheduled 487 * @since 5.8 488 */ 489 public int scheduleSuspendedWork(final String queueId) throws IOException { 490 return Framework.getService(RedisExecutor.class).execute(jedis -> { 491 for (int n = 0;; n++) { 492 byte[] workIdBytes = jedis.rpoplpush(suspendedKey(queueId), queuedKey(queueId)); 493 if (workIdBytes == null) { 494 return Integer.valueOf(n); 495 } 496 } 497 }).intValue(); 498 } 499 500 /** 501 * Suspends all scheduled work instances by moving them to the suspended queue. 502 * 503 * @param queueId the queue id 504 * @return the number of work instances suspended 505 * @since 5.8 506 */ 507 public int suspendScheduledWork(final String queueId) throws IOException { 508 return Framework.getService(RedisExecutor.class).execute(jedis -> { 509 for (int n = 0;; n++) { 510 byte[] workIdBytes = jedis.rpoplpush(queuedKey(queueId), suspendedKey(queueId)); 511 if (workIdBytes == null) { 512 return n; 513 } 514 } 515 }).intValue(); 516 } 517 518 @Override 519 public WorkQueueMetrics metrics(String queueId) { 520 return metrics(queueId, evalSha(metricsWorkQueueSha, keys(queueId), Collections.emptyList())); 521 } 522 523 WorkQueueMetrics metrics(String queueId, Number[] counters) { 524 return new WorkQueueMetrics(queueId, counters[0], counters[1], counters[2], counters[3]); 525 } 526 527 /** 528 * Persists a work instance and adds it to the scheduled queue. 529 * 530 * @param queueId the queue id 531 * @param work the work instance 532 */ 533 public void workSetScheduled(final String queueId, Work work) throws IOException { 534 listener.queueChanged(work, metrics(queueId, evalSha(schedulingWorkSha, keys(queueId), args(work, true)))); 535 } 536 537 /** 538 * Switches a work to state completed, and saves its new state. 539 */ 540 protected void workSetCancelledScheduled(final String queueId, final Work work) throws IOException { 541 listener.queueChanged(work, 542 metrics(queueId, evalSha(cancelledScheduledWorkSha, keys(queueId), args(work, true)))); 543 } 544 545 /** 546 * Switches a work to state running. 547 * 548 * @param queueId the queue id 549 * @param work the work 550 */ 551 protected void workSetRunning(final String queueId, Work work) throws IOException { 552 listener.queueChanged(work, metrics(queueId, evalSha(runningWorkSha, keys(queueId), args(work, true)))); 553 } 554 555 /** 556 * Switches a work to state completed, and saves its new state. 557 */ 558 protected void workSetCompleted(final String queueId, final Work work) throws IOException { 559 listener.queueChanged(work, metrics(queueId, evalSha(completedWorkSha, keys(queueId), args(work, false)))); 560 } 561 562 /** 563 * Switches a work to state canceled, and saves its new state. 564 */ 565 protected void workSetReschedule(final String queueId, final Work work) throws IOException { 566 listener.queueChanged(work, 567 metrics(queueId, evalSha(cancelledRunningWorkSha, keys(queueId), args(work, true)))); 568 } 569 570 protected List<byte[]> keys(String queueid) { 571 return Arrays.asList(dataKey(), stateKey(), countKey(queueid), scheduledKey(queueid), queuedKey(queueid), 572 runningKey(queueid), completedKey(queueid), canceledKey(queueid)); 573 } 574 575 protected List<byte[]> args(String workId) throws IOException { 576 return Arrays.asList(workId(workId)); 577 } 578 579 protected List<byte[]> args(Work work, boolean serialize) throws IOException { 580 List<byte[]> args = Arrays.asList(workId(work), bytes(work.getWorkInstanceState())); 581 if (serialize) { 582 args = new ArrayList<>(args); 583 args.add(serializeWork(work)); 584 } 585 return args; 586 } 587 588 /** 589 * Gets the work state. 590 * 591 * @param workId the work id 592 * @return the state, or {@code null} if not found 593 */ 594 protected State getWorkStateInfo(final String workId) { 595 final byte[] workIdBytes = bytes(workId); 596 return Framework.getService(RedisExecutor.class).execute(jedis -> { 597 // get state 598 byte[] bytes = jedis.hget(stateKey(), workIdBytes); 599 if (bytes == null || bytes.length == 0) { 600 return null; 601 } 602 switch (bytes[0]) { 603 case STATE_SCHEDULED_B: 604 return State.SCHEDULED; 605 case STATE_RUNNING_B: 606 return State.RUNNING; 607 default: 608 String msg = new String(bytes, UTF_8); 609 log.error("Unknown work state: " + msg + ", work: " + workId); 610 return null; 611 } 612 }); 613 } 614 615 protected List<String> listWorkIdsList(final byte[] queueBytes) throws IOException { 616 return Framework.getService(RedisExecutor.class).execute(jedis -> { 617 List<byte[]> keys = jedis.lrange(queueBytes, 0, -1); 618 List<String> list = new ArrayList<>(keys.size()); 619 for (byte[] workIdBytes : keys) { 620 list.add(string(workIdBytes)); 621 } 622 return list; 623 }); 624 } 625 626 protected List<String> listWorkIdsSet(final byte[] queueBytes) throws IOException { 627 return Framework.getService(RedisExecutor.class).execute(jedis -> { 628 629 Set<byte[]> keys = jedis.smembers(queueBytes); 630 List<String> list = new ArrayList<>(keys.size()); 631 for (byte[] workIdBytes : keys) { 632 list.add(string(workIdBytes)); 633 } 634 return list; 635 }); 636 637 } 638 639 protected List<Work> listWorkList(final byte[] queueBytes) throws IOException { 640 return Framework.getService(RedisExecutor.class).execute(jedis -> { 641 List<byte[]> keys = jedis.lrange(queueBytes, 0, -1); 642 List<Work> list = new ArrayList<>(keys.size()); 643 for (byte[] workIdBytes : keys) { 644 // get data 645 byte[] workBytes = jedis.hget(dataKey(), workIdBytes); 646 Work work = deserializeWork(workBytes); 647 list.add(work); 648 } 649 return list; 650 }); 651 } 652 653 protected List<Work> listWorkSet(final byte[] queueBytes) throws IOException { 654 return Framework.getService(RedisExecutor.class).execute(jedis -> { 655 Set<byte[]> keys = jedis.smembers(queueBytes); 656 List<Work> list = new ArrayList<>(keys.size()); 657 for (byte[] workIdBytes : keys) { 658 // get data 659 byte[] workBytes = jedis.hget(dataKey(), workIdBytes); 660 Work work = deserializeWork(workBytes); 661 list.add(work); 662 } 663 return list; 664 }); 665 } 666 667 protected Work getWork(byte[] workIdBytes) { 668 try { 669 return getWorkData(workIdBytes); 670 } catch (IOException e) { 671 throw new RuntimeException(e); 672 } 673 } 674 675 protected Work getWorkData(final byte[] workIdBytes) throws IOException { 676 return Framework.getService(RedisExecutor.class).execute(jedis -> { 677 byte[] workBytes = jedis.hget(dataKey(), workIdBytes); 678 return deserializeWork(workBytes); 679 }); 680 } 681 682 /** 683 * Removes first work from work queue. 684 * 685 * @param queueId the queue id 686 * @return the work, or {@code null} if the scheduled queue is empty 687 */ 688 protected Work getWorkFromQueue(final String queueId) throws IOException { 689 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 690 List<byte[]> keys = keys(queueId); 691 List<byte[]> args = Collections.singletonList(STATE_RUNNING); 692 List<?> result = (List<?>) redisExecutor.evalsha(popWorkSha, keys, args); 693 if (result == null) { 694 return null; 695 } 696 697 List<Number> numbers = (List<Number>) result.get(0); 698 WorkQueueMetrics metrics = metrics(queueId, coerceNullToZero(numbers)); 699 Object bytes = result.get(1); 700 if (bytes instanceof String) { 701 bytes = bytes((String) bytes); 702 } 703 Work work = deserializeWork((byte[]) bytes); 704 705 listener.queueChanged(work, metrics); 706 707 return work; 708 } 709 710 /** 711 * Removes a given work from queue, move the work from scheduled to completed set. 712 */ 713 protected void removeScheduledWork(final String queueId, final String workId) throws IOException { 714 evalSha(cancelledScheduledWorkSha, keys(queueId), args(workId)); 715 } 716 717 Number[] evalSha(byte[] sha, List<byte[]> keys, List<byte[]> args) throws JedisException { 718 RedisExecutor redisExecutor = Framework.getService(RedisExecutor.class); 719 List<Number> numbers = (List<Number>) redisExecutor.evalsha(sha, keys, args); 720 return coerceNullToZero(numbers); 721 } 722 723 protected static Number[] coerceNullToZero(List<Number> numbers) { 724 return coerceNullToZero(numbers.toArray(new Number[numbers.size()])); 725 } 726 727 protected static Number[] coerceNullToZero(Number[] counters) { 728 for (int i = 0; i < counters.length; ++i) { 729 if (counters[i] == null) { 730 counters[i] = 0; 731 } 732 } 733 return counters; 734 } 735 736 @Override 737 public void listen(Listener listener) { 738 this.listener = listener; 739 740 } 741 742 @Override 743 public boolean supportsProcessingDisabling() { 744 return true; 745 } 746 747}