/*
 * (C) Copyright 2012-2016 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.work;

import static org.nuxeo.ecm.core.api.event.CoreEventConstants.REPOSITORY_NAME;
import static org.nuxeo.ecm.core.work.WorkManagerImpl.DEAD_LETTER_QUEUE;
import static org.nuxeo.ecm.core.work.api.Work.Progress.PROGRESS_INDETERMINATE;

import java.io.Serializable;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;

import javax.security.auth.login.LoginException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.common.utils.ExceptionUtils;
import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
import org.nuxeo.ecm.core.api.CoreInstance;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentLocation;
import org.nuxeo.ecm.core.api.IdRef;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.impl.DocumentLocationImpl;
import org.nuxeo.ecm.core.event.Event;
import org.nuxeo.ecm.core.event.EventContext;
import org.nuxeo.ecm.core.event.EventService;
import org.nuxeo.ecm.core.event.impl.EventContextImpl;
import org.nuxeo.ecm.core.event.impl.EventImpl;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.api.login.NuxeoLoginContext;
import org.nuxeo.runtime.metrics.MetricsService;
import org.nuxeo.runtime.stream.StreamService;
import org.nuxeo.runtime.transaction.TransactionHelper;

import io.dropwizard.metrics5.MetricRegistry;
import io.dropwizard.metrics5.SharedMetricRegistries;
import io.opencensus.common.Scope;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.BlankSpan;
import io.opencensus.trace.Link;
import io.opencensus.trace.Span;
import io.opencensus.trace.SpanContext;
import io.opencensus.trace.Status;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import io.opencensus.trace.propagation.BinaryFormat;
import io.opencensus.trace.propagation.SpanContextParseException;

/**
 * A base implementation for a {@link Work} instance, dealing with most of the details around state change.
 * <p>
 * It also deals with transaction management, and prevents running work instances that are suspending.
 * <p>
 * Actual implementations must at a minimum implement the {@link #work()} method. A method {@link #cleanUp} is
 * available.
 * <p>
 * To deal with suspension, {@link #work()} should periodically check for {@link #isSuspending()} and if true save its
 * state and call {@link #suspended()}.
 * <p>
 * Specific information about the work can be returned by {@link #getDocument()} or {@link #getDocuments()}.
 *
 * @since 5.6
 */
public abstract class AbstractWork implements Work {

    private static final long serialVersionUID = 1L;

    private static final Log log = LogFactory.getLog(AbstractWork.class);

    protected static final Random RANDOM = new SecureRandom();

    /** Name of the event fired by {@link #workFailed(RuntimeException)}. */
    public static final String WORK_FAILED_EVENT = "workFailed";

    /** Failure event property holding this work instance. */
    public static final String WORK_INSTANCE = "workInstance";

    /** Failure event property holding the message of the exception. */
    public static final String FAILURE_MSG = "failureMsg";

    /** Failure event property holding the class name of the exception. */
    public static final String FAILURE_EXCEPTION = "failureException";

    protected String id;

    /** Suspend requested by the work manager. */
    protected transient volatile boolean suspending;

    /** Suspend acknowledged by the work instance. */
    protected transient volatile boolean suspended;

    protected State state;

    protected Progress progress;

    /** Repository name for the Work instance, if relevant. */
    protected String repositoryName;

    /**
     * Doc id for the Work instance, if relevant. This describes for the WorkManager a document on which this Work
     * instance will act.
     * <p>
     * Either docId or docIds is set. Not both.
     */
    protected String docId;

    /**
     * Doc ids for the Work instance, if relevant. This describes for the WorkManager the documents on which this Work
     * instance will act.
     * <p>
     * Either docId or docIds is set. Not both.
     */
    protected List<String> docIds;

    /**
     * If {@code true}, the docId is only the root of a set of documents on which this Work instance will act.
     */
    protected boolean isTree;

    /**
     * The originating username to use when opening the {@link CoreSession}.
     *
     * @since 8.1
     */
    protected String originatingUsername;

    protected String status;

