001/*
002 * (C) Copyright 2006-2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Bogdan Stefanescu
018 *     Florent Guillaume
019 */
020
021package org.nuxeo.ecm.core.api.local;
022
023import java.util.Collections;
024import java.util.Set;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.atomic.AtomicLong;
027
028import javax.transaction.Synchronization;
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.nuxeo.ecm.core.api.AbstractSession;
032import org.nuxeo.ecm.core.api.CoreInstance;
033import org.nuxeo.ecm.core.api.CoreSession;
034import org.nuxeo.ecm.core.api.NuxeoPrincipal;
035import org.nuxeo.ecm.core.model.Session;
036import org.nuxeo.ecm.core.repository.RepositoryService;
037import org.nuxeo.runtime.api.Framework;
038import org.nuxeo.runtime.transaction.TransactionHelper;
039
040/**
041 * Local Session: implementation of {@link CoreSession} beyond {@link AbstractSession}, dealing with low-level stuff.
042 */
043public class LocalSession extends AbstractSession implements Synchronization {
044
045    private static final long serialVersionUID = 1L;
046
047    private static final AtomicLong SID_COUNTER = new AtomicLong();
048
049    private static final Log log = LogFactory.getLog(LocalSession.class);
050
051    protected String repositoryName;
052
053    protected NuxeoPrincipal principal;
054
055    /** Defined once at connect time. */
056    private String sessionId;
057
058    /**
059     * Thread-local session allocated.
060     */
061    private final ThreadLocal<SessionInfo> sessionHolder = new ThreadLocal<>();
062
063    /**
064     * All sessions allocated in all threads, in order to detect close leaks.
065     */
066    private final Set<SessionInfo> allSessions = Collections.newSetFromMap(new ConcurrentHashMap<SessionInfo, Boolean>());
067
068    public static CoreSession createInstance() {
069        return new LocalSession();
070    }
071
072    @Override
073    public String getRepositoryName() {
074        return repositoryName;
075    }
076
077    @Override
078    public void connect(String repositoryName, NuxeoPrincipal principal) {
079        if (sessionId != null) {
080            throw new LocalException("CoreSession already connected");
081        }
082        this.repositoryName = repositoryName;
083        this.principal = principal;
084        createMetrics(); // needs repo name
085        sessionId = newSessionId(repositoryName, principal);
086        if (log.isDebugEnabled()) {
087            log.debug("Creating CoreSession: " + sessionId);
088        }
089        createSession(); // create first session for current thread
090    }
091
092    protected static String newSessionId(String repositoryName, NuxeoPrincipal principal) {
093        return repositoryName + '/' + principal.getName() + '#' + SID_COUNTER.incrementAndGet();
094    }
095
096    @Override
097    public String getSessionId() {
098        return sessionId;
099    }
100
101    @Override
102    public Session getSession() {
103        SessionInfo si = sessionHolder.get();
104        if (si == null || !si.session.isLive()) {
105            // close old one, previously completed
106            closeInThisThread();
107            if (!TransactionHelper.isTransactionActive()) {
108                throw new LocalException("No transaction active, cannot reconnect: " + sessionId);
109            }
110            if (log.isDebugEnabled()) {
111                log.debug("Reconnecting CoreSession: " + sessionId);
112            }
113            si = createSession();
114        }
115        return si.session;
116    }
117
118    /**
119     * Creates the session. It will be destroyed by calling {@link #destroy}.
120     */
121    protected SessionInfo createSession() {
122        RepositoryService repositoryService = Framework.getLocalService(RepositoryService.class);
123        Session session = repositoryService.getSession(repositoryName);
124        TransactionHelper.registerSynchronization(this);
125        SessionInfo si = new SessionInfo(session);
126        sessionHolder.set(si);
127        allSessions.add(si);
128        if (log.isDebugEnabled()) {
129            log.debug("Adding thread " + Thread.currentThread().getName() + " for CoreSession: " + sessionId);
130        }
131        return si;
132    }
133
134    @Override
135    public boolean isLive(boolean onThread) {
136        if (!onThread) {
137            return !allSessions.isEmpty();
138        }
139        return sessionHolder.get() != null;
140    }
141
142    @Override
143    public void close() {
144        CoreInstance.closeCoreSession(this); // calls back destroy()
145    }
146
147    @Override
148    public void beforeCompletion() {
149        // insure the connection is closed before commit
150        closeInThisThread();
151    }
152
153    @Override
154    public void afterCompletion(int status) {
155    }
156
157    protected void closeInThisThread() {
158        SessionInfo si = sessionHolder.get();
159        if (si == null) {
160            return;
161        }
162        if (log.isDebugEnabled()) {
163            log.debug("Removing thread " + Thread.currentThread().getName() + " for CoreSession: " + sessionId);
164        }
165        try {
166            si.session.close();
167        } finally {
168            sessionHolder.remove();
169            allSessions.remove(si);
170        }
171    }
172
173    // explicit close()
174    @Override
175    public void destroy() {
176        if (log.isDebugEnabled()) {
177            log.debug("Destroying CoreSession: " + sessionId);
178        }
179        closeInThisThread();
180    }
181
182    @Override
183    public NuxeoPrincipal getPrincipal() {
184        return principal;
185    }
186
187    @Override
188    public boolean isStateSharedByAllThreadSessions() {
189        return getSession().isStateSharedByAllThreadSessions();
190    }
191
192}