001/*
002 * (C) Copyright 2016-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.nuxeo.runtime.jtajca;
017
018import java.util.Date;
019import java.util.HashSet;
020import java.util.Iterator;
021import java.util.LinkedList;
022import java.util.List;
023import java.util.Map;
024import java.util.Set;
025import java.util.TimerTask;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.stream.Collectors;
028
029import javax.resource.ResourceException;
030import javax.transaction.TransactionManager;
031
032import org.apache.commons.logging.LogFactory;
033import org.apache.geronimo.connector.outbound.AbstractConnectionManager;
034import org.apache.geronimo.connector.outbound.ConnectionHandleInterceptor;
035import org.apache.geronimo.connector.outbound.ConnectionInfo;
036import org.apache.geronimo.connector.outbound.ConnectionInterceptor;
037import org.apache.geronimo.connector.outbound.ConnectionReturnAction;
038import org.apache.geronimo.connector.outbound.ConnectionTrackingInterceptor;
039import org.apache.geronimo.connector.outbound.GenericConnectionManager;
040import org.apache.geronimo.connector.outbound.MCFConnectionInterceptor;
041import org.apache.geronimo.connector.outbound.PoolIdleReleaserTimer;
042import org.apache.geronimo.connector.outbound.SubjectInterceptor;
043import org.apache.geronimo.connector.outbound.SubjectSource;
044import org.apache.geronimo.connector.outbound.TCCLInterceptor;
045import org.apache.geronimo.connector.outbound.connectionmanagerconfig.PartitionedPool;
046import org.apache.geronimo.connector.outbound.connectionmanagerconfig.PoolingSupport;
047import org.apache.geronimo.connector.outbound.connectionmanagerconfig.TransactionSupport;
048import org.apache.geronimo.connector.outbound.connectiontracking.ConnectionTracker;
049import org.apache.geronimo.transaction.manager.RecoverableTransactionManager;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053/**
054 * Setups a connection according to the pooling attributes, mainly duplicated from {@link GenericConnectionManager} for
055 * injecting a connection validation interceptor.
056 *
057 * @since 8.3
058 */
059public class NuxeoConnectionManager extends AbstractConnectionManager {
060
061    private static final long serialVersionUID = 1L;
062
063    protected static final Logger log = LoggerFactory.getLogger(NuxeoConnectionManager.class);
064
065    protected final NuxeoConnectionTrackingCoordinator coordinator;
066
067    public NuxeoConnectionManager(int activettl, NuxeoValidationSupport validationSupport,
068            TransactionSupport transactionSupport, PoolingSupport pooling, SubjectSource subjectSource,
069            NuxeoConnectionTrackingCoordinator connectionTracker, RecoverableTransactionManager transactionManager,
070            String name, ClassLoader classLoader) {
071        super(new InterceptorsImpl(validationSupport, transactionSupport, pooling, subjectSource, name,
072                connectionTracker, transactionManager, classLoader), transactionManager, name);
073        coordinator = connectionTracker;
074        activemonitor = new ActiveMonitor(activettl);
075        nosharing = new Nosharing();
076    }
077
078    static class InterceptorsImpl implements AbstractConnectionManager.Interceptors {
079
080        private final ConnectionInterceptor stack;
081
082        private final ConnectionInterceptor recoveryStack;
083
084        private final PoolingSupport poolingSupport;
085
086        /**
087         * Order of constructed interceptors:
088         * <p/>
089         * ConnectionTrackingInterceptor (connectionTracker != null) TCCLInterceptor ConnectionHandleInterceptor
090         * ValidationHandleInterceptor TransactionCachingInterceptor (useTransactions & useTransactionCaching)
091         * TransactionEnlistingInterceptor (useTransactions) SubjectInterceptor (realmBridge != null)
092         * SinglePoolConnectionInterceptor or MultiPoolConnectionInterceptor LocalXAResourceInsertionInterceptor or
093         * XAResourceInsertionInterceptor (useTransactions (&localTransactions)) MCFConnectionInterceptor
094         */
095        public InterceptorsImpl(NuxeoValidationSupport validationSupport, TransactionSupport transactionSupport,
096                PoolingSupport pooling, SubjectSource subjectSource, String name, ConnectionTracker connectionTracker,
097                TransactionManager transactionManager, ClassLoader classLoader) {
098            poolingSupport = pooling;
099            // check for consistency between attributes
100            if (subjectSource == null && pooling instanceof PartitionedPool
101                    && ((PartitionedPool) pooling).isPartitionBySubject()) {
102                throw new IllegalStateException("To use Subject in pooling, you need a SecurityDomain");
103            }
104
105            // Set up the interceptor stack
106            MCFConnectionInterceptor tail = new MCFConnectionInterceptor();
107            ConnectionInterceptor stack = tail;
108            stack = transactionSupport.addXAResourceInsertionInterceptor(stack, name);
109            stack = pooling.addPoolingInterceptors(stack);
110            if (log.isTraceEnabled()) {
111                log.trace("Connection Manager " + name + " installed pool " + stack);
112            }
113            stack = validationSupport.addValidationInterceptors(stack);
114            stack = transactionSupport.addTransactionInterceptors(stack, transactionManager);
115
116            if (subjectSource != null) {
117                stack = new SubjectInterceptor(stack, subjectSource);
118            }
119
120            if (transactionSupport.isRecoverable()) {
121                recoveryStack = new TCCLInterceptor(stack, classLoader);
122            } else {
123                recoveryStack = null;
124            }
125
126            stack = new ConnectionHandleInterceptor(stack);
127            stack = new TCCLInterceptor(stack, classLoader);
128            if (connectionTracker != null) {
129                stack = new ConnectionTrackingInterceptor(stack, name, connectionTracker);
130            }
131            tail.setStack(stack);
132            this.stack = stack;
133            if (log.isDebugEnabled()) {
134                StringBuilder s = new StringBuilder("ConnectionManager Interceptor stack;\n");
135                stack.info(s);
136                log.debug(s.toString());
137            }
138        }
139
140        @Override
141        public ConnectionInterceptor getStack() {
142            return stack;
143        }
144
145        @Override
146        public ConnectionInterceptor getRecoveryStack() {
147            return recoveryStack;
148        }
149
150        @Override
151        public PoolingSupport getPoolingAttributes() {
152            return poolingSupport;
153        }
154
155    }
156
157    @Override
158    public void doStop() throws Exception {
159        if (getConnectionCount() < getPartitionMinSize()) {
160            Thread.sleep(10); // wait for filling tasks completion
161        }
162        super.doStop();
163    }
164
165    /**
166     * @see ActiveMonitor#killTimedoutConnections
167     * @since 8.4
168     */
169    public List<ActiveMonitor.TimeToLive> killActiveTimedoutConnections(long clock) {
170        return activemonitor.killTimedoutConnections(clock);
171    }
172
173    /**
174     * @see ActiveMonitor#killedCount
175     * @since 8.4
176     */
177    public long getKilledConnectionCount() {
178        return activemonitor.killedCount;
179    }
180
181    final ActiveMonitor activemonitor;
182
183    class ActiveMonitor implements ConnectionTracker {
184
185        final int ttl;
186
187        ActiveMonitor(int delay) {
188            ttl = delay;
189            if (ttl > 0) {
190                scheduleCleanups();
191            }
192            coordinator.addTracker(this);
193        }
194
195        final Map<ConnectionInfo, TimeToLive> ttls = new ConcurrentHashMap<>();
196
197        long killedCount = 0L;
198
199        CleanupTask cleanup = new CleanupTask();
200
201        class CleanupTask extends TimerTask {
202
203            @Override
204            public void run() {
205                killActiveTimedoutConnections(System.currentTimeMillis());
206            }
207
208        }
209
210        void cancelCleanups() {
211            cleanup.cancel();
212        }
213
214        void scheduleCleanups() {
215            PoolIdleReleaserTimer.getTimer().scheduleAtFixedRate(cleanup, 60 * 1000, 60 * 1000);
216        }
217
218        @Override
219        public void handleObtained(ConnectionTrackingInterceptor connectionTrackingInterceptor,
220                ConnectionInfo connectionInfo, boolean reassociate) throws ResourceException {
221            int delay = ttl();
222            if (delay > 0) {
223                ttls.put(connectionInfo, new TimeToLive(connectionInfo, Thread.currentThread().getName(),
224                        System.currentTimeMillis(), delay));
225            }
226        }
227
228        @Override
229        public void handleReleased(ConnectionTrackingInterceptor connectionTrackingInterceptor,
230                ConnectionInfo connectionInfo, ConnectionReturnAction connectionReturnAction) {
231            ttls.remove(connectionInfo);
232        }
233
234        @Override
235        public void setEnvironment(ConnectionInfo connectionInfo, String key) {
236        }
237
238        /**
239         * Kill active connection that have timed out relative to the given {@code clock}.
240         *
241         * @return information about the killed connections
242         * @since 8.4
243         */
244        public List<TimeToLive> killTimedoutConnections(long clock) {
245            List<TimeToLive> killed = new LinkedList<>();
246            Iterator<TimeToLive> it = ttls.values().iterator();
247            while (it.hasNext()) {
248                TimeToLive ttl = it.next();
249                if (ttl.deadline <= clock) {
250                    ttl.killAndLog();
251                    killed.add(ttl);
252                    it.remove();
253                }
254
255            }
256            return killed;
257        }
258
259        /**
260         * Logs active connections
261         *
262         * @since 8.4
263         */
264        public void log() {
265            for (TimeToLive ttl : ttls.values()) {
266                LogFactory.getLog(TimeToLive.class).warn(ttl, ttl.info.getTrace());
267            }
268        }
269
270        /**
271         * List active connections
272         *
273         * @since 8.4
274         */
275        public Set<TimeToLive> list() {
276            return new HashSet<>(ttls.values());
277        }
278
279        public class TimeToLive {
280
281            public final ConnectionInfo info;
282
283            public final String threadName;
284
285            public final long obtained;
286
287            public final long deadline;
288
289            TimeToLive(ConnectionInfo info, String threadName, long obtained, int delay) {
290                this.info = info;
291                this.threadName = threadName;
292                this.obtained = System.currentTimeMillis();
293                deadline = obtained + delay;
294            }
295
296            void killAndLog() {
297                try {
298                    info.getManagedConnectionInfo().getPoolInterceptor().returnConnection(info,
299                            ConnectionReturnAction.DESTROY);
300                } catch (Throwable error) {
301                    if (error instanceof InterruptedException) {
302                        Thread.currentThread().interrupt();
303                        throw error;
304                    }
305                    LogFactory.getLog(NuxeoConnectionTrackingCoordinator.class)
306                              .error("Caught error while killing " + info, error);
307                } finally {
308                    killedCount += 1;
309                    LogFactory.getLog(NuxeoConnectionTrackingCoordinator.class)
310                              .error("Killed " + message(new StringBuilder()), info.getTrace());
311                }
312            }
313
314            void log(long clock) {
315                if (deadline < clock) {
316                    LogFactory.getLog(NuxeoConnectionTrackingCoordinator.class).info(message(new StringBuilder()),
317                            info.getTrace());
318                } else {
319                    LogFactory.getLog(NuxeoConnectionTrackingCoordinator.class).error(message(new StringBuilder()),
320                            info.getTrace());
321                }
322            }
323
324            public StringBuilder message(StringBuilder builder) {
325                return builder.append(info)
326                              .append(",  was obtained by ")
327                              .append(threadName)
328                              .append(" at ")
329                              .append(new Date(obtained))
330                              .append(" and timed out at ")
331                              .append(new Date(deadline));
332            }
333
334            @Override
335            public String toString() {
336                return String.format("TimeToLive(%x) %s", hashCode(), message(new StringBuilder()).toString());
337            }
338        }
339
340        final ThreadLocal<Integer> context = new ThreadLocal<>();
341
342        public void enter(int delay) {
343            context.set(delay);
344        }
345
346        public void exit() {
347            context.remove();
348        }
349
350        int ttl() {
351            Integer value = context.get();
352            if (value != null) {
353                return value.intValue();
354            }
355            return ttl;
356        }
357    }
358
359    public int getActiveTimeoutMinutes() {
360        return activemonitor.ttl / (60 * 1000);
361    }
362
363    public Set<ConnectionInfo> listActive() {
364        return activemonitor.ttls.values().stream().map(ttl -> ttl.info).collect(Collectors.toSet());
365    }
366
367    public void enterActiveMonitor(int delay) {
368        activemonitor.enter(delay);
369    }
370
371    public void exitActiveTimedout() {
372        activemonitor.exit();
373    }
374
375    final Nosharing nosharing;
376
377    class Nosharing implements ConnectionTracker {
378
379        Nosharing() {
380            coordinator.addTracker(this);
381        }
382
383        @Override
384        public void handleObtained(ConnectionTrackingInterceptor connectionTrackingInterceptor,
385                ConnectionInfo connectionInfo, boolean reassociate) throws ResourceException {
386
387        }
388
389        @Override
390        public void handleReleased(ConnectionTrackingInterceptor connectionTrackingInterceptor,
391                ConnectionInfo connectionInfo, ConnectionReturnAction connectionReturnAction) {
392
393        }
394
395        final ThreadLocal<Boolean> noSharingHolder = new ThreadLocal<>();
396
397        @Override
398        public void setEnvironment(ConnectionInfo connectionInfo, String key) {
399            connectionInfo.setUnshareable(noSharingHolder.get() != null);
400        }
401
402        void enter() {
403            noSharingHolder.set(Boolean.TRUE);
404        }
405
406        void exit() {
407            noSharingHolder.remove();
408        }
409
410    }
411
412    public void enterNoSharing() {
413        nosharing.enter();
414    }
415
416    public void exitNoSharing() {
417        nosharing.exit();
418    }
419
420}