001/* 002 * (C) Copyright 2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.nuxeo.runtime.jtajca; 018 019import java.util.Date; 020import java.util.HashMap; 021import java.util.HashSet; 022import java.util.Iterator; 023import java.util.LinkedList; 024import java.util.List; 025import java.util.Map; 026import java.util.Set; 027import java.util.TimerTask; 028import java.util.stream.Collectors; 029 030import javax.resource.ResourceException; 031import javax.transaction.TransactionManager; 032 033import org.apache.commons.logging.LogFactory; 034import org.apache.geronimo.connector.outbound.AbstractConnectionManager; 035import org.apache.geronimo.connector.outbound.ConnectionHandleInterceptor; 036import org.apache.geronimo.connector.outbound.ConnectionInfo; 037import org.apache.geronimo.connector.outbound.ConnectionInterceptor; 038import org.apache.geronimo.connector.outbound.ConnectionReturnAction; 039import org.apache.geronimo.connector.outbound.ConnectionTrackingInterceptor; 040import org.apache.geronimo.connector.outbound.GenericConnectionManager; 041import org.apache.geronimo.connector.outbound.MCFConnectionInterceptor; 042import org.apache.geronimo.connector.outbound.PoolIdleReleaserTimer; 043import org.apache.geronimo.connector.outbound.SubjectInterceptor; 044import org.apache.geronimo.connector.outbound.SubjectSource; 045import org.apache.geronimo.connector.outbound.TCCLInterceptor; 046import org.apache.geronimo.connector.outbound.connectionmanagerconfig.PartitionedPool; 047import org.apache.geronimo.connector.outbound.connectionmanagerconfig.PoolingSupport; 048import org.apache.geronimo.connector.outbound.connectionmanagerconfig.TransactionSupport; 049import org.apache.geronimo.connector.outbound.connectiontracking.ConnectionTracker; 050import org.apache.geronimo.transaction.manager.RecoverableTransactionManager; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054/** 055 * Setups a connection according to the pooling attributes, mainly duplicated from {@link GenericConnectionManager} for 056 * injecting a connection validation interceptor. 057 * 058 * @since 8.3 059 */ 060public class NuxeoConnectionManager extends AbstractConnectionManager { 061 private static final long serialVersionUID = 1L; 062 protected static final Logger log = LoggerFactory.getLogger(NuxeoConnectionManager.class); 063 064 final NuxeoConnectionTrackingCoordinator coordinator; 065 066 public NuxeoConnectionManager(int activettl, NuxeoValidationSupport validationSupport, 067 TransactionSupport transactionSupport, PoolingSupport pooling, SubjectSource subjectSource, 068 NuxeoConnectionTrackingCoordinator connectionTracker, RecoverableTransactionManager transactionManager, 069 String name, ClassLoader classLoader) { 070 super(new InterceptorsImpl(validationSupport, transactionSupport, pooling, subjectSource, name, 071 connectionTracker, transactionManager, classLoader), transactionManager, name); 072 coordinator = connectionTracker; 073 activemonitor = new ActiveMonitor(activettl); 074 nosharing = new Nosharing(); 075 } 076 077 static class InterceptorsImpl implements AbstractConnectionManager.Interceptors { 078 079 private final ConnectionInterceptor stack; 080 private final ConnectionInterceptor recoveryStack; 081 private final PoolingSupport poolingSupport; 082 083 /** 084 * Order of constructed interceptors: 085 * <p/> 086 * ConnectionTrackingInterceptor (connectionTracker != null) TCCLInterceptor ConnectionHandleInterceptor 087 * ValidationHandleInterceptor TransactionCachingInterceptor (useTransactions & useTransactionCaching) 088 * TransactionEnlistingInterceptor (useTransactions) SubjectInterceptor (realmBridge != null) 089 * SinglePoolConnectionInterceptor or MultiPoolConnectionInterceptor LocalXAResourceInsertionInterceptor or 090 * XAResourceInsertionInterceptor (useTransactions (&localTransactions)) MCFConnectionInterceptor 091 */ 092 public InterceptorsImpl(NuxeoValidationSupport validationSupport, TransactionSupport transactionSupport, 093 PoolingSupport pooling, SubjectSource subjectSource, String name, ConnectionTracker connectionTracker, 094 TransactionManager transactionManager, ClassLoader classLoader) { 095 poolingSupport = pooling; 096 // check for consistency between attributes 097 if (subjectSource == null && pooling instanceof PartitionedPool 098 && ((PartitionedPool) pooling).isPartitionBySubject()) { 099 throw new IllegalStateException("To use Subject in pooling, you need a SecurityDomain"); 100 } 101 102 // Set up the interceptor stack 103 MCFConnectionInterceptor tail = new MCFConnectionInterceptor(); 104 ConnectionInterceptor stack = tail; 105 stack = transactionSupport.addXAResourceInsertionInterceptor(stack, name); 106 stack = pooling.addPoolingInterceptors(stack); 107 if (log.isTraceEnabled()) { 108 log.trace("Connection Manager " + name + " installed pool " + stack); 109 } 110 stack = validationSupport.addValidationInterceptors(stack); 111 stack = transactionSupport.addTransactionInterceptors(stack, transactionManager); 112 113 if (subjectSource != null) { 114 stack = new SubjectInterceptor(stack, subjectSource); 115 } 116 117 if (transactionSupport.isRecoverable()) { 118 recoveryStack = new TCCLInterceptor(stack, classLoader); 119 } else { 120 recoveryStack = null; 121 } 122 123 stack = new ConnectionHandleInterceptor(stack); 124 stack = new TCCLInterceptor(stack, classLoader); 125 if (connectionTracker != null) { 126 stack = new ConnectionTrackingInterceptor(stack, name, connectionTracker); 127 } 128 tail.setStack(stack); 129 this.stack = stack; 130 if (log.isDebugEnabled()) { 131 StringBuilder s = new StringBuilder("ConnectionManager Interceptor stack;\n"); 132 stack.info(s); 133 log.debug(s.toString()); 134 } 135 } 136 137 @Override 138 public ConnectionInterceptor getStack() { 139 return stack; 140 } 141 142 @Override 143 public ConnectionInterceptor getRecoveryStack() { 144 return recoveryStack; 145 } 146 147 @Override 148 public PoolingSupport getPoolingAttributes() { 149 return poolingSupport; 150 } 151 152 } 153 154 @Override 155 public void doStop() throws Exception { 156 if (getConnectionCount() < getPartitionMinSize()) { 157 Thread.sleep(10); // wait for filling tasks completion 158 } 159 super.doStop(); 160 } 161 162 /** 163 * 164 * @see ActiveMonitor#killTimedoutConnections 165 * @since 8.4 166 */ 167 public List<ActiveMonitor.TimeToLive> killActiveTimedoutConnections(long clock) { 168 return activemonitor.killTimedoutConnections(clock); 169 } 170 171 /** 172 * 173 * @see NuxeoConnectionTrackingCoordinator#k 174 * @since 8.4 175 */ 176 public long getKilledConnectionCount() { 177 return activemonitor.killedCount; 178 } 179 180 final ActiveMonitor activemonitor; 181 182 class ActiveMonitor implements ConnectionTracker { 183 184 final int ttl; 185 186 ActiveMonitor(int delay) { 187 ttl = delay; 188 if (ttl > 0) { 189 scheduleCleanups(); 190 } 191 coordinator.addTracker(this); 192 } 193 194 final Map<ConnectionInfo, TimeToLive> ttls = new HashMap<>(); 195 196 long killedCount = 0L; 197 198 CleanupTask cleanup = new CleanupTask(); 199 200 class CleanupTask extends TimerTask { 201 202 @Override 203 public void run() { 204 killActiveTimedoutConnections(System.currentTimeMillis()); 205 } 206 207 } 208 209 void cancelCleanups() { 210 cleanup.cancel(); 211 } 212 213 void scheduleCleanups() { 214 PoolIdleReleaserTimer.getTimer().scheduleAtFixedRate(cleanup, 60 * 1000, 60 * 1000); 215 } 216 217 @Override 218 public synchronized void handleObtained(ConnectionTrackingInterceptor connectionTrackingInterceptor, 219 ConnectionInfo connectionInfo, boolean reassociate) throws ResourceException { 220 int delay = ttl(); 221 if (delay > 0) { 222 ttls.put(connectionInfo, new TimeToLive(connectionInfo, Thread.currentThread().getName(), System.currentTimeMillis(), delay)); 223 } 224 } 225 226 @Override 227 public synchronized void handleReleased(ConnectionTrackingInterceptor connectionTrackingInterceptor, 228 ConnectionInfo connectionInfo, ConnectionReturnAction connectionReturnAction) { 229 ttls.remove(connectionInfo); 230 } 231 232 @Override 233 public void setEnvironment(ConnectionInfo connectionInfo, String key) { 234 return; 235 } 236 237 /** 238 * Kill active connection that have timed out relative to the given {@code clock}. 239 * 240 * @return information about the killed connections 241 * @param clock 242 * @since 8.4 243 */ 244 public synchronized List<TimeToLive> killTimedoutConnections(long clock) { 245 List<TimeToLive> killed = new LinkedList<>(); 246 Iterator<TimeToLive> it = ttls.values().iterator(); 247 while (it.hasNext()) { 248 TimeToLive ttl = it.next(); 249 if (ttl.deadline <= clock) { 250 ttl.killAndLog(); 251 killed.add(ttl); 252 it.remove(); 253 } 254 255 } 256 return killed; 257 } 258 259 /** 260 * Logs active connections 261 * 262 * 263 * @since 8.4 264 */ 265 public void log() { 266 for (TimeToLive ttl : ttls.values()) { 267 LogFactory.getLog(TimeToLive.class).warn(ttl, ttl.info.getTrace()); 268 } 269 } 270 271 272 /** 273 * List active connections 274 * 275 * 276 * @since 8.4 277 */ 278 public Set<TimeToLive> list() { 279 return new HashSet<>(ttls.values()); 280 } 281 282 public class TimeToLive { 283 284 public final ConnectionInfo info; 285 286 public final String threadName; 287 288 public final long obtained; 289 290 public final long deadline; 291 292 TimeToLive(ConnectionInfo info, String threadName, long obtained, int delay) { 293 this.info = info; 294 this.threadName = threadName; 295 this.obtained = System.currentTimeMillis(); 296 deadline = obtained + delay; 297 } 298 299 void killAndLog() { 300 try { 301 info.getManagedConnectionInfo().getPoolInterceptor().returnConnection(info, 302 ConnectionReturnAction.DESTROY); 303 } catch (Throwable error) { 304 if (error instanceof InterruptedException) { 305 Thread.currentThread().interrupt(); 306 throw error; 307 } 308 LogFactory.getLog(NuxeoConnectionTrackingCoordinator.class) 309 .error("Caught error while killing " + info, error); 310 } finally { 311 killedCount += 1; 312 LogFactory.getLog(NuxeoConnectionTrackingCoordinator.class) 313 .error("Killed " + message(new StringBuilder()), info.getTrace()); 314 } 315 } 316 317 void log(long clock) { 318 if (deadline < clock) { 319 LogFactory.getLog(NuxeoConnectionTrackingCoordinator.class).info(message(new StringBuilder()), 320 info.getTrace()); 321 } else { 322 LogFactory.getLog(NuxeoConnectionTrackingCoordinator.class).error(message(new StringBuilder()), 323 info.getTrace()); 324 } 325 } 326 327 public StringBuilder message(StringBuilder builder) { 328 return builder.append(info).append(", was obtained by ").append(threadName).append(" at ") 329 .append(new Date(obtained)).append(" and timed out at ").append(new Date(deadline)); 330 } 331 332 @Override 333 public String toString() { 334 return String.format("TimeToLive(%x) %s", hashCode(), message(new StringBuilder()).toString()); 335 } 336 } 337 338 final ThreadLocal<Integer> context = new ThreadLocal<>(); 339 340 public void enter(int delay) { 341 context.set(delay); 342 }; 343 344 public void exit() { 345 context.remove(); 346 } 347 348 int ttl() { 349 Integer value = context.get(); 350 if (value != null) { 351 return value.intValue(); 352 } 353 return ttl; 354 } 355 } 356 357 public int getActiveTimeoutMinutes() { 358 return activemonitor.ttl / (60 * 1000); 359 } 360 361 public Set<ConnectionInfo> listActive() { 362 return activemonitor.ttls.values().stream().map(ttl -> ttl.info).collect(Collectors.toSet()); 363 } 364 365 public void enterActiveMonitor(int delay) { 366 activemonitor.enter(delay); 367 } 368 369 public void exitActiveTimedout() { 370 activemonitor.exit(); 371 } 372 373 final Nosharing nosharing; 374 375 class Nosharing implements ConnectionTracker { 376 377 Nosharing() { 378 coordinator.addTracker(this); 379 } 380 381 @Override 382 public void handleObtained(ConnectionTrackingInterceptor connectionTrackingInterceptor, 383 ConnectionInfo connectionInfo, boolean reassociate) throws ResourceException { 384 385 } 386 387 @Override 388 public void handleReleased(ConnectionTrackingInterceptor connectionTrackingInterceptor, 389 ConnectionInfo connectionInfo, ConnectionReturnAction connectionReturnAction) { 390 391 } 392 393 final ThreadLocal<Boolean> noSharingHolder = new ThreadLocal<Boolean>(); 394 395 @Override 396 public void setEnvironment(ConnectionInfo connectionInfo, String key) { 397 connectionInfo.setUnshareable(noSharingHolder.get() == null ? false : true); 398 } 399 400 void enter() { 401 noSharingHolder.set(Boolean.TRUE); 402 } 403 404 void exit() { 405 noSharingHolder.remove(); 406 } 407 408 } 409 410 public void enterNoSharing() { 411 nosharing.enter(); 412 } 413 414 public void exitNoSharing() { 415 nosharing.exit(); 416 } 417 418}