    protected long schedulingTime;

    protected long startTime;

    protected long completionTime;

    protected transient CoreSession session;

    protected transient NuxeoLoginContext loginContext;

    protected WorkSchedulePath schedulePath;

    /** Name of the thread that constructed this work instance. */
    protected String callerThread;

    // @since 11.1
    public static final String GLOBAL_DLQ_COUNT_REGISTRY_NAME = MetricRegistry.name("nuxeo", "works", "dlq").getKey();

    static {
        // Initialize the metric so it is reported as 0 from start.
        SharedMetricRegistries.getOrCreate(MetricsService.class.getName()).counter(GLOBAL_DLQ_COUNT_REGISTRY_NAME);
    }

    /** Binary-encoded context of the span that was current when this work instance was constructed. */
    protected byte[] traceContext;

    /**
     * Constructs a {@link Work} instance with a unique id.
     */
    public AbstractWork() {
        // we use RANDOM to deal with these cases:
        // - several calls in the time granularity of nanoTime()
        // - several concurrent calls on different servers
        this(System.nanoTime() + "." + (RANDOM.nextInt() & 0x7fffffff));
    }

    /**
     * Constructs a {@link Work} instance with the given id.
     * <p>
     * The scheduling time, the name of the calling thread and the current tracing span context are captured here.
     *
     * @param id the work id
     */
    public AbstractWork(String id) {
        this.id = id;
        progress = PROGRESS_INDETERMINATE;
        schedulingTime = System.currentTimeMillis();
        callerThread = Thread.currentThread().getName();
        traceContext = Tracing.getPropagationComponent()
                              .getBinaryFormat()
                              .toByteArray(Tracing.getTracer().getCurrentSpan().getContext());
    }

    @Override
    public String getId() {
        return id;
    }

    @Override
    public WorkSchedulePath getSchedulePath() {
        // schedulePath is null until setSchedulePath is called with a value: fall back to the empty path
        return schedulePath == null ? WorkSchedulePath.EMPTY : schedulePath;
    }

    @Override
    public void setSchedulePath(WorkSchedulePath path) {
        schedulePath = path;
    }

    /**
     * Sets the single document on which this work instance acts, and clears any previously set list of documents.
     *
     * @param repositoryName the repository name
     * @param docId the document id
     * @param isTree {@code true} if the document is only the root of a set of documents
     */
    public void setDocument(String repositoryName, String docId, boolean isTree) {
        this.repositoryName = repositoryName;
        this.docId = docId;
        docIds = null;
        this.isTree = isTree;
    }

    /**
     * Sets the single document on which this work instance acts, not as a tree.
     *
     * @param repositoryName the repository name
     * @param docId the document id
     */
    public void setDocument(String repositoryName, String docId) {
        setDocument(repositoryName, docId, false);
    }

    /**
     * Sets the documents on which this work instance acts, and clears any previously set single document.
     * <p>
     * The given list is copied.
     *
     * @param repositoryName the repository name
     * @param docIds the document ids
     */
    public void setDocuments(String repositoryName, List<String> docIds) {
        this.repositoryName = repositoryName;
        docId = null;
        this.docIds = new ArrayList<>(docIds);
    }

    /**
     * @since 8.1
     */
    public void setOriginatingUsername(String originatingUsername) {
        this.originatingUsername = originatingUsername;
    }

    @Override
    public void setWorkInstanceSuspending() {
        suspending = true;
    }

    @Override
    public boolean isSuspending() {
        return suspending;
    }

    @Override
    public void suspended() {
        suspended = true;
    }

    @Override
    public boolean isWorkInstanceSuspended() {
        return suspended;
    }

    @Override
    public void setWorkInstanceState(State state) {
        this.state = state;
        if (log.isTraceEnabled()) {
            log.trace(this + " state=" + state);
        }
    }

    @Override
    public State getWorkInstanceState() {
        return state;
    }

    @Override
    public void setProgress(Progress progress) {
        this.progress = progress;
        if (log.isTraceEnabled()) {
            log.trace(String.valueOf(this));
        }
    }

