001/*
002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.nuxeo.runtime.jtajca;
018
019import java.util.Date;
020import java.util.HashMap;
021import java.util.HashSet;
022import java.util.Iterator;
023import java.util.LinkedList;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import java.util.TimerTask;
028import java.util.stream.Collectors;
029
030import javax.resource.ResourceException;
031import javax.transaction.TransactionManager;
032
033import org.apache.commons.logging.LogFactory;
034import org.apache.geronimo.connector.outbound.AbstractConnectionManager;
035import org.apache.geronimo.connector.outbound.ConnectionHandleInterceptor;
036import org.apache.geronimo.connector.outbound.ConnectionInfo;
037import org.apache.geronimo.connector.outbound.ConnectionInterceptor;
038import org.apache.geronimo.connector.outbound.ConnectionReturnAction;
039import org.apache.geronimo.connector.outbound.ConnectionTrackingInterceptor;
040import org.apache.geronimo.connector.outbound.GenericConnectionManager;
041import org.apache.geronimo.connector.outbound.MCFConnectionInterceptor;
042import org.apache.geronimo.connector.outbound.PoolIdleReleaserTimer;
043import org.apache.geronimo.connector.outbound.SubjectInterceptor;
044import org.apache.geronimo.connector.outbound.SubjectSource;
045import org.apache.geronimo.connector.outbound.TCCLInterceptor;
046import org.apache.geronimo.connector.outbound.connectionmanagerconfig.PartitionedPool;
047import org.apache.geronimo.connector.outbound.connectionmanagerconfig.PoolingSupport;
048import org.apache.geronimo.connector.outbound.connectionmanagerconfig.TransactionSupport;
049import org.apache.geronimo.connector.outbound.connectiontracking.ConnectionTracker;
050import org.apache.geronimo.transaction.manager.RecoverableTransactionManager;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054/**
055 * Setups a connection according to the pooling attributes, mainly duplicated from {@link GenericConnectionManager} for
056 * injecting a connection validation interceptor.
057 *
058 * @since 8.3
059 */
060public class NuxeoConnectionManager extends AbstractConnectionManager {
061    private static final long serialVersionUID = 1L;
062    protected static final Logger log = LoggerFactory.getLogger(NuxeoConnectionManager.class);
063
064    final NuxeoConnectionTrackingCoordinator coordinator;
065
066    public NuxeoConnectionManager(int activettl, NuxeoValidationSupport validationSupport,
067            TransactionSupport transactionSupport, PoolingSupport pooling, SubjectSource subjectSource,
068            NuxeoConnectionTrackingCoordinator connectionTracker, RecoverableTransactionManager transactionManager,
069            String name, ClassLoader classLoader) {
070        super(new InterceptorsImpl(validationSupport, transactionSupport, pooling, subjectSource, name,
071                connectionTracker, transactionManager, classLoader), transactionManager, name);
072        coordinator = connectionTracker;
073        activemonitor = new ActiveMonitor(activettl);
074        nosharing = new Nosharing();
075    }
076
077    static class InterceptorsImpl implements AbstractConnectionManager.Interceptors {
078
079        private final ConnectionInterceptor stack;
080        private final ConnectionInterceptor recoveryStack;
081        private final PoolingSupport poolingSupport;
082
083        /**
084         * Order of constructed interceptors:
085         * <p/>
086         * ConnectionTrackingInterceptor (connectionTracker != null) TCCLInterceptor ConnectionHandleInterceptor
087         * ValidationHandleInterceptor TransactionCachingInterceptor (useTransactions & useTransactionCaching)
088         * TransactionEnlistingInterceptor (useTransactions) SubjectInterceptor (realmBridge != null)
089         * SinglePoolConnectionInterceptor or MultiPoolConnectionInterceptor LocalXAResourceInsertionInterceptor or
090         * XAResourceInsertionInterceptor (useTransactions (&localTransactions)) MCFConnectionInterceptor
091         */
092        public InterceptorsImpl(NuxeoValidationSupport validationSupport, TransactionSupport transactionSupport,
093                PoolingSupport pooling, SubjectSource subjectSource, String name, ConnectionTracker connectionTracker,
094                TransactionManager transactionManager, ClassLoader classLoader) {
095            poolingSupport = pooling;
096            // check for consistency between attributes
097            if (subjectSource == null && pooling instanceof PartitionedPool
098                    && ((PartitionedPool) pooling).isPartitionBySubject()) {
099                throw new IllegalStateException("To use Subject in pooling, you need a SecurityDomain");
100            }
101
102            // Set up the interceptor stack
103            MCFConnectionInterceptor tail = new MCFConnectionInterceptor();
104            ConnectionInterceptor stack = tail;
105            stack = transactionSupport.addXAResourceInsertionInterceptor(stack, name);
106            stack = pooling.addPoolingInterceptors(stack);
107            if (log.isTraceEnabled()) {
108                log.trace("Connection Manager " + name + " installed pool " + stack);
109            }
110            stack = validationSupport.addValidationInterceptors(stack);
111            stack = transactionSupport.addTransactionInterceptors(stack, transactionManager);
112
113            if (subjectSource != null) {
114                stack = new SubjectInterceptor(stack, subjectSource);
115            }
116
117            if (transactionSupport.isRecoverable()) {
118                recoveryStack = new TCCLInterceptor(stack, classLoader);
119            } else {
120                recoveryStack = null;
121            }
122
123            stack = new ConnectionHandleInterceptor(stack);
124            stack = new TCCLInterceptor(stack, classLoader);
125            if (connectionTracker != null) {
126                stack = new ConnectionTrackingInterceptor(stack, name, connectionTracker);
127            }
128            tail.setStack(stack);
129            this.stack = stack;
130            if (log.isDebugEnabled()) {
131                StringBuilder s = new StringBuilder("ConnectionManager Interceptor stack;\n");
132                stack.info(s);
133                log.debug(s.toString());
134            }
135        }
136
137        @Override
138        public ConnectionInterceptor getStack() {
139            return stack;
140        }
141
142        @Override
143        public ConnectionInterceptor getRecoveryStack() {
144            return recoveryStack;
145        }
146
147        @Override
148        public PoolingSupport getPoolingAttributes() {
149            return poolingSupport;
150        }
151
152    }
153
154    @Override
155    public void doStop() throws Exception {
156        if (getConnectionCount() < getPartitionMinSize()) {
157            Thread.sleep(10); // wait for filling tasks completion
158        }
159        super.doStop();
160    }
161
162    /**
163     *
164     * @see ActiveMonitor#killTimedoutConnections
165     * @since 8.4
166     */
167    public List<ActiveMonitor.TimeToLive> killActiveTimedoutConnections(long clock) {
168        return activemonitor.killTimedoutConnections(clock);
169    }
170
171    /**
172     *
173     * @see NuxeoConnectionTrackingCoordinator#k
174     * @since 8.4
175     */
176    public long getKilledConnectionCount() {
177        return activemonitor.killedCount;
178    }
179
180    final ActiveMonitor activemonitor;
181
182    class ActiveMonitor implements ConnectionTracker {
183
184        final int ttl;
185
186        ActiveMonitor(int delay) {
187            ttl = delay;
188            if (ttl > 0) {
189                scheduleCleanups();
190            }
191            coordinator.addTracker(this);
192        }
193
194        final Map<ConnectionInfo, TimeToLive> ttls = new HashMap<>();
195
196        long killedCount = 0L;
197
198        CleanupTask cleanup = new CleanupTask();
199
200        class CleanupTask extends TimerTask {
201
202            @Override
203            public void run() {
204                killActiveTimedoutConnections(System.currentTimeMillis());
205            }
206
207        }
208
209        void cancelCleanups() {
210            cleanup.cancel();
211        }
212
213        void scheduleCleanups() {
214            PoolIdleReleaserTimer.getTimer().scheduleAtFixedRate(cleanup, 60 * 1000, 60 * 1000);
215        }
216
217        @Override
218        public synchronized void handleObtained(ConnectionTrackingInterceptor connectionTrackingInterceptor,
219                ConnectionInfo connectionInfo, boolean reassociate) throws ResourceException {
220            int delay = ttl();
221            if (delay > 0) {
222                ttls.put(connectionInfo, new TimeToLive(connectionInfo, Thread.currentThread().getName(), System.currentTimeMillis(), delay));
223            }
224        }
225
226        @Override
227        public synchronized void handleReleased(ConnectionTrackingInterceptor connectionTrackingInterceptor,
228                ConnectionInfo connectionInfo, ConnectionReturnAction connectionReturnAction) {
229            ttls.remove(connectionInfo);
230        }
231
232        @Override
233        public void setEnvironment(ConnectionInfo connectionInfo, String key) {
234            return;
235        }
236
237        /**
238         * Kill active connection that have timed out relative to the given {@code clock}.
239         *
240         * @return information about the killed connections
241         * @param clock
242         * @since 8.4
243         */
244        public synchronized List<TimeToLive> killTimedoutConnections(long clock) {
245            List<TimeToLive> killed = new LinkedList<>();
246            Iterator<TimeToLive> it = ttls.values().iterator();
247            while (it.hasNext()) {
248                TimeToLive ttl = it.next();
249                if (ttl.deadline <= clock) {
250                    ttl.killAndLog();
251                    killed.add(ttl);
252                    it.remove();
253                }
254
255            }
256            return killed;
257        }
258
259        /**
260         * Logs active connections
261         *
262         *
263         * @since 8.4
264         */
265        public void log() {
266            for (TimeToLive ttl : ttls.values()) {
267                LogFactory.getLog(TimeToLive.class).warn(ttl, ttl.info.getTrace());
268            }
269        }
270
271
272        /**
273         * List active connections
274         *
275         *
276         * @since 8.4
277         */
278        public Set<TimeToLive> list() {
279            return new HashSet<>(ttls.values());
280        }
281
282        public class TimeToLive {
283
284            public final ConnectionInfo info;
285
286            public final String threadName;
287
288            public final long obtained;
289
290            public final long deadline;
291
292            TimeToLive(ConnectionInfo info, String threadName, long obtained, int delay) {
293                this.info = info;
294                this.threadName = threadName;
295                this.obtained = System.currentTimeMillis();
296                deadline = obtained + delay;
297            }
298
299            void killAndLog() {
300                try {
301                    info.getManagedConnectionInfo().getPoolInterceptor().returnConnection(info,
302                            ConnectionReturnAction.DESTROY);
303                } catch (Throwable error) {
304                    if (error instanceof InterruptedException) {
305                        Thread.currentThread().interrupt();
306                        throw error;
307                    }
308                    LogFactory.getLog(NuxeoConnectionTrackingCoordinator.class)
309                            .error("Caught error while killing " + info, error);
310                } finally {
311                    killedCount += 1;
312                    LogFactory.getLog(NuxeoConnectionTrackingCoordinator.class)
313                            .error("Killed " + message(new StringBuilder()), info.getTrace());
314                }
315            }
316
317            void log(long clock) {
318                if (deadline < clock) {
319                    LogFactory.getLog(NuxeoConnectionTrackingCoordinator.class).info(message(new StringBuilder()),
320                            info.getTrace());
321                } else {
322                    LogFactory.getLog(NuxeoConnectionTrackingCoordinator.class).error(message(new StringBuilder()),
323                            info.getTrace());
324                }
325            }
326
327            public StringBuilder message(StringBuilder builder) {
328                return builder.append(info).append(",  was obtained by ").append(threadName).append(" at ")
329                        .append(new Date(obtained)).append(" and timed out at ").append(new Date(deadline));
330            }
331
332            @Override
333            public String toString() {
334                return String.format("TimeToLive(%x) %s", hashCode(), message(new StringBuilder()).toString());
335            }
336        }
337
338        final ThreadLocal<Integer> context = new ThreadLocal<>();
339
340        public void enter(int delay) {
341            context.set(delay);
342        };
343
344        public void exit() {
345            context.remove();
346        }
347
348        int ttl() {
349            Integer value = context.get();
350            if (value != null) {
351                return value.intValue();
352            }
353            return ttl;
354        }
355    }
356
357    public int getActiveTimeoutMinutes() {
358        return activemonitor.ttl / (60 * 1000);
359    }
360
361    public Set<ConnectionInfo> listActive() {
362        return activemonitor.ttls.values().stream().map(ttl -> ttl.info).collect(Collectors.toSet());
363    }
364
365    public void enterActiveMonitor(int delay) {
366        activemonitor.enter(delay);
367    }
368
369    public void exitActiveTimedout() {
370        activemonitor.exit();
371    }
372
373    final Nosharing nosharing;
374
375    class Nosharing implements ConnectionTracker {
376
377        Nosharing() {
378            coordinator.addTracker(this);
379        }
380
381        @Override
382        public void handleObtained(ConnectionTrackingInterceptor connectionTrackingInterceptor,
383                ConnectionInfo connectionInfo, boolean reassociate) throws ResourceException {
384
385        }
386
387        @Override
388        public void handleReleased(ConnectionTrackingInterceptor connectionTrackingInterceptor,
389                ConnectionInfo connectionInfo, ConnectionReturnAction connectionReturnAction) {
390
391        }
392
393        final ThreadLocal<Boolean> noSharingHolder = new ThreadLocal<Boolean>();
394
395        @Override
396        public void setEnvironment(ConnectionInfo connectionInfo, String key) {
397            connectionInfo.setUnshareable(noSharingHolder.get() == null ? false : true);
398        }
399
400        void enter() {
401            noSharingHolder.set(Boolean.TRUE);
402        }
403
404        void exit() {
405            noSharingHolder.remove();
406        }
407
408    }
409
410    public void enterNoSharing() {
411        nosharing.enter();
412    }
413
414    public void exitNoSharing() {
415        nosharing.exit();
416    }
417
418}