001/*
002 * (C) Copyright 2006-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     George Lefter
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.directory.sql;
021
022import java.sql.Connection;
023import java.sql.SQLException;
024import java.util.Arrays;
025import java.util.LinkedList;
026import java.util.List;
027import java.util.stream.Collectors;
028
029import javax.transaction.Synchronization;
030
031import org.apache.commons.lang3.StringUtils;
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.nuxeo.ecm.core.schema.SchemaManager;
035import org.nuxeo.ecm.core.schema.types.Field;
036import org.nuxeo.ecm.core.schema.types.Schema;
037import org.nuxeo.ecm.core.storage.sql.ColumnType;
038import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
039import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table;
040import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
041import org.nuxeo.ecm.directory.AbstractDirectory;
042import org.nuxeo.ecm.directory.DirectoryCSVLoader;
043import org.nuxeo.ecm.directory.DirectoryException;
044import org.nuxeo.ecm.directory.Session;
045import org.nuxeo.runtime.api.Framework;
046import org.nuxeo.runtime.datasource.ConnectionHelper;
047import org.nuxeo.runtime.transaction.TransactionHelper;
048
049public class SQLDirectory extends AbstractDirectory {
050
051    protected class TxSessionCleaner implements Synchronization {
052        private final SQLSession session;
053
054        Throwable initContext = captureInitContext();
055
056        protected TxSessionCleaner(SQLSession session) {
057            this.session = session;
058        }
059
060        protected Throwable captureInitContext() {
061            if (!log.isDebugEnabled()) {
062                return null;
063            }
064            return new Throwable("SQL directory session init context in " + SQLDirectory.this);
065        }
066
067        protected void checkIsNotLive() {
068            try {
069                if (!session.isLive()) {
070                    return;
071                }
072                if (initContext != null) {
073                    log.warn("Closing a sql directory session for you " + session, initContext);
074                } else {
075                    log.warn("Closing a sql directory session for you " + session);
076                }
077                if (!TransactionHelper.isTransactionActiveOrMarkedRollback()) {
078                    log.warn("Closing sql directory session outside a transaction" + session);
079                }
080                session.close();
081            } catch (DirectoryException e) {
082                log.error("Cannot state on sql directory session before commit " + SQLDirectory.this, e);
083            }
084
085        }
086
087        @Override
088        public void beforeCompletion() {
089            checkIsNotLive();
090        }
091
092        @Override
093        public void afterCompletion(int status) {
094            checkIsNotLive();
095        }
096
097    }
098
099    public static final Log log = LogFactory.getLog(SQLDirectory.class);
100
101    private final boolean nativeCase;
102
103    private Table table;
104
105    private Schema schema;
106
107    // columns to fetch when an entry is read (with the password)
108    protected List<Column> readColumnsAll;
109
110    // columns to fetch when an entry is read (excludes the password)
111    protected List<Column> readColumns;
112
113    // columns to fetch when an entry is read (with the password), as SQL
114    protected String readColumnsAllSQL;
115
116    // columns to fetch when an entry is read (excludes the password), as SQL
117    protected String readColumnsSQL;
118
119    private volatile Dialect dialect;
120
121    public SQLDirectory(SQLDirectoryDescriptor descriptor) {
122        super(descriptor, TableReference.class);
123
124        // Add specific references
125        addTableReferences(descriptor.getTableReferences());
126
127        nativeCase = Boolean.TRUE.equals(descriptor.nativeCase);
128
129        // Cache fallback
130        fallbackOnDefaultCache();
131    }
132
133    @Override
134    public SQLDirectoryDescriptor getDescriptor() {
135        return (SQLDirectoryDescriptor) descriptor;
136    }
137
138    /**
139     * Lazily initializes the connection.
140     *
141     * @return {@code true} if CSV data should be loaded
142     * @since 8.4
143     */
144    protected boolean initConnectionIfNeeded() {
145        // double checked locking with volatile pattern to ensure concurrent lazy init
146        if (dialect == null) {
147            synchronized (this) {
148                if (dialect == null) {
149                    return initConnection();
150                }
151            }
152        }
153        return false;
154    }
155
156    /**
157     * Initializes the table.
158     *
159     * @return {@code true} if CSV data should be loaded
160     * @since 6.0
161     */
162    protected boolean initConnection() {
163        initSchemaFieldMap();
164        SQLDirectoryDescriptor descriptor = getDescriptor();
165        try (Connection sqlConnection = getConnection()) {
166            dialect = Dialect.createDialect(sqlConnection, null);
167            // setup table and fields maps
168            String tableName = descriptor.tableName == null ? descriptor.name : descriptor.tableName;
169            table = SQLHelper.addTable(tableName, dialect, useNativeCase());
170            SchemaManager schemaManager = Framework.getService(SchemaManager.class);
171            schema = schemaManager.getSchema(getSchema());
172            if (schema == null) {
173                throw new DirectoryException("schema not found: " + getSchema());
174            }
175            readColumnsAll = new LinkedList<>();
176            readColumns = new LinkedList<>();
177            boolean hasPrimary = false;
178            for (Field f : schema.getFields()) {
179                String fieldName = f.getName().getLocalName();
180
181                if (!isReference(fieldName)) {
182                    boolean isId = fieldName.equals(getIdField());
183                    ColumnType type = ColumnType.fromField(f);
184                    if (isId && descriptor.isAutoincrementIdField()) {
185                        type = ColumnType.AUTOINC;
186                    }
187                    Column column = SQLHelper.addColumn(table, fieldName, type, useNativeCase());
188                    if (isId) {
189                        if (descriptor.isAutoincrementIdField()) {
190                            column.setIdentity(true);
191                        }
192                        column.setPrimary(true);
193                        column.setNullable(false);
194                        hasPrimary = true;
195                    }
196                    readColumnsAll.add(column);
197                    if (!fieldName.equals(descriptor.passwordField)) {
198                        readColumns.add(column);
199                    }
200                }
201            }
202            readColumnsAllSQL = readColumnsAll.stream().map(Column::getQuotedName).collect(Collectors.joining(", "));
203            readColumnsSQL = readColumns.stream().map(Column::getQuotedName).collect(Collectors.joining(", "));
204            if (!hasPrimary) {
205                throw new DirectoryException(String.format("Directory '%s' id field '%s' is not present in schema '%s'",
206                        getName(), getIdField(), getSchema()));
207            }
208
209            SQLHelper helper = new SQLHelper(sqlConnection, table, descriptor.getCreateTablePolicy());
210            return helper.setupTable();
211        } catch (SQLException e) {
212            // exception on close
213            throw new DirectoryException(e);
214        }
215    }
216
217    public Connection getConnection() throws DirectoryException {
218        SQLDirectoryDescriptor descriptor = getDescriptor();
219        if (StringUtils.isBlank(descriptor.dataSourceName)) {
220            throw new DirectoryException("Missing dataSource for SQL directory: " + getName());
221        }
222        try {
223            return ConnectionHelper.getConnection(descriptor.dataSourceName);
224        } catch (SQLException e) {
225            throw new DirectoryException("Cannot connect to SQL directory '" + getName() + "': " + e.getMessage(), e);
226        }
227    }
228
229    @Override
230    public Session getSession() throws DirectoryException {
231        boolean loadData = initConnectionIfNeeded();
232        SQLSession session = new SQLSession(this, getDescriptor());
233        addSession(session);
234        if (loadData && descriptor.getDataFileName() != null) {
235            Schema schema = Framework.getService(SchemaManager.class).getSchema(getSchema());
236            Framework.doPrivileged(() -> DirectoryCSVLoader.loadData(descriptor.getDataFileName(),
237                    descriptor.getDataFileCharacterSeparator(), schema, session::createEntry));
238        }
239        return session;
240    }
241
242    protected void addSession(final SQLSession session) throws DirectoryException {
243        super.addSession(session);
244        registerInTx(session);
245    }
246
247    protected void registerInTx(final SQLSession session) throws DirectoryException {
248        if (!TransactionHelper.isTransactionActive()) {
249            return;
250        }
251        TransactionHelper.registerSynchronization(new TxSessionCleaner(session));
252    }
253
254    public Table getTable() {
255        return table;
256    }
257
258    public Dialect getDialect() {
259        return dialect;
260    }
261
262    public boolean useNativeCase() {
263        return nativeCase;
264    }
265
266    @Override
267    public boolean isMultiTenant() {
268        return table.getColumn(TENANT_ID_FIELD) != null;
269    }
270
271    @Override
272    public String toString() {
273        return "SQLDirectory [name=" + descriptor.name + "]";
274    }
275
276    protected void addTableReferences(TableReferenceDescriptor[] tableReferences) {
277        if (tableReferences != null) {
278            Arrays.stream(tableReferences).map(TableReference::new).forEach(this::addReference);
279        }
280    }
281
282}