001/* 002 * Copyright (c) 2006-2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the Eclipse Public License v1.0 006 * which accompanies this distribution, and is available at 007 * http://www.eclipse.org/legal/epl-v10.html 008 * 009 * Contributors: 010 * Bogdan Stefanescu 011 * Florent Guillaume 012 */ 013 014package org.nuxeo.ecm.core.api.local; 015 016import java.util.Collections; 017import java.util.Set; 018import java.util.concurrent.ConcurrentHashMap; 019import java.util.concurrent.atomic.AtomicLong; 020 021import javax.transaction.Synchronization; 022import org.apache.commons.logging.Log; 023import org.apache.commons.logging.LogFactory; 024import org.nuxeo.ecm.core.api.AbstractSession; 025import org.nuxeo.ecm.core.api.CoreInstance; 026import org.nuxeo.ecm.core.api.CoreSession; 027import org.nuxeo.ecm.core.api.NuxeoPrincipal; 028import org.nuxeo.ecm.core.model.Session; 029import org.nuxeo.ecm.core.repository.RepositoryService; 030import org.nuxeo.runtime.api.Framework; 031import org.nuxeo.runtime.transaction.TransactionHelper; 032 033/** 034 * Local Session: implementation of {@link CoreSession} beyond {@link AbstractSession}, dealing with low-level stuff. 035 */ 036public class LocalSession extends AbstractSession implements Synchronization { 037 038 private static final long serialVersionUID = 1L; 039 040 private static final AtomicLong SID_COUNTER = new AtomicLong(); 041 042 private static final Log log = LogFactory.getLog(LocalSession.class); 043 044 protected String repositoryName; 045 046 protected NuxeoPrincipal principal; 047 048 /** Defined once at connect time. */ 049 private String sessionId; 050 051 /** 052 * Thread-local session allocated. 053 */ 054 private final ThreadLocal<SessionInfo> sessionHolder = new ThreadLocal<>(); 055 056 /** 057 * All sessions allocated in all threads, in order to detect close leaks. 058 */ 059 private final Set<SessionInfo> allSessions = Collections.newSetFromMap(new ConcurrentHashMap<SessionInfo, Boolean>()); 060 061 public static CoreSession createInstance() { 062 return new LocalSession(); 063 } 064 065 @Override 066 public String getRepositoryName() { 067 return repositoryName; 068 } 069 070 @Override 071 public void connect(String repositoryName, NuxeoPrincipal principal) { 072 if (sessionId != null) { 073 throw new LocalException("CoreSession already connected"); 074 } 075 this.repositoryName = repositoryName; 076 this.principal = principal; 077 createMetrics(); // needs repo name 078 sessionId = newSessionId(repositoryName, principal); 079 if (log.isDebugEnabled()) { 080 log.debug("Creating CoreSession: " + sessionId); 081 } 082 createSession(); // create first session for current thread 083 } 084 085 protected static String newSessionId(String repositoryName, NuxeoPrincipal principal) { 086 return repositoryName + '/' + principal.getName() + '#' + SID_COUNTER.incrementAndGet(); 087 } 088 089 @Override 090 public String getSessionId() { 091 return sessionId; 092 } 093 094 @Override 095 public Session getSession() { 096 SessionInfo si = sessionHolder.get(); 097 if (si == null || !si.session.isLive()) { 098 // close old one, previously completed 099 closeInThisThread(); 100 if (!TransactionHelper.isTransactionActive()) { 101 throw new LocalException("No transaction active, cannot reconnect: " + sessionId); 102 } 103 if (log.isDebugEnabled()) { 104 log.debug("Reconnecting CoreSession: " + sessionId); 105 } 106 si = createSession(); 107 } 108 return si.session; 109 } 110 111 /** 112 * Creates the session. It will be destroyed by calling {@link #destroy}. 113 */ 114 protected SessionInfo createSession() { 115 RepositoryService repositoryService = Framework.getLocalService(RepositoryService.class); 116 Session session = repositoryService.getSession(repositoryName); 117 TransactionHelper.registerSynchronization(this); 118 SessionInfo si = new SessionInfo(session); 119 sessionHolder.set(si); 120 allSessions.add(si); 121 if (log.isDebugEnabled()) { 122 log.debug("Adding thread " + Thread.currentThread().getName() + " for CoreSession: " + sessionId); 123 } 124 return si; 125 } 126 127 @Override 128 public boolean isLive(boolean onThread) { 129 if (!onThread) { 130 return !allSessions.isEmpty(); 131 } 132 return sessionHolder.get() != null; 133 } 134 135 @Override 136 public void close() { 137 CoreInstance.closeCoreSession(this); // calls back destroy() 138 } 139 140 @Override 141 public void beforeCompletion() { 142 // insure the connection is closed before commit 143 closeInThisThread(); 144 } 145 146 @Override 147 public void afterCompletion(int status) { 148 } 149 150 protected void closeInThisThread() { 151 SessionInfo si = sessionHolder.get(); 152 if (si == null) { 153 return; 154 } 155 if (log.isDebugEnabled()) { 156 log.debug("Removing thread " + Thread.currentThread().getName() + " for CoreSession: " + sessionId); 157 } 158 try { 159 si.session.close(); 160 } finally { 161 sessionHolder.remove(); 162 allSessions.remove(si); 163 } 164 } 165 166 // explicit close() 167 @Override 168 public void destroy() { 169 if (log.isDebugEnabled()) { 170 log.debug("Destroying CoreSession: " + sessionId); 171 } 172 closeInThisThread(); 173 } 174 175 @Override 176 public NuxeoPrincipal getPrincipal() { 177 return principal; 178 } 179 180 @Override 181 public boolean isStateSharedByAllThreadSessions() { 182 return getSession().isStateSharedByAllThreadSessions(); 183 } 184 185}