001/*
002 * (C) Copyright 2006-2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Bogdan Stefanescu
018 *     Florent Guillaume
019 */
020
021package org.nuxeo.ecm.core.api.local;
022
023import java.util.Collections;
024import java.util.Set;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.atomic.AtomicLong;
027
028import javax.transaction.Status;
029import javax.transaction.Synchronization;
030import org.apache.commons.logging.Log;
031import org.apache.commons.logging.LogFactory;
032import org.nuxeo.ecm.core.api.AbstractSession;
033import org.nuxeo.ecm.core.api.CoreInstance;
034import org.nuxeo.ecm.core.api.CoreSession;
035import org.nuxeo.ecm.core.api.NuxeoException;
036import org.nuxeo.ecm.core.api.NuxeoPrincipal;
037import org.nuxeo.ecm.core.model.Repository;
038import org.nuxeo.ecm.core.model.Session;
039import org.nuxeo.ecm.core.repository.RepositoryService;
040import org.nuxeo.runtime.api.Framework;
041import org.nuxeo.runtime.transaction.TransactionHelper;
042
043/**
044 * Local Session: implementation of {@link CoreSession} beyond {@link AbstractSession}, dealing with low-level stuff.
045 */
046public class LocalSession extends AbstractSession implements Synchronization {
047
048    private static final long serialVersionUID = 1L;
049
050    private static final AtomicLong SID_COUNTER = new AtomicLong();
051
052    private static final Log log = LogFactory.getLog(LocalSession.class);
053
054    protected String repositoryName;
055
056    protected NuxeoPrincipal principal;
057
058    /** Defined once at connect time. */
059    private String sessionId;
060
061    /**
062     * Thread-local session allocated.
063     */
064    private final ThreadLocal<SessionInfo> sessionHolder = new ThreadLocal<>();
065
066    /**
067     * All sessions allocated in all threads, in order to detect close leaks.
068     */
069    private final Set<SessionInfo> allSessions = Collections.newSetFromMap(new ConcurrentHashMap<SessionInfo, Boolean>());
070
071    public LocalSession(String repositoryName, NuxeoPrincipal principal) {
072        if (TransactionHelper.isTransactionMarkedRollback()) {
073            throw new NuxeoException("Cannot create a CoreSession when transaction is marked rollback-only");
074        }
075        if (!TransactionHelper.isTransactionActive()) {
076            throw new NuxeoException("Cannot create a CoreSession outside a transaction");
077        }
078        this.repositoryName = repositoryName;
079        this.principal = principal;
080        createMetrics(); // needs repo name
081        sessionId = newSessionId(repositoryName, principal);
082        if (log.isDebugEnabled()) {
083            log.debug("Creating CoreSession: " + sessionId);
084        }
085        createSession(); // create first session for current thread
086    }
087
088    @Override
089    public String getRepositoryName() {
090        return repositoryName;
091    }
092
093    protected static String newSessionId(String repositoryName, NuxeoPrincipal principal) {
094        return repositoryName + '/' + principal.getName() + '#' + SID_COUNTER.incrementAndGet();
095    }
096
097    @Override
098    public String getSessionId() {
099        return sessionId;
100    }
101
102    @Override
103    public Session getSession() {
104        if (!TransactionHelper.isTransactionActiveOrMarkedRollback()) {
105            throw new NuxeoException("Cannot use a CoreSession outside a transaction");
106        }
107        TransactionHelper.checkTransactionTimeout();
108        SessionInfo si = sessionHolder.get();
109        if (si == null || !si.session.isLive()) {
110            // close old one, previously completed
111            closeInThisThread();
112            if (log.isDebugEnabled()) {
113                log.debug("Reconnecting CoreSession: " + sessionId);
114            }
115            if (TransactionHelper.isTransactionMarkedRollback()) {
116                throw new NuxeoException("Cannot reconnect a CoreSession when transaction is marked rollback-only");
117            }
118            si = createSession();
119        }
120        return si.session;
121    }
122
123    /**
124     * Creates the session. It will be destroyed by calling {@link #destroy}.
125     */
126    protected SessionInfo createSession() {
127        RepositoryService repositoryService = Framework.getService(RepositoryService.class);
128        Repository repository = repositoryService.getRepository(repositoryName);
129        if (repository == null) {
130            throw new LocalException("No such repository: " + repositoryName);
131        }
132        Session session = repository.getSession();
133        TransactionHelper.registerSynchronization(this);
134        SessionInfo si = new SessionInfo(session);
135        sessionHolder.set(si);
136        allSessions.add(si);
137        if (log.isDebugEnabled()) {
138            log.debug("Adding thread " + Thread.currentThread().getName() + " for CoreSession: " + sessionId);
139        }
140        return si;
141    }
142
143    @Override
144    public boolean isLive(boolean onThread) {
145        if (!onThread) {
146            return !allSessions.isEmpty();
147        }
148        return sessionHolder.get() != null;
149    }
150
151    @Override
152    public void close() {
153        CoreInstance.closeCoreSession(this); // calls back destroy()
154    }
155
156    @Override
157    public void beforeCompletion() {
158        // insure the connection is closed before commit
159        closeInThisThread();
160    }
161
162    @Override
163    public void afterCompletion(int status) {
164        if (status == Status.STATUS_ROLLEDBACK) {
165            // insure the connection is closed on roll-back also
166            closeInThisThread();
167        }
168    }
169
170    protected void closeInThisThread() {
171        SessionInfo si = sessionHolder.get();
172        if (si == null) {
173            return;
174        }
175        if (log.isDebugEnabled()) {
176            log.debug("Removing thread " + Thread.currentThread().getName() + " for CoreSession: " + sessionId);
177        }
178        try {
179            si.session.close();
180        } finally {
181            sessionHolder.remove();
182            allSessions.remove(si);
183        }
184    }
185
186    // explicit close()
187    @Override
188    public void destroy() {
189        if (log.isDebugEnabled()) {
190            log.debug("Destroying CoreSession: " + sessionId);
191        }
192        closeInThisThread();
193    }
194
195    @Override
196    public NuxeoPrincipal getPrincipal() {
197        return principal;
198    }
199
200    @Override
201    public boolean isStateSharedByAllThreadSessions() {
202        // by design we always share state when in the same thread (through the sessionHolder ThreadLocal)
203        return true;
204    }
205
206}