001/* 002 * (C) Copyright 2012-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.work; 020 021import static org.nuxeo.ecm.core.api.event.CoreEventConstants.REPOSITORY_NAME; 022import static org.nuxeo.ecm.core.work.api.Work.Progress.PROGRESS_INDETERMINATE; 023 024import java.io.Serializable; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.HashMap; 028import java.util.List; 029import java.util.Map; 030import java.util.Random; 031 032import javax.security.auth.login.LoginContext; 033import javax.security.auth.login.LoginException; 034 035import org.apache.commons.logging.Log; 036import org.apache.commons.logging.LogFactory; 037import org.nuxeo.common.logging.SequenceTracer; 038import org.nuxeo.common.utils.ExceptionUtils; 039import org.nuxeo.ecm.core.api.CloseableCoreSession; 040import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 041import org.nuxeo.ecm.core.api.CoreInstance; 042import org.nuxeo.ecm.core.api.CoreSession; 043import org.nuxeo.ecm.core.api.DocumentLocation; 044import org.nuxeo.ecm.core.api.IdRef; 045import org.nuxeo.ecm.core.api.NuxeoException; 046import org.nuxeo.ecm.core.api.impl.DocumentLocationImpl; 047import org.nuxeo.ecm.core.event.Event; 048import org.nuxeo.ecm.core.event.EventContext; 049import org.nuxeo.ecm.core.event.EventService; 050import org.nuxeo.ecm.core.event.impl.EventContextImpl; 051import org.nuxeo.ecm.core.event.impl.EventImpl; 052import org.nuxeo.ecm.core.work.api.Work; 053import org.nuxeo.ecm.core.work.api.WorkSchedulePath; 054import org.nuxeo.runtime.api.Framework; 055import org.nuxeo.runtime.transaction.TransactionHelper; 056 057/** 058 * A base implementation for a {@link Work} instance, dealing with most of the details around state change. 059 * <p> 060 * It also deals with transaction management, and prevents running work instances that are suspending. 061 * <p> 062 * Actual implementations must at a minimum implement the {@link #work()} method. A method {@link #cleanUp} is 063 * available. 064 * <p> 065 * To deal with suspension, {@link #work()} should periodically check for {@link #isSuspending()} and if true save its 066 * state and call {@link #suspended()}. 067 * <p> 068 * Specific information about the work can be returned by {@link #getDocument()} or {@link #getDocuments()}. 069 * 070 * @since 5.6 071 */ 072public abstract class AbstractWork implements Work { 073 074 private static final long serialVersionUID = 1L; 075 076 private static final Log log = LogFactory.getLog(AbstractWork.class); 077 078 protected static final Random RANDOM = new Random(); 079 080 public static final String WORK_FAILED_EVENT = "workFailed"; 081 082 public static final String WORK_INSTANCE = "workInstance"; 083 084 public static final String FAILURE_MSG = "failureMsg"; 085 086 public static final String FAILURE_EXCEPTION = "failureException"; 087 088 protected String id; 089 090 /** Suspend requested by the work manager. */ 091 protected transient volatile boolean suspending; 092 093 /** Suspend acknowledged by the work instance. */ 094 protected transient volatile boolean suspended; 095 096 protected State state; 097 098 protected Progress progress; 099 100 /** Repository name for the Work instance, if relevant. */ 101 protected String repositoryName; 102 103 /** 104 * Doc id for the Work instance, if relevant. This describes for the WorkManager a document on which this Work 105 * instance will act. 106 * <p> 107 * Either docId or docIds is set. Not both. 108 */ 109 protected String docId; 110 111 /** 112 * Doc ids for the Work instance, if relevant. This describes for the WorkManager the documents on which this Work 113 * instance will act. 114 * <p> 115 * Either docId or docIds is set. Not both. 116 */ 117 protected List<String> docIds; 118 119 /** 120 * If {@code true}, the docId is only the root of a set of documents on which this Work instance will act. 121 */ 122 protected boolean isTree; 123 124 /** 125 * The originating username to use when opening the {@link CoreSession}. 126 * 127 * @since 8.1 128 */ 129 protected String originatingUsername; 130 131 protected String status; 132 133 protected long schedulingTime; 134 135 protected long startTime; 136 137 protected long completionTime; 138 139 protected transient CoreSession session; 140 141 protected transient LoginContext loginContext; 142 143 protected WorkSchedulePath schedulePath; 144 145 protected String callerThread; 146 147 /** 148 * Constructs a {@link Work} instance with a unique id. 149 */ 150 public AbstractWork() { 151 // we user RANDOM to deal with these cases: 152 // - several calls in the time granularity of nanoTime() 153 // - several concurrent calls on different servers 154 this(System.nanoTime() + "." + (RANDOM.nextInt() & 0x7fffffff)); 155 callerThread = SequenceTracer.getThreadName(); 156 } 157 158 public AbstractWork(String id) { 159 this.id = id; 160 progress = PROGRESS_INDETERMINATE; 161 schedulingTime = System.currentTimeMillis(); 162 callerThread = SequenceTracer.getThreadName(); 163 } 164 165 @Override 166 public String getId() { 167 return id; 168 } 169 170 @Override 171 public WorkSchedulePath getSchedulePath() { 172 // schedulePath is transient so will become null after deserialization 173 return schedulePath == null ? WorkSchedulePath.EMPTY : schedulePath; 174 } 175 176 @Override 177 public void setSchedulePath(WorkSchedulePath path) { 178 schedulePath = path; 179 } 180 181 public void setDocument(String repositoryName, String docId, boolean isTree) { 182 this.repositoryName = repositoryName; 183 this.docId = docId; 184 docIds = null; 185 this.isTree = isTree; 186 } 187 188 public void setDocument(String repositoryName, String docId) { 189 setDocument(repositoryName, docId, false); 190 } 191 192 public void setDocuments(String repositoryName, List<String> docIds) { 193 this.repositoryName = repositoryName; 194 docId = null; 195 this.docIds = new ArrayList<>(docIds); 196 } 197 198 /** 199 * @since 8.1 200 */ 201 public void setOriginatingUsername(String originatingUsername) { 202 this.originatingUsername = originatingUsername; 203 } 204 205 @Override 206 public void setWorkInstanceSuspending() { 207 suspending = true; 208 } 209 210 @Override 211 public boolean isSuspending() { 212 return suspending; 213 } 214 215 @Override 216 public void suspended() { 217 suspended = true; 218 } 219 220 @Override 221 public boolean isWorkInstanceSuspended() { 222 return suspended; 223 } 224 225 @Override 226 public void setWorkInstanceState(State state) { 227 this.state = state; 228 if (log.isTraceEnabled()) { 229 log.trace(this + " state=" + state); 230 } 231 } 232 233 @Override 234 public State getWorkInstanceState() { 235 return state; 236 } 237 238 @Override 239 public void setProgress(Progress progress) { 240 this.progress = progress; 241 if (log.isTraceEnabled()) { 242 log.trace(String.valueOf(this)); 243 } 244 } 245 246 @Override 247 public Progress getProgress() { 248 return progress; 249 } 250 251 /** 252 * Sets a human-readable status for this work instance. 253 * 254 * @param status the status 255 */ 256 public void setStatus(String status) { 257 this.status = status; 258 } 259 260 @Override 261 public String getStatus() { 262 return status; 263 } 264 265 /** 266 * May be called by implementing classes to open a session on the repository. 267 * 268 * @return the session (also available in {@code session} field) 269 * @deprecated since 8.1. Use {@link #openSystemSession()}. 270 */ 271 @Deprecated 272 public CoreSession initSession() { 273 return initSession(repositoryName); 274 } 275 276 /** 277 * May be called by implementing classes to open a System session on the repository. 278 * 279 * @since 8.1 280 */ 281 public void openSystemSession() { 282 session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername); 283 } 284 285 /** 286 * May be called by implementing classes to open a Use session on the repository. 287 * <p> 288 * It uses the set {@link #originatingUsername} to open the session. 289 * 290 * @since 8.1 291 */ 292 public void openUserSession() { 293 if (originatingUsername == null) { 294 throw new IllegalStateException("Cannot open an user session without an originatingUsername"); 295 } 296 297 try { 298 loginContext = Framework.loginAsUser(originatingUsername); 299 } catch (LoginException e) { 300 throw new NuxeoException(e); 301 } 302 303 session = CoreInstance.openCoreSession(repositoryName); 304 } 305 306 /** 307 * May be called by implementing classes to open a session on the given repository. 308 * 309 * @param repositoryName the repository name 310 * @return the session (also available in {@code session} field) 311 * @deprecated since 8.1. Use {@link #openSystemSession()} to open a session on the configured repository name, 312 * otherwise use {@link CoreInstance#openCoreSessionSystem(String)}. 313 */ 314 @Deprecated 315 public CoreSession initSession(String repositoryName) { 316 session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername); 317 return session; 318 } 319 320 /** 321 * Closes the session that was opened by {@link #openSystemSession()} or {@link #openUserSession()}. 322 * 323 * @since 5.8 324 */ 325 public void closeSession() { 326 if (session != null) { 327 ((CloseableCoreSession) session).close(); 328 session = null; 329 } 330 } 331 332 @Override 333 public void run() { 334 if (isSuspending()) { 335 // don't run anything if we're being started while a suspend 336 // has been requested 337 suspended(); 338 return; 339 } 340 if (SequenceTracer.isEnabled()) { 341 SequenceTracer.startFrom(callerThread, "Work " + getTitleOr("unknown"), " #7acde9"); 342 } 343 RuntimeException suppressed = null; 344 int retryCount = getRetryCount(); // may be 0 345 for (int i = 0; i <= retryCount; i++) { 346 if (i > 0) { 347 log.debug("Retrying work due to concurrent update (" + i + "): " + this); 348 log.trace("Concurrent update", suppressed); 349 } 350 if (ExceptionUtils.hasInterruptedCause(suppressed)) { 351 // if we're here suppressed != null so we destroy SequenceTracer 352 log.debug("No need to retry the work with id=" + getId() + ", work manager is shutting down"); 353 break; 354 } 355 try { 356 runWorkWithTransaction(); 357 SequenceTracer.stop("Work done " + (completionTime - startTime) + " ms"); 358 return; 359 } catch (RuntimeException e) { 360 if (suppressed == null) { 361 suppressed = e; 362 } else { 363 suppressed.addSuppressed(e); 364 } 365 } 366 } 367 368 workFailed(suppressed); 369 } 370 371 /** 372 * Builds failure event properties. Work implementations can override this method to inject 373 * more event properties than the default. 374 * @since 10.1 375 */ 376 public Map<String, Serializable> buildWorkFailureEventProps(RuntimeException exception) { 377 378 Map<String, Serializable> eventProps = new HashMap<>(); 379 eventProps.put(WORK_INSTANCE, this); // Work objects are serializable so send the whole thing 380 381 if (session != null) { 382 eventProps.put(REPOSITORY_NAME, session.getRepositoryName()); 383 } 384 385 if (exception != null) { 386 eventProps.put(FAILURE_MSG, exception.getMessage()); 387 eventProps.put(FAILURE_EXCEPTION, exception.getClass().getName()); 388 } 389 return eventProps; 390 } 391 392 /** 393 * Called when the worker failed to run successfully even after retrying. 394 * @since 10.1 395 * @param exception the exception that occurred 396 */ 397 public void workFailed(RuntimeException exception) { 398 399 EventService service = Framework.getService(EventService.class); 400 EventContext eventContext = new EventContextImpl(null, session != null ? session.getPrincipal() : null); 401 eventContext.setProperties(buildWorkFailureEventProps(exception)); 402 Event event = new EventImpl(WORK_FAILED_EVENT, eventContext); 403 event.setIsCommitEvent(true); 404 service.fireEvent(event); 405 406 if (exception != null) { 407 String msg = "Work failed after " + getRetryCount() + " " + (getRetryCount() == 1 ? "retry" : "retries") + ", class=" 408 + getClass() + " id=" + getId() + " category=" + getCategory() + " title=" + getTitle(); 409 SequenceTracer.destroy("Work failure " + (completionTime - startTime) + " ms"); 410 // all retries have been done, throw the exception 411 throw new NuxeoException(msg, exception); 412 } 413 } 414 415 private String getTitleOr(String defaultTitle) { 416 try { 417 return getTitle(); 418 } catch (Exception e) { 419 return defaultTitle; 420 } 421 } 422 423 /** 424 * Does work under a transaction. 425 * 426 * @since 5.9.4 427 */ 428 protected void runWorkWithTransaction() { 429 TransactionHelper.startTransaction(); 430 boolean ok = false; 431 Exception exc = null; 432 try { 433 WorkSchedulePath.handleEnter(this); 434 // --- do work 435 setStartTime(); 436 work(); // may throw ConcurrentUpdateException 437 ok = true; 438 // --- end work 439 } catch (Exception e) { 440 exc = e; 441 if (e instanceof RuntimeException) { 442 throw (RuntimeException) e; 443 } else if (e instanceof InterruptedException) { 444 // restore interrupted status for the thread pool worker 445 Thread.currentThread().interrupt(); 446 } 447 throw new RuntimeException(e); 448 } finally { 449 WorkSchedulePath.handleReturn(); 450 try { 451 setCompletionTime(); 452 cleanUp(ok, exc); 453 } finally { 454 if (TransactionHelper.isTransactionActiveOrMarkedRollback()) { 455 if (!ok || isSuspending()) { 456 log.trace(this + " is suspending, rollbacking"); 457 TransactionHelper.setTransactionRollbackOnly(); 458 } 459 TransactionHelper.commitOrRollbackTransaction(); 460 } 461 } 462 } 463 } 464 465 @Override 466 public abstract void work(); 467 468 /** 469 * Gets the number of times that this Work instance can be retried in case of concurrent update exceptions. 470 * 471 * @return 0 for no retry, or more if some retries are possible 472 * @see #work 473 * @since 5.8 474 */ 475 public int getRetryCount() { 476 return 0; 477 } 478 479 /** 480 * This method is called after {@link #work} is done in a finally block, whether work completed normally or was in 481 * error or was interrupted. 482 * 483 * @param ok {@code true} if the work completed normally 484 * @param e the exception, if available 485 */ 486 @Override 487 public void cleanUp(boolean ok, Exception e) { 488 if (!ok) { 489 if (ExceptionUtils.hasInterruptedCause(e)) { 490 log.debug("Interrupted work: " + this); 491 } else { 492 if (!(e instanceof ConcurrentUpdateException)) { 493 if (!isSuspending()) { 494 log.error("Exception during work: " + this, e); 495 if (WorkSchedulePath.isCaptureStackEnabled()) { 496 WorkSchedulePath.log.error("Work schedule path", getSchedulePath().getStack()); 497 } 498 } 499 } 500 } 501 } 502 closeSession(); 503 504 try { 505 // loginContext may be null in tests 506 if (loginContext != null) { 507 loginContext.logout(); 508 } 509 } catch (LoginException le) { 510 throw new NuxeoException(le); 511 } 512 } 513 514 @Override 515 public String getOriginatingUsername() { 516 return originatingUsername; 517 } 518 519 @Override 520 public long getSchedulingTime() { 521 return schedulingTime; 522 } 523 524 @Override 525 public long getStartTime() { 526 return startTime; 527 } 528 529 @Override 530 public long getCompletionTime() { 531 return completionTime; 532 } 533 534 @Override 535 public void setStartTime() { 536 startTime = System.currentTimeMillis(); 537 } 538 539 protected void setCompletionTime() { 540 completionTime = System.currentTimeMillis(); 541 } 542 543 @Override 544 public String getCategory() { 545 return getClass().getSimpleName(); 546 } 547 548 @Override 549 public String toString() { 550 StringBuilder buf = new StringBuilder(); 551 buf.append(getClass().getSimpleName()); 552 buf.append('('); 553 if (docId != null) { 554 buf.append(docId); 555 buf.append(", "); 556 } else if (docIds != null && docIds.size() > 0) { 557 buf.append(docIds.get(0)); 558 buf.append("..., "); 559 } 560 buf.append(getSchedulePath().getParentPath()); 561 buf.append(", "); 562 buf.append(getProgress()); 563 buf.append(", "); 564 buf.append(getStatus()); 565 buf.append(')'); 566 return buf.toString(); 567 } 568 569 @Override 570 public DocumentLocation getDocument() { 571 if (docId != null) { 572 return newDocumentLocation(docId); 573 } 574 return null; 575 } 576 577 @Override 578 public List<DocumentLocation> getDocuments() { 579 if (docIds != null) { 580 List<DocumentLocation> res = new ArrayList<>(docIds.size()); 581 for (String docId : docIds) { 582 res.add(newDocumentLocation(docId)); 583 } 584 return res; 585 } 586 if (docId != null) { 587 return Collections.singletonList(newDocumentLocation(docId)); 588 } 589 return Collections.emptyList(); 590 } 591 592 protected DocumentLocation newDocumentLocation(String docId) { 593 return new DocumentLocationImpl(repositoryName, new IdRef(docId)); 594 } 595 596 @Override 597 public boolean isDocumentTree() { 598 return isTree; 599 } 600 601 /** 602 * Releases the transaction resources by committing the existing transaction (if any). This is recommended before 603 * running a long process. 604 */ 605 public void commitOrRollbackTransaction() { 606 if (TransactionHelper.isTransactionActiveOrMarkedRollback()) { 607 TransactionHelper.commitOrRollbackTransaction(); 608 } 609 } 610 611 /** 612 * Starts a new transaction. 613 * <p> 614 * Usually called after {@code commitOrRollbackTransaction()}, for instance for saving back the results of a long 615 * process. 616 * 617 * @return true if a new transaction was started 618 */ 619 public boolean startTransaction() { 620 return TransactionHelper.startTransaction(); 621 } 622 623 @Override 624 public boolean equals(Object other) { 625 if (!(other instanceof Work)) { 626 return false; 627 } 628 return ((Work) other).getId().equals(getId()); 629 } 630 631 @Override 632 public int hashCode() { 633 return getId().hashCode(); 634 } 635 636 @Override 637 public String getPartitionKey() { 638 if (docId != null) { 639 return docId; 640 } else if (docIds != null && !docIds.isEmpty()) { 641 return docIds.get(0); 642 } 643 return getId(); 644 } 645}