/*
 * (C) Copyright 2016-2017 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.nuxeo.runtime.jtajca;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import javax.resource.ResourceException;
import javax.transaction.TransactionManager;

import org.apache.commons.logging.LogFactory;
import org.apache.geronimo.connector.outbound.AbstractConnectionManager;
import org.apache.geronimo.connector.outbound.ConnectionHandleInterceptor;
import org.apache.geronimo.connector.outbound.ConnectionInfo;
import org.apache.geronimo.connector.outbound.ConnectionInterceptor;
import org.apache.geronimo.connector.outbound.ConnectionReturnAction;
import org.apache.geronimo.connector.outbound.ConnectionTrackingInterceptor;
import org.apache.geronimo.connector.outbound.GenericConnectionManager;
import org.apache.geronimo.connector.outbound.MCFConnectionInterceptor;
import org.apache.geronimo.connector.outbound.PoolIdleReleaserTimer;
import org.apache.geronimo.connector.outbound.SubjectInterceptor;
import org.apache.geronimo.connector.outbound.SubjectSource;
import org.apache.geronimo.connector.outbound.TCCLInterceptor;
import org.apache.geronimo.connector.outbound.connectionmanagerconfig.PartitionedPool;
import org.apache.geronimo.connector.outbound.connectionmanagerconfig.PoolingSupport;
import org.apache.geronimo.connector.outbound.connectionmanagerconfig.TransactionSupport;
import org.apache.geronimo.connector.outbound.connectiontracking.ConnectionTracker;
import org.apache.geronimo.transaction.manager.RecoverableTransactionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
org.apache.geronimo.connector.outbound.TCCLInterceptor; 045import org.apache.geronimo.connector.outbound.connectionmanagerconfig.PartitionedPool; 046import org.apache.geronimo.connector.outbound.connectionmanagerconfig.PoolingSupport; 047import org.apache.geronimo.connector.outbound.connectionmanagerconfig.TransactionSupport; 048import org.apache.geronimo.connector.outbound.connectiontracking.ConnectionTracker; 049import org.apache.geronimo.transaction.manager.RecoverableTransactionManager; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053/** 054 * Setups a connection according to the pooling attributes, mainly duplicated from {@link GenericConnectionManager} for 055 * injecting a connection validation interceptor. 056 * 057 * @since 8.3 058 */ 059public class NuxeoConnectionManager extends AbstractConnectionManager { 060 061 private static final long serialVersionUID = 1L; 062 063 protected static final Logger log = LoggerFactory.getLogger(NuxeoConnectionManager.class); 064 065 protected final NuxeoConnectionTrackingCoordinator coordinator; 066 067 public NuxeoConnectionManager(int activettl, NuxeoValidationSupport validationSupport, 068 TransactionSupport transactionSupport, PoolingSupport pooling, SubjectSource subjectSource, 069 NuxeoConnectionTrackingCoordinator connectionTracker, RecoverableTransactionManager transactionManager, 070 String name, ClassLoader classLoader) { 071 super(new InterceptorsImpl(validationSupport, transactionSupport, pooling, subjectSource, name, 072 connectionTracker, transactionManager, classLoader), transactionManager, name); 073 coordinator = connectionTracker; 074 activemonitor = new ActiveMonitor(activettl); 075 nosharing = new Nosharing(); 076 } 077 078 static class InterceptorsImpl implements AbstractConnectionManager.Interceptors { 079 080 private final ConnectionInterceptor stack; 081 082 private final ConnectionInterceptor recoveryStack; 083 084 private final PoolingSupport poolingSupport; 085 086 /** 
087 * Order of constructed interceptors: 088 * <p/> 089 * ConnectionTrackingInterceptor (connectionTracker != null) TCCLInterceptor ConnectionHandleInterceptor 090 * ValidationHandleInterceptor TransactionCachingInterceptor (useTransactions & useTransactionCaching) 091 * TransactionEnlistingInterceptor (useTransactions) SubjectInterceptor (realmBridge != null) 092 * SinglePoolConnectionInterceptor or MultiPoolConnectionInterceptor LocalXAResourceInsertionInterceptor or 093 * XAResourceInsertionInterceptor (useTransactions (&localTransactions)) MCFConnectionInterceptor 094 */ 095 public InterceptorsImpl(NuxeoValidationSupport validationSupport, TransactionSupport transactionSupport, 096 PoolingSupport pooling, SubjectSource subjectSource, String name, ConnectionTracker connectionTracker, 097 TransactionManager transactionManager, ClassLoader classLoader) { 098 poolingSupport = pooling; 099 // check for consistency between attributes 100 if (subjectSource == null && pooling instanceof PartitionedPool 101 && ((PartitionedPool) pooling).isPartitionBySubject()) { 102 throw new IllegalStateException("To use Subject in pooling, you need a SecurityDomain"); 103 } 104 105 // Set up the interceptor stack 106 MCFConnectionInterceptor tail = new MCFConnectionInterceptor(); 107 ConnectionInterceptor stack = tail; 108 stack = transactionSupport.addXAResourceInsertionInterceptor(stack, name); 109 stack = pooling.addPoolingInterceptors(stack); 110 if (log.isTraceEnabled()) { 111 log.trace("Connection Manager " + name + " installed pool " + stack); 112 } 113 stack = validationSupport.addValidationInterceptors(stack); 114 stack = transactionSupport.addTransactionInterceptors(stack, transactionManager); 115 116 if (subjectSource != null) { 117 stack = new SubjectInterceptor(stack, subjectSource); 118 } 119 120 if (transactionSupport.isRecoverable()) { 121 recoveryStack = new TCCLInterceptor(stack, classLoader); 122 } else { 123 recoveryStack = null; 124 } 125 126 stack = new 
ConnectionHandleInterceptor(stack); 127 stack = new TCCLInterceptor(stack, classLoader); 128 if (connectionTracker != null) { 129 stack = new ConnectionTrackingInterceptor(stack, name, connectionTracker); 130 } 131 tail.setStack(stack); 132 this.stack = stack; 133 if (log.isDebugEnabled()) { 134 StringBuilder s = new StringBuilder("ConnectionManager Interceptor stack;\n"); 135 stack.info(s); 136 log.debug(s.toString()); 137 } 138 } 139 140 @Override 141 public ConnectionInterceptor getStack() { 142 return stack; 143 } 144 145 @Override 146 public ConnectionInterceptor getRecoveryStack() { 147 return recoveryStack; 148 } 149 150 @Override 151 public PoolingSupport getPoolingAttributes() { 152 return poolingSupport; 153 } 154 155 } 156 157 @Override 158 public void doStop() throws Exception { 159 if (getConnectionCount() < getPartitionMinSize()) { 160 Thread.sleep(10); // wait for filling tasks completion 161 } 162 super.doStop(); 163 } 164 165 /** 166 * @see ActiveMonitor#killTimedoutConnections 167 * @since 8.4 168 */ 169 public List<ActiveMonitor.TimeToLive> killActiveTimedoutConnections(long clock) { 170 return activemonitor.killTimedoutConnections(clock); 171 } 172 173 /** 174 * @see ActiveMonitor#killedCount 175 * @since 8.4 176 */ 177 public long getKilledConnectionCount() { 178 return activemonitor.killedCount; 179 } 180 181 final ActiveMonitor activemonitor; 182 183 class ActiveMonitor implements ConnectionTracker { 184 185 final int ttl; 186 187 ActiveMonitor(int delay) { 188 ttl = delay; 189 if (ttl > 0) { 190 scheduleCleanups(); 191 } 192 coordinator.addTracker(this); 193 } 194 195 final Map<ConnectionInfo, TimeToLive> ttls = new ConcurrentHashMap<>(); 196 197 long killedCount = 0L; 198 199 CleanupTask cleanup = new CleanupTask(); 200 201 class CleanupTask extends TimerTask { 202 203 @Override 204 public void run() { 205 killActiveTimedoutConnections(System.currentTimeMillis()); 206 } 207 208 } 209 210 void cancelCleanups() { 211 cleanup.cancel(); 212 } 
213 214 void scheduleCleanups() { 215 PoolIdleReleaserTimer.getTimer().scheduleAtFixedRate(cleanup, 60 * 1000, 60 * 1000); 216 } 217 218 @Override 219 public void handleObtained(ConnectionTrackingInterceptor connectionTrackingInterceptor, 220 ConnectionInfo connectionInfo, boolean reassociate) throws ResourceException { 221 int delay = ttl(); 222 if (delay > 0) { 223 ttls.put(connectionInfo, new TimeToLive(connectionInfo, Thread.currentThread().getName(), 224 System.currentTimeMillis(), delay)); 225 } 226 } 227 228 @Override 229 public void handleReleased(ConnectionTrackingInterceptor connectionTrackingInterceptor, 230 ConnectionInfo connectionInfo, ConnectionReturnAction connectionReturnAction) { 231 ttls.remove(connectionInfo); 232 } 233 234 @Override 235 public void setEnvironment(ConnectionInfo connectionInfo, String key) { 236 } 237 238 /** 239 * Kill active connection that have timed out relative to the given {@code clock}. 240 * 241 * @return information about the killed connections 242 * @since 8.4 243 */ 244 public List<TimeToLive> killTimedoutConnections(long clock) { 245 List<TimeToLive> killed = new LinkedList<>(); 246 Iterator<TimeToLive> it = ttls.values().iterator(); 247 while (it.hasNext()) { 248 TimeToLive ttl = it.next(); 249 if (ttl.deadline <= clock) { 250 ttl.killAndLog(); 251 killed.add(ttl); 252 it.remove(); 253 } 254 255 } 256 return killed; 257 } 258 259 /** 260 * Logs active connections 261 * 262 * @since 8.4 263 */ 264 public void log() { 265 for (TimeToLive ttl : ttls.values()) { 266 LogFactory.getLog(TimeToLive.class).warn(ttl, ttl.info.getTrace()); 267 } 268 } 269 270 /** 271 * List active connections 272 * 273 * @since 8.4 274 */ 275 public Set<TimeToLive> list() { 276 return new HashSet<>(ttls.values()); 277 } 278 279 public class TimeToLive { 280 281 public final ConnectionInfo info; 282 283 public final String threadName; 284 285 public final long obtained; 286 287 public final long deadline; 288 289 TimeToLive(ConnectionInfo info, 
String threadName, long obtained, int delay) { 290 this.info = info; 291 this.threadName = threadName; 292 this.obtained = System.currentTimeMillis(); 293 deadline = obtained + delay; 294 } 295 296 void killAndLog() { 297 try { 298 info.getManagedConnectionInfo().getPoolInterceptor().returnConnection(info, 299 ConnectionReturnAction.DESTROY); 300 } catch (Throwable error) { 301 if (error instanceof InterruptedException) { 302 Thread.currentThread().interrupt(); 303 throw error; 304 } 305 LogFactory.getLog(NuxeoConnectionTrackingCoordinator.class) 306 .error("Caught error while killing " + info, error); 307 } finally { 308 killedCount += 1; 309 LogFactory.getLog(NuxeoConnectionTrackingCoordinator.class) 310 .error("Killed " + message(new StringBuilder()), info.getTrace()); 311 } 312 } 313 314 void log(long clock) { 315 if (deadline < clock) { 316 LogFactory.getLog(NuxeoConnectionTrackingCoordinator.class).info(message(new StringBuilder()), 317 info.getTrace()); 318 } else { 319 LogFactory.getLog(NuxeoConnectionTrackingCoordinator.class).error(message(new StringBuilder()), 320 info.getTrace()); 321 } 322 } 323 324 public StringBuilder message(StringBuilder builder) { 325 return builder.append(info) 326 .append(", was obtained by ") 327 .append(threadName) 328 .append(" at ") 329 .append(new Date(obtained)) 330 .append(" and timed out at ") 331 .append(new Date(deadline)); 332 } 333 334 @Override 335 public String toString() { 336 return String.format("TimeToLive(%x) %s", hashCode(), message(new StringBuilder()).toString()); 337 } 338 } 339 340 final ThreadLocal<Integer> context = new ThreadLocal<>(); 341 342 public void enter(int delay) { 343 context.set(delay); 344 } 345 346 public void exit() { 347 context.remove(); 348 } 349 350 int ttl() { 351 Integer value = context.get(); 352 if (value != null) { 353 return value.intValue(); 354 } 355 return ttl; 356 } 357 } 358 359 public int getActiveTimeoutMinutes() { 360 return activemonitor.ttl / (60 * 1000); 361 } 362 363 
public Set<ConnectionInfo> listActive() { 364 return activemonitor.ttls.values().stream().map(ttl -> ttl.info).collect(Collectors.toSet()); 365 } 366 367 public void enterActiveMonitor(int delay) { 368 activemonitor.enter(delay); 369 } 370 371 public void exitActiveTimedout() { 372 activemonitor.exit(); 373 } 374 375 final Nosharing nosharing; 376 377 class Nosharing implements ConnectionTracker { 378 379 Nosharing() { 380 coordinator.addTracker(this); 381 } 382 383 @Override 384 public void handleObtained(ConnectionTrackingInterceptor connectionTrackingInterceptor, 385 ConnectionInfo connectionInfo, boolean reassociate) throws ResourceException { 386 387 } 388 389 @Override 390 public void handleReleased(ConnectionTrackingInterceptor connectionTrackingInterceptor, 391 ConnectionInfo connectionInfo, ConnectionReturnAction connectionReturnAction) { 392 393 } 394 395 final ThreadLocal<Boolean> noSharingHolder = new ThreadLocal<>(); 396 397 @Override 398 public void setEnvironment(ConnectionInfo connectionInfo, String key) { 399 connectionInfo.setUnshareable(noSharingHolder.get() != null); 400 } 401 402 void enter() { 403 noSharingHolder.set(Boolean.TRUE); 404 } 405 406 void exit() { 407 noSharingHolder.remove(); 408 } 409 410 } 411 412 public void enterNoSharing() { 413 nosharing.enter(); 414 } 415 416 public void exitNoSharing() { 417 nosharing.exit(); 418 } 419 420}