001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.core.work; 020 021import static java.lang.Math.min; 022import static org.nuxeo.ecm.core.work.api.WorkManager.Scheduling.CANCEL_SCHEDULED; 023 024import java.lang.reflect.Field; 025import java.time.Duration; 026import java.util.Collections; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Set; 030import java.util.concurrent.TimeUnit; 031 032import javax.naming.NamingException; 033import javax.transaction.RollbackException; 034import javax.transaction.Status; 035import javax.transaction.Synchronization; 036import javax.transaction.SystemException; 037import javax.transaction.Transaction; 038import javax.transaction.TransactionManager; 039 040import org.apache.commons.logging.Log; 041import org.apache.commons.logging.LogFactory; 042import org.nuxeo.ecm.core.api.NuxeoException; 043import org.nuxeo.ecm.core.event.EventServiceComponent; 044import org.nuxeo.ecm.core.work.api.Work; 045import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor; 046import org.nuxeo.ecm.core.work.api.WorkQueueMetrics; 047import org.nuxeo.ecm.core.work.api.WorkSchedulePath; 048import org.nuxeo.lib.stream.computation.Record; 049import org.nuxeo.lib.stream.computation.Settings; 050import org.nuxeo.lib.stream.computation.StreamProcessor; 051import org.nuxeo.lib.stream.computation.Topology; 052import org.nuxeo.lib.stream.computation.Watermark; 053import org.nuxeo.lib.stream.computation.log.LogStreamProcessor; 054import org.nuxeo.lib.stream.log.LogAppender; 055import org.nuxeo.lib.stream.log.LogLag; 056import org.nuxeo.lib.stream.log.LogManager; 057import org.nuxeo.runtime.api.Framework; 058import org.nuxeo.runtime.metrics.NuxeoMetricSet; 059import org.nuxeo.runtime.model.ComponentContext; 060import org.nuxeo.runtime.model.ComponentManager; 061import org.nuxeo.runtime.stream.StreamService; 062import org.nuxeo.runtime.transaction.TransactionHelper; 063 064import com.codahale.metrics.MetricRegistry; 065 066/** 067 * WorkManager impl that appends works into a Log. Works are therefore immutable (no state update) and can not be listed 068 * for performance reason. 069 * 070 * @since 9.3 071 */ 072public class StreamWorkManager extends WorkManagerImpl { 073 protected static final Log log = LogFactory.getLog(StreamWorkManager.class); 074 075 public static final String WORK_LOG_CONFIG_PROP = "nuxeo.stream.work.log.config"; 076 077 public static final String DEFAULT_WORK_LOG_CONFIG = "work"; 078 079 public static final String WORK_OVER_PROVISIONING_PROP = "nuxeo.stream.work.over.provisioning.factor"; 080 081 public static final String DEFAULT_WORK_OVER_PROVISIONING = "3"; 082 083 public static final int DEFAULT_CONCURRENCY = 4; 084 085 protected Topology topology; 086 087 protected Settings settings; 088 089 protected StreamProcessor streamProcessor; 090 091 protected LogManager logManager; 092 093 protected final Set<String> streamIds = new HashSet<>(); 094 095 protected int getOverProvisioningFactor() { 096 // Enable over provisioning only if the log can be distributed 097 if (getLogManager().supportSubscribe()) { 098 return Integer.parseInt(Framework.getProperty(WORK_OVER_PROVISIONING_PROP, DEFAULT_WORK_OVER_PROVISIONING)); 099 } 100 return 1; 101 } 102 103 @Override 104 public void schedule(Work work, Scheduling scheduling, boolean afterCommit) { 105 String queueId = getStreamForCategory(work.getCategory()); 106 if (log.isDebugEnabled()) { 107 log.debug(String.format( 108 "Scheduling: workId: %s, category: %s, queue: %s, scheduling: %s, afterCommit: %s, work: %s", 109 work.getId(), work.getCategory(), queueId, scheduling, afterCommit, work)); 110 } 111 if (!isQueuingEnabled(queueId)) { 112 log.info("Queue disabled, scheduling canceled: " + queueId); 113 return; 114 } 115 if (CANCEL_SCHEDULED.equals(scheduling)) { 116 log.warn("Canceling a work is not supported by this impl, skipping work: " + work); 117 return; 118 } 119 if (afterCommit && scheduleAfterCommit(work, scheduling)) { 120 return; 121 } 122 WorkSchedulePath.newInstance(work); 123 LogAppender<Record> appender = logManager.getAppender(getStreamForCategory(work.getCategory())); 124 if (appender == null) { 125 log.error(String.format("Not scheduled work, unknown category: %s, mapped to %s", work.getCategory(), 126 getStreamForCategory(work.getCategory()))); 127 return; 128 } 129 String key = work.getPartitionKey(); 130 appender.append(key, new Record(key, WorkComputation.serialize(work), 131 Watermark.ofTimestamp(System.currentTimeMillis()).getValue(), null)); 132 } 133 134 protected String getStreamForCategory(String category) { 135 if (category != null && streamIds.contains(category)) { 136 return category; 137 } 138 return DEFAULT_CATEGORY; 139 } 140 141 @Override 142 public int getApplicationStartedOrder() { 143 // start before the WorkManagerImpl 144 return EventServiceComponent.APPLICATION_STARTED_ORDER - 2; 145 } 146 147 @Override 148 public void start(ComponentContext context) { 149 init(); 150 } 151 152 @Override 153 public void init() { 154 if (started) { 155 return; 156 } 157 log.debug("Initializing"); 158 synchronized (this) { 159 if (started) { 160 return; 161 } 162 supplantWorkManagerImpl(); 163 workQueueConfig.index(); 164 initTopology(); 165 this.logManager = getLogManager(); 166 this.streamProcessor = new LogStreamProcessor(logManager); 167 streamProcessor.init(topology, settings); 168 started = true; 169 new ComponentListener().install(); 170 log.info("Initialized"); 171 } 172 } 173 174 class ComponentListener implements ComponentManager.Listener { 175 @Override 176 public void beforeStop(ComponentManager mgr, boolean isStandby) { 177 try { 178 if (!shutdown(10, TimeUnit.SECONDS)) { 179 log.error("Some processors are still active"); 180 } 181 } catch (InterruptedException e) { 182 Thread.currentThread().interrupt(); 183 throw new NuxeoException("Interrupted while stopping work manager thread pools", e); 184 } 185 } 186 187 @Override 188 public void afterStart(ComponentManager mgr, boolean isResume) { 189 streamProcessor.start(); 190 for (String id : workQueueConfig.getQueueIds()) { 191 activateQueueMetrics(id); 192 } 193 } 194 195 @Override 196 public void afterStop(ComponentManager mgr, boolean isStandby) { 197 Framework.getRuntime().getComponentManager().removeListener(this); 198 for (String id : workQueueConfig.getQueueIds()) { 199 deactivateQueueMetrics(id); 200 } 201 } 202 } 203 204 protected LogManager getLogManager() { 205 String config = getLogConfig(); 206 log.info("Init StreamWorkManager with Log configuration: " + config); 207 StreamService service = Framework.getService(StreamService.class); 208 return service.getLogManager(getLogConfig()); 209 } 210 211 protected String getLogConfig() { 212 return Framework.getProperty(WORK_LOG_CONFIG_PROP, DEFAULT_WORK_LOG_CONFIG); 213 } 214 215 @Override 216 public boolean isProcessingEnabled(String queueId) { 217 WorkQueueDescriptor wqd = getWorkQueueDescriptor(queueId); 218 return wqd != null && wqd.isProcessingEnabled(); 219 } 220 221 /** 222 * Hack to steal the WorkManagerImpl queue contributions. 223 */ 224 protected void supplantWorkManagerImpl() { 225 WorkManagerImpl wmi = (WorkManagerImpl) Framework.getRuntime().getComponent("org.nuxeo.ecm.core.work.service"); 226 Class clazz = WorkManagerImpl.class; 227 Field workQueueConfigField; 228 try { 229 workQueueConfigField = clazz.getDeclaredField("workQueueConfig"); 230 } catch (NoSuchFieldException e) { 231 throw new RuntimeException(e); 232 } 233 workQueueConfigField.setAccessible(true); 234 final WorkQueueRegistry wqr; 235 try { 236 wqr = (WorkQueueRegistry) workQueueConfigField.get(wmi); 237 log.debug("Remove contributions from WorkManagerImpl"); 238 // Removes the WorkManagerImpl so it does not create any worker pool 239 workQueueConfigField.set(wmi, new WorkQueueRegistry()); 240 // TODO: should we remove workQueuingConfig registry as well ? 241 } catch (IllegalAccessException e) { 242 throw new RuntimeException(e); 243 } 244 wqr.getQueueIds().forEach(id -> workQueueConfig.addContribution(wqr.get(id))); 245 streamIds.addAll(workQueueConfig.getQueueIds()); 246 workQueueConfig.getQueueIds().forEach(id -> log.info("Registering : " + id)); 247 } 248 249 protected void initTopology() { 250 // create a single topology with one root per work pool 251 Topology.Builder builder = Topology.builder(); 252 workQueueConfig.getQueueIds().stream().filter(item -> workQueueConfig.get(item).isProcessingEnabled()).forEach( 253 item -> builder.addComputation(() -> new WorkComputation(item), 254 Collections.singletonList("i1:" + item))); 255 this.topology = builder.build(); 256 this.settings = new Settings(DEFAULT_CONCURRENCY, getPartitions(DEFAULT_CONCURRENCY)); 257 workQueueConfig.getQueueIds() 258 .forEach(item -> settings.setConcurrency(item, workQueueConfig.get(item).getMaxThreads())); 259 workQueueConfig.getQueueIds().forEach( 260 item -> settings.setPartitions(item, getPartitions(workQueueConfig.get(item).getMaxThreads()))); 261 } 262 263 protected int getPartitions(int maxThreads) { 264 if (maxThreads == 1) { 265 // when the pool size is one the we don't want any concurrency 266 return 1; 267 } 268 return getOverProvisioningFactor() * maxThreads; 269 } 270 271 public class WorkScheduling implements Synchronization { 272 public final Work work; 273 274 public final Scheduling scheduling; 275 276 public WorkScheduling(Work work, Scheduling scheduling) { 277 this.work = work; 278 this.scheduling = scheduling; 279 } 280 281 @Override 282 public void beforeCompletion() { 283 } 284 285 @Override 286 public void afterCompletion(int status) { 287 if (status == Status.STATUS_COMMITTED) { 288 StreamWorkManager.this.schedule(this.work, this.scheduling, false); 289 } else { 290 if (status != Status.STATUS_ROLLEDBACK) { 291 throw new IllegalArgumentException("Unsupported transaction status " + status); 292 } 293 } 294 295 } 296 } 297 298 @Override 299 void activateQueue(WorkQueueDescriptor config) { 300 // queue processing is activated only from component listener afterStart 301 if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) { 302 throw new IllegalArgumentException("cannot activate all queues"); 303 } 304 log.info("Activated queue " + config.id + " " + config.toEffectiveString()); 305 if (config.isProcessingEnabled()) { 306 activateQueueMetrics(config.id); 307 } 308 } 309 310 @Override 311 void deactivateQueue(WorkQueueDescriptor config) { 312 // queue processing is deactivated only on shutdown 313 if (WorkQueueDescriptor.ALL_QUEUES.equals(config.id)) { 314 throw new IllegalArgumentException("cannot deactivate all queues"); 315 } 316 if (config.isProcessingEnabled()) { 317 deactivateQueueMetrics(config.id); 318 } 319 log.info("Deactivated work queue not supported: " + config.id); 320 } 321 322 @Override 323 protected void activateQueueMetrics(String queueId) { 324 NuxeoMetricSet queueMetrics = new NuxeoMetricSet("nuxeo", "works", "total", queueId); 325 queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).scheduled, "scheduled"); 326 queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).running, "running"); 327 queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).completed, "completed"); 328 queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).canceled, "canceled"); 329 registry.registerAll(queueMetrics); 330 } 331 332 @Override 333 protected void deactivateQueueMetrics(String queueId) { 334 String queueMetricsName = MetricRegistry.name("nuxeo", "works", "total", queueId); 335 registry.removeMatching((name, metric) -> name.startsWith(queueMetricsName)); 336 } 337 338 @Override 339 public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException { 340 log.warn("Shutdown a queue is not supported with computation implementation"); 341 return false; 342 } 343 344 @Override 345 public boolean shutdown(long timeout, TimeUnit timeUnit) throws InterruptedException { 346 log.info("Shutdown WorkManager in " + timeUnit.toMillis(timeout) + " ms"); 347 shutdownInProgress = true; 348 try { 349 boolean ret = streamProcessor.stop(Duration.ofMillis(timeUnit.toMillis(timeout))); 350 if (!ret) { 351 log.error("Not able to stop worker pool within the timeout."); 352 } 353 return ret; 354 } finally { 355 shutdownInProgress = false; 356 } 357 } 358 359 @Override 360 public int getQueueSize(String queueId, Work.State state) { 361 switch (state) { 362 case SCHEDULED: 363 return getMetrics(queueId).getScheduled().intValue(); 364 case RUNNING: 365 return getMetrics(queueId).getRunning().intValue(); 366 default: 367 return 0; 368 } 369 } 370 371 protected WorkQueueMetrics getMetricsWithNuxeoClassLoader(String queueId) { 372 // JMX threads have distinct class loader that need to be changed to get metrics 373 ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); 374 try { 375 Thread.currentThread().setContextClassLoader(Framework.class.getClassLoader()); 376 return getMetrics(queueId); 377 } finally { 378 Thread.currentThread().setContextClassLoader(classLoader); 379 } 380 } 381 382 @Override 383 public WorkQueueMetrics getMetrics(String queueId) { 384 LogLag lag = logManager.getLag(queueId, queueId); 385 long running = 0; 386 if (lag.lag() > 0) { 387 // we don't have the exact running metric 388 // give an approximation that can be higher that actual one because of the over provisioning 389 running = min(lag.lag(), settings.getPartitions(queueId)); 390 } 391 return new WorkQueueMetrics(queueId, lag.lag(), running, lag.lower(), 0); 392 } 393 394 @Override 395 public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException { 396 if (queueId != null) { 397 return awaitCompletionOnQueue(queueId, duration, unit); 398 } 399 for (String item : workQueueConfig.getQueueIds()) { 400 if (!awaitCompletionOnQueue(item, duration, unit)) { 401 return false; 402 } 403 } 404 return true; 405 } 406 407 protected boolean awaitCompletionOnQueue(String queueId, long duration, TimeUnit unit) throws InterruptedException { 408 if (!isStarted()) { 409 return true; 410 } 411 log.debug("awaitCompletion " + queueId + " starting"); 412 // wait for the lag to be null 413 long durationMs = min(unit.toMillis(duration), TimeUnit.DAYS.toMillis(1)); // prevent overflow 414 long deadline = System.currentTimeMillis() + durationMs; 415 while (System.currentTimeMillis() < deadline) { 416 Thread.sleep(100); 417 int lag = getMetrics(queueId).getScheduled().intValue(); 418 if (lag == 0) { 419 if (log.isDebugEnabled()) { 420 log.debug("awaitCompletion for " + queueId + " completed " + getMetrics(queueId)); 421 } 422 return true; 423 } 424 if (!log.isDebugEnabled()) { 425 log.debug("awaitCompletion for " + queueId + " not completed " + getMetrics(queueId)); 426 } 427 } 428 log.warn(String.format("%s timeout after: %.2fs, %s", queueId, durationMs / 1000.0, getMetrics(queueId))); 429 return false; 430 } 431 432 public boolean awaitCompletionWithWaterMark(String queueId, long duration, TimeUnit unit) 433 throws InterruptedException { 434 if (!isStarted()) { 435 return true; 436 } 437 // wait that the low watermark get stable 438 long durationMs = min(unit.toMillis(duration), TimeUnit.DAYS.toMillis(1)); // prevent overflow 439 long deadline = System.currentTimeMillis() + durationMs; 440 long lowWatermark = getLowWaterMark(queueId); 441 while (System.currentTimeMillis() < deadline) { 442 Thread.sleep(100); 443 long wm = getLowWaterMark(queueId); 444 if (wm == lowWatermark) { 445 log.debug("awaitCompletion for " + ((queueId == null) ? "all" : queueId) + " completed " + wm); 446 return true; 447 } 448 if (log.isDebugEnabled()) { 449 log.debug("awaitCompletion low wm for " + ((queueId == null) ? "all" : queueId) + ":" + wm + " diff: " 450 + (wm - lowWatermark)); 451 } 452 lowWatermark = wm; 453 } 454 log.warn(String.format("%s timeout after: %.2fs", queueId, durationMs / 1000.0)); 455 return false; 456 } 457 458 protected long getLowWaterMark(String queueId) { 459 if (queueId != null) { 460 return streamProcessor.getLowWatermark(queueId); 461 } 462 return streamProcessor.getLowWatermark(); 463 } 464 465 @Override 466 public Work.State getWorkState(String s) { 467 // always not found 468 return null; 469 } 470 471 @Override 472 public Work find(String s, Work.State state) { 473 // always not found 474 return null; 475 } 476 477 @Override 478 public List<Work> listWork(String s, Work.State state) { 479 return Collections.emptyList(); 480 } 481 482 @Override 483 public List<String> listWorkIds(String s, Work.State state) { 484 return Collections.emptyList(); 485 } 486 487 @Override 488 protected boolean scheduleAfterCommit(Work work, Scheduling scheduling) { 489 TransactionManager transactionManager; 490 try { 491 transactionManager = TransactionHelper.lookupTransactionManager(); 492 } catch (NamingException e) { 493 transactionManager = null; 494 } 495 if (transactionManager == null) { 496 log.warn("Not scheduled work after commit because of missing transaction manager: " + work.getId()); 497 return false; 498 } 499 try { 500 Transaction transaction = transactionManager.getTransaction(); 501 if (transaction == null) { 502 if (log.isDebugEnabled()) { 503 log.debug("Not scheduled work after commit because of missing transaction: " + work.getId()); 504 } 505 return false; 506 } 507 int status = transaction.getStatus(); 508 if (status == Status.STATUS_ACTIVE) { 509 if (log.isDebugEnabled()) { 510 log.debug("Scheduled after commit: " + work.getId()); 511 } 512 transaction.registerSynchronization(new StreamWorkManager.WorkScheduling(work, scheduling)); 513 return true; 514 } else if (status == Status.STATUS_COMMITTED) { 515 // called in afterCompletion, we can schedule immediately 516 if (log.isDebugEnabled()) { 517 log.debug("Scheduled immediately: " + work.getId()); 518 } 519 return false; 520 } else if (status == Status.STATUS_MARKED_ROLLBACK) { 521 if (log.isDebugEnabled()) { 522 log.debug("Cancelling schedule because transaction marked rollback-only: " + work.getId()); 523 } 524 return true; 525 } else { 526 if (log.isDebugEnabled()) { 527 log.debug("Not scheduling work after commit because transaction is in status " + status + ": " 528 + work.getId()); 529 } 530 return false; 531 } 532 } catch (SystemException | RollbackException e) { 533 log.error("Cannot schedule after commit", e); 534 return false; 535 } 536 } 537 538}