    @Override
    public Progress getProgress() {
        return progress;
    }

    /**
     * Sets a human-readable status for this work instance.
     *
     * @param status the status
     */
    public void setStatus(String status) {
        this.status = status;
    }

    @Override
    public String getStatus() {
        return status;
    }

    /**
     * May be called by implementing classes to open a session on the repository.
     *
     * @return the session (also available in {@code session} field)
     * @deprecated since 8.1. Use {@link #openSystemSession()}.
     */
    @Deprecated
    public CoreSession initSession() {
        return initSession(repositoryName);
    }

    /**
     * May be called by implementing classes to open a System session on the repository.
     *
     * @since 8.1
     */
    public void openSystemSession() {
        loginContext = Framework.loginSystem(originatingUsername);
        session = CoreInstance.getCoreSessionSystem(repositoryName, originatingUsername);
    }

    /**
     * May be called by implementing classes to open a User session on the repository.
     * <p>
     * It uses the set {@link #originatingUsername} to open the session.
     *
     * @throws IllegalStateException if no originating username has been set
     * @throws NuxeoException if the login fails
     * @since 8.1
     */
    public void openUserSession() {
        if (originatingUsername == null) {
            throw new IllegalStateException("Cannot open an user session without an originatingUsername");
        }

        try {
            loginContext = Framework.loginUser(originatingUsername);
        } catch (LoginException e) {
            throw new NuxeoException(e);
        }

        session = CoreInstance.getCoreSession(repositoryName);
    }

    /**
     * May be called by implementing classes to open a session on the given repository.
     *
     * @param repositoryName the repository name
     * @return the session (also available in {@code session} field)
     * @deprecated since 8.1. Use {@link #openSystemSession()} to open a session on the configured repository name,
     *             otherwise use {@link CoreInstance#getCoreSessionSystem(String)}.
     */
    @Deprecated
    public CoreSession initSession(String repositoryName) {
        session = CoreInstance.getCoreSessionSystem(repositoryName, originatingUsername);
        return session;
    }

    /**
     * Closes the session that was opened by {@link #openSystemSession()} or {@link #openUserSession()}.
     * <p>
     * This closes the login context, if any; the {@code session} field itself is left untouched.
     *
     * @since 5.8
     */
    public void closeSession() {
        // loginContext may be null in tests
        if (loginContext != null) {
            loginContext.close();
        }
    }

    /**
     * Runs the work inside a tracing span, retrying up to {@link #getRetryCount()} times on failure.
     * <p>
     * Nothing is run if a suspend has already been requested. When all attempts have failed, or when the failure has
     * an interrupted cause, {@link #workFailed(RuntimeException)} is called with the first exception, the following
     * ones being attached to it as suppressed exceptions.
     */
    @Override
    public void run() {
        if (isSuspending()) {
            // don't run anything if we're being started while a suspend
            // has been requested
            suspended();
            return;
        }
        Span span = getSpanFromContext(traceContext);
        try (Scope scope = Tracing.getTracer().withSpan(span)) {
            RuntimeException suppressed = null;
            int retryCount = getRetryCount(); // may be 0
            for (int i = 0; i <= retryCount; i++) {
                if (i > 0) {
                    span.addAnnotation("AbstractWork#run Retrying " + i);
                    log.debug("Retrying work due to concurrent update (" + i + "): " + this);
                    log.trace("Concurrent update", suppressed);
                }
                if (ExceptionUtils.hasInterruptedCause(suppressed)) {
                    log.debug("No need to retry the work with id=" + getId() + ", work manager is shutting down");
                    break;
                }
                try {
                    runWorkWithTransaction();
                    span.setStatus(Status.OK);
                    return;
                } catch (RuntimeException e) {
                    span.addAnnotation("AbstractWork#run Failure: " + e.getMessage());
                    span.setStatus(Status.UNKNOWN);
                    // keep the first failure as the main exception, later ones are attached to it
                    if (suppressed == null) {
                        suppressed = e;
                    } else {
                        suppressed.addSuppressed(e);
                    }
                }
            }
            workFailed(suppressed);
        } finally {
            span.end();
        }
    }

