001/* 002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019 020package org.nuxeo.runtime.transaction; 021 022import java.lang.reflect.Field; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.concurrent.ExecutionException; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.SynchronousQueue; 028import java.util.concurrent.ThreadPoolExecutor; 029import java.util.concurrent.TimeUnit; 030import java.util.function.Supplier; 031 032import javax.naming.NamingException; 033import javax.transaction.HeuristicMixedException; 034import javax.transaction.HeuristicRollbackException; 035import javax.transaction.InvalidTransactionException; 036import javax.transaction.NotSupportedException; 037import javax.transaction.RollbackException; 038import javax.transaction.Status; 039import javax.transaction.Synchronization; 040import javax.transaction.SystemException; 041import javax.transaction.Transaction; 042import javax.transaction.TransactionManager; 043import javax.transaction.TransactionSynchronizationRegistry; 044import javax.transaction.UserTransaction; 045import javax.transaction.xa.XAResource; 046 047import org.apache.commons.lang3.reflect.FieldUtils; 048import org.apache.commons.logging.Log; 049import org.apache.commons.logging.LogFactory; 050import org.nuxeo.runtime.jtajca.NuxeoContainer; 051 052/** 053 * Utilities to work with transactions. 054 */ 055public class TransactionHelper { 056 057 private static final Log log = LogFactory.getLog(TransactionHelper.class); 058 059 private static final Field GERONIMO_TRANSACTION_TIMEOUT_FIELD = FieldUtils.getField( 060 org.apache.geronimo.transaction.manager.TransactionImpl.class, "timeout", true); 061 062 /** 063 * Thread pool used to execute code in a separate transactional context. 064 * 065 * @since 11.1 066 */ 067 // like Executors.newCachedThreadPool() but using a small keepAliveTime to avoid blocking shutdown 068 protected static final ExecutorService EXECUTOR = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS, 069 new SynchronousQueue<>()); 070 071 private TransactionHelper() { 072 // utility class 073 } 074 075 /** 076 * Looks up the User Transaction in JNDI. 077 * 078 * @return the User Transaction 079 * @throws NamingException if not found 080 */ 081 public static UserTransaction lookupUserTransaction() throws NamingException { 082 UserTransaction ut = NuxeoContainer.getUserTransaction(); 083 if (ut == null) { 084 throw new NamingException("tx manager not installed"); 085 } 086 return ut; 087 } 088 089 /** 090 * Gets the transaction status. 091 * 092 * @return the transaction {@linkplain Status status}, or -1 if there is no transaction manager 093 * @since 8.4 094 * @see Status 095 */ 096 public static int getTransactionStatus() { 097 UserTransaction ut = NuxeoContainer.getUserTransaction(); 098 if (ut == null) { 099 return -1; 100 } 101 try { 102 return ut.getStatus(); 103 } catch (SystemException e) { 104 throw new TransactionRuntimeException("Cannot get transaction status", e); 105 } 106 } 107 108 /** 109 * Returns the UserTransaction JNDI binding name. 110 * <p> 111 * Assumes {@link #lookupUserTransaction} has been called once before. 112 */ 113 public static String getUserTransactionJNDIName() { 114 return NuxeoContainer.nameOf("UserTransaction"); 115 } 116 117 /** 118 * Looks up the TransactionManager in JNDI. 119 * 120 * @return the TransactionManager 121 * @throws NamingException if not found 122 */ 123 public static TransactionManager lookupTransactionManager() throws NamingException { 124 TransactionManager tm = NuxeoContainer.getTransactionManager(); 125 if (tm == null) { 126 throw new NamingException("tx manager not installed"); 127 } 128 return tm; 129 } 130 131 /** 132 * Looks up the TransactionSynchronizationRegistry in JNDI. 133 * 134 * @return the TransactionSynchronizationRegistry 135 * @throws NamingException if not found 136 */ 137 public static TransactionSynchronizationRegistry lookupSynchronizationRegistry() throws NamingException { 138 TransactionSynchronizationRegistry synch = NuxeoContainer.getTransactionSynchronizationRegistry(); 139 if (synch == null) { 140 throw new NamingException("tx manager not installed"); 141 } 142 return synch; 143 } 144 145 /** 146 * Checks if there is no transaction 147 * 148 * @since 6.0 149 */ 150 public static boolean isNoTransaction() { 151 int status = getTransactionStatus(); 152 return status == Status.STATUS_NO_TRANSACTION || status == -1; 153 } 154 155 /** 156 * Checks if the current User Transaction is active. 157 */ 158 public static boolean isTransactionActive() { 159 int status = getTransactionStatus(); 160 return status == Status.STATUS_ACTIVE; 161 } 162 163 /** 164 * Checks if the current User Transaction is marked rollback only. 165 */ 166 public static boolean isTransactionMarkedRollback() { 167 int status = getTransactionStatus(); 168 return status == Status.STATUS_MARKED_ROLLBACK; 169 } 170 171 /** 172 * Checks if the current User Transaction is active or marked rollback only. 173 */ 174 public static boolean isTransactionActiveOrMarkedRollback() { 175 int status = getTransactionStatus(); 176 return status == Status.STATUS_ACTIVE || status == Status.STATUS_MARKED_ROLLBACK; 177 } 178 179 /** 180 * Checks if the current User Transaction is active or preparing. 181 * 182 * @since 8.4 183 */ 184 public static boolean isTransactionActiveOrPreparing() { 185 int status = getTransactionStatus(); 186 return status == Status.STATUS_ACTIVE || status == Status.STATUS_PREPARING; 187 } 188 189 /** 190 * Checks if the current User Transaction has already timed out, i.e., whether a commit would immediately abort with 191 * a timeout exception. 192 * 193 * @return {@code true} if there is a current transaction that has timed out, {@code false} otherwise 194 * @since 7.1 195 */ 196 public static boolean isTransactionTimedOut() { 197 TransactionManager tm = NuxeoContainer.getTransactionManager(); 198 if (tm == null) { 199 return false; 200 } 201 try { 202 Transaction tx = tm.getTransaction(); 203 if (tx == null || tx.getStatus() != Status.STATUS_ACTIVE) { 204 return false; 205 } 206 if (tx instanceof org.apache.geronimo.transaction.manager.TransactionImpl) { 207 // Geronimo Transaction Manager 208 Long timeout = (Long) GERONIMO_TRANSACTION_TIMEOUT_FIELD.get(tx); 209 return System.currentTimeMillis() > timeout.longValue(); 210 } else { 211 // unknown transaction manager 212 return false; 213 } 214 } catch (SystemException | ReflectiveOperationException e) { 215 throw new RuntimeException(e); 216 } 217 } 218 219 /** 220 * Checks if the current User Transaction has already timed out, i.e., whether a commit would immediately abort with 221 * a timeout exception. 222 * <p> 223 * Throws if the transaction has timed out. 224 * 225 * @throws TransactionRuntimeException if the transaction has timed out 226 * @since 8.4 227 */ 228 public static void checkTransactionTimeout() throws TransactionRuntimeException { 229 if (isTransactionTimedOut()) { 230 throw new TransactionRuntimeException("Transaction has timed out"); 231 } 232 } 233 234 /** 235 * Starts a new User Transaction. 236 * 237 * @return {@code true} if the transaction was successfully started, {@code false} otherwise 238 */ 239 public static boolean startTransaction() { 240 UserTransaction ut = NuxeoContainer.getUserTransaction(); 241 if (ut == null) { 242 return false; 243 } 244 try { 245 if (log.isDebugEnabled()) { 246 log.debug("Starting transaction"); 247 } 248 ut.begin(); 249 return true; 250 } catch (NotSupportedException | SystemException e) { 251 log.error("Unable to start transaction", e); 252 } 253 return false; 254 } 255 256 /** 257 * Suspend the current transaction if active and start a new transaction 258 * 259 * @return the suspended transaction or null 260 * @since 5.6 261 * @deprecated since 11.1, as not all backends (transaction resource managers) allow suspending the transaction or 262 * transaction interleaving, instead use {@link #runInNewTransaction} or {@link #runWithoutTransaction} 263 * explicitly 264 */ 265 @Deprecated(since = "11.1") 266 public static Transaction requireNewTransaction() { 267 TransactionManager tm = NuxeoContainer.getTransactionManager(); 268 if (tm == null) { 269 return null; 270 } 271 try { 272 Transaction tx = tm.getTransaction(); 273 if (tx != null) { 274 tx = tm.suspend(); 275 } 276 tm.begin(); 277 return tx; 278 } catch (NotSupportedException | SystemException e) { 279 throw new TransactionRuntimeException("Cannot suspend tx", e); 280 } 281 } 282 283 /** 284 * Suspends the current transaction and returns 285 * 286 * @return the suspended transaction or null 287 * @deprecated since 11.1, as not all backends (transaction resource managers) allow suspending the transaction or 288 * transaction interleaving, instead use {@link #runInNewTransaction} or {@link #runWithoutTransaction} 289 * explicitly 290 */ 291 @Deprecated(since = "11.1") 292 public static Transaction suspendTransaction() { 293 TransactionManager tm = NuxeoContainer.getTransactionManager(); 294 if (tm == null) { 295 return null; 296 } 297 try { 298 Transaction tx = tm.getTransaction(); 299 if (tx != null) { 300 tx = tm.suspend(); 301 } 302 return tx; 303 } catch (SystemException e) { 304 throw new TransactionRuntimeException("Cannot suspend tx", e); 305 } 306 } 307 308 /** 309 * Commit the current transaction if active and resume the principal transaction 310 * 311 * @deprecated since 11.1, as not all backends (transaction resource managers) allow suspending the transaction or 312 * transaction interleaving, instead use {@link #runInNewTransaction} or {@link #runWithoutTransaction} 313 * explicitly 314 */ 315 @Deprecated(since = "11.1") 316 public static void resumeTransaction(Transaction tx) { 317 TransactionManager tm = NuxeoContainer.getTransactionManager(); 318 if (tm == null) { 319 return; 320 } 321 try { 322 if (tm.getStatus() == Status.STATUS_ACTIVE) { 323 tm.commit(); 324 } 325 if (tx != null) { 326 tm.resume(tx); 327 } 328 } catch (SystemException | RollbackException | HeuristicMixedException | HeuristicRollbackException 329 | InvalidTransactionException | IllegalStateException | SecurityException e) { 330 throw new TransactionRuntimeException("Cannot resume tx", e); 331 } 332 } 333 334 /** 335 * Starts a new User Transaction with the specified timeout. 336 * 337 * @param timeout the timeout in seconds, %lt;= 0 for the default 338 * @return {@code true} if the transaction was successfully started, {@code false} otherwise 339 * @since 5.6 340 */ 341 public static boolean startTransaction(int timeout) { 342 if (timeout < 0) { 343 timeout = 0; 344 } 345 TransactionManager tm = NuxeoContainer.getTransactionManager(); 346 if (tm == null) { 347 return false; 348 } 349 350 try { 351 tm.setTransactionTimeout(timeout); 352 } catch (SystemException e) { 353 log.error("Unable to set transaction timeout: " + timeout, e); 354 return false; 355 } 356 try { 357 return startTransaction(); 358 } finally { 359 try { 360 tm.setTransactionTimeout(0); 361 } catch (SystemException e) { 362 log.error("Unable to reset transaction timeout", e); 363 } 364 } 365 } 366 367 /** 368 * Commits or rolls back the User Transaction depending on the transaction status. 369 */ 370 public static void commitOrRollbackTransaction() { 371 UserTransaction ut = NuxeoContainer.getUserTransaction(); 372 if (ut == null) { 373 return; 374 } 375 noteSuppressedExceptions(); 376 RuntimeException thrown = null; 377 boolean isRollbackDuringCommit = false; 378 try { 379 int status = ut.getStatus(); 380 if (status == Status.STATUS_ACTIVE) { 381 if (log.isDebugEnabled()) { 382 log.debug("Committing transaction"); 383 } 384 try { 385 ut.commit(); 386 } catch (HeuristicRollbackException | HeuristicMixedException e) { 387 throw new TransactionRuntimeException(e.getMessage(), e); 388 } catch (RollbackException e) { 389 // from org.apache.geronimo.transaction.manager.TransactionImpl.commit 390 Throwable cause = e.getCause(); 391 String msg; 392 if (cause != null && "Transaction has timed out".equals(cause.getMessage())) { 393 msg = "Unable to commit: Transaction timeout"; 394 } else { 395 // this happens if there's a ConcurrentUpdateException at transaction end inside VCS 396 isRollbackDuringCommit = true; 397 msg = e.getMessage(); 398 } 399 log.debug("Unable to commit", e); 400 throw new TransactionRuntimeException(msg, e); 401 } 402 } else if (status == Status.STATUS_MARKED_ROLLBACK) { 403 if (log.isDebugEnabled()) { 404 log.debug("Cannot commit transaction because it is marked rollback only"); 405 } 406 ut.rollback(); 407 } else { 408 if (log.isDebugEnabled()) { 409 log.debug("Cannot commit transaction with unknown status: " + status); 410 } 411 } 412 } catch (SystemException e) { 413 thrown = new TransactionRuntimeException(e); 414 throw thrown; 415 } catch (RuntimeException e) { 416 thrown = e; 417 throw thrown; 418 } finally { 419 List<Exception> suppressed = getSuppressedExceptions(); 420 if (!suppressed.isEmpty()) { 421 // add suppressed to thrown exception, or throw a new one 422 RuntimeException e; 423 if (thrown == null) { 424 e = new TransactionRuntimeException("Exception during commit"); 425 } else { 426 if (isRollbackDuringCommit && suppressed.get(0) instanceof RuntimeException) { 427 // use the suppressed one directly and throw it instead 428 thrown = null; // force rethrow below 429 e = (RuntimeException) suppressed.remove(0); 430 } else { 431 e = thrown; 432 } 433 } 434 suppressed.forEach(e::addSuppressed); 435 if (thrown == null) { 436 throw e; 437 } 438 } 439 } 440 } 441 442 private static final ThreadLocal<List<Exception>> suppressedExceptions = new ThreadLocal<>(); 443 444 /** 445 * After this, some exceptions during transaction commit may be suppressed and remembered. 446 * 447 * @since 5.9.4 448 */ 449 protected static void noteSuppressedExceptions() { 450 suppressedExceptions.set(new ArrayList<>()); 451 } 452 453 /** 454 * Remembers the exception if it happens during the processing of a commit, so that it can be surfaced as a 455 * suppressed exception at the end of the commit. 456 * 457 * @since 5.9.4 458 */ 459 public static void noteSuppressedException(Exception e) { 460 List<Exception> exceptions = suppressedExceptions.get(); 461 if (exceptions != null) { 462 exceptions.add(e); 463 } 464 } 465 466 /** 467 * Gets the suppressed exceptions, and stops remembering. 468 * 469 * @since 5.9.4 470 */ 471 protected static List<Exception> getSuppressedExceptions() { 472 List<Exception> exceptions = suppressedExceptions.get(); 473 suppressedExceptions.remove(); 474 return exceptions == null ? List.of() : exceptions; 475 } 476 477 /** 478 * Sets the current User Transaction as rollback only. 479 * 480 * @return {@code true} if the transaction was successfully marked rollback only, {@code false} otherwise 481 */ 482 public static boolean setTransactionRollbackOnly() { 483 if (log.isDebugEnabled()) { 484 log.debug("Setting transaction as rollback only"); 485 if (log.isTraceEnabled()) { 486 log.trace("Rollback stack trace", new Throwable("Rollback stack trace")); 487 } 488 } 489 UserTransaction ut = NuxeoContainer.getUserTransaction(); 490 if (ut == null) { 491 return false; 492 } 493 try { 494 ut.setRollbackOnly(); 495 return true; 496 } catch (IllegalStateException | SystemException cause) { 497 log.error("Could not mark transaction as rollback only", cause); 498 } 499 return false; 500 } 501 502 /** 503 * Sets the current User Transaction as rollback only if it has timed out. 504 * 505 * @return {@code true} if the transaction was successfully marked rollback only, {@code false} otherwise 506 * @since 7.1 507 */ 508 public static boolean setTransactionRollbackOnlyIfTimedOut() { 509 if (isTransactionTimedOut()) { 510 return setTransactionRollbackOnly(); 511 } 512 return false; 513 } 514 515 public static void registerSynchronization(Synchronization handler) { 516 if (!isTransactionActiveOrPreparing()) { 517 throw new TransactionRuntimeException("Cannot register Synchronization if transaction is not active"); 518 } 519 try { 520 NuxeoContainer.getTransactionManager().getTransaction().registerSynchronization(handler); 521 } catch (IllegalStateException | RollbackException | SystemException cause) { 522 throw new RuntimeException("Cannot register synch handler in current tx", cause); 523 } 524 } 525 526 /** 527 * Enlists a XA resource in the current transaction. 528 * 529 * @param xaRes the XA resource 530 * @since 11.1 531 */ 532 public static void enlistResource(XAResource xaRes) { 533 if (!isTransactionActiveOrMarkedRollback()) { 534 throw new TransactionRuntimeException("Cannot enlist XA resource if transaction is not active"); 535 } 536 try { 537 NuxeoContainer.getTransactionManager().getTransaction().enlistResource(xaRes); 538 } catch (IllegalStateException | RollbackException | SystemException cause) { 539 throw new RuntimeException("Cannot enlist XA resource in current tx", cause); 540 } 541 } 542 543 /** 544 * Runs the given {@link Runnable} without a transactional context. 545 * 546 * @param runnable the {@link Runnable} 547 * @since 9.1 548 */ 549 public static void runWithoutTransaction(Runnable runnable) { 550 runWithoutTransaction(() -> { runnable.run(); return null; }); 551 } 552 553 554 /** 555 * Calls the given {@link Supplier} without a transactional context. 556 * 557 * @param supplier the {@link Supplier} 558 * @return the supplier's result 559 * @since 9.1 560 */ 561 public static <R> R runWithoutTransaction(Supplier<R> supplier) { 562 return runWithoutTransactionInternal(() -> runAndCleanupTransactionContext(supplier)); 563 } 564 565 /** 566 * Runs the given {@link Runnable} in a new transactional context. 567 * 568 * @param runnable the {@link Runnable} 569 * @since 9.1 570 */ 571 public static void runInNewTransaction(Runnable runnable) { 572 runInNewTransaction(() -> { runnable.run(); return null;}); 573 } 574 575 /** 576 * Calls the given {@link Supplier} in a new transactional context. 577 * 578 * @param supplier the {@link Supplier} 579 * @return the supplier's result 580 * @since 9.1 581 */ 582 public static <R> R runInNewTransaction(Supplier<R> supplier) { 583 return runWithoutTransaction(() -> runInTransaction(supplier)); 584 } 585 586 /** 587 * Runs the given {@link Runnable} in a transactional context. Will not start a new transaction if one already 588 * exists. 589 * 590 * @param runnable the {@link Runnable} 591 * @since 8.4 592 */ 593 public static void runInTransaction(Runnable runnable) { 594 runInTransaction(() -> {runnable.run(); return null;}); 595 } 596 597 /** 598 * Calls the given {@link Supplier} in a transactional context. Will not start a new transaction if one already 599 * exists. 600 * 601 * @param supplier the {@link Supplier} 602 * @return the supplier's result 603 * @since 8.4 604 */ 605 public static <R> R runInTransaction(Supplier<R> supplier) { 606 boolean startTransaction = !isTransactionActiveOrMarkedRollback(); 607 if (startTransaction) { 608 if (!startTransaction()) { 609 throw new TransactionRuntimeException("Cannot start transaction"); 610 } 611 } 612 boolean completedAbruptly = true; 613 try { 614 R result = supplier.get(); 615 completedAbruptly = false; 616 return result; 617 } finally { 618 try { 619 if (completedAbruptly) { 620 setTransactionRollbackOnly(); 621 } 622 } finally { 623 if (startTransaction) { 624 commitOrRollbackTransaction(); 625 } 626 } 627 } 628 } 629 630 /** 631 * Calls the given {@link Supplier} in a context without a transaction. The supplier must do its own transactional 632 * cleanup to restore the thread to a pristine state. 633 * 634 * @param supplier the {@link Supplier} 635 * @return the supplier's result 636 * @since 11.1 637 */ 638 protected static <R> R runWithoutTransactionInternal(Supplier<R> supplier) { 639 // if there is already no transaction, run in this thread 640 if (isNoTransaction()) { 641 return supplier.get(); 642 } 643 // otherwise use a separate thread to get a separate transactional context 644 try { 645 return EXECUTOR.submit(supplier::get).get(); 646 } catch (InterruptedException e) { 647 Thread.currentThread().interrupt(); // restore interrupted status 648 throw new RuntimeException(e); 649 } catch (ExecutionException e) { 650 Throwable cause = e.getCause(); 651 if (cause instanceof RuntimeException) { 652 throw (RuntimeException) cause; 653 } else { 654 throw new RuntimeException(cause); 655 } 656 } 657 } 658 659 /** 660 * Calls the given {@link Supplier} and cleans up the transaction context afterwards. 661 * 662 * @param supplier the {@link Supplier} 663 * @return the supplier's result 664 * @since 11.1 665 */ 666 protected static <R> R runAndCleanupTransactionContext(Supplier<R> supplier) { 667 try { 668 return supplier.get(); 669 } finally { 670 if (!isNoTransaction()) { 671 // restore the no-transaction context of this thread 672 try { 673 commitOrRollbackTransaction(); 674 } catch (TransactionRuntimeException e) { 675 log.error("Failed to commit/rollback", e); 676 } 677 } 678 } 679 } 680 681}