001/* 002 * (C) Copyright 2006-2014 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * George Lefter 018 * Florent Guillaume 019 * Julien Carsique 020 */ 021package org.nuxeo.ecm.directory.sql; 022 023import java.io.IOException; 024import java.io.InputStream; 025import java.io.InputStreamReader; 026import java.io.Serializable; 027import java.sql.Connection; 028import java.sql.DatabaseMetaData; 029import java.sql.PreparedStatement; 030import java.sql.ResultSet; 031import java.sql.SQLException; 032import java.sql.Statement; 033import java.sql.Timestamp; 034import java.util.ArrayList; 035import java.util.Calendar; 036import java.util.GregorianCalendar; 037import java.util.HashSet; 038import java.util.List; 039import java.util.Map; 040import java.util.Set; 041 042import org.apache.commons.csv.CSVFormat; 043import org.apache.commons.csv.CSVParser; 044import org.apache.commons.csv.CSVRecord; 045import org.apache.commons.lang.StringUtils; 046import org.apache.commons.logging.Log; 047import org.apache.commons.logging.LogFactory; 048 049import org.nuxeo.ecm.core.storage.sql.ColumnSpec; 050import org.nuxeo.ecm.core.storage.sql.ColumnType; 051import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCLogger; 052import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column; 053import org.nuxeo.ecm.core.storage.sql.jdbc.db.Insert; 054import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table; 055import org.nuxeo.ecm.core.storage.sql.jdbc.db.TableImpl; 056import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect; 057import org.nuxeo.ecm.directory.DirectoryException; 058import org.nuxeo.runtime.api.Framework; 059 060public class SQLHelper { 061 062 private static final Log log = LogFactory.getLog(SQLHelper.class); 063 064 public static final String SQL_NULL_MARKER = "__NULL__"; 065 066 private static final String SQL_SCRIPT_CHARSET = "UTF-8"; 067 068 private static final Object DIRECTORY_INIT_LOCK = new Object(); 069 070 private final Table table; 071 072 private final String tableName; 073 074 private final Connection connection; 075 076 private final String policy; 077 078 private final String dataFileName; 079 080 protected final char characterSeparator; 081 082 private JDBCLogger logger = new JDBCLogger("SQLDirectory"); 083 084 public SQLHelper(Connection connection, Table table, String dataFileName, char characterSeparator, String policy) { 085 this.table = table; 086 this.connection = connection; 087 this.policy = policy; 088 this.dataFileName = dataFileName; 089 tableName = table.getPhysicalName(); 090 this.characterSeparator = characterSeparator; 091 } 092 093 public SQLHelper(Connection connection, Table table, String dataFileName, String policy) { 094 this(connection, table, dataFileName, ',', policy); 095 } 096 097 public boolean setupTable() throws DirectoryException { 098 log.debug(String.format("setting up table '%s', policy='%s'", tableName, policy)); 099 if (policy.equals("never")) { 100 log.debug("policy='never', skipping setup"); 101 return false; 102 } 103 104 synchronized (DIRECTORY_INIT_LOCK) { 105 106 boolean tableExists = tableExists(); 107 108 // check the field names match the column names 109 if (policy.equals("on_missing_columns") && tableExists) { 110 if (hasMatchingColumns()) { 111 // all required columns were found 112 log.debug("policy='on_missing_columns' and all column matched, skipping sql setup script"); 113 return false; 114 } else { 115 log.debug("policy='on_missing_columns' and some columns are missing"); 116 addMissingColumns(); 117 return true; 118 } 119 } 120 121 createTable(tableExists); 122 123 if (dataFileName == null) { 124 // no dataFile found, do not try to execute it 125 log.debug(String.format("Table '%s': no data file found", tableName)); 126 return true; 127 } 128 129 loadData(); 130 } 131 132 return true; 133 } 134 135 private void addMissingColumns() throws DirectoryException { 136 try { 137 Statement stmt = connection.createStatement(); 138 139 for (Column column : getMissingColumns(false)) { 140 String alter = table.getAddColumnSql(column); 141 if (logger.isLogEnabled()) { 142 logger.log(alter); 143 } 144 stmt.execute(alter); 145 } 146 } catch (SQLException e) { 147 throw new DirectoryException(String.format("Table '%s' alteration failed: %s", table, e.getMessage()), e); 148 } 149 } 150 151 private void createTable(boolean tableExists) throws DirectoryException { 152 try { 153 Statement stmt = connection.createStatement(); 154 155 if (tableExists) { 156 // drop table 157 String dropSql = table.getDropSql(); 158 if (logger.isLogEnabled()) { 159 logger.log(dropSql); 160 } 161 stmt.execute(dropSql); 162 } 163 164 String createSql = table.getCreateSql(); 165 if (logger.isLogEnabled()) { 166 logger.log(createSql); 167 } 168 stmt.execute(createSql); 169 for (String sql : table.getPostCreateSqls(null)) { 170 if (logger.isLogEnabled()) { 171 logger.log(sql); 172 } 173 stmt.execute(sql); 174 } 175 } catch (SQLException e) { 176 throw new DirectoryException(String.format("Table '%s' creation failed: %s", table, e.getMessage()), e); 177 } 178 } 179 180 public boolean hasMatchingColumns() { 181 Set<Column> missingColumns = getMissingColumns(true); 182 if (missingColumns == null || missingColumns.size() > 0) { 183 return false; 184 } else { 185 // all fields have a matching column, this looks not that bad 186 log.debug(String.format("all fields matched for table '%s'", tableName)); 187 return true; 188 } 189 } 190 191 public Set<Column> getMissingColumns(Boolean breakAtFirstMissing) { 192 try { 193 Set<Column> missingColumns = new HashSet<>(); 194 195 // Test whether there are new fields added in the schema that are 196 // not present in the table schema. If so it is advised to 197 // reinitialise the database. 198 199 Set<String> columnNames = getPhysicalColumns(); 200 201 // check the field names match the column names (case-insensitive) 202 for (Column column : table.getColumns()) { 203 // TODO: check types as well 204 String fieldName = column.getPhysicalName(); 205 if (!columnNames.contains(fieldName)) { 206 log.debug(String.format("required field: %s is missing", fieldName)); 207 missingColumns.add(column); 208 209 if (breakAtFirstMissing) { 210 return null; 211 } 212 } 213 } 214 215 return missingColumns; 216 } catch (SQLException e) { 217 log.warn("error while introspecting table: " + tableName, e); 218 return null; 219 } 220 } 221 222 private Set<String> getPhysicalColumns() throws SQLException { 223 ResultSet rs = null; 224 Set<String> columnNames = new HashSet<>(); 225 try { 226 // fetch the database columns definitions 227 DatabaseMetaData metadata = connection.getMetaData(); 228 rs = metadata.getColumns(null, "%", tableName, "%"); 229 230 while (rs.next()) { 231 columnNames.add(rs.getString("COLUMN_NAME")); 232 } 233 } finally { 234 if (rs != null) { 235 try { 236 rs.close(); 237 } catch (SQLException e) { 238 log.warn("Error while trying to close result set", e); 239 } 240 } 241 } 242 return columnNames; 243 } 244 245 private boolean tableExists() throws DirectoryException { 246 try { 247 // Check if table exists using metadata 248 DatabaseMetaData metaData = connection.getMetaData(); 249 String schemaName = null; 250 String productName = metaData.getDatabaseProductName(); 251 if ("Oracle".equals(productName)) { 252 Statement st = connection.createStatement(); 253 String sql = "SELECT SYS_CONTEXT('USERENV', 'SESSION_USER') FROM DUAL"; 254 log.trace("SQL: " + sql); 255 ResultSet rs = st.executeQuery(sql); 256 rs.next(); 257 schemaName = rs.getString(1); 258 log.trace("checking existing tables for oracle database, schema: " + schemaName); 259 rs.close(); 260 st.close(); 261 } 262 ResultSet rs = metaData.getTables(null, schemaName, table.getPhysicalName(), new String[] { "TABLE" }); 263 boolean exists = rs.next(); 264 rs.close(); 265 log.debug(String.format("checking if table %s exists: %s", table.getPhysicalName(), Boolean.valueOf(exists))); 266 return exists; 267 } catch (SQLException e) { 268 throw new DirectoryException(e); 269 } 270 } 271 272 private void loadData() throws DirectoryException { 273 log.debug("loading data file: " + dataFileName); 274 CSVParser csvParser = null; 275 PreparedStatement ps = null; 276 try { 277 InputStream is = getClass().getClassLoader().getResourceAsStream(dataFileName); 278 if (is == null) { 279 is = Framework.getResourceLoader().getResourceAsStream(dataFileName); 280 if (is == null) { 281 throw new DirectoryException("data file not found: " + dataFileName); 282 } 283 } 284 285 csvParser = new CSVParser(new InputStreamReader(is, SQL_SCRIPT_CHARSET), CSVFormat.DEFAULT.withDelimiter( 286 characterSeparator).withHeader()); 287 Map<String, Integer> header = csvParser.getHeaderMap(); 288 List<Column> columns = new ArrayList<>(); 289 Insert insert = new Insert(table); 290 for (String columnName : header.keySet()) { 291 String trimmedColumnName = columnName.trim(); 292 Column column = table.getColumn(trimmedColumnName); 293 if (column == null) { 294 throw new DirectoryException("column not found: " + trimmedColumnName); 295 } 296 columns.add(table.getColumn(trimmedColumnName)); 297 insert.addColumn(column); 298 } 299 300 String insertSql = insert.getStatement(); 301 log.debug("insert statement: " + insertSql); 302 ps = connection.prepareStatement(insertSql); 303 for (CSVRecord record : csvParser) { 304 if (record.size() == 0 || record.size() == 1 && StringUtils.isBlank(record.get(0))) { 305 // NXP-2538: allow columns with only one value but skip 306 // empty lines 307 continue; 308 } 309 if (!record.isConsistent()) { 310 log.error("invalid column count while reading CSV file: " + dataFileName + ", values: " + record); 311 continue; 312 } 313 if (logger.isLogEnabled()) { 314 List<Serializable> values = new ArrayList<>(header.size()); 315 for (String value : record) { 316 if (SQL_NULL_MARKER.equals(value)) { 317 value = null; 318 } 319 values.add(value); 320 } 321 logger.logSQL(insertSql, values); 322 } 323 324 for (int i = 0; i < header.size(); i++) { 325 Column column = columns.get(i); 326 String value = record.get(i); 327 Serializable v; 328 try { 329 if (SQL_NULL_MARKER.equals(value)) { 330 v = null; 331 } else if (column.getType().spec == ColumnSpec.STRING) { 332 v = value; 333 } else if (column.getType().spec == ColumnSpec.BOOLEAN) { 334 v = Boolean.valueOf(value); 335 } else if (column.getType().spec == ColumnSpec.LONG) { 336 v = Long.valueOf(value); 337 } else if (column.getType().spec == ColumnSpec.TIMESTAMP) { 338 Calendar cal = new GregorianCalendar(); 339 cal.setTime(Timestamp.valueOf(value)); 340 v = cal; 341 } else if (column.getType().spec == ColumnSpec.DOUBLE) { 342 v = Double.valueOf(value); 343 } else { 344 throw new DirectoryException("unrecognized column type: " + column.getType() + ", values: " 345 + record); 346 } 347 column.setToPreparedStatement(ps, i + 1, v); 348 } catch (IllegalArgumentException e) { 349 throw new DirectoryException(String.format( 350 "failed to set column '%s' on table '%s', values: %s", column.getPhysicalName(), 351 table.getPhysicalName(), record), e); 352 } catch (SQLException e) { 353 throw new DirectoryException(String.format("Table '%s' initialization failed: %s, values: %s", 354 table.getPhysicalName(), e.getMessage(), record), e); 355 } 356 } 357 ps.execute(); 358 } 359 } catch (IOException e) { 360 throw new DirectoryException("Read error while reading data file: " + dataFileName, e); 361 } catch (SQLException e) { 362 throw new DirectoryException(String.format("Table '%s' initialization failed: %s", table.getPhysicalName(), 363 e.getMessage()), e); 364 } finally { 365 DirectoryException e = new DirectoryException(); 366 try { 367 if (csvParser != null) { 368 csvParser.close(); 369 } 370 } catch (IOException ioe) { 371 e.addSuppressed(ioe); 372 } 373 try { 374 if (ps != null) { 375 ps.close(); 376 } 377 } catch (SQLException sqle) { 378 e.addSuppressed(sqle); 379 } 380 if (e.getSuppressed().length > 0) { 381 throw e; 382 } 383 } 384 } 385 386 public static Table addTable(String name, Dialect dialect, boolean nativeCase) { 387 String physicalName = dialect.getTableName(name); 388 if (!nativeCase && name.length() == physicalName.length()) { 389 // we can keep the name specified in the config 390 physicalName = name; 391 } 392 return new TableImpl(dialect, physicalName, physicalName); 393 } 394 395 public static Column addColumn(Table table, String fieldName, ColumnType type, boolean nativeCase) { 396 String physicalName = table.getDialect().getColumnName(fieldName); 397 if (!nativeCase && fieldName.length() == physicalName.length()) { 398 // we can keep the name specified in the config 399 physicalName = fieldName; 400 } 401 Column column = new Column(table, physicalName, type, fieldName); 402 return ((TableImpl) table).addColumn(fieldName, column); 403 } 404 405}