001/*
002 * (C) Copyright 2012-2013 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.runtime.datasource;
020
021import org.apache.commons.lang.StringUtils;
022import org.apache.commons.logging.Log;
023import org.apache.commons.logging.LogFactory;
024import org.nuxeo.common.utils.JDBCUtils;
025import org.nuxeo.runtime.api.Framework;
026import org.nuxeo.runtime.datasource.PooledDataSourceRegistry.PooledDataSource;
027import org.nuxeo.runtime.transaction.TransactionHelper;
028
029import java.lang.reflect.InvocationHandler;
030import java.lang.reflect.InvocationTargetException;
031import java.lang.reflect.Method;
032import java.lang.reflect.Proxy;
033import java.sql.Connection;
034import java.sql.SQLException;
035import java.util.ArrayList;
036import java.util.List;
037import java.util.concurrent.ConcurrentHashMap;
038import java.util.concurrent.ConcurrentMap;
039
040import javax.naming.NamingException;
041import javax.resource.ResourceException;
042import javax.sql.DataSource;
043import javax.transaction.RollbackException;
044import javax.transaction.Status;
045import javax.transaction.Synchronization;
046import javax.transaction.SystemException;
047import javax.transaction.Transaction;
048
049/**
050 * This helper provides a way to get a JDBC connection, through {@link #getConnection(String)}, that will return a
051 * connection wrapper able to use a shared connection when used in transactional mode and setAutoCommit(false) is
052 * called, and otherwise use a normal physical JDBC connection.
053 * <p>
054 * The physical connections are created from the datasource configured using the framework property {@value #SINGLE_DS}.
055 * <p>
056 * This helper is used to implement consistent resource management in a non-XA context. Several users of the shared
057 * connection can call setAutoCommit(false) then do transactional work and commit(). Only the commit() of the last user
058 * will do an actual commit on the physical connection.
059 *
060 * @since 5.7
061 */
062public class ConnectionHelper {
063
064    private static final Log log = LogFactory.getLog(ConnectionHelper.class);
065
066    /**
067     * Shared connection for each transaction.
068     * <p>
069     * The shared connection is always in autoCommit=false.
070     * <p>
071     * Things are removed from this map by a transaction synchronizer when the transaction finishes.
072     */
073    private static ConcurrentMap<Transaction, SharedConnection> sharedConnections = new ConcurrentHashMap<Transaction, SharedConnection>();
074
075    /**
076     * SharedConnectionSynchronization registered for the transaction, when sharing.
077     */
078    private static ConcurrentMap<Transaction, SharedConnectionSynchronization> sharedSynchronizations = new ConcurrentHashMap<Transaction, SharedConnectionSynchronization>();
079
080    /**
081     * Property holding a datasource name to use to replace all database accesses.
082     */
083    public static final String SINGLE_DS = "nuxeo.db.singleDataSource";
084
085    /**
086     * Property holding one ore more datasource names (comma or space separated) for whose connections the single
087     * datasource is not used.
088     */
089    public static final String EXCLUDE_DS = "nuxeo.db.singleDataSource.exclude";
090
091
092    private static final String singleDsPropertyCached;
093
094    private static final String excludeDsPropertyCached;
095
096    static {
097        if (Framework.isTestModeSet()) {
098            // no cache for test mode so unit test can change this property at runtime
099            excludeDsPropertyCached = null;
100            singleDsPropertyCached = null;
101        } else {
102            excludeDsPropertyCached = Framework.getProperty(EXCLUDE_DS, "");
103            singleDsPropertyCached = Framework.getProperty(SINGLE_DS, "");
104        }
105    }
106
107    /**
108     * Wrapper for a connection that delegates calls to either a private connection, or a per-transaction shared one if
109     * a transaction is started.
110     * <p>
111     * Sharing is started on setAutoCommit(true), and ends on setAutoCommit(false) or close().
112     */
113    private static class ConnectionHandle implements InvocationHandler {
114
115        private boolean closed;
116
117        /**
118         * Expected autoCommit mode by the client for this connection.
119         */
120        private boolean autoCommit;
121
122        /**
123         * The transaction in use at the time where sharing was started (autoCommit was set to false during a
124         * transaction).
125         * <p>
126         * The sharedConnection is allocated on first use after that.
127         */
128        private Transaction transactionForShare;
129
130        /**
131         * A local connection, allocated when the connection is used when sharedInTransaction == null.
132         */
133        private Connection localConnection;
134
135        /**
136         * A shared connection, allocated when the connection is used when sharedInTransaction != null.
137         */
138        private SharedConnection sharedConnection;
139
140        /**
141         * True between the first use and the commit/rollback (in non-autoCommit mode and shared connection).
142         */
143        private boolean began;
144
145        public ConnectionHandle() {
146            autoCommit = true;
147            if (log.isDebugEnabled()) {
148                log.debug("Construct " + this);
149                if (log.isTraceEnabled()) {
150                    log.trace("Construct stacktrace " + this, new Exception("debug"));
151                }
152            }
153        }
154
155        private void logInvoke(String message) {
156            if (log.isDebugEnabled()) {
157                log.debug("Invoke " + message + " " + this);
158            }
159        }
160
161        @Override
162        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
163            String methodName = method.getName();
164            if (methodName.equals("isClosed")) {
165                return isClosed();
166            } else if (methodName.equals("close")) {
167                close();
168                return null;
169            }
170            if (closed) {
171                throw new SQLException("Connection is closed", "08003");
172            }
173
174            if (methodName.equals("getAutoCommit")) {
175                return getAutoCommit();
176            }
177
178            if (methodName.equals("setAutoCommit")) {
179                setAutoCommit(((Boolean) args[0]).booleanValue());
180                return null;
181            }
182
183            Connection connection;
184            if (transactionForShare != null) {
185                // check that we're still in the same transaction
186                // this also enforces single-threaded use of
187                // the shared connection
188                Transaction transaction = getTransaction();
189                if (transaction != transactionForShare) {
190                    throw new SQLException("Calling method " + methodName
191                            + ", connection sharing started in transaction " + transactionForShare
192                            + " but it is now used in transaction " + transaction);
193                }
194
195                sharedConnectionAllocate();
196
197                // for begin/commit we don't actually need to allocate
198                // the connection
199                if (methodName.equals("commit")) {
200                    if (autoCommit) {
201                        throw new SQLException("Cannot commit outside of transaction", "25000");
202                    }
203                    sharedConnectionCommit();
204                    return null;
205                } else if (methodName.equals("rollback")) {
206                    if (autoCommit) {
207                        throw new SQLException("Cannot commit outside of transaction", "25000");
208                    }
209                    if (args != null && args.length > 0) {
210                        throw new SQLException("Not implemented: rollback(Savepoint)", "0A000");
211                    }
212                    sharedConnectionRollback();
213                    return null;
214                } else if (methodName.equals("setSavepoint") || methodName.equals("releaseSavepoint")) {
215                    throw new SQLException("Not implemented: " + methodName, "0A000");
216                }
217
218                sharedConnectionBegin(methodName);
219
220                connection = sharedConnection.getConnection();
221            } else {
222                localConnectionAllocate();
223                connection = localConnection;
224            }
225
226            try {
227                if (log.isDebugEnabled()) {
228                    if (sharedConnection == null) {
229                        logInvoke(methodName);
230                    } else {
231                        logInvoke(methodName + " " + sharedConnection);
232                    }
233                }
234                return method.invoke(connection, args);
235            } catch (InvocationTargetException e) {
236                throw e.getCause();
237            }
238        }
239
240        private Boolean getAutoCommit() {
241            return Boolean.valueOf(autoCommit);
242        }
243
244        private void setAutoCommit(boolean setAutoCommit) throws SQLException {
245            if (setAutoCommit == autoCommit) {
246                return; // no change
247            }
248            autoCommit = setAutoCommit;
249            if (log.isDebugEnabled()) {
250                log.debug("setAutoCommit(" + autoCommit + ") " + this);
251            }
252            if (!autoCommit) {
253                // setting autoCommit = false
254                if (transactionForShare != null) {
255                    throw new AssertionError("autoCommit=false when already sharing");
256                }
257                // not yet sharing
258                Transaction transaction = getTransaction();
259                if (transaction != null && transactionStatus(transaction) == Status.STATUS_ACTIVE) {
260                    // start sharing
261                    transactionForShare = transaction;
262                    if (localConnection != null) {
263                        // share using the previous local connection
264                        logInvoke("setAutoCommit false");
265                        localConnection.setAutoCommit(false);
266                        log.debug("Upgrading local connection to shared");
267                        sharedConnection = getSharedConnection(localConnection);
268                        localConnection = null;
269                    } else {
270                        // sharedConnection allocated on first use
271                    }
272                } else {
273                    log.debug("No usable transaction");
274                    // we're outside a usable transaction
275                    // use the local connection
276                    if (localConnection != null) {
277                        logInvoke("setAutoCommit false");
278                        localConnection.setAutoCommit(false);
279                    } else {
280                        // localConnection allocated on first use
281                    }
282                }
283            } else {
284                // setting autoCommit = true
285                if (transactionForShare != null) {
286                    if (began) {
287                        // do automatic commit
288                        log.debug("setAutoCommit true committing shared");
289                        sharedConnectionCommit();
290                    }
291                    // stop sharing
292                    sharedConnection = null;
293                    transactionForShare = null;
294                } else if (localConnection != null) {
295                    logInvoke("setAutoCommit true");
296                    localConnection.setAutoCommit(true);
297                }
298            }
299        }
300
301        // allocation on first use
302        private void localConnectionAllocate() throws SQLException {
303            if (localConnection == null) {
304                if (log.isDebugEnabled()) {
305                    log.debug("Constructing physical connection " + this);
306                    if (log.isTraceEnabled()) {
307                        log.trace("Constructing physical connection stacktrace", new Exception("debug"));
308                    }
309                }
310                localConnection = getPhysicalConnection();
311                logInvoke("setAutoCommit " + autoCommit);
312                localConnection.setAutoCommit(autoCommit);
313            }
314        }
315
316        // allocation on first use
317        private void sharedConnectionAllocate() throws SQLException {
318            if (sharedConnection == null) {
319                if (transactionStatus(transactionForShare) == Status.STATUS_ACTIVE) {
320                    sharedConnection = getSharedConnection(null);
321                    // autoCommit mode set by SharedConnection.allocate()
322                } else {
323                    // already committing or rolling back
324                    // do not assign a connection at all
325                    // only commit or rollback is allowed,
326                    // and they will do nothing (began=false)
327                }
328            }
329        }
330
331        private void sharedConnectionBegin(String methodName) throws SQLException {
332            if (sharedConnection == null) {
333                throw new SQLException("Cannot call " + methodName + " with transaction in state "
334                        + transactionStatus(transactionForShare), "25000");
335            }
336            if (!autoCommit && !began) {
337                sharedConnection.begin(this);
338                began = true;
339            }
340        }
341
342        private void sharedConnectionCommit() throws SQLException {
343            if (began) {
344                if (log.isDebugEnabled()) {
345                    log.debug("Committing shared " + this);
346                }
347                sharedConnection.commit(this);
348                began = false;
349            }
350        }
351
352        private void sharedConnectionRollback() throws SQLException {
353            if (began) {
354                sharedConnection.rollback(this);
355                began = false;
356            }
357        }
358
359        /** Called back from SharedConnection close. */
360        protected void closeFromSharedConnection() {
361            sharedConnection = null;
362            transactionForShare = null;
363        }
364
365        private Boolean isClosed() {
366            return Boolean.valueOf(closed);
367        }
368
369        private void close() throws SQLException {
370            if (!closed) {
371                if (log.isDebugEnabled()) {
372                    log.debug("close() " + this);
373                }
374                if (transactionForShare != null) {
375                    if (sharedConnection != null) {
376                        if (began) {
377                            // connection closed before commit/rollback
378                            // commit it by hand (even though it's unspecified)
379                            log.debug("close committing shared");
380                            sharedConnectionCommit();
381                        }
382                        sharedConnection = null;
383                    }
384                    transactionForShare = null;
385                } else {
386                    if (localConnection != null) {
387                        logInvoke("close");
388                        localConnection.close();
389                        localConnection = null;
390                    }
391                }
392                closed = true;
393            }
394        }
395
396        /** Gets the physical connection, use by unwrap. */
397        public Connection getUnwrappedConnection() throws SQLException {
398            Connection connection;
399            if (sharedConnection != null) {
400                connection = sharedConnection.getConnection();
401            } else {
402                connection = localConnection;
403            }
404            if (connection == null) {
405                throw new SQLException("Connection not allocated");
406            }
407            return connection;
408        }
409
410        /**
411         * Gets the shared connection for the shared transaction, or allocates a new one. If allocating a new one,
412         * registers a synchronizer in order to remove it at transaction completion time.
413         *
414         * @param connection an existing local connection to reuse, or null
415         */
416        private SharedConnection getSharedConnection(Connection connection) throws SQLException {
417            SharedConnection sharedConnection = sharedConnections.get(transactionForShare);
418            if (sharedConnection == null) {
419                // allocate a new shared connection
420                sharedConnection = new SharedConnection(connection);
421                if (log.isDebugEnabled()) {
422                    log.debug("Allocating new shared connection " + sharedConnection + " for " + this);
423                }
424                if (sharedConnections.putIfAbsent(transactionForShare, sharedConnection) != null) {
425                    // race condition but we are single-threaded in this
426                    // transaction!
427                    throw new AssertionError("Race condition in single transaction!");
428                }
429                // register a synchronizer to clear the map
430                SharedConnectionSynchronization.getInstance(transactionForShare);
431            } else {
432                if (log.isDebugEnabled()) {
433                    log.debug("Reusing shared connection " + sharedConnection + " for " + this);
434                }
435                if (connection != null) {
436                    // the local connection passed is not needed anymore
437                    log.debug("Dropping previous local connection");
438                    logInvoke("close");
439                    connection.close();
440                }
441            }
442            return sharedConnection;
443        }
444
445        @Override
446        public String toString() {
447            return getClass().getSimpleName() + "@" + Integer.toHexString(System.identityHashCode(this));
448        }
449    }
450
451    /**
452     * Shared connection, holding a physical connection use by several pieces of code in the same transaction (so not
453     * multi-threaded). It's always in mode autoCommit=false.
454     * <p>
455     * The last user to commit/rollback will do an actual commit/rollback on the physical connection.
456     * <p>
457     * If a rollback is done but not by the last user, the connection will be marked rollback only.
458     */
459    private static class SharedConnection {
460
461        /** The JDBC connection. */
462        private Connection connection;
463
464        /** The connection handles associated to this shared connection. */
465        private final List<ConnectionHandle> handles;
466
467        /** Whether the final commit must actually do a rollback. */
468        private boolean mustRollback;
469
470        public SharedConnection(Connection connection) {
471            this.connection = connection;
472            handles = new ArrayList<ConnectionHandle>(3);
473        }
474
475        private void logInvoke(String message) {
476            if (log.isDebugEnabled()) {
477                log.debug("Invoke shared " + message + " " + this);
478            }
479        }
480
481        public Connection getConnection() {
482            return connection;
483        }
484
485        /** Called just before first use. */
486        public void begin(ConnectionHandle handle) throws SQLException {
487            ref(handle);
488        }
489
490        /** Finishes connection use by commit. */
491        public void commit(ConnectionHandle handle) throws SQLException {
492            try {
493                if (handles.size() == 1) {
494                    if (mustRollback) {
495                        logInvoke("rollback");
496                        mustRollback = false;
497                    }
498                } else {
499                    if (log.isDebugEnabled()) {
500                        log.debug("commit not yet closing " + this);
501                    }
502                }
503            } finally {
504                unref(handle);
505            }
506        }
507
508        /** Finishes connection use by rollback. */
509        public void rollback(ConnectionHandle handle) throws SQLException {
510            try {
511                if (handles.size() == 1) {
512                    logInvoke("rollback");
513                    mustRollback = false;
514                } else {
515                    if (log.isDebugEnabled()) {
516                        log.debug("setting rollback only " + this);
517                    }
518                    mustRollback = true;
519                }
520            } finally {
521                unref(handle);
522            }
523        }
524
525        private void ref(ConnectionHandle handle) throws SQLException {
526            if (handles.isEmpty()) {
527                if (connection == null) {
528                    allocate();
529                }
530            }
531            handles.add(handle);
532            if (log.isDebugEnabled()) {
533                log.debug("Reference added for " + this);
534            }
535        }
536
537        private void unref(ConnectionHandle handle) throws SQLException {
538            handles.remove(handle);
539            if (log.isDebugEnabled()) {
540                log.debug("Reference removed for " + this);
541            }
542        }
543
544        // Note that this is not called when a local connection was upgraded to
545        // a shared one, and is reused.
546        private void allocate() throws SQLException {
547            if (log.isDebugEnabled()) {
548                log.debug("Constructing physical connection " + this);
549                if (log.isTraceEnabled()) {
550                    log.trace("Constructing physical connection stacktrace", new Exception("debug"));
551                }
552            }
553            connection = getPhysicalConnection();
554            logInvoke("setAutoCommit false");
555            connection.setAutoCommit(false);
556        }
557
558        /** Called after transaction completion to free resources. */
559        public void closeAfterTransaction(boolean mustRollback) {
560            if (!handles.isEmpty()) {
561                log.error("Transaction ended with " + handles.size() + " connections not committed " + this + " "
562                        + handles);
563            }
564            if (connection == null) {
565                return;
566            }
567            try {
568                if (mustRollback) {
569                    connection.rollback();
570                } else {
571                    connection.commit();
572                }
573            } catch (SQLException cause) {
574                log.error("Could not close endup connection at transaction end", cause);
575            } finally {
576                close();
577            }
578        }
579
580        /** Closes and dereferences from all handles to this. */
581        private void close() {
582            try {
583                logInvoke("close");
584                connection.close();
585            } catch (SQLException e) {
586                log.error("Could not close leftover connection at transaction end", e);
587            } finally {
588                connection = null;
589                for (ConnectionHandle h : handles) {
590                    h.closeFromSharedConnection();
591                }
592                handles.clear();
593            }
594        }
595
596        @Override
597        public String toString() {
598            return getClass().getSimpleName() + "@" + Integer.toHexString(System.identityHashCode(this));
599        }
600    }
601
602    /**
603     * In addition to closing the shared connection, also acts as a delegate for other synchronizers that must run
604     * before it.
605     */
606    private static class SharedConnectionSynchronization implements Synchronization {
607
608        private final Transaction transaction;
609
610        private final List<Synchronization> syncsFirst;
611
612        private final List<Synchronization> syncsLast;
613
614        /**
615         * Gets the instance or creates it. If creating, registers with the actual transaction.
616         */
617        // not synchronized as the transaction is already thread-local
618        // and we use a ConcurrentHashMap
619        public static SharedConnectionSynchronization getInstance(Transaction transaction) {
620            SharedConnectionSynchronization scs = sharedSynchronizations.get(transaction);
621            if (scs == null) {
622                scs = new SharedConnectionSynchronization(transaction);
623                try {
624                    transaction.registerSynchronization(scs);
625                } catch (IllegalStateException | RollbackException | SystemException e) {
626                    throw new RuntimeException("Cannot register synchronization", e);
627                }
628                sharedSynchronizations.put(transaction, scs);
629            }
630            return scs;
631        }
632
633        public SharedConnectionSynchronization(Transaction transaction) {
634            this.transaction = transaction;
635            syncsFirst = new ArrayList<Synchronization>(5);
636            syncsLast = new ArrayList<Synchronization>(5);
637        }
638
639        /**
640         * Registers a synchronization that must run before or after us.
641         */
642        public void registerSynchronization(Synchronization sync, boolean first) {
643            if (first) {
644                syncsFirst.add(sync);
645            } else {
646                syncsLast.add(sync);
647            }
648        }
649
650        @Override
651        public void beforeCompletion() {
652            beforeCompletion(syncsFirst);
653            beforeCompletion(syncsLast);
654        }
655
656        private void beforeCompletion(List<Synchronization> syncs) {
657            // beforeCompletion hooks may add other syncs,
658            // so we must be careful when iterating on the list
659            RuntimeException exc = null;
660            for (int i = 0; i < syncs.size(); i++) {
661                try {
662                    syncs.get(i).beforeCompletion();
663                } catch (RuntimeException e) {
664                    log.error("Exception during beforeCompletion hook", e);
665                    if (exc == null) {
666                        exc = e;
667                        try {
668                            transaction.setRollbackOnly();
669                        } catch (SystemException se) {
670                            log.error("Cannot set rollback only", e);
671                        }
672                    }
673                }
674            }
675            if (exc != null) {
676                throw exc;
677            }
678        }
679
680        /**
681         * After completion, removes the shared connection from the map and closes it.
682         */
683        @Override
684        public void afterCompletion(int status) {
685            sharedSynchronizations.remove(transaction);
686            afterCompletion(syncsFirst, status);
687            closeSharedAfterCompletion(status == Status.STATUS_ROLLEDBACK);
688            afterCompletion(syncsLast, status);
689        }
690
691        private void closeSharedAfterCompletion(boolean rollback) {
692            SharedConnection sharedConnection = sharedConnections.remove(transaction);
693            if (sharedConnection != null) {
694                sharedConnection.closeAfterTransaction(rollback);
695            }
696        }
697
698        private void afterCompletion(List<Synchronization> syncs, int status) {
699            for (Synchronization sync : syncs) {
700                try {
701                    sync.afterCompletion(status);
702                } catch (RuntimeException e) {
703                    log.warn("Unexpected exception from afterCompletion; continuing", e);
704                }
705            }
706        }
707    }
708
709    private static Transaction getTransaction() {
710        try {
711            return TransactionHelper.lookupTransactionManager().getTransaction();
712        } catch (NamingException | SystemException e) {
713            return null;
714        }
715    }
716
717    private static int transactionStatus(Transaction transaction) {
718        try {
719            return transaction.getStatus();
720        } catch (SystemException e) {
721            log.error("Cannot get transaction status", e);
722            return Status.STATUS_UNKNOWN;
723        }
724    }
725
726    /**
727     * Tries to unwrap the connection to get the real physical one (returned by the original datasource).
728     * <p>
729     * This should only be used by code that needs to cast the connection to a driver-specific class to use
730     * driver-specific features.
731     *
732     * @throws SQLException if no actual physical connection was allocated yet
733     */
734    public static Connection unwrap(Connection connection) throws SQLException {
735        if (Proxy.isProxyClass(connection.getClass())) {
736            InvocationHandler handler = Proxy.getInvocationHandler(connection);
737            if (handler instanceof ConnectionHandle) {
738                ConnectionHandle h = (ConnectionHandle) handler;
739                connection = h.getUnwrappedConnection();
740            }
741        }
742        if (connection instanceof org.tranql.connector.jdbc.ConnectionHandle) {
743            return ((org.tranql.connector.jdbc.ConnectionHandle) connection).getAssociation().getPhysicalConnection();
744        }
745        // now try Apache DBCP unwrap (standard or Tomcat), to skip datasource
746        // wrapping layers
747        // this needs accessToUnderlyingConnectionAllowed=true in the pool
748        // config
749        try {
750            Method m = connection.getClass().getMethod("getInnermostDelegate");
751            m.setAccessible(true); // needed, method of inner private class
752            Connection delegate = (Connection) m.invoke(connection);
753            if (delegate == null) {
754                log.error("Cannot access underlying connection, you must use "
755                        + "accessToUnderlyingConnectionAllowed=true in the pool configuration");
756            } else {
757                connection = delegate;
758            }
759        } catch (NoSuchMethodException | SecurityException | IllegalAccessException | InvocationTargetException e) {
760            // ignore missing method, connection not coming from Apache pool
761        }
762        return connection;
763    }
764
765    /**
766     * Checks if single transaction-local datasource mode will be used for the given datasource name.
767     *
768     * @return {@code true} if using a single transaction-local connection for this datasource
769     */
770    public static boolean useSingleConnection(String dataSourceName) {
771        if (dataSourceName != null) {
772            String excludes = getExcludeDsProperty();
773            if ("*".equals(excludes)) {
774                return false;
775            }
776            if (!StringUtils.isBlank(excludes)) {
777                for (String exclude : excludes.split("[, ] *")) {
778                    if (dataSourceName.equals(exclude)
779                            || dataSourceName.equals(DataSourceHelper.getDataSourceJNDIName(exclude))) {
780                        return false;
781                    }
782                }
783            }
784        }
785        return !StringUtils.isBlank(getSingleDsProperty());
786    }
787
788    /**
789     * Gets the fake name we use to pass to ConnectionHelper.getConnection, in order for exclusions on these connections
790     * to be possible.
791     */
792    public static String getPseudoDataSourceNameForRepository(String repositoryName) {
793        return "repository_" + repositoryName;
794    }
795
796    /**
797     * Gets a new reference to the transaction-local JDBC connection for the given dataSource. The connection
798     * <strong>MUST</strong> be closed in a finally block when code is done using it.
799     * <p>
800     * If the passed dataSource name is in the exclusion list, null will be returned.
801     *
802     * @param dataSourceName the datasource for which the connection is requested
803     * @return a new reference to the connection, or {@code null} if single datasource connection sharing is not in
804     *         effect
805     * @throws ResourceException
806     */
807    public static Connection getConnection(String dataSourceName) throws SQLException {
808        return getConnection(dataSourceName, false);
809    }
810
811    /**
812     * Gets a new reference to the transaction-local JDBC connection for the given dataSource. The connection
813     * <strong>MUST</strong> be closed in a finally block when code is done using it.
814     * <p>
815     * If the passed dataSource name is in the exclusion list, null will be returned.
816     * <p>
817     * If noSharing is requested, the connection will never come from the transaction-local and will always be newly
818     * allocated.
819     *
820     * @param dataSourceName the datasource for which the connection is requested
821     * @param noSharing {@code true} if this connection must not be shared with others
822     * @return a new reference to the connection, or {@code null} if single datasource connection sharing is not in
823     *         effect
824     * @throws ResourceException
825     */
826    public static Connection getConnection(String dataSourceName, boolean noSharing) throws SQLException {
827        if (!useSingleConnection(dataSourceName)) {
828            DataSource ds = getDataSource(dataSourceName);
829            if (ds instanceof PooledDataSource) {
830                return ((PooledDataSource) ds).getConnection(noSharing);
831            }
832            return getPhysicalConnection(dataSourceName);
833        }
834        return getConnection(noSharing);
835    }
836
837    private static Connection getConnection(boolean noSharing) throws SQLException {
838        String dataSourceName = getSingleDsProperty();
839        if (StringUtils.isBlank(dataSourceName)) {
840            return null;
841        }
842        if (noSharing) {
843            return getPhysicalConnection(dataSourceName);
844        }
845        return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), new Class[]{Connection.class},
846                new ConnectionHandle());
847    }
848
849    private static String getSingleDsProperty() {
850        if (singleDsPropertyCached != null) {
851            return singleDsPropertyCached;
852        }
853        return Framework.getProperty(SINGLE_DS);
854    }
855
856    private static String getExcludeDsProperty() {
857        if (excludeDsPropertyCached != null) {
858            return excludeDsPropertyCached;
859        }
860        return Framework.getProperty(EXCLUDE_DS);
861    }
862
863    private static Connection getPhysicalConnection() throws SQLException {
864        return getPhysicalConnection(getSingleDsProperty());
865    }
866
867    /**
868     * Gets a physical connection from a datasource name.
869     * <p>
870     * A few retries are done to work around databases that have problems with many open/close in a row.
871     *
872     * @param dataSourceName the datasource name
873     * @return the connection
874     */
875    private static Connection getPhysicalConnection(String dataSourceName) throws SQLException {
876        DataSource dataSource = getDataSource(dataSourceName);
877        return JDBCUtils.getConnection(dataSource);
878    }
879
880    /**
881     * Gets a datasource from a datasource name, or in test mode use test connection parameters.
882     *
883     * @param dataSourceName the datasource name
884     * @return the datasource
885     */
886    private static DataSource getDataSource(String dataSourceName) throws SQLException {
887        try {
888            return DataSourceHelper.getDataSource(dataSourceName);
889        } catch (NamingException e) {
890            if (Framework.isTestModeSet()) {
891                String url = Framework.getProperty("nuxeo.test.vcs.url");
892                String user = Framework.getProperty("nuxeo.test.vcs.user");
893                String password = Framework.getProperty("nuxeo.test.vcs.password");
894                if (url != null && user != null) {
895                    return new DataSourceFromUrl(url, user, password); // driver?
896                }
897            }
898            throw new SQLException("Cannot find datasource: " + dataSourceName, e);
899        }
900    }
901
902    /**
903     * Checks how many references there are to shared connections.
904     * <p>
905     * USED IN UNIT TESTS OR FOR DEBUGGING.
906     */
907    public static int countConnectionReferences() {
908        return sharedConnections.size();
909    }
910
911    /**
912     * Clears the remaining connection references for the current thread.
913     * <p>
914     * USED IN UNIT TESTS ONLY.
915     */
916    public static void clearConnectionReferences() {
917        for (SharedConnection sharedConnection : sharedConnections.values()) {
918            sharedConnection.closeAfterTransaction(true);
919        }
920        sharedConnections.clear();
921    }
922
923    /**
924     * If sharing is in effect, registers a synchronization with the current transaction, making sure it runs before the
925     * {@link SharedConnectionSynchronization}.
926     *
927     * @return {@code true}
928     */
929    public static boolean registerSynchronization(Synchronization sync) throws SystemException {
930        return registerSynchronization(sync, true);
931    }
932
933    /**
934     * If sharing is in effect, registers a synchronization with the current transaction, making sure the
935     * {@link Synchronization#afterCompletion} method runs after the {@link SharedConnectionSynchronization}.
936     *
937     * @return {@code true}
938     */
939    public static boolean registerSynchronizationLast(Synchronization sync) throws SystemException {
940        return registerSynchronization(sync, false);
941    }
942
943    private static boolean registerSynchronization(Synchronization sync, boolean first) throws SystemException {
944        Transaction transaction = getTransaction();
945        if (transaction == null) {
946            throw new SystemException("Cannot register synchronization: no transaction");
947        }
948        // We always do the lookup and registration to the actual transaction
949        // even if there is no shared connection yet.
950        SharedConnectionSynchronization scs = SharedConnectionSynchronization.getInstance(transaction);
951        scs.registerSynchronization(sync, first);
952        return true;
953    }
954
955}