/*
 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Olivier Grisel
 *     Florent Guillaume
 */
package org.nuxeo.ecm.directory.sql;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

import org.nuxeo.common.xmap.annotation.XNode;
import org.nuxeo.common.xmap.annotation.XObject;
import org.nuxeo.ecm.core.schema.types.SchemaImpl;
import org.nuxeo.ecm.core.schema.types.primitives.StringType;
import org.nuxeo.ecm.core.storage.sql.ColumnType;
import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
import org.nuxeo.ecm.core.storage.sql.jdbc.db.Delete;
import org.nuxeo.ecm.core.storage.sql.jdbc.db.Insert;
import org.nuxeo.ecm.core.storage.sql.jdbc.db.Select;
import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table;
import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table.IndexType;
import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
import org.nuxeo.ecm.directory.AbstractReference;
import org.nuxeo.ecm.directory.BaseDirectoryDescriptor;
050import org.nuxeo.ecm.directory.Directory; 051import org.nuxeo.ecm.directory.DirectoryCSVLoader; 052import org.nuxeo.ecm.directory.DirectoryException; 053 054@XObject(value = "tableReference") 055public class TableReference extends AbstractReference implements Cloneable { 056 057 @XNode("@field") 058 public void setFieldName(String fieldName) { 059 this.fieldName = fieldName; 060 } 061 062 @Override 063 @XNode("@directory") 064 public void setTargetDirectoryName(String targetDirectoryName) { 065 this.targetDirectoryName = targetDirectoryName; 066 } 067 068 @XNode("@table") 069 protected String tableName; 070 071 @XNode("@sourceColumn") 072 protected String sourceColumn; 073 074 @XNode("@targetColumn") 075 protected String targetColumn; 076 077 @XNode("@dataFile") 078 protected String dataFileName; 079 080 private Table table; 081 082 private Dialect dialect; 083 084 private boolean initialized = false; 085 086 private SQLDirectory getSQLSourceDirectory() throws DirectoryException { 087 Directory dir = getSourceDirectory(); 088 return (SQLDirectory) dir; 089 } 090 091 private void initialize(SQLSession sqlSession) throws DirectoryException { 092 Connection connection = sqlSession.sqlConnection; 093 SQLDirectory directory = getSQLSourceDirectory(); 094 Table table = getTable(); 095 SQLHelper helper = new SQLHelper(connection, table, directory.getDescriptor().getCreateTablePolicy()); 096 boolean loadData = helper.setupTable(); 097 if (loadData && dataFileName != null) { 098 // fake schema for DirectoryCSVLoader.loadData 099 SchemaImpl schema = new SchemaImpl(tableName, null); 100 schema.addField(sourceColumn, StringType.INSTANCE, null, 0, Collections.emptySet()); 101 schema.addField(targetColumn, StringType.INSTANCE, null, 0, Collections.emptySet()); 102 Insert insert = new Insert(table); 103 for (Column column : table.getColumns()) { 104 insert.addColumn(column); 105 } 106 try (PreparedStatement ps = connection.prepareStatement(insert.getStatement())) { 107 
Consumer<Map<String, Object>> loader = new Consumer<Map<String, Object>>() { 108 @Override 109 public void accept(Map<String, Object> map) { 110 try { 111 ps.setString(1, (String) map.get(sourceColumn)); 112 ps.setString(2, (String) map.get(targetColumn)); 113 ps.execute(); 114 } catch (SQLException e) { 115 throw new DirectoryException(e); 116 } 117 } 118 }; 119 DirectoryCSVLoader.loadData(dataFileName, BaseDirectoryDescriptor.DEFAULT_DATA_FILE_CHARACTER_SEPARATOR, 120 schema, loader); 121 } catch (SQLException e) { 122 throw new DirectoryException(String.format("Table '%s' initialization failed", tableName), e); 123 } 124 } 125 } 126 127 @Override 128 public void addLinks(String sourceId, List<String> targetIds) throws DirectoryException { 129 if (targetIds == null) { 130 return; 131 } 132 try (SQLSession session = getSQLSession()) { 133 addLinks(sourceId, targetIds, session); 134 } 135 } 136 137 @Override 138 public void addLinks(List<String> sourceIds, String targetId) throws DirectoryException { 139 if (sourceIds == null) { 140 return; 141 } 142 try (SQLSession session = getSQLSession()) { 143 addLinks(sourceIds, targetId, session); 144 } 145 } 146 147 public void addLinks(String sourceId, List<String> targetIds, SQLSession session) throws DirectoryException { 148 if (targetIds == null) { 149 return; 150 } 151 for (String targetId : targetIds) { 152 addLink(sourceId, targetId, session, true); 153 } 154 } 155 156 public void addLinks(List<String> sourceIds, String targetId, SQLSession session) throws DirectoryException { 157 if (sourceIds == null) { 158 return; 159 } 160 for (String sourceId : sourceIds) { 161 addLink(sourceId, targetId, session, true); 162 } 163 } 164 165 public boolean exists(String sourceId, String targetId, SQLSession session) throws DirectoryException { 166 // String selectSql = String.format( 167 // "SELECT COUNT(*) FROM %s WHERE %s = ? 
AND %s = ?", tableName, 168 // sourceColumn, targetColumn); 169 170 Table table = getTable(); 171 Select select = new Select(table); 172 select.setFrom(table.getQuotedName()); 173 select.setWhat("count(*)"); 174 String whereString = String.format("%s = ? and %s = ?", table.getColumn(sourceColumn).getQuotedName(), 175 table.getColumn(targetColumn).getQuotedName()); 176 177 select.setWhere(whereString); 178 179 String selectSql = select.getStatement(); 180 if (session.logger.isLogEnabled()) { 181 session.logger.logSQL(selectSql, Arrays.<Serializable> asList(sourceId, targetId)); 182 } 183 184 PreparedStatement ps = null; 185 ResultSet rs = null; 186 try { 187 ps = session.sqlConnection.prepareStatement(selectSql); 188 ps.setString(1, sourceId); 189 ps.setString(2, targetId); 190 rs = ps.executeQuery(); 191 rs.next(); 192 return rs.getInt(1) > 0; 193 } catch (SQLException e) { 194 throw new DirectoryException(String.format("error reading link from %s to %s", sourceId, targetId), e); 195 } finally { 196 try { 197 if (rs != null) { 198 rs.close(); 199 } 200 if (ps != null) { 201 ps.close(); 202 } 203 } catch (SQLException sqle) { 204 throw new DirectoryException(sqle); 205 } 206 } 207 } 208 209 public void addLink(String sourceId, String targetId, SQLSession session, boolean checkExisting) 210 throws DirectoryException { 211 // OG: the following query should have avoided the round trips but 212 // does not work for some reason that might be related to a bug in the 213 // JDBC driver: 214 // 215 // String sql = String.format( 216 // "INSERT INTO %s (%s, %s) (SELECT ?, ? FROM %s WHERE %s = ? AND %s = 217 // ? 
HAVING COUNT(*) = 0)", tableName, sourceColumn, targetColumn, 218 // tableName, sourceColumn, targetColumn); 219 220 // first step: check that this link does not exist yet 221 if (checkExisting && exists(sourceId, targetId, session)) { 222 return; 223 } 224 225 // second step: add the link 226 227 // String insertSql = String.format( 228 // "INSERT INTO %s (%s, %s) VALUES (?, ?)", tableName, 229 // sourceColumn, targetColumn); 230 Table table = getTable(); 231 Insert insert = new Insert(table); 232 insert.addColumn(table.getColumn(sourceColumn)); 233 insert.addColumn(table.getColumn(targetColumn)); 234 String insertSql = insert.getStatement(); 235 if (session.logger.isLogEnabled()) { 236 session.logger.logSQL(insertSql, Arrays.<Serializable> asList(sourceId, targetId)); 237 } 238 239 PreparedStatement ps = null; 240 try { 241 ps = session.sqlConnection.prepareStatement(insertSql); 242 ps.setString(1, sourceId); 243 ps.setString(2, targetId); 244 ps.execute(); 245 } catch (SQLException e) { 246 throw new DirectoryException(String.format("error adding link from %s to %s", sourceId, targetId), e); 247 } finally { 248 try { 249 if (ps != null) { 250 ps.close(); 251 } 252 } catch (SQLException sqle) { 253 throw new DirectoryException(sqle); 254 } 255 } 256 } 257 258 protected List<String> getIdsFor(String valueColumn, String filterColumn, String filterValue) 259 throws DirectoryException { 260 try (SQLSession session = getSQLSession()) { 261 // String sql = String.format("SELECT %s FROM %s WHERE %s = ?", 262 // table.getColumn(valueColumn), tableName, filterColumn); 263 Table table = getTable(); 264 Select select = new Select(table); 265 select.setWhat(table.getColumn(valueColumn).getQuotedName()); 266 select.setFrom(table.getQuotedName()); 267 select.setWhere(table.getColumn(filterColumn).getQuotedName() + " = ?"); 268 269 String sql = select.getStatement(); 270 if (session.logger.isLogEnabled()) { 271 session.logger.logSQL(sql, Collections.<Serializable> 
singleton(filterValue)); 272 } 273 274 List<String> ids = new LinkedList<String>(); 275 try (PreparedStatement ps = session.sqlConnection.prepareStatement(sql)) { 276 ps.setString(1, filterValue); 277 try (ResultSet rs = ps.executeQuery()) { 278 while (rs.next()) { 279 ids.add(rs.getString(valueColumn)); 280 } 281 return ids; 282 } 283 } catch (SQLException e) { 284 throw new DirectoryException("error fetching reference values: ", e); 285 } 286 } 287 } 288 289 @Override 290 public List<String> getSourceIdsForTarget(String targetId) throws DirectoryException { 291 return getIdsFor(sourceColumn, targetColumn, targetId); 292 } 293 294 @Override 295 public List<String> getTargetIdsForSource(String sourceId) throws DirectoryException { 296 return getIdsFor(targetColumn, sourceColumn, sourceId); 297 } 298 299 public void removeLinksFor(String column, String entryId, SQLSession session) throws DirectoryException { 300 Table table = getTable(); 301 String sql = String.format("DELETE FROM %s WHERE %s = ?", table.getQuotedName(), table.getColumn(column) 302 .getQuotedName()); 303 if (session.logger.isLogEnabled()) { 304 session.logger.logSQL(sql, Collections.<Serializable> singleton(entryId)); 305 } 306 PreparedStatement ps = null; 307 try { 308 ps = session.sqlConnection.prepareStatement(sql); 309 ps.setString(1, entryId); 310 ps.execute(); 311 } catch (SQLException e) { 312 throw new DirectoryException("error remove links to " + entryId, e); 313 } finally { 314 try { 315 if (ps != null) { 316 ps.close(); 317 } 318 } catch (SQLException sqle) { 319 throw new DirectoryException(sqle); 320 } 321 } 322 } 323 324 public void removeLinksForSource(String sourceId, SQLSession session) throws DirectoryException { 325 maybeInitialize(session); 326 removeLinksFor(sourceColumn, sourceId, session); 327 } 328 329 public void removeLinksForTarget(String targetId, SQLSession session) throws DirectoryException { 330 maybeInitialize(session); 331 removeLinksFor(targetColumn, targetId, 
session); 332 } 333 334 @Override 335 public void removeLinksForSource(String sourceId) throws DirectoryException { 336 try (SQLSession session = getSQLSession()) { 337 removeLinksForSource(sourceId, session); 338 } 339 } 340 341 @Override 342 public void removeLinksForTarget(String targetId) throws DirectoryException { 343 try (SQLSession session = getSQLSession()) { 344 removeLinksForTarget(targetId, session); 345 } 346 } 347 348 public void setIdsFor(String idsColumn, List<String> ids, String filterColumn, String filterValue, 349 SQLSession session) throws DirectoryException { 350 351 List<String> idsToDelete = new LinkedList<String>(); 352 Set<String> idsToAdd = new HashSet<String>(); 353 if (ids != null) { // ids may be null 354 idsToAdd.addAll(ids); 355 } 356 Table table = getTable(); 357 358 // iterate over existing links to find what to add and what to remove 359 String selectSql = String.format("SELECT %s FROM %s WHERE %s = ?", table.getColumn(idsColumn).getQuotedName(), 360 table.getQuotedName(), table.getColumn(filterColumn).getQuotedName()); 361 PreparedStatement ps = null; 362 try { 363 ps = session.sqlConnection.prepareStatement(selectSql); 364 ps.setString(1, filterValue); 365 ResultSet rs = ps.executeQuery(); 366 while (rs.next()) { 367 String existingId = rs.getString(1); 368 if (idsToAdd.contains(existingId)) { 369 // to not add already existing ids 370 idsToAdd.remove(existingId); 371 } else { 372 // delete unwanted existing ids 373 idsToDelete.add(existingId); 374 } 375 } 376 rs.close(); 377 } catch (SQLException e) { 378 throw new DirectoryException("failed to fetch existing links for " + filterValue, e); 379 } finally { 380 try { 381 if (ps != null) { 382 ps.close(); 383 } 384 } catch (SQLException sqle) { 385 throw new DirectoryException(sqle); 386 } 387 } 388 389 if (!idsToDelete.isEmpty()) { 390 // remove unwanted links 391 392 // String deleteSql = String.format( 393 // "DELETE FROM %s WHERE %s = ? 
AND %s = ?", tableName, 394 // filterColumn, idsColumn); 395 Delete delete = new Delete(table); 396 String whereString = String.format("%s = ? AND %s = ?", table.getColumn(filterColumn).getQuotedName(), 397 table.getColumn(idsColumn).getQuotedName()); 398 delete.setWhere(whereString); 399 String deleteSql = delete.getStatement(); 400 401 try { 402 ps = session.sqlConnection.prepareStatement(deleteSql); 403 for (String unwantedId : idsToDelete) { 404 if (session.logger.isLogEnabled()) { 405 session.logger.logSQL(deleteSql, Arrays.<Serializable> asList(filterValue, unwantedId)); 406 } 407 ps.setString(1, filterValue); 408 ps.setString(2, unwantedId); 409 ps.execute(); 410 } 411 } catch (SQLException e) { 412 throw new DirectoryException("failed to remove unwanted links for " + filterValue, e); 413 } finally { 414 try { 415 if (ps != null) { 416 ps.close(); 417 } 418 } catch (SQLException sqle) { 419 throw new DirectoryException(sqle); 420 } 421 } 422 } 423 424 if (!idsToAdd.isEmpty()) { 425 // add missing links 426 if (filterColumn.equals(sourceColumn)) { 427 for (String missingId : idsToAdd) { 428 addLink(filterValue, missingId, session, false); 429 } 430 } else { 431 for (String missingId : idsToAdd) { 432 addLink(missingId, filterValue, session, false); 433 } 434 } 435 } 436 } 437 438 public void setSourceIdsForTarget(String targetId, List<String> sourceIds, SQLSession session) 439 throws DirectoryException { 440 setIdsFor(sourceColumn, sourceIds, targetColumn, targetId, session); 441 } 442 443 public void setTargetIdsForSource(String sourceId, List<String> targetIds, SQLSession session) 444 throws DirectoryException { 445 setIdsFor(targetColumn, targetIds, sourceColumn, sourceId, session); 446 } 447 448 @Override 449 public void setSourceIdsForTarget(String targetId, List<String> sourceIds) throws DirectoryException { 450 try (SQLSession session = getSQLSession()) { 451 setSourceIdsForTarget(targetId, sourceIds, session); 452 } 453 } 454 455 @Override 456 public 
void setTargetIdsForSource(String sourceId, List<String> targetIds) throws DirectoryException { 457 try (SQLSession session = getSQLSession()) { 458 setTargetIdsForSource(sourceId, targetIds, session); 459 } 460 } 461 462 // TODO add support for the ListDiff type 463 464 protected SQLSession getSQLSession() throws DirectoryException { 465 if (!initialized) { 466 try (SQLSession sqlSession = (SQLSession) getSourceDirectory().getSession()) { 467 initialize(sqlSession); 468 initialized = true; 469 } 470 } 471 return (SQLSession) getSourceDirectory().getSession(); 472 } 473 474 /** 475 * Initialize if needed, using an existing session. 476 * 477 * @param sqlSession 478 * @throws DirectoryException 479 */ 480 protected void maybeInitialize(SQLSession sqlSession) throws DirectoryException { 481 if (!initialized) { 482 initialize(sqlSession); 483 initialized = true; 484 } 485 } 486 487 public Table getTable() throws DirectoryException { 488 if (table == null) { 489 boolean nativeCase = getSQLSourceDirectory().useNativeCase(); 490 table = SQLHelper.addTable(tableName, getDialect(), nativeCase); 491 SQLHelper.addColumn(table, sourceColumn, ColumnType.STRING, nativeCase); 492 SQLHelper.addColumn(table, targetColumn, ColumnType.STRING, nativeCase); 493 // index added for Azure 494 table.addIndex(null, IndexType.MAIN_NON_PRIMARY, sourceColumn); 495 } 496 return table; 497 } 498 499 private Dialect getDialect() throws DirectoryException { 500 if (dialect == null) { 501 dialect = getSQLSourceDirectory().getDialect(); 502 } 503 return dialect; 504 } 505 506 public String getSourceColumn() { 507 return sourceColumn; 508 } 509 510 public String getTargetColumn() { 511 return targetColumn; 512 } 513 514 public String getTargetDirectoryName() { 515 return targetDirectoryName; 516 } 517 518 public String getTableName() { 519 return tableName; 520 } 521 522 public String getDataFileName() { 523 return dataFileName; 524 } 525 526 /** 527 * @since 5.6 528 */ 529 @Override 530 public 
TableReference clone() { 531 TableReference clone = (TableReference) super.clone(); 532 // basic fields are already copied by super.clone() 533 return clone; 534 } 535 536}