001/*
002 * (C) Copyright 2006-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     George Lefter
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.directory.sql;
021
022import java.sql.Connection;
023import java.sql.SQLException;
024import java.util.Arrays;
025import java.util.LinkedHashMap;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.stream.Collectors;
029
030import javax.transaction.Synchronization;
031
032import org.apache.commons.lang.StringUtils;
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.nuxeo.ecm.core.cache.CacheService;
036import org.nuxeo.ecm.core.schema.SchemaManager;
037import org.nuxeo.ecm.core.schema.types.Field;
038import org.nuxeo.ecm.core.schema.types.Schema;
039import org.nuxeo.ecm.core.storage.sql.ColumnType;
040import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
041import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table;
042import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
043import org.nuxeo.ecm.directory.AbstractDirectory;
044import org.nuxeo.ecm.directory.DirectoryCSVLoader;
045import org.nuxeo.ecm.directory.DirectoryException;
046import org.nuxeo.ecm.directory.Session;
047import org.nuxeo.runtime.api.Framework;
048import org.nuxeo.runtime.datasource.ConnectionHelper;
049import org.nuxeo.runtime.transaction.TransactionHelper;
050
051public class SQLDirectory extends AbstractDirectory {
052
053    protected class TxSessionCleaner implements Synchronization {
054        private final SQLSession session;
055
056        Throwable initContext = captureInitContext();
057
058        protected TxSessionCleaner(SQLSession session) {
059            this.session = session;
060        }
061
062        protected Throwable captureInitContext() {
063            if (!log.isDebugEnabled()) {
064                return null;
065            }
066            return new Throwable("SQL directory session init context in " + SQLDirectory.this);
067        }
068
069        protected void checkIsNotLive() {
070            try {
071                if (!session.isLive()) {
072                    return;
073                }
074                if (initContext != null) {
075                    log.warn("Closing a sql directory session for you " + session, initContext);
076                } else {
077                    log.warn("Closing a sql directory session for you " + session);
078                }
079                if (!TransactionHelper.isTransactionActiveOrMarkedRollback()) {
080                    log.warn("Closing sql directory session outside a transaction" + session);
081                }
082                session.close();
083            } catch (DirectoryException e) {
084                log.error("Cannot state on sql directory session before commit " + SQLDirectory.this, e);
085            }
086
087        }
088
089        @Override
090        public void beforeCompletion() {
091            checkIsNotLive();
092        }
093
094        @Override
095        public void afterCompletion(int status) {
096            checkIsNotLive();
097        }
098
099    }
100
101    public static final Log log = LogFactory.getLog(SQLDirectory.class);
102
103    public static final String TENANT_ID_FIELD = "tenantId";
104
105    private final boolean nativeCase;
106
107    private Table table;
108
109    private Schema schema;
110
111    // columns to fetch when an entry is read (with the password)
112    protected List<Column> readColumnsAll;
113
114    // columns to fetch when an entry is read (excludes the password)
115    protected List<Column> readColumns;
116
117    // columns to fetch when an entry is read (with the password), as SQL
118    protected String readColumnsAllSQL;
119
120    // columns to fetch when an entry is read (excludes the password), as SQL
121    protected String readColumnsSQL;
122
123    private volatile Dialect dialect;
124
125    public SQLDirectory(SQLDirectoryDescriptor descriptor) {
126        super(descriptor, TableReference.class);
127
128        // Add specific references
129        addTableReferences(descriptor.getTableReferences());
130
131        nativeCase = Boolean.TRUE.equals(descriptor.nativeCase);
132
133        // Cache fallback
134        CacheService cacheService = Framework.getLocalService(CacheService.class);
135        if (cacheService != null) {
136            if (descriptor.cacheEntryName == null && descriptor.getCacheMaxSize() != 0) {
137                cache.setEntryCacheName("cache-" + getName());
138                cacheService.registerCache("cache-" + getName(),
139                        descriptor.getCacheMaxSize(),
140                        descriptor.getCacheTimeout() / 60);
141            }
142            if (descriptor.cacheEntryWithoutReferencesName == null && descriptor.getCacheMaxSize() != 0) {
143                cache.setEntryCacheWithoutReferencesName(
144                        "cacheWithoutReference-" + getName());
145                cacheService.registerCache("cacheWithoutReference-" + getName(),
146                        descriptor.getCacheMaxSize(),
147                        descriptor.getCacheTimeout() / 60);
148            }
149        }
150    }
151
152    @Override
153    public SQLDirectoryDescriptor getDescriptor() {
154        return (SQLDirectoryDescriptor) descriptor;
155    }
156
157    /**
158     * Lazily initializes the connection.
159     *
160     * @return {@code true} if CSV data should be loaded
161     * @since 8.4
162     */
163    protected boolean initConnectionIfNeeded() {
164        // double checked locking with volatile pattern to ensure concurrent lazy init
165        if (dialect == null) {
166            synchronized (this) {
167                if (dialect == null) {
168                    return initConnection();
169                }
170            }
171        }
172        return false;
173    }
174
175    /**
176     * Initializes the table.
177     *
178     * @return {@code true} if CSV data should be loaded
179     * @since 6.0
180     */
181    protected boolean initConnection() {
182        SQLDirectoryDescriptor descriptor = getDescriptor();
183
184        try (Connection sqlConnection = getConnection()) {
185            dialect = Dialect.createDialect(sqlConnection, null);
186            // setup table and fields maps
187            String tableName = descriptor.tableName == null ? descriptor.name : descriptor.tableName;
188            table = SQLHelper.addTable(tableName, dialect, useNativeCase());
189            SchemaManager schemaManager = Framework.getLocalService(SchemaManager.class);
190            schema = schemaManager.getSchema(getSchema());
191            if (schema == null) {
192                throw new DirectoryException("schema not found: " + getSchema());
193            }
194            schemaFieldMap = new LinkedHashMap<>();
195            readColumnsAll = new LinkedList<>();
196            readColumns = new LinkedList<>();
197            boolean hasPrimary = false;
198            for (Field f : schema.getFields()) {
199                String fieldName = f.getName().getLocalName();
200                schemaFieldMap.put(fieldName, f);
201
202                if (!isReference(fieldName)) {
203                    boolean isId = fieldName.equals(getIdField());
204                    ColumnType type = ColumnType.fromField(f);
205                    if (isId && descriptor.isAutoincrementIdField()) {
206                        type = ColumnType.AUTOINC;
207                    }
208                    Column column = SQLHelper.addColumn(table, fieldName, type, useNativeCase());
209                    if (isId) {
210                        if (descriptor.isAutoincrementIdField()) {
211                            column.setIdentity(true);
212                        }
213                        column.setPrimary(true);
214                        column.setNullable(false);
215                        hasPrimary = true;
216                    }
217                    readColumnsAll.add(column);
218                    if (!fieldName.equals(descriptor.passwordField)) {
219                        readColumns.add(column);
220                    }
221                }
222            }
223            readColumnsAllSQL = readColumnsAll.stream().map(Column::getQuotedName).collect(Collectors.joining(", "));
224            readColumnsSQL = readColumns.stream().map(Column::getQuotedName).collect(Collectors.joining(", "));
225            if (!hasPrimary) {
226                throw new DirectoryException(String.format(
227                        "Directory '%s' id field '%s' is not present in schema '%s'", getName(), getIdField(),
228                        getSchema()));
229            }
230
231            SQLHelper helper = new SQLHelper(sqlConnection, table, descriptor.getCreateTablePolicy());
232            return helper.setupTable();
233        } catch (SQLException e) {
234            // exception on close
235            throw new DirectoryException(e);
236        }
237    }
238
239    public Connection getConnection() throws DirectoryException {
240        SQLDirectoryDescriptor descriptor = getDescriptor();
241        if (StringUtils.isBlank(descriptor.dataSourceName)) {
242            throw new DirectoryException("Missing dataSource for SQL directory: " + getName());
243        }
244        try {
245            return ConnectionHelper.getConnection(descriptor.dataSourceName);
246        } catch (SQLException e) {
247            throw new DirectoryException("Cannot connect to SQL directory '" + getName() + "': " + e.getMessage(), e);
248        }
249    }
250
251    @Override
252    public Session getSession() throws DirectoryException {
253        boolean loadData = initConnectionIfNeeded();
254        SQLSession session = new SQLSession(this, getDescriptor());
255        addSession(session);
256        if (loadData && descriptor.getDataFileName() != null) {
257            Schema schema = Framework.getService(SchemaManager.class).getSchema(getSchema());
258            Framework.doPrivileged(() -> DirectoryCSVLoader.loadData(descriptor.getDataFileName(),
259                    descriptor.getDataFileCharacterSeparator(), schema, session::createEntry));
260        }
261        return session;
262    }
263
264    protected void addSession(final SQLSession session) throws DirectoryException {
265        super.addSession(session);
266        registerInTx(session);
267    }
268
269    protected void registerInTx(final SQLSession session) throws DirectoryException {
270        if (!TransactionHelper.isTransactionActive()) {
271            return;
272        }
273        TransactionHelper.registerSynchronization(new TxSessionCleaner(session));
274    }
275
276    public Table getTable() {
277        return table;
278    }
279
280    public Dialect getDialect() {
281        return dialect;
282    }
283
284    public boolean useNativeCase() {
285        return nativeCase;
286    }
287
288    @Override
289    public boolean isMultiTenant() {
290        return table.getColumn(TENANT_ID_FIELD) != null;
291    }
292
293    @Override
294    public String toString() {
295        return "SQLDirectory [name=" + descriptor.name + "]";
296    }
297
298    protected void addTableReferences(TableReferenceDescriptor[] tableReferences) {
299        if (tableReferences != null) {
300            Arrays.stream(tableReferences).map(TableReference::new).forEach(this::addReference);
301        }
302    }
303
304}