001/* 002 * (C) Copyright 2012-2013 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.runtime.datasource; 020 021import org.apache.commons.lang.StringUtils; 022import org.apache.commons.logging.Log; 023import org.apache.commons.logging.LogFactory; 024import org.nuxeo.common.utils.JDBCUtils; 025import org.nuxeo.runtime.api.Framework; 026import org.nuxeo.runtime.datasource.PooledDataSourceRegistry.PooledDataSource; 027import org.nuxeo.runtime.transaction.TransactionHelper; 028 029import java.lang.reflect.InvocationHandler; 030import java.lang.reflect.InvocationTargetException; 031import java.lang.reflect.Method; 032import java.lang.reflect.Proxy; 033import java.sql.Connection; 034import java.sql.SQLException; 035import java.util.ArrayList; 036import java.util.List; 037import java.util.concurrent.ConcurrentHashMap; 038import java.util.concurrent.ConcurrentMap; 039 040import javax.naming.NamingException; 041import javax.resource.ResourceException; 042import javax.sql.DataSource; 043import javax.transaction.RollbackException; 044import javax.transaction.Status; 045import javax.transaction.Synchronization; 046import javax.transaction.SystemException; 047import javax.transaction.Transaction; 048 049/** 050 * This helper provides a way to get a JDBC connection, through {@link #getConnection(String)}, that will return a 051 * connection wrapper able to use a shared connection when used in transactional mode and setAutoCommit(false) is 052 * called, and otherwise use a normal physical JDBC connection. 053 * <p> 054 * The physical connections are created from the datasource configured using the framework property {@value #SINGLE_DS}. 055 * <p> 056 * This helper is used to implement consistent resource management in a non-XA context. Several users of the shared 057 * connection can call setAutoCommit(false) then do transactional work and commit(). Only the commit() of the last user 058 * will do an actual commit on the physical connection. 059 * 060 * @since 5.7 061 */ 062public class ConnectionHelper { 063 064 private static final Log log = LogFactory.getLog(ConnectionHelper.class); 065 066 /** 067 * Shared connection for each transaction. 068 * <p> 069 * The shared connection is always in autoCommit=false. 070 * <p> 071 * Things are removed from this map by a transaction synchronizer when the transaction finishes. 072 */ 073 private static ConcurrentMap<Transaction, SharedConnection> sharedConnections = new ConcurrentHashMap<Transaction, SharedConnection>(); 074 075 /** 076 * SharedConnectionSynchronization registered for the transaction, when sharing. 077 */ 078 private static ConcurrentMap<Transaction, SharedConnectionSynchronization> sharedSynchronizations = new ConcurrentHashMap<Transaction, SharedConnectionSynchronization>(); 079 080 /** 081 * Property holding a datasource name to use to replace all database accesses. 082 */ 083 public static final String SINGLE_DS = "nuxeo.db.singleDataSource"; 084 085 /** 086 * Property holding one ore more datasource names (comma or space separated) for whose connections the single 087 * datasource is not used. 088 */ 089 public static final String EXCLUDE_DS = "nuxeo.db.singleDataSource.exclude"; 090 091 092 private static final String singleDsPropertyCached; 093 094 private static final String excludeDsPropertyCached; 095 096 static { 097 if (Framework.isTestModeSet()) { 098 // no cache for test mode so unit test can change this property at runtime 099 excludeDsPropertyCached = null; 100 singleDsPropertyCached = null; 101 } else { 102 excludeDsPropertyCached = Framework.getProperty(EXCLUDE_DS, ""); 103 singleDsPropertyCached = Framework.getProperty(SINGLE_DS, ""); 104 } 105 } 106 107 /** 108 * Wrapper for a connection that delegates calls to either a private connection, or a per-transaction shared one if 109 * a transaction is started. 110 * <p> 111 * Sharing is started on setAutoCommit(true), and ends on setAutoCommit(false) or close(). 112 */ 113 private static class ConnectionHandle implements InvocationHandler { 114 115 private boolean closed; 116 117 /** 118 * Expected autoCommit mode by the client for this connection. 119 */ 120 private boolean autoCommit; 121 122 /** 123 * The transaction in use at the time where sharing was started (autoCommit was set to false during a 124 * transaction). 125 * <p> 126 * The sharedConnection is allocated on first use after that. 127 */ 128 private Transaction transactionForShare; 129 130 /** 131 * A local connection, allocated when the connection is used when sharedInTransaction == null. 132 */ 133 private Connection localConnection; 134 135 /** 136 * A shared connection, allocated when the connection is used when sharedInTransaction != null. 137 */ 138 private SharedConnection sharedConnection; 139 140 /** 141 * True between the first use and the commit/rollback (in non-autoCommit mode and shared connection). 142 */ 143 private boolean began; 144 145 public ConnectionHandle() { 146 autoCommit = true; 147 if (log.isDebugEnabled()) { 148 log.debug("Construct " + this); 149 if (log.isTraceEnabled()) { 150 log.trace("Construct stacktrace " + this, new Exception("debug")); 151 } 152 } 153 } 154 155 private void logInvoke(String message) { 156 if (log.isDebugEnabled()) { 157 log.debug("Invoke " + message + " " + this); 158 } 159 } 160 161 @Override 162 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 163 String methodName = method.getName(); 164 if (methodName.equals("isClosed")) { 165 return isClosed(); 166 } else if (methodName.equals("close")) { 167 close(); 168 return null; 169 } 170 if (closed) { 171 throw new SQLException("Connection is closed", "08003"); 172 } 173 174 if (methodName.equals("getAutoCommit")) { 175 return getAutoCommit(); 176 } 177 178 if (methodName.equals("setAutoCommit")) { 179 setAutoCommit(((Boolean) args[0]).booleanValue()); 180 return null; 181 } 182 183 Connection connection; 184 if (transactionForShare != null) { 185 // check that we're still in the same transaction 186 // this also enforces single-threaded use of 187 // the shared connection 188 Transaction transaction = getTransaction(); 189 if (transaction != transactionForShare) { 190 throw new SQLException("Calling method " + methodName 191 + ", connection sharing started in transaction " + transactionForShare 192 + " but it is now used in transaction " + transaction); 193 } 194 195 sharedConnectionAllocate(); 196 197 // for begin/commit we don't actually need to allocate 198 // the connection 199 if (methodName.equals("commit")) { 200 if (autoCommit) { 201 throw new SQLException("Cannot commit outside of transaction", "25000"); 202 } 203 sharedConnectionCommit(); 204 return null; 205 } else if (methodName.equals("rollback")) { 206 if (autoCommit) { 207 throw new SQLException("Cannot commit outside of transaction", "25000"); 208 } 209 if (args != null && args.length > 0) { 210 throw new SQLException("Not implemented: rollback(Savepoint)", "0A000"); 211 } 212 sharedConnectionRollback(); 213 return null; 214 } else if (methodName.equals("setSavepoint") || methodName.equals("releaseSavepoint")) { 215 throw new SQLException("Not implemented: " + methodName, "0A000"); 216 } 217 218 sharedConnectionBegin(methodName); 219 220 connection = sharedConnection.getConnection(); 221 } else { 222 localConnectionAllocate(); 223 connection = localConnection; 224 } 225 226 try { 227 if (log.isDebugEnabled()) { 228 if (sharedConnection == null) { 229 logInvoke(methodName); 230 } else { 231 logInvoke(methodName + " " + sharedConnection); 232 } 233 } 234 return method.invoke(connection, args); 235 } catch (InvocationTargetException e) { 236 throw e.getCause(); 237 } 238 } 239 240 private Boolean getAutoCommit() { 241 return Boolean.valueOf(autoCommit); 242 } 243 244 private void setAutoCommit(boolean setAutoCommit) throws SQLException { 245 if (setAutoCommit == autoCommit) { 246 return; // no change 247 } 248 autoCommit = setAutoCommit; 249 if (log.isDebugEnabled()) { 250 log.debug("setAutoCommit(" + autoCommit + ") " + this); 251 } 252 if (!autoCommit) { 253 // setting autoCommit = false 254 if (transactionForShare != null) { 255 throw new AssertionError("autoCommit=false when already sharing"); 256 } 257 // not yet sharing 258 Transaction transaction = getTransaction(); 259 if (transaction != null && transactionStatus(transaction) == Status.STATUS_ACTIVE) { 260 // start sharing 261 transactionForShare = transaction; 262 if (localConnection != null) { 263 // share using the previous local connection 264 logInvoke("setAutoCommit false"); 265 localConnection.setAutoCommit(false); 266 log.debug("Upgrading local connection to shared"); 267 sharedConnection = getSharedConnection(localConnection); 268 localConnection = null; 269 } else { 270 // sharedConnection allocated on first use 271 } 272 } else { 273 log.debug("No usable transaction"); 274 // we're outside a usable transaction 275 // use the local connection 276 if (localConnection != null) { 277 logInvoke("setAutoCommit false"); 278 localConnection.setAutoCommit(false); 279 } else { 280 // localConnection allocated on first use 281 } 282 } 283 } else { 284 // setting autoCommit = true 285 if (transactionForShare != null) { 286 if (began) { 287 // do automatic commit 288 log.debug("setAutoCommit true committing shared"); 289 sharedConnectionCommit(); 290 } 291 // stop sharing 292 sharedConnection = null; 293 transactionForShare = null; 294 } else if (localConnection != null) { 295 logInvoke("setAutoCommit true"); 296 localConnection.setAutoCommit(true); 297 } 298 } 299 } 300 301 // allocation on first use 302 private void localConnectionAllocate() throws SQLException { 303 if (localConnection == null) { 304 if (log.isDebugEnabled()) { 305 log.debug("Constructing physical connection " + this); 306 if (log.isTraceEnabled()) { 307 log.trace("Constructing physical connection stacktrace", new Exception("debug")); 308 } 309 } 310 localConnection = getPhysicalConnection(); 311 logInvoke("setAutoCommit " + autoCommit); 312 localConnection.setAutoCommit(autoCommit); 313 } 314 } 315 316 // allocation on first use 317 private void sharedConnectionAllocate() throws SQLException { 318 if (sharedConnection == null) { 319 if (transactionStatus(transactionForShare) == Status.STATUS_ACTIVE) { 320 sharedConnection = getSharedConnection(null); 321 // autoCommit mode set by SharedConnection.allocate() 322 } else { 323 // already committing or rolling back 324 // do not assign a connection at all 325 // only commit or rollback is allowed, 326 // and they will do nothing (began=false) 327 } 328 } 329 } 330 331 private void sharedConnectionBegin(String methodName) throws SQLException { 332 if (sharedConnection == null) { 333 throw new SQLException("Cannot call " + methodName + " with transaction in state " 334 + transactionStatus(transactionForShare), "25000"); 335 } 336 if (!autoCommit && !began) { 337 sharedConnection.begin(this); 338 began = true; 339 } 340 } 341 342 private void sharedConnectionCommit() throws SQLException { 343 if (began) { 344 if (log.isDebugEnabled()) { 345 log.debug("Committing shared " + this); 346 } 347 sharedConnection.commit(this); 348 began = false; 349 } 350 } 351 352 private void sharedConnectionRollback() throws SQLException { 353 if (began) { 354 sharedConnection.rollback(this); 355 began = false; 356 } 357 } 358 359 /** Called back from SharedConnection close. */ 360 protected void closeFromSharedConnection() { 361 sharedConnection = null; 362 transactionForShare = null; 363 } 364 365 private Boolean isClosed() { 366 return Boolean.valueOf(closed); 367 } 368 369 private void close() throws SQLException { 370 if (!closed) { 371 if (log.isDebugEnabled()) { 372 log.debug("close() " + this); 373 } 374 if (transactionForShare != null) { 375 if (sharedConnection != null) { 376 if (began) { 377 // connection closed before commit/rollback 378 // commit it by hand (even though it's unspecified) 379 log.debug("close committing shared"); 380 sharedConnectionCommit(); 381 } 382 sharedConnection = null; 383 } 384 transactionForShare = null; 385 } else { 386 if (localConnection != null) { 387 logInvoke("close"); 388 localConnection.close(); 389 localConnection = null; 390 } 391 } 392 closed = true; 393 } 394 } 395 396 /** Gets the physical connection, use by unwrap. */ 397 public Connection getUnwrappedConnection() throws SQLException { 398 Connection connection; 399 if (sharedConnection != null) { 400 connection = sharedConnection.getConnection(); 401 } else { 402 connection = localConnection; 403 } 404 if (connection == null) { 405 throw new SQLException("Connection not allocated"); 406 } 407 return connection; 408 } 409 410 /** 411 * Gets the shared connection for the shared transaction, or allocates a new one. If allocating a new one, 412 * registers a synchronizer in order to remove it at transaction completion time. 413 * 414 * @param connection an existing local connection to reuse, or null 415 */ 416 private SharedConnection getSharedConnection(Connection connection) throws SQLException { 417 SharedConnection sharedConnection = sharedConnections.get(transactionForShare); 418 if (sharedConnection == null) { 419 // allocate a new shared connection 420 sharedConnection = new SharedConnection(connection); 421 if (log.isDebugEnabled()) { 422 log.debug("Allocating new shared connection " + sharedConnection + " for " + this); 423 } 424 if (sharedConnections.putIfAbsent(transactionForShare, sharedConnection) != null) { 425 // race condition but we are single-threaded in this 426 // transaction! 427 throw new AssertionError("Race condition in single transaction!"); 428 } 429 // register a synchronizer to clear the map 430 SharedConnectionSynchronization.getInstance(transactionForShare); 431 } else { 432 if (log.isDebugEnabled()) { 433 log.debug("Reusing shared connection " + sharedConnection + " for " + this); 434 } 435 if (connection != null) { 436 // the local connection passed is not needed anymore 437 log.debug("Dropping previous local connection"); 438 logInvoke("close"); 439 connection.close(); 440 } 441 } 442 return sharedConnection; 443 } 444 445 @Override 446 public String toString() { 447 return getClass().getSimpleName() + "@" + Integer.toHexString(System.identityHashCode(this)); 448 } 449 } 450 451 /** 452 * Shared connection, holding a physical connection use by several pieces of code in the same transaction (so not 453 * multi-threaded). It's always in mode autoCommit=false. 454 * <p> 455 * The last user to commit/rollback will do an actual commit/rollback on the physical connection. 456 * <p> 457 * If a rollback is done but not by the last user, the connection will be marked rollback only. 458 */ 459 private static class SharedConnection { 460 461 /** The JDBC connection. */ 462 private Connection connection; 463 464 /** The connection handles associated to this shared connection. */ 465 private final List<ConnectionHandle> handles; 466 467 /** Whether the final commit must actually do a rollback. */ 468 private boolean mustRollback; 469 470 public SharedConnection(Connection connection) { 471 this.connection = connection; 472 handles = new ArrayList<ConnectionHandle>(3); 473 } 474 475 private void logInvoke(String message) { 476 if (log.isDebugEnabled()) { 477 log.debug("Invoke shared " + message + " " + this); 478 } 479 } 480 481 public Connection getConnection() { 482 return connection; 483 } 484 485 /** Called just before first use. */ 486 public void begin(ConnectionHandle handle) throws SQLException { 487 ref(handle); 488 } 489 490 /** Finishes connection use by commit. */ 491 public void commit(ConnectionHandle handle) throws SQLException { 492 try { 493 if (handles.size() == 1) { 494 if (mustRollback) { 495 logInvoke("rollback"); 496 mustRollback = false; 497 } 498 } else { 499 if (log.isDebugEnabled()) { 500 log.debug("commit not yet closing " + this); 501 } 502 } 503 } finally { 504 unref(handle); 505 } 506 } 507 508 /** Finishes connection use by rollback. */ 509 public void rollback(ConnectionHandle handle) throws SQLException { 510 try { 511 if (handles.size() == 1) { 512 logInvoke("rollback"); 513 mustRollback = false; 514 } else { 515 if (log.isDebugEnabled()) { 516 log.debug("setting rollback only " + this); 517 } 518 mustRollback = true; 519 } 520 } finally { 521 unref(handle); 522 } 523 } 524 525 private void ref(ConnectionHandle handle) throws SQLException { 526 if (handles.isEmpty()) { 527 if (connection == null) { 528 allocate(); 529 } 530 } 531 handles.add(handle); 532 if (log.isDebugEnabled()) { 533 log.debug("Reference added for " + this); 534 } 535 } 536 537 private void unref(ConnectionHandle handle) throws SQLException { 538 handles.remove(handle); 539 if (log.isDebugEnabled()) { 540 log.debug("Reference removed for " + this); 541 } 542 } 543 544 // Note that this is not called when a local connection was upgraded to 545 // a shared one, and is reused. 546 private void allocate() throws SQLException { 547 if (log.isDebugEnabled()) { 548 log.debug("Constructing physical connection " + this); 549 if (log.isTraceEnabled()) { 550 log.trace("Constructing physical connection stacktrace", new Exception("debug")); 551 } 552 } 553 connection = getPhysicalConnection(); 554 logInvoke("setAutoCommit false"); 555 connection.setAutoCommit(false); 556 } 557 558 /** Called after transaction completion to free resources. */ 559 public void closeAfterTransaction(boolean mustRollback) { 560 if (!handles.isEmpty()) { 561 log.error("Transaction ended with " + handles.size() + " connections not committed " + this + " " 562 + handles); 563 } 564 if (connection == null) { 565 return; 566 } 567 try { 568 if (mustRollback) { 569 connection.rollback(); 570 } else { 571 connection.commit(); 572 } 573 } catch (SQLException cause) { 574 log.error("Could not close endup connection at transaction end", cause); 575 } finally { 576 close(); 577 } 578 } 579 580 /** Closes and dereferences from all handles to this. */ 581 private void close() { 582 try { 583 logInvoke("close"); 584 connection.close(); 585 } catch (SQLException e) { 586 log.error("Could not close leftover connection at transaction end", e); 587 } finally { 588 connection = null; 589 for (ConnectionHandle h : handles) { 590 h.closeFromSharedConnection(); 591 } 592 handles.clear(); 593 } 594 } 595 596 @Override 597 public String toString() { 598 return getClass().getSimpleName() + "@" + Integer.toHexString(System.identityHashCode(this)); 599 } 600 } 601 602 /** 603 * In addition to closing the shared connection, also acts as a delegate for other synchronizers that must run 604 * before it. 605 */ 606 private static class SharedConnectionSynchronization implements Synchronization { 607 608 private final Transaction transaction; 609 610 private final List<Synchronization> syncsFirst; 611 612 private final List<Synchronization> syncsLast; 613 614 /** 615 * Gets the instance or creates it. If creating, registers with the actual transaction. 616 */ 617 // not synchronized as the transaction is already thread-local 618 // and we use a ConcurrentHashMap 619 public static SharedConnectionSynchronization getInstance(Transaction transaction) { 620 SharedConnectionSynchronization scs = sharedSynchronizations.get(transaction); 621 if (scs == null) { 622 scs = new SharedConnectionSynchronization(transaction); 623 try { 624 transaction.registerSynchronization(scs); 625 } catch (IllegalStateException | RollbackException | SystemException e) { 626 throw new RuntimeException("Cannot register synchronization", e); 627 } 628 sharedSynchronizations.put(transaction, scs); 629 } 630 return scs; 631 } 632 633 public SharedConnectionSynchronization(Transaction transaction) { 634 this.transaction = transaction; 635 syncsFirst = new ArrayList<Synchronization>(5); 636 syncsLast = new ArrayList<Synchronization>(5); 637 } 638 639 /** 640 * Registers a synchronization that must run before or after us. 641 */ 642 public void registerSynchronization(Synchronization sync, boolean first) { 643 if (first) { 644 syncsFirst.add(sync); 645 } else { 646 syncsLast.add(sync); 647 } 648 } 649 650 @Override 651 public void beforeCompletion() { 652 beforeCompletion(syncsFirst); 653 beforeCompletion(syncsLast); 654 } 655 656 private void beforeCompletion(List<Synchronization> syncs) { 657 // beforeCompletion hooks may add other syncs, 658 // so we must be careful when iterating on the list 659 RuntimeException exc = null; 660 for (int i = 0; i < syncs.size(); i++) { 661 try { 662 syncs.get(i).beforeCompletion(); 663 } catch (RuntimeException e) { 664 log.error("Exception during beforeCompletion hook", e); 665 if (exc == null) { 666 exc = e; 667 try { 668 transaction.setRollbackOnly(); 669 } catch (SystemException se) { 670 log.error("Cannot set rollback only", e); 671 } 672 } 673 } 674 } 675 if (exc != null) { 676 throw exc; 677 } 678 } 679 680 /** 681 * After completion, removes the shared connection from the map and closes it. 682 */ 683 @Override 684 public void afterCompletion(int status) { 685 sharedSynchronizations.remove(transaction); 686 afterCompletion(syncsFirst, status); 687 closeSharedAfterCompletion(status == Status.STATUS_ROLLEDBACK); 688 afterCompletion(syncsLast, status); 689 } 690 691 private void closeSharedAfterCompletion(boolean rollback) { 692 SharedConnection sharedConnection = sharedConnections.remove(transaction); 693 if (sharedConnection != null) { 694 sharedConnection.closeAfterTransaction(rollback); 695 } 696 } 697 698 private void afterCompletion(List<Synchronization> syncs, int status) { 699 for (Synchronization sync : syncs) { 700 try { 701 sync.afterCompletion(status); 702 } catch (RuntimeException e) { 703 log.warn("Unexpected exception from afterCompletion; continuing", e); 704 } 705 } 706 } 707 } 708 709 private static Transaction getTransaction() { 710 try { 711 return TransactionHelper.lookupTransactionManager().getTransaction(); 712 } catch (NamingException | SystemException e) { 713 return null; 714 } 715 } 716 717 private static int transactionStatus(Transaction transaction) { 718 try { 719 return transaction.getStatus(); 720 } catch (SystemException e) { 721 log.error("Cannot get transaction status", e); 722 return Status.STATUS_UNKNOWN; 723 } 724 } 725 726 /** 727 * Tries to unwrap the connection to get the real physical one (returned by the original datasource). 728 * <p> 729 * This should only be used by code that needs to cast the connection to a driver-specific class to use 730 * driver-specific features. 731 * 732 * @throws SQLException if no actual physical connection was allocated yet 733 */ 734 public static Connection unwrap(Connection connection) throws SQLException { 735 if (Proxy.isProxyClass(connection.getClass())) { 736 InvocationHandler handler = Proxy.getInvocationHandler(connection); 737 if (handler instanceof ConnectionHandle) { 738 ConnectionHandle h = (ConnectionHandle) handler; 739 connection = h.getUnwrappedConnection(); 740 } 741 } 742 if (connection instanceof org.tranql.connector.jdbc.ConnectionHandle) { 743 return ((org.tranql.connector.jdbc.ConnectionHandle) connection).getAssociation().getPhysicalConnection(); 744 } 745 // now try Apache DBCP unwrap (standard or Tomcat), to skip datasource 746 // wrapping layers 747 // this needs accessToUnderlyingConnectionAllowed=true in the pool 748 // config 749 try { 750 Method m = connection.getClass().getMethod("getInnermostDelegate"); 751 m.setAccessible(true); // needed, method of inner private class 752 Connection delegate = (Connection) m.invoke(connection); 753 if (delegate == null) { 754 log.error("Cannot access underlying connection, you must use " 755 + "accessToUnderlyingConnectionAllowed=true in the pool configuration"); 756 } else { 757 connection = delegate; 758 } 759 } catch (NoSuchMethodException | SecurityException | IllegalAccessException | InvocationTargetException e) { 760 // ignore missing method, connection not coming from Apache pool 761 } 762 return connection; 763 } 764 765 /** 766 * Checks if single transaction-local datasource mode will be used for the given datasource name. 767 * 768 * @return {@code true} if using a single transaction-local connection for this datasource 769 */ 770 public static boolean useSingleConnection(String dataSourceName) { 771 if (dataSourceName != null) { 772 String excludes = getExcludeDsProperty(); 773 if ("*".equals(excludes)) { 774 return false; 775 } 776 if (!StringUtils.isBlank(excludes)) { 777 for (String exclude : excludes.split("[, ] *")) { 778 if (dataSourceName.equals(exclude) 779 || dataSourceName.equals(DataSourceHelper.getDataSourceJNDIName(exclude))) { 780 return false; 781 } 782 } 783 } 784 } 785 return !StringUtils.isBlank(getSingleDsProperty()); 786 } 787 788 /** 789 * Gets the fake name we use to pass to ConnectionHelper.getConnection, in order for exclusions on these connections 790 * to be possible. 791 */ 792 public static String getPseudoDataSourceNameForRepository(String repositoryName) { 793 return "repository_" + repositoryName; 794 } 795 796 /** 797 * Gets a new reference to the transaction-local JDBC connection for the given dataSource. The connection 798 * <strong>MUST</strong> be closed in a finally block when code is done using it. 799 * <p> 800 * If the passed dataSource name is in the exclusion list, null will be returned. 801 * 802 * @param dataSourceName the datasource for which the connection is requested 803 * @return a new reference to the connection, or {@code null} if single datasource connection sharing is not in 804 * effect 805 * @throws ResourceException 806 */ 807 public static Connection getConnection(String dataSourceName) throws SQLException { 808 return getConnection(dataSourceName, false); 809 } 810 811 /** 812 * Gets a new reference to the transaction-local JDBC connection for the given dataSource. The connection 813 * <strong>MUST</strong> be closed in a finally block when code is done using it. 814 * <p> 815 * If the passed dataSource name is in the exclusion list, null will be returned. 816 * <p> 817 * If noSharing is requested, the connection will never come from the transaction-local and will always be newly 818 * allocated. 819 * 820 * @param dataSourceName the datasource for which the connection is requested 821 * @param noSharing {@code true} if this connection must not be shared with others 822 * @return a new reference to the connection, or {@code null} if single datasource connection sharing is not in 823 * effect 824 * @throws ResourceException 825 */ 826 public static Connection getConnection(String dataSourceName, boolean noSharing) throws SQLException { 827 if (!useSingleConnection(dataSourceName)) { 828 DataSource ds = getDataSource(dataSourceName); 829 if (ds instanceof PooledDataSource) { 830 return ((PooledDataSource) ds).getConnection(noSharing); 831 } 832 return getPhysicalConnection(dataSourceName); 833 } 834 return getConnection(noSharing); 835 } 836 837 private static Connection getConnection(boolean noSharing) throws SQLException { 838 String dataSourceName = getSingleDsProperty(); 839 if (StringUtils.isBlank(dataSourceName)) { 840 return null; 841 } 842 if (noSharing) { 843 return getPhysicalConnection(dataSourceName); 844 } 845 return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), new Class[]{Connection.class}, 846 new ConnectionHandle()); 847 } 848 849 private static String getSingleDsProperty() { 850 if (singleDsPropertyCached != null) { 851 return singleDsPropertyCached; 852 } 853 return Framework.getProperty(SINGLE_DS); 854 } 855 856 private static String getExcludeDsProperty() { 857 if (excludeDsPropertyCached != null) { 858 return excludeDsPropertyCached; 859 } 860 return Framework.getProperty(EXCLUDE_DS); 861 } 862 863 private static Connection getPhysicalConnection() throws SQLException { 864 return getPhysicalConnection(getSingleDsProperty()); 865 } 866 867 /** 868 * Gets a physical connection from a datasource name. 869 * <p> 870 * A few retries are done to work around databases that have problems with many open/close in a row. 871 * 872 * @param dataSourceName the datasource name 873 * @return the connection 874 */ 875 private static Connection getPhysicalConnection(String dataSourceName) throws SQLException { 876 DataSource dataSource = getDataSource(dataSourceName); 877 return JDBCUtils.getConnection(dataSource); 878 } 879 880 /** 881 * Gets a datasource from a datasource name, or in test mode use test connection parameters. 882 * 883 * @param dataSourceName the datasource name 884 * @return the datasource 885 */ 886 private static DataSource getDataSource(String dataSourceName) throws SQLException { 887 try { 888 return DataSourceHelper.getDataSource(dataSourceName); 889 } catch (NamingException e) { 890 if (Framework.isTestModeSet()) { 891 String url = Framework.getProperty("nuxeo.test.vcs.url"); 892 String user = Framework.getProperty("nuxeo.test.vcs.user"); 893 String password = Framework.getProperty("nuxeo.test.vcs.password"); 894 if (url != null && user != null) { 895 return new DataSourceFromUrl(url, user, password); // driver? 896 } 897 } 898 throw new SQLException("Cannot find datasource: " + dataSourceName, e); 899 } 900 } 901 902 /** 903 * Checks how many references there are to shared connections. 904 * <p> 905 * USED IN UNIT TESTS OR FOR DEBUGGING. 906 */ 907 public static int countConnectionReferences() { 908 return sharedConnections.size(); 909 } 910 911 /** 912 * Clears the remaining connection references for the current thread. 913 * <p> 914 * USED IN UNIT TESTS ONLY. 915 */ 916 public static void clearConnectionReferences() { 917 for (SharedConnection sharedConnection : sharedConnections.values()) { 918 sharedConnection.closeAfterTransaction(true); 919 } 920 sharedConnections.clear(); 921 } 922 923 /** 924 * If sharing is in effect, registers a synchronization with the current transaction, making sure it runs before the 925 * {@link SharedConnectionSynchronization}. 926 * 927 * @return {@code true} 928 */ 929 public static boolean registerSynchronization(Synchronization sync) throws SystemException { 930 return registerSynchronization(sync, true); 931 } 932 933 /** 934 * If sharing is in effect, registers a synchronization with the current transaction, making sure the 935 * {@link Synchronization#afterCompletion} method runs after the {@link SharedConnectionSynchronization}. 936 * 937 * @return {@code true} 938 */ 939 public static boolean registerSynchronizationLast(Synchronization sync) throws SystemException { 940 return registerSynchronization(sync, false); 941 } 942 943 private static boolean registerSynchronization(Synchronization sync, boolean first) throws SystemException { 944 Transaction transaction = getTransaction(); 945 if (transaction == null) { 946 throw new SystemException("Cannot register synchronization: no transaction"); 947 } 948 // We always do the lookup and registration to the actual transaction 949 // even if there is no shared connection yet. 950 SharedConnectionSynchronization scs = SharedConnectionSynchronization.getInstance(transaction); 951 scs.registerSynchronization(sync, first); 952 return true; 953 } 954 955}