001/*
002 * (C) Copyright 2012-2013 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the Eclipse Public License v1.0
006 * which accompanies this distribution, and is available at
007 * http://www.eclipse.org/legal/epl-v10.html
008 *
009 * Contributors:
010 *     Florent Guillaume
011 */
012package org.nuxeo.runtime.datasource;
013
014import org.apache.commons.lang.StringUtils;
015import org.apache.commons.logging.Log;
016import org.apache.commons.logging.LogFactory;
017import org.nuxeo.common.utils.JDBCUtils;
018import org.nuxeo.runtime.api.Framework;
019import org.nuxeo.runtime.datasource.PooledDataSourceRegistry.PooledDataSource;
020import org.nuxeo.runtime.transaction.TransactionHelper;
021
022import java.lang.reflect.InvocationHandler;
023import java.lang.reflect.InvocationTargetException;
024import java.lang.reflect.Method;
025import java.lang.reflect.Proxy;
026import java.sql.Connection;
027import java.sql.SQLException;
028import java.util.ArrayList;
029import java.util.List;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.ConcurrentMap;
032
033import javax.naming.NamingException;
034import javax.resource.ResourceException;
035import javax.sql.DataSource;
036import javax.transaction.RollbackException;
037import javax.transaction.Status;
038import javax.transaction.Synchronization;
039import javax.transaction.SystemException;
040import javax.transaction.Transaction;
041
042/**
043 * This helper provides a way to get a JDBC connection, through {@link #getConnection(String)}, that will return a
044 * connection wrapper able to use a shared connection when used in transactional mode and setAutoCommit(false) is
045 * called, and otherwise use a normal physical JDBC connection.
046 * <p>
047 * The physical connections are created from the datasource configured using the framework property {@value #SINGLE_DS}.
048 * <p>
049 * This helper is used to implement consistent resource management in a non-XA context. Several users of the shared
050 * connection can call setAutoCommit(false) then do transactional work and commit(). Only the commit() of the last user
051 * will do an actual commit on the physical connection.
052 *
053 * @since 5.7
054 */
055public class ConnectionHelper {
056
057    private static final Log log = LogFactory.getLog(ConnectionHelper.class);
058
059    /**
060     * Shared connection for each transaction.
061     * <p>
062     * The shared connection is always in autoCommit=false.
063     * <p>
064     * Things are removed from this map by a transaction synchronizer when the transaction finishes.
065     */
066    private static ConcurrentMap<Transaction, SharedConnection> sharedConnections = new ConcurrentHashMap<Transaction, SharedConnection>();
067
068    /**
069     * SharedConnectionSynchronization registered for the transaction, when sharing.
070     */
071    private static ConcurrentMap<Transaction, SharedConnectionSynchronization> sharedSynchronizations = new ConcurrentHashMap<Transaction, SharedConnectionSynchronization>();
072
073    /**
074     * Property holding a datasource name to use to replace all database accesses.
075     */
076    public static final String SINGLE_DS = "nuxeo.db.singleDataSource";
077
078    /**
079     * Property holding one ore more datasource names (comma or space separated) for whose connections the single
080     * datasource is not used.
081     */
082    public static final String EXCLUDE_DS = "nuxeo.db.singleDataSource.exclude";
083
084
085    private static final String singleDsPropertyCached;
086
087    private static final String excludeDsPropertyCached;
088
089    static {
090        if (Framework.isTestModeSet()) {
091            // no cache for test mode so unit test can change this property at runtime
092            excludeDsPropertyCached = null;
093            singleDsPropertyCached = null;
094        } else {
095            excludeDsPropertyCached = Framework.getProperty(EXCLUDE_DS, "");
096            singleDsPropertyCached = Framework.getProperty(SINGLE_DS, "");
097        }
098    }
099
100    /**
101     * Wrapper for a connection that delegates calls to either a private connection, or a per-transaction shared one if
102     * a transaction is started.
103     * <p>
104     * Sharing is started on setAutoCommit(true), and ends on setAutoCommit(false) or close().
105     */
106    private static class ConnectionHandle implements InvocationHandler {
107
108        private boolean closed;
109
110        /**
111         * Expected autoCommit mode by the client for this connection.
112         */
113        private boolean autoCommit;
114
115        /**
116         * The transaction in use at the time where sharing was started (autoCommit was set to false during a
117         * transaction).
118         * <p>
119         * The sharedConnection is allocated on first use after that.
120         */
121        private Transaction transactionForShare;
122
123        /**
124         * A local connection, allocated when the connection is used when sharedInTransaction == null.
125         */
126        private Connection localConnection;
127
128        /**
129         * A shared connection, allocated when the connection is used when sharedInTransaction != null.
130         */
131        private SharedConnection sharedConnection;
132
133        /**
134         * True between the first use and the commit/rollback (in non-autoCommit mode and shared connection).
135         */
136        private boolean began;
137
138        public ConnectionHandle() {
139            autoCommit = true;
140            if (log.isDebugEnabled()) {
141                log.debug("Construct " + this);
142                if (log.isTraceEnabled()) {
143                    log.trace("Construct stacktrace " + this, new Exception("debug"));
144                }
145            }
146        }
147
148        private void logInvoke(String message) {
149            if (log.isDebugEnabled()) {
150                log.debug("Invoke " + message + " " + this);
151            }
152        }
153
154        @Override
155        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
156            String methodName = method.getName();
157            if (methodName.equals("isClosed")) {
158                return isClosed();
159            } else if (methodName.equals("close")) {
160                close();
161                return null;
162            }
163            if (closed) {
164                throw new SQLException("Connection is closed", "08003");
165            }
166
167            if (methodName.equals("getAutoCommit")) {
168                return getAutoCommit();
169            }
170
171            if (methodName.equals("setAutoCommit")) {
172                setAutoCommit(((Boolean) args[0]).booleanValue());
173                return null;
174            }
175
176            Connection connection;
177            if (transactionForShare != null) {
178                // check that we're still in the same transaction
179                // this also enforces single-threaded use of
180                // the shared connection
181                Transaction transaction = getTransaction();
182                if (transaction != transactionForShare) {
183                    throw new SQLException("Calling method " + methodName
184                            + ", connection sharing started in transaction " + transactionForShare
185                            + " but it is now used in transaction " + transaction);
186                }
187
188                sharedConnectionAllocate();
189
190                // for begin/commit we don't actually need to allocate
191                // the connection
192                if (methodName.equals("commit")) {
193                    if (autoCommit) {
194                        throw new SQLException("Cannot commit outside of transaction", "25000");
195                    }
196                    sharedConnectionCommit();
197                    return null;
198                } else if (methodName.equals("rollback")) {
199                    if (autoCommit) {
200                        throw new SQLException("Cannot commit outside of transaction", "25000");
201                    }
202                    if (args != null && args.length > 0) {
203                        throw new SQLException("Not implemented: rollback(Savepoint)", "0A000");
204                    }
205                    sharedConnectionRollback();
206                    return null;
207                } else if (methodName.equals("setSavepoint") || methodName.equals("releaseSavepoint")) {
208                    throw new SQLException("Not implemented: " + methodName, "0A000");
209                }
210
211                sharedConnectionBegin(methodName);
212
213                connection = sharedConnection.getConnection();
214            } else {
215                localConnectionAllocate();
216                connection = localConnection;
217            }
218
219            try {
220                if (log.isDebugEnabled()) {
221                    if (sharedConnection == null) {
222                        logInvoke(methodName);
223                    } else {
224                        logInvoke(methodName + " " + sharedConnection);
225                    }
226                }
227                return method.invoke(connection, args);
228            } catch (InvocationTargetException e) {
229                throw e.getCause();
230            }
231        }
232
233        private Boolean getAutoCommit() {
234            return Boolean.valueOf(autoCommit);
235        }
236
237        private void setAutoCommit(boolean setAutoCommit) throws SQLException {
238            if (setAutoCommit == autoCommit) {
239                return; // no change
240            }
241            autoCommit = setAutoCommit;
242            if (log.isDebugEnabled()) {
243                log.debug("setAutoCommit(" + autoCommit + ") " + this);
244            }
245            if (!autoCommit) {
246                // setting autoCommit = false
247                if (transactionForShare != null) {
248                    throw new AssertionError("autoCommit=false when already sharing");
249                }
250                // not yet sharing
251                Transaction transaction = getTransaction();
252                if (transaction != null && transactionStatus(transaction) == Status.STATUS_ACTIVE) {
253                    // start sharing
254                    transactionForShare = transaction;
255                    if (localConnection != null) {
256                        // share using the previous local connection
257                        logInvoke("setAutoCommit false");
258                        localConnection.setAutoCommit(false);
259                        log.debug("Upgrading local connection to shared");
260                        sharedConnection = getSharedConnection(localConnection);
261                        localConnection = null;
262                    } else {
263                        // sharedConnection allocated on first use
264                    }
265                } else {
266                    log.debug("No usable transaction");
267                    // we're outside a usable transaction
268                    // use the local connection
269                    if (localConnection != null) {
270                        logInvoke("setAutoCommit false");
271                        localConnection.setAutoCommit(false);
272                    } else {
273                        // localConnection allocated on first use
274                    }
275                }
276            } else {
277                // setting autoCommit = true
278                if (transactionForShare != null) {
279                    if (began) {
280                        // do automatic commit
281                        log.debug("setAutoCommit true committing shared");
282                        sharedConnectionCommit();
283                    }
284                    // stop sharing
285                    sharedConnection = null;
286                    transactionForShare = null;
287                } else if (localConnection != null) {
288                    logInvoke("setAutoCommit true");
289                    localConnection.setAutoCommit(true);
290                }
291            }
292        }
293
294        // allocation on first use
295        private void localConnectionAllocate() throws SQLException {
296            if (localConnection == null) {
297                if (log.isDebugEnabled()) {
298                    log.debug("Constructing physical connection " + this);
299                    if (log.isTraceEnabled()) {
300                        log.trace("Constructing physical connection stacktrace", new Exception("debug"));
301                    }
302                }
303                localConnection = getPhysicalConnection();
304                logInvoke("setAutoCommit " + autoCommit);
305                localConnection.setAutoCommit(autoCommit);
306            }
307        }
308
309        // allocation on first use
310        private void sharedConnectionAllocate() throws SQLException {
311            if (sharedConnection == null) {
312                if (transactionStatus(transactionForShare) == Status.STATUS_ACTIVE) {
313                    sharedConnection = getSharedConnection(null);
314                    // autoCommit mode set by SharedConnection.allocate()
315                } else {
316                    // already committing or rolling back
317                    // do not assign a connection at all
318                    // only commit or rollback is allowed,
319                    // and they will do nothing (began=false)
320                }
321            }
322        }
323
324        private void sharedConnectionBegin(String methodName) throws SQLException {
325            if (sharedConnection == null) {
326                throw new SQLException("Cannot call " + methodName + " with transaction in state "
327                        + transactionStatus(transactionForShare), "25000");
328            }
329            if (!autoCommit && !began) {
330                sharedConnection.begin(this);
331                began = true;
332            }
333        }
334
335        private void sharedConnectionCommit() throws SQLException {
336            if (began) {
337                if (log.isDebugEnabled()) {
338                    log.debug("Committing shared " + this);
339                }
340                sharedConnection.commit(this);
341                began = false;
342            }
343        }
344
345        private void sharedConnectionRollback() throws SQLException {
346            if (began) {
347                sharedConnection.rollback(this);
348                began = false;
349            }
350        }
351
352        /** Called back from SharedConnection close. */
353        protected void closeFromSharedConnection() {
354            sharedConnection = null;
355            transactionForShare = null;
356        }
357
358        private Boolean isClosed() {
359            return Boolean.valueOf(closed);
360        }
361
362        private void close() throws SQLException {
363            if (!closed) {
364                if (log.isDebugEnabled()) {
365                    log.debug("close() " + this);
366                }
367                if (transactionForShare != null) {
368                    if (sharedConnection != null) {
369                        if (began) {
370                            // connection closed before commit/rollback
371                            // commit it by hand (even though it's unspecified)
372                            log.debug("close committing shared");
373                            sharedConnectionCommit();
374                        }
375                        sharedConnection = null;
376                    }
377                    transactionForShare = null;
378                } else {
379                    if (localConnection != null) {
380                        logInvoke("close");
381                        localConnection.close();
382                        localConnection = null;
383                    }
384                }
385                closed = true;
386            }
387        }
388
389        /** Gets the physical connection, use by unwrap. */
390        public Connection getUnwrappedConnection() throws SQLException {
391            Connection connection;
392            if (sharedConnection != null) {
393                connection = sharedConnection.getConnection();
394            } else {
395                connection = localConnection;
396            }
397            if (connection == null) {
398                throw new SQLException("Connection not allocated");
399            }
400            return connection;
401        }
402
403        /**
404         * Gets the shared connection for the shared transaction, or allocates a new one. If allocating a new one,
405         * registers a synchronizer in order to remove it at transaction completion time.
406         *
407         * @param connection an existing local connection to reuse, or null
408         */
409        private SharedConnection getSharedConnection(Connection connection) throws SQLException {
410            SharedConnection sharedConnection = sharedConnections.get(transactionForShare);
411            if (sharedConnection == null) {
412                // allocate a new shared connection
413                sharedConnection = new SharedConnection(connection);
414                if (log.isDebugEnabled()) {
415                    log.debug("Allocating new shared connection " + sharedConnection + " for " + this);
416                }
417                if (sharedConnections.putIfAbsent(transactionForShare, sharedConnection) != null) {
418                    // race condition but we are single-threaded in this
419                    // transaction!
420                    throw new AssertionError("Race condition in single transaction!");
421                }
422                // register a synchronizer to clear the map
423                SharedConnectionSynchronization.getInstance(transactionForShare);
424            } else {
425                if (log.isDebugEnabled()) {
426                    log.debug("Reusing shared connection " + sharedConnection + " for " + this);
427                }
428                if (connection != null) {
429                    // the local connection passed is not needed anymore
430                    log.debug("Dropping previous local connection");
431                    logInvoke("close");
432                    connection.close();
433                }
434            }
435            return sharedConnection;
436        }
437
438        @Override
439        public String toString() {
440            return getClass().getSimpleName() + "@" + Integer.toHexString(System.identityHashCode(this));
441        }
442    }
443
444    /**
445     * Shared connection, holding a physical connection use by several pieces of code in the same transaction (so not
446     * multi-threaded). It's always in mode autoCommit=false.
447     * <p>
448     * The last user to commit/rollback will do an actual commit/rollback on the physical connection.
449     * <p>
450     * If a rollback is done but not by the last user, the connection will be marked rollback only.
451     */
452    private static class SharedConnection {
453
454        /** The JDBC connection. */
455        private Connection connection;
456
457        /** The connection handles associated to this shared connection. */
458        private final List<ConnectionHandle> handles;
459
460        /** Whether the final commit must actually do a rollback. */
461        private boolean mustRollback;
462
463        public SharedConnection(Connection connection) {
464            this.connection = connection;
465            handles = new ArrayList<ConnectionHandle>(3);
466        }
467
468        private void logInvoke(String message) {
469            if (log.isDebugEnabled()) {
470                log.debug("Invoke shared " + message + " " + this);
471            }
472        }
473
474        public Connection getConnection() {
475            return connection;
476        }
477
478        /** Called just before first use. */
479        public void begin(ConnectionHandle handle) throws SQLException {
480            ref(handle);
481        }
482
483        /** Finishes connection use by commit. */
484        public void commit(ConnectionHandle handle) throws SQLException {
485            try {
486                if (handles.size() == 1) {
487                    if (mustRollback) {
488                        logInvoke("rollback");
489                        mustRollback = false;
490                    }
491                } else {
492                    if (log.isDebugEnabled()) {
493                        log.debug("commit not yet closing " + this);
494                    }
495                }
496            } finally {
497                unref(handle);
498            }
499        }
500
501        /** Finishes connection use by rollback. */
502        public void rollback(ConnectionHandle handle) throws SQLException {
503            try {
504                if (handles.size() == 1) {
505                    logInvoke("rollback");
506                    mustRollback = false;
507                } else {
508                    if (log.isDebugEnabled()) {
509                        log.debug("setting rollback only " + this);
510                    }
511                    mustRollback = true;
512                }
513            } finally {
514                unref(handle);
515            }
516        }
517
518        private void ref(ConnectionHandle handle) throws SQLException {
519            if (handles.isEmpty()) {
520                if (connection == null) {
521                    allocate();
522                }
523            }
524            handles.add(handle);
525            if (log.isDebugEnabled()) {
526                log.debug("Reference added for " + this);
527            }
528        }
529
530        private void unref(ConnectionHandle handle) throws SQLException {
531            handles.remove(handle);
532            if (log.isDebugEnabled()) {
533                log.debug("Reference removed for " + this);
534            }
535        }
536
537        // Note that this is not called when a local connection was upgraded to
538        // a shared one, and is reused.
539        private void allocate() throws SQLException {
540            if (log.isDebugEnabled()) {
541                log.debug("Constructing physical connection " + this);
542                if (log.isTraceEnabled()) {
543                    log.trace("Constructing physical connection stacktrace", new Exception("debug"));
544                }
545            }
546            connection = getPhysicalConnection();
547            logInvoke("setAutoCommit false");
548            connection.setAutoCommit(false);
549        }
550
551        /** Called after transaction completion to free resources. */
552        public void closeAfterTransaction(boolean mustRollback) {
553            if (!handles.isEmpty()) {
554                log.error("Transaction ended with " + handles.size() + " connections not committed " + this + " "
555                        + handles);
556            }
557            if (connection == null) {
558                return;
559            }
560            try {
561                if (mustRollback) {
562                    connection.rollback();
563                } else {
564                    connection.commit();
565                }
566            } catch (SQLException cause) {
567                log.error("Could not close endup connection at transaction end", cause);
568            } finally {
569                close();
570            }
571        }
572
573        /** Closes and dereferences from all handles to this. */
574        private void close() {
575            try {
576                logInvoke("close");
577                connection.close();
578            } catch (SQLException e) {
579                log.error("Could not close leftover connection at transaction end", e);
580            } finally {
581                connection = null;
582                for (ConnectionHandle h : handles) {
583                    h.closeFromSharedConnection();
584                }
585                handles.clear();
586            }
587        }
588
589        @Override
590        public String toString() {
591            return getClass().getSimpleName() + "@" + Integer.toHexString(System.identityHashCode(this));
592        }
593    }
594
595    /**
596     * In addition to closing the shared connection, also acts as a delegate for other synchronizers that must run
597     * before it.
598     */
599    private static class SharedConnectionSynchronization implements Synchronization {
600
601        private final Transaction transaction;
602
603        private final List<Synchronization> syncsFirst;
604
605        private final List<Synchronization> syncsLast;
606
607        /**
608         * Gets the instance or creates it. If creating, registers with the actual transaction.
609         */
610        // not synchronized as the transaction is already thread-local
611        // and we use a ConcurrentHashMap
612        public static SharedConnectionSynchronization getInstance(Transaction transaction) {
613            SharedConnectionSynchronization scs = sharedSynchronizations.get(transaction);
614            if (scs == null) {
615                scs = new SharedConnectionSynchronization(transaction);
616                try {
617                    transaction.registerSynchronization(scs);
618                } catch (IllegalStateException | RollbackException | SystemException e) {
619                    throw new RuntimeException("Cannot register synchronization", e);
620                }
621                sharedSynchronizations.put(transaction, scs);
622            }
623            return scs;
624        }
625
626        public SharedConnectionSynchronization(Transaction transaction) {
627            this.transaction = transaction;
628            syncsFirst = new ArrayList<Synchronization>(5);
629            syncsLast = new ArrayList<Synchronization>(5);
630        }
631
632        /**
633         * Registers a synchronization that must run before or after us.
634         */
635        public void registerSynchronization(Synchronization sync, boolean first) {
636            if (first) {
637                syncsFirst.add(sync);
638            } else {
639                syncsLast.add(sync);
640            }
641        }
642
643        @Override
644        public void beforeCompletion() {
645            beforeCompletion(syncsFirst);
646            beforeCompletion(syncsLast);
647        }
648
649        private void beforeCompletion(List<Synchronization> syncs) {
650            // beforeCompletion hooks may add other syncs,
651            // so we must be careful when iterating on the list
652            RuntimeException exc = null;
653            for (int i = 0; i < syncs.size(); i++) {
654                try {
655                    syncs.get(i).beforeCompletion();
656                } catch (RuntimeException e) {
657                    log.error("Exception during beforeCompletion hook", e);
658                    if (exc == null) {
659                        exc = e;
660                        try {
661                            transaction.setRollbackOnly();
662                        } catch (SystemException se) {
663                            log.error("Cannot set rollback only", e);
664                        }
665                    }
666                }
667            }
668            if (exc != null) {
669                throw exc;
670            }
671        }
672
673        /**
674         * After completion, removes the shared connection from the map and closes it.
675         */
676        @Override
677        public void afterCompletion(int status) {
678            sharedSynchronizations.remove(transaction);
679            afterCompletion(syncsFirst, status);
680            closeSharedAfterCompletion(status == Status.STATUS_ROLLEDBACK);
681            afterCompletion(syncsLast, status);
682        }
683
684        private void closeSharedAfterCompletion(boolean rollback) {
685            SharedConnection sharedConnection = sharedConnections.remove(transaction);
686            if (sharedConnection != null) {
687                sharedConnection.closeAfterTransaction(rollback);
688            }
689        }
690
691        private void afterCompletion(List<Synchronization> syncs, int status) {
692            for (Synchronization sync : syncs) {
693                try {
694                    sync.afterCompletion(status);
695                } catch (RuntimeException e) {
696                    log.warn("Unexpected exception from afterCompletion; continuing", e);
697                }
698            }
699        }
700    }
701
702    private static Transaction getTransaction() {
703        try {
704            return TransactionHelper.lookupTransactionManager().getTransaction();
705        } catch (NamingException | SystemException e) {
706            return null;
707        }
708    }
709
710    private static int transactionStatus(Transaction transaction) {
711        try {
712            return transaction.getStatus();
713        } catch (SystemException e) {
714            log.error("Cannot get transaction status", e);
715            return Status.STATUS_UNKNOWN;
716        }
717    }
718
719    /**
720     * Tries to unwrap the connection to get the real physical one (returned by the original datasource).
721     * <p>
722     * This should only be used by code that needs to cast the connection to a driver-specific class to use
723     * driver-specific features.
724     *
725     * @throws SQLException if no actual physical connection was allocated yet
726     */
727    public static Connection unwrap(Connection connection) throws SQLException {
728        if (Proxy.isProxyClass(connection.getClass())) {
729            InvocationHandler handler = Proxy.getInvocationHandler(connection);
730            if (handler instanceof ConnectionHandle) {
731                ConnectionHandle h = (ConnectionHandle) handler;
732                connection = h.getUnwrappedConnection();
733            }
734        }
735        if (connection instanceof org.tranql.connector.jdbc.ConnectionHandle) {
736            return ((org.tranql.connector.jdbc.ConnectionHandle) connection).getAssociation().getPhysicalConnection();
737        }
738        // now try Apache DBCP unwrap (standard or Tomcat), to skip datasource
739        // wrapping layers
740        // this needs accessToUnderlyingConnectionAllowed=true in the pool
741        // config
742        try {
743            Method m = connection.getClass().getMethod("getInnermostDelegate");
744            m.setAccessible(true); // needed, method of inner private class
745            Connection delegate = (Connection) m.invoke(connection);
746            if (delegate == null) {
747                log.error("Cannot access underlying connection, you must use "
748                        + "accessToUnderlyingConnectionAllowed=true in the pool configuration");
749            } else {
750                connection = delegate;
751            }
752        } catch (NoSuchMethodException | SecurityException | IllegalAccessException | InvocationTargetException e) {
753            // ignore missing method, connection not coming from Apache pool
754        }
755        return connection;
756    }
757
758    /**
759     * Checks if single transaction-local datasource mode will be used for the given datasource name.
760     *
761     * @return {@code true} if using a single transaction-local connection for this datasource
762     */
763    public static boolean useSingleConnection(String dataSourceName) {
764        if (dataSourceName != null) {
765            String excludes = getExcludeDsProperty();
766            if ("*".equals(excludes)) {
767                return false;
768            }
769            if (!StringUtils.isBlank(excludes)) {
770                for (String exclude : excludes.split("[, ] *")) {
771                    if (dataSourceName.equals(exclude)
772                            || dataSourceName.equals(DataSourceHelper.getDataSourceJNDIName(exclude))) {
773                        return false;
774                    }
775                }
776            }
777        }
778        return !StringUtils.isBlank(getSingleDsProperty());
779    }
780
781    /**
782     * Gets the fake name we use to pass to ConnectionHelper.getConnection, in order for exclusions on these connections
783     * to be possible.
784     */
785    public static String getPseudoDataSourceNameForRepository(String repositoryName) {
786        return "repository_" + repositoryName;
787    }
788
789    /**
790     * Gets a new reference to the transaction-local JDBC connection for the given dataSource. The connection
791     * <strong>MUST</strong> be closed in a finally block when code is done using it.
792     * <p>
793     * If the passed dataSource name is in the exclusion list, null will be returned.
794     *
795     * @param dataSourceName the datasource for which the connection is requested
796     * @return a new reference to the connection, or {@code null} if single datasource connection sharing is not in
797     *         effect
798     * @throws ResourceException
799     */
800    public static Connection getConnection(String dataSourceName) throws SQLException {
801        return getConnection(dataSourceName, false);
802    }
803
804    /**
805     * Gets a new reference to the transaction-local JDBC connection for the given dataSource. The connection
806     * <strong>MUST</strong> be closed in a finally block when code is done using it.
807     * <p>
808     * If the passed dataSource name is in the exclusion list, null will be returned.
809     * <p>
810     * If noSharing is requested, the connection will never come from the transaction-local and will always be newly
811     * allocated.
812     *
813     * @param dataSourceName the datasource for which the connection is requested
814     * @param noSharing {@code true} if this connection must not be shared with others
815     * @return a new reference to the connection, or {@code null} if single datasource connection sharing is not in
816     *         effect
817     * @throws ResourceException
818     */
819    public static Connection getConnection(String dataSourceName, boolean noSharing) throws SQLException {
820        if (!useSingleConnection(dataSourceName)) {
821            DataSource ds = getDataSource(dataSourceName);
822            if (ds instanceof PooledDataSource) {
823                return ((PooledDataSource) ds).getConnection(noSharing);
824            }
825            return getPhysicalConnection(dataSourceName);
826        }
827        return getConnection(noSharing);
828    }
829
830    private static Connection getConnection(boolean noSharing) throws SQLException {
831        String dataSourceName = getSingleDsProperty();
832        if (StringUtils.isBlank(dataSourceName)) {
833            return null;
834        }
835        if (noSharing) {
836            return getPhysicalConnection(dataSourceName);
837        }
838        return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), new Class[]{Connection.class},
839                new ConnectionHandle());
840    }
841
842    private static String getSingleDsProperty() {
843        if (singleDsPropertyCached != null) {
844            return singleDsPropertyCached;
845        }
846        return Framework.getProperty(SINGLE_DS);
847    }
848
849    private static String getExcludeDsProperty() {
850        if (excludeDsPropertyCached != null) {
851            return excludeDsPropertyCached;
852        }
853        return Framework.getProperty(EXCLUDE_DS);
854    }
855
856    private static Connection getPhysicalConnection() throws SQLException {
857        return getPhysicalConnection(getSingleDsProperty());
858    }
859
860    /**
861     * Gets a physical connection from a datasource name.
862     * <p>
863     * A few retries are done to work around databases that have problems with many open/close in a row.
864     *
865     * @param dataSourceName the datasource name
866     * @return the connection
867     */
868    private static Connection getPhysicalConnection(String dataSourceName) throws SQLException {
869        DataSource dataSource = getDataSource(dataSourceName);
870        return JDBCUtils.getConnection(dataSource);
871    }
872
873    /**
874     * Gets a datasource from a datasource name, or in test mode use test connection parameters.
875     *
876     * @param dataSourceName the datasource name
877     * @return the datasource
878     */
879    private static DataSource getDataSource(String dataSourceName) throws SQLException {
880        try {
881            return DataSourceHelper.getDataSource(dataSourceName);
882        } catch (NamingException e) {
883            if (Framework.isTestModeSet()) {
884                String url = Framework.getProperty("nuxeo.test.vcs.url");
885                String user = Framework.getProperty("nuxeo.test.vcs.user");
886                String password = Framework.getProperty("nuxeo.test.vcs.password");
887                if (url != null && user != null) {
888                    return new DataSourceFromUrl(url, user, password); // driver?
889                }
890            }
891            throw new SQLException("Cannot find datasource: " + dataSourceName, e);
892        }
893    }
894
895    /**
896     * Checks how many references there are to shared connections.
897     * <p>
898     * USED IN UNIT TESTS OR FOR DEBUGGING.
899     */
900    public static int countConnectionReferences() {
901        return sharedConnections.size();
902    }
903
904    /**
905     * Clears the remaining connection references for the current thread.
906     * <p>
907     * USED IN UNIT TESTS ONLY.
908     */
909    public static void clearConnectionReferences() {
910        for (SharedConnection sharedConnection : sharedConnections.values()) {
911            sharedConnection.closeAfterTransaction(true);
912        }
913        sharedConnections.clear();
914    }
915
916    /**
917     * If sharing is in effect, registers a synchronization with the current transaction, making sure it runs before the
918     * {@link SharedConnectionSynchronization}.
919     *
920     * @return {@code true}
921     */
922    public static boolean registerSynchronization(Synchronization sync) throws SystemException {
923        return registerSynchronization(sync, true);
924    }
925
926    /**
927     * If sharing is in effect, registers a synchronization with the current transaction, making sure the
928     * {@link Synchronization#afterCompletion} method runs after the {@link SharedConnectionSynchronization}.
929     *
930     * @return {@code true}
931     */
932    public static boolean registerSynchronizationLast(Synchronization sync) throws SystemException {
933        return registerSynchronization(sync, false);
934    }
935
936    private static boolean registerSynchronization(Synchronization sync, boolean first) throws SystemException {
937        Transaction transaction = getTransaction();
938        if (transaction == null) {
939            throw new SystemException("Cannot register synchronization: no transaction");
940        }
941        // We always do the lookup and registration to the actual transaction
942        // even if there is no shared connection yet.
943        SharedConnectionSynchronization scs = SharedConnectionSynchronization.getInstance(transaction);
944        scs.registerSynchronization(sync, first);
945        return true;
946    }
947
948}