001/* 002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.core.work; 020 021import static java.lang.Math.min; 022import static org.nuxeo.ecm.core.work.api.WorkManager.Scheduling.CANCEL_SCHEDULED; 023 024import java.lang.reflect.Field; 025import java.time.Duration; 026import java.util.Collections; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Set; 030import java.util.concurrent.TimeUnit; 031 032import javax.naming.NamingException; 033import javax.transaction.RollbackException; 034import javax.transaction.Status; 035import javax.transaction.Synchronization; 036import javax.transaction.SystemException; 037import javax.transaction.Transaction; 038import javax.transaction.TransactionManager; 039 040import org.apache.commons.logging.Log; 041import org.apache.commons.logging.LogFactory; 042import org.nuxeo.ecm.core.event.EventServiceComponent; 043import org.nuxeo.ecm.core.work.api.Work; 044import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor; 045import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 046import org.nuxeo.ecm.core.work.api.WorkSchedulePath; 047import org.nuxeo.lib.stream.codec.Codec; 048import org.nuxeo.lib.stream.computation.Record; 049import org.nuxeo.lib.stream.computation.Settings; 050import org.nuxeo.lib.stream.computation.StreamProcessor; 051import org.nuxeo.lib.stream.computation.Topology; 052import org.nuxeo.lib.stream.computation.log.LogStreamProcessor; 053import org.nuxeo.lib.stream.log.LogAppender; 054import org.nuxeo.lib.stream.log.LogLag; 055import org.nuxeo.lib.stream.log.LogManager; 056import org.nuxeo.runtime.api.Framework; 057import org.nuxeo.runtime.codec.CodecService; 058import org.nuxeo.runtime.metrics.NuxeoMetricSet; 059import org.nuxeo.runtime.model.ComponentContext; 060import org.nuxeo.runtime.model.ComponentManager; 061import org.nuxeo.runtime.services.config.ConfigurationService; 062import org.nuxeo.runtime.stream.StreamService; 063import org.nuxeo.runtime.transaction.TransactionHelper; 064 065import com.codahale.metrics.MetricRegistry; 066 067/** 068 * WorkManager impl that appends works into a Log. Works are therefore immutable (no state update) and can not be listed 069 * for performance reason. 070 * 071 * @since 9.3 072 */ 073public class StreamWorkManager extends WorkManagerImpl { 074 075 protected static final Log log = LogFactory.getLog(StreamWorkManager.class); 076 077 public static final String WORK_LOG_CONFIG_PROP = "nuxeo.stream.work.log.config"; 078 079 public static final String DEFAULT_WORK_LOG_CONFIG = "work"; 080 081 public static final String WORK_CODEC_PROP = "nuxeo.stream.work.log.codec"; 082 083 public static final String DEFAULT_WORK_CODEC = "legacy"; 084 085 public static final String WORK_OVER_PROVISIONING_PROP = "nuxeo.stream.work.over.provisioning.factor"; 086 087 public static final String DEFAULT_WORK_OVER_PROVISIONING = "3"; 088 089 public static final int DEFAULT_CONCURRENCY = 4; 090 091 /** 092 * @since 10.2 093 */ 094 public static final String STATETTL_KEY = "nuxeo.stream.work.state.ttl.seconds"; 095 096 /** 097 * @since 10.2 098 */ 099 public static final String STORESTATE_KEY = "nuxeo.stream.work.storestate.enabled"; 100 101 /** 102 * @since 10.2 103 */ 104 public static final String STATETTL_DEFAULT_VALUE = "3600"; 105 106 protected Topology topology; 107 108 protected Settings settings; 109 110 protected StreamProcessor streamProcessor; 111 112 protected LogManager logManager; 113 114 protected final Set<String> streamIds = new HashSet<>(); 115 116 protected boolean storeState; 117 118 protected long stateTTL; 119 120 protected int getOverProvisioningFactor() { 121 // Enable over provisioning only if the log can be distributed 122 if (getLogManager().supportSubscribe()) { 123 return Integer.parseInt(Framework.getProperty(WORK_OVER_PROVISIONING_PROP, DEFAULT_WORK_OVER_PROVISIONING)); 124 } 125 return 1; 126 } 127 128 protected String getCodecName() { 129 return Framework.getProperty(WORK_CODEC_PROP, DEFAULT_WORK_CODEC); 130 } 131 132 protected Codec<Record> getCodec() { 133 return Framework.getService(CodecService.class).getCodec(getCodecName(), Record.class); 134 } 135 136 @Override 137 public void schedule(Work work, Scheduling scheduling, boolean afterCommit) { 138 String queueId = getStreamForCategory(work.getCategory()); 139 if (log.isDebugEnabled()) { 140 log.debug(String.format( 141 "Scheduling: workId: %s, category: %s, queue: %s, scheduling: %s, afterCommit: %s, work: %s", 142 work.getId(), work.getCategory(), queueId, scheduling, afterCommit, work)); 143 } 144 if (!isQueuingEnabled(queueId)) { 145 log.info("Queue disabled, scheduling canceled: " + queueId); 146 return; 147 } 148 if (CANCEL_SCHEDULED.equals(scheduling)) { 149 if (storeState) { 150 if (WorkStateHelper.getState(work.getId()) != null) { 151 WorkStateHelper.setCanceled(work.getId()); 152 } 153 } else { 154 log.warn(String.format("Canceling a work is only supported if '%s' is true. Skipping work: %s", 155 STORESTATE_KEY, work)); 156 } 157 return; 158 } 159 if (afterCommit && scheduleAfterCommit(work, scheduling)) { 160 return; 161 } 162 WorkSchedulePath.newInstance(work); 163 // We don't need to set a codec because appender is initialized with proper codec during processor init 164 LogAppender<Record> appender = logManager.getAppender(getStreamForCategory(work.getCategory())); 165 if (appender == null) { 166 log.error(String.format("Not scheduled work, unknown category: %s, mapped to %s", work.getCategory(), 167 getStreamForCategory(work.getCategory()))); 168 return; 169 } 170 String key = work.getPartitionKey(); 171 appender.append(key, Record.of(key, WorkComputation.serialize(work))); 172 if (storeState) { 173 WorkStateHelper.setState(work.getId(), Work.State.SCHEDULED, stateTTL); 174 } 175 } 176 177 protected String getStreamForCategory(String category) { 178 if (category != null && streamIds.contains(category)) { 179 return category; 180 } 181 return DEFAULT_CATEGORY; 182 } 183 184 @Override 185 public int getApplicationStartedOrder() { 186 // start before the WorkManagerImpl 187 return EventServiceComponent.APPLICATION_STARTED_ORDER - 2; 188 } 189 190 @Override 191 public void start(ComponentContext context) { 192 init(); 193 ConfigurationService configuration = Framework.getService(ConfigurationService.class); 194 storeState = configuration.isBooleanPropertyTrue(STORESTATE_KEY); 195 stateTTL = Long.parseLong(configuration.getProperty(STATETTL_KEY, STATETTL_DEFAULT_VALUE)); 196 } 197 198 @Override 199 public void init() { 200 if (started) { 201 return; 202 } 203 log.debug("Initializing"); 204 synchronized (this) { 205 if (started) { 206 return; 207 } 208 supplantWorkManagerImpl(); 209 workQueueConfig.index(); 210 initTopology(); 211 logManager = getLogManager(); 212 streamProcessor = new LogStreamProcessor(logManager); 213 streamProcessor.init(topology, settings); 214 started = true; 215 new ComponentListener().install(); 216 log.info("Initialized"); 217 } 218 } 219 220 class ComponentListener implements ComponentManager.Listener { 221 @Override 222 public void beforeStop(ComponentManager mgr, boolean isStandby) { 223 if (!shutdown(10, TimeUnit.SECONDS)) { 224 log.error("Some processors are still active"); 225 } 226 } 227 228 @Override 229 public void afterStart(ComponentManager mgr, boolean isResume) { 230 streamProcessor.start(); 231 for (String id : workQueueConfig.getQueueIds()) { 232 activateQueueMetrics(id); 233 } 234 } 235 236 @Override 237 public void afterStop(ComponentManager mgr, boolean isStandby) { 238 Framework.getRuntime().getComponentManager().removeListener(this); 239 for (String id : workQueueConfig.getQueueIds()) { 240 deactivateQueueMetrics(id); 241 } 242 } 243 } 244 245 protected LogManager getLogManager() { 246 String config = getLogConfig(); 247 log.info("Init StreamWorkManager with Log configuration: " + config); 248 StreamService service = Framework.getService(StreamService.class); 249 return service.getLogManager(getLogConfig()); 250 } 251 252 protected String getLogConfig() { 253 return Framework.getProperty(WORK_LOG_CONFIG_PROP, DEFAULT_WORK_LOG_CONFIG); 254 } 255 256 @Override 257 public boolean isProcessingEnabled(String queueId) { 258 WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId); 259 return wqd != null && wqd.isProcessingEnabled(); 260 } 261 262 /** 263 * Hack to steal the WorkManagerImpl queue contributions. 264 */ 265 protected void supplantWorkManagerImpl() { 266 WorkManagerImpl wmi = (WorkManagerImpl) Framework.getRuntime().getComponent("org.nuxeo.ecm.core.work.service"); 267 Class clazz = WorkManagerImpl.class; 268 Field workQueueConfigField; 269 try { 270 workQueueConfigField = clazz.getDeclaredField("workQueueConfig"); 271 } catch (NoSuchFieldException e) { 272 throw new RuntimeException(e); 273 } 274 workQueueConfigField.setAccessible(true); 275 final WorkQueueRegistry wqr; 276 try { 277 wqr = (WorkQueueRegistry) workQueueConfigField.get(wmi); 278 log.debug("Remove contributions from WorkManagerImpl"); 279 // Removes the WorkManagerImpl so it does not create any worker pool 280 workQueueConfigField.set(wmi, new WorkQueueRegistry()); 281 // TODO: should we remove workQueuingConfig registry as well ? 282 } catch (IllegalAccessException e) { 283 throw new RuntimeException(e); 284 } 285 wqr.getQueueIds().forEach(id -> workQueueConfig.addContribution(wqr.get(id))); 286 streamIds.addAll(workQueueConfig.getQueueIds()); 287 workQueueConfig.getQueueIds().forEach(id -> log.info("Registering : " + id)); 288 } 289 290 protected void initTopology() { 291 // create a single topology with one root per work pool 292 Topology.Builder builder = Topology.builder(); 293 workQueueConfig.getQueueIds().stream().filter(item -> workQueueConfig.get(item).isProcessingEnabled()).forEach( 294 item -> builder.addComputation(() -> new WorkComputation(item), 295 Collections.singletonList("i1:" + item))); 296 topology = builder.build(); 297 settings = new Settings(DEFAULT_CONCURRENCY, getPartitions(DEFAULT_CONCURRENCY), getCodec()); 298 workQueueConfig.getQueueIds() 299 .forEach(item -> settings.setConcurrency(item, workQueueConfig.get(item).getMaxThreads())); 300 workQueueConfig.getQueueIds().forEach( 301 item -> settings.setPartitions(item, getPartitions(workQueueConfig.get(item).getMaxThreads()))); 302 } 303 304 protected int getPartitions(int maxThreads) { 305 if (maxThreads == 1) { 306 // when the pool size is one the we don't want any concurrency 307 return 1; 308 } 309 return getOverProvisioningFactor() * maxThreads; 310 } 311 312 public class WorkScheduling implements Synchronization { 313 public final Work work; 314 315 public final Scheduling scheduling; 316 317 public WorkScheduling(Work work, Scheduling scheduling) { 318 this.work = work; 319 this.scheduling = scheduling; 320 } 321 322 @Override 323 public void beforeCompletion() { 324 } 325 326 @Override 327 public void afterCompletion(int status) { 328 if (status == Status.STATUS_COMMITTED) { 329 StreamWorkManager.this.schedule(work, scheduling, false); 330 } else { 331 if (status != Status.STATUS_ROLLEDBACK) { 332 throw new IllegalArgumentException("Unsupported transaction status " + status); 333 } 334 } 335 336 } 337 } 338 339 @Override 340 void activateQueue(WorkQueueDescriptor config) { 341 // queue processing is activated only from component listener afterStart 342 if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) { 343 throw new IllegalArgumentException("cannot activate all queues"); 344 } 345 log.info("Activated queue " + config.id + " " + config.toEffectiveString()); 346 if (config.isProcessingEnabled()) { 347 activateQueueMetrics(config.id); 348 } 349 } 350 351 @Override 352 void deactivateQueue(WorkQueueDescriptor config) { 353 // queue processing is deactivated only on shutdown 354 if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) { 355 throw new IllegalArgumentException("cannot deactivate all queues"); 356 } 357 if (config.isProcessingEnabled()) { 358 deactivateQueueMetrics(config.id); 359 } 360 log.info("Deactivated work queue not supported: " + config.id); 361 } 362 363 @Override 364 protected void activateQueueMetrics(String queueId) { 365 NuxeoMetricSet queueMetrics = new NuxeoMetricSet("nuxeo", "works", "total", queueId); 366 queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).scheduled, "scheduled"); 367 queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).running, "running"); 368 queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).completed, "completed"); 369 queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).canceled, "canceled"); 370 registry.registerAll(queueMetrics); 371 } 372 373 @Override 374 protected void deactivateQueueMetrics(String queueId) { 375 String queueMetricsName = MetricRegistry.name("nuxeo", "works", "total", queueId); 376 registry.removeMatching((name, metric) -> name.startsWith(queueMetricsName)); 377 } 378 379 @Override 380 public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) { 381 log.warn("Shutdown a queue is not supported with computation implementation"); 382 return false; 383 } 384 385 @Override 386 public boolean shutdown(long timeout, TimeUnit timeUnit) { 387 log.info("Shutdown WorkManager in " + timeUnit.toMillis(timeout) + " ms"); 388 shutdownInProgress = true; 389 try { 390 long shutdownDelay = Long.parseLong(Framework.getService(ConfigurationService.class) 391 .getProperty(SHUTDOWN_DELAY_MS_KEY, "0")); 392 boolean ret = streamProcessor.stop(Duration.ofMillis(Math.max(timeUnit.toMillis(timeout), shutdownDelay))); 393 if (!ret) { 394 log.error("Not able to stop worker pool within the timeout."); 395 } 396 return ret; 397 } finally { 398 shutdownInProgress = false; 399 } 400 } 401 402 @Override 403 public int getQueueSize(String queueId, Work.State state) { 404 switch (state) { 405 case SCHEDULED: 406 return getMetrics(queueId).getScheduled().intValue(); 407 case RUNNING: 408 return getMetrics(queueId).getRunning().intValue(); 409 default: 410 return 0; 411 } 412 } 413 414 protected WorkQueueMetrics getMetricsWithNuxeoClassLoader(String queueId) { 415 // JMX threads have distinct class loader that need to be changed to get metrics 416 ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); 417 try { 418 Thread.currentThread().setContextClassLoader(Framework.class.getClassLoader()); 419 return getMetrics(queueId); 420 } finally { 421 Thread.currentThread().setContextClassLoader(classLoader); 422 } 423 } 424 425 @Override 426 public WorkQueueMetrics getMetrics(String queueId) { 427 LogLag lag = logManager.getLag(queueId, queueId); 428 long running = 0; 429 if (lag.lag() > 0) { 430 // we don't have the exact running metric 431 // give an approximation that can be higher that actual one because of the over provisioning 432 running = min(lag.lag(), settings.getPartitions(queueId)); 433 } 434 return new WorkQueueMetrics(queueId, lag.lag(), running, lag.lower(), 0); 435 } 436 437 @Override 438 public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException { 439 if (queueId != null) { 440 return awaitCompletionOnQueue(queueId, duration, unit); 441 } 442 for (String item : workQueueConfig.getQueueIds()) { 443 if (!awaitCompletionOnQueue(item, duration, unit)) { 444 return false; 445 } 446 } 447 return true; 448 } 449 450 protected boolean awaitCompletionOnQueue(String queueId, long duration, TimeUnit unit) throws InterruptedException { 451 if (!isStarted()) { 452 return true; 453 } 454 log.debug("awaitCompletion " + queueId + " starting"); 455 // wait for the lag to be null 456 long durationMs = min(unit.toMillis(duration), TimeUnit.DAYS.toMillis(1)); // prevent overflow 457 long deadline = System.currentTimeMillis() + durationMs; 458 while (System.currentTimeMillis() < deadline) { 459 Thread.sleep(100); 460 int lag = getMetrics(queueId).getScheduled().intValue(); 461 if (lag == 0) { 462 if (log.isDebugEnabled()) { 463 log.debug("awaitCompletion for " + queueId + " completed " + getMetrics(queueId)); 464 } 465 return true; 466 } 467 if (!log.isDebugEnabled()) { 468 log.debug("awaitCompletion for " + queueId + " not completed " + getMetrics(queueId)); 469 } 470 } 471 log.warn(String.format("%s timeout after: %.2fs, %s", queueId, durationMs / 1000.0, getMetrics(queueId))); 472 return false; 473 } 474 475 /** 476 * @deprecated since 10.2 because unused 477 */ 478 @Deprecated 479 public boolean awaitCompletionWithWaterMark(String queueId, long duration, TimeUnit unit) 480 throws InterruptedException { 481 if (!isStarted()) { 482 return true; 483 } 484 // wait that the low watermark get stable 485 long durationMs = min(unit.toMillis(duration), TimeUnit.DAYS.toMillis(1)); // prevent overflow 486 long deadline = System.currentTimeMillis() + durationMs; 487 long lowWatermark = getLowWaterMark(queueId); 488 while (System.currentTimeMillis() < deadline) { 489 Thread.sleep(100); 490 long wm = getLowWaterMark(queueId); 491 if (wm == lowWatermark) { 492 log.debug("awaitCompletion for " + (queueId == null ? "all" : queueId) + " completed " + wm); 493 return true; 494 } 495 if (log.isDebugEnabled()) { 496 log.debug("awaitCompletion low wm for " + (queueId == null ? "all" : queueId) + ":" + wm + " diff: " 497 + (wm - lowWatermark)); 498 } 499 lowWatermark = wm; 500 } 501 log.warn(String.format("%s timeout after: %.2fs", queueId, durationMs / 1000.0)); 502 return false; 503 } 504 505 protected long getLowWaterMark(String queueId) { 506 if (queueId != null) { 507 return streamProcessor.getLowWatermark(queueId); 508 } 509 return streamProcessor.getLowWatermark(); 510 } 511 512 @Override 513 public Work.State getWorkState(String workId) { 514 if (!storeState) { 515 return null; 516 } 517 return WorkStateHelper.getState(workId); 518 } 519 520 @Override 521 public Work find(String s, Work.State state) { 522 // always not found 523 return null; 524 } 525 526 @Override 527 public List<Work> listWork(String s, Work.State state) { 528 return Collections.emptyList(); 529 } 530 531 @Override 532 public List<String> listWorkIds(String s, Work.State state) { 533 return Collections.emptyList(); 534 } 535 536 @Override 537 protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) { 538 TransactionManager transactionManager; 539 try { 540 transactionManager = TransactionHelper.lookupTransactionManager(); 541 } catch (NamingException e) { 542 transactionManager = null; 543 } 544 if (transactionManager == null) { 545 log.warn("Not scheduled work after commit because of missing transaction manager: " + work.getId()); 546 return false; 547 } 548 try { 549 Transaction transaction = transactionManager.getTransaction(); 550 if (transaction == null) { 551 if (log.isDebugEnabled()) { 552 log.debug("Not scheduled work after commit because of missing transaction: " + work.getId()); 553 } 554 return false; 555 } 556 int status = transaction.getStatus(); 557 if (status == Status.STATUS_ACTIVE) { 558 if (log.isDebugEnabled()) { 559 log.debug("Scheduled after commit: " + work.getId()); 560 } 561 transaction.registerSynchronization(new StreamWorkManager.WorkScheduling(work, scheduling)); 562 return true; 563 } else if (status == Status.STATUS_COMMITTED) { 564 // called in afterCompletion, we can schedule immediately 565 if (log.isDebugEnabled()) { 566 log.debug("Scheduled immediately: " + work.getId()); 567 } 568 return false; 569 } else if (status == Status.STATUS_MARKED_ROLLBACK) { 570 if (log.isDebugEnabled()) { 571 log.debug("Cancelling schedule because transaction marked rollback-only: " + work.getId()); 572 } 573 return true; 574 } else { 575 if (log.isDebugEnabled()) { 576 log.debug("Not scheduling work after commit because transaction is in status " + status + ": " 577 + work.getId()); 578 } 579 return false; 580 } 581 } catch (SystemException | RollbackException e) { 582 log.error("Cannot schedule after commit", e); 583 return false; 584 } 585 } 586 587}