001/*
002 * (C) Copyright 2006-2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     George Lefter
018 *     Florent Guillaume
019 *     Julien Carsique
020 */
021package org.nuxeo.ecm.directory.sql;
022
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.InputStreamReader;
026import java.io.Serializable;
027import java.sql.Connection;
028import java.sql.DatabaseMetaData;
029import java.sql.PreparedStatement;
030import java.sql.ResultSet;
031import java.sql.SQLException;
032import java.sql.Statement;
033import java.sql.Timestamp;
034import java.util.ArrayList;
035import java.util.Calendar;
036import java.util.GregorianCalendar;
037import java.util.HashSet;
038import java.util.List;
039import java.util.Map;
040import java.util.Set;
041
042import org.apache.commons.csv.CSVFormat;
043import org.apache.commons.csv.CSVParser;
044import org.apache.commons.csv.CSVRecord;
045import org.apache.commons.lang.StringUtils;
046import org.apache.commons.logging.Log;
047import org.apache.commons.logging.LogFactory;
048
049import org.nuxeo.ecm.core.storage.sql.ColumnSpec;
050import org.nuxeo.ecm.core.storage.sql.ColumnType;
051import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCLogger;
052import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
053import org.nuxeo.ecm.core.storage.sql.jdbc.db.Insert;
054import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table;
055import org.nuxeo.ecm.core.storage.sql.jdbc.db.TableImpl;
056import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
057import org.nuxeo.ecm.directory.DirectoryException;
058import org.nuxeo.runtime.api.Framework;
059
060public class SQLHelper {
061
062    private static final Log log = LogFactory.getLog(SQLHelper.class);
063
064    public static final String SQL_NULL_MARKER = "__NULL__";
065
066    private static final String SQL_SCRIPT_CHARSET = "UTF-8";
067
068    private static final Object DIRECTORY_INIT_LOCK = new Object();
069
070    private final Table table;
071
072    private final String tableName;
073
074    private final Connection connection;
075
076    private final String policy;
077
078    private final String dataFileName;
079
080    protected final char characterSeparator;
081
082    private JDBCLogger logger = new JDBCLogger("SQLDirectory");
083
084    public SQLHelper(Connection connection, Table table, String dataFileName, char characterSeparator, String policy) {
085        this.table = table;
086        this.connection = connection;
087        this.policy = policy;
088        this.dataFileName = dataFileName;
089        tableName = table.getPhysicalName();
090        this.characterSeparator = characterSeparator;
091    }
092
093    public SQLHelper(Connection connection, Table table, String dataFileName, String policy) {
094        this(connection, table, dataFileName, ',', policy);
095    }
096
097    public boolean setupTable() throws DirectoryException {
098        log.debug(String.format("setting up table '%s', policy='%s'", tableName, policy));
099        if (policy.equals("never")) {
100            log.debug("policy='never', skipping setup");
101            return false;
102        }
103
104        synchronized (DIRECTORY_INIT_LOCK) {
105
106            boolean tableExists = tableExists();
107
108            // check the field names match the column names
109            if (policy.equals("on_missing_columns") && tableExists) {
110                if (hasMatchingColumns()) {
111                    // all required columns were found
112                    log.debug("policy='on_missing_columns' and all column matched, skipping sql setup script");
113                    return false;
114                } else {
115                    log.debug("policy='on_missing_columns' and some columns are missing");
116                    addMissingColumns();
117                    return true;
118                }
119            }
120
121            createTable(tableExists);
122
123            if (dataFileName == null) {
124                // no dataFile found, do not try to execute it
125                log.debug(String.format("Table '%s': no data file found", tableName));
126                return true;
127            }
128
129            loadData();
130        }
131
132        return true;
133    }
134
135    private void addMissingColumns() throws DirectoryException {
136        try {
137            Statement stmt = connection.createStatement();
138
139            for (Column column : getMissingColumns(false)) {
140                String alter = table.getAddColumnSql(column);
141                if (logger.isLogEnabled()) {
142                    logger.log(alter);
143                }
144                stmt.execute(alter);
145            }
146        } catch (SQLException e) {
147            throw new DirectoryException(String.format("Table '%s' alteration failed: %s", table, e.getMessage()), e);
148        }
149    }
150
151    private void createTable(boolean tableExists) throws DirectoryException {
152        try {
153            Statement stmt = connection.createStatement();
154
155            if (tableExists) {
156                // drop table
157                String dropSql = table.getDropSql();
158                if (logger.isLogEnabled()) {
159                    logger.log(dropSql);
160                }
161                stmt.execute(dropSql);
162            }
163
164            String createSql = table.getCreateSql();
165            if (logger.isLogEnabled()) {
166                logger.log(createSql);
167            }
168            stmt.execute(createSql);
169            for (String sql : table.getPostCreateSqls(null)) {
170                if (logger.isLogEnabled()) {
171                    logger.log(sql);
172                }
173                stmt.execute(sql);
174            }
175        } catch (SQLException e) {
176            throw new DirectoryException(String.format("Table '%s' creation failed: %s", table, e.getMessage()), e);
177        }
178    }
179
180    public boolean hasMatchingColumns() {
181        Set<Column> missingColumns = getMissingColumns(true);
182        if (missingColumns == null || missingColumns.size() > 0) {
183            return false;
184        } else {
185            // all fields have a matching column, this looks not that bad
186            log.debug(String.format("all fields matched for table '%s'", tableName));
187            return true;
188        }
189    }
190
191    public Set<Column> getMissingColumns(Boolean breakAtFirstMissing) {
192        try {
193            Set<Column> missingColumns = new HashSet<>();
194
195            // Test whether there are new fields added in the schema that are
196            // not present in the table schema. If so it is advised to
197            // reinitialise the database.
198
199            Set<String> columnNames = getPhysicalColumns();
200
201            // check the field names match the column names (case-insensitive)
202            for (Column column : table.getColumns()) {
203                // TODO: check types as well
204                String fieldName = column.getPhysicalName();
205                if (!columnNames.contains(fieldName)) {
206                    log.debug(String.format("required field: %s is missing", fieldName));
207                    missingColumns.add(column);
208
209                    if (breakAtFirstMissing) {
210                        return null;
211                    }
212                }
213            }
214
215            return missingColumns;
216        } catch (SQLException e) {
217            log.warn("error while introspecting table: " + tableName, e);
218            return null;
219        }
220    }
221
222    private Set<String> getPhysicalColumns() throws SQLException {
223        ResultSet rs = null;
224        Set<String> columnNames = new HashSet<>();
225        try {
226            // fetch the database columns definitions
227            DatabaseMetaData metadata = connection.getMetaData();
228            rs = metadata.getColumns(null, "%", tableName, "%");
229
230            while (rs.next()) {
231                columnNames.add(rs.getString("COLUMN_NAME"));
232            }
233        } finally {
234            if (rs != null) {
235                try {
236                    rs.close();
237                } catch (SQLException e) {
238                    log.warn("Error while trying to close result set", e);
239                }
240            }
241        }
242        return columnNames;
243    }
244
245    private boolean tableExists() throws DirectoryException {
246        try {
247            // Check if table exists using metadata
248            DatabaseMetaData metaData = connection.getMetaData();
249            String schemaName = null;
250            String productName = metaData.getDatabaseProductName();
251            if ("Oracle".equals(productName)) {
252                Statement st = connection.createStatement();
253                String sql = "SELECT SYS_CONTEXT('USERENV', 'SESSION_USER') FROM DUAL";
254                log.trace("SQL: " + sql);
255                ResultSet rs = st.executeQuery(sql);
256                rs.next();
257                schemaName = rs.getString(1);
258                log.trace("checking existing tables for oracle database, schema: " + schemaName);
259                rs.close();
260                st.close();
261            }
262            ResultSet rs = metaData.getTables(null, schemaName, table.getPhysicalName(), new String[] { "TABLE" });
263            boolean exists = rs.next();
264            rs.close();
265            log.debug(String.format("checking if table %s exists: %s", table.getPhysicalName(), Boolean.valueOf(exists)));
266            return exists;
267        } catch (SQLException e) {
268            throw new DirectoryException(e);
269        }
270    }
271
272    private void loadData() throws DirectoryException {
273        log.debug("loading data file: " + dataFileName);
274        CSVParser csvParser = null;
275        PreparedStatement ps = null;
276        try {
277            InputStream is = getClass().getClassLoader().getResourceAsStream(dataFileName);
278            if (is == null) {
279                is = Framework.getResourceLoader().getResourceAsStream(dataFileName);
280                if (is == null) {
281                    throw new DirectoryException("data file not found: " + dataFileName);
282                }
283            }
284
285            csvParser = new CSVParser(new InputStreamReader(is, SQL_SCRIPT_CHARSET), CSVFormat.DEFAULT.withDelimiter(
286                    characterSeparator).withHeader());
287            Map<String, Integer> header = csvParser.getHeaderMap();
288            List<Column> columns = new ArrayList<>();
289            Insert insert = new Insert(table);
290            for (String columnName : header.keySet()) {
291                String trimmedColumnName = columnName.trim();
292                Column column = table.getColumn(trimmedColumnName);
293                if (column == null) {
294                    throw new DirectoryException("column not found: " + trimmedColumnName);
295                }
296                columns.add(table.getColumn(trimmedColumnName));
297                insert.addColumn(column);
298            }
299
300            String insertSql = insert.getStatement();
301            log.debug("insert statement: " + insertSql);
302            ps = connection.prepareStatement(insertSql);
303            for (CSVRecord record : csvParser) {
304                if (record.size() == 0 || record.size() == 1 && StringUtils.isBlank(record.get(0))) {
305                    // NXP-2538: allow columns with only one value but skip
306                    // empty lines
307                    continue;
308                }
309                if (!record.isConsistent()) {
310                    log.error("invalid column count while reading CSV file: " + dataFileName + ", values: " + record);
311                    continue;
312                }
313                if (logger.isLogEnabled()) {
314                    List<Serializable> values = new ArrayList<>(header.size());
315                    for (String value : record) {
316                        if (SQL_NULL_MARKER.equals(value)) {
317                            value = null;
318                        }
319                        values.add(value);
320                    }
321                    logger.logSQL(insertSql, values);
322                }
323
324                for (int i = 0; i < header.size(); i++) {
325                    Column column = columns.get(i);
326                    String value = record.get(i);
327                    Serializable v;
328                    try {
329                        if (SQL_NULL_MARKER.equals(value)) {
330                            v = null;
331                        } else if (column.getType().spec == ColumnSpec.STRING) {
332                            v = value;
333                        } else if (column.getType().spec == ColumnSpec.BOOLEAN) {
334                            v = Boolean.valueOf(value);
335                        } else if (column.getType().spec == ColumnSpec.LONG) {
336                            v = Long.valueOf(value);
337                        } else if (column.getType().spec == ColumnSpec.TIMESTAMP) {
338                            Calendar cal = new GregorianCalendar();
339                            cal.setTime(Timestamp.valueOf(value));
340                            v = cal;
341                        } else if (column.getType().spec == ColumnSpec.DOUBLE) {
342                            v = Double.valueOf(value);
343                        } else {
344                            throw new DirectoryException("unrecognized column type: " + column.getType() + ", values: "
345                                    + record);
346                        }
347                        column.setToPreparedStatement(ps, i + 1, v);
348                    } catch (IllegalArgumentException e) {
349                        throw new DirectoryException(String.format(
350                                "failed to set column '%s' on table '%s', values: %s", column.getPhysicalName(),
351                                table.getPhysicalName(), record), e);
352                    } catch (SQLException e) {
353                        throw new DirectoryException(String.format("Table '%s' initialization failed: %s, values: %s",
354                                table.getPhysicalName(), e.getMessage(), record), e);
355                    }
356                }
357                ps.execute();
358            }
359        } catch (IOException e) {
360            throw new DirectoryException("Read error while reading data file: " + dataFileName, e);
361        } catch (SQLException e) {
362            throw new DirectoryException(String.format("Table '%s' initialization failed: %s", table.getPhysicalName(),
363                    e.getMessage()), e);
364        } finally {
365            DirectoryException e = new DirectoryException();
366            try {
367                if (csvParser != null) {
368                    csvParser.close();
369                }
370            } catch (IOException ioe) {
371                e.addSuppressed(ioe);
372            }
373            try {
374                if (ps != null) {
375                    ps.close();
376                }
377            } catch (SQLException sqle) {
378                e.addSuppressed(sqle);
379            }
380            if (e.getSuppressed().length > 0) {
381                throw e;
382            }
383        }
384    }
385
386    public static Table addTable(String name, Dialect dialect, boolean nativeCase) {
387        String physicalName = dialect.getTableName(name);
388        if (!nativeCase && name.length() == physicalName.length()) {
389            // we can keep the name specified in the config
390            physicalName = name;
391        }
392        return new TableImpl(dialect, physicalName, physicalName);
393    }
394
395    public static Column addColumn(Table table, String fieldName, ColumnType type, boolean nativeCase) {
396        String physicalName = table.getDialect().getColumnName(fieldName);
397        if (!nativeCase && fieldName.length() == physicalName.length()) {
398            // we can keep the name specified in the config
399            physicalName = fieldName;
400        }
401        Column column = new Column(table, physicalName, type, fieldName);
402        return ((TableImpl) table).addColumn(fieldName, column);
403    }
404
405}