/*
 * (C) Copyright 2006-2014 Nuxeo SA (http://nuxeo.com/) and contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the GNU Lesser General Public License
 * (LGPL) version 2.1 which accompanies this distribution, and is available at
 * http://www.gnu.org/licenses/lgpl-2.1.html
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * Contributors:
 *     George Lefter
 *     Florent Guillaume
 *     Julien Carsique
 */
019package org.nuxeo.ecm.directory.sql;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.InputStreamReader;
024import java.io.Serializable;
025import java.sql.Connection;
026import java.sql.DatabaseMetaData;
027import java.sql.PreparedStatement;
028import java.sql.ResultSet;
029import java.sql.SQLException;
030import java.sql.Statement;
031import java.sql.Timestamp;
032import java.util.ArrayList;
033import java.util.Calendar;
034import java.util.GregorianCalendar;
035import java.util.HashSet;
036import java.util.List;
037import java.util.Map;
038import java.util.Set;
039
040import org.apache.commons.csv.CSVFormat;
041import org.apache.commons.csv.CSVParser;
042import org.apache.commons.csv.CSVRecord;
043import org.apache.commons.lang.StringUtils;
044import org.apache.commons.logging.Log;
045import org.apache.commons.logging.LogFactory;
046
047import org.nuxeo.ecm.core.storage.sql.ColumnSpec;
048import org.nuxeo.ecm.core.storage.sql.ColumnType;
049import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCLogger;
050import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
051import org.nuxeo.ecm.core.storage.sql.jdbc.db.Insert;
052import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table;
053import org.nuxeo.ecm.core.storage.sql.jdbc.db.TableImpl;
054import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
055import org.nuxeo.ecm.directory.DirectoryException;
056import org.nuxeo.runtime.api.Framework;
057
058public class SQLHelper {
059
060    private static final Log log = LogFactory.getLog(SQLHelper.class);
061
062    public static final String SQL_NULL_MARKER = "__NULL__";
063
064    private static final String SQL_SCRIPT_CHARSET = "UTF-8";
065
066    private static final Object DIRECTORY_INIT_LOCK = new Object();
067
068    private final Table table;
069
070    private final String tableName;
071
072    private final Connection connection;
073
074    private final String policy;
075
076    private final String dataFileName;
077
078    protected final char characterSeparator;
079
080    private JDBCLogger logger = new JDBCLogger("SQLDirectory");
081
082    public SQLHelper(Connection connection, Table table, String dataFileName, char characterSeparator, String policy) {
083        this.table = table;
084        this.connection = connection;
085        this.policy = policy;
086        this.dataFileName = dataFileName;
087        tableName = table.getPhysicalName();
088        this.characterSeparator = characterSeparator;
089    }
090
091    public SQLHelper(Connection connection, Table table, String dataFileName, String policy) {
092        this(connection, table, dataFileName, ',', policy);
093    }
094
095    public boolean setupTable() throws DirectoryException {
096        log.debug(String.format("setting up table '%s', policy='%s'", tableName, policy));
097        if (policy.equals("never")) {
098            log.debug("policy='never', skipping setup");
099            return false;
100        }
101
102        synchronized (DIRECTORY_INIT_LOCK) {
103
104            boolean tableExists = tableExists();
105
106            // check the field names match the column names
107            if (policy.equals("on_missing_columns") && tableExists) {
108                if (hasMatchingColumns()) {
109                    // all required columns were found
110                    log.debug("policy='on_missing_columns' and all column matched, skipping sql setup script");
111                    return false;
112                } else {
113                    log.debug("policy='on_missing_columns' and some columns are missing");
114                    addMissingColumns();
115                    return true;
116                }
117            }
118
119            createTable(tableExists);
120
121            if (dataFileName == null) {
122                // no dataFile found, do not try to execute it
123                log.debug(String.format("Table '%s': no data file found", tableName));
124                return true;
125            }
126
127            loadData();
128        }
129
130        return true;
131    }
132
133    private void addMissingColumns() throws DirectoryException {
134        try {
135            Statement stmt = connection.createStatement();
136
137            for (Column column : getMissingColumns(false)) {
138                String alter = table.getAddColumnSql(column);
139                if (logger.isLogEnabled()) {
140                    logger.log(alter);
141                }
142                stmt.execute(alter);
143            }
144        } catch (SQLException e) {
145            throw new DirectoryException(String.format("Table '%s' alteration failed: %s", table, e.getMessage()), e);
146        }
147    }
148
149    private void createTable(boolean tableExists) throws DirectoryException {
150        try {
151            Statement stmt = connection.createStatement();
152
153            if (tableExists) {
154                // drop table
155                String dropSql = table.getDropSql();
156                if (logger.isLogEnabled()) {
157                    logger.log(dropSql);
158                }
159                stmt.execute(dropSql);
160            }
161
162            String createSql = table.getCreateSql();
163            if (logger.isLogEnabled()) {
164                logger.log(createSql);
165            }
166            stmt.execute(createSql);
167            for (String sql : table.getPostCreateSqls(null)) {
168                if (logger.isLogEnabled()) {
169                    logger.log(sql);
170                }
171                stmt.execute(sql);
172            }
173        } catch (SQLException e) {
174            throw new DirectoryException(String.format("Table '%s' creation failed: %s", table, e.getMessage()), e);
175        }
176    }
177
178    public boolean hasMatchingColumns() {
179        Set<Column> missingColumns = getMissingColumns(true);
180        if (missingColumns == null || missingColumns.size() > 0) {
181            return false;
182        } else {
183            // all fields have a matching column, this looks not that bad
184            log.debug(String.format("all fields matched for table '%s'", tableName));
185            return true;
186        }
187    }
188
189    public Set<Column> getMissingColumns(Boolean breakAtFirstMissing) {
190        try {
191            Set<Column> missingColumns = new HashSet<>();
192
193            // Test whether there are new fields added in the schema that are
194            // not present in the table schema. If so it is advised to
195            // reinitialise the database.
196
197            Set<String> columnNames = getPhysicalColumns();
198
199            // check the field names match the column names (case-insensitive)
200            for (Column column : table.getColumns()) {
201                // TODO: check types as well
202                String fieldName = column.getPhysicalName();
203                if (!columnNames.contains(fieldName)) {
204                    log.debug(String.format("required field: %s is missing", fieldName));
205                    missingColumns.add(column);
206
207                    if (breakAtFirstMissing) {
208                        return null;
209                    }
210                }
211            }
212
213            return missingColumns;
214        } catch (SQLException e) {
215            log.warn("error while introspecting table: " + tableName, e);
216            return null;
217        }
218    }
219
220    private Set<String> getPhysicalColumns() throws SQLException {
221        ResultSet rs = null;
222        Set<String> columnNames = new HashSet<>();
223        try {
224            // fetch the database columns definitions
225            DatabaseMetaData metadata = connection.getMetaData();
226            rs = metadata.getColumns(null, "%", tableName, "%");
227
228            while (rs.next()) {
229                columnNames.add(rs.getString("COLUMN_NAME"));
230            }
231        } finally {
232            if (rs != null) {
233                try {
234                    rs.close();
235                } catch (SQLException e) {
236                    log.warn("Error while trying to close result set", e);
237                }
238            }
239        }
240        return columnNames;
241    }
242
243    private boolean tableExists() throws DirectoryException {
244        try {
245            // Check if table exists using metadata
246            DatabaseMetaData metaData = connection.getMetaData();
247            String schemaName = null;
248            String productName = metaData.getDatabaseProductName();
249            if ("Oracle".equals(productName)) {
250                Statement st = connection.createStatement();
251                String sql = "SELECT SYS_CONTEXT('USERENV', 'SESSION_USER') FROM DUAL";
252                log.trace("SQL: " + sql);
253                ResultSet rs = st.executeQuery(sql);
254                rs.next();
255                schemaName = rs.getString(1);
256                log.trace("checking existing tables for oracle database, schema: " + schemaName);
257                rs.close();
258                st.close();
259            }
260            ResultSet rs = metaData.getTables(null, schemaName, table.getPhysicalName(), new String[] { "TABLE" });
261            boolean exists = rs.next();
262            rs.close();
263            log.debug(String.format("checking if table %s exists: %s", table.getPhysicalName(), Boolean.valueOf(exists)));
264            return exists;
265        } catch (SQLException e) {
266            throw new DirectoryException(e);
267        }
268    }
269
270    private void loadData() throws DirectoryException {
271        log.debug("loading data file: " + dataFileName);
272        CSVParser csvParser = null;
273        PreparedStatement ps = null;
274        try {
275            InputStream is = getClass().getClassLoader().getResourceAsStream(dataFileName);
276            if (is == null) {
277                is = Framework.getResourceLoader().getResourceAsStream(dataFileName);
278                if (is == null) {
279                    throw new DirectoryException("data file not found: " + dataFileName);
280                }
281            }
282
283            csvParser = new CSVParser(new InputStreamReader(is, SQL_SCRIPT_CHARSET), CSVFormat.DEFAULT.withDelimiter(
284                    characterSeparator).withHeader());
285            Map<String, Integer> header = csvParser.getHeaderMap();
286            List<Column> columns = new ArrayList<>();
287            Insert insert = new Insert(table);
288            for (String columnName : header.keySet()) {
289                String trimmedColumnName = columnName.trim();
290                Column column = table.getColumn(trimmedColumnName);
291                if (column == null) {
292                    throw new DirectoryException("column not found: " + trimmedColumnName);
293                }
294                columns.add(table.getColumn(trimmedColumnName));
295                insert.addColumn(column);
296            }
297
298            String insertSql = insert.getStatement();
299            log.debug("insert statement: " + insertSql);
300            ps = connection.prepareStatement(insertSql);
301            for (CSVRecord record : csvParser) {
302                if (record.size() == 0 || record.size() == 1 && StringUtils.isBlank(record.get(0))) {
303                    // NXP-2538: allow columns with only one value but skip
304                    // empty lines
305                    continue;
306                }
307                if (!record.isConsistent()) {
308                    log.error("invalid column count while reading CSV file: " + dataFileName + ", values: " + record);
309                    continue;
310                }
311                if (logger.isLogEnabled()) {
312                    List<Serializable> values = new ArrayList<>(header.size());
313                    for (String value : record) {
314                        if (SQL_NULL_MARKER.equals(value)) {
315                            value = null;
316                        }
317                        values.add(value);
318                    }
319                    logger.logSQL(insertSql, values);
320                }
321
322                for (int i = 0; i < header.size(); i++) {
323                    Column column = columns.get(i);
324                    String value = record.get(i);
325                    Serializable v;
326                    try {
327                        if (SQL_NULL_MARKER.equals(value)) {
328                            v = null;
329                        } else if (column.getType().spec == ColumnSpec.STRING) {
330                            v = value;
331                        } else if (column.getType().spec == ColumnSpec.BOOLEAN) {
332                            v = Boolean.valueOf(value);
333                        } else if (column.getType().spec == ColumnSpec.LONG) {
334                            v = Long.valueOf(value);
335                        } else if (column.getType().spec == ColumnSpec.TIMESTAMP) {
336                            Calendar cal = new GregorianCalendar();
337                            cal.setTime(Timestamp.valueOf(value));
338                            v = cal;
339                        } else if (column.getType().spec == ColumnSpec.DOUBLE) {
340                            v = Double.valueOf(value);
341                        } else {
342                            throw new DirectoryException("unrecognized column type: " + column.getType() + ", values: "
343                                    + record);
344                        }
345                        column.setToPreparedStatement(ps, i + 1, v);
346                    } catch (IllegalArgumentException e) {
347                        throw new DirectoryException(String.format(
348                                "failed to set column '%s' on table '%s', values: %s", column.getPhysicalName(),
349                                table.getPhysicalName(), record), e);
350                    } catch (SQLException e) {
351                        throw new DirectoryException(String.format("Table '%s' initialization failed: %s, values: %s",
352                                table.getPhysicalName(), e.getMessage(), record), e);
353                    }
354                }
355                ps.execute();
356            }
357        } catch (IOException e) {
358            throw new DirectoryException("Read error while reading data file: " + dataFileName, e);
359        } catch (SQLException e) {
360            throw new DirectoryException(String.format("Table '%s' initialization failed: %s", table.getPhysicalName(),
361                    e.getMessage()), e);
362        } finally {
363            DirectoryException e = new DirectoryException();
364            try {
365                if (csvParser != null) {
366                    csvParser.close();
367                }
368            } catch (IOException ioe) {
369                e.addSuppressed(ioe);
370            }
371            try {
372                if (ps != null) {
373                    ps.close();
374                }
375            } catch (SQLException sqle) {
376                e.addSuppressed(sqle);
377            }
378            if (e.getSuppressed().length > 0) {
379                throw e;
380            }
381        }
382    }
383
384    public static Table addTable(String name, Dialect dialect, boolean nativeCase) {
385        String physicalName = dialect.getTableName(name);
386        if (!nativeCase && name.length() == physicalName.length()) {
387            // we can keep the name specified in the config
388            physicalName = name;
389        }
390        return new TableImpl(dialect, physicalName, physicalName);
391    }
392
393    public static Column addColumn(Table table, String fieldName, ColumnType type, boolean nativeCase) {
394        String physicalName = table.getDialect().getColumnName(fieldName);
395        if (!nativeCase && fieldName.length() == physicalName.length()) {
396            // we can keep the name specified in the config
397            physicalName = fieldName;
398        }
399        Column column = new Column(table, physicalName, type, fieldName);
400        return ((TableImpl) table).addColumn(fieldName, column);
401    }
402
403}