001/* 002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Olivier Grisel 018 * Florent Guillaume 019 */ 020package org.nuxeo.ecm.directory.sql; 021 022import java.io.Serializable; 023import java.sql.Connection; 024import java.sql.PreparedStatement; 025import java.sql.ResultSet; 026import java.sql.SQLException; 027import java.util.Arrays; 028import java.util.Collections; 029import java.util.HashSet; 030import java.util.LinkedList; 031import java.util.List; 032import java.util.Map; 033import java.util.Set; 034import java.util.function.Consumer; 035 036import org.nuxeo.ecm.core.schema.types.SchemaImpl; 037import org.nuxeo.ecm.core.schema.types.primitives.StringType; 038import org.nuxeo.ecm.core.storage.sql.ColumnType; 039import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column; 040import org.nuxeo.ecm.core.storage.sql.jdbc.db.Delete; 041import org.nuxeo.ecm.core.storage.sql.jdbc.db.Insert; 042import org.nuxeo.ecm.core.storage.sql.jdbc.db.Select; 043import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table; 044import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table.IndexType; 045import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect; 046import org.nuxeo.ecm.directory.AbstractReference; 047import org.nuxeo.ecm.directory.BaseDirectoryDescriptor; 048import org.nuxeo.ecm.directory.Directory; 049import org.nuxeo.ecm.directory.DirectoryCSVLoader; 050import org.nuxeo.ecm.directory.DirectoryException; 051import org.nuxeo.ecm.directory.ReferenceDescriptor; 052import org.nuxeo.ecm.directory.Session; 053 054public class TableReference extends AbstractReference { 055 056 protected String tableName; 057 058 protected String sourceColumn; 059 060 protected String targetColumn; 061 062 protected String dataFileName; 063 064 private Table table; 065 066 private Dialect dialect; 067 068 private boolean initialized = false; 069 070 /** 071 * @since 9.2 072 */ 073 public TableReference(String fieldName, String directory, String tableName, String sourceColumn, 074 String targetColumn, String dataFileName) { 075 super(fieldName, directory); 076 this.tableName = tableName; 077 this.sourceColumn = sourceColumn; 078 this.targetColumn = targetColumn; 079 this.dataFileName = dataFileName; 080 } 081 082 /** 083 * @since 9.2 084 */ 085 public TableReference(TableReferenceDescriptor descriptor) { 086 this(descriptor.getFieldName(), descriptor.getTargetDirectoryName(), descriptor.getTableName(), 087 descriptor.getSourceColumn(), descriptor.getTargetColumn(), descriptor.getDataFileName()); 088 } 089 090 /** 091 * @since 9.2 092 */ 093 public TableReference(ReferenceDescriptor descriptor) { 094 this(descriptor.getFieldName(), descriptor.getDirectory(), descriptor.getReferenceName(), 095 descriptor.getSource(), descriptor.getTarget(), descriptor.getDataFileName()); 096 } 097 098 private SQLDirectory getSQLSourceDirectory() throws DirectoryException { 099 Directory dir = getSourceDirectory(); 100 return (SQLDirectory) dir; 101 } 102 103 private void initialize(SQLSession sqlSession) throws DirectoryException { 104 Connection connection = sqlSession.sqlConnection; 105 SQLDirectory directory = getSQLSourceDirectory(); 106 Table table = getTable(); 107 SQLHelper helper = new SQLHelper(connection, table, directory.getDescriptor().getCreateTablePolicy()); 108 boolean loadData = helper.setupTable(); 109 if (loadData && dataFileName != null) { 110 // fake schema for DirectoryCSVLoader.loadData 111 SchemaImpl schema = new SchemaImpl(tableName, null); 112 schema.addField(sourceColumn, StringType.INSTANCE, null, 0, Collections.emptySet()); 113 schema.addField(targetColumn, StringType.INSTANCE, null, 0, Collections.emptySet()); 114 Insert insert = new Insert(table); 115 for (Column column : table.getColumns()) { 116 insert.addColumn(column); 117 } 118 try (PreparedStatement ps = connection.prepareStatement(insert.getStatement())) { 119 Consumer<Map<String, Object>> loader = new Consumer<Map<String, Object>>() { 120 @Override 121 public void accept(Map<String, Object> map) { 122 try { 123 ps.setString(1, (String) map.get(sourceColumn)); 124 ps.setString(2, (String) map.get(targetColumn)); 125 ps.execute(); 126 } catch (SQLException e) { 127 throw new DirectoryException(e); 128 } 129 } 130 }; 131 DirectoryCSVLoader.loadData(dataFileName, BaseDirectoryDescriptor.DEFAULT_DATA_FILE_CHARACTER_SEPARATOR, 132 schema, loader); 133 } catch (SQLException e) { 134 throw new DirectoryException(String.format("Table '%s' initialization failed", tableName), e); 135 } 136 } 137 } 138 139 @Override 140 public void addLinks(String sourceId, List<String> targetIds) throws DirectoryException { 141 if (targetIds == null) { 142 return; 143 } 144 try (SQLSession session = getSQLSession()) { 145 addLinks(sourceId, targetIds, session); 146 } 147 } 148 149 @Override 150 public void addLinks(List<String> sourceIds, String targetId) throws DirectoryException { 151 if (sourceIds == null) { 152 return; 153 } 154 try (SQLSession session = getSQLSession()) { 155 addLinks(sourceIds, targetId, session); 156 } 157 } 158 159 @Override 160 public void addLinks(String sourceId, List<String> targetIds, Session session) throws DirectoryException { 161 if (targetIds == null) { 162 return; 163 } 164 SQLSession sqlSession = (SQLSession) session; 165 maybeInitialize(sqlSession); 166 for (String targetId : targetIds) { 167 addLink(sourceId, targetId, sqlSession, true); 168 } 169 } 170 171 @Override 172 public void addLinks(List<String> sourceIds, String targetId, Session session) throws DirectoryException { 173 if (sourceIds == null) { 174 return; 175 } 176 SQLSession sqlSession = (SQLSession) session; 177 maybeInitialize(sqlSession); 178 for (String sourceId : sourceIds) { 179 addLink(sourceId, targetId, sqlSession, true); 180 } 181 } 182 183 public boolean exists(String sourceId, String targetId, SQLSession session) throws DirectoryException { 184 // "SELECT COUNT(*) FROM %s WHERE %s = ? AND %s = ?", tableName, sourceColumn, targetColumn 185 186 Table table = getTable(); 187 Select select = new Select(table); 188 select.setFrom(table.getQuotedName()); 189 select.setWhat("count(*)"); 190 String whereString = String.format("%s = ? and %s = ?", table.getColumn(sourceColumn).getQuotedName(), 191 table.getColumn(targetColumn).getQuotedName()); 192 193 select.setWhere(whereString); 194 195 String selectSql = select.getStatement(); 196 if (session.logger.isLogEnabled()) { 197 session.logger.logSQL(selectSql, Arrays.<Serializable> asList(sourceId, targetId)); 198 } 199 200 try (PreparedStatement ps = session.sqlConnection.prepareStatement(selectSql)) { 201 ps.setString(1, sourceId); 202 ps.setString(2, targetId); 203 try (ResultSet rs = ps.executeQuery()) { 204 rs.next(); 205 return rs.getInt(1) > 0; 206 } 207 } catch (SQLException e) { 208 throw new DirectoryException(String.format("error reading link from %s to %s", sourceId, targetId), e); 209 } 210 } 211 212 public void addLink(String sourceId, String targetId, SQLSession session, boolean checkExisting) 213 throws DirectoryException { 214 // OG: the following query should have avoided the round trips but 215 // does not work for some reason that might be related to a bug in the 216 // JDBC driver: 217 // "INSERT INTO %s (%s, %s) (SELECT ?, ? FROM %s WHERE %s = ? AND %s = 218 // ? HAVING COUNT(*) = 0)", tableName, sourceColumn, targetColumn, 219 // tableName, sourceColumn, targetColumn 220 221 // first step: check that this link does not exist yet 222 if (checkExisting && exists(sourceId, targetId, session)) { 223 return; 224 } 225 226 // second step: add the link 227 228 // "INSERT INTO %s (%s, %s) VALUES (?, ?)", tableName, sourceColumn, targetColumn 229 Table table = getTable(); 230 Insert insert = new Insert(table); 231 insert.addColumn(table.getColumn(sourceColumn)); 232 insert.addColumn(table.getColumn(targetColumn)); 233 String insertSql = insert.getStatement(); 234 if (session.logger.isLogEnabled()) { 235 session.logger.logSQL(insertSql, Arrays.<Serializable> asList(sourceId, targetId)); 236 } 237 238 try (PreparedStatement ps = session.sqlConnection.prepareStatement(insertSql)) { 239 ps.setString(1, sourceId); 240 ps.setString(2, targetId); 241 ps.execute(); 242 } catch (SQLException e) { 243 throw new DirectoryException(String.format("error adding link from %s to %s", sourceId, targetId), e); 244 } 245 } 246 247 protected List<String> getIdsFor(String valueColumn, String filterColumn, String filterValue) 248 throws DirectoryException { 249 try (SQLSession session = getSQLSession()) { 250 // "SELECT %s FROM %s WHERE %s = ?", table.getColumn(valueColumn), tableName, filterColumn 251 Table table = getTable(); 252 Select select = new Select(table); 253 select.setWhat(table.getColumn(valueColumn).getQuotedName()); 254 select.setFrom(table.getQuotedName()); 255 select.setWhere(table.getColumn(filterColumn).getQuotedName() + " = ?"); 256 257 String sql = select.getStatement(); 258 if (session.logger.isLogEnabled()) { 259 session.logger.logSQL(sql, Collections.<Serializable> singleton(filterValue)); 260 } 261 262 List<String> ids = new LinkedList<String>(); 263 try (PreparedStatement ps = session.sqlConnection.prepareStatement(sql)) { 264 ps.setString(1, filterValue); 265 try (ResultSet rs = ps.executeQuery()) { 266 while (rs.next()) { 267 ids.add(rs.getString(valueColumn)); 268 } 269 return ids; 270 } 271 } catch (SQLException e) { 272 throw new DirectoryException("error fetching reference values: ", e); 273 } 274 } 275 } 276 277 @Override 278 public List<String> getSourceIdsForTarget(String targetId) throws DirectoryException { 279 return getIdsFor(sourceColumn, targetColumn, targetId); 280 } 281 282 @Override 283 public List<String> getTargetIdsForSource(String sourceId) throws DirectoryException { 284 return getIdsFor(targetColumn, sourceColumn, sourceId); 285 } 286 287 public void removeLinksFor(String column, String entryId, SQLSession session) throws DirectoryException { 288 Table table = getTable(); 289 String sql = String.format("DELETE FROM %s WHERE %s = ?", table.getQuotedName(), table.getColumn(column) 290 .getQuotedName()); 291 if (session.logger.isLogEnabled()) { 292 session.logger.logSQL(sql, Collections.<Serializable> singleton(entryId)); 293 } 294 try (PreparedStatement ps = session.sqlConnection.prepareStatement(sql)) { 295 ps.setString(1, entryId); 296 ps.execute(); 297 } catch (SQLException e) { 298 throw new DirectoryException("error remove links to " + entryId, e); 299 } 300 } 301 302 @Override 303 public void removeLinksForSource(String sourceId, Session session) throws DirectoryException { 304 SQLSession sqlSession = (SQLSession) session; 305 maybeInitialize(sqlSession); 306 removeLinksFor(sourceColumn, sourceId, sqlSession); 307 } 308 309 @Override 310 public void removeLinksForTarget(String targetId, Session session) throws DirectoryException { 311 SQLSession sqlSession = (SQLSession) session; 312 maybeInitialize(sqlSession); 313 removeLinksFor(targetColumn, targetId, sqlSession); 314 } 315 316 @Override 317 public void removeLinksForSource(String sourceId) throws DirectoryException { 318 try (SQLSession session = getSQLSession()) { 319 removeLinksForSource(sourceId, session); 320 } 321 } 322 323 @Override 324 public void removeLinksForTarget(String targetId) throws DirectoryException { 325 try (SQLSession session = getSQLSession()) { 326 removeLinksForTarget(targetId, session); 327 } 328 } 329 330 public void setIdsFor(String idsColumn, List<String> ids, String filterColumn, String filterValue, 331 SQLSession session) throws DirectoryException { 332 333 List<String> idsToDelete = new LinkedList<String>(); 334 Set<String> idsToAdd = new HashSet<String>(); 335 if (ids != null) { // ids may be null 336 idsToAdd.addAll(ids); 337 } 338 Table table = getTable(); 339 340 // iterate over existing links to find what to add and what to remove 341 String selectSql = String.format("SELECT %s FROM %s WHERE %s = ?", table.getColumn(idsColumn).getQuotedName(), 342 table.getQuotedName(), table.getColumn(filterColumn).getQuotedName()); 343 try (PreparedStatement ps = session.sqlConnection.prepareStatement(selectSql)) { 344 ps.setString(1, filterValue); 345 try (ResultSet rs = ps.executeQuery()) { 346 while (rs.next()) { 347 String existingId = rs.getString(1); 348 if (idsToAdd.contains(existingId)) { 349 // to not add already existing ids 350 idsToAdd.remove(existingId); 351 } else { 352 // delete unwanted existing ids 353 idsToDelete.add(existingId); 354 } 355 } 356 } 357 } catch (SQLException e) { 358 throw new DirectoryException("failed to fetch existing links for " + filterValue, e); 359 } 360 361 if (!idsToDelete.isEmpty()) { 362 // remove unwanted links 363 364 // "DELETE FROM %s WHERE %s = ? AND %s = ?", tableName, filterColumn, idsColumn); 365 Delete delete = new Delete(table); 366 String whereString = String.format("%s = ? AND %s = ?", table.getColumn(filterColumn).getQuotedName(), 367 table.getColumn(idsColumn).getQuotedName()); 368 delete.setWhere(whereString); 369 String deleteSql = delete.getStatement(); 370 371 try (PreparedStatement ps = session.sqlConnection.prepareStatement(deleteSql)) { 372 for (String unwantedId : idsToDelete) { 373 if (session.logger.isLogEnabled()) { 374 session.logger.logSQL(deleteSql, Arrays.<Serializable> asList(filterValue, unwantedId)); 375 } 376 ps.setString(1, filterValue); 377 ps.setString(2, unwantedId); 378 ps.execute(); 379 } 380 } catch (SQLException e) { 381 throw new DirectoryException("failed to remove unwanted links for " + filterValue, e); 382 } 383 } 384 385 if (!idsToAdd.isEmpty()) { 386 // add missing links 387 if (filterColumn.equals(sourceColumn)) { 388 for (String missingId : idsToAdd) { 389 addLink(filterValue, missingId, session, false); 390 } 391 } else { 392 for (String missingId : idsToAdd) { 393 addLink(missingId, filterValue, session, false); 394 } 395 } 396 } 397 } 398 399 public void setSourceIdsForTarget(String targetId, List<String> sourceIds, SQLSession session) 400 throws DirectoryException { 401 setIdsFor(sourceColumn, sourceIds, targetColumn, targetId, session); 402 } 403 404 public void setTargetIdsForSource(String sourceId, List<String> targetIds, SQLSession session) 405 throws DirectoryException { 406 setIdsFor(targetColumn, targetIds, sourceColumn, sourceId, session); 407 } 408 409 @Override 410 public void setSourceIdsForTarget(String targetId, List<String> sourceIds) throws DirectoryException { 411 try (SQLSession session = getSQLSession()) { 412 setSourceIdsForTarget(targetId, sourceIds, session); 413 } 414 } 415 416 @Override 417 public void setSourceIdsForTarget(String targetId, List<String> sourceIds, Session session) 418 throws DirectoryException { 419 SQLSession sqlSession = (SQLSession) session; 420 maybeInitialize(sqlSession); 421 setSourceIdsForTarget(targetId, sourceIds, sqlSession); 422 } 423 424 @Override 425 public void setTargetIdsForSource(String sourceId, List<String> targetIds) throws DirectoryException { 426 try (SQLSession session = getSQLSession()) { 427 setTargetIdsForSource(sourceId, targetIds, session); 428 } 429 } 430 431 @Override 432 public void setTargetIdsForSource(String sourceId, List<String> targetIds, Session session) 433 throws DirectoryException { 434 SQLSession sqlSession = (SQLSession) session; 435 maybeInitialize(sqlSession); 436 setTargetIdsForSource(sourceId, targetIds, sqlSession); 437 } 438 439 // TODO add support for the ListDiff type 440 441 protected SQLSession getSQLSession() throws DirectoryException { 442 if (!initialized) { 443 try (SQLSession sqlSession = (SQLSession) getSourceDirectory().getSession()) { 444 initialize(sqlSession); 445 initialized = true; 446 } 447 } 448 return (SQLSession) getSourceDirectory().getSession(); 449 } 450 451 /** 452 * Initialize if needed, using an existing session. 453 * 454 * @param sqlSession 455 * @throws DirectoryException 456 */ 457 protected void maybeInitialize(SQLSession sqlSession) throws DirectoryException { 458 if (!initialized) { 459 initialize(sqlSession); 460 initialized = true; 461 } 462 } 463 464 public Table getTable() throws DirectoryException { 465 if (table == null) { 466 boolean nativeCase = getSQLSourceDirectory().useNativeCase(); 467 table = SQLHelper.addTable(tableName, getDialect(), nativeCase); 468 SQLHelper.addColumn(table, sourceColumn, ColumnType.STRING, nativeCase); 469 SQLHelper.addColumn(table, targetColumn, ColumnType.STRING, nativeCase); 470 // index added for Azure 471 table.addIndex(null, IndexType.MAIN_NON_PRIMARY, sourceColumn); 472 } 473 return table; 474 } 475 476 private Dialect getDialect() throws DirectoryException { 477 if (dialect == null) { 478 dialect = getSQLSourceDirectory().getDialect(); 479 } 480 return dialect; 481 } 482 483 public String getSourceColumn() { 484 return sourceColumn; 485 } 486 487 public String getTargetColumn() { 488 return targetColumn; 489 } 490 491 public String getTargetDirectoryName() { 492 return targetDirectoryName; 493 } 494 495 public String getTableName() { 496 return tableName; 497 } 498 499 public String getDataFileName() { 500 return dataFileName; 501 } 502 503}