001/* 002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.core.work; 020 021import static java.lang.Math.min; 022import static org.nuxeo.ecm.core.work.BaseOverflowRecordFilter.PREFIX_OPTION; 023import static org.nuxeo.ecm.core.work.BaseOverflowRecordFilter.STORE_NAME_OPTION; 024import static org.nuxeo.ecm.core.work.BaseOverflowRecordFilter.STORE_TTL_OPTION; 025import static org.nuxeo.ecm.core.work.BaseOverflowRecordFilter.THRESHOLD_SIZE_OPTION; 026import static org.nuxeo.ecm.core.work.api.WorkManager.Scheduling.CANCEL_SCHEDULED; 027import static org.nuxeo.lib.stream.computation.AbstractComputation.INPUT_1; 028 029import java.time.Duration; 030import java.util.Collections; 031import java.util.HashMap; 032import java.util.List; 033import java.util.Map; 034import java.util.concurrent.TimeUnit; 035import java.util.function.Predicate; 036 037import javax.naming.NamingException; 038import javax.transaction.RollbackException; 039import javax.transaction.Status; 040import javax.transaction.Synchronization; 041import javax.transaction.SystemException; 042import javax.transaction.Transaction; 043import javax.transaction.TransactionManager; 044 045import org.apache.commons.logging.Log; 046import org.apache.commons.logging.LogFactory; 047import org.nuxeo.ecm.core.event.EventServiceComponent; 048import org.nuxeo.ecm.core.work.api.Work; 049import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor; 050import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 051import org.nuxeo.ecm.core.work.api.WorkSchedulePath; 052import org.nuxeo.lib.stream.codec.Codec; 053import org.nuxeo.lib.stream.computation.ComputationPolicy; 054import org.nuxeo.lib.stream.computation.ComputationPolicyBuilder; 055import org.nuxeo.lib.stream.computation.Record; 056import org.nuxeo.lib.stream.computation.RecordFilter; 057import org.nuxeo.lib.stream.computation.RecordFilterChain; 058import org.nuxeo.lib.stream.computation.Settings; 059import org.nuxeo.lib.stream.computation.StreamManager; 060import org.nuxeo.lib.stream.computation.StreamProcessor; 061import org.nuxeo.lib.stream.computation.Topology; 062import org.nuxeo.lib.stream.computation.internals.RecordFilterChainImpl; 063import org.nuxeo.lib.stream.log.LogLag; 064import org.nuxeo.lib.stream.log.LogManager; 065import org.nuxeo.lib.stream.log.LogOffset; 066import org.nuxeo.lib.stream.log.Name; 067import org.nuxeo.runtime.api.Framework; 068import org.nuxeo.runtime.codec.CodecService; 069import org.nuxeo.runtime.metrics.NuxeoMetricSet; 070import org.nuxeo.runtime.model.ComponentContext; 071import org.nuxeo.runtime.model.ComponentManager; 072import org.nuxeo.runtime.model.Descriptor; 073import org.nuxeo.runtime.services.config.ConfigurationService; 074import org.nuxeo.runtime.stream.StreamService; 075import org.nuxeo.runtime.transaction.TransactionHelper; 076 077import io.dropwizard.metrics5.MetricName; 078 079/** 080 * WorkManager impl that appends works into a Log. Works are therefore immutable (no state update) and can not be listed 081 * for performance reason. 082 * 083 * @since 9.3 084 */ 085public class StreamWorkManager extends WorkManagerImpl { 086 087 protected static final Log log = LogFactory.getLog(StreamWorkManager.class); 088 089 public static final String WORK_LOG_CONFIG_PROP = "nuxeo.stream.work.log.config"; 090 091 public static final String DEFAULT_WORK_LOG_CONFIG = "work"; 092 093 public static final String WORK_CODEC_PROP = "nuxeo.stream.work.log.codec"; 094 095 public static final String DEFAULT_WORK_CODEC = "legacy"; 096 097 public static final String WORK_OVER_PROVISIONING_PROP = "nuxeo.stream.work.over.provisioning.factor"; 098 099 public static final String DEFAULT_WORK_OVER_PROVISIONING = "3"; 100 101 public static final int DEFAULT_CONCURRENCY = 4; 102 103 // @since 11.1 104 protected WorkQueueMetrics lastMetrics; 105 106 protected long lastMetricTime; 107 108 protected long CACHE_LAST_METRIC_DURATION_MS = 1000; 109 110 // @since 11.1 111 public static final String NAMESPACE_PREFIX = "work/"; 112 113 /** 114 * @since 10.2 115 */ 116 public static final String STATETTL_KEY = "nuxeo.stream.work.state.ttl.seconds"; 117 118 /** 119 * @since 10.2 120 */ 121 public static final String STORESTATE_KEY = "nuxeo.stream.work.storestate.enabled"; 122 123 /** 124 * @since 10.2 125 */ 126 public static final long STATETTL_DEFAULT_VALUE = 3600; 127 128 /** 129 * @since 11.1 130 */ 131 public static final String COMPUTATION_FILTER_CLASS_KEY = "nuxeo.stream.work.computation.filter.class"; 132 133 /** 134 * @since 11.1 135 */ 136 public static final String COMPUTATION_FILTER_STORE_KEY = "nuxeo.stream.work.computation.filter.storeName"; 137 138 /** 139 * @since 11.1 140 */ 141 public static final String COMPUTATION_FILTER_STORE_TTL_KEY = "nuxeo.stream.work.computation.filter.storeTTL"; 142 143 /** 144 * @since 11.1 145 */ 146 public static final String COMPUTATION_FILTER_THRESHOLD_SIZE_KEY = "nuxeo.stream.work.computation.filter.thresholdSize"; 147 148 /** 149 * @since 11.1 150 */ 151 public static final String COMPUTATION_FILTER_PREFIX_KEY = "nuxeo.stream.work.computation.filter.storeKeyPrefix"; 152 153 protected Topology topology; 154 155 protected Topology topologyDisabled; 156 157 protected Settings settings; 158 159 protected StreamProcessor streamProcessor; 160 161 protected LogManager logManager; 162 163 protected StreamManager streamManager; 164 165 protected boolean storeState; 166 167 protected long stateTTL; 168 169 protected int getOverProvisioningFactor() { 170 // Enable over provisioning only if the log can be distributed 171 if (getLogManager().supportSubscribe()) { 172 return Integer.parseInt(Framework.getProperty(WORK_OVER_PROVISIONING_PROP, DEFAULT_WORK_OVER_PROVISIONING)); 173 } 174 return 1; 175 } 176 177 protected String getCodecName() { 178 return Framework.getProperty(WORK_CODEC_PROP, DEFAULT_WORK_CODEC); 179 } 180 181 protected Codec<Record> getCodec() { 182 return Framework.getService(CodecService.class).getCodec(getCodecName(), Record.class); 183 } 184 185 @Override 186 public void schedule(Work work, Scheduling scheduling, boolean afterCommit) { 187 String queueId = getCategoryQueueId(work.getCategory()); 188 if (log.isDebugEnabled()) { 189 log.debug(String.format( 190 "Scheduling: workId: %s, category: %s, queue: %s, scheduling: %s, afterCommit: %s, work: %s", 191 work.getId(), work.getCategory(), queueId, scheduling, afterCommit, work)); 192 } 193 if (!isQueuingEnabled(queueId)) { 194 log.info("Queue disabled, scheduling canceled: " + queueId); 195 return; 196 } 197 if (CANCEL_SCHEDULED.equals(scheduling)) { 198 if (storeState) { 199 if (WorkStateHelper.getState(work.getId()) != null) { 200 WorkStateHelper.setCanceled(work.getId()); 201 } 202 } else { 203 log.warn(String.format("Canceling a work is only supported if '%s' is true. Skipping work: %s", 204 STORESTATE_KEY, work)); 205 } 206 return; 207 } 208 if (afterCommit && scheduleAfterCommit(work, scheduling)) { 209 return; 210 } 211 WorkSchedulePath.newInstance(work); 212 String key = work.getPartitionKey(); 213 LogOffset offset; 214 try { 215 offset = streamManager.append(NAMESPACE_PREFIX + queueId, Record.of(key, WorkComputation.serialize(work))); 216 } catch (IllegalArgumentException e) { 217 log.error(String.format("Not scheduled work, unknown category: %s, mapped to %s", work.getCategory(), 218 NAMESPACE_PREFIX + queueId)); 219 return; 220 } 221 if (work.isCoalescing()) { 222 WorkStateHelper.setLastOffset(work.getId(), offset.offset(), stateTTL); 223 } 224 if (work.isGroupJoin()) { 225 if (log.isDebugEnabled()) { 226 log.debug(String.format("Submit Work: %s to GroupJoin: %s, offset: %s", work.getId(), 227 work.getPartitionKey(), offset)); 228 } 229 WorkStateHelper.addGroupJoinWork(work.getPartitionKey()); 230 } 231 if (storeState) { 232 WorkStateHelper.setState(work.getId(), Work.State.SCHEDULED, stateTTL); 233 } 234 } 235 236 @Override 237 public int getApplicationStartedOrder() { 238 // start before the WorkManagerImpl 239 return EventServiceComponent.APPLICATION_STARTED_ORDER - 2; 240 } 241 242 @Override 243 public void start(ComponentContext context) { 244 super.start(context); 245 ConfigurationService configuration = Framework.getService(ConfigurationService.class); 246 storeState = configuration.isBooleanTrue(STORESTATE_KEY); 247 stateTTL = configuration.getLong(STATETTL_KEY, STATETTL_DEFAULT_VALUE); 248 } 249 250 protected RecordFilterChain getRecordFilter() { 251 String filterClass = getRecordFilterClass(); 252 if (filterClass == null) { 253 return null; 254 } 255 RecordFilterChain filter = new RecordFilterChainImpl(); 256 Class<? extends RecordFilter> klass; 257 try { 258 klass = (Class<RecordFilter>) Class.forName(filterClass); 259 if (!RecordFilter.class.isAssignableFrom(klass)) { 260 throw new IllegalArgumentException("Invalid class for RecordFilter: " + filterClass); 261 } 262 RecordFilter ret = klass.getDeclaredConstructor().newInstance(); 263 ret.init(getRecordFilterOptions()); 264 filter.addFilter(ret); 265 } catch (ReflectiveOperationException e) { 266 throw new IllegalArgumentException("Invalid class for RecordFilter: " + filterClass, e); 267 } 268 return filter; 269 } 270 271 protected Map<String, String> getRecordFilterOptions() { 272 Map<String, String> ret = new HashMap<>(); 273 ConfigurationService configuration = Framework.getService(ConfigurationService.class); 274 configuration.getString(COMPUTATION_FILTER_STORE_KEY).ifPresent(value -> ret.put(STORE_NAME_OPTION, value)); 275 configuration.getString(COMPUTATION_FILTER_PREFIX_KEY).ifPresent(value -> ret.put(PREFIX_OPTION, value)); 276 configuration.getInteger(COMPUTATION_FILTER_THRESHOLD_SIZE_KEY) 277 .ifPresent(value -> ret.put(THRESHOLD_SIZE_OPTION, value.toString())); 278 configuration.getString(COMPUTATION_FILTER_STORE_TTL_KEY).ifPresent(value -> ret.put(STORE_TTL_OPTION, value)); 279 return ret; 280 } 281 282 protected String getRecordFilterClass() { 283 ConfigurationService configuration = Framework.getService(ConfigurationService.class); 284 return configuration.getString(COMPUTATION_FILTER_CLASS_KEY).orElse(null); 285 } 286 287 @Override 288 public void init() { 289 if (started) { 290 return; 291 } 292 WorkManagerImpl wmi = (WorkManagerImpl) Framework.getRuntime().getComponent("org.nuxeo.ecm.core.work.service"); 293 wmi.active = false; 294 log.debug("Initializing"); 295 synchronized (this) { 296 if (started) { 297 return; 298 } 299 getDescriptors(QUEUES_EP).forEach(d -> categoryToQueueId.put(d.getId(), d.getId())); 300 index(); 301 initTopology(); 302 logManager = getLogManager(); 303 streamManager = getStreamManager(); 304 streamManager.register("StreamWorkManagerDisable", topologyDisabled, settings); 305 streamProcessor = streamManager.registerAndCreateProcessor("StreamWorkManager", topology, settings); 306 started = true; 307 new ComponentListener().install(); 308 log.info("Initialized"); 309 } 310 } 311 312 class ComponentListener implements ComponentManager.Listener { 313 @Override 314 public void beforeStop(ComponentManager mgr, boolean isStandby) { 315 if (!shutdown(10, TimeUnit.SECONDS)) { 316 log.error("Some processors are still active"); 317 } 318 } 319 320 @Override 321 public void afterStart(ComponentManager mgr, boolean isResume) { 322 if (isProcessingDisabled()) { 323 log.warn("WorkManager processing has been disabled on this node"); 324 return; 325 } 326 streamProcessor.start(); 327 for (Descriptor d : getDescriptors(QUEUES_EP)) { 328 activateQueueMetrics(d.getId()); 329 } 330 } 331 332 @Override 333 public void afterStop(ComponentManager mgr, boolean isStandby) { 334 Framework.getRuntime().getComponentManager().removeListener(this); 335 for (Descriptor d : getDescriptors(QUEUES_EP)) { 336 deactivateQueueMetrics(d.getId()); 337 } 338 } 339 } 340 341 protected LogManager getLogManager() { 342 String config = getLogConfig(); 343 log.info("Init StreamWorkManager with Log configuration: " + config); 344 StreamService service = Framework.getService(StreamService.class); 345 return service.getLogManager(); 346 } 347 348 protected StreamManager getStreamManager() { 349 StreamService service = Framework.getService(StreamService.class); 350 return service.getStreamManager(); 351 } 352 353 protected String getLogConfig() { 354 return Framework.getProperty(WORK_LOG_CONFIG_PROP, DEFAULT_WORK_LOG_CONFIG); 355 } 356 357 @Override 358 public boolean isProcessingEnabled(String queueId) { 359 WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId); 360 return wqd != null && wqd.isProcessingEnabled(); 361 } 362 363 protected void initTopology() { 364 List<WorkQueueDescriptor> descriptors = getDescriptors(QUEUES_EP); 365 // create the single topology with one root per work pool 366 Topology.Builder builder = Topology.builder(); 367 descriptors.stream().filter(WorkQueueDescriptor::isProcessingEnabled).forEach(d -> builder.addComputation( 368 () -> new WorkComputation(NAMESPACE_PREFIX + d.getId()), 369 Collections.singletonList(INPUT_1 + ":" + NAMESPACE_PREFIX + d.getId()))); 370 topology = builder.build(); 371 // create a topology for the disabled work pools in order to init their input streams 372 Topology.Builder builderDisabled = Topology.builder(); 373 descriptors.stream() 374 .filter(Predicate.not(WorkQueueDescriptor::isProcessingEnabled)) 375 .forEach(d -> builderDisabled.addComputation(() -> new WorkComputation(d.getId()), 376 Collections.singletonList(INPUT_1 + ":" + NAMESPACE_PREFIX + d.getId()))); 377 topologyDisabled = builderDisabled.build(); 378 // The retry policy is handled at AbstractWork level, but we want to skip failure 379 ComputationPolicy policy = new ComputationPolicyBuilder().continueOnFailure(true).build(); 380 RecordFilterChain filter = getRecordFilter(); 381 settings = new Settings(DEFAULT_CONCURRENCY, getPartitions(DEFAULT_CONCURRENCY), getCodec(), policy, filter); 382 descriptors.forEach(item -> settings.setConcurrency(item.getId(), item.getMaxThreads())); 383 descriptors.forEach(item -> settings.setPartitions(item.getId(), getPartitions(item.getMaxThreads()))); 384 } 385 386 protected int getPartitions(int maxThreads) { 387 if (maxThreads == 1) { 388 // when the pool size is one the we don't want any concurrency 389 return 1; 390 } 391 return getOverProvisioningFactor() * maxThreads; 392 } 393 394 public class WorkScheduling implements Synchronization { 395 public final Work work; 396 397 public final Scheduling scheduling; 398 399 public WorkScheduling(Work work, Scheduling scheduling) { 400 this.work = work; 401 this.scheduling = scheduling; 402 } 403 404 @Override 405 public void beforeCompletion() { 406 } 407 408 @Override 409 public void afterCompletion(int status) { 410 if (status == Status.STATUS_COMMITTED) { 411 StreamWorkManager.this.schedule(work, scheduling, false); 412 } else { 413 if (status != Status.STATUS_ROLLEDBACK) { 414 throw new IllegalArgumentException("Unsupported transaction status " + status); 415 } 416 } 417 418 } 419 } 420 421 @Override 422 void activateQueue(WorkQueueDescriptor config) { 423 // queue processing is activated only from component listener afterStart 424 if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) { 425 throw new IllegalArgumentException("cannot activate all queues"); 426 } 427 log.info("Activated queue " + config.id + " " + config.toString()); 428 if (config.isProcessingEnabled()) { 429 activateQueueMetrics(config.id); 430 } 431 } 432 433 @Override 434 void deactivateQueue(WorkQueueDescriptor config) { 435 // queue processing is deactivated only on shutdown 436 if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) { 437 throw new IllegalArgumentException("cannot deactivate all queues"); 438 } 439 if (config.isProcessingEnabled()) { 440 deactivateQueueMetrics(config.id); 441 } 442 log.info("Deactivated work queue not supported: " + config.id); 443 } 444 445 @Override 446 protected void activateQueueMetrics(String queueId) { 447 NuxeoMetricSet queueMetrics = new NuxeoMetricSet(MetricName.build("nuxeo.works.global.queue").tagged("queue", queueId)); 448 queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).scheduled, "scheduled"); 449 queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).running, "running"); 450 queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).completed, "completed"); 451 queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).canceled, "canceled"); 452 registry.registerAll(queueMetrics); 453 } 454 455 @Override 456 protected void deactivateQueueMetrics(String queueId) { 457 String queueMetricsName = MetricName.build("nuxeo.works.global.queue").tagged("queue", queueId).getKey(); 458 registry.removeMatching((name, metric) -> name.getKey().startsWith(queueMetricsName)); 459 } 460 461 @Override 462 public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) { 463 log.warn("Shutdown a queue is not supported with computation implementation"); 464 return false; 465 } 466 467 @Override 468 public boolean shutdown(long timeout, TimeUnit timeUnit) { 469 log.info("Shutdown WorkManager in " + timeUnit.toMillis(timeout) + " ms"); 470 shutdownInProgress = true; 471 try { 472 long shutdownDelay = Framework.getService(ConfigurationService.class).getLong(SHUTDOWN_DELAY_MS_KEY, 0); 473 boolean ret = streamProcessor.stop(Duration.ofMillis(Math.max(timeUnit.toMillis(timeout), shutdownDelay))); 474 if (!ret) { 475 log.error("Not able to stop worker pool within the timeout."); 476 } 477 return ret; 478 } finally { 479 shutdownInProgress = false; 480 } 481 } 482 483 @Override 484 public int getQueueSize(String queueId, Work.State state) { 485 switch (state) { 486 case SCHEDULED: 487 return getMetrics(queueId).getScheduled().intValue(); 488 case RUNNING: 489 return getMetrics(queueId).getRunning().intValue(); 490 default: 491 return 0; 492 } 493 } 494 495 protected WorkQueueMetrics getMetricsWithNuxeoClassLoader(String queueId) { 496 long now = System.currentTimeMillis(); 497 if (lastMetrics != null && lastMetrics.queueId == queueId 498 && (now - lastMetricTime) < CACHE_LAST_METRIC_DURATION_MS) { 499 return lastMetrics; 500 } 501 // JMX threads have distinct class loader that need to be changed to get metrics 502 ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); 503 try { 504 Thread.currentThread().setContextClassLoader(Framework.class.getClassLoader()); 505 lastMetrics = getMetrics(queueId); 506 lastMetricTime = System.currentTimeMillis(); 507 return lastMetrics; 508 } finally { 509 Thread.currentThread().setContextClassLoader(classLoader); 510 } 511 } 512 513 @Override 514 public WorkQueueMetrics getMetrics(String queueId) { 515 LogLag lag = logManager.getLag(Name.ofUrn(NAMESPACE_PREFIX + queueId), Name.ofUrn(NAMESPACE_PREFIX + queueId)); 516 long running = 0; 517 if (lag.lag() > 0) { 518 // we don't have the exact running metric 519 // give an approximation that can be higher that actual one because of the over provisioning 520 running = min(lag.lag(), settings.getPartitions(queueId)); 521 } 522 return new WorkQueueMetrics(queueId, lag.lag(), running, lag.lower(), 0); 523 } 524 525 @Override 526 public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException { 527 if (queueId != null) { 528 return awaitCompletionOnQueue(queueId, duration, unit); 529 } 530 for (Descriptor item : getDescriptors(QUEUES_EP)) { 531 if (!awaitCompletionOnQueue(item.getId(), duration, unit)) { 532 return false; 533 } 534 } 535 return true; 536 } 537 538 protected boolean awaitCompletionOnQueue(String queueId, long duration, TimeUnit unit) throws InterruptedException { 539 if (!isStarted()) { 540 return true; 541 } 542 log.debug("awaitCompletion " + queueId + " starting"); 543 // wait for the lag to be null 544 long durationMs = min(unit.toMillis(duration), TimeUnit.DAYS.toMillis(1)); // prevent overflow 545 long deadline = System.currentTimeMillis() + durationMs; 546 while (System.currentTimeMillis() < deadline) { 547 Thread.sleep(100); 548 int lag = getMetrics(queueId).getScheduled().intValue(); 549 if (lag == 0) { 550 if (log.isDebugEnabled()) { 551 log.debug("awaitCompletion for " + queueId + " completed " + getMetrics(queueId)); 552 } 553 return true; 554 } 555 if (!log.isDebugEnabled()) { 556 log.debug("awaitCompletion for " + queueId + " not completed " + getMetrics(queueId)); 557 } 558 } 559 log.warn(String.format("%s timeout after: %.2fs, %s", queueId, durationMs / 1000.0, getMetrics(queueId))); 560 return false; 561 } 562 563 /** 564 * @deprecated since 10.2 because unused 565 */ 566 @Deprecated 567 public boolean awaitCompletionWithWaterMark(String queueId, long duration, TimeUnit unit) 568 throws InterruptedException { 569 if (!isStarted()) { 570 return true; 571 } 572 // wait that the low watermark get stable 573 long durationMs = min(unit.toMillis(duration), TimeUnit.DAYS.toMillis(1)); // prevent overflow 574 long deadline = System.currentTimeMillis() + durationMs; 575 long lowWatermark = getLowWaterMark(queueId); 576 while (System.currentTimeMillis() < deadline) { 577 Thread.sleep(100); 578 long wm = getLowWaterMark(queueId); 579 if (wm == lowWatermark) { 580 log.debug("awaitCompletion for " + (queueId == null ? "all" : queueId) + " completed " + wm); 581 return true; 582 } 583 if (log.isDebugEnabled()) { 584 log.debug("awaitCompletion low wm for " + (queueId == null ? "all" : queueId) + ":" + wm + " diff: " 585 + (wm - lowWatermark)); 586 } 587 lowWatermark = wm; 588 } 589 log.warn(String.format("%s timeout after: %.2fs", queueId, durationMs / 1000.0)); 590 return false; 591 } 592 593 protected long getLowWaterMark(String queueId) { 594 if (queueId != null) { 595 return streamProcessor.getLowWatermark(queueId); 596 } 597 return streamProcessor.getLowWatermark(); 598 } 599 600 @Override 601 public Work.State getWorkState(String workId) { 602 if (!storeState) { 603 return null; 604 } 605 return WorkStateHelper.getState(workId); 606 } 607 608 @Override 609 public Work find(String s, Work.State state) { 610 // always not found 611 return null; 612 } 613 614 @Override 615 public List<Work> listWork(String s, Work.State state) { 616 return Collections.emptyList(); 617 } 618 619 @Override 620 public List<String> listWorkIds(String s, Work.State state) { 621 return Collections.emptyList(); 622 } 623 624 @Override 625 protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) { 626 TransactionManager transactionManager; 627 try { 628 transactionManager = TransactionHelper.lookupTransactionManager(); 629 } catch (NamingException e) { 630 transactionManager = null; 631 } 632 if (transactionManager == null) { 633 log.warn("Not scheduled work after commit because of missing transaction manager: " + work.getId()); 634 return false; 635 } 636 try { 637 Transaction transaction = transactionManager.getTransaction(); 638 if (transaction == null) { 639 if (log.isDebugEnabled()) { 640 log.debug("Not scheduled work after commit because of missing transaction: " + work.getId()); 641 } 642 return false; 643 } 644 int status = transaction.getStatus(); 645 if (status == Status.STATUS_ACTIVE) { 646 if (log.isDebugEnabled()) { 647 log.debug("Scheduled after commit: " + work.getId()); 648 } 649 transaction.registerSynchronization(new StreamWorkManager.WorkScheduling(work, scheduling)); 650 return true; 651 } else if (status == Status.STATUS_COMMITTED) { 652 // called in afterCompletion, we can schedule immediately 653 if (log.isDebugEnabled()) { 654 log.debug("Scheduled immediately: " + work.getId()); 655 } 656 return false; 657 } else if (status == Status.STATUS_MARKED_ROLLBACK) { 658 if (log.isDebugEnabled()) { 659 log.debug("Cancelling schedule because transaction marked rollback-only: " + work.getId()); 660 } 661 return true; 662 } else { 663 if (log.isDebugEnabled()) { 664 log.debug("Not scheduling work after commit because transaction is in status " + status + ": " 665 + work.getId()); 666 } 667 return false; 668 } 669 } catch (SystemException | RollbackException e) { 670 log.error("Cannot schedule after commit", e); 671 return false; 672 } 673 } 674 675 @Override 676 public boolean supportsProcessingDisabling() { 677 return true; 678 } 679 680}