    /**
     * Starts a span for this work, linked to the span context that was serialized in the given bytes.
     *
     * @param traceContext the binary-encoded span context, may be {@code null} or empty
     * @return the started span, or {@link BlankSpan#INSTANCE} if there is no context or if it cannot be parsed
     */
    protected Span getSpanFromContext(byte[] traceContext) {
        if (traceContext == null || traceContext.length == 0) {
            return BlankSpan.INSTANCE;
        }
        Tracer tracer = Tracing.getTracer();
        BinaryFormat binaryFormat = Tracing.getPropagationComponent().getBinaryFormat();
        try {
            // followsFrom relationship
            SpanContext spanContext = binaryFormat.fromByteArray(traceContext);
            Span span = tracer.spanBuilderWithRemoteParent("work/" + getClass().getSimpleName(), spanContext)
                              .startSpan();
            span.addLink(Link.fromSpanContext(spanContext, Link.Type.PARENT_LINKED_SPAN));
            Map<String, AttributeValue> attributes = new HashMap<>();
            attributes.put("tx.thread", AttributeValue.stringAttributeValue(Thread.currentThread().getName()));
            attributes.put("work.id", AttributeValue.stringAttributeValue(getId()));
            attributes.put("work.category", AttributeValue.stringAttributeValue(getCategory()));
            String title = getTitle();
            if (title != null) {
                attributes.put("work.title", AttributeValue.stringAttributeValue(title));
            }
            attributes.put("work.parent_path",
                    AttributeValue.stringAttributeValue(getSchedulePath().getParentPath()));
            attributes.put("work.caller_thread", AttributeValue.stringAttributeValue(callerThread));
            attributes.put("work.to_string", AttributeValue.stringAttributeValue(toString()));
            if (docId != null) {
                attributes.put("work.doc_id", AttributeValue.stringAttributeValue(docId));
            }
            if (docIds != null && !docIds.isEmpty()) {
                attributes.put("work.doc_count", AttributeValue.longAttributeValue(docIds.size()));
            }
            span.putAttributes(attributes);
            return span;
        } catch (SpanContextParseException e) {
            log.warn("No span context " + traceContext.length);
            return BlankSpan.INSTANCE;
        }
    }

    /**
     * Builds failure event properties. Work implementations can override this method to inject more event properties
     * than the default.
     *
     * @param exception the exception that occurred, may be {@code null}
     * @return the event properties
     * @since 10.1
     */
    public Map<String, Serializable> buildWorkFailureEventProps(RuntimeException exception) {
        Map<String, Serializable> eventProps = new HashMap<>();
        eventProps.put(WORK_INSTANCE, this); // Work objects are serializable so send the whole thing

        if (session != null) {
            eventProps.put(REPOSITORY_NAME, session.getRepositoryName());
        }

        if (exception != null) {
            eventProps.put(FAILURE_MSG, exception.getMessage());
            eventProps.put(FAILURE_EXCEPTION, exception.getClass().getName());
        }
        return eventProps;
    }

    /**
     * Called when the worker failed to run successfully even after retrying.
     * <p>
     * Fires a {@link #WORK_FAILED_EVENT} commit event, then, if an exception is given, appends the work to the dead
     * letter queue and throws.
     *
     * @param exception the exception that occurred
     * @throws NuxeoException wrapping the given exception, if it is not {@code null}
     * @since 10.1
     */
    public void workFailed(RuntimeException exception) {
        EventService service = Framework.getService(EventService.class);
        EventContext eventContext = new EventContextImpl(null, session != null ? session.getPrincipal() : null);
        eventContext.setProperties(buildWorkFailureEventProps(exception));
        Event event = new EventImpl(WORK_FAILED_EVENT, eventContext);
        event.setIsCommitEvent(true);
        service.fireEvent(event);
        if (exception != null) {
            appendWorkToDeadLetterQueue();
            int retryCount = getRetryCount();
            // getTitleOr: a failing getTitle() must not hide the exception being reported
            String msg = "Work failed after " + retryCount + " " + (retryCount == 1 ? "retry" : "retries")
                    + ", class=" + getClass() + " id=" + getId() + " category=" + getCategory() + " title="
                    + getTitleOr("(unavailable)");
            // all retries have been done, throw the exception
            throw new NuxeoException(msg, exception);
        }
    }

