001/*
002 * (C) Copyright 2006-2020 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Bogdan Stefanescu
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.core.repository;
021
022import static javax.transaction.Status.STATUS_COMMITTED;
023import static javax.transaction.Status.STATUS_ROLLEDBACK;
024
025import java.time.Duration;
026import java.util.List;
027import java.util.Map;
028import java.util.NoSuchElementException;
029import java.util.concurrent.ConcurrentHashMap;
030
031import javax.transaction.Synchronization;
032
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
036import org.apache.commons.pool2.KeyedObjectPool;
037import org.apache.commons.pool2.PoolUtils;
038import org.apache.commons.pool2.PooledObject;
039import org.apache.commons.pool2.impl.DefaultPooledObject;
040import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
041import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
042import org.nuxeo.common.utils.DurationUtils;
043import org.nuxeo.ecm.core.api.DocumentNotFoundException;
044import org.nuxeo.ecm.core.api.NuxeoException;
045import org.nuxeo.ecm.core.api.UnrestrictedSessionRunner;
046import org.nuxeo.ecm.core.api.repository.PoolConfiguration;
047import org.nuxeo.ecm.core.api.repository.RepositoryManager;
048import org.nuxeo.ecm.core.model.Repository;
049import org.nuxeo.ecm.core.model.Session;
050import org.nuxeo.runtime.api.Framework;
051import org.nuxeo.runtime.cluster.ClusterService;
052import org.nuxeo.runtime.model.ComponentContext;
053import org.nuxeo.runtime.model.ComponentManager;
054import org.nuxeo.runtime.model.ComponentName;
055import org.nuxeo.runtime.model.ComponentStartOrders;
056import org.nuxeo.runtime.model.DefaultComponent;
057import org.nuxeo.runtime.transaction.TransactionHelper;
058
059/**
060 * Component and service managing low-level repository instances.
061 */
062public class RepositoryService extends DefaultComponent {
063
064    public static final ComponentName NAME = new ComponentName("org.nuxeo.ecm.core.repository.RepositoryService");
065
066    /** @since 11.1 */
067    public static final String CLUSTER_START_DURATION_PROP = "org.nuxeo.repository.cluster.start.duration";
068
069    /** @since 11.1 */
070    public static final Duration CLUSTER_START_DURATION_DEFAULT = Duration.ofMinutes(1);
071
072    private static final Log log = LogFactory.getLog(RepositoryService.class);
073
074    public static final String XP_REPOSITORY = "repository";
075
076    private final Map<String, Repository> repositories = new ConcurrentHashMap<>();
077
078    // for monitoring
079    protected GenericKeyedObjectPool<String, Session> basePool;
080
081    protected PoolConfiguration poolConfig;
082
083    protected KeyedObjectPool<String, Session> pool;
084
085    public void shutdown() {
086        log.info("Shutting down repository manager");
087        repositories.values().forEach(Repository::shutdown);
088        repositories.clear();
089    }
090
091    @Override
092    public int getApplicationStartedOrder() {
093        return ComponentStartOrders.REPOSITORY;
094    }
095
096    @Override
097    public void start(ComponentContext context) {
098        initPool();
099        TransactionHelper.runInTransaction(this::doCreateRepositories);
100        Framework.getRuntime().getComponentManager().addListener(new ComponentManager.Listener() {
101            @Override
102            public void afterStart(ComponentManager mgr, boolean isResume) {
103                initRepositories(); // call all RepositoryInitializationHandler
104            }
105
106            @Override
107            public void afterStop(ComponentManager mgr, boolean isStandby) {
108                Framework.getRuntime().getComponentManager().removeListener(this);
109            }
110        });
111    }
112
113    @Override
114    public void stop(ComponentContext context) {
115        TransactionHelper.runInTransaction(this::shutdown);
116        shutdownPool();
117    }
118
119    protected void initPoolConfig() {
120        PoolConfiguration poolConfig = null; // NOSONAR
121        RepositoryManager repositoryManager = Framework.getService(RepositoryManager.class);
122        if (repositoryManager != null) {
123            org.nuxeo.ecm.core.api.repository.Repository repo = repositoryManager.getDefaultRepository();
124            if (repo != null) {
125                poolConfig = repo.getPoolConfig();
126            }
127        }
128        if (poolConfig == null) {
129            poolConfig = new PoolConfiguration();
130        }
131        this.poolConfig = poolConfig;
132    }
133
134    protected void initPool() {
135        initPoolConfig();
136        GenericKeyedObjectPoolConfig<Session> config = new GenericKeyedObjectPoolConfig<>();
137        config.setMaxTotal(poolConfig.getMaxPoolSize());
138        config.setMaxTotalPerKey(poolConfig.getMaxPoolSize());
139        config.setMaxIdlePerKey(poolConfig.getMaxPoolSize());
140        config.setMinIdlePerKey(poolConfig.getMinPoolSize());
141        config.setMaxWaitMillis(poolConfig.getBlockingTimeoutMillis());
142        basePool = new GenericKeyedObjectPool<>(new SessionFactory(), config);
143        // use an eroding pool to avoid keeping idle sessions too long
144        pool = PoolUtils.erodingPool(basePool);
145    }
146
147    protected void shutdownPool() {
148        pool.close();
149    }
150
151    public void resetPool() {
152        basePool.clear();
153    }
154
155    // for monitoring
156    public GenericKeyedObjectPool<String, ?> getPool() {
157        return basePool;
158    }
159
160    /**
161     * Start a tx and initialize repositories content. This method is publicly exposed since it is needed by tests to
162     * initialize repositories after cleanups (see CoreFeature).
163     *
164     * @since 8.4
165     */
166    public void initRepositories() {
167        TransactionHelper.runInTransaction(this::doInitRepositories);
168    }
169
170    /**
171     * Creates all the repositories. Requires an active transaction.
172     *
173     * @since 9.3
174     */
175    protected void doCreateRepositories() {
176        repositories.clear();
177        for (String repositoryName : getRepositoryNames()) {
178            RepositoryFactory factory = getFactory(repositoryName);
179            if (factory == null) {
180                continue;
181            }
182            createRepository(repositoryName, factory);
183        }
184    }
185
186    protected void createRepository(String repositoryName, RepositoryFactory factory) {
187        ClusterService clusterService = Framework.getService(ClusterService.class);
188        String prop = Framework.getProperty(CLUSTER_START_DURATION_PROP);
189        Duration duration = DurationUtils.parsePositive(prop, CLUSTER_START_DURATION_DEFAULT);
190        Duration pollDelay = Duration.ofSeconds(1);
191        clusterService.runAtomically("start-repository-" + repositoryName, duration, pollDelay, () -> {
192            Repository repository = (Repository) factory.call();
193            repositories.put(repositoryName, repository);
194        });
195    }
196
197    /**
198     * Initializes all the repositories. Requires an active transaction.
199     *
200     * @since 9.3
201     */
202    protected void doInitRepositories() {
203        // give up if no handler configured
204        RepositoryInitializationHandler handler = RepositoryInitializationHandler.getInstance();
205        if (handler == null) {
206            return;
207        }
208        // invoke handlers
209        for (String name : getRepositoryNames()) {
210            initializeRepository(handler, name);
211        }
212    }
213
214    @Override
215    public <T> T getAdapter(Class<T> adapter) {
216        if (adapter.isAssignableFrom(getClass())) {
217            return adapter.cast(this);
218        }
219        return null;
220    }
221
222    protected void initializeRepository(final RepositoryInitializationHandler handler, String name) {
223        new UnrestrictedSessionRunner(name) {
224            @Override
225            public void run() {
226                handler.initializeRepository(session);
227            }
228        }.runUnrestricted();
229    }
230
231    /**
232     * Gets a repository given its name.
233     * <p>
234     * Null is returned if no repository with that name was registered.
235     *
236     * @param repositoryName the repository name
237     * @return the repository instance or null if no repository with that name was registered
238     */
239    public Repository getRepository(String repositoryName) {
240        return repositories.get(repositoryName);
241    }
242
243    protected RepositoryFactory getFactory(String repositoryName) {
244        RepositoryManager repositoryManager = Framework.getService(RepositoryManager.class);
245        if (repositoryManager == null) {
246            // tests with no high-level repository manager
247            return null;
248        }
249        org.nuxeo.ecm.core.api.repository.Repository repo = repositoryManager.getRepository(repositoryName);
250        if (repo == null) {
251            return null;
252        }
253        RepositoryFactory repositoryFactory = (RepositoryFactory) repo.getRepositoryFactory();
254        if (repositoryFactory == null) {
255            throw new NullPointerException("Missing repositoryFactory for repository: " + repositoryName);
256        }
257        return repositoryFactory;
258    }
259
260    public List<String> getRepositoryNames() {
261        RepositoryManager repositoryManager = Framework.getService(RepositoryManager.class);
262        return repositoryManager.getRepositoryNames();
263    }
264
265    public int getActiveSessionsCount() {
266        return pool.getNumActive();
267    }
268
269    public int getActiveSessionsCount(String repositoryName) {
270        return pool.getNumActive(repositoryName);
271    }
272
273    /**
274     * Thread-local sessions allocated, per repository.
275     */
276    protected static final Map<String, ThreadLocal<Session>> SESSIONS = new ConcurrentHashMap<>(1);
277
278    /**
279     * Gets a session.
280     * <p>
281     * The session is first looked up in the current transaction, otherwise fetched from a pool.
282     *
283     * @param repositoryName the repository name
284     * @return the session
285     * @since 11.1
286     */
287    public Session getSession(String repositoryName) {
288        if (!TransactionHelper.isTransactionActiveOrMarkedRollback()) {
289            throw new NuxeoException("Cannot use a session outside a transaction");
290        }
291        TransactionHelper.checkTransactionTimeout();
292        ThreadLocal<Session> threadSessions = SESSIONS.computeIfAbsent(repositoryName, r -> new ThreadLocal<>());
293        Session session = threadSessions.get();
294        if (session == null) {
295            if (!TransactionHelper.isTransactionActive()) {
296                throw new NuxeoException("Cannot use a session when transaction is marked rollback-only");
297            }
298            session = getSessionFromPool(repositoryName, threadSessions::remove);
299            threadSessions.set(session);
300        }
301        return session;
302    }
303
304    protected Session getSessionFromPool(String repositoryName, Runnable cleanup) {
305        Session session;
306        try {
307            session = pool.borrowObject(repositoryName);
308        } catch (NoSuchElementException e) {
309            String err = String.format(
310                    "Connection pool is fully used,"
311                            + " consider increasing nuxeo.vcs.blocking-timeout-millis (currently %s)"
312                            + " or nuxeo.vcs.max-pool-size (currently %s)",
313                    poolConfig.getBlockingTimeoutMillis(), poolConfig.getMaxPoolSize());
314            throw new NuxeoException(err, e);
315        } catch (RuntimeException e) {
316            throw e;
317        } catch (Exception e) {
318            if (e instanceof InterruptedException) { // NOSONAR
319                Thread.currentThread().interrupt();
320            }
321            throw new NuxeoException(e);
322        }
323        // register synchronization for transaction commit/rollback
324        // and to return to pool and remove from thread-local at end of transaction
325        TransactionHelper.registerSynchronization(new SessionSynchronization(session, cleanup));
326        session.start();
327        return session;
328    }
329
330    /** @since 11.1 */
331    protected class SessionSynchronization implements Synchronization {
332
333        protected final Session session;
334
335        protected final Runnable cleanup;
336
337        protected SessionSynchronization(Session session, Runnable cleanup) {
338            this.session = session;
339            this.cleanup = cleanup;
340        }
341
342        @Override
343        public void beforeCompletion() {
344            session.end();
345        }
346
347        @Override
348        public void afterCompletion(int status) {
349            boolean completedAbruptly = true;
350            try {
351                if (status == STATUS_COMMITTED) {
352                    session.commit();
353                } else if (status == STATUS_ROLLEDBACK) {
354                    session.rollback();
355                } else {
356                    log.error("Unexpected afterCompletion status: " + status);
357                }
358                completedAbruptly = false;
359            } finally {
360                try {
361                    String repositoryName = session.getRepositoryName();
362                    if (status == STATUS_COMMITTED && !completedAbruptly) {
363                        pool.returnObject(repositoryName, session);
364                    } else {
365                        pool.invalidateObject(repositoryName, session);
366                    }
367                } catch (Exception e) {
368                    if (e instanceof InterruptedException) { // NOSONAR
369                        Thread.currentThread().interrupt();
370                    }
371                    log.error(e, e);
372                } finally {
373                    cleanup.run();
374                }
375            }
376        }
377    }
378
379    /** @since 11.1 */
380    protected class SessionFactory extends BaseKeyedPooledObjectFactory<String, Session> {
381
382        @Override
383        public Session create(String repositoryName) throws Exception {
384            Repository repository = getRepository(repositoryName);
385            if (repository == null) {
386                throw new DocumentNotFoundException("No such repository: " + repositoryName);
387            }
388            return repository.getSession();
389        }
390
391        @Override
392        public PooledObject<Session> wrap(Session session) {
393            return new DefaultPooledObject<>(session);
394        }
395
396        @Override
397        public void destroyObject(String repositoryName, PooledObject<Session> p) throws Exception {
398            p.getObject().destroy();
399        }
400    }
401
402}