001/* 002 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.work; 020 021import static org.nuxeo.ecm.core.work.api.Work.Progress.PROGRESS_INDETERMINATE; 022 023import java.util.ArrayList; 024import java.util.Collections; 025import java.util.List; 026import java.util.Random; 027 028import javax.security.auth.login.LoginContext; 029import javax.security.auth.login.LoginException; 030 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033import org.nuxeo.common.logging.SequenceTracer; 034import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 035import org.nuxeo.ecm.core.api.CoreInstance; 036import org.nuxeo.ecm.core.api.CoreSession; 037import org.nuxeo.ecm.core.api.DocumentLocation; 038import org.nuxeo.ecm.core.api.IdRef; 039import org.nuxeo.ecm.core.api.NuxeoException; 040import org.nuxeo.ecm.core.api.impl.DocumentLocationImpl; 041import org.nuxeo.ecm.core.work.api.Work; 042import org.nuxeo.ecm.core.work.api.WorkSchedulePath; 043import org.nuxeo.runtime.api.Framework; 044import org.nuxeo.runtime.transaction.TransactionHelper; 045import org.nuxeo.runtime.transaction.TransactionRuntimeException; 046 047/** 048 * A base implementation for a {@link Work} instance, dealing with most of the details around state change. 049 * <p> 050 * It also deals with transaction management, and prevents running work instances that are suspending. 051 * <p> 052 * Actual implementations must at a minimum implement the {@link #work()} method. A method {@link #cleanUp} is 053 * available. 054 * <p> 055 * To deal with suspension, {@link #work()} should periodically check for {@link #isSuspending()} and if true save its 056 * state and call {@link #suspended()}. 057 * <p> 058 * Specific information about the work can be returned by {@link #getDocument()} or {@link #getDocuments()}. 059 * 060 * @since 5.6 061 */ 062public abstract class AbstractWork implements Work { 063 064 private static final long serialVersionUID = 1L; 065 066 private static final Log log = LogFactory.getLog(AbstractWork.class); 067 068 protected static final Random RANDOM = new Random(); 069 070 protected String id; 071 072 /** Suspend requested by the work manager. */ 073 protected transient volatile boolean suspending; 074 075 /** Suspend acknowledged by the work instance. */ 076 protected transient volatile boolean suspended; 077 078 protected State state; 079 080 protected Progress progress; 081 082 /** Repository name for the Work instance, if relevant. */ 083 protected String repositoryName; 084 085 /** 086 * Doc id for the Work instance, if relevant. This describes for the WorkManager a document on which this Work 087 * instance will act. 088 * <p> 089 * Either docId or docIds is set. Not both. 090 */ 091 protected String docId; 092 093 /** 094 * Doc ids for the Work instance, if relevant. This describes for the WorkManager the documents on which this Work 095 * instance will act. 096 * <p> 097 * Either docId or docIds is set. Not both. 098 */ 099 protected List<String> docIds; 100 101 /** 102 * If {@code true}, the docId is only the root of a set of documents on which this Work instance will act. 103 */ 104 protected boolean isTree; 105 106 /** 107 * The originating username to use when opening the {@link CoreSession}. 108 * 109 * @since 8.1 110 */ 111 protected String originatingUsername; 112 113 protected String status; 114 115 protected long schedulingTime; 116 117 protected long startTime; 118 119 protected long completionTime; 120 121 protected transient CoreSession session; 122 123 protected transient LoginContext loginContext; 124 125 protected WorkSchedulePath schedulePath; 126 127 protected String callerThread; 128 129 /** 130 * Constructs a {@link Work} instance with a unique id. 131 */ 132 public AbstractWork() { 133 // we user RANDOM to deal with these cases: 134 // - several calls in the time granularity of nanoTime() 135 // - several concurrent calls on different servers 136 this(System.nanoTime() + "." + Math.abs(RANDOM.nextInt())); 137 callerThread = SequenceTracer.getThreadName(); 138 } 139 140 public AbstractWork(String id) { 141 this.id = id; 142 progress = PROGRESS_INDETERMINATE; 143 schedulingTime = System.currentTimeMillis(); 144 callerThread = SequenceTracer.getThreadName(); 145 } 146 147 @Override 148 public String getId() { 149 return id; 150 } 151 152 @Override 153 public WorkSchedulePath getSchedulePath() { 154 // schedulePath is transient so will become null after deserialization 155 return schedulePath == null ? WorkSchedulePath.EMPTY : schedulePath; 156 } 157 158 @Override 159 public void setSchedulePath(WorkSchedulePath path) { 160 schedulePath = path; 161 } 162 163 public void setDocument(String repositoryName, String docId, boolean isTree) { 164 this.repositoryName = repositoryName; 165 this.docId = docId; 166 docIds = null; 167 this.isTree = isTree; 168 } 169 170 public void setDocument(String repositoryName, String docId) { 171 setDocument(repositoryName, docId, false); 172 } 173 174 public void setDocuments(String repositoryName, List<String> docIds) { 175 this.repositoryName = repositoryName; 176 docId = null; 177 this.docIds = new ArrayList<String>(docIds); 178 } 179 180 /** 181 * @since 8.1 182 */ 183 public void setOriginatingUsername(String originatingUsername) { 184 this.originatingUsername = originatingUsername; 185 } 186 187 @Override 188 public void setWorkInstanceSuspending() { 189 suspending = true; 190 } 191 192 @Override 193 public boolean isSuspending() { 194 return suspending; 195 } 196 197 @Override 198 public void suspended() { 199 suspended = true; 200 } 201 202 @Override 203 public boolean isWorkInstanceSuspended() { 204 return suspended; 205 } 206 207 @Override 208 public void setWorkInstanceState(State state) { 209 this.state = state; 210 if (log.isTraceEnabled()) { 211 log.trace(this + " state=" + state); 212 } 213 } 214 215 @Override 216 public State getWorkInstanceState() { 217 return state; 218 } 219 220 @Override 221 @Deprecated 222 public State getState() { 223 return state; 224 } 225 226 @Override 227 public void setProgress(Progress progress) { 228 this.progress = progress; 229 if (log.isTraceEnabled()) { 230 log.trace(String.valueOf(this)); 231 } 232 } 233 234 @Override 235 public Progress getProgress() { 236 return progress; 237 } 238 239 /** 240 * Sets a human-readable status for this work instance. 241 * 242 * @param status the status 243 */ 244 public void setStatus(String status) { 245 this.status = status; 246 } 247 248 @Override 249 public String getStatus() { 250 return status; 251 } 252 253 /** 254 * May be called by implementing classes to open a session on the repository. 255 * 256 * @return the session (also available in {@code session} field) 257 * @deprecated since 8.1. Use {@link #openSystemSession()}. 258 */ 259 @Deprecated 260 public CoreSession initSession() { 261 return initSession(repositoryName); 262 } 263 264 /** 265 * May be called by implementing classes to open a System session on the repository. 266 * 267 * @since 8.1 268 */ 269 public void openSystemSession() { 270 session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername); 271 } 272 273 /** 274 * May be called by implementing classes to open a Use session on the repository. 275 * <p> 276 * It uses the set {@link #originatingUsername} to open the session. 277 * 278 * @since 8.1 279 */ 280 public void openUserSession() { 281 if (originatingUsername == null) { 282 throw new IllegalStateException("Cannot open an user session without an originatingUsername"); 283 } 284 285 try { 286 loginContext = Framework.loginAsUser(originatingUsername); 287 } catch (LoginException e) { 288 throw new NuxeoException(e); 289 } 290 291 session = CoreInstance.openCoreSession(repositoryName); 292 } 293 294 /** 295 * May be called by implementing classes to open a session on the given repository. 296 * 297 * @param repositoryName the repository name 298 * @return the session (also available in {@code session} field) 299 * @deprecated since 8.1. Use {@link #openSystemSession()} to open a session on the configured repository name, 300 * otherwise use {@link CoreInstance#openCoreSessionSystem(String)}. 301 */ 302 @Deprecated 303 public CoreSession initSession(String repositoryName) { 304 session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername); 305 return session; 306 } 307 308 /** 309 * Closes the session that was opened by {@link #openSystemSession()} or {@link #openUserSession()}. 310 * 311 * @since 5.8 312 */ 313 public void closeSession() { 314 if (session != null) { 315 session.close(); 316 session = null; 317 } 318 } 319 320 @Override 321 public void run() { 322 if (isSuspending()) { 323 // don't run anything if we're being started while a suspend 324 // has been requested 325 suspended(); 326 return; 327 } 328 if (SequenceTracer.isEnabled()) { 329 SequenceTracer.startFrom(callerThread, "Work " + getTitleOr("unknown"), " #7acde9"); 330 } 331 Exception suppressed = null; 332 int retryCount = getRetryCount(); // may be 0 333 for (int i = 0; i <= retryCount; i++) { 334 if (i > 0) { 335 log.debug("Retrying work due to concurrent update (" + i + "): " + this); 336 log.trace("Concurrent update", suppressed); 337 } 338 Exception e = runWorkWithTransactionAndCheckExceptions(); 339 if (e == null) { 340 // no exception, work is done 341 SequenceTracer.stop("Work done " + (completionTime - startTime) + " ms"); 342 return; 343 } 344 if (suppressed == null) { 345 suppressed = e; 346 } else { 347 suppressed.addSuppressed(e); 348 } 349 } 350 // all retries have been done, throw the exception 351 if (suppressed != null) { 352 String msg = "Work failed after " + retryCount + " " + (retryCount == 1 ? "retry" : "retries") + ", class=" 353 + getClass() + " id=" + getId() + " category=" + getCategory() + " title=" + getTitle(); 354 SequenceTracer.destroy("Work failure " + (completionTime - startTime) + " ms"); 355 throw new RuntimeException(msg, suppressed); 356 } 357 } 358 359 private String getTitleOr(String defaultTitle) { 360 try { 361 return getTitle(); 362 } catch (Exception e) { 363 return defaultTitle; 364 } 365 } 366 367 /** 368 * Does work under a transaction, and collects exception and suppressed exceptions that may lead to a retry. 369 * 370 * @since 5.9.4 371 */ 372 protected Exception runWorkWithTransactionAndCheckExceptions() { 373 List<Exception> suppressed = Collections.emptyList(); 374 try { 375 TransactionHelper.noteSuppressedExceptions(); 376 try { 377 runWorkWithTransaction(); 378 } finally { 379 suppressed = TransactionHelper.getSuppressedExceptions(); 380 } 381 } catch (ConcurrentUpdateException e) { 382 // happens typically during save() 383 return e; 384 } catch (TransactionRuntimeException e) { 385 // error at commit time 386 if (suppressed.isEmpty()) { 387 return e; 388 } 389 } 390 // reached if no catch, or if TransactionRuntimeException caught 391 if (suppressed.isEmpty()) { 392 return null; 393 } 394 // exceptions during commit caused a rollback in SessionImpl#end 395 Exception e = suppressed.get(0); 396 for (int i = 1; i < suppressed.size(); i++) { 397 e.addSuppressed(suppressed.get(i)); 398 } 399 return e; 400 } 401 402 /** 403 * Does work under a transaction. 404 * 405 * @since 5.9.4 406 * @throws ConcurrentUpdateException, TransactionRuntimeException 407 */ 408 protected void runWorkWithTransaction() throws ConcurrentUpdateException { 409 TransactionHelper.startTransaction(); 410 boolean ok = false; 411 Exception exc = null; 412 try { 413 WorkSchedulePath.handleEnter(this); 414 // --- do work 415 setStartTime(); 416 work(); // may throw ConcurrentUpdateException 417 ok = true; 418 // --- end work 419 } catch (Exception e) { 420 exc = e; 421 if (e instanceof ConcurrentUpdateException) { 422 throw (ConcurrentUpdateException) e; 423 } else if (e instanceof RuntimeException) { 424 throw (RuntimeException) e; 425 } else if (e instanceof InterruptedException) { 426 // restore interrupted status for the thread pool worker 427 Thread.currentThread().interrupt(); 428 } 429 throw new RuntimeException(e); 430 } finally { 431 WorkSchedulePath.handleReturn(); 432 try { 433 setCompletionTime(); 434 cleanUp(ok, exc); 435 } finally { 436 if (TransactionHelper.isTransactionActiveOrMarkedRollback()) { 437 if (!ok || isSuspending()) { 438 TransactionHelper.setTransactionRollbackOnly(); 439 } 440 TransactionHelper.commitOrRollbackTransaction(); 441 } 442 } 443 } 444 } 445 446 @Override 447 public abstract void work(); 448 449 /** 450 * Gets the number of times that this Work instance can be retried in case of concurrent update exceptions. 451 * 452 * @return 0 for no retry, or more if some retries are possible 453 * @see #work 454 * @since 5.8 455 */ 456 public int getRetryCount() { 457 return 0; 458 } 459 460 /** 461 * This method is called after {@link #work} is done in a finally block, whether work completed normally or was in 462 * error or was interrupted. 463 * 464 * @param ok {@code true} if the work completed normally 465 * @param e the exception, if available 466 */ 467 @Override 468 public void cleanUp(boolean ok, Exception e) { 469 if (!ok) { 470 if (e instanceof InterruptedException) { 471 log.debug("Suspended work: " + this); 472 } else { 473 if (!(e instanceof ConcurrentUpdateException)) { 474 if (!isSuspending() || !(e instanceof InterruptedException)) { 475 log.error("Exception during work: " + this, e); 476 if (WorkSchedulePath.captureStack) { 477 WorkSchedulePath.log.error("Work schedule path", getSchedulePath().getStack()); 478 } 479 } 480 } 481 } 482 } 483 closeSession(); 484 485 try { 486 // loginContext may be null in tests 487 if (loginContext != null) { 488 loginContext.logout(); 489 } 490 } catch (LoginException le) { 491 throw new NuxeoException(le); 492 } 493 } 494 495 @Override 496 public String getOriginatingUsername() { 497 return originatingUsername; 498 } 499 500 @Override 501 public long getSchedulingTime() { 502 return schedulingTime; 503 } 504 505 @Override 506 public long getStartTime() { 507 return startTime; 508 } 509 510 @Override 511 public long getCompletionTime() { 512 return completionTime; 513 } 514 515 @Override 516 public void setStartTime() { 517 startTime = System.currentTimeMillis(); 518 } 519 520 protected void setCompletionTime() { 521 completionTime = System.currentTimeMillis(); 522 } 523 524 @Override 525 public String getCategory() { 526 return getClass().getSimpleName(); 527 } 528 529 @Override 530 public String toString() { 531 StringBuilder buf = new StringBuilder(); 532 buf.append(getClass().getSimpleName()); 533 buf.append('('); 534 if (docId != null) { 535 buf.append(docId); 536 buf.append(", "); 537 } else if (docIds != null && docIds.size() > 0) { 538 buf.append(docIds.get(0)); 539 buf.append("..., "); 540 } 541 buf.append(getSchedulePath().getParentPath()); 542 buf.append(", "); 543 buf.append(getProgress()); 544 buf.append(", "); 545 buf.append(getStatus()); 546 buf.append(')'); 547 return buf.toString(); 548 } 549 550 @Override 551 public DocumentLocation getDocument() { 552 if (docId != null) { 553 return newDocumentLocation(docId); 554 } 555 return null; 556 } 557 558 @Override 559 public List<DocumentLocation> getDocuments() { 560 if (docIds != null) { 561 List<DocumentLocation> res = new ArrayList<DocumentLocation>(docIds.size()); 562 for (String docId : docIds) { 563 res.add(newDocumentLocation(docId)); 564 } 565 return res; 566 } 567 if (docId != null) { 568 return Collections.singletonList(newDocumentLocation(docId)); 569 } 570 return Collections.emptyList(); 571 } 572 573 protected DocumentLocation newDocumentLocation(String docId) { 574 return new DocumentLocationImpl(repositoryName, new IdRef(docId)); 575 } 576 577 @Override 578 public boolean isDocumentTree() { 579 return isTree; 580 } 581 582 @Override 583 public String getWorkInstanceResult() { 584 return null; 585 } 586 587 /** 588 * Releases the transaction resources by committing the existing transaction (if any). This is recommended before 589 * running a long process. 590 */ 591 public void commitOrRollbackTransaction() { 592 if (TransactionHelper.isTransactionActiveOrMarkedRollback()) { 593 TransactionHelper.commitOrRollbackTransaction(); 594 } 595 } 596 597 /** 598 * Starts a new transaction. 599 * <p> 600 * Usually called after {@code commitOrRollbackTransaction()}, for instance for saving back the results of a long 601 * process. 602 * 603 * @return true if a new transaction was started 604 */ 605 public boolean startTransaction() { 606 return TransactionHelper.startTransaction(); 607 } 608 609 @Override 610 public boolean equals(Object other) { 611 if (!(other instanceof Work)) { 612 return false; 613 } 614 return ((Work) other).getId().equals(getId()); 615 } 616 617 @Override 618 public int hashCode() { 619 return getId().hashCode(); 620 } 621 622}