001/* 002 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.work; 020 021import static org.nuxeo.ecm.core.work.api.Work.Progress.PROGRESS_INDETERMINATE; 022 023import java.util.ArrayList; 024import java.util.Collections; 025import java.util.List; 026import java.util.Random; 027 028import javax.security.auth.login.LoginContext; 029import javax.security.auth.login.LoginException; 030 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033import org.nuxeo.common.logging.SequenceTracer; 034import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 035import org.nuxeo.ecm.core.api.CoreInstance; 036import org.nuxeo.ecm.core.api.CoreSession; 037import org.nuxeo.ecm.core.api.DocumentLocation; 038import org.nuxeo.ecm.core.api.IdRef; 039import org.nuxeo.ecm.core.api.NuxeoException; 040import org.nuxeo.ecm.core.api.impl.DocumentLocationImpl; 041import org.nuxeo.ecm.core.work.api.Work; 042import org.nuxeo.ecm.core.work.api.WorkSchedulePath; 043import org.nuxeo.runtime.api.Framework; 044import org.nuxeo.runtime.transaction.TransactionHelper; 045import org.nuxeo.runtime.transaction.TransactionRuntimeException; 046 047/** 048 * A base implementation for a {@link Work} instance, dealing with most of the details around state change. 049 * <p> 050 * It also deals with transaction management, and prevents running work instances that are suspending. 051 * <p> 052 * Actual implementations must at a minimum implement the {@link #work()} method. A method {@link #cleanUp} is 053 * available. 054 * <p> 055 * To deal with suspension, {@link #work()} should periodically check for {@link #isSuspending()} and if true save its 056 * state and call {@link #suspended()}. 057 * <p> 058 * Specific information about the work can be returned by {@link #getDocument()} or {@link #getDocuments()}. 059 * 060 * @since 5.6 061 */ 062public abstract class AbstractWork implements Work { 063 064 private static final long serialVersionUID = 1L; 065 066 private static final Log log = LogFactory.getLog(AbstractWork.class); 067 068 protected static final Random RANDOM = new Random(); 069 070 protected String id; 071 072 /** Suspend requested by the work manager. */ 073 protected transient volatile boolean suspending; 074 075 /** Suspend acknowledged by the work instance. */ 076 protected transient volatile boolean suspended; 077 078 protected State state; 079 080 protected Progress progress; 081 082 /** Repository name for the Work instance, if relevant. */ 083 protected String repositoryName; 084 085 /** 086 * Doc id for the Work instance, if relevant. This describes for the WorkManager a document on which this Work 087 * instance will act. 088 * <p> 089 * Either docId or docIds is set. Not both. 090 */ 091 protected String docId; 092 093 /** 094 * Doc ids for the Work instance, if relevant. This describes for the WorkManager the documents on which this Work 095 * instance will act. 096 * <p> 097 * Either docId or docIds is set. Not both. 098 */ 099 protected List<String> docIds; 100 101 /** 102 * If {@code true}, the docId is only the root of a set of documents on which this Work instance will act. 103 */ 104 protected boolean isTree; 105 106 /** 107 * The originating username to use when opening the {@link CoreSession}. 108 * 109 * @since 8.1 110 */ 111 protected String originatingUsername; 112 113 protected String status; 114 115 protected long schedulingTime; 116 117 protected long startTime; 118 119 protected long completionTime; 120 121 protected transient CoreSession session; 122 123 protected transient LoginContext loginContext; 124 125 protected WorkSchedulePath schedulePath; 126 127 protected String callerThread; 128 129 /** 130 * Constructs a {@link Work} instance with a unique id. 131 */ 132 public AbstractWork() { 133 // we user RANDOM to deal with these cases: 134 // - several calls in the time granularity of nanoTime() 135 // - several concurrent calls on different servers 136 this(System.nanoTime() + "." + Math.abs(RANDOM.nextInt())); 137 callerThread = SequenceTracer.getThreadName(); 138 } 139 140 public AbstractWork(String id) { 141 this.id = id; 142 progress = PROGRESS_INDETERMINATE; 143 schedulingTime = System.currentTimeMillis(); 144 callerThread = SequenceTracer.getThreadName(); 145 } 146 147 @Override 148 public String getId() { 149 return id; 150 } 151 152 @Override 153 public WorkSchedulePath getSchedulePath() { 154 // schedulePath is transient so will become null after deserialization 155 return schedulePath == null ? WorkSchedulePath.EMPTY : schedulePath; 156 } 157 158 @Override 159 public void setSchedulePath(WorkSchedulePath path) { 160 schedulePath = path; 161 } 162 163 public void setDocument(String repositoryName, String docId, boolean isTree) { 164 this.repositoryName = repositoryName; 165 this.docId = docId; 166 docIds = null; 167 this.isTree = isTree; 168 } 169 170 public void setDocument(String repositoryName, String docId) { 171 setDocument(repositoryName, docId, false); 172 } 173 174 public void setDocuments(String repositoryName, List<String> docIds) { 175 this.repositoryName = repositoryName; 176 docId = null; 177 this.docIds = new ArrayList<String>(docIds); 178 } 179 180 /** 181 * @since 8.1 182 */ 183 public void setOriginatingUsername(String originatingUsername) { 184 this.originatingUsername = originatingUsername; 185 } 186 187 @Override 188 public void setWorkInstanceSuspending() { 189 suspending = true; 190 } 191 192 @Override 193 public boolean isSuspending() { 194 return suspending; 195 } 196 197 @Override 198 public void suspended() { 199 suspended = true; 200 } 201 202 @Override 203 public boolean isWorkInstanceSuspended() { 204 return suspended; 205 } 206 207 @Override 208 public void setWorkInstanceState(State state) { 209 this.state = state; 210 if (log.isTraceEnabled()) { 211 log.trace(this + " state=" + state); 212 } 213 } 214 215 @Override 216 public State getWorkInstanceState() { 217 return state; 218 } 219 220 @Override 221 @Deprecated 222 public State getState() { 223 return state; 224 } 225 226 @Override 227 public void setProgress(Progress progress) { 228 this.progress = progress; 229 if (log.isTraceEnabled()) { 230 log.trace(String.valueOf(this)); 231 } 232 } 233 234 @Override 235 public Progress getProgress() { 236 return progress; 237 } 238 239 /** 240 * Sets a human-readable status for this work instance. 241 * 242 * @param status the status 243 */ 244 public void setStatus(String status) { 245 this.status = status; 246 } 247 248 @Override 249 public String getStatus() { 250 return status; 251 } 252 253 /** 254 * May be called by implementing classes to open a session on the repository. 255 * 256 * @return the session (also available in {@code session} field) 257 * @deprecated since 8.1. Use {@link #openSystemSession()}. 258 */ 259 @Deprecated 260 public CoreSession initSession() { 261 return initSession(repositoryName); 262 } 263 264 /** 265 * May be called by implementing classes to open a System session on the repository. 266 * 267 * @since 8.1 268 */ 269 public void openSystemSession() { 270 session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername); 271 } 272 273 /** 274 * May be called by implementing classes to open a Use session on the repository. 275 * <p> 276 * It uses the set {@link #originatingUsername} to open the session. 277 * 278 * @since 8.1 279 */ 280 public void openUserSession() { 281 if (originatingUsername == null) { 282 throw new IllegalStateException("Cannot open an user session without an originatingUsername"); 283 } 284 285 try { 286 loginContext = Framework.loginAsUser(originatingUsername); 287 } catch (LoginException e) { 288 throw new NuxeoException(e); 289 } 290 291 session = CoreInstance.openCoreSession(repositoryName); 292 } 293 294 /** 295 * May be called by implementing classes to open a session on the given repository. 296 * 297 * @param repositoryName the repository name 298 * @return the session (also available in {@code session} field) 299 * @deprecated since 8.1. Use {@link #openSystemSession()} to open a session on the configured repository name, 300 * otherwise use {@link CoreInstance#openCoreSessionSystem(String)}. 301 */ 302 @Deprecated 303 public CoreSession initSession(String repositoryName) { 304 session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername); 305 return session; 306 } 307 308 /** 309 * Closes the session that was opened by {@link #openSystemSession()} or {@link #openUserSession()}. 310 * 311 * @since 5.8 312 */ 313 public void closeSession() { 314 if (session != null) { 315 session.close(); 316 session = null; 317 } 318 } 319 320 @Override 321 public void run() { 322 if (isSuspending()) { 323 // don't run anything if we're being started while a suspend 324 // has been requested 325 suspended(); 326 return; 327 } 328 if (SequenceTracer.isEnabled()) { 329 SequenceTracer.startFrom(callerThread, "Work " + getTitleOr("unknown"), " #7acde9"); 330 } 331 Exception suppressed = null; 332 int retryCount = getRetryCount(); // may be 0 333 for (int i = 0; i <= retryCount; i++) { 334 if (i > 0) { 335 log.debug("Retrying work due to concurrent update (" + i + "): " + this); 336 log.trace("Concurrent update", suppressed); 337 } 338 Exception e = runWorkWithTransactionAndCheckExceptions(); 339 if (e == null) { 340 // no exception, work is done 341 SequenceTracer.stop("Work done " + (completionTime - startTime) + " ms"); 342 return; 343 } 344 if (suppressed == null) { 345 suppressed = e; 346 } else { 347 suppressed.addSuppressed(e); 348 } 349 } 350 // all retries have been done, throw the exception 351 if (suppressed != null) { 352 String msg = "Work failed after " + retryCount + " " + (retryCount == 1 ? "retry" : "retries") + ", class=" 353 + getClass() + " id=" + getId() + " category=" + getCategory() + " title=" + getTitle(); 354 SequenceTracer.destroy("Work failure " + (completionTime - startTime) + " ms"); 355 throw new RuntimeException(msg, suppressed); 356 } 357 } 358 359 private String getTitleOr(String defaultTitle) { 360 try { 361 return getTitle(); 362 } catch (Exception e) { 363 return defaultTitle; 364 } 365 } 366 367 /** 368 * Does work under a transaction, and collects exception and suppressed exceptions that may lead to a retry. 369 * 370 * @since 5.9.4 371 */ 372 protected Exception runWorkWithTransactionAndCheckExceptions() { 373 List<Exception> suppressed = Collections.emptyList(); 374 try { 375 TransactionHelper.noteSuppressedExceptions(); 376 try { 377 runWorkWithTransaction(); 378 } finally { 379 suppressed = TransactionHelper.getSuppressedExceptions(); 380 } 381 } catch (ConcurrentUpdateException e) { 382 // happens typically during save() 383 return e; 384 } catch (TransactionRuntimeException e) { 385 // error at commit time 386 if (suppressed.isEmpty()) { 387 return e; 388 } 389 } 390 // reached if no catch, or if TransactionRuntimeException caught 391 if (suppressed.isEmpty()) { 392 return null; 393 } 394 // exceptions during commit caused a rollback in SessionImpl#end 395 Exception e = suppressed.get(0); 396 for (int i = 1; i < suppressed.size(); i++) { 397 e.addSuppressed(suppressed.get(i)); 398 } 399 return e; 400 } 401 402 /** 403 * Does work under a transaction. 404 * 405 * @since 5.9.4 406 * @throws ConcurrentUpdateException, TransactionRuntimeException 407 */ 408 protected void runWorkWithTransaction() throws ConcurrentUpdateException { 409 TransactionHelper.startTransaction(); 410 boolean ok = false; 411 Exception exc = null; 412 try { 413 WorkSchedulePath.handleEnter(this); 414 // --- do work 415 setStartTime(); 416 work(); // may throw ConcurrentUpdateException 417 ok = true; 418 // --- end work 419 } catch (Exception e) { 420 exc = e; 421 if (e instanceof ConcurrentUpdateException) { 422 throw (ConcurrentUpdateException) e; 423 } else if (e instanceof RuntimeException) { 424 throw (RuntimeException) e; 425 } else if (e instanceof InterruptedException) { 426 // restore interrupted status for the thread pool worker 427 Thread.currentThread().interrupt(); 428 } 429 throw new RuntimeException(e); 430 } finally { 431 WorkSchedulePath.handleReturn(); 432 try { 433 setCompletionTime(); 434 cleanUp(ok, exc); 435 } finally { 436 if (TransactionHelper.isTransactionActiveOrMarkedRollback()) { 437 if (!ok || isSuspending()) { 438 log.trace(this + " is suspending, rollbacking"); 439 TransactionHelper.setTransactionRollbackOnly(); 440 } 441 TransactionHelper.commitOrRollbackTransaction(); 442 } 443 } 444 } 445 } 446 447 @Override 448 public abstract void work(); 449 450 /** 451 * Gets the number of times that this Work instance can be retried in case of concurrent update exceptions. 452 * 453 * @return 0 for no retry, or more if some retries are possible 454 * @see #work 455 * @since 5.8 456 */ 457 public int getRetryCount() { 458 return 0; 459 } 460 461 /** 462 * This method is called after {@link #work} is done in a finally block, whether work completed normally or was in 463 * error or was interrupted. 464 * 465 * @param ok {@code true} if the work completed normally 466 * @param e the exception, if available 467 */ 468 @Override 469 public void cleanUp(boolean ok, Exception e) { 470 if (!ok) { 471 if (e instanceof InterruptedException) { 472 log.debug("Suspended work: " + this); 473 } else { 474 if (!(e instanceof ConcurrentUpdateException)) { 475 if (!isSuspending() || !(e instanceof InterruptedException)) { 476 log.error("Exception during work: " + this, e); 477 if (WorkSchedulePath.captureStack) { 478 WorkSchedulePath.log.error("Work schedule path", getSchedulePath().getStack()); 479 } 480 } 481 } 482 } 483 } 484 closeSession(); 485 486 try { 487 // loginContext may be null in tests 488 if (loginContext != null) { 489 loginContext.logout(); 490 } 491 } catch (LoginException le) { 492 throw new NuxeoException(le); 493 } 494 } 495 496 @Override 497 public String getOriginatingUsername() { 498 return originatingUsername; 499 } 500 501 @Override 502 public long getSchedulingTime() { 503 return schedulingTime; 504 } 505 506 @Override 507 public long getStartTime() { 508 return startTime; 509 } 510 511 @Override 512 public long getCompletionTime() { 513 return completionTime; 514 } 515 516 @Override 517 public void setStartTime() { 518 startTime = System.currentTimeMillis(); 519 } 520 521 protected void setCompletionTime() { 522 completionTime = System.currentTimeMillis(); 523 } 524 525 @Override 526 public String getCategory() { 527 return getClass().getSimpleName(); 528 } 529 530 @Override 531 public String toString() { 532 StringBuilder buf = new StringBuilder(); 533 buf.append(getClass().getSimpleName()); 534 buf.append('('); 535 if (docId != null) { 536 buf.append(docId); 537 buf.append(", "); 538 } else if (docIds != null && docIds.size() > 0) { 539 buf.append(docIds.get(0)); 540 buf.append("..., "); 541 } 542 buf.append(getSchedulePath().getParentPath()); 543 buf.append(", "); 544 buf.append(getProgress()); 545 buf.append(", "); 546 buf.append(getStatus()); 547 buf.append(')'); 548 return buf.toString(); 549 } 550 551 @Override 552 public DocumentLocation getDocument() { 553 if (docId != null) { 554 return newDocumentLocation(docId); 555 } 556 return null; 557 } 558 559 @Override 560 public List<DocumentLocation> getDocuments() { 561 if (docIds != null) { 562 List<DocumentLocation> res = new ArrayList<DocumentLocation>(docIds.size()); 563 for (String docId : docIds) { 564 res.add(newDocumentLocation(docId)); 565 } 566 return res; 567 } 568 if (docId != null) { 569 return Collections.singletonList(newDocumentLocation(docId)); 570 } 571 return Collections.emptyList(); 572 } 573 574 protected DocumentLocation newDocumentLocation(String docId) { 575 return new DocumentLocationImpl(repositoryName, new IdRef(docId)); 576 } 577 578 @Override 579 public boolean isDocumentTree() { 580 return isTree; 581 } 582 583 /** 584 * Releases the transaction resources by committing the existing transaction (if any). This is recommended before 585 * running a long process. 586 */ 587 public void commitOrRollbackTransaction() { 588 if (TransactionHelper.isTransactionActiveOrMarkedRollback()) { 589 TransactionHelper.commitOrRollbackTransaction(); 590 } 591 } 592 593 /** 594 * Starts a new transaction. 595 * <p> 596 * Usually called after {@code commitOrRollbackTransaction()}, for instance for saving back the results of a long 597 * process. 598 * 599 * @return true if a new transaction was started 600 */ 601 public boolean startTransaction() { 602 return TransactionHelper.startTransaction(); 603 } 604 605 @Override 606 public boolean equals(Object other) { 607 if (!(other instanceof Work)) { 608 return false; 609 } 610 return ((Work) other).getId().equals(getId()); 611 } 612 613 @Override 614 public int hashCode() { 615 return getId().hashCode(); 616 } 617 618}