001/* 002 * (C) Copyright 2012-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.work; 020 021import static org.nuxeo.ecm.core.api.event.CoreEventConstants.REPOSITORY_NAME; 022import static org.nuxeo.ecm.core.work.api.Work.Progress.PROGRESS_INDETERMINATE; 023 024import java.io.Serializable; 025import java.security.SecureRandom; 026import java.util.ArrayList; 027import java.util.Collections; 028import java.util.HashMap; 029import java.util.List; 030import java.util.Map; 031import java.util.Random; 032 033import javax.security.auth.login.LoginContext; 034import javax.security.auth.login.LoginException; 035 036import org.apache.commons.logging.Log; 037import org.apache.commons.logging.LogFactory; 038import org.nuxeo.common.logging.SequenceTracer; 039import org.nuxeo.common.utils.ExceptionUtils; 040import org.nuxeo.ecm.core.api.CloseableCoreSession; 041import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 042import org.nuxeo.ecm.core.api.CoreInstance; 043import org.nuxeo.ecm.core.api.CoreSession; 044import org.nuxeo.ecm.core.api.DocumentLocation; 045import org.nuxeo.ecm.core.api.IdRef; 046import org.nuxeo.ecm.core.api.NuxeoException; 047import org.nuxeo.ecm.core.api.impl.DocumentLocationImpl; 048import org.nuxeo.ecm.core.event.Event; 049import org.nuxeo.ecm.core.event.EventContext; 050import org.nuxeo.ecm.core.event.EventService; 051import org.nuxeo.ecm.core.event.impl.EventContextImpl; 052import org.nuxeo.ecm.core.event.impl.EventImpl; 053import org.nuxeo.ecm.core.work.api.Work; 054import org.nuxeo.ecm.core.work.api.WorkSchedulePath; 055import org.nuxeo.runtime.api.Framework; 056import org.nuxeo.runtime.transaction.TransactionHelper; 057 058/** 059 * A base implementation for a {@link Work} instance, dealing with most of the details around state change. 060 * <p> 061 * It also deals with transaction management, and prevents running work instances that are suspending. 062 * <p> 063 * Actual implementations must at a minimum implement the {@link #work()} method. A method {@link #cleanUp} is 064 * available. 065 * <p> 066 * To deal with suspension, {@link #work()} should periodically check for {@link #isSuspending()} and if true save its 067 * state and call {@link #suspended()}. 068 * <p> 069 * Specific information about the work can be returned by {@link #getDocument()} or {@link #getDocuments()}. 070 * 071 * @since 5.6 072 */ 073public abstract class AbstractWork implements Work { 074 075 private static final long serialVersionUID = 1L; 076 077 private static final Log log = LogFactory.getLog(AbstractWork.class); 078 079 protected static final Random RANDOM = new SecureRandom(); 080 081 public static final String WORK_FAILED_EVENT = "workFailed"; 082 083 public static final String WORK_INSTANCE = "workInstance"; 084 085 public static final String FAILURE_MSG = "failureMsg"; 086 087 public static final String FAILURE_EXCEPTION = "failureException"; 088 089 protected String id; 090 091 /** Suspend requested by the work manager. */ 092 protected transient volatile boolean suspending; 093 094 /** Suspend acknowledged by the work instance. */ 095 protected transient volatile boolean suspended; 096 097 protected State state; 098 099 protected Progress progress; 100 101 /** Repository name for the Work instance, if relevant. */ 102 protected String repositoryName; 103 104 /** 105 * Doc id for the Work instance, if relevant. This describes for the WorkManager a document on which this Work 106 * instance will act. 107 * <p> 108 * Either docId or docIds is set. Not both. 109 */ 110 protected String docId; 111 112 /** 113 * Doc ids for the Work instance, if relevant. This describes for the WorkManager the documents on which this Work 114 * instance will act. 115 * <p> 116 * Either docId or docIds is set. Not both. 117 */ 118 protected List<String> docIds; 119 120 /** 121 * If {@code true}, the docId is only the root of a set of documents on which this Work instance will act. 122 */ 123 protected boolean isTree; 124 125 /** 126 * The originating username to use when opening the {@link CoreSession}. 127 * 128 * @since 8.1 129 */ 130 protected String originatingUsername; 131 132 protected String status; 133 134 protected long schedulingTime; 135 136 protected long startTime; 137 138 protected long completionTime; 139 140 protected transient CoreSession session; 141 142 protected transient LoginContext loginContext; 143 144 protected WorkSchedulePath schedulePath; 145 146 protected String callerThread; 147 148 /** 149 * Constructs a {@link Work} instance with a unique id. 150 */ 151 public AbstractWork() { 152 // we user RANDOM to deal with these cases: 153 // - several calls in the time granularity of nanoTime() 154 // - several concurrent calls on different servers 155 this(System.nanoTime() + "." + (RANDOM.nextInt() & 0x7fffffff)); 156 callerThread = SequenceTracer.getThreadName(); 157 } 158 159 public AbstractWork(String id) { 160 this.id = id; 161 progress = PROGRESS_INDETERMINATE; 162 schedulingTime = System.currentTimeMillis(); 163 callerThread = SequenceTracer.getThreadName(); 164 } 165 166 @Override 167 public String getId() { 168 return id; 169 } 170 171 @Override 172 public WorkSchedulePath getSchedulePath() { 173 // schedulePath is transient so will become null after deserialization 174 return schedulePath == null ? WorkSchedulePath.EMPTY : schedulePath; 175 } 176 177 @Override 178 public void setSchedulePath(WorkSchedulePath path) { 179 schedulePath = path; 180 } 181 182 public void setDocument(String repositoryName, String docId, boolean isTree) { 183 this.repositoryName = repositoryName; 184 this.docId = docId; 185 docIds = null; 186 this.isTree = isTree; 187 } 188 189 public void setDocument(String repositoryName, String docId) { 190 setDocument(repositoryName, docId, false); 191 } 192 193 public void setDocuments(String repositoryName, List<String> docIds) { 194 this.repositoryName = repositoryName; 195 docId = null; 196 this.docIds = new ArrayList<>(docIds); 197 } 198 199 /** 200 * @since 8.1 201 */ 202 public void setOriginatingUsername(String originatingUsername) { 203 this.originatingUsername = originatingUsername; 204 } 205 206 @Override 207 public void setWorkInstanceSuspending() { 208 suspending = true; 209 } 210 211 @Override 212 public boolean isSuspending() { 213 return suspending; 214 } 215 216 @Override 217 public void suspended() { 218 suspended = true; 219 } 220 221 @Override 222 public boolean isWorkInstanceSuspended() { 223 return suspended; 224 } 225 226 @Override 227 public void setWorkInstanceState(State state) { 228 this.state = state; 229 if (log.isTraceEnabled()) { 230 log.trace(this + " state=" + state); 231 } 232 } 233 234 @Override 235 public State getWorkInstanceState() { 236 return state; 237 } 238 239 @Override 240 public void setProgress(Progress progress) { 241 this.progress = progress; 242 if (log.isTraceEnabled()) { 243 log.trace(String.valueOf(this)); 244 } 245 } 246 247 @Override 248 public Progress getProgress() { 249 return progress; 250 } 251 252 /** 253 * Sets a human-readable status for this work instance. 254 * 255 * @param status the status 256 */ 257 public void setStatus(String status) { 258 this.status = status; 259 } 260 261 @Override 262 public String getStatus() { 263 return status; 264 } 265 266 /** 267 * May be called by implementing classes to open a session on the repository. 268 * 269 * @return the session (also available in {@code session} field) 270 * @deprecated since 8.1. Use {@link #openSystemSession()}. 271 */ 272 @Deprecated 273 public CoreSession initSession() { 274 return initSession(repositoryName); 275 } 276 277 /** 278 * May be called by implementing classes to open a System session on the repository. 279 * 280 * @since 8.1 281 */ 282 public void openSystemSession() { 283 session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername); 284 } 285 286 /** 287 * May be called by implementing classes to open a Use session on the repository. 288 * <p> 289 * It uses the set {@link #originatingUsername} to open the session. 290 * 291 * @since 8.1 292 */ 293 public void openUserSession() { 294 if (originatingUsername == null) { 295 throw new IllegalStateException("Cannot open an user session without an originatingUsername"); 296 } 297 298 try { 299 loginContext = Framework.loginAsUser(originatingUsername); 300 } catch (LoginException e) { 301 throw new NuxeoException(e); 302 } 303 304 session = CoreInstance.openCoreSession(repositoryName); 305 } 306 307 /** 308 * May be called by implementing classes to open a session on the given repository. 309 * 310 * @param repositoryName the repository name 311 * @return the session (also available in {@code session} field) 312 * @deprecated since 8.1. Use {@link #openSystemSession()} to open a session on the configured repository name, 313 * otherwise use {@link CoreInstance#openCoreSessionSystem(String)}. 314 */ 315 @Deprecated 316 public CoreSession initSession(String repositoryName) { 317 session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername); 318 return session; 319 } 320 321 /** 322 * Closes the session that was opened by {@link #openSystemSession()} or {@link #openUserSession()}. 323 * 324 * @since 5.8 325 */ 326 public void closeSession() { 327 if (session != null) { 328 ((CloseableCoreSession) session).close(); 329 session = null; 330 } 331 } 332 333 @Override 334 public void run() { 335 if (isSuspending()) { 336 // don't run anything if we're being started while a suspend 337 // has been requested 338 suspended(); 339 return; 340 } 341 if (SequenceTracer.isEnabled()) { 342 SequenceTracer.startFrom(callerThread, "Work " + getTitleOr("unknown"), " #7acde9"); 343 } 344 RuntimeException suppressed = null; 345 int retryCount = getRetryCount(); // may be 0 346 for (int i = 0; i <= retryCount; i++) { 347 if (i > 0) { 348 log.debug("Retrying work due to concurrent update (" + i + "): " + this); 349 log.trace("Concurrent update", suppressed); 350 } 351 if (ExceptionUtils.hasInterruptedCause(suppressed)) { 352 // if we're here suppressed != null so we destroy SequenceTracer 353 log.debug("No need to retry the work with id=" + getId() + ", work manager is shutting down"); 354 break; 355 } 356 try { 357 runWorkWithTransaction(); 358 SequenceTracer.stop("Work done " + (completionTime - startTime) + " ms"); 359 return; 360 } catch (RuntimeException e) { 361 if (suppressed == null) { 362 suppressed = e; 363 } else { 364 suppressed.addSuppressed(e); 365 } 366 } 367 } 368 369 workFailed(suppressed); 370 } 371 372 /** 373 * Builds failure event properties. Work implementations can override this method to inject 374 * more event properties than the default. 375 * @since 10.1 376 */ 377 public Map<String, Serializable> buildWorkFailureEventProps(RuntimeException exception) { 378 379 Map<String, Serializable> eventProps = new HashMap<>(); 380 eventProps.put(WORK_INSTANCE, this); // Work objects are serializable so send the whole thing 381 382 if (session != null) { 383 eventProps.put(REPOSITORY_NAME, session.getRepositoryName()); 384 } 385 386 if (exception != null) { 387 eventProps.put(FAILURE_MSG, exception.getMessage()); 388 eventProps.put(FAILURE_EXCEPTION, exception.getClass().getName()); 389 } 390 return eventProps; 391 } 392 393 /** 394 * Called when the worker failed to run successfully even after retrying. 395 * @since 10.1 396 * @param exception the exception that occurred 397 */ 398 public void workFailed(RuntimeException exception) { 399 400 EventService service = Framework.getService(EventService.class); 401 EventContext eventContext = new EventContextImpl(null, session != null ? session.getPrincipal() : null); 402 eventContext.setProperties(buildWorkFailureEventProps(exception)); 403 Event event = new EventImpl(WORK_FAILED_EVENT, eventContext); 404 event.setIsCommitEvent(true); 405 service.fireEvent(event); 406 407 if (exception != null) { 408 String msg = "Work failed after " + getRetryCount() + " " + (getRetryCount() == 1 ? "retry" : "retries") + ", class=" 409 + getClass() + " id=" + getId() + " category=" + getCategory() + " title=" + getTitle(); 410 SequenceTracer.destroy("Work failure " + (completionTime - startTime) + " ms"); 411 // all retries have been done, throw the exception 412 throw new NuxeoException(msg, exception); 413 } 414 } 415 416 private String getTitleOr(String defaultTitle) { 417 try { 418 return getTitle(); 419 } catch (Exception e) { 420 return defaultTitle; 421 } 422 } 423 424 /** 425 * Does work under a transaction. 426 * 427 * @since 5.9.4 428 */ 429 protected void runWorkWithTransaction() { 430 TransactionHelper.startTransaction(); 431 boolean ok = false; 432 Exception exc = null; 433 try { 434 WorkSchedulePath.handleEnter(this); 435 // --- do work 436 setStartTime(); 437 work(); // may throw ConcurrentUpdateException 438 ok = true; 439 // --- end work 440 } catch (Exception e) { 441 exc = e; 442 if (e instanceof RuntimeException) { 443 throw (RuntimeException) e; 444 } else if (e instanceof InterruptedException) { 445 // restore interrupted status for the thread pool worker 446 Thread.currentThread().interrupt(); 447 } 448 throw new RuntimeException(e); 449 } finally { 450 WorkSchedulePath.handleReturn(); 451 try { 452 setCompletionTime(); 453 cleanUp(ok, exc); 454 } finally { 455 if (TransactionHelper.isTransactionActiveOrMarkedRollback()) { 456 if (!ok || isSuspending()) { 457 log.trace(this + " is suspending, rollbacking"); 458 TransactionHelper.setTransactionRollbackOnly(); 459 } 460 TransactionHelper.commitOrRollbackTransaction(); 461 } 462 } 463 } 464 } 465 466 @Override 467 public abstract void work(); 468 469 /** 470 * Gets the number of times that this Work instance can be retried in case of concurrent update exceptions. 471 * 472 * @return 0 for no retry, or more if some retries are possible 473 * @see #work 474 * @since 5.8 475 */ 476 public int getRetryCount() { 477 return 0; 478 } 479 480 /** 481 * This method is called after {@link #work} is done in a finally block, whether work completed normally or was in 482 * error or was interrupted. 483 * 484 * @param ok {@code true} if the work completed normally 485 * @param e the exception, if available 486 */ 487 @Override 488 public void cleanUp(boolean ok, Exception e) { 489 if (!ok) { 490 if (ExceptionUtils.hasInterruptedCause(e)) { 491 log.debug("Interrupted work: " + this); 492 } else { 493 if (!(e instanceof ConcurrentUpdateException)) { 494 if (!isSuspending()) { 495 log.error("Exception during work: " + this, e); 496 if (WorkSchedulePath.isCaptureStackEnabled()) { 497 WorkSchedulePath.log.error("Work schedule path", getSchedulePath().getStack()); 498 } 499 } 500 } 501 } 502 } 503 closeSession(); 504 505 try { 506 // loginContext may be null in tests 507 if (loginContext != null) { 508 loginContext.logout(); 509 } 510 } catch (LoginException le) { 511 throw new NuxeoException(le); 512 } 513 } 514 515 @Override 516 public String getOriginatingUsername() { 517 return originatingUsername; 518 } 519 520 @Override 521 public long getSchedulingTime() { 522 return schedulingTime; 523 } 524 525 @Override 526 public long getStartTime() { 527 return startTime; 528 } 529 530 @Override 531 public long getCompletionTime() { 532 return completionTime; 533 } 534 535 @Override 536 public void setStartTime() { 537 startTime = System.currentTimeMillis(); 538 } 539 540 protected void setCompletionTime() { 541 completionTime = System.currentTimeMillis(); 542 } 543 544 @Override 545 public String getCategory() { 546 return getClass().getSimpleName(); 547 } 548 549 @Override 550 public String toString() { 551 StringBuilder buf = new StringBuilder(); 552 buf.append(getClass().getSimpleName()); 553 buf.append('('); 554 if (docId != null) { 555 buf.append(docId); 556 buf.append(", "); 557 } else if (docIds != null && docIds.size() > 0) { 558 buf.append(docIds.get(0)); 559 buf.append("..., "); 560 } 561 buf.append(getSchedulePath().getParentPath()); 562 buf.append(", "); 563 buf.append(getProgress()); 564 buf.append(", "); 565 buf.append(getStatus()); 566 buf.append(')'); 567 return buf.toString(); 568 } 569 570 @Override 571 public DocumentLocation getDocument() { 572 if (docId != null) { 573 return newDocumentLocation(docId); 574 } 575 return null; 576 } 577 578 @Override 579 public List<DocumentLocation> getDocuments() { 580 if (docIds != null) { 581 List<DocumentLocation> res = new ArrayList<>(docIds.size()); 582 for (String docId : docIds) { 583 res.add(newDocumentLocation(docId)); 584 } 585 return res; 586 } 587 if (docId != null) { 588 return Collections.singletonList(newDocumentLocation(docId)); 589 } 590 return Collections.emptyList(); 591 } 592 593 protected DocumentLocation newDocumentLocation(String docId) { 594 return new DocumentLocationImpl(repositoryName, new IdRef(docId)); 595 } 596 597 @Override 598 public boolean isDocumentTree() { 599 return isTree; 600 } 601 602 /** 603 * Releases the transaction resources by committing the existing transaction (if any). This is recommended before 604 * running a long process. 605 */ 606 public void commitOrRollbackTransaction() { 607 if (TransactionHelper.isTransactionActiveOrMarkedRollback()) { 608 TransactionHelper.commitOrRollbackTransaction(); 609 } 610 } 611 612 /** 613 * Starts a new transaction. 614 * <p> 615 * Usually called after {@code commitOrRollbackTransaction()}, for instance for saving back the results of a long 616 * process. 617 * 618 * @return true if a new transaction was started 619 */ 620 public boolean startTransaction() { 621 return TransactionHelper.startTransaction(); 622 } 623 624 @Override 625 public boolean equals(Object other) { 626 if (!(other instanceof Work)) { 627 return false; 628 } 629 return ((Work) other).getId().equals(getId()); 630 } 631 632 @Override 633 public int hashCode() { 634 return getId().hashCode(); 635 } 636 637 @Override 638 public String getPartitionKey() { 639 if (docId != null) { 640 return docId; 641 } else if (docIds != null && !docIds.isEmpty()) { 642 return docIds.get(0); 643 } 644 return getId(); 645 } 646}