001/*
002 * Copyright (c) 2006-2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the Eclipse Public License v1.0
006 * which accompanies this distribution, and is available at
007 * http://www.eclipse.org/legal/epl-v10.html
008 *
009 * Contributors:
010 *     Bogdan Stefanescu
011 *     Florent Guillaume
012 */
013
014package org.nuxeo.ecm.core.api.local;
015
016import java.util.Collections;
017import java.util.Set;
018import java.util.concurrent.ConcurrentHashMap;
019import java.util.concurrent.atomic.AtomicLong;
020
021import javax.transaction.Synchronization;
022import org.apache.commons.logging.Log;
023import org.apache.commons.logging.LogFactory;
024import org.nuxeo.ecm.core.api.AbstractSession;
025import org.nuxeo.ecm.core.api.CoreInstance;
026import org.nuxeo.ecm.core.api.CoreSession;
027import org.nuxeo.ecm.core.api.NuxeoPrincipal;
028import org.nuxeo.ecm.core.model.Session;
029import org.nuxeo.ecm.core.repository.RepositoryService;
030import org.nuxeo.runtime.api.Framework;
031import org.nuxeo.runtime.transaction.TransactionHelper;
032
033/**
034 * Local Session: implementation of {@link CoreSession} beyond {@link AbstractSession}, dealing with low-level stuff.
035 */
036public class LocalSession extends AbstractSession implements Synchronization {
037
038    private static final long serialVersionUID = 1L;
039
040    private static final AtomicLong SID_COUNTER = new AtomicLong();
041
042    private static final Log log = LogFactory.getLog(LocalSession.class);
043
044    protected String repositoryName;
045
046    protected NuxeoPrincipal principal;
047
048    /** Defined once at connect time. */
049    private String sessionId;
050
051    /**
052     * Thread-local session allocated.
053     */
054    private final ThreadLocal<SessionInfo> sessionHolder = new ThreadLocal<>();
055
056    /**
057     * All sessions allocated in all threads, in order to detect close leaks.
058     */
059    private final Set<SessionInfo> allSessions = Collections.newSetFromMap(new ConcurrentHashMap<SessionInfo, Boolean>());
060
061    public static CoreSession createInstance() {
062        return new LocalSession();
063    }
064
065    @Override
066    public String getRepositoryName() {
067        return repositoryName;
068    }
069
070    @Override
071    public void connect(String repositoryName, NuxeoPrincipal principal) {
072        if (sessionId != null) {
073            throw new LocalException("CoreSession already connected");
074        }
075        this.repositoryName = repositoryName;
076        this.principal = principal;
077        createMetrics(); // needs repo name
078        sessionId = newSessionId(repositoryName, principal);
079        if (log.isDebugEnabled()) {
080            log.debug("Creating CoreSession: " + sessionId);
081        }
082        createSession(); // create first session for current thread
083    }
084
085    protected static String newSessionId(String repositoryName, NuxeoPrincipal principal) {
086        return repositoryName + '/' + principal.getName() + '#' + SID_COUNTER.incrementAndGet();
087    }
088
089    @Override
090    public String getSessionId() {
091        return sessionId;
092    }
093
094    @Override
095    public Session getSession() {
096        SessionInfo si = sessionHolder.get();
097        if (si == null || !si.session.isLive()) {
098            // close old one, previously completed
099            closeInThisThread();
100            if (!TransactionHelper.isTransactionActive()) {
101                throw new LocalException("No transaction active, cannot reconnect: " + sessionId);
102            }
103            if (log.isDebugEnabled()) {
104                log.debug("Reconnecting CoreSession: " + sessionId);
105            }
106            si = createSession();
107        }
108        return si.session;
109    }
110
111    /**
112     * Creates the session. It will be destroyed by calling {@link #destroy}.
113     */
114    protected SessionInfo createSession() {
115        RepositoryService repositoryService = Framework.getLocalService(RepositoryService.class);
116        Session session = repositoryService.getSession(repositoryName);
117        TransactionHelper.registerSynchronization(this);
118        SessionInfo si = new SessionInfo(session);
119        sessionHolder.set(si);
120        allSessions.add(si);
121        if (log.isDebugEnabled()) {
122            log.debug("Adding thread " + Thread.currentThread().getName() + " for CoreSession: " + sessionId);
123        }
124        return si;
125    }
126
127    @Override
128    public boolean isLive(boolean onThread) {
129        if (!onThread) {
130            return !allSessions.isEmpty();
131        }
132        return sessionHolder.get() != null;
133    }
134
135    @Override
136    public void close() {
137        CoreInstance.closeCoreSession(this); // calls back destroy()
138    }
139
140    @Override
141    public void beforeCompletion() {
142        // insure the connection is closed before commit
143        closeInThisThread();
144    }
145
146    @Override
147    public void afterCompletion(int status) {
148    }
149
150    protected void closeInThisThread() {
151        SessionInfo si = sessionHolder.get();
152        if (si == null) {
153            return;
154        }
155        if (log.isDebugEnabled()) {
156            log.debug("Removing thread " + Thread.currentThread().getName() + " for CoreSession: " + sessionId);
157        }
158        try {
159            si.session.close();
160        } finally {
161            sessionHolder.remove();
162            allSessions.remove(si);
163        }
164    }
165
166    // explicit close()
167    @Override
168    public void destroy() {
169        if (log.isDebugEnabled()) {
170            log.debug("Destroying CoreSession: " + sessionId);
171        }
172        closeInThisThread();
173    }
174
175    @Override
176    public NuxeoPrincipal getPrincipal() {
177        return principal;
178    }
179
180    @Override
181    public boolean isStateSharedByAllThreadSessions() {
182        return getSession().isStateSharedByAllThreadSessions();
183    }
184
185}