001/* 002 * (C) Copyright 2006-2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 * Benoit Delbosc 019 */ 020package org.nuxeo.ecm.core.storage.sql.jdbc; 021 022import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult; 023 024import java.io.File; 025import java.io.FileOutputStream; 026import java.io.IOException; 027import java.io.OutputStream; 028import java.io.PrintStream; 029import java.io.Serializable; 030import java.security.MessageDigest; 031import java.security.NoSuchAlgorithmException; 032import java.sql.Array; 033import java.sql.DatabaseMetaData; 034import java.sql.PreparedStatement; 035import java.sql.ResultSet; 036import java.sql.SQLDataException; 037import java.sql.SQLException; 038import java.sql.Statement; 039import java.sql.Types; 040import java.util.ArrayList; 041import java.util.Arrays; 042import java.util.Calendar; 043import java.util.Collection; 044import java.util.Collections; 045import java.util.HashMap; 046import java.util.HashSet; 047import java.util.LinkedList; 048import java.util.List; 049import java.util.Map; 050import java.util.Map.Entry; 051import java.util.Set; 052import java.util.UUID; 053import java.util.concurrent.ConcurrentHashMap; 054 055import javax.transaction.xa.XAException; 056import javax.transaction.xa.XAResource; 057import javax.transaction.xa.Xid; 058 059import org.apache.commons.logging.Log; 060import org.apache.commons.logging.LogFactory; 061import org.nuxeo.common.Environment; 062import org.nuxeo.ecm.core.api.IterableQueryResult; 063import org.nuxeo.ecm.core.api.Lock; 064import org.nuxeo.ecm.core.api.NuxeoException; 065import org.nuxeo.ecm.core.api.PartialList; 066import org.nuxeo.ecm.core.api.ScrollResult; 067import org.nuxeo.ecm.core.api.ScrollResultImpl; 068import org.nuxeo.ecm.core.blob.DocumentBlobManager; 069import org.nuxeo.ecm.core.model.LockManager; 070import org.nuxeo.ecm.core.query.QueryFilter; 071import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator; 072import org.nuxeo.ecm.core.storage.sql.ColumnType; 073import org.nuxeo.ecm.core.storage.sql.ColumnType.WrappedId; 074import org.nuxeo.ecm.core.storage.sql.Invalidations; 075import org.nuxeo.ecm.core.storage.sql.Mapper; 076import org.nuxeo.ecm.core.storage.sql.Model; 077import org.nuxeo.ecm.core.storage.sql.RepositoryDescriptor; 078import org.nuxeo.ecm.core.storage.sql.RepositoryImpl; 079import org.nuxeo.ecm.core.storage.sql.Row; 080import org.nuxeo.ecm.core.storage.sql.RowId; 081import org.nuxeo.ecm.core.storage.sql.Session.PathResolver; 082import org.nuxeo.ecm.core.storage.sql.jdbc.SQLInfo.SQLInfoSelect; 083import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column; 084import org.nuxeo.ecm.core.storage.sql.jdbc.db.Database; 085import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table; 086import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect; 087import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectOracle; 088import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.SQLStatement.ListCollector; 089import org.nuxeo.runtime.api.Framework; 090 091/** 092 * A {@link JDBCMapper} maps objects to and from a JDBC database. It is specific to a given database connection, as it 093 * computes statements. 094 * <p> 095 * The {@link JDBCMapper} does the mapping according to the policy defined by a {@link Model}, and generates SQL 096 * statements recorded in the {@link SQLInfo}. 097 */ 098public class JDBCMapper extends JDBCRowMapper implements Mapper { 099 100 private static final Log log = LogFactory.getLog(JDBCMapper.class); 101 102 public static Map<String, Serializable> testProps = new HashMap<>(); 103 104 protected static Map<String, CursorResult> cursorResults = new ConcurrentHashMap<>(); 105 106 public static final String TEST_UPGRADE = "testUpgrade"; 107 108 // property in sql.txt file 109 public static final String TEST_UPGRADE_VERSIONS = "testUpgradeVersions"; 110 111 public static final String TEST_UPGRADE_LAST_CONTRIBUTOR = "testUpgradeLastContributor"; 112 113 public static final String TEST_UPGRADE_LOCKS = "testUpgradeLocks"; 114 115 public static final String TEST_UPGRADE_SYS_CHANGE_TOKEN = "testUpgradeSysChangeToken"; 116 117 protected TableUpgrader tableUpgrader; 118 119 private final QueryMakerService queryMakerService; 120 121 private final PathResolver pathResolver; 122 123 private final RepositoryImpl repository; 124 125 protected boolean clusteringEnabled; 126 127 protected static final String NOSCROLL_ID = "noscroll"; 128 129 /** 130 * Creates a new Mapper. 131 * 132 * @param model the model 133 * @param pathResolver the path resolver (used for startswith queries) 134 * @param sqlInfo the sql info 135 * @param clusterInvalidator the cluster invalidator 136 * @param repository the repository 137 */ 138 public JDBCMapper(Model model, PathResolver pathResolver, SQLInfo sqlInfo, ClusterInvalidator clusterInvalidator, 139 RepositoryImpl repository) { 140 super(model, sqlInfo, clusterInvalidator, repository.getInvalidationsPropagator()); 141 this.pathResolver = pathResolver; 142 this.repository = repository; 143 clusteringEnabled = clusterInvalidator != null; 144 queryMakerService = Framework.getService(QueryMakerService.class); 145 146 tableUpgrader = new TableUpgrader(this); 147 tableUpgrader.add(Model.VERSION_TABLE_NAME, Model.VERSION_IS_LATEST_KEY, "upgradeVersions", 148 TEST_UPGRADE_VERSIONS); 149 tableUpgrader.add("dublincore", "lastContributor", "upgradeLastContributor", TEST_UPGRADE_LAST_CONTRIBUTOR); 150 tableUpgrader.add(Model.LOCK_TABLE_NAME, Model.LOCK_OWNER_KEY, "upgradeLocks", TEST_UPGRADE_LOCKS); 151 tableUpgrader.add(Model.HIER_TABLE_NAME, Model.MAIN_SYS_CHANGE_TOKEN_KEY, "upgradeSysChangeToken", 152 TEST_UPGRADE_SYS_CHANGE_TOKEN); 153 } 154 155 @Override 156 public int getTableSize(String tableName) { 157 return sqlInfo.getDatabase().getTable(tableName).getColumns().size(); 158 } 159 160 /* 161 * ----- Root ----- 162 */ 163 164 @Override 165 public void createDatabase(String ddlMode) { 166 // some databases (SQL Server) can't create tables/indexes/etc in a transaction, so suspend it 167 try { 168 if (!connection.getAutoCommit()) { 169 throw new NuxeoException("connection should not run in transactional mode for DDL operations"); 170 } 171 createTables(ddlMode); 172 } catch (SQLException e) { 173 throw new NuxeoException(e); 174 } 175 } 176 177 protected String getTableName(String origName) { 178 179 if (dialect instanceof DialectOracle) { 180 if (origName.length() > 30) { 181 182 StringBuilder sb = new StringBuilder(origName.length()); 183 184 try { 185 MessageDigest digest = MessageDigest.getInstance("MD5"); 186 sb.append(origName.substring(0, 15)); 187 sb.append('_'); 188 189 digest.update(origName.getBytes()); 190 sb.append(Dialect.toHexString(digest.digest()).substring(0, 12)); 191 192 return sb.toString(); 193 194 } catch (NoSuchAlgorithmException e) { 195 throw new RuntimeException("Error", e); 196 } 197 } 198 } 199 200 return origName; 201 } 202 203 protected void createTables(String ddlMode) throws SQLException { 204 ListCollector ddlCollector = new ListCollector(); 205 206 sqlInfo.executeSQLStatements(null, ddlMode, connection, logger, ddlCollector); // for missing category 207 sqlInfo.executeSQLStatements("first", ddlMode, connection, logger, ddlCollector); 208 sqlInfo.executeSQLStatements("beforeTableCreation", ddlMode, connection, logger, ddlCollector); 209 if (testProps.containsKey(TEST_UPGRADE)) { 210 // create "old" tables 211 sqlInfo.executeSQLStatements("testUpgrade", ddlMode, connection, logger, null); // do not collect 212 } 213 214 String schemaName = dialect.getConnectionSchema(connection); 215 DatabaseMetaData metadata = connection.getMetaData(); 216 Set<String> tableNames = findTableNames(metadata, schemaName); 217 Database database = sqlInfo.getDatabase(); 218 Map<String, List<Column>> added = new HashMap<>(); 219 220 for (Table table : database.getTables()) { 221 String tableName = getTableName(table.getPhysicalName()); 222 if (!tableNames.contains(tableName.toUpperCase())) { 223 224 /* 225 * Create missing table. 226 */ 227 228 ddlCollector.add(table.getCreateSql()); 229 ddlCollector.addAll(table.getPostCreateSqls(model)); 230 231 added.put(table.getKey(), null); // null = table created 232 233 sqlInfo.sqlStatementsProperties.put("create_table_" + tableName.toLowerCase(), Boolean.TRUE); 234 235 } else { 236 237 /* 238 * Get existing columns. 239 */ 240 241 Map<String, Integer> columnTypes = new HashMap<>(); 242 Map<String, String> columnTypeNames = new HashMap<>(); 243 Map<String, Integer> columnTypeSizes = new HashMap<>(); 244 try (ResultSet rs = metadata.getColumns(null, schemaName, tableName, "%")) { 245 while (rs.next()) { 246 String schema = rs.getString("TABLE_SCHEM"); 247 if (schema != null) { // null for MySQL, doh! 248 if ("INFORMATION_SCHEMA".equals(schema.toUpperCase())) { 249 // H2 returns some system tables (locks) 250 continue; 251 } 252 } 253 String columnName = rs.getString("COLUMN_NAME").toUpperCase(); 254 columnTypes.put(columnName, Integer.valueOf(rs.getInt("DATA_TYPE"))); 255 columnTypeNames.put(columnName, rs.getString("TYPE_NAME")); 256 columnTypeSizes.put(columnName, Integer.valueOf(rs.getInt("COLUMN_SIZE"))); 257 } 258 } 259 260 /* 261 * Update types and create missing columns. 262 */ 263 264 List<Column> addedColumns = new LinkedList<>(); 265 for (Column column : table.getColumns()) { 266 String upperName = column.getPhysicalName().toUpperCase(); 267 Integer type = columnTypes.remove(upperName); 268 if (type == null) { 269 log.warn("Adding missing column in database: " + column.getFullQuotedName()); 270 ddlCollector.add(table.getAddColumnSql(column)); 271 ddlCollector.addAll(table.getPostAddSqls(column, model)); 272 addedColumns.add(column); 273 } else { 274 int expected = column.getJdbcType(); 275 int actual = type.intValue(); 276 String actualName = columnTypeNames.get(upperName); 277 Integer actualSize = columnTypeSizes.get(upperName); 278 if (!column.setJdbcType(actual, actualName, actualSize.intValue())) { 279 log.error(String.format("SQL type mismatch for %s: expected %s, database has %s / %s (%s)", 280 column.getFullQuotedName(), Integer.valueOf(expected), type, actualName, 281 actualSize)); 282 } 283 } 284 } 285 for (String col : dialect.getIgnoredColumns(table)) { 286 columnTypes.remove(col.toUpperCase()); 287 } 288 if (!columnTypes.isEmpty()) { 289 log.warn("Database contains additional unused columns for table " + table.getQuotedName() + ": " 290 + String.join(", ", columnTypes.keySet())); 291 } 292 if (!addedColumns.isEmpty()) { 293 if (added.containsKey(table.getKey())) { 294 throw new AssertionError(); 295 } 296 added.put(table.getKey(), addedColumns); 297 } 298 } 299 } 300 301 if (testProps.containsKey(TEST_UPGRADE)) { 302 // create "old" content in tables 303 sqlInfo.executeSQLStatements("testUpgradeOldTables", ddlMode, connection, logger, ddlCollector); 304 } 305 306 // run upgrade for each table if added columns or test 307 for (Entry<String, List<Column>> en : added.entrySet()) { 308 List<Column> addedColumns = en.getValue(); 309 String tableKey = en.getKey(); 310 upgradeTable(tableKey, addedColumns, ddlMode, ddlCollector); 311 } 312 313 sqlInfo.executeSQLStatements("afterTableCreation", ddlMode, connection, logger, ddlCollector); 314 sqlInfo.executeSQLStatements("last", ddlMode, connection, logger, ddlCollector); 315 316 // aclr_permission check for PostgreSQL 317 dialect.performAdditionalStatements(connection); 318 319 /* 320 * Execute all the collected DDL, or dump it if requested, depending on ddlMode 321 */ 322 323 // ddlMode may be: 324 // ignore (not treated here, nothing done) 325 // dump (implies execute) 326 // dump,execute 327 // dump,ignore (no execute) 328 // execute 329 // abort (implies dump) 330 // compat can be used instead of execute to always recreate stored procedures 331 332 List<String> ddl = ddlCollector.getStrings(); 333 boolean ignore = ddlMode.contains(RepositoryDescriptor.DDL_MODE_IGNORE); 334 boolean dump = ddlMode.contains(RepositoryDescriptor.DDL_MODE_DUMP); 335 boolean abort = ddlMode.contains(RepositoryDescriptor.DDL_MODE_ABORT); 336 if (dump || abort) { 337 338 /* 339 * Dump DDL if not empty. 340 */ 341 342 if (!ddl.isEmpty()) { 343 File dumpFile = new File(Environment.getDefault().getLog(), "ddl-vcs-" + repository.getName() + ".sql"); 344 try (OutputStream out = new FileOutputStream(dumpFile); PrintStream ps = new PrintStream(out)) { 345 for (String sql : dialect.getDumpStart()) { 346 ps.println(sql); 347 } 348 for (String sql : ddl) { 349 sql = sql.trim(); 350 if (sql.endsWith(";")) { 351 sql = sql.substring(0, sql.length() - 1); 352 } 353 ps.println(dialect.getSQLForDump(sql)); 354 } 355 for (String sql : dialect.getDumpStop()) { 356 ps.println(sql); 357 } 358 } catch (IOException e) { 359 throw new NuxeoException(e); 360 } 361 362 /* 363 * Abort if requested. 364 */ 365 366 if (abort) { 367 log.error("Dumped DDL to: " + dumpFile); 368 throw new NuxeoException("Database initialization failed for: " + repository.getName() 369 + ", DDL must be executed: " + dumpFile); 370 } 371 } 372 } 373 if (!ignore) { 374 375 /* 376 * Execute DDL. 377 */ 378 379 try (Statement st = connection.createStatement()) { 380 for (String sql : ddl) { 381 logger.log(sql.replace("\n", "\n ")); // indented 382 try { 383 st.execute(sql); 384 } catch (SQLException e) { 385 throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e); 386 } 387 countExecute(); 388 } 389 } 390 391 /* 392 * Execute post-DDL stuff. 393 */ 394 395 try (Statement st = connection.createStatement()) { 396 for (String sql : dialect.getStartupSqls(model, sqlInfo.database)) { 397 logger.log(sql.replace("\n", "\n ")); // indented 398 try { 399 st.execute(sql); 400 } catch (SQLException e) { 401 throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e); 402 } 403 countExecute(); 404 } 405 } 406 } 407 } 408 409 protected void upgradeTable(String tableKey, List<Column> addedColumns, String ddlMode, ListCollector ddlCollector) 410 throws SQLException { 411 tableUpgrader.upgrade(tableKey, addedColumns, ddlMode, ddlCollector); 412 } 413 414 /** Finds uppercase table names. */ 415 protected static Set<String> findTableNames(DatabaseMetaData metadata, String schemaName) throws SQLException { 416 Set<String> tableNames = new HashSet<>(); 417 ResultSet rs = metadata.getTables(null, schemaName, "%", new String[] { "TABLE" }); 418 while (rs.next()) { 419 String tableName = rs.getString("TABLE_NAME"); 420 tableNames.add(tableName.toUpperCase()); 421 } 422 rs.close(); 423 return tableNames; 424 } 425 426 @Override 427 public int getClusterNodeIdType() { 428 return sqlInfo.getClusterNodeIdType(); 429 } 430 431 @Override 432 public void createClusterNode(Serializable nodeId) { 433 Calendar now = Calendar.getInstance(); 434 String sql = sqlInfo.getCreateClusterNodeSql(); 435 List<Column> columns = sqlInfo.getCreateClusterNodeColumns(); 436 try (PreparedStatement ps = connection.prepareStatement(sql)) { 437 if (logger.isLogEnabled()) { 438 logger.logSQL(sql, Arrays.asList(nodeId, now)); 439 } 440 columns.get(0).setToPreparedStatement(ps, 1, nodeId); 441 columns.get(1).setToPreparedStatement(ps, 2, now); 442 ps.execute(); 443 444 } catch (SQLException e) { 445 throw new NuxeoException(e); 446 } 447 } 448 449 @Override 450 public void removeClusterNode(Serializable nodeId) { 451 // delete from cluster_nodes 452 String sql = sqlInfo.getDeleteClusterNodeSql(); 453 Column column = sqlInfo.getDeleteClusterNodeColumn(); 454 try (PreparedStatement ps = connection.prepareStatement(sql)) { 455 if (logger.isLogEnabled()) { 456 logger.logSQL(sql, Collections.singletonList(nodeId)); 457 } 458 column.setToPreparedStatement(ps, 1, nodeId); 459 ps.execute(); 460 // delete un-processed invals from cluster_invals 461 deleteClusterInvals(nodeId); 462 } catch (SQLException e) { 463 throw new NuxeoException(e); 464 } 465 } 466 467 protected void deleteClusterInvals(Serializable nodeId) throws SQLException { 468 String sql = sqlInfo.getDeleteClusterInvalsSql(); 469 Column column = sqlInfo.getDeleteClusterInvalsColumn(); 470 try (PreparedStatement ps = connection.prepareStatement(sql)) { 471 if (logger.isLogEnabled()) { 472 logger.logSQL(sql, Collections.singletonList(nodeId)); 473 } 474 column.setToPreparedStatement(ps, 1, nodeId); 475 int n = ps.executeUpdate(); 476 countExecute(); 477 if (logger.isLogEnabled()) { 478 logger.logCount(n); 479 } 480 } 481 } 482 483 @Override 484 public void insertClusterInvalidations(Serializable nodeId, Invalidations invalidations) { 485 String sql = dialect.getClusterInsertInvalidations(); 486 List<Column> columns = sqlInfo.getClusterInvalidationsColumns(); 487 try (PreparedStatement ps = connection.prepareStatement(sql)) { 488 int kind = Invalidations.MODIFIED; 489 while (true) { 490 Set<RowId> rowIds = invalidations.getKindSet(kind); 491 492 // reorganize by id 493 Map<Serializable, Set<String>> res = new HashMap<>(); 494 for (RowId rowId : rowIds) { 495 Set<String> tableNames = res.get(rowId.id); 496 if (tableNames == null) { 497 res.put(rowId.id, tableNames = new HashSet<>()); 498 } 499 tableNames.add(rowId.tableName); 500 } 501 502 // do inserts 503 for (Entry<Serializable, Set<String>> en : res.entrySet()) { 504 Serializable id = en.getKey(); 505 String fragments = join(en.getValue(), ' '); 506 if (logger.isLogEnabled()) { 507 logger.logSQL(sql, Arrays.<Serializable> asList(nodeId, id, fragments, Long.valueOf(kind))); 508 } 509 Serializable frags; 510 if (dialect.supportsArrays() && columns.get(2).getJdbcType() == Types.ARRAY) { 511 frags = fragments.split(" "); 512 } else { 513 frags = fragments; 514 } 515 columns.get(0).setToPreparedStatement(ps, 1, nodeId); 516 columns.get(1).setToPreparedStatement(ps, 2, id); 517 columns.get(2).setToPreparedStatement(ps, 3, frags); 518 columns.get(3).setToPreparedStatement(ps, 4, Long.valueOf(kind)); 519 ps.execute(); 520 countExecute(); 521 } 522 if (kind == Invalidations.MODIFIED) { 523 kind = Invalidations.DELETED; 524 } else { 525 break; 526 } 527 } 528 } catch (SQLException e) { 529 throw new NuxeoException("Could not invalidate", e); 530 } 531 } 532 533 // join that works on a set 534 protected static String join(Collection<String> strings, char sep) { 535 if (strings.isEmpty()) { 536 throw new RuntimeException(); 537 } 538 if (strings.size() == 1) { 539 return strings.iterator().next(); 540 } 541 int size = 0; 542 for (String word : strings) { 543 size += word.length() + 1; 544 } 545 StringBuilder buf = new StringBuilder(size); 546 for (String word : strings) { 547 buf.append(word); 548 buf.append(sep); 549 } 550 buf.setLength(size - 1); 551 return buf.toString(); 552 } 553 554 @Override 555 public Invalidations getClusterInvalidations(Serializable nodeId) { 556 Invalidations invalidations = new Invalidations(); 557 String sql = dialect.getClusterGetInvalidations(); 558 List<Column> columns = sqlInfo.getClusterInvalidationsColumns(); 559 if (logger.isLogEnabled()) { 560 logger.logSQL(sql, Collections.singletonList(nodeId)); 561 } 562 try (PreparedStatement ps = connection.prepareStatement(sql)) { 563 setToPreparedStatement(ps, 1, nodeId); 564 try (ResultSet rs = ps.executeQuery()) { 565 countExecute(); 566 while (rs.next()) { 567 // first column ignored, it's the node id 568 Serializable id = columns.get(1).getFromResultSet(rs, 1); 569 Serializable frags = columns.get(2).getFromResultSet(rs, 2); 570 int kind = ((Long) columns.get(3).getFromResultSet(rs, 3)).intValue(); 571 String[] fragments; 572 if (dialect.supportsArrays() && frags instanceof String[]) { 573 fragments = (String[]) frags; 574 } else { 575 fragments = ((String) frags).split(" "); 576 } 577 invalidations.add(id, fragments, kind); 578 } 579 } 580 if (logger.isLogEnabled()) { 581 // logCount(n); 582 logger.log(" -> " + invalidations); 583 } 584 if (dialect.isClusteringDeleteNeeded()) { 585 deleteClusterInvals(nodeId); 586 } 587 return invalidations; 588 } catch (SQLException e) { 589 throw new NuxeoException("Could not invalidate", e); 590 } 591 } 592 593 @Override 594 public Serializable getRootId(String repositoryId) { 595 String sql = sqlInfo.getSelectRootIdSql(); 596 if (logger.isLogEnabled()) { 597 logger.logSQL(sql, Collections.<Serializable> singletonList(repositoryId)); 598 } 599 try (PreparedStatement ps = connection.prepareStatement(sql)) { 600 ps.setString(1, repositoryId); 601 try (ResultSet rs = ps.executeQuery()) { 602 countExecute(); 603 if (!rs.next()) { 604 if (logger.isLogEnabled()) { 605 logger.log(" -> (none)"); 606 } 607 return null; 608 } 609 Column column = sqlInfo.getSelectRootIdWhatColumn(); 610 Serializable id = column.getFromResultSet(rs, 1); 611 if (logger.isLogEnabled()) { 612 logger.log(" -> " + Model.MAIN_KEY + '=' + id); 613 } 614 // check that we didn't get several rows 615 if (rs.next()) { 616 throw new NuxeoException("Row query for " + repositoryId + " returned several rows: " + sql); 617 } 618 return id; 619 } 620 } catch (SQLException e) { 621 throw new NuxeoException("Could not select: " + sql, e); 622 } 623 } 624 625 @Override 626 public void setRootId(Serializable repositoryId, Serializable id) { 627 String sql = sqlInfo.getInsertRootIdSql(); 628 try (PreparedStatement ps = connection.prepareStatement(sql)) { 629 List<Column> columns = sqlInfo.getInsertRootIdColumns(); 630 List<Serializable> debugValues = null; 631 if (logger.isLogEnabled()) { 632 debugValues = new ArrayList<>(2); 633 } 634 int i = 0; 635 for (Column column : columns) { 636 i++; 637 String key = column.getKey(); 638 Serializable v; 639 if (key.equals(Model.MAIN_KEY)) { 640 v = id; 641 } else if (key.equals(Model.REPOINFO_REPONAME_KEY)) { 642 v = repositoryId; 643 } else { 644 throw new RuntimeException(key); 645 } 646 column.setToPreparedStatement(ps, i, v); 647 if (debugValues != null) { 648 debugValues.add(v); 649 } 650 } 651 if (debugValues != null) { 652 logger.logSQL(sql, debugValues); 653 debugValues.clear(); 654 } 655 ps.execute(); 656 countExecute(); 657 } catch (SQLException e) { 658 throw new NuxeoException("Could not insert: " + sql, e); 659 } 660 } 661 662 protected QueryMaker findQueryMaker(String queryType) { 663 for (Class<? extends QueryMaker> klass : queryMakerService.getQueryMakers()) { 664 QueryMaker queryMaker; 665 try { 666 queryMaker = klass.newInstance(); 667 } catch (ReflectiveOperationException e) { 668 throw new NuxeoException(e); 669 } 670 if (queryMaker.accepts(queryType)) { 671 return queryMaker; 672 } 673 } 674 return null; 675 } 676 677 protected void prepareUserReadAcls(QueryFilter queryFilter) { 678 String sql = dialect.getPrepareUserReadAclsSql(); 679 Serializable principals = queryFilter.getPrincipals(); 680 if (sql == null || principals == null) { 681 return; 682 } 683 if (!dialect.supportsArrays()) { 684 principals = String.join(Dialect.ARRAY_SEP, (String[]) principals); 685 } 686 try (PreparedStatement ps = connection.prepareStatement(sql)) { 687 if (logger.isLogEnabled()) { 688 logger.logSQL(sql, Collections.singleton(principals)); 689 } 690 setToPreparedStatement(ps, 1, principals); 691 ps.execute(); 692 countExecute(); 693 } catch (SQLException e) { 694 throw new NuxeoException("Failed to prepare user read acl cache", e); 695 } 696 } 697 698 @Override 699 public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter, 700 boolean countTotal) { 701 return query(query, queryType, queryFilter, countTotal ? -1 : 0); 702 } 703 704 @Override 705 public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter, long countUpTo) { 706 PartialList<Serializable> result = queryProjection(query, queryType, queryFilter, countUpTo, 707 (info, rs) -> info.whatColumns.get(0).getFromResultSet(rs, 1)); 708 709 if (logger.isLogEnabled()) { 710 logger.logIds(result.list, countUpTo != 0, result.totalSize); 711 } 712 713 return result; 714 } 715 716 // queryFilter used for principals and permissions 717 @Override 718 public IterableQueryResult queryAndFetch(String query, String queryType, QueryFilter queryFilter, 719 boolean distinctDocuments, Object... params) { 720 if (dialect.needsPrepareUserReadAcls()) { 721 prepareUserReadAcls(queryFilter); 722 } 723 QueryMaker queryMaker = findQueryMaker(queryType); 724 if (queryMaker == null) { 725 throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query); 726 } 727 query = computeDistinctDocuments(query, distinctDocuments); 728 try { 729 return new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this, params); 730 } catch (SQLException e) { 731 throw new NuxeoException("Invalid query: " + queryType + ": " + query, e); 732 } 733 } 734 735 @Override 736 public PartialList<Map<String, Serializable>> queryProjection(String query, String queryType, 737 QueryFilter queryFilter, boolean distinctDocuments, long countUpTo, Object... params) { 738 query = computeDistinctDocuments(query, distinctDocuments); 739 PartialList<Map<String, Serializable>> result = queryProjection(query, queryType, queryFilter, countUpTo, 740 (info, rs) -> info.mapMaker.makeMap(rs), params); 741 742 if (logger.isLogEnabled()) { 743 logger.logMaps(result.list, countUpTo != 0, result.totalSize); 744 } 745 746 return result; 747 } 748 749 protected String computeDistinctDocuments(String query, boolean distinctDocuments) { 750 if (distinctDocuments) { 751 String q = query.toLowerCase(); 752 if (q.startsWith("select ") && !q.startsWith("select distinct ")) { 753 // Replace "select" by "select distinct", split at "select ".length() index 754 query = "SELECT DISTINCT " + query.substring(7); 755 } 756 } 757 return query; 758 } 759 760 protected <T> PartialList<T> queryProjection(String query, String queryType, QueryFilter queryFilter, 761 long countUpTo, BiFunctionSQLException<SQLInfoSelect, ResultSet, T> extractor, Object... params) { 762 if (dialect.needsPrepareUserReadAcls()) { 763 prepareUserReadAcls(queryFilter); 764 } 765 QueryMaker queryMaker = findQueryMaker(queryType); 766 if (queryMaker == null) { 767 throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query); 768 } 769 QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter, params); 770 771 if (q == null) { 772 logger.log("Query cannot return anything due to conflicting clauses"); 773 return new PartialList<>(Collections.emptyList(), 0); 774 } 775 long limit = queryFilter.getLimit(); 776 long offset = queryFilter.getOffset(); 777 778 if (logger.isLogEnabled()) { 779 String sql = q.selectInfo.sql; 780 if (limit != 0) { 781 sql += " -- LIMIT " + limit + " OFFSET " + offset; 782 } 783 if (countUpTo != 0) { 784 sql += " -- COUNT TOTAL UP TO " + countUpTo; 785 } 786 logger.logSQL(sql, q.selectParams); 787 } 788 789 String sql = q.selectInfo.sql; 790 791 if (countUpTo == 0 && limit > 0 && dialect.supportsPaging()) { 792 // full result set not needed for counting 793 sql = dialect.addPagingClause(sql, limit, offset); 794 limit = 0; 795 offset = 0; 796 } else if (countUpTo > 0 && dialect.supportsPaging()) { 797 // ask one more row 798 sql = dialect.addPagingClause(sql, Math.max(countUpTo + 1, limit + offset), 0); 799 } 800 801 try (PreparedStatement ps = connection.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, 802 ResultSet.CONCUR_READ_ONLY)) { 803 int i = 1; 804 for (Serializable object : q.selectParams) { 805 setToPreparedStatement(ps, i++, object); 806 } 807 try (ResultSet rs = ps.executeQuery()) { 808 countExecute(); 809 810 // limit/offset 811 long totalSize = -1; 812 boolean available; 813 if ((limit == 0) || (offset == 0)) { 814 available = rs.first(); 815 if (!available) { 816 totalSize = 0; 817 } 818 if (limit == 0) { 819 limit = -1; // infinite 820 } 821 } else { 822 available = rs.absolute((int) offset + 1); 823 } 824 825 List<T> projections = new LinkedList<>(); 826 int rowNum = 0; 827 while (available && (limit != 0)) { 828 try { 829 T projection = extractor.apply(q.selectInfo, rs); 830 projections.add(projection); 831 rowNum = rs.getRow(); 832 available = rs.next(); 833 limit--; 834 } catch (SQLDataException e) { 835 // actually no data available, MariaDB Connector/J lied, stop now 836 available = false; 837 } 838 } 839 840 // total size 841 if (countUpTo != 0 && (totalSize == -1)) { 842 if (!available && (rowNum != 0)) { 843 // last row read was the actual last 844 totalSize = rowNum; 845 } else { 846 // available if limit reached with some left 847 // rowNum == 0 if skipped too far 848 rs.last(); 849 totalSize = rs.getRow(); 850 } 851 if (countUpTo > 0 && totalSize > countUpTo) { 852 // the result where truncated we don't know the total size 853 totalSize = -2; 854 } 855 } 856 857 return new PartialList<>(projections, totalSize); 858 } 859 } catch (SQLException e) { 860 throw new NuxeoException("Invalid query: " + query, e); 861 } 862 } 863 864 public int setToPreparedStatement(PreparedStatement ps, int i, Serializable object) throws SQLException { 865 if (object instanceof Calendar) { 866 Calendar cal = (Calendar) object; 867 ps.setTimestamp(i, dialect.getTimestampFromCalendar(cal), cal); 868 } else if (object instanceof java.sql.Date) { 869 ps.setDate(i, (java.sql.Date) object); 870 } else if (object instanceof Long) { 871 ps.setLong(i, ((Long) object).longValue()); 872 } else if (object instanceof WrappedId) { 873 dialect.setId(ps, i, object.toString()); 874 } else if (object instanceof Object[]) { 875 int jdbcType; 876 if (object instanceof String[]) { 877 jdbcType = dialect.getJDBCTypeAndString(ColumnType.STRING).jdbcType; 878 } else if (object instanceof Boolean[]) { 879 jdbcType = dialect.getJDBCTypeAndString(ColumnType.BOOLEAN).jdbcType; 880 } else if (object instanceof Long[]) { 881 jdbcType = dialect.getJDBCTypeAndString(ColumnType.LONG).jdbcType; 882 } else if (object instanceof Double[]) { 883 jdbcType = dialect.getJDBCTypeAndString(ColumnType.DOUBLE).jdbcType; 884 } else if (object instanceof java.sql.Date[]) { 885 jdbcType = Types.DATE; 886 } else if (object instanceof java.sql.Clob[]) { 887 jdbcType = Types.CLOB; 888 } else if (object instanceof Calendar[]) { 889 jdbcType = dialect.getJDBCTypeAndString(ColumnType.TIMESTAMP).jdbcType; 890 object = dialect.getTimestampFromCalendar((Calendar) object); 891 } else if (object instanceof Integer[]) { 892 jdbcType = dialect.getJDBCTypeAndString(ColumnType.INTEGER).jdbcType; 893 } else { 894 jdbcType = dialect.getJDBCTypeAndString(ColumnType.CLOB).jdbcType; 895 } 896 Array array = dialect.createArrayOf(jdbcType, (Object[]) object, connection); 897 ps.setArray(i, array); 898 } else { 899 ps.setObject(i, object); 900 } 901 return i; 902 } 903 904 @Override 905 public ScrollResult scroll(String query, int batchSize, int keepAliveSeconds) { 906 if (!dialect.supportsScroll()) { 907 return defaultScroll(query); 908 } 909 checkForTimedoutScroll(); 910 return scrollSearch(query, batchSize, keepAliveSeconds); 911 } 912 913 protected void checkForTimedoutScroll() { 914 cursorResults.forEach((id, cursor) -> cursor.timedOut(id)); 915 } 916 917 protected ScrollResult scrollSearch(String query, int batchSize, int keepAliveSeconds) { 918 QueryMaker queryMaker = findQueryMaker("NXQL"); 919 QueryFilter queryFilter = new QueryFilter(null, null, null, null, Collections.emptyList(), 0, 0); 920 QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter); 921 if (q == null) { 922 logger.log("Query cannot return anything due to conflicting clauses"); 923 throw new NuxeoException("Query cannot return anything due to conflicting clauses"); 924 } 925 if (logger.isLogEnabled()) { 926 logger.logSQL(q.selectInfo.sql, q.selectParams); 927 } 928 try { 929 if (connection.getAutoCommit()) { 930 throw new NuxeoException("Scroll should be done inside a transaction"); 931 } 932 // ps MUST NOT be auto-closed because it's referenced by a cursor 933 PreparedStatement ps = connection.prepareStatement(q.selectInfo.sql, ResultSet.TYPE_FORWARD_ONLY, 934 ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT); 935 ps.setFetchSize(batchSize); 936 int i = 1; 937 for (Serializable object : q.selectParams) { 938 setToPreparedStatement(ps, i++, object); 939 } 940 // rs MUST NOT be auto-closed because it's referenced by a cursor 941 ResultSet rs = ps.executeQuery(); 942 String scrollId = UUID.randomUUID().toString(); 943 registerCursor(scrollId, ps, rs, batchSize, keepAliveSeconds); 944 return scroll(scrollId); 945 } catch (SQLException e) { 946 throw new NuxeoException("Error on query", e); 947 } 948 } 949 950 protected class CursorResult { 951 protected final int keepAliveSeconds; 952 953 protected final PreparedStatement preparedStatement; 954 955 protected final ResultSet resultSet; 956 957 protected final int batchSize; 958 959 protected long lastCallTimestamp; 960 961 CursorResult(PreparedStatement preparedStatement, ResultSet resultSet, int batchSize, int keepAliveSeconds) { 962 this.preparedStatement = preparedStatement; 963 this.resultSet = resultSet; 964 this.batchSize = batchSize; 965 this.keepAliveSeconds = keepAliveSeconds; 966 lastCallTimestamp = System.currentTimeMillis(); 967 } 968 969 boolean timedOut(String scrollId) { 970 long now = System.currentTimeMillis(); 971 if (now - lastCallTimestamp > (keepAliveSeconds * 1000)) { 972 if (unregisterCursor(scrollId)) { 973 log.warn("Scroll " + scrollId + " timed out"); 974 } 975 return true; 976 } 977 return false; 978 } 979 980 void touch() { 981 lastCallTimestamp = System.currentTimeMillis(); 982 } 983 984 synchronized void close() throws SQLException { 985 if (resultSet != null) { 986 resultSet.close(); 987 } 988 if (preparedStatement != null) { 989 preparedStatement.close(); 990 } 991 } 992 } 993 994 protected void registerCursor(String scrollId, PreparedStatement ps, ResultSet rs, int batchSize, 995 int keepAliveSeconds) { 996 cursorResults.put(scrollId, new CursorResult(ps, rs, batchSize, keepAliveSeconds)); 997 } 998 999 protected boolean unregisterCursor(String scrollId) { 1000 CursorResult cursor = cursorResults.remove(scrollId); 1001 if (cursor != null) { 1002 try { 1003 cursor.close(); 1004 return true; 1005 } catch (SQLException e) { 1006 log.error("Failed to close cursor for scroll: " + scrollId, e); 1007 // do not propagate exception on cleaning 1008 } 1009 } 1010 return false; 1011 } 1012 1013 protected ScrollResult defaultScroll(String query) { 1014 // the database has no proper support for cursor just return everything in one batch 1015 QueryMaker queryMaker = findQueryMaker("NXQL"); 1016 List<String> ids; 1017 QueryFilter queryFilter = new QueryFilter(null, null, null, null, Collections.emptyList(), 0, 0); 1018 try (IterableQueryResult ret = new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this)) { 1019 ids = new ArrayList<>((int) ret.size()); 1020 for (Map<String, Serializable> map : ret) { 1021 ids.add(map.get("ecm:uuid").toString()); 1022 } 1023 } catch (SQLException e) { 1024 throw new NuxeoException("Invalid scroll query: " + query, e); 1025 } 1026 return new ScrollResultImpl(NOSCROLL_ID, ids); 1027 } 1028 1029 @Override 1030 public ScrollResult scroll(String scrollId) { 1031 if (NOSCROLL_ID.equals(scrollId) || !dialect.supportsScroll()) { 1032 // there is only one batch in this case 1033 return emptyResult(); 1034 } 1035 CursorResult cursorResult = cursorResults.get(scrollId); 1036 if (cursorResult == null) { 1037 throw new NuxeoException("Unknown or timed out scrollId"); 1038 } else if (cursorResult.timedOut(scrollId)) { 1039 throw new NuxeoException("Timed out scrollId"); 1040 } 1041 cursorResult.touch(); 1042 List<String> ids = new ArrayList<>(cursorResult.batchSize); 1043 synchronized (cursorResult) { 1044 try { 1045 if (cursorResult.resultSet == null || cursorResult.resultSet.isClosed()) { 1046 unregisterCursor(scrollId); 1047 return emptyResult(); 1048 } 1049 while (ids.size() < cursorResult.batchSize) { 1050 if (cursorResult.resultSet.next()) { 1051 ids.add(cursorResult.resultSet.getString(1)); 1052 } else { 1053 cursorResult.close(); 1054 if (ids.isEmpty()) { 1055 unregisterCursor(scrollId); 1056 } 1057 break; 1058 } 1059 } 1060 } catch (SQLException e) { 1061 throw new NuxeoException("Error during scroll", e); 1062 } 1063 } 1064 return new ScrollResultImpl(scrollId, ids); 1065 } 1066 1067 @Override 1068 public Set<Serializable> getAncestorsIds(Collection<Serializable> ids) { 1069 SQLInfoSelect select = sqlInfo.getSelectAncestorsIds(); 1070 if (select == null) { 1071 return getAncestorsIdsIterative(ids); 1072 } 1073 Serializable whereIds = newIdArray(ids); 1074 Set<Serializable> res = new HashSet<>(); 1075 if (logger.isLogEnabled()) { 1076 logger.logSQL(select.sql, Collections.singleton(whereIds)); 1077 } 1078 Column what = select.whatColumns.get(0); 1079 try (PreparedStatement ps = connection.prepareStatement(select.sql)) { 1080 setToPreparedStatementIdArray(ps, 1, whereIds); 1081 try (ResultSet rs = ps.executeQuery()) { 1082 countExecute(); 1083 List<Serializable> debugIds = null; 1084 if (logger.isLogEnabled()) { 1085 debugIds = new LinkedList<>(); 1086 } 1087 while (rs.next()) { 1088 if (dialect.supportsArraysReturnInsteadOfRows()) { 1089 Serializable[] resultIds = dialect.getArrayResult(rs.getArray(1)); 1090 for (Serializable id : resultIds) { 1091 if (id != null) { 1092 res.add(id); 1093 if (logger.isLogEnabled()) { 1094 debugIds.add(id); 1095 } 1096 } 1097 } 1098 } else { 1099 Serializable id = what.getFromResultSet(rs, 1); 1100 if (id != null) { 1101 res.add(id); 1102 if (logger.isLogEnabled()) { 1103 debugIds.add(id); 1104 } 1105 } 1106 } 1107 } 1108 if (logger.isLogEnabled()) { 1109 logger.logIds(debugIds, false, 0); 1110 } 1111 } 1112 return res; 1113 } catch (SQLException e) { 1114 throw new NuxeoException("Failed to get ancestors ids", e); 1115 } 1116 } 1117 1118 /** 1119 * Uses iterative parentid selection. 1120 */ 1121 protected Set<Serializable> getAncestorsIdsIterative(Collection<Serializable> ids) { 1122 try { 1123 LinkedList<Serializable> todo = new LinkedList<>(ids); 1124 Set<Serializable> done = new HashSet<>(); 1125 Set<Serializable> res = new HashSet<>(); 1126 while (!todo.isEmpty()) { 1127 done.addAll(todo); 1128 SQLInfoSelect select = sqlInfo.getSelectParentIds(todo.size()); 1129 if (logger.isLogEnabled()) { 1130 logger.logSQL(select.sql, todo); 1131 } 1132 Column what = select.whatColumns.get(0); 1133 Column where = select.whereColumns.get(0); 1134 try (PreparedStatement ps = connection.prepareStatement(select.sql)) { 1135 int i = 1; 1136 for (Serializable id : todo) { 1137 where.setToPreparedStatement(ps, i++, id); 1138 } 1139 try (ResultSet rs = ps.executeQuery()) { 1140 countExecute(); 1141 todo = new LinkedList<>(); 1142 List<Serializable> debugIds = null; 1143 if (logger.isLogEnabled()) { 1144 debugIds = new LinkedList<>(); 1145 } 1146 while (rs.next()) { 1147 Serializable id = what.getFromResultSet(rs, 1); 1148 if (id != null) { 1149 res.add(id); 1150 if (!done.contains(id)) { 1151 todo.add(id); 1152 } 1153 if (logger.isLogEnabled()) { 1154 debugIds.add(id); 1155 } 1156 } 1157 } 1158 if (logger.isLogEnabled()) { 1159 logger.logIds(debugIds, false, 0); 1160 } 1161 } 1162 } 1163 } 1164 return res; 1165 } catch (SQLException e) { 1166 throw new NuxeoException("Failed to get ancestors ids", e); 1167 } 1168 } 1169 1170 @Override 1171 public void updateReadAcls() { 1172 if (!dialect.supportsReadAcl()) { 1173 return; 1174 } 1175 if (log.isDebugEnabled()) { 1176 log.debug("updateReadAcls: updating"); 1177 } 1178 try (Statement st = connection.createStatement()) { 1179 String sql = dialect.getUpdateReadAclsSql(); 1180 if (logger.isLogEnabled()) { 1181 logger.log(sql); 1182 } 1183 st.execute(sql); 1184 countExecute(); 1185 } catch (SQLException e) { 1186 checkConcurrentUpdate(e); 1187 throw new NuxeoException("Failed to update read acls", e); 1188 } 1189 if (log.isDebugEnabled()) { 1190 log.debug("updateReadAcls: done."); 1191 } 1192 } 1193 1194 @Override 1195 public void rebuildReadAcls() { 1196 if (!dialect.supportsReadAcl()) { 1197 return; 1198 } 1199 log.debug("rebuildReadAcls: rebuilding ..."); 1200 try (Statement st = connection.createStatement()) { 1201 String sql = dialect.getRebuildReadAclsSql(); 1202 logger.log(sql); 1203 st.execute(sql); 1204 countExecute(); 1205 } catch (SQLException e) { 1206 throw new NuxeoException("Failed to rebuild read acls", e); 1207 } 1208 log.debug("rebuildReadAcls: done."); 1209 } 1210 1211 /* 1212 * ----- Locking ----- 1213 */ 1214 1215 @Override 1216 public Lock getLock(Serializable id) { 1217 if (log.isDebugEnabled()) { 1218 try { 1219 log.debug("getLock " + id + " while autoCommit=" + connection.getAutoCommit()); 1220 } catch (SQLException e) { 1221 throw new RuntimeException(e); 1222 } 1223 } 1224 RowId rowId = new RowId(Model.LOCK_TABLE_NAME, id); 1225 Row row = readSimpleRow(rowId); 1226 return row == null ? null 1227 : new Lock((String) row.get(Model.LOCK_OWNER_KEY), (Calendar) row.get(Model.LOCK_CREATED_KEY)); 1228 } 1229 1230 @Override 1231 public Lock setLock(final Serializable id, final Lock lock) { 1232 if (log.isDebugEnabled()) { 1233 log.debug("setLock " + id + " owner=" + lock.getOwner()); 1234 } 1235 Lock oldLock = getLock(id); 1236 if (oldLock == null) { 1237 Row row = new Row(Model.LOCK_TABLE_NAME, id); 1238 row.put(Model.LOCK_OWNER_KEY, lock.getOwner()); 1239 row.put(Model.LOCK_CREATED_KEY, lock.getCreated()); 1240 insertSimpleRows(Model.LOCK_TABLE_NAME, Collections.singletonList(row)); 1241 } 1242 return oldLock; 1243 } 1244 1245 @Override 1246 public Lock removeLock(final Serializable id, final String owner, final boolean force) { 1247 if (log.isDebugEnabled()) { 1248 log.debug("removeLock " + id + " owner=" + owner + " force=" + force); 1249 } 1250 Lock oldLock = force ? null : getLock(id); 1251 if (!force && owner != null) { 1252 if (oldLock == null) { 1253 // not locked, nothing to do 1254 return null; 1255 } 1256 if (!LockManager.canLockBeRemoved(oldLock.getOwner(), owner)) { 1257 // existing mismatched lock, flag failure 1258 return new Lock(oldLock, true); 1259 } 1260 } 1261 if (force || oldLock != null) { 1262 deleteRows(Model.LOCK_TABLE_NAME, Collections.singleton(id)); 1263 } 1264 return oldLock; 1265 } 1266 1267 @Override 1268 public void markReferencedBinaries() { 1269 log.debug("Starting binaries GC mark"); 1270 DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class); 1271 String repositoryName = getRepositoryName(); 1272 try (Statement st = connection.createStatement()) { 1273 int i = -1; 1274 for (String sql : sqlInfo.getBinariesSql) { 1275 i++; 1276 Column col = sqlInfo.getBinariesColumns.get(i); 1277 if (logger.isLogEnabled()) { 1278 logger.log(sql); 1279 } 1280 try (ResultSet rs = st.executeQuery(sql)) { 1281 countExecute(); 1282 int n = 0; 1283 while (rs.next()) { 1284 n++; 1285 String key = (String) col.getFromResultSet(rs, 1); 1286 if (key != null) { 1287 blobManager.markReferencedBinary(key, repositoryName); 1288 } 1289 } 1290 if (logger.isLogEnabled()) { 1291 logger.logCount(n); 1292 } 1293 } 1294 } 1295 } catch (SQLException e) { 1296 throw new RuntimeException("Failed to mark binaries for gC", e); 1297 } 1298 log.debug("End of binaries GC mark"); 1299 } 1300 1301 /* 1302 * ----- XAResource ----- 1303 */ 1304 1305 protected static String systemToString(Object o) { 1306 return o.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(o)); 1307 } 1308 1309 @Override 1310 public void start(Xid xid, int flags) throws XAException { 1311 try { 1312 xaresource.start(xid, flags); 1313 if (logger.isLogEnabled()) { 1314 logger.log("XA start on " + systemToString(xid)); 1315 } 1316 } catch (NuxeoException e) { 1317 throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e); 1318 } catch (XAException e) { 1319 logger.error("XA start error on " + systemToString(xid), e); 1320 throw e; 1321 } 1322 } 1323 1324 @Override 1325 public void end(Xid xid, int flags) throws XAException { 1326 try { 1327 xaresource.end(xid, flags); 1328 if (logger.isLogEnabled()) { 1329 logger.log("XA end on " + systemToString(xid)); 1330 } 1331 } catch (NullPointerException e) { 1332 // H2 when no active transaction 1333 logger.error("XA end error on " + systemToString(xid), e); 1334 throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e); 1335 } catch (XAException e) { 1336 if (flags != XAResource.TMFAIL) { 1337 logger.error("XA end error on " + systemToString(xid), e); 1338 } 1339 throw e; 1340 } 1341 } 1342 1343 @Override 1344 public int prepare(Xid xid) throws XAException { 1345 try { 1346 return xaresource.prepare(xid); 1347 } catch (XAException e) { 1348 logger.error("XA prepare error on " + systemToString(xid), e); 1349 throw e; 1350 } 1351 } 1352 1353 @Override 1354 public void commit(Xid xid, boolean onePhase) throws XAException { 1355 try { 1356 xaresource.commit(xid, onePhase); 1357 } catch (XAException e) { 1358 logger.error("XA commit error on " + systemToString(xid), e); 1359 throw e; 1360 } 1361 } 1362 1363 // rollback interacts with caches so is in RowMapper 1364 1365 @Override 1366 public void forget(Xid xid) throws XAException { 1367 xaresource.forget(xid); 1368 } 1369 1370 @Override 1371 public Xid[] recover(int flag) throws XAException { 1372 return xaresource.recover(flag); 1373 } 1374 1375 @Override 1376 public boolean setTransactionTimeout(int seconds) throws XAException { 1377 return xaresource.setTransactionTimeout(seconds); 1378 } 1379 1380 @Override 1381 public int getTransactionTimeout() throws XAException { 1382 return xaresource.getTransactionTimeout(); 1383 } 1384 1385 @Override 1386 public boolean isSameRM(XAResource xares) throws XAException { 1387 throw new UnsupportedOperationException(); 1388 } 1389 1390 @Override 1391 public boolean isConnected() { 1392 return connection != null; 1393 } 1394 1395 @Override 1396 public void connect(boolean noSharing) { 1397 openConnections(noSharing); 1398 } 1399 1400 @Override 1401 public void disconnect() { 1402 closeConnections(); 1403 } 1404 1405 /** 1406 * @since 7.10-HF25, 8.10-HF06, 9.2 1407 */ 1408 @FunctionalInterface 1409 protected interface BiFunctionSQLException<T, U, R> { 1410 1411 /** 1412 * Applies this function to the given arguments. 1413 * 1414 * @param t the first function argument 1415 * @param u the second function argument 1416 * @return the function result 1417 */ 1418 R apply(T t, U u) throws SQLException; 1419 1420 } 1421 1422}