001/* 002 * (C) Copyright 2006-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 * Benoit Delbosc 019 */ 020package org.nuxeo.ecm.core.storage.sql.jdbc; 021 022import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult; 023 024import java.io.File; 025import java.io.FileOutputStream; 026import java.io.IOException; 027import java.io.OutputStream; 028import java.io.PrintStream; 029import java.io.Serializable; 030import java.security.MessageDigest; 031import java.security.NoSuchAlgorithmException; 032import java.sql.Array; 033import java.sql.DatabaseMetaData; 034import java.sql.PreparedStatement; 035import java.sql.ResultSet; 036import java.sql.SQLDataException; 037import java.sql.SQLException; 038import java.sql.Statement; 039import java.sql.Types; 040import java.util.ArrayList; 041import java.util.Arrays; 042import java.util.Calendar; 043import java.util.Collection; 044import java.util.Collections; 045import java.util.HashMap; 046import java.util.HashSet; 047import java.util.LinkedList; 048import java.util.List; 049import java.util.Map; 050import java.util.Map.Entry; 051import java.util.Set; 052import java.util.UUID; 053import java.util.concurrent.ConcurrentHashMap; 054 055import javax.transaction.xa.XAException; 056import javax.transaction.xa.XAResource; 057import javax.transaction.xa.Xid; 058 059import org.apache.commons.logging.Log; 060import org.apache.commons.logging.LogFactory; 061import org.nuxeo.common.Environment; 062import org.nuxeo.ecm.core.api.IterableQueryResult; 063import org.nuxeo.ecm.core.api.Lock; 064import org.nuxeo.ecm.core.api.NuxeoException; 065import org.nuxeo.ecm.core.api.PartialList; 066import org.nuxeo.ecm.core.api.ScrollResult; 067import org.nuxeo.ecm.core.api.ScrollResultImpl; 068import org.nuxeo.ecm.core.blob.BlobManager; 069import org.nuxeo.ecm.core.model.LockManager; 070import org.nuxeo.ecm.core.query.QueryFilter; 071import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator; 072import org.nuxeo.ecm.core.storage.sql.ColumnType; 073import org.nuxeo.ecm.core.storage.sql.ColumnType.WrappedId; 074import org.nuxeo.ecm.core.storage.sql.Invalidations; 075import org.nuxeo.ecm.core.storage.sql.Mapper; 076import org.nuxeo.ecm.core.storage.sql.Model; 077import org.nuxeo.ecm.core.storage.sql.RepositoryDescriptor; 078import org.nuxeo.ecm.core.storage.sql.RepositoryImpl; 079import org.nuxeo.ecm.core.storage.sql.Row; 080import org.nuxeo.ecm.core.storage.sql.RowId; 081import org.nuxeo.ecm.core.storage.sql.Session.PathResolver; 082import org.nuxeo.ecm.core.storage.sql.jdbc.SQLInfo.SQLInfoSelect; 083import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column; 084import org.nuxeo.ecm.core.storage.sql.jdbc.db.Database; 085import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table; 086import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect; 087import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectOracle; 088import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.SQLStatement.ListCollector; 089import org.nuxeo.runtime.api.Framework; 090 091/** 092 * A {@link JDBCMapper} maps objects to and from a JDBC database. It is specific to a given database connection, as it 093 * computes statements. 094 * <p> 095 * The {@link JDBCMapper} does the mapping according to the policy defined by a {@link Model}, and generates SQL 096 * statements recorded in the {@link SQLInfo}. 097 */ 098public class JDBCMapper extends JDBCRowMapper implements Mapper { 099 100 private static final Log log = LogFactory.getLog(JDBCMapper.class); 101 102 public static Map<String, Serializable> testProps = new HashMap<>(); 103 104 protected static Map<String, CursorResult> cursorResults = new ConcurrentHashMap<>(); 105 106 public static final String TEST_UPGRADE = "testUpgrade"; 107 108 // property in sql.txt file 109 public static final String TEST_UPGRADE_VERSIONS = "testUpgradeVersions"; 110 111 public static final String TEST_UPGRADE_LAST_CONTRIBUTOR = "testUpgradeLastContributor"; 112 113 public static final String TEST_UPGRADE_LOCKS = "testUpgradeLocks"; 114 115 protected TableUpgrader tableUpgrader; 116 117 private final QueryMakerService queryMakerService; 118 119 private final PathResolver pathResolver; 120 121 private final RepositoryImpl repository; 122 123 protected boolean clusteringEnabled; 124 125 protected static final String NOSCROLL_ID = "noscroll"; 126 127 /** 128 * Creates a new Mapper. 129 * 130 * @param model the model 131 * @param pathResolver the path resolver (used for startswith queries) 132 * @param sqlInfo the sql info 133 * @param clusterInvalidator the cluster invalidator 134 * @param noSharing whether to use no-sharing mode for the connection 135 * @param repository the repository 136 */ 137 public JDBCMapper(Model model, PathResolver pathResolver, SQLInfo sqlInfo, ClusterInvalidator clusterInvalidator, 138 RepositoryImpl repository) { 139 super(model, sqlInfo, clusterInvalidator, repository.getInvalidationsPropagator()); 140 this.pathResolver = pathResolver; 141 this.repository = repository; 142 clusteringEnabled = clusterInvalidator != null; 143 queryMakerService = Framework.getService(QueryMakerService.class); 144 145 tableUpgrader = new TableUpgrader(this); 146 tableUpgrader.add(Model.VERSION_TABLE_NAME, Model.VERSION_IS_LATEST_KEY, "upgradeVersions", 147 TEST_UPGRADE_VERSIONS); 148 tableUpgrader.add("dublincore", "lastContributor", "upgradeLastContributor", TEST_UPGRADE_LAST_CONTRIBUTOR); 149 tableUpgrader.add(Model.LOCK_TABLE_NAME, Model.LOCK_OWNER_KEY, "upgradeLocks", TEST_UPGRADE_LOCKS); 150 151 } 152 153 @Override 154 public int getTableSize(String tableName) { 155 return sqlInfo.getDatabase().getTable(tableName).getColumns().size(); 156 } 157 158 /* 159 * ----- Root ----- 160 */ 161 162 @Override 163 public void createDatabase(String ddlMode) { 164 // some databases (SQL Server) can't create tables/indexes/etc in a transaction, so suspend it 165 try { 166 if (connection.getAutoCommit() == false) { 167 throw new NuxeoException("connection should not run in transactional mode for DDL operations"); 168 } 169 createTables(ddlMode); 170 } catch (SQLException e) { 171 throw new NuxeoException(e); 172 } 173 } 174 175 protected String getTableName(String origName) { 176 177 if (dialect instanceof DialectOracle) { 178 if (origName.length() > 30) { 179 180 StringBuilder sb = new StringBuilder(origName.length()); 181 182 try { 183 MessageDigest digest = MessageDigest.getInstance("MD5"); 184 sb.append(origName.substring(0, 15)); 185 sb.append('_'); 186 187 digest.update(origName.getBytes()); 188 sb.append(Dialect.toHexString(digest.digest()).substring(0, 12)); 189 190 return sb.toString(); 191 192 } catch (NoSuchAlgorithmException e) { 193 throw new RuntimeException("Error", e); 194 } 195 } 196 } 197 198 return origName; 199 } 200 201 protected void createTables(String ddlMode) throws SQLException { 202 ListCollector ddlCollector = new ListCollector(); 203 204 sqlInfo.executeSQLStatements(null, ddlMode, connection, logger, ddlCollector); // for missing category 205 sqlInfo.executeSQLStatements("first", ddlMode, connection, logger, ddlCollector); 206 sqlInfo.executeSQLStatements("beforeTableCreation", ddlMode, connection, logger, ddlCollector); 207 if (testProps.containsKey(TEST_UPGRADE)) { 208 // create "old" tables 209 sqlInfo.executeSQLStatements("testUpgrade", ddlMode, connection, logger, null); // do not collect 210 } 211 212 String schemaName = dialect.getConnectionSchema(connection); 213 DatabaseMetaData metadata = connection.getMetaData(); 214 Set<String> tableNames = findTableNames(metadata, schemaName); 215 Database database = sqlInfo.getDatabase(); 216 Map<String, List<Column>> added = new HashMap<>(); 217 218 for (Table table : database.getTables()) { 219 String tableName = getTableName(table.getPhysicalName()); 220 if (!tableNames.contains(tableName.toUpperCase())) { 221 222 /* 223 * Create missing table. 224 */ 225 226 ddlCollector.add(table.getCreateSql()); 227 ddlCollector.addAll(table.getPostCreateSqls(model)); 228 229 added.put(table.getKey(), null); // null = table created 230 231 sqlInfo.sqlStatementsProperties.put("create_table_" + tableName.toLowerCase(), Boolean.TRUE); 232 233 } else { 234 235 /* 236 * Get existing columns. 237 */ 238 239 Map<String, Integer> columnTypes = new HashMap<>(); 240 Map<String, String> columnTypeNames = new HashMap<>(); 241 Map<String, Integer> columnTypeSizes = new HashMap<>(); 242 try (ResultSet rs = metadata.getColumns(null, schemaName, tableName, "%")) { 243 while (rs.next()) { 244 String schema = rs.getString("TABLE_SCHEM"); 245 if (schema != null) { // null for MySQL, doh! 246 if ("INFORMATION_SCHEMA".equals(schema.toUpperCase())) { 247 // H2 returns some system tables (locks) 248 continue; 249 } 250 } 251 String columnName = rs.getString("COLUMN_NAME").toUpperCase(); 252 columnTypes.put(columnName, Integer.valueOf(rs.getInt("DATA_TYPE"))); 253 columnTypeNames.put(columnName, rs.getString("TYPE_NAME")); 254 columnTypeSizes.put(columnName, Integer.valueOf(rs.getInt("COLUMN_SIZE"))); 255 } 256 } 257 258 /* 259 * Update types and create missing columns. 260 */ 261 262 List<Column> addedColumns = new LinkedList<>(); 263 for (Column column : table.getColumns()) { 264 String upperName = column.getPhysicalName().toUpperCase(); 265 Integer type = columnTypes.remove(upperName); 266 if (type == null) { 267 log.warn("Adding missing column in database: " + column.getFullQuotedName()); 268 ddlCollector.add(table.getAddColumnSql(column)); 269 ddlCollector.addAll(table.getPostAddSqls(column, model)); 270 addedColumns.add(column); 271 } else { 272 int expected = column.getJdbcType(); 273 int actual = type.intValue(); 274 String actualName = columnTypeNames.get(upperName); 275 Integer actualSize = columnTypeSizes.get(upperName); 276 if (!column.setJdbcType(actual, actualName, actualSize.intValue())) { 277 log.error(String.format("SQL type mismatch for %s: expected %s, database has %s / %s (%s)", 278 column.getFullQuotedName(), Integer.valueOf(expected), type, actualName, 279 actualSize)); 280 } 281 } 282 } 283 for (String col : dialect.getIgnoredColumns(table)) { 284 columnTypes.remove(col.toUpperCase()); 285 } 286 if (!columnTypes.isEmpty()) { 287 log.warn("Database contains additional unused columns for table " + table.getQuotedName() + ": " 288 + String.join(", ", columnTypes.keySet())); 289 } 290 if (!addedColumns.isEmpty()) { 291 if (added.containsKey(table.getKey())) { 292 throw new AssertionError(); 293 } 294 added.put(table.getKey(), addedColumns); 295 } 296 } 297 } 298 299 if (testProps.containsKey(TEST_UPGRADE)) { 300 // create "old" content in tables 301 sqlInfo.executeSQLStatements("testUpgradeOldTables", ddlMode, connection, logger, ddlCollector); 302 } 303 304 // run upgrade for each table if added columns or test 305 for (Entry<String, List<Column>> en : added.entrySet()) { 306 List<Column> addedColumns = en.getValue(); 307 String tableKey = en.getKey(); 308 upgradeTable(tableKey, addedColumns, ddlMode, ddlCollector); 309 } 310 311 sqlInfo.executeSQLStatements("afterTableCreation", ddlMode, connection, logger, ddlCollector); 312 sqlInfo.executeSQLStatements("last", ddlMode, connection, logger, ddlCollector); 313 314 // aclr_permission check for PostgreSQL 315 dialect.performAdditionalStatements(connection); 316 317 /* 318 * Execute all the collected DDL, or dump it if requested, depending on ddlMode 319 */ 320 321 // ddlMode may be: 322 // ignore (not treated here, nothing done) 323 // dump (implies execute) 324 // dump,execute 325 // dump,ignore (no execute) 326 // execute 327 // abort (implies dump) 328 // compat can be used instead of execute to always recreate stored procedures 329 330 List<String> ddl = ddlCollector.getStrings(); 331 boolean ignore = ddlMode.contains(RepositoryDescriptor.DDL_MODE_IGNORE); 332 boolean dump = ddlMode.contains(RepositoryDescriptor.DDL_MODE_DUMP); 333 boolean abort = ddlMode.contains(RepositoryDescriptor.DDL_MODE_ABORT); 334 if (dump || abort) { 335 336 /* 337 * Dump DDL if not empty. 338 */ 339 340 if (!ddl.isEmpty()) { 341 File dumpFile = new File(Environment.getDefault().getLog(), "ddl-vcs-" + repository.getName() + ".sql"); 342 try (OutputStream out = new FileOutputStream(dumpFile); PrintStream ps = new PrintStream(out)) { 343 for (String sql : dialect.getDumpStart()) { 344 ps.println(sql); 345 } 346 for (String sql : ddl) { 347 sql = sql.trim(); 348 if (sql.endsWith(";")) { 349 sql = sql.substring(0, sql.length() - 1); 350 } 351 ps.println(dialect.getSQLForDump(sql)); 352 } 353 for (String sql : dialect.getDumpStop()) { 354 ps.println(sql); 355 } 356 } catch (IOException e) { 357 throw new NuxeoException(e); 358 } 359 360 /* 361 * Abort if requested. 362 */ 363 364 if (abort) { 365 log.error("Dumped DDL to: " + dumpFile); 366 throw new NuxeoException("Database initialization failed for: " + repository.getName() 367 + ", DDL must be executed: " + dumpFile); 368 } 369 } 370 } 371 if (!ignore) { 372 373 /* 374 * Execute DDL. 375 */ 376 377 try (Statement st = connection.createStatement()) { 378 for (String sql : ddl) { 379 logger.log(sql.replace("\n", "\n ")); // indented 380 try { 381 st.execute(sql); 382 } catch (SQLException e) { 383 throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e); 384 } 385 countExecute(); 386 } 387 } 388 389 /* 390 * Execute post-DDL stuff. 391 */ 392 393 try (Statement st = connection.createStatement()) { 394 for (String sql : dialect.getStartupSqls(model, sqlInfo.database)) { 395 logger.log(sql.replace("\n", "\n ")); // indented 396 try { 397 st.execute(sql); 398 } catch (SQLException e) { 399 throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e); 400 } 401 countExecute(); 402 } 403 } 404 } 405 } 406 407 protected void upgradeTable(String tableKey, List<Column> addedColumns, String ddlMode, ListCollector ddlCollector) 408 throws SQLException { 409 tableUpgrader.upgrade(tableKey, addedColumns, ddlMode, ddlCollector); 410 } 411 412 /** Finds uppercase table names. */ 413 protected static Set<String> findTableNames(DatabaseMetaData metadata, String schemaName) throws SQLException { 414 Set<String> tableNames = new HashSet<>(); 415 ResultSet rs = metadata.getTables(null, schemaName, "%", new String[] { "TABLE" }); 416 while (rs.next()) { 417 String tableName = rs.getString("TABLE_NAME"); 418 tableNames.add(tableName.toUpperCase()); 419 } 420 rs.close(); 421 return tableNames; 422 } 423 424 @Override 425 public int getClusterNodeIdType() { 426 return sqlInfo.getClusterNodeIdType(); 427 } 428 429 @Override 430 public void createClusterNode(Serializable nodeId) { 431 Calendar now = Calendar.getInstance(); 432 String sql = sqlInfo.getCreateClusterNodeSql(); 433 List<Column> columns = sqlInfo.getCreateClusterNodeColumns(); 434 try (PreparedStatement ps = connection.prepareStatement(sql)) { 435 if (logger.isLogEnabled()) { 436 logger.logSQL(sql, Arrays.asList(nodeId, now)); 437 } 438 columns.get(0).setToPreparedStatement(ps, 1, nodeId); 439 columns.get(1).setToPreparedStatement(ps, 2, now); 440 ps.execute(); 441 442 } catch (SQLException e) { 443 throw new NuxeoException(e); 444 } 445 } 446 447 @Override 448 public void removeClusterNode(Serializable nodeId) { 449 // delete from cluster_nodes 450 String sql = sqlInfo.getDeleteClusterNodeSql(); 451 Column column = sqlInfo.getDeleteClusterNodeColumn(); 452 try (PreparedStatement ps = connection.prepareStatement(sql)) { 453 if (logger.isLogEnabled()) { 454 logger.logSQL(sql, Collections.singletonList(nodeId)); 455 } 456 column.setToPreparedStatement(ps, 1, nodeId); 457 ps.execute(); 458 // delete un-processed invals from cluster_invals 459 deleteClusterInvals(nodeId); 460 } catch (SQLException e) { 461 throw new NuxeoException(e); 462 } 463 } 464 465 protected void deleteClusterInvals(Serializable nodeId) throws SQLException { 466 String sql = sqlInfo.getDeleteClusterInvalsSql(); 467 Column column = sqlInfo.getDeleteClusterInvalsColumn(); 468 try (PreparedStatement ps = connection.prepareStatement(sql)) { 469 if (logger.isLogEnabled()) { 470 logger.logSQL(sql, Collections.singletonList(nodeId)); 471 } 472 column.setToPreparedStatement(ps, 1, nodeId); 473 int n = ps.executeUpdate(); 474 countExecute(); 475 if (logger.isLogEnabled()) { 476 logger.logCount(n); 477 } 478 } 479 } 480 481 @Override 482 public void insertClusterInvalidations(Serializable nodeId, Invalidations invalidations) { 483 String sql = dialect.getClusterInsertInvalidations(); 484 List<Column> columns = sqlInfo.getClusterInvalidationsColumns(); 485 try (PreparedStatement ps = connection.prepareStatement(sql)) { 486 int kind = Invalidations.MODIFIED; 487 while (true) { 488 Set<RowId> rowIds = invalidations.getKindSet(kind); 489 490 // reorganize by id 491 Map<Serializable, Set<String>> res = new HashMap<>(); 492 for (RowId rowId : rowIds) { 493 Set<String> tableNames = res.get(rowId.id); 494 if (tableNames == null) { 495 res.put(rowId.id, tableNames = new HashSet<>()); 496 } 497 tableNames.add(rowId.tableName); 498 } 499 500 // do inserts 501 for (Entry<Serializable, Set<String>> en : res.entrySet()) { 502 Serializable id = en.getKey(); 503 String fragments = join(en.getValue(), ' '); 504 if (logger.isLogEnabled()) { 505 logger.logSQL(sql, Arrays.<Serializable> asList(nodeId, id, fragments, Long.valueOf(kind))); 506 } 507 Serializable frags; 508 if (dialect.supportsArrays() && columns.get(2).getJdbcType() == Types.ARRAY) { 509 frags = fragments.split(" "); 510 } else { 511 frags = fragments; 512 } 513 columns.get(0).setToPreparedStatement(ps, 1, nodeId); 514 columns.get(1).setToPreparedStatement(ps, 2, id); 515 columns.get(2).setToPreparedStatement(ps, 3, frags); 516 columns.get(3).setToPreparedStatement(ps, 4, Long.valueOf(kind)); 517 ps.execute(); 518 countExecute(); 519 } 520 if (kind == Invalidations.MODIFIED) { 521 kind = Invalidations.DELETED; 522 } else { 523 break; 524 } 525 } 526 } catch (SQLException e) { 527 throw new NuxeoException("Could not invalidate", e); 528 } 529 } 530 531 // join that works on a set 532 protected static String join(Collection<String> strings, char sep) { 533 if (strings.isEmpty()) { 534 throw new RuntimeException(); 535 } 536 if (strings.size() == 1) { 537 return strings.iterator().next(); 538 } 539 int size = 0; 540 for (String word : strings) { 541 size += word.length() + 1; 542 } 543 StringBuilder buf = new StringBuilder(size); 544 for (String word : strings) { 545 buf.append(word); 546 buf.append(sep); 547 } 548 buf.setLength(size - 1); 549 return buf.toString(); 550 } 551 552 @Override 553 public Invalidations getClusterInvalidations(Serializable nodeId) { 554 Invalidations invalidations = new Invalidations(); 555 String sql = dialect.getClusterGetInvalidations(); 556 List<Column> columns = sqlInfo.getClusterInvalidationsColumns(); 557 if (logger.isLogEnabled()) { 558 logger.logSQL(sql, Collections.singletonList(nodeId)); 559 } 560 try (PreparedStatement ps = connection.prepareStatement(sql)) { 561 setToPreparedStatement(ps, 1, nodeId); 562 try (ResultSet rs = ps.executeQuery()) { 563 countExecute(); 564 while (rs.next()) { 565 // first column ignored, it's the node id 566 Serializable id = columns.get(1).getFromResultSet(rs, 1); 567 Serializable frags = columns.get(2).getFromResultSet(rs, 2); 568 int kind = ((Long) columns.get(3).getFromResultSet(rs, 3)).intValue(); 569 String[] fragments; 570 if (dialect.supportsArrays() && frags instanceof String[]) { 571 fragments = (String[]) frags; 572 } else { 573 fragments = ((String) frags).split(" "); 574 } 575 invalidations.add(id, fragments, kind); 576 } 577 } 578 if (logger.isLogEnabled()) { 579 // logCount(n); 580 logger.log(" -> " + invalidations); 581 } 582 if (dialect.isClusteringDeleteNeeded()) { 583 deleteClusterInvals(nodeId); 584 } 585 return invalidations; 586 } catch (SQLException e) { 587 throw new NuxeoException("Could not invalidate", e); 588 } 589 } 590 591 @Override 592 public Serializable getRootId(String repositoryId) { 593 String sql = sqlInfo.getSelectRootIdSql(); 594 if (logger.isLogEnabled()) { 595 logger.logSQL(sql, Collections.<Serializable> singletonList(repositoryId)); 596 } 597 try (PreparedStatement ps = connection.prepareStatement(sql)) { 598 ps.setString(1, repositoryId); 599 try (ResultSet rs = ps.executeQuery()) { 600 countExecute(); 601 if (!rs.next()) { 602 if (logger.isLogEnabled()) { 603 logger.log(" -> (none)"); 604 } 605 return null; 606 } 607 Column column = sqlInfo.getSelectRootIdWhatColumn(); 608 Serializable id = column.getFromResultSet(rs, 1); 609 if (logger.isLogEnabled()) { 610 logger.log(" -> " + Model.MAIN_KEY + '=' + id); 611 } 612 // check that we didn't get several rows 613 if (rs.next()) { 614 throw new NuxeoException("Row query for " + repositoryId + " returned several rows: " + sql); 615 } 616 return id; 617 } 618 } catch (SQLException e) { 619 throw new NuxeoException("Could not select: " + sql, e); 620 } 621 } 622 623 @Override 624 public void setRootId(Serializable repositoryId, Serializable id) { 625 String sql = sqlInfo.getInsertRootIdSql(); 626 try (PreparedStatement ps = connection.prepareStatement(sql)) { 627 List<Column> columns = sqlInfo.getInsertRootIdColumns(); 628 List<Serializable> debugValues = null; 629 if (logger.isLogEnabled()) { 630 debugValues = new ArrayList<>(2); 631 } 632 int i = 0; 633 for (Column column : columns) { 634 i++; 635 String key = column.getKey(); 636 Serializable v; 637 if (key.equals(Model.MAIN_KEY)) { 638 v = id; 639 } else if (key.equals(Model.REPOINFO_REPONAME_KEY)) { 640 v = repositoryId; 641 } else { 642 throw new RuntimeException(key); 643 } 644 column.setToPreparedStatement(ps, i, v); 645 if (debugValues != null) { 646 debugValues.add(v); 647 } 648 } 649 if (debugValues != null) { 650 logger.logSQL(sql, debugValues); 651 debugValues.clear(); 652 } 653 ps.execute(); 654 countExecute(); 655 } catch (SQLException e) { 656 throw new NuxeoException("Could not insert: " + sql, e); 657 } 658 } 659 660 protected QueryMaker findQueryMaker(String queryType) { 661 for (Class<? extends QueryMaker> klass : queryMakerService.getQueryMakers()) { 662 QueryMaker queryMaker; 663 try { 664 queryMaker = klass.newInstance(); 665 } catch (ReflectiveOperationException e) { 666 throw new NuxeoException(e); 667 } 668 if (queryMaker.accepts(queryType)) { 669 return queryMaker; 670 } 671 } 672 return null; 673 } 674 675 protected void prepareUserReadAcls(QueryFilter queryFilter) { 676 String sql = dialect.getPrepareUserReadAclsSql(); 677 Serializable principals = queryFilter.getPrincipals(); 678 if (sql == null || principals == null) { 679 return; 680 } 681 if (!dialect.supportsArrays()) { 682 principals = String.join(Dialect.ARRAY_SEP, (String[]) principals); 683 } 684 try (PreparedStatement ps = connection.prepareStatement(sql)) { 685 if (logger.isLogEnabled()) { 686 logger.logSQL(sql, Collections.singleton(principals)); 687 } 688 setToPreparedStatement(ps, 1, principals); 689 ps.execute(); 690 countExecute(); 691 } catch (SQLException e) { 692 throw new NuxeoException("Failed to prepare user read acl cache", e); 693 } 694 } 695 696 @Override 697 public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter, 698 boolean countTotal) { 699 return query(query, queryType, queryFilter, countTotal ? -1 : 0); 700 } 701 702 @Override 703 public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter, long countUpTo) { 704 if (dialect.needsPrepareUserReadAcls()) { 705 prepareUserReadAcls(queryFilter); 706 } 707 QueryMaker queryMaker = findQueryMaker(queryType); 708 if (queryMaker == null) { 709 throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query); 710 } 711 QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter); 712 713 if (q == null) { 714 logger.log("Query cannot return anything due to conflicting clauses"); 715 return new PartialList<>(Collections.<Serializable> emptyList(), 0); 716 } 717 long limit = queryFilter.getLimit(); 718 long offset = queryFilter.getOffset(); 719 720 if (logger.isLogEnabled()) { 721 String sql = q.selectInfo.sql; 722 if (limit != 0) { 723 sql += " -- LIMIT " + limit + " OFFSET " + offset; 724 } 725 if (countUpTo != 0) { 726 sql += " -- COUNT TOTAL UP TO " + countUpTo; 727 } 728 logger.logSQL(sql, q.selectParams); 729 } 730 731 String sql = q.selectInfo.sql; 732 733 if (countUpTo == 0 && limit > 0 && dialect.supportsPaging()) { 734 // full result set not needed for counting 735 sql = dialect.addPagingClause(sql, limit, offset); 736 limit = 0; 737 offset = 0; 738 } else if (countUpTo > 0 && dialect.supportsPaging()) { 739 // ask one more row 740 sql = dialect.addPagingClause(sql, Math.max(countUpTo + 1, limit + offset), 0); 741 } 742 743 try (PreparedStatement ps = connection.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, 744 ResultSet.CONCUR_READ_ONLY)) { 745 int i = 1; 746 for (Serializable object : q.selectParams) { 747 setToPreparedStatement(ps, i++, object); 748 } 749 try (ResultSet rs = ps.executeQuery()) { 750 countExecute(); 751 752 // limit/offset 753 long totalSize = -1; 754 boolean available; 755 if ((limit == 0) || (offset == 0)) { 756 available = rs.first(); 757 if (!available) { 758 totalSize = 0; 759 } 760 if (limit == 0) { 761 limit = -1; // infinite 762 } 763 } else { 764 available = rs.absolute((int) offset + 1); 765 } 766 767 Column column = q.selectInfo.whatColumns.get(0); 768 List<Serializable> ids = new LinkedList<>(); 769 int rowNum = 0; 770 while (available && (limit != 0)) { 771 Serializable id; 772 try { 773 id = column.getFromResultSet(rs, 1); 774 } catch (SQLDataException e) { 775 // actually no data available, MariaDB Connector/J lied, stop now 776 available = false; 777 break; 778 } 779 ids.add(id); 780 rowNum = rs.getRow(); 781 available = rs.next(); 782 limit--; 783 } 784 785 // total size 786 if (countUpTo != 0 && (totalSize == -1)) { 787 if (!available && (rowNum != 0)) { 788 // last row read was the actual last 789 totalSize = rowNum; 790 } else { 791 // available if limit reached with some left 792 // rowNum == 0 if skipped too far 793 rs.last(); 794 totalSize = rs.getRow(); 795 } 796 if (countUpTo > 0 && totalSize > countUpTo) { 797 // the result where truncated we don't know the total size 798 totalSize = -2; 799 } 800 } 801 802 if (logger.isLogEnabled()) { 803 logger.logIds(ids, countUpTo != 0, totalSize); 804 } 805 806 return new PartialList<>(ids, totalSize); 807 } 808 } catch (SQLException e) { 809 throw new NuxeoException("Invalid query: " + query, e); 810 } 811 } 812 813 public int setToPreparedStatement(PreparedStatement ps, int i, Serializable object) throws SQLException { 814 if (object instanceof Calendar) { 815 Calendar cal = (Calendar) object; 816 ps.setTimestamp(i, dialect.getTimestampFromCalendar(cal), cal); 817 } else if (object instanceof java.sql.Date) { 818 ps.setDate(i, (java.sql.Date) object); 819 } else if (object instanceof Long) { 820 ps.setLong(i, ((Long) object).longValue()); 821 } else if (object instanceof WrappedId) { 822 dialect.setId(ps, i, object.toString()); 823 } else if (object instanceof Object[]) { 824 int jdbcType; 825 if (object instanceof String[]) { 826 jdbcType = dialect.getJDBCTypeAndString(ColumnType.STRING).jdbcType; 827 } else if (object instanceof Boolean[]) { 828 jdbcType = dialect.getJDBCTypeAndString(ColumnType.BOOLEAN).jdbcType; 829 } else if (object instanceof Long[]) { 830 jdbcType = dialect.getJDBCTypeAndString(ColumnType.LONG).jdbcType; 831 } else if (object instanceof Double[]) { 832 jdbcType = dialect.getJDBCTypeAndString(ColumnType.DOUBLE).jdbcType; 833 } else if (object instanceof java.sql.Date[]) { 834 jdbcType = Types.DATE; 835 } else if (object instanceof java.sql.Clob[]) { 836 jdbcType = Types.CLOB; 837 } else if (object instanceof Calendar[]) { 838 jdbcType = dialect.getJDBCTypeAndString(ColumnType.TIMESTAMP).jdbcType; 839 object = dialect.getTimestampFromCalendar((Calendar) object); 840 } else if (object instanceof Integer[]) { 841 jdbcType = dialect.getJDBCTypeAndString(ColumnType.INTEGER).jdbcType; 842 } else { 843 jdbcType = dialect.getJDBCTypeAndString(ColumnType.CLOB).jdbcType; 844 } 845 Array array = dialect.createArrayOf(jdbcType, (Object[]) object, connection); 846 ps.setArray(i, array); 847 } else { 848 ps.setObject(i, object); 849 } 850 return i; 851 } 852 853 // queryFilter used for principals and permissions 854 @Override 855 public IterableQueryResult queryAndFetch(String query, String queryType, QueryFilter queryFilter, 856 boolean distinctDocuments, Object... params) { 857 if (dialect.needsPrepareUserReadAcls()) { 858 prepareUserReadAcls(queryFilter); 859 } 860 QueryMaker queryMaker = findQueryMaker(queryType); 861 if (queryMaker == null) { 862 throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query); 863 } 864 if (distinctDocuments) { 865 String q = query.toLowerCase(); 866 if (q.startsWith("select ") && !q.startsWith("select distinct ")) { 867 query = "SELECT DISTINCT " + query.substring("SELECT ".length()); 868 } 869 } 870 try { 871 return new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this, params); 872 } catch (SQLException e) { 873 throw new NuxeoException("Invalid query: " + queryType + ": " + query, e); 874 } 875 } 876 877 @Override 878 public ScrollResult scroll(String query, int batchSize, int keepAliveSeconds) { 879 if (!dialect.supportsScroll()) { 880 return defaultScroll(query); 881 } 882 checkForTimedoutScroll(); 883 return scrollSearch(query, batchSize, keepAliveSeconds); 884 } 885 886 protected void checkForTimedoutScroll() { 887 cursorResults.forEach((id, cursor) -> cursor.timedOut(id)); 888 } 889 890 protected ScrollResult scrollSearch(String query, int batchSize, int keepAliveSeconds) { 891 QueryMaker queryMaker = findQueryMaker("NXQL"); 892 QueryFilter queryFilter = new QueryFilter(null, null, null, null, Collections.emptyList(), 0, 0); 893 QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter, null); 894 if (q == null) { 895 logger.log("Query cannot return anything due to conflicting clauses"); 896 throw new NuxeoException("Query cannot return anything due to conflicting clauses"); 897 } 898 if (logger.isLogEnabled()) { 899 logger.logSQL(q.selectInfo.sql, q.selectParams); 900 } 901 try { 902 if (connection.getAutoCommit()) { 903 throw new NuxeoException("Scroll should be done inside a transaction"); 904 } 905 // ps MUST NOT be auto-closed because it's referenced by a cursor 906 PreparedStatement ps = connection.prepareStatement(q.selectInfo.sql, ResultSet.TYPE_FORWARD_ONLY, 907 ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT); 908 ps.setFetchSize(batchSize); 909 int i = 1; 910 for (Serializable object : q.selectParams) { 911 setToPreparedStatement(ps, i++, object); 912 } 913 // rs MUST NOT be auto-closed because it's referenced by a cursor 914 ResultSet rs = ps.executeQuery(); 915 String scrollId = UUID.randomUUID().toString(); 916 registerCursor(scrollId, ps, rs, batchSize, keepAliveSeconds); 917 return scroll(scrollId); 918 } catch (SQLException e) { 919 throw new NuxeoException("Error on query", e); 920 } 921 } 922 923 protected class CursorResult { 924 protected final int keepAliveSeconds; 925 926 protected final PreparedStatement preparedStatement; 927 928 protected final ResultSet resultSet; 929 930 protected final int batchSize; 931 932 protected long lastCallTimestamp; 933 934 CursorResult(PreparedStatement preparedStatement, ResultSet resultSet, int batchSize, int keepAliveSeconds) { 935 this.preparedStatement = preparedStatement; 936 this.resultSet = resultSet; 937 this.batchSize = batchSize; 938 this.keepAliveSeconds = keepAliveSeconds; 939 lastCallTimestamp = System.currentTimeMillis(); 940 } 941 942 boolean timedOut(String scrollId) { 943 long now = System.currentTimeMillis(); 944 if (now - lastCallTimestamp > (keepAliveSeconds * 1000)) { 945 if (unregisterCursor(scrollId)) { 946 log.warn("Scroll " + scrollId + " timed out"); 947 } 948 return true; 949 } 950 return false; 951 } 952 953 void touch() { 954 lastCallTimestamp = System.currentTimeMillis(); 955 } 956 957 synchronized void close() throws SQLException { 958 if (resultSet != null) { 959 resultSet.close(); 960 } 961 if (preparedStatement != null) { 962 preparedStatement.close(); 963 } 964 } 965 } 966 967 protected void registerCursor(String scrollId, PreparedStatement ps, ResultSet rs, int batchSize, 968 int keepAliveSeconds) { 969 cursorResults.put(scrollId, new CursorResult(ps, rs, batchSize, keepAliveSeconds)); 970 } 971 972 protected boolean unregisterCursor(String scrollId) { 973 CursorResult cursor = cursorResults.remove(scrollId); 974 if (cursor != null) { 975 try { 976 cursor.close(); 977 return true; 978 } catch (SQLException e) { 979 log.error("Failed to close cursor for scroll: " + scrollId, e); 980 // do not propagate exception on cleaning 981 } 982 } 983 return false; 984 } 985 986 protected ScrollResult defaultScroll(String query) { 987 // the database has no proper support for cursor just return everything in one batch 988 QueryMaker queryMaker = findQueryMaker("NXQL"); 989 List<String> ids; 990 QueryFilter queryFilter = new QueryFilter(null, null, null, null, Collections.emptyList(), 0, 0); 991 try (IterableQueryResult ret = new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this, 992 null)) { 993 ids = new ArrayList<>((int) ret.size()); 994 for (Map<String, Serializable> map : ret) { 995 ids.add(map.get("ecm:uuid").toString()); 996 } 997 } catch (SQLException e) { 998 throw new NuxeoException("Invalid scroll query: " + query, e); 999 } 1000 return new ScrollResultImpl(NOSCROLL_ID, ids); 1001 } 1002 1003 @Override 1004 public ScrollResult scroll(String scrollId) { 1005 if (NOSCROLL_ID.equals(scrollId) || !dialect.supportsScroll()) { 1006 // there is only one batch in this case 1007 return emptyResult(); 1008 } 1009 CursorResult cursorResult = cursorResults.get(scrollId); 1010 if (cursorResult == null) { 1011 throw new NuxeoException("Unknown or timed out scrollId"); 1012 } else if (cursorResult.timedOut(scrollId)) { 1013 throw new NuxeoException("Timed out scrollId"); 1014 } 1015 cursorResult.touch(); 1016 List<String> ids = new ArrayList<>(cursorResult.batchSize); 1017 synchronized (cursorResult) { 1018 try { 1019 if (cursorResult.resultSet == null || cursorResult.resultSet.isClosed()) { 1020 unregisterCursor(scrollId); 1021 return emptyResult(); 1022 } 1023 while (ids.size() < cursorResult.batchSize) { 1024 if (cursorResult.resultSet.next()) { 1025 ids.add(cursorResult.resultSet.getString(1)); 1026 } else { 1027 cursorResult.close(); 1028 if (ids.isEmpty()) { 1029 unregisterCursor(scrollId); 1030 } 1031 break; 1032 } 1033 } 1034 } catch (SQLException e) { 1035 throw new NuxeoException("Error during scroll", e); 1036 } 1037 } 1038 return new ScrollResultImpl(scrollId, ids); 1039 } 1040 1041 @Override 1042 public Set<Serializable> getAncestorsIds(Collection<Serializable> ids) { 1043 SQLInfoSelect select = sqlInfo.getSelectAncestorsIds(); 1044 if (select == null) { 1045 return getAncestorsIdsIterative(ids); 1046 } 1047 Serializable whereIds = newIdArray(ids); 1048 Set<Serializable> res = new HashSet<>(); 1049 if (logger.isLogEnabled()) { 1050 logger.logSQL(select.sql, Collections.singleton(whereIds)); 1051 } 1052 Column what = select.whatColumns.get(0); 1053 try (PreparedStatement ps = connection.prepareStatement(select.sql)) { 1054 setToPreparedStatementIdArray(ps, 1, whereIds); 1055 try (ResultSet rs = ps.executeQuery()) { 1056 countExecute(); 1057 List<Serializable> debugIds = null; 1058 if (logger.isLogEnabled()) { 1059 debugIds = new LinkedList<>(); 1060 } 1061 while (rs.next()) { 1062 if (dialect.supportsArraysReturnInsteadOfRows()) { 1063 Serializable[] resultIds = dialect.getArrayResult(rs.getArray(1)); 1064 for (Serializable id : resultIds) { 1065 if (id != null) { 1066 res.add(id); 1067 if (logger.isLogEnabled()) { 1068 debugIds.add(id); 1069 } 1070 } 1071 } 1072 } else { 1073 Serializable id = what.getFromResultSet(rs, 1); 1074 if (id != null) { 1075 res.add(id); 1076 if (logger.isLogEnabled()) { 1077 debugIds.add(id); 1078 } 1079 } 1080 } 1081 } 1082 if (logger.isLogEnabled()) { 1083 logger.logIds(debugIds, false, 0); 1084 } 1085 } 1086 return res; 1087 } catch (SQLException e) { 1088 throw new NuxeoException("Failed to get ancestors ids", e); 1089 } 1090 } 1091 1092 /** 1093 * Uses iterative parentid selection. 1094 */ 1095 protected Set<Serializable> getAncestorsIdsIterative(Collection<Serializable> ids) { 1096 try { 1097 LinkedList<Serializable> todo = new LinkedList<>(ids); 1098 Set<Serializable> done = new HashSet<>(); 1099 Set<Serializable> res = new HashSet<>(); 1100 while (!todo.isEmpty()) { 1101 done.addAll(todo); 1102 SQLInfoSelect select = sqlInfo.getSelectParentIds(todo.size()); 1103 if (logger.isLogEnabled()) { 1104 logger.logSQL(select.sql, todo); 1105 } 1106 Column what = select.whatColumns.get(0); 1107 Column where = select.whereColumns.get(0); 1108 try (PreparedStatement ps = connection.prepareStatement(select.sql)) { 1109 int i = 1; 1110 for (Serializable id : todo) { 1111 where.setToPreparedStatement(ps, i++, id); 1112 } 1113 try (ResultSet rs = ps.executeQuery()) { 1114 countExecute(); 1115 todo = new LinkedList<>(); 1116 List<Serializable> debugIds = null; 1117 if (logger.isLogEnabled()) { 1118 debugIds = new LinkedList<>(); 1119 } 1120 while (rs.next()) { 1121 Serializable id = what.getFromResultSet(rs, 1); 1122 if (id != null) { 1123 res.add(id); 1124 if (!done.contains(id)) { 1125 todo.add(id); 1126 } 1127 if (logger.isLogEnabled()) { 1128 debugIds.add(id); 1129 } 1130 } 1131 } 1132 if (logger.isLogEnabled()) { 1133 logger.logIds(debugIds, false, 0); 1134 } 1135 } 1136 } 1137 } 1138 return res; 1139 } catch (SQLException e) { 1140 throw new NuxeoException("Failed to get ancestors ids", e); 1141 } 1142 } 1143 1144 @Override 1145 public void updateReadAcls() { 1146 if (!dialect.supportsReadAcl()) { 1147 return; 1148 } 1149 if (log.isDebugEnabled()) { 1150 log.debug("updateReadAcls: updating"); 1151 } 1152 try (Statement st = connection.createStatement()) { 1153 String sql = dialect.getUpdateReadAclsSql(); 1154 if (logger.isLogEnabled()) { 1155 logger.log(sql); 1156 } 1157 st.execute(sql); 1158 countExecute(); 1159 } catch (SQLException e) { 1160 checkConcurrentUpdate(e); 1161 throw new NuxeoException("Failed to update read acls", e); 1162 } 1163 if (log.isDebugEnabled()) { 1164 log.debug("updateReadAcls: done."); 1165 } 1166 } 1167 1168 @Override 1169 public void rebuildReadAcls() { 1170 if (!dialect.supportsReadAcl()) { 1171 return; 1172 } 1173 log.debug("rebuildReadAcls: rebuilding ..."); 1174 try (Statement st = connection.createStatement()) { 1175 String sql = dialect.getRebuildReadAclsSql(); 1176 logger.log(sql); 1177 st.execute(sql); 1178 countExecute(); 1179 } catch (SQLException e) { 1180 throw new NuxeoException("Failed to rebuild read acls", e); 1181 } 1182 log.debug("rebuildReadAcls: done."); 1183 } 1184 1185 /* 1186 * ----- Locking ----- 1187 */ 1188 1189 @Override 1190 public Lock getLock(Serializable id) { 1191 if (log.isDebugEnabled()) { 1192 try { 1193 log.debug("getLock " + id + " while autoCommit=" + connection.getAutoCommit()); 1194 } catch (SQLException e) { 1195 throw new RuntimeException(e); 1196 } 1197 } 1198 RowId rowId = new RowId(Model.LOCK_TABLE_NAME, id); 1199 Row row = readSimpleRow(rowId); 1200 return row == null ? null 1201 : new Lock((String) row.get(Model.LOCK_OWNER_KEY), (Calendar) row.get(Model.LOCK_CREATED_KEY)); 1202 } 1203 1204 @Override 1205 public Lock setLock(final Serializable id, final Lock lock) { 1206 if (log.isDebugEnabled()) { 1207 log.debug("setLock " + id + " owner=" + lock.getOwner()); 1208 } 1209 Lock oldLock = getLock(id); 1210 if (oldLock == null) { 1211 Row row = new Row(Model.LOCK_TABLE_NAME, id); 1212 row.put(Model.LOCK_OWNER_KEY, lock.getOwner()); 1213 row.put(Model.LOCK_CREATED_KEY, lock.getCreated()); 1214 insertSimpleRows(Model.LOCK_TABLE_NAME, Collections.singletonList(row)); 1215 } 1216 return oldLock; 1217 } 1218 1219 @Override 1220 public Lock removeLock(final Serializable id, final String owner, final boolean force) { 1221 if (log.isDebugEnabled()) { 1222 log.debug("removeLock " + id + " owner=" + owner + " force=" + force); 1223 } 1224 Lock oldLock = force ? null : getLock(id); 1225 if (!force && owner != null) { 1226 if (oldLock == null) { 1227 // not locked, nothing to do 1228 return null; 1229 } 1230 if (!LockManager.canLockBeRemoved(oldLock.getOwner(), owner)) { 1231 // existing mismatched lock, flag failure 1232 return new Lock(oldLock, true); 1233 } 1234 } 1235 if (force || oldLock != null) { 1236 deleteRows(Model.LOCK_TABLE_NAME, Collections.singleton(id)); 1237 } 1238 return oldLock; 1239 } 1240 1241 @Override 1242 public void markReferencedBinaries() { 1243 log.debug("Starting binaries GC mark"); 1244 BlobManager blobManager = Framework.getService(BlobManager.class); 1245 String repositoryName = getRepositoryName(); 1246 try (Statement st = connection.createStatement()) { 1247 int i = -1; 1248 for (String sql : sqlInfo.getBinariesSql) { 1249 i++; 1250 Column col = sqlInfo.getBinariesColumns.get(i); 1251 if (logger.isLogEnabled()) { 1252 logger.log(sql); 1253 } 1254 try (ResultSet rs = st.executeQuery(sql)) { 1255 countExecute(); 1256 int n = 0; 1257 while (rs.next()) { 1258 n++; 1259 String key = (String) col.getFromResultSet(rs, 1); 1260 if (key != null) { 1261 blobManager.markReferencedBinary(key, repositoryName); 1262 } 1263 } 1264 if (logger.isLogEnabled()) { 1265 logger.logCount(n); 1266 } 1267 } 1268 } 1269 } catch (SQLException e) { 1270 throw new RuntimeException("Failed to mark binaries for gC", e); 1271 } 1272 log.debug("End of binaries GC mark"); 1273 } 1274 1275 /* 1276 * ----- XAResource ----- 1277 */ 1278 1279 protected static String systemToString(Object o) { 1280 return o.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(o)); 1281 } 1282 1283 @Override 1284 public void start(Xid xid, int flags) throws XAException { 1285 try { 1286 xaresource.start(xid, flags); 1287 if (logger.isLogEnabled()) { 1288 logger.log("XA start on " + systemToString(xid)); 1289 } 1290 } catch (NuxeoException e) { 1291 throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e); 1292 } catch (XAException e) { 1293 logger.error("XA start error on " + systemToString(xid), e); 1294 throw e; 1295 } 1296 } 1297 1298 @Override 1299 public void end(Xid xid, int flags) throws XAException { 1300 try { 1301 xaresource.end(xid, flags); 1302 if (logger.isLogEnabled()) { 1303 logger.log("XA end on " + systemToString(xid)); 1304 } 1305 } catch (NullPointerException e) { 1306 // H2 when no active transaction 1307 logger.error("XA end error on " + systemToString(xid), e); 1308 throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e); 1309 } catch (XAException e) { 1310 if (flags != XAResource.TMFAIL) { 1311 logger.error("XA end error on " + systemToString(xid), e); 1312 } 1313 throw e; 1314 } 1315 } 1316 1317 @Override 1318 public int prepare(Xid xid) throws XAException { 1319 try { 1320 return xaresource.prepare(xid); 1321 } catch (XAException e) { 1322 logger.error("XA prepare error on " + systemToString(xid), e); 1323 throw e; 1324 } 1325 } 1326 1327 @Override 1328 public void commit(Xid xid, boolean onePhase) throws XAException { 1329 try { 1330 xaresource.commit(xid, onePhase); 1331 } catch (XAException e) { 1332 logger.error("XA commit error on " + systemToString(xid), e); 1333 throw e; 1334 } 1335 } 1336 1337 // rollback interacts with caches so is in RowMapper 1338 1339 @Override 1340 public void forget(Xid xid) throws XAException { 1341 xaresource.forget(xid); 1342 } 1343 1344 @Override 1345 public Xid[] recover(int flag) throws XAException { 1346 return xaresource.recover(flag); 1347 } 1348 1349 @Override 1350 public boolean setTransactionTimeout(int seconds) throws XAException { 1351 return xaresource.setTransactionTimeout(seconds); 1352 } 1353 1354 @Override 1355 public int getTransactionTimeout() throws XAException { 1356 return xaresource.getTransactionTimeout(); 1357 } 1358 1359 @Override 1360 public boolean isSameRM(XAResource xares) throws XAException { 1361 throw new UnsupportedOperationException(); 1362 } 1363 1364 @Override 1365 public boolean isConnected() { 1366 return connection != null; 1367 } 1368 1369 @Override 1370 public void connect(boolean noSharing) { 1371 openConnections(noSharing); 1372 } 1373 1374 @Override 1375 public void disconnect() { 1376 closeConnections(); 1377 } 1378 1379}