    /**
     * Appends this work to the dead letter queue and increments the DLQ counter metric.
     * <p>
     * Does nothing if the work is not in {@code RUNNING} state or if there is no {@link StreamService}. Failures to
     * append are logged and never propagated.
     */
    protected void appendWorkToDeadLetterQueue() {
        if (!State.RUNNING.equals(getWorkInstanceState())) {
            // DLQ is only for Works executed by a WorkManager, in this case they are in RUNNING state.
            return;
        }
        try {
            String key = getCategory() + ":" + getId();
            StreamService service = Framework.getService(StreamService.class);
            if (service != null) {
                service.getLogManager()
                       .getAppender(DEAD_LETTER_QUEUE)
                       .append(key, Record.of(key, WorkComputation.serialize(this)));
                MetricRegistry registry = SharedMetricRegistries.getOrCreate(MetricsService.class.getName());
                registry.counter(GLOBAL_DLQ_COUNT_REGISTRY_NAME).inc();
            }
        } catch (IllegalArgumentException e) {
            log.debug("No default log manager, don't save work in failure to a dead letter queue");
        } catch (Exception e) {
            String message = "Failed to save work: " + getId() + " in dead letter queue";
            if (ExceptionUtils.hasInterruptedCause(e)) {
                // During hot reload or forced shutdown the StreamService might be unavailable
                // using warn level to prevent CI build to fail
                log.warn(message, e);
            } else {
                log.error(message, e);
            }
        }
    }

    /**
     * Gets the title of this work, or the given default if {@code getTitle()} throws.
     */
    private String getTitleOr(String defaultTitle) {
        try {
            return getTitle();
        } catch (Exception e) {
            return defaultTitle;
        }
    }

