001/* 002 * Copyright (c) 2006-2011 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the Eclipse Public License v1.0 006 * which accompanies this distribution, and is available at 007 * http://www.eclipse.org/legal/epl-v10.html 008 * 009 * Contributors: 010 * Florent Guillaume 011 * Benoit Delbosc 012 */ 013package org.nuxeo.ecm.core.storage.sql.jdbc; 014 015import java.io.Serializable; 016import java.security.MessageDigest; 017import java.security.NoSuchAlgorithmException; 018import java.sql.Array; 019import java.sql.Connection; 020import java.sql.DatabaseMetaData; 021import java.sql.PreparedStatement; 022import java.sql.ResultSet; 023import java.sql.SQLException; 024import java.sql.Statement; 025import java.sql.Types; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.Calendar; 029import java.util.Collection; 030import java.util.Collections; 031import java.util.HashMap; 032import java.util.HashSet; 033import java.util.LinkedList; 034import java.util.List; 035import java.util.Map; 036import java.util.Map.Entry; 037import java.util.Set; 038import java.util.concurrent.Callable; 039 040import javax.sql.XADataSource; 041import javax.transaction.xa.XAException; 042import javax.transaction.xa.XAResource; 043import javax.transaction.xa.Xid; 044 045import org.apache.commons.logging.Log; 046import org.apache.commons.logging.LogFactory; 047import org.nuxeo.common.utils.StringUtils; 048import org.nuxeo.ecm.core.api.IterableQueryResult; 049import org.nuxeo.ecm.core.api.Lock; 050import org.nuxeo.ecm.core.api.NuxeoException; 051import org.nuxeo.ecm.core.api.PartialList; 052import org.nuxeo.ecm.core.blob.BlobManager; 053import org.nuxeo.ecm.core.model.LockManager; 054import org.nuxeo.ecm.core.query.QueryFilter; 055import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator; 056import org.nuxeo.ecm.core.storage.sql.ColumnType; 057import org.nuxeo.ecm.core.storage.sql.ColumnType.WrappedId; 058import org.nuxeo.ecm.core.storage.sql.Invalidations; 059import org.nuxeo.ecm.core.storage.sql.Mapper; 060import org.nuxeo.ecm.core.storage.sql.Model; 061import org.nuxeo.ecm.core.storage.sql.RepositoryImpl; 062import org.nuxeo.ecm.core.storage.sql.Row; 063import org.nuxeo.ecm.core.storage.sql.RowId; 064import org.nuxeo.ecm.core.storage.sql.Session.PathResolver; 065import org.nuxeo.ecm.core.storage.sql.jdbc.SQLInfo.SQLInfoSelect; 066import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column; 067import org.nuxeo.ecm.core.storage.sql.jdbc.db.Database; 068import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table; 069import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect; 070import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectOracle; 071import org.nuxeo.runtime.api.Framework; 072 073/** 074 * A {@link JDBCMapper} maps objects to and from a JDBC database. It is specific to a given database connection, as it 075 * computes statements. 076 * <p> 077 * The {@link JDBCMapper} does the mapping according to the policy defined by a {@link Model}, and generates SQL 078 * statements recorded in the {@link SQLInfo}. 079 */ 080public class JDBCMapper extends JDBCRowMapper implements Mapper { 081 082 private static final Log log = LogFactory.getLog(JDBCMapper.class); 083 084 public static Map<String, Serializable> testProps = new HashMap<String, Serializable>(); 085 086 public static final String TEST_UPGRADE = "testUpgrade"; 087 088 // property in sql.txt file 089 public static final String TEST_UPGRADE_VERSIONS = "testUpgradeVersions"; 090 091 public static final String TEST_UPGRADE_LAST_CONTRIBUTOR = "testUpgradeLastContributor"; 092 093 public static final String TEST_UPGRADE_LOCKS = "testUpgradeLocks"; 094 095 public static final String TEST_UPGRADE_FULLTEXT = "testUpgradeFulltext"; 096 097 protected TableUpgrader tableUpgrader; 098 099 private final QueryMakerService queryMakerService; 100 101 private final PathResolver pathResolver; 102 103 private final RepositoryImpl repository; 104 105 protected boolean clusteringEnabled; 106 107 /** 108 * Creates a new Mapper. 109 * 110 * @param model the model 111 * @param pathResolver the path resolver (used for startswith queries) 112 * @param sqlInfo the sql info 113 * @param xadatasource the XA datasource to use to get connections 114 * @param clusterInvalidator the cluster invalidator 115 * @param repository 116 */ 117 public JDBCMapper(Model model, PathResolver pathResolver, SQLInfo sqlInfo, XADataSource xadatasource, 118 ClusterInvalidator clusterInvalidator, boolean noSharing, RepositoryImpl repository) { 119 super(model, sqlInfo, xadatasource, clusterInvalidator, repository.getInvalidationsPropagator(), noSharing); 120 this.pathResolver = pathResolver; 121 this.repository = repository; 122 clusteringEnabled = clusterInvalidator != null; 123 queryMakerService = Framework.getService(QueryMakerService.class); 124 125 tableUpgrader = new TableUpgrader(this); 126 tableUpgrader.add(Model.VERSION_TABLE_NAME, Model.VERSION_IS_LATEST_KEY, "upgradeVersions", 127 TEST_UPGRADE_VERSIONS); 128 tableUpgrader.add("dublincore", "lastContributor", "upgradeLastContributor", TEST_UPGRADE_LAST_CONTRIBUTOR); 129 tableUpgrader.add(Model.LOCK_TABLE_NAME, Model.LOCK_OWNER_KEY, "upgradeLocks", TEST_UPGRADE_LOCKS); 130 131 } 132 133 @Override 134 public int getTableSize(String tableName) { 135 return sqlInfo.getDatabase().getTable(tableName).getColumns().size(); 136 } 137 138 /* 139 * ----- Root ----- 140 */ 141 142 @Override 143 public void createDatabase() { 144 try { 145 createTables(); 146 } catch (SQLException e) { 147 throw new NuxeoException(e); 148 } 149 } 150 151 protected String getTableName(String origName) { 152 153 if (dialect instanceof DialectOracle) { 154 if (origName.length() > 30) { 155 156 StringBuilder sb = new StringBuilder(origName.length()); 157 158 try { 159 MessageDigest digest = MessageDigest.getInstance("MD5"); 160 sb.append(origName.substring(0, 15)); 161 sb.append('_'); 162 163 digest.update(origName.getBytes()); 164 sb.append(Dialect.toHexString(digest.digest()).substring(0, 12)); 165 166 return sb.toString(); 167 168 } catch (NoSuchAlgorithmException e) { 169 throw new RuntimeException("Error", e); 170 } 171 } 172 } 173 174 return origName; 175 } 176 177 protected void createTables() throws SQLException { 178 sqlInfo.executeSQLStatements(null, this); // for missing category 179 sqlInfo.executeSQLStatements("first", this); 180 sqlInfo.executeSQLStatements("beforeTableCreation", this); 181 if (testProps.containsKey(TEST_UPGRADE)) { 182 // create "old" tables 183 sqlInfo.executeSQLStatements("testUpgrade", this); 184 } 185 186 String schemaName = dialect.getConnectionSchema(connection); 187 DatabaseMetaData metadata = connection.getMetaData(); 188 Set<String> tableNames = findTableNames(metadata, schemaName); 189 Database database = sqlInfo.getDatabase(); 190 Map<String, List<Column>> added = new HashMap<String, List<Column>>(); 191 192 Statement st = null; 193 ResultSet rs = null; 194 try { 195 st = connection.createStatement(); 196 for (Table table : database.getTables()) { 197 String tableName = getTableName(table.getPhysicalName()); 198 if (tableNames.contains(tableName.toUpperCase())) { 199 dialect.existingTableDetected(connection, table, model, sqlInfo.database); 200 } else { 201 202 /* 203 * Create missing table. 204 */ 205 206 boolean create = dialect.preCreateTable(connection, table, model, sqlInfo.database); 207 if (!create) { 208 log.warn("Creation skipped for table: " + tableName); 209 continue; 210 } 211 212 String sql = table.getCreateSql(); 213 logger.log(sql); 214 try { 215 st.execute(sql); 216 countExecute(); 217 } catch (SQLException e) { 218 try { 219 closeStatement(st); 220 } finally { 221 throw new SQLException("Error creating table: " + sql + " : " + e.getMessage(), e); 222 } 223 } 224 225 for (String s : table.getPostCreateSqls(model)) { 226 logger.log(s); 227 try { 228 st.execute(s); 229 countExecute(); 230 } catch (SQLException e) { 231 throw new SQLException("Error post creating table: " + s + " : " + e.getMessage(), e); 232 } 233 } 234 for (String s : dialect.getPostCreateTableSqls(table, model, sqlInfo.database)) { 235 logger.log(s); 236 try { 237 st.execute(s); 238 countExecute(); 239 } catch (SQLException e) { 240 throw new SQLException("Error post creating table: " + s + " : " + e.getMessage(), e); 241 } 242 } 243 added.put(table.getKey(), null); // null = table created 244 } 245 246 /* 247 * Get existing columns. 248 */ 249 250 rs = metadata.getColumns(null, schemaName, tableName, "%"); 251 Map<String, Integer> columnTypes = new HashMap<String, Integer>(); 252 Map<String, String> columnTypeNames = new HashMap<String, String>(); 253 Map<String, Integer> columnTypeSizes = new HashMap<String, Integer>(); 254 while (rs.next()) { 255 String schema = rs.getString("TABLE_SCHEM"); 256 if (schema != null) { // null for MySQL, doh! 257 if ("INFORMATION_SCHEMA".equals(schema.toUpperCase())) { 258 // H2 returns some system tables (locks) 259 continue; 260 } 261 } 262 String columnName = rs.getString("COLUMN_NAME").toUpperCase(); 263 columnTypes.put(columnName, Integer.valueOf(rs.getInt("DATA_TYPE"))); 264 columnTypeNames.put(columnName, rs.getString("TYPE_NAME")); 265 columnTypeSizes.put(columnName, Integer.valueOf(rs.getInt("COLUMN_SIZE"))); 266 } 267 rs.close(); 268 /* 269 * Update types and create missing columns. 270 */ 271 272 List<Column> addedColumns = new LinkedList<Column>(); 273 for (Column column : table.getColumns()) { 274 String upperName = column.getPhysicalName().toUpperCase(); 275 Integer type = columnTypes.remove(upperName); 276 if (type == null) { 277 log.warn("Adding missing column in database: " + column.getFullQuotedName()); 278 String sql = table.getAddColumnSql(column); 279 logger.log(sql); 280 try { 281 st.execute(sql); 282 countExecute(); 283 } catch (SQLException e) { 284 throw new SQLException("Error adding column: " + sql + " : " + e.getMessage(), e); 285 } 286 for (String s : table.getPostAddSqls(column, model)) { 287 logger.log(s); 288 try { 289 st.execute(s); 290 countExecute(); 291 } catch (SQLException e) { 292 throw new SQLException("Error post adding column: " + s + " : " + e.getMessage(), e); 293 } 294 } 295 addedColumns.add(column); 296 } else { 297 int expected = column.getJdbcType(); 298 int actual = type.intValue(); 299 String actualName = columnTypeNames.get(upperName); 300 Integer actualSize = columnTypeSizes.get(upperName); 301 if (!column.setJdbcType(actual, actualName, actualSize.intValue())) { 302 log.error(String.format("SQL type mismatch for %s: expected %s, database has %s / %s (%s)", 303 column.getFullQuotedName(), Integer.valueOf(expected), type, actualName, actualSize)); 304 } 305 } 306 } 307 for (String col : dialect.getIgnoredColumns(table)) { 308 columnTypes.remove(col.toUpperCase()); 309 } 310 if (!columnTypes.isEmpty()) { 311 log.warn("Database contains additional unused columns for table " + table.getQuotedName() + ": " 312 + StringUtils.join(new ArrayList<String>(columnTypes.keySet()), ", ")); 313 } 314 if (!addedColumns.isEmpty()) { 315 if (added.containsKey(table.getKey())) { 316 throw new AssertionError(); 317 } 318 added.put(table.getKey(), addedColumns); 319 } 320 } 321 } finally { 322 try { 323 closeStatement(st, rs); 324 } catch (SQLException e) { 325 log.error(e.getMessage(), e); 326 } 327 } 328 329 if (testProps.containsKey(TEST_UPGRADE)) { 330 // create "old" content in tables 331 sqlInfo.executeSQLStatements("testUpgradeOldTables", this); 332 } 333 334 // run upgrade for each table if added columns or test 335 for (Entry<String, List<Column>> en : added.entrySet()) { 336 List<Column> addedColumns = en.getValue(); 337 String tableKey = en.getKey(); 338 upgradeTable(tableKey, addedColumns); 339 } 340 sqlInfo.executeSQLStatements("afterTableCreation", this); 341 sqlInfo.executeSQLStatements("last", this); 342 dialect.performAdditionalStatements(connection); 343 } 344 345 protected void upgradeTable(String tableKey, List<Column> addedColumns) throws SQLException { 346 tableUpgrader.upgrade(tableKey, addedColumns); 347 } 348 349 /** Finds uppercase table names. */ 350 protected static Set<String> findTableNames(DatabaseMetaData metadata, String schemaName) throws SQLException { 351 Set<String> tableNames = new HashSet<String>(); 352 ResultSet rs = metadata.getTables(null, schemaName, "%", new String[] { "TABLE" }); 353 while (rs.next()) { 354 String tableName = rs.getString("TABLE_NAME"); 355 tableNames.add(tableName.toUpperCase()); 356 } 357 rs.close(); 358 return tableNames; 359 } 360 361 @Override 362 public int getClusterNodeIdType() { 363 return sqlInfo.getClusterNodeIdType(); 364 } 365 366 @Override 367 public void createClusterNode(Serializable nodeId) { 368 Calendar now = Calendar.getInstance(); 369 try { 370 String sql = sqlInfo.getCreateClusterNodeSql(); 371 List<Column> columns = sqlInfo.getCreateClusterNodeColumns(); 372 PreparedStatement ps = connection.prepareStatement(sql); 373 try { 374 if (logger.isLogEnabled()) { 375 logger.logSQL(sql, Arrays.asList(nodeId, now)); 376 } 377 columns.get(0).setToPreparedStatement(ps, 1, nodeId); 378 columns.get(1).setToPreparedStatement(ps, 2, now); 379 ps.execute(); 380 } finally { 381 closeStatement(ps); 382 } 383 } catch (SQLException e) { 384 throw new NuxeoException(e); 385 } 386 } 387 388 @Override 389 public void removeClusterNode(Serializable nodeId) { 390 try { 391 // delete from cluster_nodes 392 String sql = sqlInfo.getDeleteClusterNodeSql(); 393 Column column = sqlInfo.getDeleteClusterNodeColumn(); 394 PreparedStatement ps = connection.prepareStatement(sql); 395 try { 396 if (logger.isLogEnabled()) { 397 logger.logSQL(sql, Arrays.asList(nodeId)); 398 } 399 column.setToPreparedStatement(ps, 1, nodeId); 400 ps.execute(); 401 } finally { 402 closeStatement(ps); 403 } 404 // delete un-processed invals from cluster_invals 405 deleteClusterInvals(nodeId); 406 } catch (SQLException e) { 407 throw new NuxeoException(e); 408 } 409 } 410 411 protected void deleteClusterInvals(Serializable nodeId) throws SQLException { 412 String sql = sqlInfo.getDeleteClusterInvalsSql(); 413 Column column = sqlInfo.getDeleteClusterInvalsColumn(); 414 PreparedStatement ps = connection.prepareStatement(sql); 415 try { 416 if (logger.isLogEnabled()) { 417 logger.logSQL(sql, Arrays.asList(nodeId)); 418 } 419 column.setToPreparedStatement(ps, 1, nodeId); 420 int n = ps.executeUpdate(); 421 countExecute(); 422 if (logger.isLogEnabled()) { 423 logger.logCount(n); 424 } 425 } finally { 426 try { 427 closeStatement(ps); 428 } catch (SQLException e) { 429 log.error("deleteClusterInvals: " + e.getMessage(), e); 430 } 431 } 432 } 433 434 @Override 435 public void insertClusterInvalidations(Serializable nodeId, Invalidations invalidations) { 436 String sql = dialect.getClusterInsertInvalidations(); 437 List<Column> columns = sqlInfo.getClusterInvalidationsColumns(); 438 PreparedStatement ps = null; 439 try { 440 ps = connection.prepareStatement(sql); 441 int kind = Invalidations.MODIFIED; 442 while (true) { 443 Set<RowId> rowIds = invalidations.getKindSet(kind); 444 445 // reorganize by id 446 Map<Serializable, Set<String>> res = new HashMap<Serializable, Set<String>>(); 447 for (RowId rowId : rowIds) { 448 Set<String> tableNames = res.get(rowId.id); 449 if (tableNames == null) { 450 res.put(rowId.id, tableNames = new HashSet<String>()); 451 } 452 tableNames.add(rowId.tableName); 453 } 454 455 // do inserts 456 for (Entry<Serializable, Set<String>> en : res.entrySet()) { 457 Serializable id = en.getKey(); 458 String fragments = join(en.getValue(), ' '); 459 if (logger.isLogEnabled()) { 460 logger.logSQL(sql, Arrays.<Serializable> asList(nodeId, id, fragments, Long.valueOf(kind))); 461 } 462 Serializable frags; 463 if (dialect.supportsArrays() && columns.get(2).getJdbcType() == Types.ARRAY) { 464 frags = fragments.split(" "); 465 } else { 466 frags = fragments; 467 } 468 columns.get(0).setToPreparedStatement(ps, 1, nodeId); 469 columns.get(1).setToPreparedStatement(ps, 2, id); 470 columns.get(2).setToPreparedStatement(ps, 3, frags); 471 columns.get(3).setToPreparedStatement(ps, 4, Long.valueOf(kind)); 472 ps.execute(); 473 countExecute(); 474 } 475 if (kind == Invalidations.MODIFIED) { 476 kind = Invalidations.DELETED; 477 } else { 478 break; 479 } 480 } 481 } catch (SQLException e) { 482 throw new NuxeoException("Could not invalidate", e); 483 } finally { 484 try { 485 closeStatement(ps); 486 } catch (SQLException e) { 487 log.error(e.getMessage(), e); 488 } 489 } 490 } 491 492 // join that works on a set 493 protected static final String join(Collection<String> strings, char sep) { 494 if (strings.isEmpty()) { 495 throw new RuntimeException(); 496 } 497 if (strings.size() == 1) { 498 return strings.iterator().next(); 499 } 500 int size = 0; 501 for (String word : strings) { 502 size += word.length() + 1; 503 } 504 StringBuilder buf = new StringBuilder(size); 505 for (String word : strings) { 506 buf.append(word); 507 buf.append(sep); 508 } 509 buf.setLength(size - 1); 510 return buf.toString(); 511 } 512 513 @Override 514 public Invalidations getClusterInvalidations(Serializable nodeId) { 515 Invalidations invalidations = new Invalidations(); 516 String sql = dialect.getClusterGetInvalidations(); 517 List<Column> columns = sqlInfo.getClusterInvalidationsColumns(); 518 try { 519 if (logger.isLogEnabled()) { 520 logger.logSQL(sql, Arrays.asList(nodeId)); 521 } 522 PreparedStatement ps = connection.prepareStatement(sql); 523 ResultSet rs = null; 524 try { 525 setToPreparedStatement(ps, 1, nodeId); 526 rs = ps.executeQuery(); 527 countExecute(); 528 while (rs.next()) { 529 // first column ignored, it's the node id 530 Serializable id = columns.get(1).getFromResultSet(rs, 1); 531 Serializable frags = columns.get(2).getFromResultSet(rs, 2); 532 int kind = ((Long) columns.get(3).getFromResultSet(rs, 3)).intValue(); 533 String[] fragments; 534 if (dialect.supportsArrays() && frags instanceof String[]) { 535 fragments = (String[]) frags; 536 } else { 537 fragments = ((String) frags).split(" "); 538 } 539 invalidations.add(id, fragments, kind); 540 } 541 } finally { 542 closeStatement(ps, rs); 543 } 544 if (logger.isLogEnabled()) { 545 // logCount(n); 546 logger.log(" -> " + invalidations); 547 } 548 if (dialect.isClusteringDeleteNeeded()) { 549 deleteClusterInvals(nodeId); 550 } 551 return invalidations; 552 } catch (SQLException e) { 553 throw new NuxeoException("Could not invalidate", e); 554 } 555 } 556 557 @Override 558 public Serializable getRootId(String repositoryId) { 559 String sql = sqlInfo.getSelectRootIdSql(); 560 try { 561 if (logger.isLogEnabled()) { 562 logger.logSQL(sql, Collections.<Serializable> singletonList(repositoryId)); 563 } 564 PreparedStatement ps = connection.prepareStatement(sql); 565 ResultSet rs = null; 566 try { 567 ps.setString(1, repositoryId); 568 rs = ps.executeQuery(); 569 countExecute(); 570 if (!rs.next()) { 571 if (logger.isLogEnabled()) { 572 logger.log(" -> (none)"); 573 } 574 return null; 575 } 576 Column column = sqlInfo.getSelectRootIdWhatColumn(); 577 Serializable id = column.getFromResultSet(rs, 1); 578 if (logger.isLogEnabled()) { 579 logger.log(" -> " + Model.MAIN_KEY + '=' + id); 580 } 581 // check that we didn't get several rows 582 if (rs.next()) { 583 throw new NuxeoException("Row query for " + repositoryId + " returned several rows: " + sql); 584 } 585 return id; 586 } finally { 587 closeStatement(ps, rs); 588 } 589 } catch (SQLException e) { 590 throw new NuxeoException("Could not select: " + sql, e); 591 } 592 } 593 594 @Override 595 public void setRootId(Serializable repositoryId, Serializable id) { 596 String sql = sqlInfo.getInsertRootIdSql(); 597 try { 598 PreparedStatement ps = connection.prepareStatement(sql); 599 try { 600 List<Column> columns = sqlInfo.getInsertRootIdColumns(); 601 List<Serializable> debugValues = null; 602 if (logger.isLogEnabled()) { 603 debugValues = new ArrayList<Serializable>(2); 604 } 605 int i = 0; 606 for (Column column : columns) { 607 i++; 608 String key = column.getKey(); 609 Serializable v; 610 if (key.equals(Model.MAIN_KEY)) { 611 v = id; 612 } else if (key.equals(Model.REPOINFO_REPONAME_KEY)) { 613 v = repositoryId; 614 } else { 615 throw new RuntimeException(key); 616 } 617 column.setToPreparedStatement(ps, i, v); 618 if (debugValues != null) { 619 debugValues.add(v); 620 } 621 } 622 if (debugValues != null) { 623 logger.logSQL(sql, debugValues); 624 debugValues.clear(); 625 } 626 ps.execute(); 627 countExecute(); 628 } finally { 629 closeStatement(ps); 630 } 631 } catch (SQLException e) { 632 throw new NuxeoException("Could not insert: " + sql, e); 633 } 634 } 635 636 protected QueryMaker findQueryMaker(String queryType) { 637 for (Class<? extends QueryMaker> klass : queryMakerService.getQueryMakers()) { 638 QueryMaker queryMaker; 639 try { 640 queryMaker = klass.newInstance(); 641 } catch (ReflectiveOperationException e) { 642 throw new NuxeoException(e); 643 } 644 if (queryMaker.accepts(queryType)) { 645 return queryMaker; 646 } 647 } 648 return null; 649 } 650 651 protected void prepareUserReadAcls(QueryFilter queryFilter) { 652 String sql = dialect.getPrepareUserReadAclsSql(); 653 Serializable principals = queryFilter.getPrincipals(); 654 if (sql == null || principals == null) { 655 return; 656 } 657 if (!dialect.supportsArrays()) { 658 principals = StringUtils.join((String[]) principals, Dialect.ARRAY_SEP); 659 } 660 PreparedStatement ps = null; 661 try { 662 ps = connection.prepareStatement(sql); 663 if (logger.isLogEnabled()) { 664 logger.logSQL(sql, Collections.singleton(principals)); 665 } 666 setToPreparedStatement(ps, 1, principals); 667 ps.execute(); 668 countExecute(); 669 } catch (SQLException e) { 670 throw new NuxeoException("Failed to prepare user read acl cache", e); 671 } finally { 672 try { 673 closeStatement(ps); 674 } catch (SQLException e) { 675 log.error(e.getMessage(), e); 676 } 677 } 678 } 679 680 @Override 681 public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter, 682 boolean countTotal) { 683 return query(query, queryType, queryFilter, countTotal ? -1 : 0); 684 } 685 686 @Override 687 public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter, long countUpTo) { 688 if (dialect.needsPrepareUserReadAcls()) { 689 prepareUserReadAcls(queryFilter); 690 } 691 QueryMaker queryMaker = findQueryMaker(queryType); 692 if (queryMaker == null) { 693 throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query); 694 } 695 QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter); 696 697 if (q == null) { 698 logger.log("Query cannot return anything due to conflicting clauses"); 699 return new PartialList<Serializable>(Collections.<Serializable> emptyList(), 0); 700 } 701 long limit = queryFilter.getLimit(); 702 long offset = queryFilter.getOffset(); 703 704 if (logger.isLogEnabled()) { 705 String sql = q.selectInfo.sql; 706 if (limit != 0) { 707 sql += " -- LIMIT " + limit + " OFFSET " + offset; 708 } 709 if (countUpTo != 0) { 710 sql += " -- COUNT TOTAL UP TO " + countUpTo; 711 } 712 logger.logSQL(sql, q.selectParams); 713 } 714 715 String sql = q.selectInfo.sql; 716 717 if (countUpTo == 0 && limit > 0 && dialect.supportsPaging()) { 718 // full result set not needed for counting 719 sql = dialect.addPagingClause(sql, limit, offset); 720 limit = 0; 721 offset = 0; 722 } else if (countUpTo > 0 && dialect.supportsPaging()) { 723 // ask one more row 724 sql = dialect.addPagingClause(sql, Math.max(countUpTo + 1, limit + offset), 0); 725 } 726 727 PreparedStatement ps = null; 728 ResultSet rs = null; 729 try { 730 ps = connection.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY); 731 int i = 1; 732 for (Serializable object : q.selectParams) { 733 setToPreparedStatement(ps, i++, object); 734 } 735 rs = ps.executeQuery(); 736 countExecute(); 737 738 // limit/offset 739 long totalSize = -1; 740 boolean available; 741 if ((limit == 0) || (offset == 0)) { 742 available = rs.first(); 743 if (!available) { 744 totalSize = 0; 745 } 746 if (limit == 0) { 747 limit = -1; // infinite 748 } 749 } else { 750 available = rs.absolute((int) offset + 1); 751 } 752 753 Column column = q.selectInfo.whatColumns.get(0); 754 List<Serializable> ids = new LinkedList<Serializable>(); 755 int rowNum = 0; 756 while (available && (limit != 0)) { 757 Serializable id = column.getFromResultSet(rs, 1); 758 ids.add(id); 759 rowNum = rs.getRow(); 760 available = rs.next(); 761 limit--; 762 } 763 764 // total size 765 if (countUpTo != 0 && (totalSize == -1)) { 766 if (!available && (rowNum != 0)) { 767 // last row read was the actual last 768 totalSize = rowNum; 769 } else { 770 // available if limit reached with some left 771 // rowNum == 0 if skipped too far 772 rs.last(); 773 totalSize = rs.getRow(); 774 } 775 if (countUpTo > 0 && totalSize > countUpTo) { 776 // the result where truncated we don't know the total size 777 totalSize = -2; 778 } 779 } 780 781 if (logger.isLogEnabled()) { 782 logger.logIds(ids, countUpTo != 0, totalSize); 783 } 784 785 return new PartialList<Serializable>(ids, totalSize); 786 } catch (SQLException e) { 787 throw new NuxeoException("Invalid query: " + query, e); 788 } finally { 789 try { 790 closeStatement(ps, rs); 791 } catch (SQLException e) { 792 log.error("Cannot close connection", e); 793 } 794 } 795 } 796 797 public int setToPreparedStatement(PreparedStatement ps, int i, Serializable object) throws SQLException { 798 if (object instanceof Calendar) { 799 Calendar cal = (Calendar) object; 800 ps.setTimestamp(i, dialect.getTimestampFromCalendar(cal), cal); 801 } else if (object instanceof java.sql.Date) { 802 ps.setDate(i, (java.sql.Date) object); 803 } else if (object instanceof Long) { 804 ps.setLong(i, ((Long) object).longValue()); 805 } else if (object instanceof WrappedId) { 806 dialect.setId(ps, i, object.toString()); 807 } else if (object instanceof Object[]) { 808 int jdbcType; 809 if (object instanceof String[]) { 810 jdbcType = dialect.getJDBCTypeAndString(ColumnType.STRING).jdbcType; 811 } else if (object instanceof Boolean[]) { 812 jdbcType = dialect.getJDBCTypeAndString(ColumnType.BOOLEAN).jdbcType; 813 } else if (object instanceof Long[]) { 814 jdbcType = dialect.getJDBCTypeAndString(ColumnType.LONG).jdbcType; 815 } else if (object instanceof Double[]) { 816 jdbcType = dialect.getJDBCTypeAndString(ColumnType.DOUBLE).jdbcType; 817 } else if (object instanceof java.sql.Date[]) { 818 jdbcType = Types.DATE; 819 } else if (object instanceof java.sql.Clob[]) { 820 jdbcType = Types.CLOB; 821 } else if (object instanceof Calendar[]) { 822 jdbcType = dialect.getJDBCTypeAndString(ColumnType.TIMESTAMP).jdbcType; 823 object = dialect.getTimestampFromCalendar((Calendar) object); 824 } else if (object instanceof Integer[]) { 825 jdbcType = dialect.getJDBCTypeAndString(ColumnType.INTEGER).jdbcType; 826 } else { 827 jdbcType = dialect.getJDBCTypeAndString(ColumnType.CLOB).jdbcType; 828 } 829 Array array = dialect.createArrayOf(jdbcType, (Object[]) object, connection); 830 ps.setArray(i, array); 831 } else { 832 ps.setObject(i, object); 833 } 834 return i; 835 } 836 837 // queryFilter used for principals and permissions 838 @Override 839 public IterableQueryResult queryAndFetch(String query, String queryType, QueryFilter queryFilter, 840 Object... params) { 841 if (dialect.needsPrepareUserReadAcls()) { 842 prepareUserReadAcls(queryFilter); 843 } 844 QueryMaker queryMaker = findQueryMaker(queryType); 845 if (queryMaker == null) { 846 throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query); 847 } 848 try { 849 return new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this, params); 850 } catch (SQLException e) { 851 throw new NuxeoException("Invalid query: " + queryType + ": " + query, e); 852 } 853 } 854 855 @Override 856 public Set<Serializable> getAncestorsIds(Collection<Serializable> ids) { 857 SQLInfoSelect select = sqlInfo.getSelectAncestorsIds(); 858 if (select == null) { 859 return getAncestorsIdsIterative(ids); 860 } 861 Serializable whereIds = newIdArray(ids); 862 Set<Serializable> res = new HashSet<Serializable>(); 863 PreparedStatement ps = null; 864 ResultSet rs = null; 865 try { 866 if (logger.isLogEnabled()) { 867 logger.logSQL(select.sql, Collections.singleton(whereIds)); 868 } 869 Column what = select.whatColumns.get(0); 870 ps = connection.prepareStatement(select.sql); 871 setToPreparedStatementIdArray(ps, 1, whereIds); 872 rs = ps.executeQuery(); 873 countExecute(); 874 List<Serializable> debugIds = null; 875 if (logger.isLogEnabled()) { 876 debugIds = new LinkedList<Serializable>(); 877 } 878 while (rs.next()) { 879 if (dialect.supportsArraysReturnInsteadOfRows()) { 880 Serializable[] resultIds = dialect.getArrayResult(rs.getArray(1)); 881 for (Serializable id : resultIds) { 882 if (id != null) { 883 res.add(id); 884 if (logger.isLogEnabled()) { 885 debugIds.add(id); 886 } 887 } 888 } 889 } else { 890 Serializable id = what.getFromResultSet(rs, 1); 891 if (id != null) { 892 res.add(id); 893 if (logger.isLogEnabled()) { 894 debugIds.add(id); 895 } 896 } 897 } 898 } 899 if (logger.isLogEnabled()) { 900 logger.logIds(debugIds, false, 0); 901 } 902 return res; 903 } catch (SQLException e) { 904 throw new NuxeoException("Failed to get ancestors ids", e); 905 } finally { 906 try { 907 closeStatement(ps, rs); 908 } catch (SQLException e) { 909 log.error(e.getMessage(), e); 910 } 911 } 912 } 913 914 /** 915 * Uses iterative parentid selection. 916 */ 917 protected Set<Serializable> getAncestorsIdsIterative(Collection<Serializable> ids) { 918 PreparedStatement ps = null; 919 ResultSet rs = null; 920 try { 921 LinkedList<Serializable> todo = new LinkedList<Serializable>(ids); 922 Set<Serializable> done = new HashSet<Serializable>(); 923 Set<Serializable> res = new HashSet<Serializable>(); 924 while (!todo.isEmpty()) { 925 done.addAll(todo); 926 SQLInfoSelect select = sqlInfo.getSelectParentIds(todo.size()); 927 if (logger.isLogEnabled()) { 928 logger.logSQL(select.sql, todo); 929 } 930 Column what = select.whatColumns.get(0); 931 Column where = select.whereColumns.get(0); 932 ps = connection.prepareStatement(select.sql); 933 int i = 1; 934 for (Serializable id : todo) { 935 where.setToPreparedStatement(ps, i++, id); 936 } 937 rs = ps.executeQuery(); 938 countExecute(); 939 todo = new LinkedList<Serializable>(); 940 List<Serializable> debugIds = null; 941 if (logger.isLogEnabled()) { 942 debugIds = new LinkedList<Serializable>(); 943 } 944 while (rs.next()) { 945 Serializable id = what.getFromResultSet(rs, 1); 946 if (id != null) { 947 res.add(id); 948 if (!done.contains(id)) { 949 todo.add(id); 950 } 951 if (logger.isLogEnabled()) { 952 debugIds.add(id); 953 } 954 } 955 } 956 if (logger.isLogEnabled()) { 957 logger.logIds(debugIds, false, 0); 958 } 959 rs.close(); 960 ps.close(); 961 } 962 return res; 963 } catch (SQLException e) { 964 throw new NuxeoException("Failed to get ancestors ids", e); 965 } finally { 966 try { 967 closeStatement(ps, rs); 968 } catch (SQLException e) { 969 log.error(e.getMessage(), e); 970 } 971 } 972 } 973 974 @Override 975 public void updateReadAcls() { 976 if (!dialect.supportsReadAcl()) { 977 return; 978 } 979 if (log.isDebugEnabled()) { 980 log.debug("updateReadAcls: updating"); 981 } 982 Statement st = null; 983 try { 984 st = connection.createStatement(); 985 String sql = dialect.getUpdateReadAclsSql(); 986 if (logger.isLogEnabled()) { 987 logger.log(sql); 988 } 989 st.execute(sql); 990 countExecute(); 991 } catch (SQLException e) { 992 throw new NuxeoException("Failed to update read acls", e); 993 } finally { 994 try { 995 closeStatement(st); 996 } catch (SQLException e) { 997 log.error(e.getMessage(), e); 998 } 999 } 1000 if (log.isDebugEnabled()) { 1001 log.debug("updateReadAcls: done."); 1002 } 1003 } 1004 1005 @Override 1006 public void rebuildReadAcls() { 1007 if (!dialect.supportsReadAcl()) { 1008 return; 1009 } 1010 log.debug("rebuildReadAcls: rebuilding ..."); 1011 Statement st = null; 1012 try { 1013 st = connection.createStatement(); 1014 String sql = dialect.getRebuildReadAclsSql(); 1015 logger.log(sql); 1016 st.execute(sql); 1017 countExecute(); 1018 } catch (SQLException e) { 1019 throw new NuxeoException("Failed to rebuild read acls", e); 1020 } finally { 1021 try { 1022 closeStatement(st); 1023 } catch (SQLException e) { 1024 log.error(e.getMessage(), e); 1025 } 1026 } 1027 log.debug("rebuildReadAcls: done."); 1028 } 1029 1030 /* 1031 * ----- Locking ----- 1032 */ 1033 1034 protected Connection connection(boolean autocommit) { 1035 try { 1036 connection.setAutoCommit(autocommit); 1037 } catch (SQLException e) { 1038 throw new NuxeoException("Cannot set auto commit mode onto " + this + "'s connection", e); 1039 } 1040 return connection; 1041 } 1042 1043 /** 1044 * Calls the callable, inside a transaction if in cluster mode. 1045 * <p> 1046 * Called under {@link #serializationLock}. 1047 */ 1048 protected Lock callInTransaction(LockCallable callable, boolean tx) { 1049 boolean ok = false; 1050 try { 1051 if (log.isDebugEnabled()) { 1052 log.debug("callInTransaction setAutoCommit " + !tx); 1053 } 1054 connection.setAutoCommit(!tx); 1055 } catch (SQLException e) { 1056 throw new NuxeoException("Cannot set auto commit mode onto " + this + "'s connection", e); 1057 } 1058 try { 1059 Lock result = callable.call(); 1060 ok = true; 1061 return result; 1062 } finally { 1063 if (tx) { 1064 try { 1065 try { 1066 if (ok) { 1067 if (log.isDebugEnabled()) { 1068 log.debug("callInTransaction commit"); 1069 } 1070 connection.commit(); 1071 } else { 1072 if (log.isDebugEnabled()) { 1073 log.debug("callInTransaction rollback"); 1074 } 1075 connection.rollback(); 1076 } 1077 } finally { 1078 // restore autoCommit=true 1079 if (log.isDebugEnabled()) { 1080 log.debug("callInTransaction restoring autoCommit=true"); 1081 } 1082 connection.setAutoCommit(true); 1083 } 1084 } catch (SQLException e) { 1085 throw new NuxeoException(e); 1086 } 1087 } 1088 } 1089 } 1090 1091 public interface LockCallable extends Callable<Lock> { 1092 @Override 1093 public Lock call(); 1094 } 1095 1096 @Override 1097 public Lock getLock(Serializable id) { 1098 if (log.isDebugEnabled()) { 1099 try { 1100 log.debug("getLock " + id + " while autoCommit=" + connection.getAutoCommit()); 1101 } catch (SQLException e) { 1102 throw new RuntimeException(e); 1103 } 1104 } 1105 RowId rowId = new RowId(Model.LOCK_TABLE_NAME, id); 1106 Row row = readSimpleRow(rowId); 1107 return row == null ? null : new Lock((String) row.get(Model.LOCK_OWNER_KEY), 1108 (Calendar) row.get(Model.LOCK_CREATED_KEY)); 1109 } 1110 1111 @Override 1112 public Lock setLock(final Serializable id, final Lock lock) { 1113 if (log.isDebugEnabled()) { 1114 log.debug("setLock " + id + " owner=" + lock.getOwner()); 1115 } 1116 SetLock call = new SetLock(id, lock); 1117 return callInTransaction(call, clusteringEnabled); 1118 } 1119 1120 protected class SetLock implements LockCallable { 1121 protected final Serializable id; 1122 1123 protected final Lock lock; 1124 1125 protected SetLock(Serializable id, Lock lock) { 1126 super(); 1127 this.id = id; 1128 this.lock = lock; 1129 } 1130 1131 @Override 1132 public Lock call() { 1133 Lock oldLock = getLock(id); 1134 if (oldLock == null) { 1135 Row row = new Row(Model.LOCK_TABLE_NAME, id); 1136 row.put(Model.LOCK_OWNER_KEY, lock.getOwner()); 1137 row.put(Model.LOCK_CREATED_KEY, lock.getCreated()); 1138 insertSimpleRows(Model.LOCK_TABLE_NAME, Collections.singletonList(row)); 1139 } 1140 return oldLock; 1141 } 1142 } 1143 1144 @Override 1145 public Lock removeLock(final Serializable id, final String owner, final boolean force) { 1146 if (log.isDebugEnabled()) { 1147 log.debug("removeLock " + id + " owner=" + owner + " force=" + force); 1148 } 1149 RemoveLock call = new RemoveLock(id, owner, force); 1150 return callInTransaction(call, !force); 1151 } 1152 1153 protected class RemoveLock implements LockCallable { 1154 protected final Serializable id; 1155 1156 protected final String owner; 1157 1158 protected final boolean force; 1159 1160 protected RemoveLock(Serializable id, String owner, boolean force) { 1161 super(); 1162 this.id = id; 1163 this.owner = owner; 1164 this.force = force; 1165 } 1166 1167 @Override 1168 public Lock call() { 1169 Lock oldLock = force ? null : getLock(id); 1170 if (!force && owner != null) { 1171 if (oldLock == null) { 1172 // not locked, nothing to do 1173 return null; 1174 } 1175 if (!LockManager.canLockBeRemoved(oldLock.getOwner(), owner)) { 1176 // existing mismatched lock, flag failure 1177 return new Lock(oldLock, true); 1178 } 1179 } 1180 if (force || oldLock != null) { 1181 deleteRows(Model.LOCK_TABLE_NAME, Collections.singleton(id)); 1182 } 1183 return oldLock; 1184 } 1185 } 1186 1187 @Override 1188 public void markReferencedBinaries() { 1189 log.debug("Starting binaries GC mark"); 1190 Statement st = null; 1191 ResultSet rs = null; 1192 BlobManager blobManager = Framework.getService(BlobManager.class); 1193 String repositoryName = getRepositoryName(); 1194 try { 1195 st = connection.createStatement(); 1196 int i = -1; 1197 for (String sql : sqlInfo.getBinariesSql) { 1198 i++; 1199 Column col = sqlInfo.getBinariesColumns.get(i); 1200 if (logger.isLogEnabled()) { 1201 logger.log(sql); 1202 } 1203 rs = st.executeQuery(sql); 1204 countExecute(); 1205 int n = 0; 1206 while (rs.next()) { 1207 n++; 1208 String key = (String) col.getFromResultSet(rs, 1); 1209 if (key != null) { 1210 blobManager.markReferencedBinary(key, repositoryName); 1211 } 1212 } 1213 if (logger.isLogEnabled()) { 1214 logger.logCount(n); 1215 } 1216 rs.close(); 1217 } 1218 } catch (SQLException e) { 1219 throw new RuntimeException("Failed to mark binaries for gC", e); 1220 } finally { 1221 try { 1222 closeStatement(st, rs); 1223 } catch (SQLException e) { 1224 log.error(e.getMessage(), e); 1225 } 1226 } 1227 log.debug("End of binaries GC mark"); 1228 } 1229 1230 /* 1231 * ----- XAResource ----- 1232 */ 1233 1234 protected static String systemToString(Object o) { 1235 return o.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(o)); 1236 } 1237 1238 @Override 1239 public void start(Xid xid, int flags) throws XAException { 1240 try { 1241 xaresource.start(xid, flags); 1242 if (logger.isLogEnabled()) { 1243 logger.log("XA start on " + systemToString(xid)); 1244 } 1245 } catch (NuxeoException e) { 1246 throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e); 1247 } catch (XAException e) { 1248 logger.error("XA start error on " + systemToString(xid), e); 1249 throw e; 1250 } 1251 } 1252 1253 @Override 1254 public void end(Xid xid, int flags) throws XAException { 1255 try { 1256 xaresource.end(xid, flags); 1257 if (logger.isLogEnabled()) { 1258 logger.log("XA end on " + systemToString(xid)); 1259 } 1260 } catch (NullPointerException e) { 1261 // H2 when no active transaction 1262 logger.error("XA end error on " + systemToString(xid), e); 1263 throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e); 1264 } catch (XAException e) { 1265 if (flags != XAResource.TMFAIL) { 1266 logger.error("XA end error on " + systemToString(xid), e); 1267 } 1268 throw e; 1269 } 1270 } 1271 1272 @Override 1273 public int prepare(Xid xid) throws XAException { 1274 try { 1275 return xaresource.prepare(xid); 1276 } catch (XAException e) { 1277 logger.error("XA prepare error on " + systemToString(xid), e); 1278 throw e; 1279 } 1280 } 1281 1282 @Override 1283 public void commit(Xid xid, boolean onePhase) throws XAException { 1284 try { 1285 xaresource.commit(xid, onePhase); 1286 } catch (XAException e) { 1287 logger.error("XA commit error on " + systemToString(xid), e); 1288 throw e; 1289 } 1290 } 1291 1292 // rollback interacts with caches so is in RowMapper 1293 1294 @Override 1295 public void forget(Xid xid) throws XAException { 1296 xaresource.forget(xid); 1297 } 1298 1299 @Override 1300 public Xid[] recover(int flag) throws XAException { 1301 return xaresource.recover(flag); 1302 } 1303 1304 @Override 1305 public boolean setTransactionTimeout(int seconds) throws XAException { 1306 return xaresource.setTransactionTimeout(seconds); 1307 } 1308 1309 @Override 1310 public int getTransactionTimeout() throws XAException { 1311 return xaresource.getTransactionTimeout(); 1312 } 1313 1314 @Override 1315 public boolean isSameRM(XAResource xares) throws XAException { 1316 throw new UnsupportedOperationException(); 1317 } 1318 1319 @Override 1320 public boolean isConnected() { 1321 return connection != null; 1322 } 1323 1324 @Override 1325 public void connect() { 1326 openConnections(); 1327 } 1328 1329 @Override 1330 public void disconnect() { 1331 closeConnections(); 1332 } 1333 1334}