001/* 002 * (C) Copyright 2012-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.work; 020 021import static org.nuxeo.ecm.core.work.api.Work.Progress.PROGRESS_INDETERMINATE; 022 023import java.util.ArrayList; 024import java.util.Collections; 025import java.util.List; 026import java.util.Random; 027 028import javax.security.auth.login.LoginContext; 029import javax.security.auth.login.LoginException; 030 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033import org.nuxeo.common.logging.SequenceTracer; 034import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 035import org.nuxeo.ecm.core.api.CoreInstance; 036import org.nuxeo.ecm.core.api.CoreSession; 037import org.nuxeo.ecm.core.api.DocumentLocation; 038import org.nuxeo.ecm.core.api.IdRef; 039import org.nuxeo.ecm.core.api.NuxeoException; 040import org.nuxeo.ecm.core.api.impl.DocumentLocationImpl; 041import org.nuxeo.ecm.core.work.api.Work; 042import org.nuxeo.ecm.core.work.api.WorkSchedulePath; 043import org.nuxeo.runtime.api.Framework; 044import org.nuxeo.runtime.transaction.TransactionHelper; 045 046/** 047 * A base implementation for a {@link Work} instance, dealing with most of the details around state change. 048 * <p> 049 * It also deals with transaction management, and prevents running work instances that are suspending. 050 * <p> 051 * Actual implementations must at a minimum implement the {@link #work()} method. A method {@link #cleanUp} is 052 * available. 053 * <p> 054 * To deal with suspension, {@link #work()} should periodically check for {@link #isSuspending()} and if true save its 055 * state and call {@link #suspended()}. 056 * <p> 057 * Specific information about the work can be returned by {@link #getDocument()} or {@link #getDocuments()}. 058 * 059 * @since 5.6 060 */ 061public abstract class AbstractWork implements Work { 062 063 private static final long serialVersionUID = 1L; 064 065 private static final Log log = LogFactory.getLog(AbstractWork.class); 066 067 protected static final Random RANDOM = new Random(); 068 069 protected String id; 070 071 /** Suspend requested by the work manager. */ 072 protected transient volatile boolean suspending; 073 074 /** Suspend acknowledged by the work instance. */ 075 protected transient volatile boolean suspended; 076 077 protected State state; 078 079 protected Progress progress; 080 081 /** Repository name for the Work instance, if relevant. */ 082 protected String repositoryName; 083 084 /** 085 * Doc id for the Work instance, if relevant. This describes for the WorkManager a document on which this Work 086 * instance will act. 087 * <p> 088 * Either docId or docIds is set. Not both. 089 */ 090 protected String docId; 091 092 /** 093 * Doc ids for the Work instance, if relevant. This describes for the WorkManager the documents on which this Work 094 * instance will act. 095 * <p> 096 * Either docId or docIds is set. Not both. 097 */ 098 protected List<String> docIds; 099 100 /** 101 * If {@code true}, the docId is only the root of a set of documents on which this Work instance will act. 102 */ 103 protected boolean isTree; 104 105 /** 106 * The originating username to use when opening the {@link CoreSession}. 107 * 108 * @since 8.1 109 */ 110 protected String originatingUsername; 111 112 protected String status; 113 114 protected long schedulingTime; 115 116 protected long startTime; 117 118 protected long completionTime; 119 120 protected transient CoreSession session; 121 122 protected transient LoginContext loginContext; 123 124 protected WorkSchedulePath schedulePath; 125 126 protected String callerThread; 127 128 /** 129 * Constructs a {@link Work} instance with a unique id. 130 */ 131 public AbstractWork() { 132 // we user RANDOM to deal with these cases: 133 // - several calls in the time granularity of nanoTime() 134 // - several concurrent calls on different servers 135 this(System.nanoTime() + "." + Math.abs(RANDOM.nextInt())); 136 callerThread = SequenceTracer.getThreadName(); 137 } 138 139 public AbstractWork(String id) { 140 this.id = id; 141 progress = PROGRESS_INDETERMINATE; 142 schedulingTime = System.currentTimeMillis(); 143 callerThread = SequenceTracer.getThreadName(); 144 } 145 146 @Override 147 public String getId() { 148 return id; 149 } 150 151 @Override 152 public WorkSchedulePath getSchedulePath() { 153 // schedulePath is transient so will become null after deserialization 154 return schedulePath == null ? WorkSchedulePath.EMPTY : schedulePath; 155 } 156 157 @Override 158 public void setSchedulePath(WorkSchedulePath path) { 159 schedulePath = path; 160 } 161 162 public void setDocument(String repositoryName, String docId, boolean isTree) { 163 this.repositoryName = repositoryName; 164 this.docId = docId; 165 docIds = null; 166 this.isTree = isTree; 167 } 168 169 public void setDocument(String repositoryName, String docId) { 170 setDocument(repositoryName, docId, false); 171 } 172 173 public void setDocuments(String repositoryName, List<String> docIds) { 174 this.repositoryName = repositoryName; 175 docId = null; 176 this.docIds = new ArrayList<>(docIds); 177 } 178 179 /** 180 * @since 8.1 181 */ 182 public void setOriginatingUsername(String originatingUsername) { 183 this.originatingUsername = originatingUsername; 184 } 185 186 @Override 187 public void setWorkInstanceSuspending() { 188 suspending = true; 189 } 190 191 @Override 192 public boolean isSuspending() { 193 return suspending; 194 } 195 196 @Override 197 public void suspended() { 198 suspended = true; 199 } 200 201 @Override 202 public boolean isWorkInstanceSuspended() { 203 return suspended; 204 } 205 206 @Override 207 public void setWorkInstanceState(State state) { 208 this.state = state; 209 if (log.isTraceEnabled()) { 210 log.trace(this + " state=" + state); 211 } 212 } 213 214 @Override 215 public State getWorkInstanceState() { 216 return state; 217 } 218 219 @Override 220 public void setProgress(Progress progress) { 221 this.progress = progress; 222 if (log.isTraceEnabled()) { 223 log.trace(String.valueOf(this)); 224 } 225 } 226 227 @Override 228 public Progress getProgress() { 229 return progress; 230 } 231 232 /** 233 * Sets a human-readable status for this work instance. 234 * 235 * @param status the status 236 */ 237 public void setStatus(String status) { 238 this.status = status; 239 } 240 241 @Override 242 public String getStatus() { 243 return status; 244 } 245 246 /** 247 * May be called by implementing classes to open a session on the repository. 248 * 249 * @return the session (also available in {@code session} field) 250 * @deprecated since 8.1. Use {@link #openSystemSession()}. 251 */ 252 @Deprecated 253 public CoreSession initSession() { 254 return initSession(repositoryName); 255 } 256 257 /** 258 * May be called by implementing classes to open a System session on the repository. 259 * 260 * @since 8.1 261 */ 262 public void openSystemSession() { 263 session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername); 264 } 265 266 /** 267 * May be called by implementing classes to open a Use session on the repository. 268 * <p> 269 * It uses the set {@link #originatingUsername} to open the session. 270 * 271 * @since 8.1 272 */ 273 public void openUserSession() { 274 if (originatingUsername == null) { 275 throw new IllegalStateException("Cannot open an user session without an originatingUsername"); 276 } 277 278 try { 279 loginContext = Framework.loginAsUser(originatingUsername); 280 } catch (LoginException e) { 281 throw new NuxeoException(e); 282 } 283 284 session = CoreInstance.openCoreSession(repositoryName); 285 } 286 287 /** 288 * May be called by implementing classes to open a session on the given repository. 289 * 290 * @param repositoryName the repository name 291 * @return the session (also available in {@code session} field) 292 * @deprecated since 8.1. Use {@link #openSystemSession()} to open a session on the configured repository name, 293 * otherwise use {@link CoreInstance#openCoreSessionSystem(String)}. 294 */ 295 @Deprecated 296 public CoreSession initSession(String repositoryName) { 297 session = CoreInstance.openCoreSessionSystem(repositoryName, originatingUsername); 298 return session; 299 } 300 301 /** 302 * Closes the session that was opened by {@link #openSystemSession()} or {@link #openUserSession()}. 303 * 304 * @since 5.8 305 */ 306 public void closeSession() { 307 if (session != null) { 308 session.close(); 309 session = null; 310 } 311 } 312 313 @Override 314 public void run() { 315 if (isSuspending()) { 316 // don't run anything if we're being started while a suspend 317 // has been requested 318 suspended(); 319 return; 320 } 321 if (SequenceTracer.isEnabled()) { 322 SequenceTracer.startFrom(callerThread, "Work " + getTitleOr("unknown"), " #7acde9"); 323 } 324 RuntimeException suppressed = null; 325 int retryCount = getRetryCount(); // may be 0 326 for (int i = 0; i <= retryCount; i++) { 327 if (i > 0) { 328 log.debug("Retrying work due to concurrent update (" + i + "): " + this); 329 log.trace("Concurrent update", suppressed); 330 } 331 try { 332 runWorkWithTransaction(); 333 SequenceTracer.stop("Work done " + (completionTime - startTime) + " ms"); 334 return; 335 } catch (RuntimeException e) { 336 if (suppressed == null) { 337 suppressed = e; 338 } else { 339 suppressed.addSuppressed(e); 340 } 341 } 342 } 343 // all retries have been done, throw the exception 344 if (suppressed != null) { 345 String msg = "Work failed after " + retryCount + " " + (retryCount == 1 ? "retry" : "retries") + ", class=" 346 + getClass() + " id=" + getId() + " category=" + getCategory() + " title=" + getTitle(); 347 SequenceTracer.destroy("Work failure " + (completionTime - startTime) + " ms"); 348 throw new RuntimeException(msg, suppressed); 349 } 350 } 351 352 private String getTitleOr(String defaultTitle) { 353 try { 354 return getTitle(); 355 } catch (Exception e) { 356 return defaultTitle; 357 } 358 } 359 360 /** 361 * Does work under a transaction. 362 * 363 * @since 5.9.4 364 */ 365 protected void runWorkWithTransaction() { 366 TransactionHelper.startTransaction(); 367 boolean ok = false; 368 Exception exc = null; 369 try { 370 WorkSchedulePath.handleEnter(this); 371 // --- do work 372 setStartTime(); 373 work(); // may throw ConcurrentUpdateException 374 ok = true; 375 // --- end work 376 } catch (Exception e) { 377 exc = e; 378 if (e instanceof RuntimeException) { 379 throw (RuntimeException) e; 380 } else if (e instanceof InterruptedException) { 381 // restore interrupted status for the thread pool worker 382 Thread.currentThread().interrupt(); 383 } 384 throw new RuntimeException(e); 385 } finally { 386 WorkSchedulePath.handleReturn(); 387 try { 388 setCompletionTime(); 389 cleanUp(ok, exc); 390 } finally { 391 if (TransactionHelper.isTransactionActiveOrMarkedRollback()) { 392 if (!ok || isSuspending()) { 393 log.trace(this + " is suspending, rollbacking"); 394 TransactionHelper.setTransactionRollbackOnly(); 395 } 396 TransactionHelper.commitOrRollbackTransaction(); 397 } 398 } 399 } 400 } 401 402 @Override 403 public abstract void work(); 404 405 /** 406 * Gets the number of times that this Work instance can be retried in case of concurrent update exceptions. 407 * 408 * @return 0 for no retry, or more if some retries are possible 409 * @see #work 410 * @since 5.8 411 */ 412 public int getRetryCount() { 413 return 0; 414 } 415 416 /** 417 * This method is called after {@link #work} is done in a finally block, whether work completed normally or was in 418 * error or was interrupted. 419 * 420 * @param ok {@code true} if the work completed normally 421 * @param e the exception, if available 422 */ 423 @Override 424 public void cleanUp(boolean ok, Exception e) { 425 if (!ok) { 426 if (e instanceof InterruptedException) { 427 log.debug("Suspended work: " + this); 428 } else { 429 if (!(e instanceof ConcurrentUpdateException)) { 430 if (!isSuspending()) { 431 log.error("Exception during work: " + this, e); 432 if (WorkSchedulePath.captureStack) { 433 WorkSchedulePath.log.error("Work schedule path", getSchedulePath().getStack()); 434 } 435 } 436 } 437 } 438 } 439 closeSession(); 440 441 try { 442 // loginContext may be null in tests 443 if (loginContext != null) { 444 loginContext.logout(); 445 } 446 } catch (LoginException le) { 447 throw new NuxeoException(le); 448 } 449 } 450 451 @Override 452 public String getOriginatingUsername() { 453 return originatingUsername; 454 } 455 456 @Override 457 public long getSchedulingTime() { 458 return schedulingTime; 459 } 460 461 @Override 462 public long getStartTime() { 463 return startTime; 464 } 465 466 @Override 467 public long getCompletionTime() { 468 return completionTime; 469 } 470 471 @Override 472 public void setStartTime() { 473 startTime = System.currentTimeMillis(); 474 } 475 476 protected void setCompletionTime() { 477 completionTime = System.currentTimeMillis(); 478 } 479 480 @Override 481 public String getCategory() { 482 return getClass().getSimpleName(); 483 } 484 485 @Override 486 public String toString() { 487 StringBuilder buf = new StringBuilder(); 488 buf.append(getClass().getSimpleName()); 489 buf.append('('); 490 if (docId != null) { 491 buf.append(docId); 492 buf.append(", "); 493 } else if (docIds != null && docIds.size() > 0) { 494 buf.append(docIds.get(0)); 495 buf.append("..., "); 496 } 497 buf.append(getSchedulePath().getParentPath()); 498 buf.append(", "); 499 buf.append(getProgress()); 500 buf.append(", "); 501 buf.append(getStatus()); 502 buf.append(')'); 503 return buf.toString(); 504 } 505 506 @Override 507 public DocumentLocation getDocument() { 508 if (docId != null) { 509 return newDocumentLocation(docId); 510 } 511 return null; 512 } 513 514 @Override 515 public List<DocumentLocation> getDocuments() { 516 if (docIds != null) { 517 List<DocumentLocation> res = new ArrayList<>(docIds.size()); 518 for (String docId : docIds) { 519 res.add(newDocumentLocation(docId)); 520 } 521 return res; 522 } 523 if (docId != null) { 524 return Collections.singletonList(newDocumentLocation(docId)); 525 } 526 return Collections.emptyList(); 527 } 528 529 protected DocumentLocation newDocumentLocation(String docId) { 530 return new DocumentLocationImpl(repositoryName, new IdRef(docId)); 531 } 532 533 @Override 534 public boolean isDocumentTree() { 535 return isTree; 536 } 537 538 /** 539 * Releases the transaction resources by committing the existing transaction (if any). This is recommended before 540 * running a long process. 541 */ 542 public void commitOrRollbackTransaction() { 543 if (TransactionHelper.isTransactionActiveOrMarkedRollback()) { 544 TransactionHelper.commitOrRollbackTransaction(); 545 } 546 } 547 548 /** 549 * Starts a new transaction. 550 * <p> 551 * Usually called after {@code commitOrRollbackTransaction()}, for instance for saving back the results of a long 552 * process. 553 * 554 * @return true if a new transaction was started 555 */ 556 public boolean startTransaction() { 557 return TransactionHelper.startTransaction(); 558 } 559 560 @Override 561 public boolean equals(Object other) { 562 if (!(other instanceof Work)) { 563 return false; 564 } 565 return ((Work) other).getId().equals(getId()); 566 } 567 568 @Override 569 public int hashCode() { 570 return getId().hashCode(); 571 } 572 573}