001/*
002 * (C) Copyright 2012-2013 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the Eclipse Public License v1.0
006 * which accompanies this distribution, and is available at
007 * http://www.eclipse.org/legal/epl-v10.html
008 *
009 * Contributors:
010 *     Florent Guillaume
011 */
012package org.nuxeo.runtime.datasource;
013
014import java.lang.reflect.InvocationHandler;
015import java.lang.reflect.InvocationTargetException;
016import java.lang.reflect.Method;
017import java.lang.reflect.Proxy;
018import java.sql.Connection;
019import java.sql.SQLException;
020import java.util.ArrayList;
021import java.util.List;
022import java.util.concurrent.ConcurrentHashMap;
023import java.util.concurrent.ConcurrentMap;
024
025import javax.naming.NamingException;
026import javax.resource.ResourceException;
027import javax.sql.DataSource;
028import javax.transaction.RollbackException;
029import javax.transaction.Status;
030import javax.transaction.Synchronization;
031import javax.transaction.SystemException;
032import javax.transaction.Transaction;
033
034import org.apache.commons.lang.StringUtils;
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.nuxeo.common.utils.JDBCUtils;
038import org.nuxeo.runtime.api.Framework;
039import org.nuxeo.runtime.datasource.PooledDataSourceRegistry.PooledDataSource;
040import org.nuxeo.runtime.transaction.TransactionHelper;
041
042/**
043 * This helper provides a way to get a JDBC connection, through {@link #getConnection(String)}, that will return a
044 * connection wrapper able to use a shared connection when used in transactional mode and setAutoCommit(false) is
045 * called, and otherwise use a normal physical JDBC connection.
046 * <p>
047 * The physical connections are created from the datasource configured using the framework property {@value #SINGLE_DS}.
048 * <p>
049 * This helper is used to implement consistent resource management in a non-XA context. Several users of the shared
050 * connection can call setAutoCommit(false) then do transactional work and commit(). Only the commit() of the last user
051 * will do an actual commit on the physical connection.
052 *
053 * @since 5.7
054 */
055public class ConnectionHelper {
056
057    private static final Log log = LogFactory.getLog(ConnectionHelper.class);
058
059    /**
060     * Shared connection for each transaction.
061     * <p>
062     * The shared connection is always in autoCommit=false.
063     * <p>
064     * Things are removed from this map by a transaction synchronizer when the transaction finishes.
065     */
066    private static ConcurrentMap<Transaction, SharedConnection> sharedConnections = new ConcurrentHashMap<Transaction, SharedConnection>();
067
068    /**
069     * SharedConnectionSynchronization registered for the transaction, when sharing.
070     */
071    private static ConcurrentMap<Transaction, SharedConnectionSynchronization> sharedSynchronizations = new ConcurrentHashMap<Transaction, SharedConnectionSynchronization>();
072
073    /**
074     * Property holding a datasource name to use to replace all database accesses.
075     */
076    public static final String SINGLE_DS = "nuxeo.db.singleDataSource";
077
078    /**
079     * Property holding one ore more datasource names (comma or space separated) for whose connections the single
080     * datasource is not used.
081     */
082    public static final String EXCLUDE_DS = "nuxeo.db.singleDataSource.exclude";
083
084    /**
085     * Wrapper for a connection that delegates calls to either a private connection, or a per-transaction shared one if
086     * a transaction is started.
087     * <p>
088     * Sharing is started on setAutoCommit(true), and ends on setAutoCommit(false) or close().
089     */
090    private static class ConnectionHandle implements InvocationHandler {
091
092        private boolean closed;
093
094        /**
095         * Expected autoCommit mode by the client for this connection.
096         */
097        private boolean autoCommit;
098
099        /**
100         * The transaction in use at the time where sharing was started (autoCommit was set to false during a
101         * transaction).
102         * <p>
103         * The sharedConnection is allocated on first use after that.
104         */
105        private Transaction transactionForShare;
106
107        /**
108         * A local connection, allocated when the connection is used when sharedInTransaction == null.
109         */
110        private Connection localConnection;
111
112        /**
113         * A shared connection, allocated when the connection is used when sharedInTransaction != null.
114         */
115        private SharedConnection sharedConnection;
116
117        /**
118         * True between the first use and the commit/rollback (in non-autoCommit mode and shared connection).
119         */
120        private boolean began;
121
122        public ConnectionHandle() {
123            autoCommit = true;
124            if (log.isDebugEnabled()) {
125                log.debug("Construct " + this);
126                if (log.isTraceEnabled()) {
127                    log.trace("Construct stacktrace " + this, new Exception("debug"));
128                }
129            }
130        }
131
132        private void logInvoke(String message) {
133            if (log.isDebugEnabled()) {
134                log.debug("Invoke " + message + " " + this);
135            }
136        }
137
138        @Override
139        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
140            String methodName = method.getName();
141            if (methodName.equals("isClosed")) {
142                return isClosed();
143            } else if (methodName.equals("close")) {
144                close();
145                return null;
146            }
147            if (closed) {
148                throw new SQLException("Connection is closed", "08003");
149            }
150
151            if (methodName.equals("getAutoCommit")) {
152                return getAutoCommit();
153            }
154
155            if (methodName.equals("setAutoCommit")) {
156                setAutoCommit(((Boolean) args[0]).booleanValue());
157                return null;
158            }
159
160            Connection connection;
161            if (transactionForShare != null) {
162                // check that we're still in the same transaction
163                // this also enforces single-threaded use of
164                // the shared connection
165                Transaction transaction = getTransaction();
166                if (transaction != transactionForShare) {
167                    throw new SQLException("Calling method " + methodName
168                            + ", connection sharing started in transaction " + transactionForShare
169                            + " but it is now used in transaction " + transaction);
170                }
171
172                sharedConnectionAllocate();
173
174                // for begin/commit we don't actually need to allocate
175                // the connection
176                if (methodName.equals("commit")) {
177                    if (autoCommit) {
178                        throw new SQLException("Cannot commit outside of transaction", "25000");
179                    }
180                    sharedConnectionCommit();
181                    return null;
182                } else if (methodName.equals("rollback")) {
183                    if (autoCommit) {
184                        throw new SQLException("Cannot commit outside of transaction", "25000");
185                    }
186                    if (args != null && args.length > 0) {
187                        throw new SQLException("Not implemented: rollback(Savepoint)", "0A000");
188                    }
189                    sharedConnectionRollback();
190                    return null;
191                } else if (methodName.equals("setSavepoint") || methodName.equals("releaseSavepoint")) {
192                    throw new SQLException("Not implemented: " + methodName, "0A000");
193                }
194
195                sharedConnectionBegin(methodName);
196
197                connection = sharedConnection.getConnection();
198            } else {
199                localConnectionAllocate();
200                connection = localConnection;
201            }
202
203            try {
204                if (log.isDebugEnabled()) {
205                    if (sharedConnection == null) {
206                        logInvoke(methodName);
207                    } else {
208                        logInvoke(methodName + " " + sharedConnection);
209                    }
210                }
211                return method.invoke(connection, args);
212            } catch (InvocationTargetException e) {
213                throw e.getCause();
214            }
215        }
216
217        private Boolean getAutoCommit() {
218            return Boolean.valueOf(autoCommit);
219        }
220
221        private void setAutoCommit(boolean setAutoCommit) throws SQLException {
222            if (setAutoCommit == autoCommit) {
223                return; // no change
224            }
225            autoCommit = setAutoCommit;
226            if (log.isDebugEnabled()) {
227                log.debug("setAutoCommit(" + autoCommit + ") " + this);
228            }
229            if (!autoCommit) {
230                // setting autoCommit = false
231                if (transactionForShare != null) {
232                    throw new AssertionError("autoCommit=false when already sharing");
233                }
234                // not yet sharing
235                Transaction transaction = getTransaction();
236                if (transaction != null && transactionStatus(transaction) == Status.STATUS_ACTIVE) {
237                    // start sharing
238                    transactionForShare = transaction;
239                    if (localConnection != null) {
240                        // share using the previous local connection
241                        logInvoke("setAutoCommit false");
242                        localConnection.setAutoCommit(false);
243                        log.debug("Upgrading local connection to shared");
244                        sharedConnection = getSharedConnection(localConnection);
245                        localConnection = null;
246                    } else {
247                        // sharedConnection allocated on first use
248                    }
249                } else {
250                    log.debug("No usable transaction");
251                    // we're outside a usable transaction
252                    // use the local connection
253                    if (localConnection != null) {
254                        logInvoke("setAutoCommit false");
255                        localConnection.setAutoCommit(false);
256                    } else {
257                        // localConnection allocated on first use
258                    }
259                }
260            } else {
261                // setting autoCommit = true
262                if (transactionForShare != null) {
263                    if (began) {
264                        // do automatic commit
265                        log.debug("setAutoCommit true committing shared");
266                        sharedConnectionCommit();
267                    }
268                    // stop sharing
269                    sharedConnection = null;
270                    transactionForShare = null;
271                } else if (localConnection != null) {
272                    logInvoke("setAutoCommit true");
273                    localConnection.setAutoCommit(true);
274                }
275            }
276        }
277
278        // allocation on first use
279        private void localConnectionAllocate() throws SQLException {
280            if (localConnection == null) {
281                if (log.isDebugEnabled()) {
282                    log.debug("Constructing physical connection " + this);
283                    if (log.isTraceEnabled()) {
284                        log.trace("Constructing physical connection stacktrace", new Exception("debug"));
285                    }
286                }
287                localConnection = getPhysicalConnection();
288                logInvoke("setAutoCommit " + autoCommit);
289                localConnection.setAutoCommit(autoCommit);
290            }
291        }
292
293        // allocation on first use
294        private void sharedConnectionAllocate() throws SQLException {
295            if (sharedConnection == null) {
296                if (transactionStatus(transactionForShare) == Status.STATUS_ACTIVE) {
297                    sharedConnection = getSharedConnection(null);
298                    // autoCommit mode set by SharedConnection.allocate()
299                } else {
300                    // already committing or rolling back
301                    // do not assign a connection at all
302                    // only commit or rollback is allowed,
303                    // and they will do nothing (began=false)
304                }
305            }
306        }
307
308        private void sharedConnectionBegin(String methodName) throws SQLException {
309            if (sharedConnection == null) {
310                throw new SQLException("Cannot call " + methodName + " with transaction in state "
311                        + transactionStatus(transactionForShare), "25000");
312            }
313            if (!autoCommit && !began) {
314                sharedConnection.begin(this);
315                began = true;
316            }
317        }
318
319        private void sharedConnectionCommit() throws SQLException {
320            if (began) {
321                if (log.isDebugEnabled()) {
322                    log.debug("Committing shared " + this);
323                }
324                sharedConnection.commit(this);
325                began = false;
326            }
327        }
328
329        private void sharedConnectionRollback() throws SQLException {
330            if (began) {
331                sharedConnection.rollback(this);
332                began = false;
333            }
334        }
335
336        /** Called back from SharedConnection close. */
337        protected void closeFromSharedConnection() {
338            sharedConnection = null;
339            transactionForShare = null;
340        }
341
342        private Boolean isClosed() {
343            return Boolean.valueOf(closed);
344        }
345
346        private void close() throws SQLException {
347            if (!closed) {
348                if (log.isDebugEnabled()) {
349                    log.debug("close() " + this);
350                }
351                if (transactionForShare != null) {
352                    if (sharedConnection != null) {
353                        if (began) {
354                            // connection closed before commit/rollback
355                            // commit it by hand (even though it's unspecified)
356                            log.debug("close committing shared");
357                            sharedConnectionCommit();
358                        }
359                        sharedConnection = null;
360                    }
361                    transactionForShare = null;
362                } else {
363                    if (localConnection != null) {
364                        logInvoke("close");
365                        localConnection.close();
366                        localConnection = null;
367                    }
368                }
369                closed = true;
370            }
371        }
372
373        /** Gets the physical connection, use by unwrap. */
374        public Connection getUnwrappedConnection() throws SQLException {
375            Connection connection;
376            if (sharedConnection != null) {
377                connection = sharedConnection.getConnection();
378            } else {
379                connection = localConnection;
380            }
381            if (connection == null) {
382                throw new SQLException("Connection not allocated");
383            }
384            return connection;
385        }
386
387        /**
388         * Gets the shared connection for the shared transaction, or allocates a new one. If allocating a new one,
389         * registers a synchronizer in order to remove it at transaction completion time.
390         *
391         * @param connection an existing local connection to reuse, or null
392         */
393        private SharedConnection getSharedConnection(Connection connection) throws SQLException {
394            SharedConnection sharedConnection = sharedConnections.get(transactionForShare);
395            if (sharedConnection == null) {
396                // allocate a new shared connection
397                sharedConnection = new SharedConnection(connection);
398                if (log.isDebugEnabled()) {
399                    log.debug("Allocating new shared connection " + sharedConnection + " for " + this);
400                }
401                if (sharedConnections.putIfAbsent(transactionForShare, sharedConnection) != null) {
402                    // race condition but we are single-threaded in this
403                    // transaction!
404                    throw new AssertionError("Race condition in single transaction!");
405                }
406                // register a synchronizer to clear the map
407                SharedConnectionSynchronization.getInstance(transactionForShare);
408            } else {
409                if (log.isDebugEnabled()) {
410                    log.debug("Reusing shared connection " + sharedConnection + " for " + this);
411                }
412                if (connection != null) {
413                    // the local connection passed is not needed anymore
414                    log.debug("Dropping previous local connection");
415                    logInvoke("close");
416                    connection.close();
417                }
418            }
419            return sharedConnection;
420        }
421
422        @Override
423        public String toString() {
424            return getClass().getSimpleName() + "@" + Integer.toHexString(System.identityHashCode(this));
425        }
426    }
427
428    /**
429     * Shared connection, holding a physical connection use by several pieces of code in the same transaction (so not
430     * multi-threaded). It's always in mode autoCommit=false.
431     * <p>
432     * The last user to commit/rollback will do an actual commit/rollback on the physical connection.
433     * <p>
434     * If a rollback is done but not by the last user, the connection will be marked rollback only.
435     */
436    private static class SharedConnection {
437
438        /** The JDBC connection. */
439        private Connection connection;
440
441        /** The connection handles associated to this shared connection. */
442        private final List<ConnectionHandle> handles;
443
444        /** Whether the final commit must actually do a rollback. */
445        private boolean mustRollback;
446
447        public SharedConnection(Connection connection) {
448            this.connection = connection;
449            handles = new ArrayList<ConnectionHandle>(3);
450        }
451
452        private void logInvoke(String message) {
453            if (log.isDebugEnabled()) {
454                log.debug("Invoke shared " + message + " " + this);
455            }
456        }
457
458        public Connection getConnection() {
459            return connection;
460        }
461
462        /** Called just before first use. */
463        public void begin(ConnectionHandle handle) throws SQLException {
464            ref(handle);
465        }
466
467        /** Finishes connection use by commit. */
468        public void commit(ConnectionHandle handle) throws SQLException {
469            try {
470                if (handles.size() == 1) {
471                    if (mustRollback) {
472                        logInvoke("rollback");
473                        mustRollback = false;
474                    }
475                } else {
476                    if (log.isDebugEnabled()) {
477                        log.debug("commit not yet closing " + this);
478                    }
479                }
480            } finally {
481                unref(handle);
482            }
483        }
484
485        /** Finishes connection use by rollback. */
486        public void rollback(ConnectionHandle handle) throws SQLException {
487            try {
488                if (handles.size() == 1) {
489                    logInvoke("rollback");
490                    mustRollback = false;
491                } else {
492                    if (log.isDebugEnabled()) {
493                        log.debug("setting rollback only " + this);
494                    }
495                    mustRollback = true;
496                }
497            } finally {
498                unref(handle);
499            }
500        }
501
502        private void ref(ConnectionHandle handle) throws SQLException {
503            if (handles.isEmpty()) {
504                if (connection == null) {
505                    allocate();
506                }
507            }
508            handles.add(handle);
509            if (log.isDebugEnabled()) {
510                log.debug("Reference added for " + this);
511            }
512        }
513
514        private void unref(ConnectionHandle handle) throws SQLException {
515            handles.remove(handle);
516            if (log.isDebugEnabled()) {
517                log.debug("Reference removed for " + this);
518            }
519        }
520
521        // Note that this is not called when a local connection was upgraded to
522        // a shared one, and is reused.
523        private void allocate() throws SQLException {
524            if (log.isDebugEnabled()) {
525                log.debug("Constructing physical connection " + this);
526                if (log.isTraceEnabled()) {
527                    log.trace("Constructing physical connection stacktrace", new Exception("debug"));
528                }
529            }
530            connection = getPhysicalConnection();
531            logInvoke("setAutoCommit false");
532            connection.setAutoCommit(false);
533        }
534
535        /** Called after transaction completion to free resources. */
536        public void closeAfterTransaction(boolean mustRollback) {
537            if (!handles.isEmpty()) {
538                log.error("Transaction ended with " + handles.size() + " connections not committed " + this + " "
539                        + handles);
540            }
541            if (connection == null) {
542                return;
543            }
544            try {
545                if (mustRollback) {
546                    connection.rollback();
547                } else {
548                    connection.commit();
549                }
550            } catch (SQLException cause) {
551                log.error("Could not close endup connection at transaction end", cause);
552            } finally {
553                close();
554            }
555        }
556
557        /** Closes and dereferences from all handles to this. */
558        private void close() {
559            try {
560                logInvoke("close");
561                connection.close();
562            } catch (SQLException e) {
563                log.error("Could not close leftover connection at transaction end", e);
564            } finally {
565                connection = null;
566                for (ConnectionHandle h : handles) {
567                    h.closeFromSharedConnection();
568                }
569                handles.clear();
570            }
571        }
572
573        @Override
574        public String toString() {
575            return getClass().getSimpleName() + "@" + Integer.toHexString(System.identityHashCode(this));
576        }
577    }
578
579    /**
580     * In addition to closing the shared connection, also acts as a delegate for other synchronizers that must run
581     * before it.
582     */
583    private static class SharedConnectionSynchronization implements Synchronization {
584
585        private final Transaction transaction;
586
587        private final List<Synchronization> syncsFirst;
588
589        private final List<Synchronization> syncsLast;
590
591        /**
592         * Gets the instance or creates it. If creating, registers with the actual transaction.
593         */
594        // not synchronized as the transaction is already thread-local
595        // and we use a ConcurrentHashMap
596        public static SharedConnectionSynchronization getInstance(Transaction transaction) {
597            SharedConnectionSynchronization scs = sharedSynchronizations.get(transaction);
598            if (scs == null) {
599                scs = new SharedConnectionSynchronization(transaction);
600                try {
601                    transaction.registerSynchronization(scs);
602                } catch (IllegalStateException | RollbackException | SystemException e) {
603                    throw new RuntimeException("Cannot register synchronization", e);
604                }
605                sharedSynchronizations.put(transaction, scs);
606            }
607            return scs;
608        }
609
610        public SharedConnectionSynchronization(Transaction transaction) {
611            this.transaction = transaction;
612            syncsFirst = new ArrayList<Synchronization>(5);
613            syncsLast = new ArrayList<Synchronization>(5);
614        }
615
616        /**
617         * Registers a synchronization that must run before or after us.
618         */
619        public void registerSynchronization(Synchronization sync, boolean first) {
620            if (first) {
621                syncsFirst.add(sync);
622            } else {
623                syncsLast.add(sync);
624            }
625        }
626
627        @Override
628        public void beforeCompletion() {
629            beforeCompletion(syncsFirst);
630            beforeCompletion(syncsLast);
631        }
632
633        private void beforeCompletion(List<Synchronization> syncs) {
634            // beforeCompletion hooks may add other syncs,
635            // so we must be careful when iterating on the list
636            RuntimeException exc = null;
637            for (int i = 0; i < syncs.size(); i++) {
638                try {
639                    syncs.get(i).beforeCompletion();
640                } catch (RuntimeException e) {
641                    log.error("Exception during beforeCompletion hook", e);
642                    if (exc == null) {
643                        exc = e;
644                        try {
645                            transaction.setRollbackOnly();
646                        } catch (SystemException se) {
647                            log.error("Cannot set rollback only", e);
648                        }
649                    }
650                }
651            }
652            if (exc != null) {
653                throw exc;
654            }
655        }
656
657        /**
658         * After completion, removes the shared connection from the map and closes it.
659         */
660        @Override
661        public void afterCompletion(int status) {
662            sharedSynchronizations.remove(transaction);
663            afterCompletion(syncsFirst, status);
664            closeSharedAfterCompletion(status == Status.STATUS_ROLLEDBACK);
665            afterCompletion(syncsLast, status);
666        }
667
668        private void closeSharedAfterCompletion(boolean rollback) {
669            SharedConnection sharedConnection = sharedConnections.remove(transaction);
670            if (sharedConnection != null) {
671                sharedConnection.closeAfterTransaction(rollback);
672            }
673        }
674
675        private void afterCompletion(List<Synchronization> syncs, int status) {
676            for (Synchronization sync : syncs) {
677                try {
678                    sync.afterCompletion(status);
679                } catch (RuntimeException e) {
680                    log.warn("Unexpected exception from afterCompletion; continuing", e);
681                }
682            }
683        }
684    }
685
686    private static Transaction getTransaction() {
687        try {
688            return TransactionHelper.lookupTransactionManager().getTransaction();
689        } catch (NamingException | SystemException e) {
690            return null;
691        }
692    }
693
694    private static int transactionStatus(Transaction transaction) {
695        try {
696            return transaction.getStatus();
697        } catch (SystemException e) {
698            log.error("Cannot get transaction status", e);
699            return Status.STATUS_UNKNOWN;
700        }
701    }
702
703    /**
704     * Tries to unwrap the connection to get the real physical one (returned by the original datasource).
705     * <p>
706     * This should only be used by code that needs to cast the connection to a driver-specific class to use
707     * driver-specific features.
708     *
709     * @throws SQLException if no actual physical connection was allocated yet
710     */
711    public static Connection unwrap(Connection connection) throws SQLException {
712        if (Proxy.isProxyClass(connection.getClass())) {
713            InvocationHandler handler = Proxy.getInvocationHandler(connection);
714            if (handler instanceof ConnectionHandle) {
715                ConnectionHandle h = (ConnectionHandle) handler;
716                connection = h.getUnwrappedConnection();
717            }
718        }
719        if (connection instanceof org.tranql.connector.jdbc.ConnectionHandle) {
720            return ((org.tranql.connector.jdbc.ConnectionHandle) connection).getAssociation().getPhysicalConnection();
721        }
722        // now try Apache DBCP unwrap (standard or Tomcat), to skip datasource
723        // wrapping layers
724        // this needs accessToUnderlyingConnectionAllowed=true in the pool
725        // config
726        try {
727            Method m = connection.getClass().getMethod("getInnermostDelegate");
728            m.setAccessible(true); // needed, method of inner private class
729            Connection delegate = (Connection) m.invoke(connection);
730            if (delegate == null) {
731                log.error("Cannot access underlying connection, you must use "
732                        + "accessToUnderlyingConnectionAllowed=true in the pool configuration");
733            } else {
734                connection = delegate;
735            }
736        } catch (NoSuchMethodException | SecurityException | IllegalAccessException | InvocationTargetException e) {
737            // ignore missing method, connection not coming from Apache pool
738        }
739        return connection;
740    }
741
742    /**
743     * Checks if single transaction-local datasource mode will be used for the given datasource name.
744     *
745     * @return {@code true} if using a single transaction-local connection for this datasource
746     */
747    public static boolean useSingleConnection(String dataSourceName) {
748        if (dataSourceName != null) {
749            String excludes = Framework.getProperty(EXCLUDE_DS);
750            if ("*".equals(excludes)) {
751                return false;
752            }
753            if (!StringUtils.isBlank(excludes)) {
754                for (String exclude : excludes.split("[, ] *")) {
755                    if (dataSourceName.equals(exclude)
756                            || dataSourceName.equals(DataSourceHelper.getDataSourceJNDIName(exclude))) {
757                        return false;
758                    }
759                }
760            }
761        }
762        return !StringUtils.isBlank(Framework.getProperty(SINGLE_DS));
763    }
764
765    /**
766     * Gets the fake name we use to pass to ConnectionHelper.getConnection, in order for exclusions on these connections
767     * to be possible.
768     */
769    public static String getPseudoDataSourceNameForRepository(String repositoryName) {
770        return "repository_" + repositoryName;
771    }
772
773    /**
774     * Gets a new reference to the transaction-local JDBC connection for the given dataSource. The connection
775     * <strong>MUST</strong> be closed in a finally block when code is done using it.
776     * <p>
777     * If the passed dataSource name is in the exclusion list, null will be returned.
778     *
779     * @param dataSourceName the datasource for which the connection is requested
780     * @return a new reference to the connection, or {@code null} if single datasource connection sharing is not in
781     *         effect
782     * @throws ResourceException
783     */
784    public static Connection getConnection(String dataSourceName) throws SQLException {
785        return getConnection(dataSourceName, false);
786    }
787
788    /**
789     * Gets a new reference to the transaction-local JDBC connection for the given dataSource. The connection
790     * <strong>MUST</strong> be closed in a finally block when code is done using it.
791     * <p>
792     * If the passed dataSource name is in the exclusion list, null will be returned.
793     * <p>
794     * If noSharing is requested, the connection will never come from the transaction-local and will always be newly
795     * allocated.
796     *
797     * @param dataSourceName the datasource for which the connection is requested
798     * @param noSharing {@code true} if this connection must not be shared with others
799     * @return a new reference to the connection, or {@code null} if single datasource connection sharing is not in
800     *         effect
801     * @throws ResourceException
802     */
803    public static Connection getConnection(String dataSourceName, boolean noSharing) throws SQLException {
804        if (!useSingleConnection(dataSourceName)) {
805            DataSource ds = getDataSource(dataSourceName);
806            if (ds instanceof PooledDataSource) {
807                return ((PooledDataSource) ds).getConnection(noSharing);
808            }
809            return getPhysicalConnection(dataSourceName);
810        }
811        return getConnection(noSharing);
812    }
813
814    private static Connection getConnection(boolean noSharing) throws SQLException {
815        String dataSourceName = Framework.getProperty(SINGLE_DS);
816        if (StringUtils.isBlank(dataSourceName)) {
817            return null;
818        }
819        if (noSharing) {
820            return getPhysicalConnection(dataSourceName);
821        }
822        return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), new Class[] { Connection.class },
823                new ConnectionHandle());
824    }
825
826    private static Connection getPhysicalConnection() throws SQLException {
827        return getPhysicalConnection(Framework.getProperty(SINGLE_DS));
828    }
829
830    /**
831     * Gets a physical connection from a datasource name.
832     * <p>
833     * A few retries are done to work around databases that have problems with many open/close in a row.
834     *
835     * @param dataSourceName the datasource name
836     * @return the connection
837     */
838    private static Connection getPhysicalConnection(String dataSourceName) throws SQLException {
839        DataSource dataSource = getDataSource(dataSourceName);
840        return JDBCUtils.getConnection(dataSource);
841    }
842
843    /**
844     * Gets a datasource from a datasource name, or in test mode use test connection parameters.
845     *
846     * @param dataSourceName the datasource name
847     * @return the datasource
848     */
849    private static DataSource getDataSource(String dataSourceName) throws SQLException {
850        try {
851            return DataSourceHelper.getDataSource(dataSourceName);
852        } catch (NamingException e) {
853            if (Framework.isTestModeSet()) {
854                String url = Framework.getProperty("nuxeo.test.vcs.url");
855                String user = Framework.getProperty("nuxeo.test.vcs.user");
856                String password = Framework.getProperty("nuxeo.test.vcs.password");
857                if (url != null && user != null) {
858                    return new DataSourceFromUrl(url, user, password); // driver?
859                }
860            }
861            throw new SQLException("Cannot find datasource: " + dataSourceName, e);
862        }
863    }
864
865    /**
866     * Checks how many references there are to shared connections.
867     * <p>
868     * USED IN UNIT TESTS OR FOR DEBUGGING.
869     */
870    public static int countConnectionReferences() {
871        return sharedConnections.size();
872    }
873
874    /**
875     * Clears the remaining connection references for the current thread.
876     * <p>
877     * USED IN UNIT TESTS ONLY.
878     */
879    public static void clearConnectionReferences() {
880        for (SharedConnection sharedConnection : sharedConnections.values()) {
881            sharedConnection.closeAfterTransaction(true);
882        }
883        sharedConnections.clear();
884    }
885
886    /**
887     * If sharing is in effect, registers a synchronization with the current transaction, making sure it runs before the
888     * {@link SharedConnectionSynchronization}.
889     *
890     * @return {@code true}
891     */
892    public static boolean registerSynchronization(Synchronization sync) throws SystemException {
893        return registerSynchronization(sync, true);
894    }
895
896    /**
897     * If sharing is in effect, registers a synchronization with the current transaction, making sure the
898     * {@link Synchronization#afterCompletion} method runs after the {@link SharedConnectionSynchronization}.
899     *
900     * @return {@code true}
901     */
902    public static boolean registerSynchronizationLast(Synchronization sync) throws SystemException {
903        return registerSynchronization(sync, false);
904    }
905
906    private static boolean registerSynchronization(Synchronization sync, boolean first) throws SystemException {
907        Transaction transaction = getTransaction();
908        if (transaction == null) {
909            throw new SystemException("Cannot register synchronization: no transaction");
910        }
911        // We always do the lookup and registration to the actual transaction
912        // even if there is no shared connection yet.
913        SharedConnectionSynchronization scs = SharedConnectionSynchronization.getInstance(transaction);
914        scs.registerSynchronization(sync, first);
915        return true;
916    }
917
918}