001/* 002 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Florent Guillaume 016 */ 017package org.nuxeo.ecm.core.work; 018 019import static org.nuxeo.ecm.core.work.api.Work.Progress.PROGRESS_INDETERMINATE; 020 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.List; 024import java.util.Random; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 029import org.nuxeo.ecm.core.api.CoreInstance; 030import org.nuxeo.ecm.core.api.CoreSession; 031import org.nuxeo.ecm.core.api.DocumentLocation; 032import org.nuxeo.ecm.core.api.IdRef; 033import org.nuxeo.ecm.core.api.impl.DocumentLocationImpl; 034import org.nuxeo.ecm.core.work.api.Work; 035import org.nuxeo.ecm.core.work.api.WorkSchedulePath; 036import org.nuxeo.runtime.transaction.TransactionHelper; 037import org.nuxeo.runtime.transaction.TransactionRuntimeException; 038 039/** 040 * A base implementation for a {@link Work} instance, dealing with most of the details around state change. 041 * <p> 042 * It also deals with transaction management, and prevents running work instances that are suspending. 043 * <p> 044 * Actual implementations must at a minimum implement the {@link #work()} method. A method {@link #cleanUp} is 045 * available. 046 * <p> 047 * To deal with suspension, {@link #work()} should periodically check for {@link #isSuspending()} and if true save its 048 * state and call {@link #suspended()}. 049 * <p> 050 * Specific information about the work can be returned by {@link #getDocument()} or {@link #getDocuments()}. 051 * 052 * @since 5.6 053 */ 054public abstract class AbstractWork implements Work { 055 056 private static final long serialVersionUID = 1L; 057 058 private static final Log log = LogFactory.getLog(AbstractWork.class); 059 060 protected static final Random RANDOM = new Random(); 061 062 protected String id; 063 064 /** Suspend requested by the work manager. */ 065 protected transient volatile boolean suspending; 066 067 /** Suspend acknowledged by the work instance. */ 068 protected transient volatile boolean suspended; 069 070 protected transient State state; 071 072 protected transient Progress progress; 073 074 /** Repository name for the Work instance, if relevant. */ 075 protected String repositoryName; 076 077 /** 078 * Doc id for the Work instance, if relevant. This describes for the WorkManager a document on which this Work 079 * instance will act. 080 * <p> 081 * Either docId or docIds is set. Not both. 082 */ 083 protected String docId; 084 085 /** 086 * Doc ids for the Work instance, if relevant. This describes for the WorkManager the documents on which this Work 087 * instance will act. 088 * <p> 089 * Either docId or docIds is set. Not both. 090 */ 091 protected List<String> docIds; 092 093 /** 094 * If {@code true}, the docId is only the root of a set of documents on which this Work instance will act. 095 */ 096 protected boolean isTree; 097 098 protected String status; 099 100 protected long schedulingTime; 101 102 protected long startTime; 103 104 protected long completionTime; 105 106 protected transient CoreSession session; 107 108 protected WorkSchedulePath schedulePath; 109 110 /** 111 * Constructs a {@link Work} instance with a unique id. 112 */ 113 public AbstractWork() { 114 // we user RANDOM to deal with these cases: 115 // - several calls in the time granularity of nanoTime() 116 // - several concurrent calls on different servers 117 this(System.nanoTime() + "." + Math.abs(RANDOM.nextInt())); 118 } 119 120 public AbstractWork(String id) { 121 this.id = id; 122 progress = PROGRESS_INDETERMINATE; 123 schedulingTime = System.currentTimeMillis(); 124 } 125 126 @Override 127 public String getId() { 128 return id; 129 } 130 131 @Override 132 public WorkSchedulePath getSchedulePath() { 133 // schedulePath is transient so will become null after deserialization 134 return schedulePath == null ? WorkSchedulePath.EMPTY : schedulePath; 135 } 136 137 @Override 138 public void setSchedulePath(WorkSchedulePath path) { 139 schedulePath = path; 140 } 141 142 public void setDocument(String repositoryName, String docId, boolean isTree) { 143 this.repositoryName = repositoryName; 144 this.docId = docId; 145 docIds = null; 146 this.isTree = isTree; 147 } 148 149 public void setDocument(String repositoryName, String docId) { 150 setDocument(repositoryName, docId, false); 151 } 152 153 public void setDocuments(String repositoryName, List<String> docIds) { 154 this.repositoryName = repositoryName; 155 docId = null; 156 this.docIds = new ArrayList<String>(docIds); 157 } 158 159 @Override 160 public void setWorkInstanceSuspending() { 161 suspending = true; 162 } 163 164 @Override 165 public boolean isSuspending() { 166 return suspending; 167 } 168 169 @Override 170 public void suspended() { 171 suspended = true; 172 } 173 174 @Override 175 public boolean isWorkInstanceSuspended() { 176 return suspended; 177 } 178 179 @Override 180 public void setWorkInstanceState(State state) { 181 this.state = state; 182 if (log.isTraceEnabled()) { 183 log.trace(this + " state=" + state); 184 } 185 } 186 187 @Override 188 public State getWorkInstanceState() { 189 return state; 190 } 191 192 @Override 193 @Deprecated 194 public State getState() { 195 return state; 196 } 197 198 @Override 199 public void setProgress(Progress progress) { 200 this.progress = progress; 201 if (log.isTraceEnabled()) { 202 log.trace(String.valueOf(this)); 203 } 204 } 205 206 @Override 207 public Progress getProgress() { 208 return progress; 209 } 210 211 /** 212 * Sets a human-readable status for this work instance. 213 * 214 * @param status the status 215 */ 216 public void setStatus(String status) { 217 this.status = status; 218 } 219 220 @Override 221 public String getStatus() { 222 return status; 223 } 224 225 /** 226 * May be called by implementing classes to open a session on the repository. 227 * 228 * @return the session (also available in {@code session} field) 229 */ 230 public CoreSession initSession() { 231 return initSession(repositoryName); 232 } 233 234 /** 235 * May be called by implementing classes to open a session on the given repository. 236 * 237 * @param repositoryName the repository name 238 * @return the session (also available in {@code session} field) 239 */ 240 public CoreSession initSession(String repositoryName) { 241 session = CoreInstance.openCoreSessionSystem(repositoryName); 242 return session; 243 } 244 245 /** 246 * Closes the session that was opened by {@link #initSession}. 247 * 248 * @since 5.8 249 */ 250 public void closeSession() { 251 if (session != null) { 252 session.close(); 253 session = null; 254 } 255 } 256 257 @Override 258 public void run() { 259 if (isSuspending()) { 260 // don't run anything if we're being started while a suspend 261 // has been requested 262 suspended(); 263 return; 264 } 265 Exception suppressed = null; 266 int retryCount = getRetryCount(); // may be 0 267 for (int i = 0; i <= retryCount; i++) { 268 if (i > 0) { 269 log.debug("Retrying work due to concurrent update (" + i + "): " + this); 270 log.trace("Concurrent update", suppressed); 271 } 272 Exception e = runWorkWithTransactionAndCheckExceptions(); 273 if (e == null) { 274 // no exception, work is done 275 return; 276 } 277 if (suppressed == null) { 278 suppressed = e; 279 } else { 280 suppressed.addSuppressed(e); 281 } 282 } 283 // all retries have been done, throw the exception 284 if (suppressed != null) { 285 String msg = "Work failed after " + retryCount + " " + (retryCount == 1 ? "retry" : "retries") + ", class=" 286 + getClass() + " id=" + getId() + " category=" + getCategory() + " title=" + getTitle(); 287 throw new RuntimeException(msg, suppressed); 288 } 289 } 290 291 /** 292 * Does work under a transaction, and collects exception and suppressed exceptions that may lead to a retry. 293 * 294 * @since 5.9.4 295 */ 296 protected Exception runWorkWithTransactionAndCheckExceptions() { 297 List<Exception> suppressed = Collections.emptyList(); 298 try { 299 TransactionHelper.noteSuppressedExceptions(); 300 try { 301 runWorkWithTransaction(); 302 } finally { 303 suppressed = TransactionHelper.getSuppressedExceptions(); 304 } 305 } catch (ConcurrentUpdateException e) { 306 // happens typically during save() 307 return e; 308 } catch (TransactionRuntimeException e) { 309 // error at commit time 310 if (suppressed.isEmpty()) { 311 return e; 312 } 313 } 314 // reached if no catch, or if TransactionRuntimeException caught 315 if (suppressed.isEmpty()) { 316 return null; 317 } 318 // exceptions during commit caused a rollback in SessionImpl#end 319 Exception e = suppressed.get(0); 320 for (int i = 1; i < suppressed.size(); i++) { 321 e.addSuppressed(suppressed.get(i)); 322 } 323 return e; 324 } 325 326 /** 327 * Does work under a transaction. 328 * 329 * @since 5.9.4 330 * @throws ConcurrentUpdateException, TransactionRuntimeException 331 */ 332 protected void runWorkWithTransaction() throws ConcurrentUpdateException { 333 TransactionHelper.startTransaction(); 334 boolean ok = false; 335 Exception exc = null; 336 try { 337 WorkSchedulePath.handleEnter(this); 338 // --- do work 339 setStartTime(); 340 work(); // may throw ConcurrentUpdateException 341 ok = true; 342 // --- end work 343 } catch (Exception e) { 344 exc = e; 345 if (e instanceof ConcurrentUpdateException) { 346 throw (ConcurrentUpdateException) e; 347 } else if (e instanceof RuntimeException) { 348 throw (RuntimeException) e; 349 } else if (e instanceof InterruptedException) { 350 // restore interrupted status for the thread pool worker 351 Thread.currentThread().interrupt(); 352 } 353 throw new RuntimeException(e); 354 } finally { 355 WorkSchedulePath.handleReturn(); 356 try { 357 setCompletionTime(); 358 cleanUp(ok, exc); 359 } finally { 360 if (TransactionHelper.isTransactionActiveOrMarkedRollback()) { 361 if (!ok) { 362 TransactionHelper.setTransactionRollbackOnly(); 363 } 364 TransactionHelper.commitOrRollbackTransaction(); 365 } 366 } 367 } 368 } 369 370 @Override 371 public abstract void work(); 372 373 /** 374 * Gets the number of times that this Work instance can be retried in case of concurrent update exceptions. 375 * 376 * @return 0 for no retry, or more if some retries are possible 377 * @see #work 378 * @since 5.8 379 */ 380 public int getRetryCount() { 381 return 0; 382 } 383 384 /** 385 * This method is called after {@link #work} is done in a finally block, whether work completed normally or was in 386 * error or was interrupted. 387 * 388 * @param ok {@code true} if the work completed normally 389 * @param e the exception, if available 390 */ 391 @Override 392 public void cleanUp(boolean ok, Exception e) { 393 if (!ok) { 394 if (e instanceof InterruptedException) { 395 log.debug("Suspended work: " + this); 396 } else { 397 if (!(e instanceof ConcurrentUpdateException)) { 398 log.error("Exception during work: " + this, e); 399 if (WorkSchedulePath.captureStack) { 400 WorkSchedulePath.log.error("Work schedule path", getSchedulePath().getStack()); 401 } 402 } 403 } 404 } 405 closeSession(); 406 } 407 408 @Override 409 public String getUserId() { 410 // TODO 411 return null; 412 } 413 414 @Override 415 public long getSchedulingTime() { 416 return schedulingTime; 417 } 418 419 @Override 420 public long getStartTime() { 421 return startTime; 422 } 423 424 @Override 425 public long getCompletionTime() { 426 return completionTime; 427 } 428 429 @Override 430 public void setStartTime() { 431 startTime = System.currentTimeMillis(); 432 } 433 434 protected void setCompletionTime() { 435 completionTime = System.currentTimeMillis(); 436 } 437 438 @Override 439 public String getCategory() { 440 return getClass().getSimpleName(); 441 } 442 443 @Override 444 public String toString() { 445 StringBuilder buf = new StringBuilder(); 446 buf.append(getClass().getSimpleName()); 447 buf.append('('); 448 if (docId != null) { 449 buf.append(docId); 450 buf.append(", "); 451 } else if (docIds != null && docIds.size() > 0) { 452 buf.append(docIds.get(0)); 453 buf.append("..., "); 454 } 455 buf.append(getSchedulePath().getParentPath()); 456 buf.append(", "); 457 buf.append(getProgress()); 458 buf.append(", "); 459 buf.append(getStatus()); 460 buf.append(')'); 461 return buf.toString(); 462 } 463 464 @Override 465 public DocumentLocation getDocument() { 466 if (docId != null) { 467 return newDocumentLocation(docId); 468 } 469 return null; 470 } 471 472 @Override 473 public List<DocumentLocation> getDocuments() { 474 if (docIds != null) { 475 List<DocumentLocation> res = new ArrayList<DocumentLocation>(docIds.size()); 476 for (String docId : docIds) { 477 res.add(newDocumentLocation(docId)); 478 } 479 return res; 480 } 481 if (docId != null) { 482 return Collections.singletonList(newDocumentLocation(docId)); 483 } 484 return Collections.emptyList(); 485 } 486 487 protected DocumentLocation newDocumentLocation(String docId) { 488 return new DocumentLocationImpl(repositoryName, new IdRef(docId)); 489 } 490 491 @Override 492 public boolean isDocumentTree() { 493 return isTree; 494 } 495 496 @Override 497 public String getWorkInstanceResult() { 498 return null; 499 } 500 501 /** 502 * Releases the transaction resources by committing the existing transaction (if any). This is recommended before 503 * running a long process. 504 */ 505 public void commitOrRollbackTransaction() { 506 if (TransactionHelper.isTransactionActiveOrMarkedRollback()) { 507 TransactionHelper.commitOrRollbackTransaction(); 508 } 509 } 510 511 /** 512 * Starts a new transaction. 513 * <p> 514 * Usually called after {@code commitOrRollbackTransaction()}, for instance for saving back the results of a long 515 * process. 516 * 517 * @return true if a new transaction was started 518 */ 519 public boolean startTransaction() { 520 return TransactionHelper.startTransaction(); 521 } 522 523 @Override 524 public boolean equals(Object other) { 525 if (!(other instanceof Work)) { 526 return false; 527 } 528 return ((Work) other).getId().equals(getId()); 529 } 530 531 @Override 532 public int hashCode() { 533 return getId().hashCode(); 534 } 535 536}