001/*
002 * (C) Copyright 2006-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     George Lefter
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.directory.sql;
021
022import java.sql.Connection;
023import java.sql.SQLException;
024import java.util.Arrays;
025import java.util.LinkedList;
026import java.util.List;
027import java.util.stream.Collectors;
028
029import javax.transaction.Synchronization;
030
031import org.apache.commons.lang.StringUtils;
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.nuxeo.ecm.core.schema.SchemaManager;
035import org.nuxeo.ecm.core.schema.types.Field;
036import org.nuxeo.ecm.core.schema.types.Schema;
037import org.nuxeo.ecm.core.storage.sql.ColumnType;
038import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
039import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table;
040import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
041import org.nuxeo.ecm.directory.AbstractDirectory;
042import org.nuxeo.ecm.directory.DirectoryCSVLoader;
043import org.nuxeo.ecm.directory.DirectoryException;
044import org.nuxeo.ecm.directory.Session;
045import org.nuxeo.runtime.api.Framework;
046import org.nuxeo.runtime.datasource.ConnectionHelper;
047import org.nuxeo.runtime.transaction.TransactionHelper;
048
049public class SQLDirectory extends AbstractDirectory {
050
051    protected class TxSessionCleaner implements Synchronization {
052        private final SQLSession session;
053
054        Throwable initContext = captureInitContext();
055
056        protected TxSessionCleaner(SQLSession session) {
057            this.session = session;
058        }
059
060        protected Throwable captureInitContext() {
061            if (!log.isDebugEnabled()) {
062                return null;
063            }
064            return new Throwable("SQL directory session init context in " + SQLDirectory.this);
065        }
066
067        protected void checkIsNotLive() {
068            try {
069                if (!session.isLive()) {
070                    return;
071                }
072                if (initContext != null) {
073                    log.warn("Closing a sql directory session for you " + session, initContext);
074                } else {
075                    log.warn("Closing a sql directory session for you " + session);
076                }
077                if (!TransactionHelper.isTransactionActiveOrMarkedRollback()) {
078                    log.warn("Closing sql directory session outside a transaction" + session);
079                }
080                session.close();
081            } catch (DirectoryException e) {
082                log.error("Cannot state on sql directory session before commit " + SQLDirectory.this, e);
083            }
084
085        }
086
087        @Override
088        public void beforeCompletion() {
089            checkIsNotLive();
090        }
091
092        @Override
093        public void afterCompletion(int status) {
094            checkIsNotLive();
095        }
096
097    }
098
099    public static final Log log = LogFactory.getLog(SQLDirectory.class);
100
101    public static final String TENANT_ID_FIELD = "tenantId";
102
103    private final boolean nativeCase;
104
105    private Table table;
106
107    private Schema schema;
108
109    // columns to fetch when an entry is read (with the password)
110    protected List<Column> readColumnsAll;
111
112    // columns to fetch when an entry is read (excludes the password)
113    protected List<Column> readColumns;
114
115    // columns to fetch when an entry is read (with the password), as SQL
116    protected String readColumnsAllSQL;
117
118    // columns to fetch when an entry is read (excludes the password), as SQL
119    protected String readColumnsSQL;
120
121    private volatile Dialect dialect;
122
123    public SQLDirectory(SQLDirectoryDescriptor descriptor) {
124        super(descriptor, TableReference.class);
125
126        // Add specific references
127        addTableReferences(descriptor.getTableReferences());
128
129        nativeCase = Boolean.TRUE.equals(descriptor.nativeCase);
130
131        // Cache fallback
132        fallbackOnDefaultCache();
133    }
134
135    @Override
136    public SQLDirectoryDescriptor getDescriptor() {
137        return (SQLDirectoryDescriptor) descriptor;
138    }
139
140    /**
141     * Lazily initializes the connection.
142     *
143     * @return {@code true} if CSV data should be loaded
144     * @since 8.4
145     */
146    protected boolean initConnectionIfNeeded() {
147        // double checked locking with volatile pattern to ensure concurrent lazy init
148        if (dialect == null) {
149            synchronized (this) {
150                if (dialect == null) {
151                    return initConnection();
152                }
153            }
154        }
155        return false;
156    }
157
158    /**
159     * Initializes the table.
160     *
161     * @return {@code true} if CSV data should be loaded
162     * @since 6.0
163     */
164    protected boolean initConnection() {
165        initSchemaFieldMap();
166        SQLDirectoryDescriptor descriptor = getDescriptor();
167        try (Connection sqlConnection = getConnection()) {
168            dialect = Dialect.createDialect(sqlConnection, null);
169            // setup table and fields maps
170            String tableName = descriptor.tableName == null ? descriptor.name : descriptor.tableName;
171            table = SQLHelper.addTable(tableName, dialect, useNativeCase());
172            SchemaManager schemaManager = Framework.getService(SchemaManager.class);
173            schema = schemaManager.getSchema(getSchema());
174            if (schema == null) {
175                throw new DirectoryException("schema not found: " + getSchema());
176            }
177            readColumnsAll = new LinkedList<>();
178            readColumns = new LinkedList<>();
179            boolean hasPrimary = false;
180            for (Field f : schema.getFields()) {
181                String fieldName = f.getName().getLocalName();
182
183                if (!isReference(fieldName)) {
184                    boolean isId = fieldName.equals(getIdField());
185                    ColumnType type = ColumnType.fromField(f);
186                    if (isId && descriptor.isAutoincrementIdField()) {
187                        type = ColumnType.AUTOINC;
188                    }
189                    Column column = SQLHelper.addColumn(table, fieldName, type, useNativeCase());
190                    if (isId) {
191                        if (descriptor.isAutoincrementIdField()) {
192                            column.setIdentity(true);
193                        }
194                        column.setPrimary(true);
195                        column.setNullable(false);
196                        hasPrimary = true;
197                    }
198                    readColumnsAll.add(column);
199                    if (!fieldName.equals(descriptor.passwordField)) {
200                        readColumns.add(column);
201                    }
202                }
203            }
204            readColumnsAllSQL = readColumnsAll.stream().map(Column::getQuotedName).collect(Collectors.joining(", "));
205            readColumnsSQL = readColumns.stream().map(Column::getQuotedName).collect(Collectors.joining(", "));
206            if (!hasPrimary) {
207                throw new DirectoryException(String.format(
208                        "Directory '%s' id field '%s' is not present in schema '%s'", getName(), getIdField(),
209                        getSchema()));
210            }
211
212            SQLHelper helper = new SQLHelper(sqlConnection, table, descriptor.getCreateTablePolicy());
213            return helper.setupTable();
214        } catch (SQLException e) {
215            // exception on close
216            throw new DirectoryException(e);
217        }
218    }
219
220    public Connection getConnection() throws DirectoryException {
221        SQLDirectoryDescriptor descriptor = getDescriptor();
222        if (StringUtils.isBlank(descriptor.dataSourceName)) {
223            throw new DirectoryException("Missing dataSource for SQL directory: " + getName());
224        }
225        try {
226            return ConnectionHelper.getConnection(descriptor.dataSourceName);
227        } catch (SQLException e) {
228            throw new DirectoryException("Cannot connect to SQL directory '" + getName() + "': " + e.getMessage(), e);
229        }
230    }
231
232    @Override
233    public Session getSession() throws DirectoryException {
234        boolean loadData = initConnectionIfNeeded();
235        SQLSession session = new SQLSession(this, getDescriptor());
236        addSession(session);
237        if (loadData && descriptor.getDataFileName() != null) {
238            Schema schema = Framework.getService(SchemaManager.class).getSchema(getSchema());
239            Framework.doPrivileged(() -> DirectoryCSVLoader.loadData(descriptor.getDataFileName(),
240                    descriptor.getDataFileCharacterSeparator(), schema, session::createEntry));
241        }
242        return session;
243    }
244
245    protected void addSession(final SQLSession session) throws DirectoryException {
246        super.addSession(session);
247        registerInTx(session);
248    }
249
250    protected void registerInTx(final SQLSession session) throws DirectoryException {
251        if (!TransactionHelper.isTransactionActive()) {
252            return;
253        }
254        TransactionHelper.registerSynchronization(new TxSessionCleaner(session));
255    }
256
257    public Table getTable() {
258        return table;
259    }
260
261    public Dialect getDialect() {
262        return dialect;
263    }
264
265    public boolean useNativeCase() {
266        return nativeCase;
267    }
268
269    @Override
270    public boolean isMultiTenant() {
271        return table.getColumn(TENANT_ID_FIELD) != null;
272    }
273
274    @Override
275    public String toString() {
276        return "SQLDirectory [name=" + descriptor.name + "]";
277    }
278
279    protected void addTableReferences(TableReferenceDescriptor[] tableReferences) {
280        if (tableReferences != null) {
281            Arrays.stream(tableReferences).map(TableReference::new).forEach(this::addReference);
282        }
283    }
284
285}