001/* 002 * (C) Copyright 2012-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.work; 020 021import static org.nuxeo.ecm.core.work.api.Work.Progress.PROGRESS_INDETERMINATE; 022 023import java.util.ArrayList; 024import java.util.Collections; 025import java.util.List; 026import java.util.Random; 027 028import javax.security.auth.login.LoginContext; 029import javax.security.auth.login.LoginException; 030 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033import org.nuxeo.common.logging.SequenceTracer; 034import org.nuxeo.common.utils.ExceptionUtils; 035import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 036import org.nuxeo.ecm.core.api.CoreInstance; 037import org.nuxeo.ecm.core.api.CoreSession; 038import org.nuxeo.ecm.core.api.DocumentLocation; 039import org.nuxeo.ecm.core.api.IdRef; 040import org.nuxeo.ecm.core.api.NuxeoException; 041import org.nuxeo.ecm.core.api.impl.DocumentLocationImpl; 042import org.nuxeo.ecm.core.work.api.Work; 043import org.nuxeo.ecm.core.work.api.WorkSchedulePath; 044import org.nuxeo.runtime.api.Framework; 045import org.nuxeo.runtime.transaction.TransactionHelper; 046 047/** 048 * A base implementation for a {@link Work} instance, dealing with most of the details around state change. 049 * <p> 050 * It also deals with transaction management, and prevents running work instances that are suspending. 051 * <p> 052 * Actual implementations must at a minimum implement the {@link #work()} method. A method {@link #cleanUp} is 053 * available. 054 * <p> 055 * To deal with suspension, {@link #work()} should periodically check for {@link #isSuspending()} and if true save its 056 * state and call {@link #suspended()}. 057 * <p> 058 * Specific information about the work can be returned by {@link #getDocument()} or {@link #getDocuments()}. 059 * 060 * @since 5.6 061 */ 062public abstract class AbstractWork implements Work { 063 064 private static final long serialVersionUID = 1L; 065 066 private static final Log log = LogFactory.getLog(AbstractWork.class); 067 068 protected static final Random RANDOM = new Random(); 069 070 protected String id; 071 072 /** Suspend requested by the work manager. */ 073 protected transient volatile boolean suspending; 074 075 /** Suspend acknowledged by the work instance. */ 076 protected transient volatile boolean suspended; 077 078 protected State state; 079 080 protected Progress progress; 081 082 /** Repository name for the Work instance, if relevant. */ 083 protected String repositoryName; 084 085 /** 086 * Doc id for the Work instance, if relevant. This describes for the WorkManager a document on which this Work 087 * instance will act. 088 * <p> 089 * Either docId or docIds is set. Not both. 090 */ 091 protected String docId; 092 093 /** 094 * Doc ids for the Work instance, if relevant. This describes for the WorkManager the documents on which this Work 095 * instance will act. 096 * <p> 097 * Either docId or docIds is set. Not both. 098 */ 099 protected List<String> docIds; 100 101 /** 102 * If {@code true}, the docId is only the root of a set of documents on which this Work instance will act. 103 */ 104 protected boolean isTree; 105 106 /** 107 * The originating username to use when opening the {@link CoreSession}. 108 * 109 * @since 8.1 110 */ 111 protected String originatingUsername; 112 113 protected String status; 114 115 protected long schedulingTime; 116 117 protected long startTime; 118 119 protected long completionTime; 120 121 protected transient CoreSession session; 122 123 protected transient LoginContext loginContext; 124 125 protected WorkSchedulePath schedulePath; 126 127 protected String callerThread; 128 129 /** 130 * Constructs a {@link Work} instance with a unique id. 131 */ 132 public AbstractWork() { 133 // we user RANDOM to deal with these cases: 134 // - several calls in the time granularity of nanoTime() 135 // - several concurrent calls on different servers 136 this(System.nanoTime() + "." + Math.abs(RANDOM.nextInt())); 137 callerThread = SequenceTracer.getThreadName(); 138 } 139 140 public AbstractWork(String id) { 141 this.id = id; 142 progress = PROGRESS_INDETERMINATE; 143 schedulingTime = System.currentTimeMillis(); 144 callerThread = SequenceTracer.getThreadName(); 145 } 146 147 @Override 148 public String getId() { 149 return id; 150 } 151 152 @Override 153 public WorkSchedulePath getSchedulePath() { 154 // schedulePath is transient so will become null after deserialization 155 return schedulePath == null ? WorkSchedulePath.EMPTY : schedulePath; 156 } 157 158 @Override 159 public void setSchedulePath(WorkSchedulePath path) { 160 schedulePath = path; 161 } 162 163 public void setDocument(String repositoryName, String docId, boolean isTree) { 164 this.repositoryName = repositoryName; 165 this.docId = docId; 166 docIds = null; 167 this.isTree = isTree; 168 } 169 170 public void setDocument(String repositoryName, String docId) { 171 setDocument(repositoryName, docId, false); 172 } 173 174 public void setDocuments(String repositoryName, List<String> docIds) { 175 this.repositoryName = repositoryName; 176 docId = null; 177 this.docIds = new ArrayList<>(docIds); 178 } 179 180 /** 181 * @since 8.1 182 */ 183 public void setOriginatingUsername(String originatingUsername) { 184 this.originatingUsername = originatingUsername; 185 } 186 187 @Override 188 public void setWorkInstanceSuspending() { 189 suspending = true; 190 } 191 192 @Override 193 public boolean isSuspending() { 194 return suspending; 195 } 196 197 @Override 198 public void suspended() { 199 suspended = true; 200 } 201 202 @Override 203 public boolean isWorkInstanceSuspended() { 204 return suspended; 205 } 206 207 @Override 208 public void setWorkInstanceState(State state) { 209 this.state = state; 210 if (log.isTraceEnabled()) { 211 log.trace(this + " state=" + state); 212 } 213 } 214 215 @Override 216 public State getWorkInstanceState() { 217 return state; 218 } 219 220 @Override 221 public void setProgress(Progress progress) { 222 this.progress = progress; 223 if (log.isTraceEnabled()) { 224 log.trace(String.valueOf(this)); 225 } 226 } 227 228 @Override 229 public Progress getProgress() { 230 return progress; 231 } 232 233 /** 234 * Sets a human-readable status for this work instance. 235 * 236 * @param status the status 237 */ 238 public void setStatus(String status) { 239 this.status = status; 240 } 241 242 @Override 243 public String getStatus() { 244 return status; 245 } 246 247 /** 248 * May be called by implementing classes to open a session on the repository. 249 * 250 * @return the session (also available in {@code session} field) 251 * @deprecated since 8.1. Use {@link #openSystemSession()}. 252 */ 253 @Deprecated 254 public CoreSession initSession() { 255 return initSession(repositoryName); 256 } 257 258 /** 259 * May be called by implementing classes to open a System session on the repository. 260 * 261 * @since 8.1 262 */ 263 public void openSystemSession() { 264 session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername); 265 } 266 267 /** 268 * May be called by implementing classes to open a Use session on the repository. 269 * <p> 270 * It uses the set {@link #originatingUsername} to open the session. 271 * 272 * @since 8.1 273 */ 274 public void openUserSession() { 275 if (originatingUsername == null) { 276 throw new IllegalStateException("Cannot open an user session without an originatingUsername"); 277 } 278 279 try { 280 loginContext = Framework.loginAsUser(originatingUsername); 281 } catch (LoginException e) { 282 throw new NuxeoException(e); 283 } 284 285 session = CoreInstance.openCoreSession(repositoryName); 286 } 287 288 /** 289 * May be called by implementing classes to open a session on the given repository. 290 * 291 * @param repositoryName the repository name 292 * @return the session (also available in {@code session} field) 293 * @deprecated since 8.1. Use {@link #openSystemSession()} to open a session on the configured repository name, 294 * otherwise use {@link CoreInstance#openCoreSessionSystem(String)}. 295 */ 296 @Deprecated 297 public CoreSession initSession(String repositoryName) { 298 session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername); 299 return session; 300 } 301 302 /** 303 * Closes the session that was opened by {@link #openSystemSession()} or {@link #openUserSession()}. 304 * 305 * @since 5.8 306 */ 307 public void closeSession() { 308 if (session != null) { 309 session.close(); 310 session = null; 311 } 312 } 313 314 @Override 315 public void run() { 316 if (isSuspending()) { 317 // don't run anything if we're being started while a suspend 318 // has been requested 319 suspended(); 320 return; 321 } 322 if (SequenceTracer.isEnabled()) { 323 SequenceTracer.startFrom(callerThread, "Work " + getTitleOr("unknown"), " #7acde9"); 324 } 325 RuntimeException suppressed = null; 326 int retryCount = getRetryCount(); // may be 0 327 for (int i = 0; i <= retryCount; i++) { 328 if (i > 0) { 329 log.debug("Retrying work due to concurrent update (" + i + "): " + this); 330 log.trace("Concurrent update", suppressed); 331 } 332 if (ExceptionUtils.hasInterruptedCause(suppressed)) { 333 // if we're here suppressed != null so we destroy SequenceTracer 334 log.debug("No need to retry the work with id=" + getId() + ", work manager is shutting down"); 335 break; 336 } 337 try { 338 runWorkWithTransaction(); 339 SequenceTracer.stop("Work done " + (completionTime - startTime) + " ms"); 340 return; 341 } catch (RuntimeException e) { 342 if (suppressed == null) { 343 suppressed = e; 344 } else { 345 suppressed.addSuppressed(e); 346 } 347 } 348 } 349 // all retries have been done, throw the exception 350 if (suppressed != null) { 351 String msg = "Work failed after " + retryCount + " " + (retryCount == 1 ? "retry" : "retries") + ", class=" 352 + getClass() + " id=" + getId() + " category=" + getCategory() + " title=" + getTitle(); 353 SequenceTracer.destroy("Work failure " + (completionTime - startTime) + " ms"); 354 throw new RuntimeException(msg, suppressed); 355 } 356 } 357 358 private String getTitleOr(String defaultTitle) { 359 try { 360 return getTitle(); 361 } catch (Exception e) { 362 return defaultTitle; 363 } 364 } 365 366 /** 367 * Does work under a transaction. 368 * 369 * @since 5.9.4 370 */ 371 protected void runWorkWithTransaction() { 372 TransactionHelper.startTransaction(); 373 boolean ok = false; 374 Exception exc = null; 375 try { 376 WorkSchedulePath.handleEnter(this); 377 // --- do work 378 setStartTime(); 379 work(); // may throw ConcurrentUpdateException 380 ok = true; 381 // --- end work 382 } catch (Exception e) { 383 exc = e; 384 if (e instanceof RuntimeException) { 385 throw (RuntimeException) e; 386 } else if (e instanceof InterruptedException) { 387 // restore interrupted status for the thread pool worker 388 Thread.currentThread().interrupt(); 389 } 390 throw new RuntimeException(e); 391 } finally { 392 WorkSchedulePath.handleReturn(); 393 try { 394 setCompletionTime(); 395 cleanUp(ok, exc); 396 } finally { 397 if (TransactionHelper.isTransactionActiveOrMarkedRollback()) { 398 if (!ok || isSuspending()) { 399 log.trace(this + " is suspending, rollbacking"); 400 TransactionHelper.setTransactionRollbackOnly(); 401 } 402 TransactionHelper.commitOrRollbackTransaction(); 403 } 404 } 405 } 406 } 407 408 @Override 409 public abstract void work(); 410 411 /** 412 * Gets the number of times that this Work instance can be retried in case of concurrent update exceptions. 413 * 414 * @return 0 for no retry, or more if some retries are possible 415 * @see #work 416 * @since 5.8 417 */ 418 public int getRetryCount() { 419 return 0; 420 } 421 422 /** 423 * This method is called after {@link #work} is done in a finally block, whether work completed normally or was in 424 * error or was interrupted. 425 * 426 * @param ok {@code true} if the work completed normally 427 * @param e the exception, if available 428 */ 429 @Override 430 public void cleanUp(boolean ok, Exception e) { 431 if (!ok) { 432 if (ExceptionUtils.hasInterruptedCause(e)) { 433 log.debug("Interrupted work: " + this); 434 } else { 435 if (!(e instanceof ConcurrentUpdateException)) { 436 if (!isSuspending()) { 437 log.error("Exception during work: " + this, e); 438 if (WorkSchedulePath.captureStack) { 439 WorkSchedulePath.log.error("Work schedule path", getSchedulePath().getStack()); 440 } 441 } 442 } 443 } 444 } 445 closeSession(); 446 447 try { 448 // loginContext may be null in tests 449 if (loginContext != null) { 450 loginContext.logout(); 451 } 452 } catch (LoginException le) { 453 throw new NuxeoException(le); 454 } 455 } 456 457 @Override 458 public String getOriginatingUsername() { 459 return originatingUsername; 460 } 461 462 @Override 463 public long getSchedulingTime() { 464 return schedulingTime; 465 } 466 467 @Override 468 public long getStartTime() { 469 return startTime; 470 } 471 472 @Override 473 public long getCompletionTime() { 474 return completionTime; 475 } 476 477 @Override 478 public void setStartTime() { 479 startTime = System.currentTimeMillis(); 480 } 481 482 protected void setCompletionTime() { 483 completionTime = System.currentTimeMillis(); 484 } 485 486 @Override 487 public String getCategory() { 488 return getClass().getSimpleName(); 489 } 490 491 @Override 492 public String toString() { 493 StringBuilder buf = new StringBuilder(); 494 buf.append(getClass().getSimpleName()); 495 buf.append('('); 496 if (docId != null) { 497 buf.append(docId); 498 buf.append(", "); 499 } else if (docIds != null && docIds.size() > 0) { 500 buf.append(docIds.get(0)); 501 buf.append("..., "); 502 } 503 buf.append(getSchedulePath().getParentPath()); 504 buf.append(", "); 505 buf.append(getProgress()); 506 buf.append(", "); 507 buf.append(getStatus()); 508 buf.append(')'); 509 return buf.toString(); 510 } 511 512 @Override 513 public DocumentLocation getDocument() { 514 if (docId != null) { 515 return newDocumentLocation(docId); 516 } 517 return null; 518 } 519 520 @Override 521 public List<DocumentLocation> getDocuments() { 522 if (docIds != null) { 523 List<DocumentLocation> res = new ArrayList<>(docIds.size()); 524 for (String docId : docIds) { 525 res.add(newDocumentLocation(docId)); 526 } 527 return res; 528 } 529 if (docId != null) { 530 return Collections.singletonList(newDocumentLocation(docId)); 531 } 532 return Collections.emptyList(); 533 } 534 535 protected DocumentLocation newDocumentLocation(String docId) { 536 return new DocumentLocationImpl(repositoryName, new IdRef(docId)); 537 } 538 539 @Override 540 public boolean isDocumentTree() { 541 return isTree; 542 } 543 544 /** 545 * Releases the transaction resources by committing the existing transaction (if any). This is recommended before 546 * running a long process. 547 */ 548 public void commitOrRollbackTransaction() { 549 if (TransactionHelper.isTransactionActiveOrMarkedRollback()) { 550 TransactionHelper.commitOrRollbackTransaction(); 551 } 552 } 553 554 /** 555 * Starts a new transaction. 556 * <p> 557 * Usually called after {@code commitOrRollbackTransaction()}, for instance for saving back the results of a long 558 * process. 559 * 560 * @return true if a new transaction was started 561 */ 562 public boolean startTransaction() { 563 return TransactionHelper.startTransaction(); 564 } 565 566 @Override 567 public boolean equals(Object other) { 568 if (!(other instanceof Work)) { 569 return false; 570 } 571 return ((Work) other).getId().equals(getId()); 572 } 573 574 @Override 575 public int hashCode() { 576 return getId().hashCode(); 577 } 578 579 @Override 580 public String getPartitionKey() { 581 if (docId != null) { 582 return docId; 583 } else if (docIds != null && !docIds.isEmpty()) { 584 return docIds.get(0); 585 } 586 return getId(); 587 } 588}