001/* 002 * (C) Copyright 2012-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.work; 020 021import static org.nuxeo.ecm.core.work.api.Work.Progress.PROGRESS_INDETERMINATE; 022 023import java.util.ArrayList; 024import java.util.Collections; 025import java.util.List; 026import java.util.Random; 027 028import javax.security.auth.login.LoginContext; 029import javax.security.auth.login.LoginException; 030 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033import org.nuxeo.common.logging.SequenceTracer; 034import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 035import org.nuxeo.ecm.core.api.CoreInstance; 036import org.nuxeo.ecm.core.api.CoreSession; 037import org.nuxeo.ecm.core.api.DocumentLocation; 038import org.nuxeo.ecm.core.api.IdRef; 039import org.nuxeo.ecm.core.api.NuxeoException; 040import org.nuxeo.ecm.core.api.impl.DocumentLocationImpl; 041import org.nuxeo.ecm.core.work.api.Work; 042import org.nuxeo.ecm.core.work.api.WorkSchedulePath; 043import org.nuxeo.runtime.api.Framework; 044import org.nuxeo.runtime.transaction.TransactionHelper; 045import org.nuxeo.runtime.transaction.TransactionRuntimeException; 046 047/** 048 * A base implementation for a {@link Work} instance, dealing with most of the details around state change. 049 * <p> 050 * It also deals with transaction management, and prevents running work instances that are suspending. 051 * <p> 052 * Actual implementations must at a minimum implement the {@link #work()} method. A method {@link #cleanUp} is 053 * available. 054 * <p> 055 * To deal with suspension, {@link #work()} should periodically check for {@link #isSuspending()} and if true save its 056 * state and call {@link #suspended()}. 057 * <p> 058 * Specific information about the work can be returned by {@link #getDocument()} or {@link #getDocuments()}. 059 * 060 * @since 5.6 061 */ 062public abstract class AbstractWork implements Work { 063 064 private static final long serialVersionUID = 1L; 065 066 private static final Log log = LogFactory.getLog(AbstractWork.class); 067 068 protected static final Random RANDOM = new Random(); 069 070 protected String id; 071 072 /** Suspend requested by the work manager. */ 073 protected transient volatile boolean suspending; 074 075 /** Suspend acknowledged by the work instance. */ 076 protected transient volatile boolean suspended; 077 078 protected State state; 079 080 protected Progress progress; 081 082 /** Repository name for the Work instance, if relevant. */ 083 protected String repositoryName; 084 085 /** 086 * Doc id for the Work instance, if relevant. This describes for the WorkManager a document on which this Work 087 * instance will act. 088 * <p> 089 * Either docId or docIds is set. Not both. 090 */ 091 protected String docId; 092 093 /** 094 * Doc ids for the Work instance, if relevant. This describes for the WorkManager the documents on which this Work 095 * instance will act. 096 * <p> 097 * Either docId or docIds is set. Not both. 098 */ 099 protected List<String> docIds; 100 101 /** 102 * If {@code true}, the docId is only the root of a set of documents on which this Work instance will act. 103 */ 104 protected boolean isTree; 105 106 /** 107 * The originating username to use when opening the {@link CoreSession}. 108 * 109 * @since 8.1 110 */ 111 protected String originatingUsername; 112 113 protected String status; 114 115 protected long schedulingTime; 116 117 protected long startTime; 118 119 protected long completionTime; 120 121 protected transient CoreSession session; 122 123 protected transient LoginContext loginContext; 124 125 protected WorkSchedulePath schedulePath; 126 127 protected String callerThread; 128 129 /** 130 * Constructs a {@link Work} instance with a unique id. 131 */ 132 public AbstractWork() { 133 // we user RANDOM to deal with these cases: 134 // - several calls in the time granularity of nanoTime() 135 // - several concurrent calls on different servers 136 this(System.nanoTime() + "." + Math.abs(RANDOM.nextInt())); 137 callerThread = SequenceTracer.getThreadName(); 138 } 139 140 public AbstractWork(String id) { 141 this.id = id; 142 progress = PROGRESS_INDETERMINATE; 143 schedulingTime = System.currentTimeMillis(); 144 callerThread = SequenceTracer.getThreadName(); 145 } 146 147 @Override 148 public String getId() { 149 return id; 150 } 151 152 @Override 153 public WorkSchedulePath getSchedulePath() { 154 // schedulePath is transient so will become null after deserialization 155 return schedulePath == null ? WorkSchedulePath.EMPTY : schedulePath; 156 } 157 158 @Override 159 public void setSchedulePath(WorkSchedulePath path) { 160 schedulePath = path; 161 } 162 163 public void setDocument(String repositoryName, String docId, boolean isTree) { 164 this.repositoryName = repositoryName; 165 this.docId = docId; 166 docIds = null; 167 this.isTree = isTree; 168 } 169 170 public void setDocument(String repositoryName, String docId) { 171 setDocument(repositoryName, docId, false); 172 } 173 174 public void setDocuments(String repositoryName, List<String> docIds) { 175 this.repositoryName = repositoryName; 176 docId = null; 177 this.docIds = new ArrayList<>(docIds); 178 } 179 180 /** 181 * @since 8.1 182 */ 183 public void setOriginatingUsername(String originatingUsername) { 184 this.originatingUsername = originatingUsername; 185 } 186 187 @Override 188 public void setWorkInstanceSuspending() { 189 suspending = true; 190 } 191 192 @Override 193 public boolean isSuspending() { 194 return suspending; 195 } 196 197 @Override 198 public void suspended() { 199 suspended = true; 200 } 201 202 @Override 203 public boolean isWorkInstanceSuspended() { 204 return suspended; 205 } 206 207 @Override 208 public void setWorkInstanceState(State state) { 209 this.state = state; 210 if (log.isTraceEnabled()) { 211 log.trace(this + " state=" + state); 212 } 213 } 214 215 @Override 216 public State getWorkInstanceState() { 217 return state; 218 } 219 220 @Override 221 public void setProgress(Progress progress) { 222 this.progress = progress; 223 if (log.isTraceEnabled()) { 224 log.trace(String.valueOf(this)); 225 } 226 } 227 228 @Override 229 public Progress getProgress() { 230 return progress; 231 } 232 233 /** 234 * Sets a human-readable status for this work instance. 235 * 236 * @param status the status 237 */ 238 public void setStatus(String status) { 239 this.status = status; 240 } 241 242 @Override 243 public String getStatus() { 244 return status; 245 } 246 247 /** 248 * May be called by implementing classes to open a session on the repository. 249 * 250 * @return the session (also available in {@code session} field) 251 * @deprecated since 8.1. Use {@link #openSystemSession()}. 252 */ 253 @Deprecated 254 public CoreSession initSession() { 255 return initSession(repositoryName); 256 } 257 258 /** 259 * May be called by implementing classes to open a System session on the repository. 260 * 261 * @since 8.1 262 */ 263 public void openSystemSession() { 264 session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername); 265 } 266 267 /** 268 * May be called by implementing classes to open a Use session on the repository. 269 * <p> 270 * It uses the set {@link #originatingUsername} to open the session. 271 * 272 * @since 8.1 273 */ 274 public void openUserSession() { 275 if (originatingUsername == null) { 276 throw new IllegalStateException("Cannot open an user session without an originatingUsername"); 277 } 278 279 try { 280 loginContext = Framework.loginAsUser(originatingUsername); 281 } catch (LoginException e) { 282 throw new NuxeoException(e); 283 } 284 285 session = CoreInstance.openCoreSession(repositoryName); 286 } 287 288 /** 289 * May be called by implementing classes to open a session on the given repository. 290 * 291 * @param repositoryName the repository name 292 * @return the session (also available in {@code session} field) 293 * @deprecated since 8.1. Use {@link #openSystemSession()} to open a session on the configured repository name, 294 * otherwise use {@link CoreInstance#openCoreSessionSystem(String)}. 295 */ 296 @Deprecated 297 public CoreSession initSession(String repositoryName) { 298 session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername); 299 return session; 300 } 301 302 /** 303 * Closes the session that was opened by {@link #openSystemSession()} or {@link #openUserSession()}. 304 * 305 * @since 5.8 306 */ 307 public void closeSession() { 308 if (session != null) { 309 session.close(); 310 session = null; 311 } 312 } 313 314 @Override 315 public void run() { 316 if (isSuspending()) { 317 // don't run anything if we're being started while a suspend 318 // has been requested 319 suspended(); 320 return; 321 } 322 if (SequenceTracer.isEnabled()) { 323 SequenceTracer.startFrom(callerThread, "Work " + getTitleOr("unknown"), " #7acde9"); 324 } 325 Exception suppressed = null; 326 int retryCount = getRetryCount(); // may be 0 327 for (int i = 0; i <= retryCount; i++) { 328 if (i > 0) { 329 log.debug("Retrying work due to concurrent update (" + i + "): " + this); 330 log.trace("Concurrent update", suppressed); 331 } 332 Exception e = runWorkWithTransactionAndCheckExceptions(); 333 if (e == null) { 334 // no exception, work is done 335 SequenceTracer.stop("Work done " + (completionTime - startTime) + " ms"); 336 return; 337 } 338 if (suppressed == null) { 339 suppressed = e; 340 } else { 341 suppressed.addSuppressed(e); 342 } 343 } 344 // all retries have been done, throw the exception 345 if (suppressed != null) { 346 String msg = "Work failed after " + retryCount + " " + (retryCount == 1 ? "retry" : "retries") + ", class=" 347 + getClass() + " id=" + getId() + " category=" + getCategory() + " title=" + getTitle(); 348 SequenceTracer.destroy("Work failure " + (completionTime - startTime) + " ms"); 349 throw new RuntimeException(msg, suppressed); 350 } 351 } 352 353 private String getTitleOr(String defaultTitle) { 354 try { 355 return getTitle(); 356 } catch (Exception e) { 357 return defaultTitle; 358 } 359 } 360 361 /** 362 * Does work under a transaction, and collects exception and suppressed exceptions that may lead to a retry. 363 * 364 * @since 5.9.4 365 */ 366 protected Exception runWorkWithTransactionAndCheckExceptions() { 367 List<Exception> suppressed = Collections.emptyList(); 368 try { 369 TransactionHelper.noteSuppressedExceptions(); 370 try { 371 runWorkWithTransaction(); 372 } finally { 373 suppressed = TransactionHelper.getSuppressedExceptions(); 374 } 375 } catch (ConcurrentUpdateException e) { 376 // happens typically during save() 377 return e; 378 } catch (TransactionRuntimeException e) { 379 // error at commit time 380 if (suppressed.isEmpty()) { 381 return e; 382 } 383 } 384 // reached if no catch, or if TransactionRuntimeException caught 385 if (suppressed.isEmpty()) { 386 return null; 387 } 388 // exceptions during commit caused a rollback in SessionImpl#end 389 Exception e = suppressed.get(0); 390 for (int i = 1; i < suppressed.size(); i++) { 391 e.addSuppressed(suppressed.get(i)); 392 } 393 return e; 394 } 395 396 /** 397 * Does work under a transaction. 398 * 399 * @since 5.9.4 400 * @throws ConcurrentUpdateException, TransactionRuntimeException 401 */ 402 protected void runWorkWithTransaction() throws ConcurrentUpdateException { 403 TransactionHelper.startTransaction(); 404 boolean ok = false; 405 Exception exc = null; 406 try { 407 WorkSchedulePath.handleEnter(this); 408 // --- do work 409 setStartTime(); 410 work(); // may throw ConcurrentUpdateException 411 ok = true; 412 // --- end work 413 } catch (Exception e) { 414 exc = e; 415 if (e instanceof ConcurrentUpdateException) { 416 throw (ConcurrentUpdateException) e; 417 } else if (e instanceof RuntimeException) { 418 throw (RuntimeException) e; 419 } else if (e instanceof InterruptedException) { 420 // restore interrupted status for the thread pool worker 421 Thread.currentThread().interrupt(); 422 } 423 throw new RuntimeException(e); 424 } finally { 425 WorkSchedulePath.handleReturn(); 426 try { 427 setCompletionTime(); 428 cleanUp(ok, exc); 429 } finally { 430 if (TransactionHelper.isTransactionActiveOrMarkedRollback()) { 431 if (!ok || isSuspending()) { 432 log.trace(this + " is suspending, rollbacking"); 433 TransactionHelper.setTransactionRollbackOnly(); 434 } 435 TransactionHelper.commitOrRollbackTransaction(); 436 } 437 } 438 } 439 } 440 441 @Override 442 public abstract void work(); 443 444 /** 445 * Gets the number of times that this Work instance can be retried in case of concurrent update exceptions. 446 * 447 * @return 0 for no retry, or more if some retries are possible 448 * @see #work 449 * @since 5.8 450 */ 451 public int getRetryCount() { 452 return 0; 453 } 454 455 /** 456 * This method is called after {@link #work} is done in a finally block, whether work completed normally or was in 457 * error or was interrupted. 458 * 459 * @param ok {@code true} if the work completed normally 460 * @param e the exception, if available 461 */ 462 @Override 463 public void cleanUp(boolean ok, Exception e) { 464 if (!ok) { 465 if (e instanceof InterruptedException) { 466 log.debug("Suspended work: " + this); 467 } else { 468 if (!(e instanceof ConcurrentUpdateException)) { 469 if (!isSuspending()) { 470 log.error("Exception during work: " + this, e); 471 if (WorkSchedulePath.captureStack) { 472 WorkSchedulePath.log.error("Work schedule path", getSchedulePath().getStack()); 473 } 474 } 475 } 476 } 477 } 478 closeSession(); 479 480 try { 481 // loginContext may be null in tests 482 if (loginContext != null) { 483 loginContext.logout(); 484 } 485 } catch (LoginException le) { 486 throw new NuxeoException(le); 487 } 488 } 489 490 @Override 491 public String getOriginatingUsername() { 492 return originatingUsername; 493 } 494 495 @Override 496 public long getSchedulingTime() { 497 return schedulingTime; 498 } 499 500 @Override 501 public long getStartTime() { 502 return startTime; 503 } 504 505 @Override 506 public long getCompletionTime() { 507 return completionTime; 508 } 509 510 @Override 511 public void setStartTime() { 512 startTime = System.currentTimeMillis(); 513 } 514 515 protected void setCompletionTime() { 516 completionTime = System.currentTimeMillis(); 517 } 518 519 @Override 520 public String getCategory() { 521 return getClass().getSimpleName(); 522 } 523 524 @Override 525 public String toString() { 526 StringBuilder buf = new StringBuilder(); 527 buf.append(getClass().getSimpleName()); 528 buf.append('('); 529 if (docId != null) { 530 buf.append(docId); 531 buf.append(", "); 532 } else if (docIds != null && docIds.size() > 0) { 533 buf.append(docIds.get(0)); 534 buf.append("..., "); 535 } 536 buf.append(getSchedulePath().getParentPath()); 537 buf.append(", "); 538 buf.append(getProgress()); 539 buf.append(", "); 540 buf.append(getStatus()); 541 buf.append(')'); 542 return buf.toString(); 543 } 544 545 @Override 546 public DocumentLocation getDocument() { 547 if (docId != null) { 548 return newDocumentLocation(docId); 549 } 550 return null; 551 } 552 553 @Override 554 public List<DocumentLocation> getDocuments() { 555 if (docIds != null) { 556 List<DocumentLocation> res = new ArrayList<>(docIds.size()); 557 for (String docId : docIds) { 558 res.add(newDocumentLocation(docId)); 559 } 560 return res; 561 } 562 if (docId != null) { 563 return Collections.singletonList(newDocumentLocation(docId)); 564 } 565 return Collections.emptyList(); 566 } 567 568 protected DocumentLocation newDocumentLocation(String docId) { 569 return new DocumentLocationImpl(repositoryName, new IdRef(docId)); 570 } 571 572 @Override 573 public boolean isDocumentTree() { 574 return isTree; 575 } 576 577 /** 578 * Releases the transaction resources by committing the existing transaction (if any). This is recommended before 579 * running a long process. 580 */ 581 public void commitOrRollbackTransaction() { 582 if (TransactionHelper.isTransactionActiveOrMarkedRollback()) { 583 TransactionHelper.commitOrRollbackTransaction(); 584 } 585 } 586 587 /** 588 * Starts a new transaction. 589 * <p> 590 * Usually called after {@code commitOrRollbackTransaction()}, for instance for saving back the results of a long 591 * process. 592 * 593 * @return true if a new transaction was started 594 */ 595 public boolean startTransaction() { 596 return TransactionHelper.startTransaction(); 597 } 598 599 @Override 600 public boolean equals(Object other) { 601 if (!(other instanceof Work)) { 602 return false; 603 } 604 return ((Work) other).getId().equals(getId()); 605 } 606 607 @Override 608 public int hashCode() { 609 return getId().hashCode(); 610 } 611 612}