001/* 002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.core.work; 020 021import static java.lang.Math.min; 022import static org.nuxeo.ecm.core.work.api.WorkManager.Scheduling.CANCEL_SCHEDULED; 023 024import java.time.Duration; 025import java.util.Collections; 026import java.util.List; 027import java.util.concurrent.TimeUnit; 028 029import javax.naming.NamingException; 030import javax.transaction.RollbackException; 031import javax.transaction.Status; 032import javax.transaction.Synchronization; 033import javax.transaction.SystemException; 034import javax.transaction.Transaction; 035import javax.transaction.TransactionManager; 036 037import org.apache.commons.logging.Log; 038import org.apache.commons.logging.LogFactory; 039import org.nuxeo.ecm.core.event.EventServiceComponent; 040import org.nuxeo.ecm.core.work.api.Work; 041import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor; 042import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 043import org.nuxeo.ecm.core.work.api.WorkSchedulePath; 044import org.nuxeo.lib.stream.codec.Codec; 045import org.nuxeo.lib.stream.computation.Record; 046import org.nuxeo.lib.stream.computation.Settings; 047import org.nuxeo.lib.stream.computation.StreamProcessor; 048import org.nuxeo.lib.stream.computation.Topology; 049import org.nuxeo.lib.stream.computation.log.LogStreamProcessor; 050import org.nuxeo.lib.stream.log.LogAppender; 051import org.nuxeo.lib.stream.log.LogLag; 052import org.nuxeo.lib.stream.log.LogManager; 053import org.nuxeo.lib.stream.log.LogOffset; 054import org.nuxeo.runtime.api.Framework; 055import org.nuxeo.runtime.codec.CodecService; 056import org.nuxeo.runtime.metrics.NuxeoMetricSet; 057import org.nuxeo.runtime.model.ComponentContext; 058import org.nuxeo.runtime.model.ComponentManager; 059import org.nuxeo.runtime.model.Descriptor; 060import org.nuxeo.runtime.services.config.ConfigurationService; 061import org.nuxeo.runtime.stream.StreamService; 062import org.nuxeo.runtime.transaction.TransactionHelper; 063 064import com.codahale.metrics.MetricRegistry; 065 066/** 067 * WorkManager impl that appends works into a Log. Works are therefore immutable (no state update) and can not be listed 068 * for performance reason. 069 * 070 * @since 9.3 071 */ 072public class StreamWorkManager extends WorkManagerImpl { 073 074 protected static final Log log = LogFactory.getLog(StreamWorkManager.class); 075 076 public static final String WORK_LOG_CONFIG_PROP = "nuxeo.stream.work.log.config"; 077 078 public static final String DEFAULT_WORK_LOG_CONFIG = "work"; 079 080 public static final String WORK_CODEC_PROP = "nuxeo.stream.work.log.codec"; 081 082 public static final String DEFAULT_WORK_CODEC = "legacy"; 083 084 public static final String WORK_OVER_PROVISIONING_PROP = "nuxeo.stream.work.over.provisioning.factor"; 085 086 public static final String DEFAULT_WORK_OVER_PROVISIONING = "3"; 087 088 public static final int DEFAULT_CONCURRENCY = 4; 089 090 /** 091 * @since 10.2 092 */ 093 public static final String STATETTL_KEY = "nuxeo.stream.work.state.ttl.seconds"; 094 095 /** 096 * @since 10.2 097 */ 098 public static final String STORESTATE_KEY = "nuxeo.stream.work.storestate.enabled"; 099 100 /** 101 * @since 10.2 102 */ 103 public static final String STATETTL_DEFAULT_VALUE = "3600"; 104 105 protected Topology topology; 106 107 protected Settings settings; 108 109 protected StreamProcessor streamProcessor; 110 111 protected LogManager logManager; 112 113 protected boolean storeState; 114 115 protected long stateTTL; 116 117 protected int getOverProvisioningFactor() { 118 // Enable over provisioning only if the log can be distributed 119 if (getLogManager().supportSubscribe()) { 120 return Integer.parseInt(Framework.getProperty(WORK_OVER_PROVISIONING_PROP, DEFAULT_WORK_OVER_PROVISIONING)); 121 } 122 return 1; 123 } 124 125 protected String getCodecName() { 126 return Framework.getProperty(WORK_CODEC_PROP, DEFAULT_WORK_CODEC); 127 } 128 129 protected Codec<Record> getCodec() { 130 return Framework.getService(CodecService.class).getCodec(getCodecName(), Record.class); 131 } 132 133 @Override 134 public void schedule(Work work, Scheduling scheduling, boolean afterCommit) { 135 String queueId = getCategoryQueueId(work.getCategory()); 136 if (log.isDebugEnabled()) { 137 log.debug(String.format( 138 "Scheduling: workId: %s, category: %s, queue: %s, scheduling: %s, afterCommit: %s, work: %s", 139 work.getId(), work.getCategory(), queueId, scheduling, afterCommit, work)); 140 } 141 if (!isQueuingEnabled(queueId)) { 142 log.info("Queue disabled, scheduling canceled: " + queueId); 143 return; 144 } 145 if (CANCEL_SCHEDULED.equals(scheduling)) { 146 if (storeState) { 147 if (WorkStateHelper.getState(work.getId()) != null) { 148 WorkStateHelper.setCanceled(work.getId()); 149 } 150 } else { 151 log.warn(String.format("Canceling a work is only supported if '%s' is true. Skipping work: %s", 152 STORESTATE_KEY, work)); 153 } 154 return; 155 } 156 if (afterCommit && scheduleAfterCommit(work, scheduling)) { 157 return; 158 } 159 WorkSchedulePath.newInstance(work); 160 // We don't need to set a codec because appender is initialized with proper codec during processor init 161 LogAppender<Record> appender = logManager.getAppender(queueId); 162 if (appender == null) { 163 log.error(String.format("Not scheduled work, unknown category: %s, mapped to %s", work.getCategory(), 164 queueId)); 165 return; 166 } 167 String key = work.getPartitionKey(); 168 LogOffset offset = appender.append(key, Record.of(key, WorkComputation.serialize(work))); 169 if (work.isCoalescing()) { 170 WorkStateHelper.setLastOffset(work.getId(), offset.offset(), stateTTL); 171 } 172 if (storeState) { 173 WorkStateHelper.setState(work.getId(), Work.State.SCHEDULED, stateTTL); 174 } 175 } 176 177 @Override 178 public int getApplicationStartedOrder() { 179 // start before the WorkManagerImpl 180 return EventServiceComponent.APPLICATION_STARTED_ORDER - 2; 181 } 182 183 @Override 184 public void start(ComponentContext context) { 185 super.start(context); 186 ConfigurationService configuration = Framework.getService(ConfigurationService.class); 187 storeState = configuration.isBooleanPropertyTrue(STORESTATE_KEY); 188 stateTTL = Long.parseLong(configuration.getProperty(STATETTL_KEY, STATETTL_DEFAULT_VALUE)); 189 } 190 191 @Override 192 public void init() { 193 if (started) { 194 return; 195 } 196 WorkManagerImpl wmi = (WorkManagerImpl) Framework.getRuntime().getComponent("org.nuxeo.ecm.core.work.service"); 197 wmi.active = false; 198 log.debug("Initializing"); 199 synchronized (this) { 200 if (started) { 201 return; 202 } 203 getDescriptors(QUEUES_EP).forEach(d -> categoryToQueueId.put(d.getId(), d.getId())); 204 index(); 205 initTopology(); 206 logManager = getLogManager(); 207 streamProcessor = new LogStreamProcessor(logManager); 208 streamProcessor.init(topology, settings); 209 started = true; 210 new ComponentListener().install(); 211 log.info("Initialized"); 212 } 213 } 214 215 class ComponentListener implements ComponentManager.Listener { 216 @Override 217 public void beforeStop(ComponentManager mgr, boolean isStandby) { 218 if (!shutdown(10, TimeUnit.SECONDS)) { 219 log.error("Some processors are still active"); 220 } 221 } 222 223 @Override 224 public void afterStart(ComponentManager mgr, boolean isResume) { 225 streamProcessor.start(); 226 for (Descriptor d : getDescriptors(QUEUES_EP)) { 227 activateQueueMetrics(d.getId()); 228 } 229 } 230 231 @Override 232 public void afterStop(ComponentManager mgr, boolean isStandby) { 233 Framework.getRuntime().getComponentManager().removeListener(this); 234 for (Descriptor d : getDescriptors(QUEUES_EP)) { 235 deactivateQueueMetrics(d.getId()); 236 } 237 } 238 } 239 240 protected LogManager getLogManager() { 241 String config = getLogConfig(); 242 log.info("Init StreamWorkManager with Log configuration: " + config); 243 StreamService service = Framework.getService(StreamService.class); 244 return service.getLogManager(getLogConfig()); 245 } 246 247 protected String getLogConfig() { 248 return Framework.getProperty(WORK_LOG_CONFIG_PROP, DEFAULT_WORK_LOG_CONFIG); 249 } 250 251 @Override 252 public boolean isProcessingEnabled(String queueId) { 253 WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId); 254 return wqd != null && wqd.isProcessingEnabled(); 255 } 256 257 protected void initTopology() { 258 // create a single topology with one root per work pool 259 Topology.Builder builder = Topology.builder(); 260 List<WorkQueueDescriptor> descriptors = getDescriptors(QUEUES_EP); 261 descriptors.stream().filter(WorkQueueDescriptor::isProcessingEnabled).forEach(d -> builder.addComputation( 262 () -> new WorkComputation(d.getId()), Collections.singletonList("i1:" + d.getId()))); 263 topology = builder.build(); 264 settings = new Settings(DEFAULT_CONCURRENCY, getPartitions(DEFAULT_CONCURRENCY), getCodec()); 265 descriptors.forEach(item -> settings.setConcurrency(item.getId(), item.getMaxThreads())); 266 descriptors.forEach(item -> settings.setPartitions(item.getId(), getPartitions(item.getMaxThreads()))); 267 } 268 269 protected int getPartitions(int maxThreads) { 270 if (maxThreads == 1) { 271 // when the pool size is one the we don't want any concurrency 272 return 1; 273 } 274 return getOverProvisioningFactor() * maxThreads; 275 } 276 277 public class WorkScheduling implements Synchronization { 278 public final Work work; 279 280 public final Scheduling scheduling; 281 282 public WorkScheduling(Work work, Scheduling scheduling) { 283 this.work = work; 284 this.scheduling = scheduling; 285 } 286 287 @Override 288 public void beforeCompletion() { 289 } 290 291 @Override 292 public void afterCompletion(int status) { 293 if (status == Status.STATUS_COMMITTED) { 294 StreamWorkManager.this.schedule(work, scheduling, false); 295 } else { 296 if (status != Status.STATUS_ROLLEDBACK) { 297 throw new IllegalArgumentException("Unsupported transaction status " + status); 298 } 299 } 300 301 } 302 } 303 304 @Override 305 void activateQueue(WorkQueueDescriptor config) { 306 // queue processing is activated only from component listener afterStart 307 if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) { 308 throw new IllegalArgumentException("cannot activate all queues"); 309 } 310 log.info("Activated queue " + config.id + " " + config.toString()); 311 if (config.isProcessingEnabled()) { 312 activateQueueMetrics(config.id); 313 } 314 } 315 316 @Override 317 void deactivateQueue(WorkQueueDescriptor config) { 318 // queue processing is deactivated only on shutdown 319 if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) { 320 throw new IllegalArgumentException("cannot deactivate all queues"); 321 } 322 if (config.isProcessingEnabled()) { 323 deactivateQueueMetrics(config.id); 324 } 325 log.info("Deactivated work queue not supported: " + config.id); 326 } 327 328 @Override 329 protected void activateQueueMetrics(String queueId) { 330 NuxeoMetricSet queueMetrics = new NuxeoMetricSet("nuxeo", "works", "total", queueId); 331 queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).scheduled, "scheduled"); 332 queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).running, "running"); 333 queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).completed, "completed"); 334 queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).canceled, "canceled"); 335 registry.registerAll(queueMetrics); 336 } 337 338 @Override 339 protected void deactivateQueueMetrics(String queueId) { 340 String queueMetricsName = MetricRegistry.name("nuxeo", "works", "total", queueId); 341 registry.removeMatching((name, metric) -> name.startsWith(queueMetricsName)); 342 } 343 344 @Override 345 public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) { 346 log.warn("Shutdown a queue is not supported with computation implementation"); 347 return false; 348 } 349 350 @Override 351 public boolean shutdown(long timeout, TimeUnit timeUnit) { 352 log.info("Shutdown WorkManager in " + timeUnit.toMillis(timeout) + " ms"); 353 shutdownInProgress = true; 354 try { 355 long shutdownDelay = Long.parseLong(Framework.getService(ConfigurationService.class) 356 .getProperty(SHUTDOWN_DELAY_MS_KEY, "0")); 357 boolean ret = streamProcessor.stop(Duration.ofMillis(Math.max(timeUnit.toMillis(timeout), shutdownDelay))); 358 if (!ret) { 359 log.error("Not able to stop worker pool within the timeout."); 360 } 361 return ret; 362 } finally { 363 shutdownInProgress = false; 364 } 365 } 366 367 @Override 368 public int getQueueSize(String queueId, Work.State state) { 369 switch (state) { 370 case SCHEDULED: 371 return getMetrics(queueId).getScheduled().intValue(); 372 case RUNNING: 373 return getMetrics(queueId).getRunning().intValue(); 374 default: 375 return 0; 376 } 377 } 378 379 protected WorkQueueMetrics getMetricsWithNuxeoClassLoader(String queueId) { 380 // JMX threads have distinct class loader that need to be changed to get metrics 381 ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); 382 try { 383 Thread.currentThread().setContextClassLoader(Framework.class.getClassLoader()); 384 return getMetrics(queueId); 385 } finally { 386 Thread.currentThread().setContextClassLoader(classLoader); 387 } 388 } 389 390 @Override 391 public WorkQueueMetrics getMetrics(String queueId) { 392 LogLag lag = logManager.getLag(queueId, queueId); 393 long running = 0; 394 if (lag.lag() > 0) { 395 // we don't have the exact running metric 396 // give an approximation that can be higher that actual one because of the over provisioning 397 running = min(lag.lag(), settings.getPartitions(queueId)); 398 } 399 return new WorkQueueMetrics(queueId, lag.lag(), running, lag.lower(), 0); 400 } 401 402 @Override 403 public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException { 404 if (queueId != null) { 405 return awaitCompletionOnQueue(queueId, duration, unit); 406 } 407 for (Descriptor item : getDescriptors(QUEUES_EP)) { 408 if (!awaitCompletionOnQueue(item.getId(), duration, unit)) { 409 return false; 410 } 411 } 412 return true; 413 } 414 415 protected boolean awaitCompletionOnQueue(String queueId, long duration, TimeUnit unit) throws InterruptedException { 416 if (!isStarted()) { 417 return true; 418 } 419 log.debug("awaitCompletion " + queueId + " starting"); 420 // wait for the lag to be null 421 long durationMs = min(unit.toMillis(duration), TimeUnit.DAYS.toMillis(1)); // prevent overflow 422 long deadline = System.currentTimeMillis() + durationMs; 423 while (System.currentTimeMillis() < deadline) { 424 Thread.sleep(100); 425 int lag = getMetrics(queueId).getScheduled().intValue(); 426 if (lag == 0) { 427 if (log.isDebugEnabled()) { 428 log.debug("awaitCompletion for " + queueId + " completed " + getMetrics(queueId)); 429 } 430 return true; 431 } 432 if (!log.isDebugEnabled()) { 433 log.debug("awaitCompletion for " + queueId + " not completed " + getMetrics(queueId)); 434 } 435 } 436 log.warn(String.format("%s timeout after: %.2fs, %s", queueId, durationMs / 1000.0, getMetrics(queueId))); 437 return false; 438 } 439 440 /** 441 * @deprecated since 10.2 because unused 442 */ 443 @Deprecated 444 public boolean awaitCompletionWithWaterMark(String queueId, long duration, TimeUnit unit) 445 throws InterruptedException { 446 if (!isStarted()) { 447 return true; 448 } 449 // wait that the low watermark get stable 450 long durationMs = min(unit.toMillis(duration), TimeUnit.DAYS.toMillis(1)); // prevent overflow 451 long deadline = System.currentTimeMillis() + durationMs; 452 long lowWatermark = getLowWaterMark(queueId); 453 while (System.currentTimeMillis() < deadline) { 454 Thread.sleep(100); 455 long wm = getLowWaterMark(queueId); 456 if (wm == lowWatermark) { 457 log.debug("awaitCompletion for " + (queueId == null ? "all" : queueId) + " completed " + wm); 458 return true; 459 } 460 if (log.isDebugEnabled()) { 461 log.debug("awaitCompletion low wm for " + (queueId == null ? "all" : queueId) + ":" + wm + " diff: " 462 + (wm - lowWatermark)); 463 } 464 lowWatermark = wm; 465 } 466 log.warn(String.format("%s timeout after: %.2fs", queueId, durationMs / 1000.0)); 467 return false; 468 } 469 470 protected long getLowWaterMark(String queueId) { 471 if (queueId != null) { 472 return streamProcessor.getLowWatermark(queueId); 473 } 474 return streamProcessor.getLowWatermark(); 475 } 476 477 @Override 478 public Work.State getWorkState(String workId) { 479 if (!storeState) { 480 return null; 481 } 482 return WorkStateHelper.getState(workId); 483 } 484 485 @Override 486 public Work find(String s, Work.State state) { 487 // always not found 488 return null; 489 } 490 491 @Override 492 public List<Work> listWork(String s, Work.State state) { 493 return Collections.emptyList(); 494 } 495 496 @Override 497 public List<String> listWorkIds(String s, Work.State state) { 498 return Collections.emptyList(); 499 } 500 501 @Override 502 protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) { 503 TransactionManager transactionManager; 504 try { 505 transactionManager = TransactionHelper.lookupTransactionManager(); 506 } catch (NamingException e) { 507 transactionManager = null; 508 } 509 if (transactionManager == null) { 510 log.warn("Not scheduled work after commit because of missing transaction manager: " + work.getId()); 511 return false; 512 } 513 try { 514 Transaction transaction = transactionManager.getTransaction(); 515 if (transaction == null) { 516 if (log.isDebugEnabled()) { 517 log.debug("Not scheduled work after commit because of missing transaction: " + work.getId()); 518 } 519 return false; 520 } 521 int status = transaction.getStatus(); 522 if (status == Status.STATUS_ACTIVE) { 523 if (log.isDebugEnabled()) { 524 log.debug("Scheduled after commit: " + work.getId()); 525 } 526 transaction.registerSynchronization(new StreamWorkManager.WorkScheduling(work, scheduling)); 527 return true; 528 } else if (status == Status.STATUS_COMMITTED) { 529 // called in afterCompletion, we can schedule immediately 530 if (log.isDebugEnabled()) { 531 log.debug("Scheduled immediately: " + work.getId()); 532 } 533 return false; 534 } else if (status == Status.STATUS_MARKED_ROLLBACK) { 535 if (log.isDebugEnabled()) { 536 log.debug("Cancelling schedule because transaction marked rollback-only: " + work.getId()); 537 } 538 return true; 539 } else { 540 if (log.isDebugEnabled()) { 541 log.debug("Not scheduling work after commit because transaction is in status " + status + ": " 542 + work.getId()); 543 } 544 return false; 545 } 546 } catch (SystemException | RollbackException e) { 547 log.error("Cannot schedule after commit", e); 548 return false; 549 } 550 } 551 552 @Override 553 public boolean supportsProcessingDisabling() { 554 return true; 555 } 556 557}