001/*
002 * (C) Copyright 2006-2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Bogdan Stefanescu
018 *     Florent Guillaume
019 */
020
021package org.nuxeo.ecm.core.api.local;
022
023import java.util.Collections;
024import java.util.Set;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.atomic.AtomicLong;
027
028import javax.transaction.Status;
029import javax.transaction.Synchronization;
030import org.apache.commons.logging.Log;
031import org.apache.commons.logging.LogFactory;
032import org.nuxeo.ecm.core.api.AbstractSession;
033import org.nuxeo.ecm.core.api.CloseableCoreSession;
034import org.nuxeo.ecm.core.api.CoreInstance;
035import org.nuxeo.ecm.core.api.CoreSession;
036import org.nuxeo.ecm.core.api.NuxeoException;
037import org.nuxeo.ecm.core.api.NuxeoPrincipal;
038import org.nuxeo.ecm.core.model.Repository;
039import org.nuxeo.ecm.core.model.Session;
040import org.nuxeo.ecm.core.repository.RepositoryService;
041import org.nuxeo.runtime.api.Framework;
042import org.nuxeo.runtime.transaction.TransactionHelper;
043
044/**
045 * Local Session: implementation of {@link CoreSession} beyond {@link AbstractSession}, dealing with low-level stuff.
046 */
047public class LocalSession extends AbstractSession implements CloseableCoreSession, Synchronization {
048
049    private static final long serialVersionUID = 1L;
050
051    private static final AtomicLong SID_COUNTER = new AtomicLong();
052
053    private static final Log log = LogFactory.getLog(LocalSession.class);
054
055    protected String repositoryName;
056
057    protected NuxeoPrincipal principal;
058
059    /** Defined once at connect time. */
060    private String sessionId;
061
062    /**
063     * Thread-local session allocated.
064     */
065    private final ThreadLocal<SessionInfo> sessionHolder = new ThreadLocal<>();
066
067    /**
068     * All sessions allocated in all threads, in order to detect close leaks.
069     */
070    private final Set<SessionInfo> allSessions = Collections.newSetFromMap(new ConcurrentHashMap<SessionInfo, Boolean>());
071
072    public LocalSession(String repositoryName, NuxeoPrincipal principal) {
073        if (TransactionHelper.isTransactionMarkedRollback()) {
074            throw new NuxeoException("Cannot create a CoreSession when transaction is marked rollback-only");
075        }
076        if (!TransactionHelper.isTransactionActive()) {
077            throw new NuxeoException("Cannot create a CoreSession outside a transaction");
078        }
079        this.repositoryName = repositoryName;
080        this.principal = principal;
081        createMetrics(); // needs repo name
082        sessionId = newSessionId(repositoryName, principal);
083        if (log.isDebugEnabled()) {
084            log.debug("Creating CoreSession: " + sessionId);
085        }
086        createSession(); // create first session for current thread
087    }
088
089    @Override
090    public String getRepositoryName() {
091        return repositoryName;
092    }
093
094    protected static String newSessionId(String repositoryName, NuxeoPrincipal principal) {
095        return repositoryName + '/' + principal.getName() + '#' + SID_COUNTER.incrementAndGet();
096    }
097
098    @Override
099    public String getSessionId() {
100        return sessionId;
101    }
102
103    @Override
104    public Session getSession() {
105        if (!TransactionHelper.isTransactionActiveOrMarkedRollback()) {
106            throw new NuxeoException("Cannot use a CoreSession outside a transaction");
107        }
108        TransactionHelper.checkTransactionTimeout();
109        SessionInfo si = sessionHolder.get();
110        if (si == null || !si.session.isLive()) {
111            // close old one, previously completed
112            closeInThisThread();
113            if (log.isDebugEnabled()) {
114                log.debug("Reconnecting CoreSession: " + sessionId);
115            }
116            if (TransactionHelper.isTransactionMarkedRollback()) {
117                throw new NuxeoException("Cannot reconnect a CoreSession when transaction is marked rollback-only");
118            }
119            si = createSession();
120        }
121        return si.session;
122    }
123
124    /**
125     * Creates the session. It will be destroyed by calling {@link #destroy}.
126     */
127    protected SessionInfo createSession() {
128        RepositoryService repositoryService = Framework.getService(RepositoryService.class);
129        Repository repository = repositoryService.getRepository(repositoryName);
130        if (repository == null) {
131            throw new LocalException("No such repository: " + repositoryName);
132        }
133        Session session = repository.getSession();
134        TransactionHelper.registerSynchronization(this);
135        SessionInfo si = new SessionInfo(session);
136        sessionHolder.set(si);
137        allSessions.add(si);
138        if (log.isDebugEnabled()) {
139            log.debug("Adding thread " + Thread.currentThread().getName() + " for CoreSession: " + sessionId);
140        }
141        return si;
142    }
143
144    @Override
145    public boolean isLive(boolean onThread) {
146        if (!onThread) {
147            return !allSessions.isEmpty();
148        }
149        return sessionHolder.get() != null;
150    }
151
152    @Override
153    public void close() {
154        CoreInstance.closeCoreSession(this); // calls back destroy()
155    }
156
157    @Override
158    public void beforeCompletion() {
159        // insure the connection is closed before commit
160        closeInThisThread();
161    }
162
163    @Override
164    public void afterCompletion(int status) {
165        if (status == Status.STATUS_ROLLEDBACK) {
166            // insure the connection is closed on roll-back also
167            closeInThisThread();
168        }
169    }
170
171    protected void closeInThisThread() {
172        SessionInfo si = sessionHolder.get();
173        if (si == null) {
174            return;
175        }
176        if (log.isDebugEnabled()) {
177            log.debug("Removing thread " + Thread.currentThread().getName() + " for CoreSession: " + sessionId);
178        }
179        try {
180            si.session.close();
181        } finally {
182            sessionHolder.remove();
183            allSessions.remove(si);
184        }
185    }
186
187    // explicit close()
188    @Override
189    public void destroy() {
190        if (log.isDebugEnabled()) {
191            log.debug("Destroying CoreSession: " + sessionId);
192        }
193        closeInThisThread();
194    }
195
196    @Override
197    public NuxeoPrincipal getPrincipal() {
198        return principal;
199    }
200
201    @Override
202    public boolean isStateSharedByAllThreadSessions() {
203        // by design we always share state when in the same thread (through the sessionHolder ThreadLocal)
204        return true;
205    }
206
207}