001/* 002 * (C) Copyright 2006-2020 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Bogdan Stefanescu 018 * Florent Guillaume 019 */ 020package org.nuxeo.ecm.core.repository; 021 022import static javax.transaction.Status.STATUS_COMMITTED; 023import static javax.transaction.Status.STATUS_ROLLEDBACK; 024 025import java.time.Duration; 026import java.util.List; 027import java.util.Map; 028import java.util.NoSuchElementException; 029import java.util.concurrent.ConcurrentHashMap; 030 031import javax.transaction.Synchronization; 032 033import org.apache.commons.logging.Log; 034import org.apache.commons.logging.LogFactory; 035import org.apache.commons.pool2.BaseKeyedPooledObjectFactory; 036import org.apache.commons.pool2.KeyedObjectPool; 037import org.apache.commons.pool2.PoolUtils; 038import org.apache.commons.pool2.PooledObject; 039import org.apache.commons.pool2.impl.DefaultPooledObject; 040import org.apache.commons.pool2.impl.GenericKeyedObjectPool; 041import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig; 042import org.nuxeo.common.utils.DurationUtils; 043import org.nuxeo.ecm.core.api.DocumentNotFoundException; 044import org.nuxeo.ecm.core.api.NuxeoException; 045import org.nuxeo.ecm.core.api.UnrestrictedSessionRunner; 046import org.nuxeo.ecm.core.api.repository.PoolConfiguration; 047import org.nuxeo.ecm.core.api.repository.RepositoryManager; 048import org.nuxeo.ecm.core.model.Repository; 049import org.nuxeo.ecm.core.model.Session; 050import org.nuxeo.runtime.api.Framework; 051import org.nuxeo.runtime.cluster.ClusterService; 052import org.nuxeo.runtime.model.ComponentContext; 053import org.nuxeo.runtime.model.ComponentManager; 054import org.nuxeo.runtime.model.ComponentName; 055import org.nuxeo.runtime.model.ComponentStartOrders; 056import org.nuxeo.runtime.model.DefaultComponent; 057import org.nuxeo.runtime.transaction.TransactionHelper; 058 059/** 060 * Component and service managing low-level repository instances. 061 */ 062public class RepositoryService extends DefaultComponent { 063 064 public static final ComponentName NAME = new ComponentName("org.nuxeo.ecm.core.repository.RepositoryService"); 065 066 /** @since 11.1 */ 067 public static final String CLUSTER_START_DURATION_PROP = "org.nuxeo.repository.cluster.start.duration"; 068 069 /** @since 11.1 */ 070 public static final Duration CLUSTER_START_DURATION_DEFAULT = Duration.ofMinutes(1); 071 072 private static final Log log = LogFactory.getLog(RepositoryService.class); 073 074 public static final String XP_REPOSITORY = "repository"; 075 076 private final Map<String, Repository> repositories = new ConcurrentHashMap<>(); 077 078 // for monitoring 079 protected GenericKeyedObjectPool<String, Session> basePool; 080 081 protected PoolConfiguration poolConfig; 082 083 protected KeyedObjectPool<String, Session> pool; 084 085 public void shutdown() { 086 log.info("Shutting down repository manager"); 087 repositories.values().forEach(Repository::shutdown); 088 repositories.clear(); 089 } 090 091 @Override 092 public int getApplicationStartedOrder() { 093 return ComponentStartOrders.REPOSITORY; 094 } 095 096 @Override 097 public void start(ComponentContext context) { 098 initPool(); 099 TransactionHelper.runInTransaction(this::doCreateRepositories); 100 Framework.getRuntime().getComponentManager().addListener(new ComponentManager.Listener() { 101 @Override 102 public void afterStart(ComponentManager mgr, boolean isResume) { 103 initRepositories(); // call all RepositoryInitializationHandler 104 } 105 106 @Override 107 public void afterStop(ComponentManager mgr, boolean isStandby) { 108 Framework.getRuntime().getComponentManager().removeListener(this); 109 } 110 }); 111 } 112 113 @Override 114 public void stop(ComponentContext context) { 115 TransactionHelper.runInTransaction(this::shutdown); 116 shutdownPool(); 117 } 118 119 protected void initPoolConfig() { 120 PoolConfiguration poolConfig = null; // NOSONAR 121 RepositoryManager repositoryManager = Framework.getService(RepositoryManager.class); 122 if (repositoryManager != null) { 123 org.nuxeo.ecm.core.api.repository.Repository repo = repositoryManager.getDefaultRepository(); 124 if (repo != null) { 125 poolConfig = repo.getPoolConfig(); 126 } 127 } 128 if (poolConfig == null) { 129 poolConfig = new PoolConfiguration(); 130 } 131 this.poolConfig = poolConfig; 132 } 133 134 protected void initPool() { 135 initPoolConfig(); 136 GenericKeyedObjectPoolConfig<Session> config = new GenericKeyedObjectPoolConfig<>(); 137 config.setMaxTotal(poolConfig.getMaxPoolSize()); 138 config.setMaxTotalPerKey(poolConfig.getMaxPoolSize()); 139 config.setMaxIdlePerKey(poolConfig.getMaxPoolSize()); 140 config.setMinIdlePerKey(poolConfig.getMinPoolSize()); 141 config.setMaxWaitMillis(poolConfig.getBlockingTimeoutMillis()); 142 basePool = new GenericKeyedObjectPool<>(new SessionFactory(), config); 143 // use an eroding pool to avoid keeping idle sessions too long 144 pool = PoolUtils.erodingPool(basePool); 145 } 146 147 protected void shutdownPool() { 148 pool.close(); 149 } 150 151 public void resetPool() { 152 basePool.clear(); 153 } 154 155 // for monitoring 156 public GenericKeyedObjectPool<String, ?> getPool() { 157 return basePool; 158 } 159 160 /** 161 * Start a tx and initialize repositories content. This method is publicly exposed since it is needed by tests to 162 * initialize repositories after cleanups (see CoreFeature). 163 * 164 * @since 8.4 165 */ 166 public void initRepositories() { 167 TransactionHelper.runInTransaction(this::doInitRepositories); 168 } 169 170 /** 171 * Creates all the repositories. Requires an active transaction. 172 * 173 * @since 9.3 174 */ 175 protected void doCreateRepositories() { 176 repositories.clear(); 177 for (String repositoryName : getRepositoryNames()) { 178 RepositoryFactory factory = getFactory(repositoryName); 179 if (factory == null) { 180 continue; 181 } 182 createRepository(repositoryName, factory); 183 } 184 } 185 186 protected void createRepository(String repositoryName, RepositoryFactory factory) { 187 ClusterService clusterService = Framework.getService(ClusterService.class); 188 String prop = Framework.getProperty(CLUSTER_START_DURATION_PROP); 189 Duration duration = DurationUtils.parsePositive(prop, CLUSTER_START_DURATION_DEFAULT); 190 Duration pollDelay = Duration.ofSeconds(1); 191 clusterService.runAtomically("start-repository-" + repositoryName, duration, pollDelay, () -> { 192 Repository repository = (Repository) factory.call(); 193 repositories.put(repositoryName, repository); 194 }); 195 } 196 197 /** 198 * Initializes all the repositories. Requires an active transaction. 199 * 200 * @since 9.3 201 */ 202 protected void doInitRepositories() { 203 // give up if no handler configured 204 RepositoryInitializationHandler handler = RepositoryInitializationHandler.getInstance(); 205 if (handler == null) { 206 return; 207 } 208 // invoke handlers 209 for (String name : getRepositoryNames()) { 210 initializeRepository(handler, name); 211 } 212 } 213 214 @Override 215 public <T> T getAdapter(Class<T> adapter) { 216 if (adapter.isAssignableFrom(getClass())) { 217 return adapter.cast(this); 218 } 219 return null; 220 } 221 222 protected void initializeRepository(final RepositoryInitializationHandler handler, String name) { 223 new UnrestrictedSessionRunner(name) { 224 @Override 225 public void run() { 226 handler.initializeRepository(session); 227 } 228 }.runUnrestricted(); 229 } 230 231 /** 232 * Gets a repository given its name. 233 * <p> 234 * Null is returned if no repository with that name was registered. 235 * 236 * @param repositoryName the repository name 237 * @return the repository instance or null if no repository with that name was registered 238 */ 239 public Repository getRepository(String repositoryName) { 240 return repositories.get(repositoryName); 241 } 242 243 protected RepositoryFactory getFactory(String repositoryName) { 244 RepositoryManager repositoryManager = Framework.getService(RepositoryManager.class); 245 if (repositoryManager == null) { 246 // tests with no high-level repository manager 247 return null; 248 } 249 org.nuxeo.ecm.core.api.repository.Repository repo = repositoryManager.getRepository(repositoryName); 250 if (repo == null) { 251 return null; 252 } 253 RepositoryFactory repositoryFactory = (RepositoryFactory) repo.getRepositoryFactory(); 254 if (repositoryFactory == null) { 255 throw new NullPointerException("Missing repositoryFactory for repository: " + repositoryName); 256 } 257 return repositoryFactory; 258 } 259 260 public List<String> getRepositoryNames() { 261 RepositoryManager repositoryManager = Framework.getService(RepositoryManager.class); 262 return repositoryManager.getRepositoryNames(); 263 } 264 265 public int getActiveSessionsCount() { 266 return pool.getNumActive(); 267 } 268 269 public int getActiveSessionsCount(String repositoryName) { 270 return pool.getNumActive(repositoryName); 271 } 272 273 /** 274 * Thread-local sessions allocated, per repository. 275 */ 276 protected static final Map<String, ThreadLocal<Session>> SESSIONS = new ConcurrentHashMap<>(1); 277 278 /** 279 * Gets a session. 280 * <p> 281 * The session is first looked up in the current transaction, otherwise fetched from a pool. 282 * 283 * @param repositoryName the repository name 284 * @return the session 285 * @since 11.1 286 */ 287 public Session getSession(String repositoryName) { 288 if (!TransactionHelper.isTransactionActiveOrMarkedRollback()) { 289 throw new NuxeoException("Cannot use a session outside a transaction"); 290 } 291 TransactionHelper.checkTransactionTimeout(); 292 ThreadLocal<Session> threadSessions = SESSIONS.computeIfAbsent(repositoryName, r -> new ThreadLocal<>()); 293 Session session = threadSessions.get(); 294 if (session == null) { 295 if (!TransactionHelper.isTransactionActive()) { 296 throw new NuxeoException("Cannot use a session when transaction is marked rollback-only"); 297 } 298 session = getSessionFromPool(repositoryName, threadSessions::remove); 299 threadSessions.set(session); 300 } 301 return session; 302 } 303 304 protected Session getSessionFromPool(String repositoryName, Runnable cleanup) { 305 Session session; 306 try { 307 session = pool.borrowObject(repositoryName); 308 } catch (NoSuchElementException e) { 309 String err = String.format( 310 "Connection pool is fully used," 311 + " consider increasing nuxeo.vcs.blocking-timeout-millis (currently %s)" 312 + " or nuxeo.vcs.max-pool-size (currently %s)", 313 poolConfig.getBlockingTimeoutMillis(), poolConfig.getMaxPoolSize()); 314 throw new NuxeoException(err, e); 315 } catch (RuntimeException e) { 316 throw e; 317 } catch (Exception e) { 318 if (e instanceof InterruptedException) { // NOSONAR 319 Thread.currentThread().interrupt(); 320 } 321 throw new NuxeoException(e); 322 } 323 // register synchronization for transaction commit/rollback 324 // and to return to pool and remove from thread-local at end of transaction 325 TransactionHelper.registerSynchronization(new SessionSynchronization(session, cleanup)); 326 session.start(); 327 return session; 328 } 329 330 /** @since 11.1 */ 331 protected class SessionSynchronization implements Synchronization { 332 333 protected final Session session; 334 335 protected final Runnable cleanup; 336 337 protected SessionSynchronization(Session session, Runnable cleanup) { 338 this.session = session; 339 this.cleanup = cleanup; 340 } 341 342 @Override 343 public void beforeCompletion() { 344 session.end(); 345 } 346 347 @Override 348 public void afterCompletion(int status) { 349 boolean completedAbruptly = true; 350 try { 351 if (status == STATUS_COMMITTED) { 352 session.commit(); 353 } else if (status == STATUS_ROLLEDBACK) { 354 session.rollback(); 355 } else { 356 log.error("Unexpected afterCompletion status: " + status); 357 } 358 completedAbruptly = false; 359 } finally { 360 try { 361 String repositoryName = session.getRepositoryName(); 362 if (status == STATUS_COMMITTED && !completedAbruptly) { 363 pool.returnObject(repositoryName, session); 364 } else { 365 pool.invalidateObject(repositoryName, session); 366 } 367 } catch (Exception e) { 368 if (e instanceof InterruptedException) { // NOSONAR 369 Thread.currentThread().interrupt(); 370 } 371 log.error(e, e); 372 } finally { 373 cleanup.run(); 374 } 375 } 376 } 377 } 378 379 /** @since 11.1 */ 380 protected class SessionFactory extends BaseKeyedPooledObjectFactory<String, Session> { 381 382 @Override 383 public Session create(String repositoryName) throws Exception { 384 Repository repository = getRepository(repositoryName); 385 if (repository == null) { 386 throw new DocumentNotFoundException("No such repository: " + repositoryName); 387 } 388 return repository.getSession(); 389 } 390 391 @Override 392 public PooledObject<Session> wrap(Session session) { 393 return new DefaultPooledObject<>(session); 394 } 395 396 @Override 397 public void destroyObject(String repositoryName, PooledObject<Session> p) throws Exception { 398 p.getObject().destroy(); 399 } 400 } 401 402}