001/* 002 * (C) Copyright 2006-2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 * Benoit Delbosc 019 */ 020package org.nuxeo.ecm.core.storage.sql.jdbc; 021 022import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult; 023 024import java.io.File; 025import java.io.FileOutputStream; 026import java.io.IOException; 027import java.io.OutputStream; 028import java.io.PrintStream; 029import java.io.Serializable; 030import java.security.MessageDigest; 031import java.security.NoSuchAlgorithmException; 032import java.sql.Array; 033import java.sql.DatabaseMetaData; 034import java.sql.PreparedStatement; 035import java.sql.ResultSet; 036import java.sql.SQLDataException; 037import java.sql.SQLException; 038import java.sql.Statement; 039import java.sql.Types; 040import java.util.ArrayList; 041import java.util.Arrays; 042import java.util.Calendar; 043import java.util.Collection; 044import java.util.Collections; 045import java.util.HashMap; 046import java.util.HashSet; 047import java.util.LinkedList; 048import java.util.List; 049import java.util.Map; 050import java.util.Map.Entry; 051import java.util.Set; 052import java.util.UUID; 053import java.util.concurrent.ConcurrentHashMap; 054 055import javax.transaction.xa.XAException; 056import javax.transaction.xa.XAResource; 057import javax.transaction.xa.Xid; 058 059import org.apache.commons.logging.Log; 060import org.apache.commons.logging.LogFactory; 061import org.nuxeo.common.Environment; 062import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 063import org.nuxeo.ecm.core.api.IterableQueryResult; 064import org.nuxeo.ecm.core.api.Lock; 065import org.nuxeo.ecm.core.api.NuxeoException; 066import org.nuxeo.ecm.core.api.PartialList; 067import org.nuxeo.ecm.core.api.ScrollResult; 068import org.nuxeo.ecm.core.api.ScrollResultImpl; 069import org.nuxeo.ecm.core.blob.DocumentBlobManager; 070import org.nuxeo.ecm.core.model.LockManager; 071import org.nuxeo.ecm.core.query.QueryFilter; 072import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator; 073import org.nuxeo.ecm.core.storage.sql.ColumnType; 074import org.nuxeo.ecm.core.storage.sql.ColumnType.WrappedId; 075import org.nuxeo.ecm.core.storage.sql.Invalidations; 076import org.nuxeo.ecm.core.storage.sql.Mapper; 077import org.nuxeo.ecm.core.storage.sql.Model; 078import org.nuxeo.ecm.core.storage.sql.RepositoryDescriptor; 079import org.nuxeo.ecm.core.storage.sql.RepositoryImpl; 080import org.nuxeo.ecm.core.storage.sql.Row; 081import org.nuxeo.ecm.core.storage.sql.RowId; 082import org.nuxeo.ecm.core.storage.sql.Session.PathResolver; 083import org.nuxeo.ecm.core.storage.sql.jdbc.SQLInfo.SQLInfoSelect; 084import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column; 085import org.nuxeo.ecm.core.storage.sql.jdbc.db.Database; 086import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table; 087import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect; 088import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectOracle; 089import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.SQLStatement.ListCollector; 090import org.nuxeo.runtime.api.Framework; 091 092/** 093 * A {@link JDBCMapper} maps objects to and from a JDBC database. It is specific to a given database connection, as it 094 * computes statements. 095 * <p> 096 * The {@link JDBCMapper} does the mapping according to the policy defined by a {@link Model}, and generates SQL 097 * statements recorded in the {@link SQLInfo}. 098 */ 099public class JDBCMapper extends JDBCRowMapper implements Mapper { 100 101 private static final Log log = LogFactory.getLog(JDBCMapper.class); 102 103 public static Map<String, Serializable> testProps = new HashMap<>(); 104 105 protected static Map<String, CursorResult> cursorResults = new ConcurrentHashMap<>(); 106 107 public static final String TEST_UPGRADE = "testUpgrade"; 108 109 // property in sql.txt file 110 public static final String TEST_UPGRADE_VERSIONS = "testUpgradeVersions"; 111 112 public static final String TEST_UPGRADE_LAST_CONTRIBUTOR = "testUpgradeLastContributor"; 113 114 public static final String TEST_UPGRADE_LOCKS = "testUpgradeLocks"; 115 116 public static final String TEST_UPGRADE_SYS_CHANGE_TOKEN = "testUpgradeSysChangeToken"; 117 118 protected TableUpgrader tableUpgrader; 119 120 private final QueryMakerService queryMakerService; 121 122 private final PathResolver pathResolver; 123 124 private final RepositoryImpl repository; 125 126 protected boolean clusteringEnabled; 127 128 protected static final String NOSCROLL_ID = "noscroll"; 129 130 /** 131 * Creates a new Mapper. 132 * 133 * @param model the model 134 * @param pathResolver the path resolver (used for startswith queries) 135 * @param sqlInfo the sql info 136 * @param clusterInvalidator the cluster invalidator 137 * @param repository the repository 138 */ 139 public JDBCMapper(Model model, PathResolver pathResolver, SQLInfo sqlInfo, ClusterInvalidator clusterInvalidator, 140 RepositoryImpl repository) { 141 super(model, sqlInfo, clusterInvalidator, repository.getInvalidationsPropagator()); 142 this.pathResolver = pathResolver; 143 this.repository = repository; 144 clusteringEnabled = clusterInvalidator != null; 145 queryMakerService = Framework.getService(QueryMakerService.class); 146 147 tableUpgrader = new TableUpgrader(this); 148 tableUpgrader.add(Model.VERSION_TABLE_NAME, Model.VERSION_IS_LATEST_KEY, "upgradeVersions", 149 TEST_UPGRADE_VERSIONS); 150 tableUpgrader.add("dublincore", "lastContributor", "upgradeLastContributor", TEST_UPGRADE_LAST_CONTRIBUTOR); 151 tableUpgrader.add(Model.LOCK_TABLE_NAME, Model.LOCK_OWNER_KEY, "upgradeLocks", TEST_UPGRADE_LOCKS); 152 tableUpgrader.add(Model.HIER_TABLE_NAME, Model.MAIN_SYS_CHANGE_TOKEN_KEY, "upgradeSysChangeToken", 153 TEST_UPGRADE_SYS_CHANGE_TOKEN); 154 } 155 156 @Override 157 public int getTableSize(String tableName) { 158 return sqlInfo.getDatabase().getTable(tableName).getColumns().size(); 159 } 160 161 /* 162 * ----- Root ----- 163 */ 164 165 @Override 166 public void createDatabase(String ddlMode) { 167 // some databases (SQL Server) can't create tables/indexes/etc in a transaction, so suspend it 168 try { 169 if (!connection.getAutoCommit()) { 170 throw new NuxeoException("connection should not run in transactional mode for DDL operations"); 171 } 172 createTables(ddlMode); 173 } catch (SQLException e) { 174 throw new NuxeoException(e); 175 } 176 } 177 178 protected String getTableName(String origName) { 179 180 if (dialect instanceof DialectOracle) { 181 if (origName.length() > 30) { 182 183 StringBuilder sb = new StringBuilder(origName.length()); 184 185 try { 186 MessageDigest digest = MessageDigest.getInstance("MD5"); 187 sb.append(origName.substring(0, 15)); 188 sb.append('_'); 189 190 digest.update(origName.getBytes()); 191 sb.append(Dialect.toHexString(digest.digest()).substring(0, 12)); 192 193 return sb.toString(); 194 195 } catch (NoSuchAlgorithmException e) { 196 throw new RuntimeException("Error", e); 197 } 198 } 199 } 200 201 return origName; 202 } 203 204 protected void createTables(String ddlMode) throws SQLException { 205 ListCollector ddlCollector = new ListCollector(); 206 207 sqlInfo.executeSQLStatements(null, ddlMode, connection, logger, ddlCollector); // for missing category 208 sqlInfo.executeSQLStatements("first", ddlMode, connection, logger, ddlCollector); 209 sqlInfo.executeSQLStatements("beforeTableCreation", ddlMode, connection, logger, ddlCollector); 210 if (testProps.containsKey(TEST_UPGRADE)) { 211 // create "old" tables 212 sqlInfo.executeSQLStatements("testUpgrade", ddlMode, connection, logger, null); // do not collect 213 } 214 215 String schemaName = dialect.getConnectionSchema(connection); 216 DatabaseMetaData metadata = connection.getMetaData(); 217 Set<String> tableNames = findTableNames(metadata, schemaName); 218 Database database = sqlInfo.getDatabase(); 219 Map<String, List<Column>> added = new HashMap<>(); 220 221 for (Table table : database.getTables()) { 222 String tableName = getTableName(table.getPhysicalName()); 223 if (!tableNames.contains(tableName.toUpperCase())) { 224 225 /* 226 * Create missing table. 227 */ 228 229 ddlCollector.add(table.getCreateSql()); 230 ddlCollector.addAll(table.getPostCreateSqls(model)); 231 232 added.put(table.getKey(), null); // null = table created 233 234 sqlInfo.sqlStatementsProperties.put("create_table_" + tableName.toLowerCase(), Boolean.TRUE); 235 236 } else { 237 238 /* 239 * Get existing columns. 240 */ 241 242 Map<String, Integer> columnTypes = new HashMap<>(); 243 Map<String, String> columnTypeNames = new HashMap<>(); 244 Map<String, Integer> columnTypeSizes = new HashMap<>(); 245 try (ResultSet rs = metadata.getColumns(null, schemaName, tableName, "%")) { 246 while (rs.next()) { 247 String schema = rs.getString("TABLE_SCHEM"); 248 if (schema != null) { // null for MySQL, doh! 249 if ("INFORMATION_SCHEMA".equals(schema.toUpperCase())) { 250 // H2 returns some system tables (locks) 251 continue; 252 } 253 } 254 String columnName = rs.getString("COLUMN_NAME").toUpperCase(); 255 columnTypes.put(columnName, Integer.valueOf(rs.getInt("DATA_TYPE"))); 256 columnTypeNames.put(columnName, rs.getString("TYPE_NAME")); 257 columnTypeSizes.put(columnName, Integer.valueOf(rs.getInt("COLUMN_SIZE"))); 258 } 259 } 260 261 /* 262 * Update types and create missing columns. 263 */ 264 265 List<Column> addedColumns = new LinkedList<>(); 266 for (Column column : table.getColumns()) { 267 String upperName = column.getPhysicalName().toUpperCase(); 268 Integer type = columnTypes.remove(upperName); 269 if (type == null) { 270 log.warn("Adding missing column in database: " + column.getFullQuotedName()); 271 ddlCollector.add(table.getAddColumnSql(column)); 272 ddlCollector.addAll(table.getPostAddSqls(column, model)); 273 addedColumns.add(column); 274 } else { 275 int expected = column.getJdbcType(); 276 int actual = type.intValue(); 277 String actualName = columnTypeNames.get(upperName); 278 Integer actualSize = columnTypeSizes.get(upperName); 279 if (!column.setJdbcType(actual, actualName, actualSize.intValue())) { 280 log.error(String.format("SQL type mismatch for %s: expected %s, database has %s / %s (%s)", 281 column.getFullQuotedName(), Integer.valueOf(expected), type, actualName, 282 actualSize)); 283 } 284 } 285 } 286 for (String col : dialect.getIgnoredColumns(table)) { 287 columnTypes.remove(col.toUpperCase()); 288 } 289 if (!columnTypes.isEmpty()) { 290 log.warn("Database contains additional unused columns for table " + table.getQuotedName() + ": " 291 + String.join(", ", columnTypes.keySet())); 292 } 293 if (!addedColumns.isEmpty()) { 294 if (added.containsKey(table.getKey())) { 295 throw new AssertionError(); 296 } 297 added.put(table.getKey(), addedColumns); 298 } 299 } 300 } 301 302 if (testProps.containsKey(TEST_UPGRADE)) { 303 // create "old" content in tables 304 sqlInfo.executeSQLStatements("testUpgradeOldTables", ddlMode, connection, logger, ddlCollector); 305 } 306 307 // run upgrade for each table if added columns or test 308 for (Entry<String, List<Column>> en : added.entrySet()) { 309 List<Column> addedColumns = en.getValue(); 310 String tableKey = en.getKey(); 311 upgradeTable(tableKey, addedColumns, ddlMode, ddlCollector); 312 } 313 314 sqlInfo.executeSQLStatements("afterTableCreation", ddlMode, connection, logger, ddlCollector); 315 sqlInfo.executeSQLStatements("last", ddlMode, connection, logger, ddlCollector); 316 317 // aclr_permission check for PostgreSQL 318 dialect.performAdditionalStatements(connection); 319 320 /* 321 * Execute all the collected DDL, or dump it if requested, depending on ddlMode 322 */ 323 324 // ddlMode may be: 325 // ignore (not treated here, nothing done) 326 // dump (implies execute) 327 // dump,execute 328 // dump,ignore (no execute) 329 // execute 330 // abort (implies dump) 331 // compat can be used instead of execute to always recreate stored procedures 332 333 List<String> ddl = ddlCollector.getStrings(); 334 boolean ignore = ddlMode.contains(RepositoryDescriptor.DDL_MODE_IGNORE); 335 boolean dump = ddlMode.contains(RepositoryDescriptor.DDL_MODE_DUMP); 336 boolean abort = ddlMode.contains(RepositoryDescriptor.DDL_MODE_ABORT); 337 if (dump || abort) { 338 339 /* 340 * Dump DDL if not empty. 341 */ 342 343 if (!ddl.isEmpty()) { 344 File dumpFile = new File(Environment.getDefault().getLog(), "ddl-vcs-" + repository.getName() + ".sql"); 345 try (OutputStream out = new FileOutputStream(dumpFile); PrintStream ps = new PrintStream(out)) { 346 for (String sql : dialect.getDumpStart()) { 347 ps.println(sql); 348 } 349 for (String sql : ddl) { 350 sql = sql.trim(); 351 if (sql.endsWith(";")) { 352 sql = sql.substring(0, sql.length() - 1); 353 } 354 ps.println(dialect.getSQLForDump(sql)); 355 } 356 for (String sql : dialect.getDumpStop()) { 357 ps.println(sql); 358 } 359 } catch (IOException e) { 360 throw new NuxeoException(e); 361 } 362 363 /* 364 * Abort if requested. 365 */ 366 367 if (abort) { 368 log.error("Dumped DDL to: " + dumpFile); 369 throw new NuxeoException("Database initialization failed for: " + repository.getName() 370 + ", DDL must be executed: " + dumpFile); 371 } 372 } 373 } 374 if (!ignore) { 375 376 /* 377 * Execute DDL. 378 */ 379 380 try (Statement st = connection.createStatement()) { 381 for (String sql : ddl) { 382 logger.log(sql.replace("\n", "\n ")); // indented 383 try { 384 st.execute(sql); 385 } catch (SQLException e) { 386 throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e); 387 } 388 countExecute(); 389 } 390 } 391 392 /* 393 * Execute post-DDL stuff. 394 */ 395 396 try (Statement st = connection.createStatement()) { 397 for (String sql : dialect.getStartupSqls(model, sqlInfo.database)) { 398 logger.log(sql.replace("\n", "\n ")); // indented 399 try { 400 st.execute(sql); 401 } catch (SQLException e) { 402 throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e); 403 } 404 countExecute(); 405 } 406 } 407 } 408 } 409 410 protected void upgradeTable(String tableKey, List<Column> addedColumns, String ddlMode, ListCollector ddlCollector) 411 throws SQLException { 412 tableUpgrader.upgrade(tableKey, addedColumns, ddlMode, ddlCollector); 413 } 414 415 /** Finds uppercase table names. */ 416 protected static Set<String> findTableNames(DatabaseMetaData metadata, String schemaName) throws SQLException { 417 Set<String> tableNames = new HashSet<>(); 418 ResultSet rs = metadata.getTables(null, schemaName, "%", new String[] { "TABLE" }); 419 while (rs.next()) { 420 String tableName = rs.getString("TABLE_NAME"); 421 tableNames.add(tableName.toUpperCase()); 422 } 423 rs.close(); 424 return tableNames; 425 } 426 427 @Override 428 public int getClusterNodeIdType() { 429 return sqlInfo.getClusterNodeIdType(); 430 } 431 432 @Override 433 public void createClusterNode(Serializable nodeId) { 434 Calendar now = Calendar.getInstance(); 435 String sql = sqlInfo.getCreateClusterNodeSql(); 436 List<Column> columns = sqlInfo.getCreateClusterNodeColumns(); 437 try (PreparedStatement ps = connection.prepareStatement(sql)) { 438 if (logger.isLogEnabled()) { 439 logger.logSQL(sql, Arrays.asList(nodeId, now)); 440 } 441 columns.get(0).setToPreparedStatement(ps, 1, nodeId); 442 columns.get(1).setToPreparedStatement(ps, 2, now); 443 ps.execute(); 444 445 } catch (SQLException e) { 446 try { 447 checkConcurrentUpdate(e); 448 } catch (ConcurrentUpdateException cue) { 449 cue.addInfo( 450 "Duplicate cluster node with id: " + nodeId 451 + " (a crashed node must be cleaned up, or the repository.clustering.id configuration fixed)"); 452 throw cue; 453 } 454 throw new NuxeoException(e); 455 } 456 } 457 458 @Override 459 public void removeClusterNode(Serializable nodeId) { 460 // delete from cluster_nodes 461 String sql = sqlInfo.getDeleteClusterNodeSql(); 462 Column column = sqlInfo.getDeleteClusterNodeColumn(); 463 try (PreparedStatement ps = connection.prepareStatement(sql)) { 464 if (logger.isLogEnabled()) { 465 logger.logSQL(sql, Collections.singletonList(nodeId)); 466 } 467 column.setToPreparedStatement(ps, 1, nodeId); 468 ps.execute(); 469 // delete un-processed invals from cluster_invals 470 deleteClusterInvals(nodeId); 471 } catch (SQLException e) { 472 throw new NuxeoException(e); 473 } 474 } 475 476 protected void deleteClusterInvals(Serializable nodeId) throws SQLException { 477 String sql = sqlInfo.getDeleteClusterInvalsSql(); 478 Column column = sqlInfo.getDeleteClusterInvalsColumn(); 479 try (PreparedStatement ps = connection.prepareStatement(sql)) { 480 if (logger.isLogEnabled()) { 481 logger.logSQL(sql, Collections.singletonList(nodeId)); 482 } 483 column.setToPreparedStatement(ps, 1, nodeId); 484 int n = ps.executeUpdate(); 485 countExecute(); 486 if (logger.isLogEnabled()) { 487 logger.logCount(n); 488 } 489 } 490 } 491 492 @Override 493 public void insertClusterInvalidations(Serializable nodeId, Invalidations invalidations) { 494 String sql = dialect.getClusterInsertInvalidations(); 495 List<Column> columns = sqlInfo.getClusterInvalidationsColumns(); 496 try (PreparedStatement ps = connection.prepareStatement(sql)) { 497 int kind = Invalidations.MODIFIED; 498 while (true) { 499 Set<RowId> rowIds = invalidations.getKindSet(kind); 500 501 // reorganize by id 502 Map<Serializable, Set<String>> res = new HashMap<>(); 503 for (RowId rowId : rowIds) { 504 Set<String> tableNames = res.get(rowId.id); 505 if (tableNames == null) { 506 res.put(rowId.id, tableNames = new HashSet<>()); 507 } 508 tableNames.add(rowId.tableName); 509 } 510 511 // do inserts 512 for (Entry<Serializable, Set<String>> en : res.entrySet()) { 513 Serializable id = en.getKey(); 514 String fragments = join(en.getValue(), ' '); 515 if (logger.isLogEnabled()) { 516 logger.logSQL(sql, Arrays.<Serializable> asList(nodeId, id, fragments, Long.valueOf(kind))); 517 } 518 Serializable frags; 519 if (dialect.supportsArrays() && columns.get(2).getJdbcType() == Types.ARRAY) { 520 frags = fragments.split(" "); 521 } else { 522 frags = fragments; 523 } 524 columns.get(0).setToPreparedStatement(ps, 1, nodeId); 525 columns.get(1).setToPreparedStatement(ps, 2, id); 526 columns.get(2).setToPreparedStatement(ps, 3, frags); 527 columns.get(3).setToPreparedStatement(ps, 4, Long.valueOf(kind)); 528 ps.execute(); 529 countExecute(); 530 } 531 if (kind == Invalidations.MODIFIED) { 532 kind = Invalidations.DELETED; 533 } else { 534 break; 535 } 536 } 537 } catch (SQLException e) { 538 throw new NuxeoException("Could not invalidate", e); 539 } 540 } 541 542 // join that works on a set 543 protected static String join(Collection<String> strings, char sep) { 544 if (strings.isEmpty()) { 545 throw new RuntimeException(); 546 } 547 if (strings.size() == 1) { 548 return strings.iterator().next(); 549 } 550 int size = 0; 551 for (String word : strings) { 552 size += word.length() + 1; 553 } 554 StringBuilder buf = new StringBuilder(size); 555 for (String word : strings) { 556 buf.append(word); 557 buf.append(sep); 558 } 559 buf.setLength(size - 1); 560 return buf.toString(); 561 } 562 563 @Override 564 public Invalidations getClusterInvalidations(Serializable nodeId) { 565 Invalidations invalidations = new Invalidations(); 566 String sql = dialect.getClusterGetInvalidations(); 567 List<Column> columns = sqlInfo.getClusterInvalidationsColumns(); 568 if (logger.isLogEnabled()) { 569 logger.logSQL(sql, Collections.singletonList(nodeId)); 570 } 571 try (PreparedStatement ps = connection.prepareStatement(sql)) { 572 setToPreparedStatement(ps, 1, nodeId); 573 try (ResultSet rs = ps.executeQuery()) { 574 countExecute(); 575 while (rs.next()) { 576 // first column ignored, it's the node id 577 Serializable id = columns.get(1).getFromResultSet(rs, 1); 578 Serializable frags = columns.get(2).getFromResultSet(rs, 2); 579 int kind = ((Long) columns.get(3).getFromResultSet(rs, 3)).intValue(); 580 String[] fragments; 581 if (dialect.supportsArrays() && frags instanceof String[]) { 582 fragments = (String[]) frags; 583 } else { 584 fragments = ((String) frags).split(" "); 585 } 586 invalidations.add(id, fragments, kind); 587 } 588 } 589 if (logger.isLogEnabled()) { 590 // logCount(n); 591 logger.log(" -> " + invalidations); 592 } 593 if (dialect.isClusteringDeleteNeeded()) { 594 deleteClusterInvals(nodeId); 595 } 596 return invalidations; 597 } catch (SQLException e) { 598 throw new NuxeoException("Could not invalidate", e); 599 } 600 } 601 602 @Override 603 public Serializable getRootId(String repositoryId) { 604 String sql = sqlInfo.getSelectRootIdSql(); 605 if (logger.isLogEnabled()) { 606 logger.logSQL(sql, Collections.<Serializable> singletonList(repositoryId)); 607 } 608 try (PreparedStatement ps = connection.prepareStatement(sql)) { 609 ps.setString(1, repositoryId); 610 try (ResultSet rs = ps.executeQuery()) { 611 countExecute(); 612 if (!rs.next()) { 613 if (logger.isLogEnabled()) { 614 logger.log(" -> (none)"); 615 } 616 return null; 617 } 618 Column column = sqlInfo.getSelectRootIdWhatColumn(); 619 Serializable id = column.getFromResultSet(rs, 1); 620 if (logger.isLogEnabled()) { 621 logger.log(" -> " + Model.MAIN_KEY + '=' + id); 622 } 623 // check that we didn't get several rows 624 if (rs.next()) { 625 throw new NuxeoException("Row query for " + repositoryId + " returned several rows: " + sql); 626 } 627 return id; 628 } 629 } catch (SQLException e) { 630 throw new NuxeoException("Could not select: " + sql, e); 631 } 632 } 633 634 @Override 635 public void setRootId(Serializable repositoryId, Serializable id) { 636 String sql = sqlInfo.getInsertRootIdSql(); 637 try (PreparedStatement ps = connection.prepareStatement(sql)) { 638 List<Column> columns = sqlInfo.getInsertRootIdColumns(); 639 List<Serializable> debugValues = null; 640 if (logger.isLogEnabled()) { 641 debugValues = new ArrayList<>(2); 642 } 643 int i = 0; 644 for (Column column : columns) { 645 i++; 646 String key = column.getKey(); 647 Serializable v; 648 if (key.equals(Model.MAIN_KEY)) { 649 v = id; 650 } else if (key.equals(Model.REPOINFO_REPONAME_KEY)) { 651 v = repositoryId; 652 } else { 653 throw new RuntimeException(key); 654 } 655 column.setToPreparedStatement(ps, i, v); 656 if (debugValues != null) { 657 debugValues.add(v); 658 } 659 } 660 if (debugValues != null) { 661 logger.logSQL(sql, debugValues); 662 debugValues.clear(); 663 } 664 ps.execute(); 665 countExecute(); 666 } catch (SQLException e) { 667 throw new NuxeoException("Could not insert: " + sql, e); 668 } 669 } 670 671 protected QueryMaker findQueryMaker(String queryType) { 672 for (Class<? extends QueryMaker> klass : queryMakerService.getQueryMakers()) { 673 QueryMaker queryMaker; 674 try { 675 queryMaker = klass.newInstance(); 676 } catch (ReflectiveOperationException e) { 677 throw new NuxeoException(e); 678 } 679 if (queryMaker.accepts(queryType)) { 680 return queryMaker; 681 } 682 } 683 return null; 684 } 685 686 protected void prepareUserReadAcls(QueryFilter queryFilter) { 687 String sql = dialect.getPrepareUserReadAclsSql(); 688 Serializable principals = queryFilter.getPrincipals(); 689 if (sql == null || principals == null) { 690 return; 691 } 692 if (!dialect.supportsArrays()) { 693 principals = String.join(Dialect.ARRAY_SEP, (String[]) principals); 694 } 695 try (PreparedStatement ps = connection.prepareStatement(sql)) { 696 if (logger.isLogEnabled()) { 697 logger.logSQL(sql, Collections.singleton(principals)); 698 } 699 setToPreparedStatement(ps, 1, principals); 700 ps.execute(); 701 countExecute(); 702 } catch (SQLException e) { 703 throw new NuxeoException("Failed to prepare user read acl cache", e); 704 } 705 } 706 707 @Override 708 public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter, 709 boolean countTotal) { 710 return query(query, queryType, queryFilter, countTotal ? -1 : 0); 711 } 712 713 @Override 714 public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter, long countUpTo) { 715 PartialList<Serializable> result = queryProjection(query, queryType, queryFilter, countUpTo, 716 (info, rs) -> info.whatColumns.get(0).getFromResultSet(rs, 1)); 717 718 if (logger.isLogEnabled()) { 719 logger.logIds(result, countUpTo != 0, result.totalSize()); 720 } 721 722 return result; 723 } 724 725 // queryFilter used for principals and permissions 726 @Override 727 public IterableQueryResult queryAndFetch(String query, String queryType, QueryFilter queryFilter, 728 boolean distinctDocuments, Object... params) { 729 if (dialect.needsPrepareUserReadAcls()) { 730 prepareUserReadAcls(queryFilter); 731 } 732 QueryMaker queryMaker = findQueryMaker(queryType); 733 if (queryMaker == null) { 734 throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query); 735 } 736 query = computeDistinctDocuments(query, distinctDocuments); 737 try { 738 return new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this, params); 739 } catch (SQLException e) { 740 throw new NuxeoException("Invalid query: " + queryType + ": " + query, e); 741 } 742 } 743 744 @Override 745 public PartialList<Map<String, Serializable>> queryProjection(String query, String queryType, 746 QueryFilter queryFilter, boolean distinctDocuments, long countUpTo, Object... params) { 747 query = computeDistinctDocuments(query, distinctDocuments); 748 PartialList<Map<String, Serializable>> result = queryProjection(query, queryType, queryFilter, countUpTo, 749 (info, rs) -> info.mapMaker.makeMap(rs), params); 750 751 if (logger.isLogEnabled()) { 752 logger.logMaps(result, countUpTo != 0, result.totalSize()); 753 } 754 755 return result; 756 } 757 758 protected String computeDistinctDocuments(String query, boolean distinctDocuments) { 759 if (distinctDocuments) { 760 String q = query.toLowerCase(); 761 if (q.startsWith("select ") && !q.startsWith("select distinct ")) { 762 // Replace "select" by "select distinct", split at "select ".length() index 763 query = "SELECT DISTINCT " + query.substring(7); 764 } 765 } 766 return query; 767 } 768 769 protected <T> PartialList<T> queryProjection(String query, String queryType, QueryFilter queryFilter, 770 long countUpTo, BiFunctionSQLException<SQLInfoSelect, ResultSet, T> extractor, Object... params) { 771 if (dialect.needsPrepareUserReadAcls()) { 772 prepareUserReadAcls(queryFilter); 773 } 774 QueryMaker queryMaker = findQueryMaker(queryType); 775 if (queryMaker == null) { 776 throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query); 777 } 778 QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter, params); 779 780 if (q == null) { 781 logger.log("Query cannot return anything due to conflicting clauses"); 782 return new PartialList<>(Collections.emptyList(), 0); 783 } 784 long limit = queryFilter.getLimit(); 785 long offset = queryFilter.getOffset(); 786 787 if (logger.isLogEnabled()) { 788 String sql = q.selectInfo.sql; 789 if (limit != 0) { 790 sql += " -- LIMIT " + limit + " OFFSET " + offset; 791 } 792 if (countUpTo != 0) { 793 sql += " -- COUNT TOTAL UP TO " + countUpTo; 794 } 795 logger.logSQL(sql, q.selectParams); 796 } 797 798 String sql = q.selectInfo.sql; 799 800 if (countUpTo == 0 && limit > 0 && dialect.supportsPaging()) { 801 // full result set not needed for counting 802 sql = dialect.addPagingClause(sql, limit, offset); 803 limit = 0; 804 offset = 0; 805 } else if (countUpTo > 0 && dialect.supportsPaging()) { 806 // ask one more row 807 sql = dialect.addPagingClause(sql, Math.max(countUpTo + 1, limit + offset), 0); 808 } 809 810 try (PreparedStatement ps = connection.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, 811 ResultSet.CONCUR_READ_ONLY)) { 812 int i = 1; 813 for (Serializable object : q.selectParams) { 814 setToPreparedStatement(ps, i++, object); 815 } 816 try (ResultSet rs = ps.executeQuery()) { 817 countExecute(); 818 819 // limit/offset 820 long totalSize = -1; 821 boolean available; 822 if ((limit == 0) || (offset == 0)) { 823 available = rs.first(); 824 if (!available) { 825 totalSize = 0; 826 } 827 if (limit == 0) { 828 limit = -1; // infinite 829 } 830 } else { 831 available = rs.absolute((int) offset + 1); 832 } 833 834 List<T> projections = new LinkedList<>(); 835 int rowNum = 0; 836 while (available && (limit != 0)) { 837 try { 838 T projection = extractor.apply(q.selectInfo, rs); 839 projections.add(projection); 840 rowNum = rs.getRow(); 841 available = rs.next(); 842 limit--; 843 } catch (SQLDataException e) { 844 // actually no data available, MariaDB Connector/J lied, stop now 845 available = false; 846 } 847 } 848 849 // total size 850 if (countUpTo != 0 && (totalSize == -1)) { 851 if (!available && (rowNum != 0)) { 852 // last row read was the actual last 853 totalSize = rowNum; 854 } else { 855 // available if limit reached with some left 856 // rowNum == 0 if skipped too far 857 rs.last(); 858 totalSize = rs.getRow(); 859 } 860 if (countUpTo > 0 && totalSize > countUpTo) { 861 // the result where truncated we don't know the total size 862 totalSize = -2; 863 } 864 } 865 866 return new PartialList<>(projections, totalSize); 867 } 868 } catch (SQLException e) { 869 throw new NuxeoException("Invalid query: " + query, e); 870 } 871 } 872 873 public int setToPreparedStatement(PreparedStatement ps, int i, Serializable object) throws SQLException { 874 if (object instanceof Calendar) { 875 dialect.setToPreparedStatementTimestamp(ps, i, object, null); 876 } else if (object instanceof java.sql.Date) { 877 ps.setDate(i, (java.sql.Date) object); 878 } else if (object instanceof Long) { 879 ps.setLong(i, ((Long) object).longValue()); 880 } else if (object instanceof WrappedId) { 881 dialect.setId(ps, i, object.toString()); 882 } else if (object instanceof Object[]) { 883 int jdbcType; 884 if (object instanceof String[]) { 885 jdbcType = dialect.getJDBCTypeAndString(ColumnType.STRING).jdbcType; 886 } else if (object instanceof Boolean[]) { 887 jdbcType = dialect.getJDBCTypeAndString(ColumnType.BOOLEAN).jdbcType; 888 } else if (object instanceof Long[]) { 889 jdbcType = dialect.getJDBCTypeAndString(ColumnType.LONG).jdbcType; 890 } else if (object instanceof Double[]) { 891 jdbcType = dialect.getJDBCTypeAndString(ColumnType.DOUBLE).jdbcType; 892 } else if (object instanceof java.sql.Date[]) { 893 jdbcType = Types.DATE; 894 } else if (object instanceof java.sql.Clob[]) { 895 jdbcType = Types.CLOB; 896 } else if (object instanceof Calendar[]) { 897 jdbcType = dialect.getJDBCTypeAndString(ColumnType.TIMESTAMP).jdbcType; 898 object = dialect.getTimestampFromCalendar((Calendar[]) object); 899 } else if (object instanceof Integer[]) { 900 jdbcType = dialect.getJDBCTypeAndString(ColumnType.INTEGER).jdbcType; 901 } else { 902 jdbcType = dialect.getJDBCTypeAndString(ColumnType.CLOB).jdbcType; 903 } 904 Array array = dialect.createArrayOf(jdbcType, (Object[]) object, connection); 905 ps.setArray(i, array); 906 } else { 907 ps.setObject(i, object); 908 } 909 return i; 910 } 911 912 @Override 913 public ScrollResult<String> scroll(String query, int batchSize, int keepAliveSeconds) { 914 if (!dialect.supportsScroll()) { 915 return defaultScroll(query); 916 } 917 checkForTimedoutScroll(); 918 return scrollSearch(query, batchSize, keepAliveSeconds); 919 } 920 921 protected void checkForTimedoutScroll() { 922 cursorResults.forEach((id, cursor) -> cursor.timedOut(id)); 923 } 924 925 protected ScrollResult<String> scrollSearch(String query, int batchSize, int keepAliveSeconds) { 926 QueryMaker queryMaker = findQueryMaker("NXQL"); 927 QueryFilter queryFilter = new QueryFilter(null, null, null, null, Collections.emptyList(), 0, 0); 928 QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter); 929 if (q == null) { 930 logger.log("Query cannot return anything due to conflicting clauses"); 931 throw new NuxeoException("Query cannot return anything due to conflicting clauses"); 932 } 933 if (logger.isLogEnabled()) { 934 logger.logSQL(q.selectInfo.sql, q.selectParams); 935 } 936 try { 937 if (connection.getAutoCommit()) { 938 throw new NuxeoException("Scroll should be done inside a transaction"); 939 } 940 // ps MUST NOT be auto-closed because it's referenced by a cursor 941 PreparedStatement ps = connection.prepareStatement(q.selectInfo.sql, ResultSet.TYPE_FORWARD_ONLY, 942 ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT); 943 ps.setFetchSize(batchSize); 944 int i = 1; 945 for (Serializable object : q.selectParams) { 946 setToPreparedStatement(ps, i++, object); 947 } 948 // rs MUST NOT be auto-closed because it's referenced by a cursor 949 ResultSet rs = ps.executeQuery(); 950 String scrollId = UUID.randomUUID().toString(); 951 registerCursor(scrollId, ps, rs, batchSize, keepAliveSeconds); 952 return scroll(scrollId); 953 } catch (SQLException e) { 954 throw new NuxeoException("Error on query", e); 955 } 956 } 957 958 protected class CursorResult { 959 protected final int keepAliveSeconds; 960 961 protected final PreparedStatement preparedStatement; 962 963 protected final ResultSet resultSet; 964 965 protected final int batchSize; 966 967 protected long lastCallTimestamp; 968 969 CursorResult(PreparedStatement preparedStatement, ResultSet resultSet, int batchSize, int keepAliveSeconds) { 970 this.preparedStatement = preparedStatement; 971 this.resultSet = resultSet; 972 this.batchSize = batchSize; 973 this.keepAliveSeconds = keepAliveSeconds; 974 lastCallTimestamp = System.currentTimeMillis(); 975 } 976 977 boolean timedOut(String scrollId) { 978 long now = System.currentTimeMillis(); 979 if (now - lastCallTimestamp > (keepAliveSeconds * 1000)) { 980 if (unregisterCursor(scrollId)) { 981 log.warn("Scroll " + scrollId + " timed out"); 982 } 983 return true; 984 } 985 return false; 986 } 987 988 void touch() { 989 lastCallTimestamp = System.currentTimeMillis(); 990 } 991 992 synchronized void close() throws SQLException { 993 if (resultSet != null) { 994 resultSet.close(); 995 } 996 if (preparedStatement != null) { 997 preparedStatement.close(); 998 } 999 } 1000 } 1001 1002 protected void registerCursor(String scrollId, PreparedStatement ps, ResultSet rs, int batchSize, 1003 int keepAliveSeconds) { 1004 cursorResults.put(scrollId, new CursorResult(ps, rs, batchSize, keepAliveSeconds)); 1005 } 1006 1007 protected boolean unregisterCursor(String scrollId) { 1008 CursorResult cursor = cursorResults.remove(scrollId); 1009 if (cursor != null) { 1010 try { 1011 cursor.close(); 1012 return true; 1013 } catch (SQLException e) { 1014 log.error("Failed to close cursor for scroll: " + scrollId, e); 1015 // do not propagate exception on cleaning 1016 } 1017 } 1018 return false; 1019 } 1020 1021 protected ScrollResult<String> defaultScroll(String query) { 1022 // the database has no proper support for cursor just return everything in one batch 1023 QueryMaker queryMaker = findQueryMaker("NXQL"); 1024 List<String> ids; 1025 QueryFilter queryFilter = new QueryFilter(null, null, null, null, Collections.emptyList(), 0, 0); 1026 try (IterableQueryResult ret = new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this)) { 1027 ids = new ArrayList<>((int) ret.size()); 1028 for (Map<String, Serializable> map : ret) { 1029 ids.add(map.get("ecm:uuid").toString()); 1030 } 1031 } catch (SQLException e) { 1032 throw new NuxeoException("Invalid scroll query: " + query, e); 1033 } 1034 return new ScrollResultImpl<>(NOSCROLL_ID, ids); 1035 } 1036 1037 @Override 1038 public ScrollResult<String> scroll(String scrollId) { 1039 if (NOSCROLL_ID.equals(scrollId) || !dialect.supportsScroll()) { 1040 // there is only one batch in this case 1041 return emptyResult(); 1042 } 1043 CursorResult cursorResult = cursorResults.get(scrollId); 1044 if (cursorResult == null) { 1045 throw new NuxeoException("Unknown or timed out scrollId"); 1046 } else if (cursorResult.timedOut(scrollId)) { 1047 throw new NuxeoException("Timed out scrollId"); 1048 } 1049 cursorResult.touch(); 1050 List<String> ids = new ArrayList<>(cursorResult.batchSize); 1051 synchronized (cursorResult) { 1052 try { 1053 if (cursorResult.resultSet == null || cursorResult.resultSet.isClosed()) { 1054 unregisterCursor(scrollId); 1055 return emptyResult(); 1056 } 1057 while (ids.size() < cursorResult.batchSize) { 1058 if (cursorResult.resultSet.next()) { 1059 ids.add(cursorResult.resultSet.getString(1)); 1060 } else { 1061 cursorResult.close(); 1062 if (ids.isEmpty()) { 1063 unregisterCursor(scrollId); 1064 } 1065 break; 1066 } 1067 } 1068 } catch (SQLException e) { 1069 throw new NuxeoException("Error during scroll", e); 1070 } 1071 } 1072 return new ScrollResultImpl<>(scrollId, ids); 1073 } 1074 1075 @Override 1076 public Set<Serializable> getAncestorsIds(Collection<Serializable> ids) { 1077 SQLInfoSelect select = sqlInfo.getSelectAncestorsIds(); 1078 if (select == null) { 1079 return getAncestorsIdsIterative(ids); 1080 } 1081 Serializable whereIds = newIdArray(ids); 1082 Set<Serializable> res = new HashSet<>(); 1083 if (logger.isLogEnabled()) { 1084 logger.logSQL(select.sql, Collections.singleton(whereIds)); 1085 } 1086 Column what = select.whatColumns.get(0); 1087 try (PreparedStatement ps = connection.prepareStatement(select.sql)) { 1088 setToPreparedStatementIdArray(ps, 1, whereIds); 1089 try (ResultSet rs = ps.executeQuery()) { 1090 countExecute(); 1091 List<Serializable> debugIds = null; 1092 if (logger.isLogEnabled()) { 1093 debugIds = new LinkedList<>(); 1094 } 1095 while (rs.next()) { 1096 if (dialect.supportsArraysReturnInsteadOfRows()) { 1097 Serializable[] resultIds = dialect.getArrayResult(rs.getArray(1)); 1098 for (Serializable id : resultIds) { 1099 if (id != null) { 1100 res.add(id); 1101 if (logger.isLogEnabled()) { 1102 debugIds.add(id); 1103 } 1104 } 1105 } 1106 } else { 1107 Serializable id = what.getFromResultSet(rs, 1); 1108 if (id != null) { 1109 res.add(id); 1110 if (logger.isLogEnabled()) { 1111 debugIds.add(id); 1112 } 1113 } 1114 } 1115 } 1116 if (logger.isLogEnabled()) { 1117 logger.logIds(debugIds, false, 0); 1118 } 1119 } 1120 return res; 1121 } catch (SQLException e) { 1122 throw new NuxeoException("Failed to get ancestors ids", e); 1123 } 1124 } 1125 1126 /** 1127 * Uses iterative parentid selection. 1128 */ 1129 protected Set<Serializable> getAncestorsIdsIterative(Collection<Serializable> ids) { 1130 try { 1131 LinkedList<Serializable> todo = new LinkedList<>(ids); 1132 Set<Serializable> done = new HashSet<>(); 1133 Set<Serializable> res = new HashSet<>(); 1134 while (!todo.isEmpty()) { 1135 done.addAll(todo); 1136 SQLInfoSelect select = sqlInfo.getSelectParentIds(todo.size()); 1137 if (logger.isLogEnabled()) { 1138 logger.logSQL(select.sql, todo); 1139 } 1140 Column what = select.whatColumns.get(0); 1141 Column where = select.whereColumns.get(0); 1142 try (PreparedStatement ps = connection.prepareStatement(select.sql)) { 1143 int i = 1; 1144 for (Serializable id : todo) { 1145 where.setToPreparedStatement(ps, i++, id); 1146 } 1147 try (ResultSet rs = ps.executeQuery()) { 1148 countExecute(); 1149 todo = new LinkedList<>(); 1150 List<Serializable> debugIds = null; 1151 if (logger.isLogEnabled()) { 1152 debugIds = new LinkedList<>(); 1153 } 1154 while (rs.next()) { 1155 Serializable id = what.getFromResultSet(rs, 1); 1156 if (id != null) { 1157 res.add(id); 1158 if (!done.contains(id)) { 1159 todo.add(id); 1160 } 1161 if (logger.isLogEnabled()) { 1162 debugIds.add(id); 1163 } 1164 } 1165 } 1166 if (logger.isLogEnabled()) { 1167 logger.logIds(debugIds, false, 0); 1168 } 1169 } 1170 } 1171 } 1172 return res; 1173 } catch (SQLException e) { 1174 throw new NuxeoException("Failed to get ancestors ids", e); 1175 } 1176 } 1177 1178 @Override 1179 public void updateReadAcls() { 1180 if (!dialect.supportsReadAcl()) { 1181 return; 1182 } 1183 if (log.isDebugEnabled()) { 1184 log.debug("updateReadAcls: updating"); 1185 } 1186 try (Statement st = connection.createStatement()) { 1187 String sql = dialect.getUpdateReadAclsSql(); 1188 if (logger.isLogEnabled()) { 1189 logger.log(sql); 1190 } 1191 st.execute(sql); 1192 countExecute(); 1193 } catch (SQLException e) { 1194 checkConcurrentUpdate(e); 1195 throw new NuxeoException("Failed to update read acls", e); 1196 } 1197 if (log.isDebugEnabled()) { 1198 log.debug("updateReadAcls: done."); 1199 } 1200 } 1201 1202 @Override 1203 public void rebuildReadAcls() { 1204 if (!dialect.supportsReadAcl()) { 1205 return; 1206 } 1207 log.debug("rebuildReadAcls: rebuilding ..."); 1208 try (Statement st = connection.createStatement()) { 1209 String sql = dialect.getRebuildReadAclsSql(); 1210 logger.log(sql); 1211 st.execute(sql); 1212 countExecute(); 1213 } catch (SQLException e) { 1214 throw new NuxeoException("Failed to rebuild read acls", e); 1215 } 1216 log.debug("rebuildReadAcls: done."); 1217 } 1218 1219 /* 1220 * ----- Locking ----- 1221 */ 1222 1223 @Override 1224 public Lock getLock(Serializable id) { 1225 if (log.isDebugEnabled()) { 1226 try { 1227 log.debug("getLock " + id + " while autoCommit=" + connection.getAutoCommit()); 1228 } catch (SQLException e) { 1229 throw new RuntimeException(e); 1230 } 1231 } 1232 RowId rowId = new RowId(Model.LOCK_TABLE_NAME, id); 1233 Row row = readSimpleRow(rowId); 1234 return row == null ? null 1235 : new Lock((String) row.get(Model.LOCK_OWNER_KEY), (Calendar) row.get(Model.LOCK_CREATED_KEY)); 1236 } 1237 1238 @Override 1239 public Lock setLock(final Serializable id, final Lock lock) { 1240 if (log.isDebugEnabled()) { 1241 log.debug("setLock " + id + " owner=" + lock.getOwner()); 1242 } 1243 Lock oldLock = getLock(id); 1244 if (oldLock == null) { 1245 Row row = new Row(Model.LOCK_TABLE_NAME, id); 1246 row.put(Model.LOCK_OWNER_KEY, lock.getOwner()); 1247 row.put(Model.LOCK_CREATED_KEY, lock.getCreated()); 1248 insertSimpleRows(Model.LOCK_TABLE_NAME, Collections.singletonList(row)); 1249 } 1250 return oldLock; 1251 } 1252 1253 @Override 1254 public Lock removeLock(final Serializable id, final String owner, final boolean force) { 1255 if (log.isDebugEnabled()) { 1256 log.debug("removeLock " + id + " owner=" + owner + " force=" + force); 1257 } 1258 Lock oldLock = force ? null : getLock(id); 1259 if (!force && owner != null) { 1260 if (oldLock == null) { 1261 // not locked, nothing to do 1262 return null; 1263 } 1264 if (!LockManager.canLockBeRemoved(oldLock.getOwner(), owner)) { 1265 // existing mismatched lock, flag failure 1266 return new Lock(oldLock, true); 1267 } 1268 } 1269 if (force || oldLock != null) { 1270 deleteRows(Model.LOCK_TABLE_NAME, Collections.singleton(id)); 1271 } 1272 return oldLock; 1273 } 1274 1275 @Override 1276 public void markReferencedBinaries() { 1277 log.debug("Starting binaries GC mark"); 1278 DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class); 1279 String repositoryName = getRepositoryName(); 1280 try (Statement st = connection.createStatement()) { 1281 int i = -1; 1282 for (String sql : sqlInfo.getBinariesSql) { 1283 i++; 1284 Column col = sqlInfo.getBinariesColumns.get(i); 1285 if (logger.isLogEnabled()) { 1286 logger.log(sql); 1287 } 1288 try (ResultSet rs = st.executeQuery(sql)) { 1289 countExecute(); 1290 int n = 0; 1291 while (rs.next()) { 1292 n++; 1293 String key = (String) col.getFromResultSet(rs, 1); 1294 if (key != null) { 1295 blobManager.markReferencedBinary(key, repositoryName); 1296 } 1297 } 1298 if (logger.isLogEnabled()) { 1299 logger.logCount(n); 1300 } 1301 } 1302 } 1303 } catch (SQLException e) { 1304 throw new RuntimeException("Failed to mark binaries for gC", e); 1305 } 1306 log.debug("End of binaries GC mark"); 1307 } 1308 1309 /* 1310 * ----- XAResource ----- 1311 */ 1312 1313 protected static String systemToString(Object o) { 1314 return o.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(o)); 1315 } 1316 1317 @Override 1318 public void start(Xid xid, int flags) throws XAException { 1319 try { 1320 xaresource.start(xid, flags); 1321 if (logger.isLogEnabled()) { 1322 logger.log("XA start on " + systemToString(xid)); 1323 } 1324 } catch (NuxeoException e) { 1325 throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e); 1326 } catch (XAException e) { 1327 logger.error("XA start error on " + systemToString(xid), e); 1328 throw e; 1329 } 1330 } 1331 1332 @Override 1333 public void end(Xid xid, int flags) throws XAException { 1334 try { 1335 xaresource.end(xid, flags); 1336 if (logger.isLogEnabled()) { 1337 logger.log("XA end on " + systemToString(xid)); 1338 } 1339 } catch (NullPointerException e) { 1340 // H2 when no active transaction 1341 logger.error("XA end error on " + systemToString(xid), e); 1342 throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e); 1343 } catch (XAException e) { 1344 if (flags != XAResource.TMFAIL) { 1345 logger.error("XA end error on " + systemToString(xid), e); 1346 } 1347 throw e; 1348 } 1349 } 1350 1351 @Override 1352 public int prepare(Xid xid) throws XAException { 1353 try { 1354 return xaresource.prepare(xid); 1355 } catch (XAException e) { 1356 logger.error("XA prepare error on " + systemToString(xid), e); 1357 throw e; 1358 } 1359 } 1360 1361 @Override 1362 public void commit(Xid xid, boolean onePhase) throws XAException { 1363 try { 1364 xaresource.commit(xid, onePhase); 1365 } catch (XAException e) { 1366 logger.error("XA commit error on " + systemToString(xid), e); 1367 throw e; 1368 } 1369 } 1370 1371 // rollback interacts with caches so is in RowMapper 1372 1373 @Override 1374 public void forget(Xid xid) throws XAException { 1375 xaresource.forget(xid); 1376 } 1377 1378 @Override 1379 public Xid[] recover(int flag) throws XAException { 1380 return xaresource.recover(flag); 1381 } 1382 1383 @Override 1384 public boolean setTransactionTimeout(int seconds) throws XAException { 1385 return xaresource.setTransactionTimeout(seconds); 1386 } 1387 1388 @Override 1389 public int getTransactionTimeout() throws XAException { 1390 return xaresource.getTransactionTimeout(); 1391 } 1392 1393 @Override 1394 public boolean isSameRM(XAResource xares) throws XAException { 1395 throw new UnsupportedOperationException(); 1396 } 1397 1398 @Override 1399 public boolean isConnected() { 1400 return connection != null; 1401 } 1402 1403 @Override 1404 public void connect(boolean noSharing) { 1405 openConnections(noSharing); 1406 } 1407 1408 @Override 1409 public void disconnect() { 1410 closeConnections(); 1411 } 1412 1413 /** 1414 * @since 7.10-HF25, 8.10-HF06, 9.2 1415 */ 1416 @FunctionalInterface 1417 protected interface BiFunctionSQLException<T, U, R> { 1418 1419 /** 1420 * Applies this function to the given arguments. 1421 * 1422 * @param t the first function argument 1423 * @param u the second function argument 1424 * @return the function result 1425 */ 1426 R apply(T t, U u) throws SQLException; 1427 1428 } 1429 1430}