    /**
     * Does work under a transaction.
     * <p>
     * The transaction is rolled back if the work failed or if a suspend has been requested, otherwise it is
     * committed. {@link #cleanUp(boolean, Exception)} is always called before the transaction is ended.
     *
     * @since 5.9.4
     */
    protected void runWorkWithTransaction() {
        TransactionHelper.startTransaction();
        boolean ok = false;
        Exception exc = null;
        try {
            WorkSchedulePath.handleEnter(this);
            // --- do work
            setStartTime();
            work(); // may throw ConcurrentUpdateException
            if (isGroupJoin() && WorkStateHelper.removeGroupJoinWork(getPartitionKey())) {
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Detecting GroupJoin %s completion Work: %s", getPartitionKey(), getId()));
                }
                onGroupJoinCompletion();
            }
            ok = true;
            // --- end work
        } catch (Exception e) {
            exc = e;
            if (e instanceof RuntimeException) {
                throw (RuntimeException) e;
            } else if (e instanceof InterruptedException) {
                // restore interrupted status for the thread pool worker
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(e);
        } finally {
            WorkSchedulePath.handleReturn();
            try {
                setCompletionTime();
                cleanUp(ok, exc);
            } finally {
                if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
                    if (!ok || isSuspending()) {
                        if (log.isTraceEnabled()) {
                            // when ok is true we only get here because a suspend has been requested
                            log.trace(this + (ok ? " is suspending" : " has failed") + ", rollbacking");
                        }
                        TransactionHelper.setTransactionRollbackOnly();
                    }
                    TransactionHelper.commitOrRollbackTransaction();
                }
            }
        }
    }

    @Override
    public abstract void work();

    /**
     * Gets the number of times that this Work instance can be retried in case of concurrent update exceptions.
     *
     * @return 0 for no retry, or more if some retries are possible
     * @see #work
     * @since 5.8
     */
    public int getRetryCount() {
        return 0;
    }

    /**
     * This method is called after {@link #work} is done in a finally block, whether work completed normally or was in
     * error or was interrupted.
     *
     * @param ok {@code true} if the work completed normally
     * @param e the exception, if available
     */
    @Override
    public void cleanUp(boolean ok, Exception e) {
        if (!ok) {
            if (ExceptionUtils.hasInterruptedCause(e)) {
                log.debug("Interrupted work: " + this);
            } else if (!(e instanceof ConcurrentUpdateException) && !isSuspending()) {
                // concurrent updates and failures while suspending are not reported as errors here
                log.error("Exception during work: " + this, e);
                if (WorkSchedulePath.isCaptureStackEnabled()) {
                    WorkSchedulePath.log.error("Work schedule path", getSchedulePath().getStack());
                }
            }
        }
        closeSession();
    }

    @Override
    public String getOriginatingUsername() {
        return originatingUsername;
    }

    @Override
    public long getSchedulingTime() {
        return schedulingTime;
    }

    @Override
    public long getStartTime() {
        return startTime;
    }

    @Override
    public long getCompletionTime() {
        return completionTime;
    }

    @Override
    public void setStartTime() {
        startTime = System.currentTimeMillis();
    }

    /**
     * Sets the completion time to the current time.
     */
    protected void setCompletionTime() {
        completionTime = System.currentTimeMillis();
    }

    @Override
    public String getCategory() {
        return getClass().getSimpleName();
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(getClass().getSimpleName());
        sb.append('(');
        if (docId != null) {
            sb.append(docId);
            sb.append(", ");
        } else if (docIds != null && !docIds.isEmpty()) {
            // only the first document id is shown
            sb.append(docIds.get(0));
            sb.append("..., ");
        }
        sb.append(getSchedulePath().getParentPath());
        sb.append(", ");
        sb.append(getProgress());
        sb.append(", ");
        sb.append(getStatus());
        sb.append(')');
        return sb.toString();
    }

    @Override
    public DocumentLocation getDocument() {
        if (docId != null) {
            return newDocumentLocation(docId);
        }
        return null;
    }

    @Override
    public List<DocumentLocation> getDocuments() {
        if (docIds != null) {
            List<DocumentLocation> res = new ArrayList<>(docIds.size());
            for (String each : docIds) {
                res.add(newDocumentLocation(each));
            }
            return res;
        }
        if (docId != null) {
            return Collections.singletonList(newDocumentLocation(docId));
        }
        return Collections.emptyList();
    }

    /**
     * Creates a document location for the given document id in this work's repository.
     *
     * @param docId the document id
     * @return the document location
     */
    protected DocumentLocation newDocumentLocation(String docId) {
        return new DocumentLocationImpl(repositoryName, new IdRef(docId));
    }

    @Override
    public boolean isDocumentTree() {
        return isTree;
    }

    /**
     * Releases the transaction resources by committing the existing transaction (if any). This is recommended before
     * running a long process.
     */
    public void commitOrRollbackTransaction() {
        if (TransactionHelper.isTransactionActiveOrMarkedRollback()) {
            TransactionHelper.commitOrRollbackTransaction();
        }
    }

    /**
     * Starts a new transaction.
     * <p>
     * Usually called after {@code commitOrRollbackTransaction()}, for instance for saving back the results of a long
     * process.
     *
     * @return true if a new transaction was started
     */
    public boolean startTransaction() {
        return TransactionHelper.startTransaction();
    }

    /**
     * Two works are equal if they have the same id. Null ids are tolerated.
     */
    @Override
    public boolean equals(Object other) {
        if (!(other instanceof Work)) {
            return false;
        }
        return Objects.equals(((Work) other).getId(), getId());
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(getId());
    }

    @Override
    public String getPartitionKey() {
        if (docId != null) {
            return docId;
        } else if (docIds != null && !docIds.isEmpty()) {
            return docIds.get(0);
        }
        return getId();
    }
}