001/* 002 * (C) Copyright 2006-2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 * Benoit Delbosc 019 */ 020package org.nuxeo.ecm.core.storage.sql.jdbc; 021 022import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult; 023 024import java.io.File; 025import java.io.FileOutputStream; 026import java.io.IOException; 027import java.io.OutputStream; 028import java.io.PrintStream; 029import java.io.Serializable; 030import java.security.MessageDigest; 031import java.security.NoSuchAlgorithmException; 032import java.sql.Array; 033import java.sql.DatabaseMetaData; 034import java.sql.PreparedStatement; 035import java.sql.ResultSet; 036import java.sql.SQLDataException; 037import java.sql.SQLException; 038import java.sql.Statement; 039import java.sql.Types; 040import java.util.ArrayList; 041import java.util.Arrays; 042import java.util.Calendar; 043import java.util.Collection; 044import java.util.Collections; 045import java.util.HashMap; 046import java.util.HashSet; 047import java.util.LinkedList; 048import java.util.List; 049import java.util.Map; 050import java.util.Map.Entry; 051import java.util.Set; 052import java.util.UUID; 053import java.util.concurrent.ConcurrentHashMap; 054 055import javax.transaction.xa.XAException; 056import javax.transaction.xa.XAResource; 057import javax.transaction.xa.Xid; 058 059import org.apache.commons.logging.Log; 060import org.apache.commons.logging.LogFactory; 061import org.nuxeo.common.Environment; 062import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 063import org.nuxeo.ecm.core.api.IterableQueryResult; 064import org.nuxeo.ecm.core.api.Lock; 065import org.nuxeo.ecm.core.api.NuxeoException; 066import org.nuxeo.ecm.core.api.PartialList; 067import org.nuxeo.ecm.core.api.ScrollResult; 068import org.nuxeo.ecm.core.api.ScrollResultImpl; 069import org.nuxeo.ecm.core.blob.DocumentBlobManager; 070import org.nuxeo.ecm.core.model.LockManager; 071import org.nuxeo.ecm.core.query.QueryFilter; 072import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator; 073import org.nuxeo.ecm.core.storage.sql.ColumnType; 074import org.nuxeo.ecm.core.storage.sql.ColumnType.WrappedId; 075import org.nuxeo.ecm.core.storage.sql.Invalidations; 076import org.nuxeo.ecm.core.storage.sql.Mapper; 077import org.nuxeo.ecm.core.storage.sql.Model; 078import org.nuxeo.ecm.core.storage.sql.RepositoryDescriptor; 079import org.nuxeo.ecm.core.storage.sql.RepositoryImpl; 080import org.nuxeo.ecm.core.storage.sql.Row; 081import org.nuxeo.ecm.core.storage.sql.RowId; 082import org.nuxeo.ecm.core.storage.sql.Session.PathResolver; 083import org.nuxeo.ecm.core.storage.sql.jdbc.SQLInfo.SQLInfoSelect; 084import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column; 085import org.nuxeo.ecm.core.storage.sql.jdbc.db.Database; 086import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table; 087import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect; 088import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectOracle; 089import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.SQLStatement.ListCollector; 090import org.nuxeo.runtime.api.Framework; 091 092/** 093 * A {@link JDBCMapper} maps objects to and from a JDBC database. It is specific to a given database connection, as it 094 * computes statements. 095 * <p> 096 * The {@link JDBCMapper} does the mapping according to the policy defined by a {@link Model}, and generates SQL 097 * statements recorded in the {@link SQLInfo}. 098 */ 099public class JDBCMapper extends JDBCRowMapper implements Mapper { 100 101 private static final Log log = LogFactory.getLog(JDBCMapper.class); 102 103 public static Map<String, Serializable> testProps = new HashMap<>(); 104 105 protected static Map<String, CursorResult> cursorResults = new ConcurrentHashMap<>(); 106 107 public static final String TEST_UPGRADE = "testUpgrade"; 108 109 // property in sql.txt file 110 public static final String TEST_UPGRADE_VERSIONS = "testUpgradeVersions"; 111 112 public static final String TEST_UPGRADE_LAST_CONTRIBUTOR = "testUpgradeLastContributor"; 113 114 public static final String TEST_UPGRADE_LOCKS = "testUpgradeLocks"; 115 116 public static final String TEST_UPGRADE_SYS_CHANGE_TOKEN = "testUpgradeSysChangeToken"; 117 118 protected TableUpgrader tableUpgrader; 119 120 private final QueryMakerService queryMakerService; 121 122 private final PathResolver pathResolver; 123 124 private final RepositoryImpl repository; 125 126 protected boolean clusteringEnabled; 127 128 protected static final String NOSCROLL_ID = "noscroll"; 129 130 /** 131 * Creates a new Mapper. 132 * 133 * @param model the model 134 * @param pathResolver the path resolver (used for startswith queries) 135 * @param sqlInfo the sql info 136 * @param clusterInvalidator the cluster invalidator 137 * @param repository the repository 138 */ 139 public JDBCMapper(Model model, PathResolver pathResolver, SQLInfo sqlInfo, ClusterInvalidator clusterInvalidator, 140 RepositoryImpl repository) { 141 super(model, sqlInfo, clusterInvalidator, repository.getInvalidationsPropagator()); 142 this.pathResolver = pathResolver; 143 this.repository = repository; 144 clusteringEnabled = clusterInvalidator != null; 145 queryMakerService = Framework.getService(QueryMakerService.class); 146 147 tableUpgrader = new TableUpgrader(this); 148 tableUpgrader.add(Model.VERSION_TABLE_NAME, Model.VERSION_IS_LATEST_KEY, "upgradeVersions", 149 TEST_UPGRADE_VERSIONS); 150 tableUpgrader.add("dublincore", "lastContributor", "upgradeLastContributor", TEST_UPGRADE_LAST_CONTRIBUTOR); 151 tableUpgrader.add(Model.LOCK_TABLE_NAME, Model.LOCK_OWNER_KEY, "upgradeLocks", TEST_UPGRADE_LOCKS); 152 tableUpgrader.add(Model.HIER_TABLE_NAME, Model.MAIN_SYS_CHANGE_TOKEN_KEY, "upgradeSysChangeToken", 153 TEST_UPGRADE_SYS_CHANGE_TOKEN); 154 } 155 156 @Override 157 public int getTableSize(String tableName) { 158 return sqlInfo.getDatabase().getTable(tableName).getColumns().size(); 159 } 160 161 /* 162 * ----- Root ----- 163 */ 164 165 @Override 166 public void createDatabase(String ddlMode) { 167 // some databases (SQL Server) can't create tables/indexes/etc in a transaction, so suspend it 168 try { 169 if (!connection.getAutoCommit()) { 170 throw new NuxeoException("connection should not run in transactional mode for DDL operations"); 171 } 172 createTables(ddlMode); 173 } catch (SQLException e) { 174 throw new NuxeoException(e); 175 } 176 } 177 178 protected String getTableName(String origName) { 179 180 if (dialect instanceof DialectOracle) { 181 if (origName.length() > 30) { 182 183 StringBuilder sb = new StringBuilder(origName.length()); 184 185 try { 186 MessageDigest digest = MessageDigest.getInstance("MD5"); 187 sb.append(origName.substring(0, 15)); 188 sb.append('_'); 189 190 digest.update(origName.getBytes()); 191 sb.append(Dialect.toHexString(digest.digest()).substring(0, 12)); 192 193 return sb.toString(); 194 195 } catch (NoSuchAlgorithmException e) { 196 throw new RuntimeException("Error", e); 197 } 198 } 199 } 200 201 return origName; 202 } 203 204 protected void createTables(String ddlMode) throws SQLException { 205 ListCollector ddlCollector = new ListCollector(); 206 207 sqlInfo.executeSQLStatements(null, ddlMode, connection, logger, ddlCollector); // for missing category 208 sqlInfo.executeSQLStatements("first", ddlMode, connection, logger, ddlCollector); 209 sqlInfo.executeSQLStatements("beforeTableCreation", ddlMode, connection, logger, ddlCollector); 210 if (testProps.containsKey(TEST_UPGRADE)) { 211 // create "old" tables 212 sqlInfo.executeSQLStatements("testUpgrade", ddlMode, connection, logger, null); // do not collect 213 } 214 215 String schemaName = dialect.getConnectionSchema(connection); 216 DatabaseMetaData metadata = connection.getMetaData(); 217 Set<String> tableNames = findTableNames(metadata, schemaName); 218 Database database = sqlInfo.getDatabase(); 219 Map<String, List<Column>> added = new HashMap<>(); 220 221 for (Table table : database.getTables()) { 222 String tableName = getTableName(table.getPhysicalName()); 223 if (!tableNames.contains(tableName.toUpperCase())) { 224 225 /* 226 * Create missing table. 227 */ 228 229 ddlCollector.add(table.getCreateSql()); 230 ddlCollector.addAll(table.getPostCreateSqls(model)); 231 232 added.put(table.getKey(), null); // null = table created 233 234 sqlInfo.sqlStatementsProperties.put("create_table_" + tableName.toLowerCase(), Boolean.TRUE); 235 236 } else { 237 238 /* 239 * Get existing columns. 240 */ 241 242 Map<String, Integer> columnTypes = new HashMap<>(); 243 Map<String, String> columnTypeNames = new HashMap<>(); 244 Map<String, Integer> columnTypeSizes = new HashMap<>(); 245 try (ResultSet rs = metadata.getColumns(null, schemaName, tableName, "%")) { 246 while (rs.next()) { 247 String schema = rs.getString("TABLE_SCHEM"); 248 if (schema != null) { // null for MySQL, doh! 249 if ("INFORMATION_SCHEMA".equals(schema.toUpperCase())) { 250 // H2 returns some system tables (locks) 251 continue; 252 } 253 } 254 String columnName = rs.getString("COLUMN_NAME").toUpperCase(); 255 columnTypes.put(columnName, Integer.valueOf(rs.getInt("DATA_TYPE"))); 256 columnTypeNames.put(columnName, rs.getString("TYPE_NAME")); 257 columnTypeSizes.put(columnName, Integer.valueOf(rs.getInt("COLUMN_SIZE"))); 258 } 259 } 260 261 /* 262 * Update types and create missing columns. 263 */ 264 265 List<Column> addedColumns = new LinkedList<>(); 266 for (Column column : table.getColumns()) { 267 String upperName = column.getPhysicalName().toUpperCase(); 268 Integer type = columnTypes.remove(upperName); 269 if (type == null) { 270 log.warn("Adding missing column in database: " + column.getFullQuotedName()); 271 ddlCollector.add(table.getAddColumnSql(column)); 272 ddlCollector.addAll(table.getPostAddSqls(column, model)); 273 addedColumns.add(column); 274 } else { 275 String actualName = columnTypeNames.get(upperName); 276 Integer actualSize = columnTypeSizes.get(upperName); 277 String message = column.checkJdbcType(type, actualName, actualSize); 278 if (message != null) { 279 log.error(message); 280 Framework.getRuntime().getMessageHandler().addError(message); 281 } 282 } 283 } 284 for (String col : dialect.getIgnoredColumns(table)) { 285 columnTypes.remove(col.toUpperCase()); 286 } 287 if (!columnTypes.isEmpty()) { 288 log.warn("Database contains additional unused columns for table " + table.getQuotedName() + ": " 289 + String.join(", ", columnTypes.keySet())); 290 } 291 if (!addedColumns.isEmpty()) { 292 if (added.containsKey(table.getKey())) { 293 throw new AssertionError(); 294 } 295 added.put(table.getKey(), addedColumns); 296 } 297 } 298 } 299 300 if (testProps.containsKey(TEST_UPGRADE)) { 301 // create "old" content in tables 302 sqlInfo.executeSQLStatements("testUpgradeOldTables", ddlMode, connection, logger, ddlCollector); 303 } 304 305 // run upgrade for each table if added columns or test 306 for (Entry<String, List<Column>> en : added.entrySet()) { 307 List<Column> addedColumns = en.getValue(); 308 String tableKey = en.getKey(); 309 upgradeTable(tableKey, addedColumns, ddlMode, ddlCollector); 310 } 311 312 sqlInfo.executeSQLStatements("afterTableCreation", ddlMode, connection, logger, ddlCollector); 313 sqlInfo.executeSQLStatements("last", ddlMode, connection, logger, ddlCollector); 314 315 // aclr_permission check for PostgreSQL 316 dialect.performAdditionalStatements(connection); 317 318 /* 319 * Execute all the collected DDL, or dump it if requested, depending on ddlMode 320 */ 321 322 // ddlMode may be: 323 // ignore (not treated here, nothing done) 324 // dump (implies execute) 325 // dump,execute 326 // dump,ignore (no execute) 327 // execute 328 // abort (implies dump) 329 // compat can be used instead of execute to always recreate stored procedures 330 331 List<String> ddl = ddlCollector.getStrings(); 332 boolean ignore = ddlMode.contains(RepositoryDescriptor.DDL_MODE_IGNORE); 333 boolean dump = ddlMode.contains(RepositoryDescriptor.DDL_MODE_DUMP); 334 boolean abort = ddlMode.contains(RepositoryDescriptor.DDL_MODE_ABORT); 335 if (dump || abort) { 336 337 /* 338 * Dump DDL if not empty. 339 */ 340 341 if (!ddl.isEmpty()) { 342 File dumpFile = new File(Environment.getDefault().getLog(), "ddl-vcs-" + repository.getName() + ".sql"); 343 try (OutputStream out = new FileOutputStream(dumpFile); PrintStream ps = new PrintStream(out)) { 344 for (String sql : dialect.getDumpStart()) { 345 ps.println(sql); 346 } 347 for (String sql : ddl) { 348 sql = sql.trim(); 349 if (sql.endsWith(";")) { 350 sql = sql.substring(0, sql.length() - 1); 351 } 352 ps.println(dialect.getSQLForDump(sql)); 353 } 354 for (String sql : dialect.getDumpStop()) { 355 ps.println(sql); 356 } 357 } catch (IOException e) { 358 throw new NuxeoException(e); 359 } 360 361 /* 362 * Abort if requested. 363 */ 364 365 if (abort) { 366 log.error("Dumped DDL to: " + dumpFile); 367 throw new NuxeoException("Database initialization failed for: " + repository.getName() 368 + ", DDL must be executed: " + dumpFile); 369 } 370 } 371 } 372 if (!ignore) { 373 374 /* 375 * Execute DDL. 376 */ 377 378 try (Statement st = connection.createStatement()) { 379 for (String sql : ddl) { 380 logger.log(sql.replace("\n", "\n ")); // indented 381 try { 382 st.execute(sql); 383 } catch (SQLException e) { 384 throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e); 385 } 386 countExecute(); 387 } 388 } 389 390 /* 391 * Execute post-DDL stuff. 392 */ 393 394 try (Statement st = connection.createStatement()) { 395 for (String sql : dialect.getStartupSqls(model, sqlInfo.database)) { 396 logger.log(sql.replace("\n", "\n ")); // indented 397 try { 398 st.execute(sql); 399 } catch (SQLException e) { 400 throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e); 401 } 402 countExecute(); 403 } 404 } 405 } 406 } 407 408 protected void upgradeTable(String tableKey, List<Column> addedColumns, String ddlMode, ListCollector ddlCollector) 409 throws SQLException { 410 tableUpgrader.upgrade(tableKey, addedColumns, ddlMode, ddlCollector); 411 } 412 413 /** Finds uppercase table names. */ 414 protected static Set<String> findTableNames(DatabaseMetaData metadata, String schemaName) throws SQLException { 415 Set<String> tableNames = new HashSet<>(); 416 ResultSet rs = metadata.getTables(null, schemaName, "%", new String[] { "TABLE" }); 417 while (rs.next()) { 418 String tableName = rs.getString("TABLE_NAME"); 419 tableNames.add(tableName.toUpperCase()); 420 } 421 rs.close(); 422 return tableNames; 423 } 424 425 @Override 426 public int getClusterNodeIdType() { 427 return sqlInfo.getClusterNodeIdType(); 428 } 429 430 @Override 431 public void createClusterNode(Serializable nodeId) { 432 Calendar now = Calendar.getInstance(); 433 String sql = sqlInfo.getCreateClusterNodeSql(); 434 List<Column> columns = sqlInfo.getCreateClusterNodeColumns(); 435 try (PreparedStatement ps = connection.prepareStatement(sql)) { 436 if (logger.isLogEnabled()) { 437 logger.logSQL(sql, Arrays.asList(nodeId, now)); 438 } 439 columns.get(0).setToPreparedStatement(ps, 1, nodeId); 440 columns.get(1).setToPreparedStatement(ps, 2, now); 441 ps.execute(); 442 443 } catch (SQLException e) { 444 try { 445 checkConcurrentUpdate(e); 446 } catch (ConcurrentUpdateException cue) { 447 cue.addInfo( 448 "Duplicate cluster node with id: " + nodeId 449 + " (a crashed node must be cleaned up, or the repository.clustering.id configuration fixed)"); 450 throw cue; 451 } 452 throw new NuxeoException(e); 453 } 454 } 455 456 @Override 457 public void removeClusterNode(Serializable nodeId) { 458 // delete from cluster_nodes 459 String sql = sqlInfo.getDeleteClusterNodeSql(); 460 Column column = sqlInfo.getDeleteClusterNodeColumn(); 461 try (PreparedStatement ps = connection.prepareStatement(sql)) { 462 if (logger.isLogEnabled()) { 463 logger.logSQL(sql, Collections.singletonList(nodeId)); 464 } 465 column.setToPreparedStatement(ps, 1, nodeId); 466 ps.execute(); 467 // delete un-processed invals from cluster_invals 468 deleteClusterInvals(nodeId); 469 } catch (SQLException e) { 470 throw new NuxeoException(e); 471 } 472 } 473 474 protected void deleteClusterInvals(Serializable nodeId) throws SQLException { 475 String sql = sqlInfo.getDeleteClusterInvalsSql(); 476 Column column = sqlInfo.getDeleteClusterInvalsColumn(); 477 try (PreparedStatement ps = connection.prepareStatement(sql)) { 478 if (logger.isLogEnabled()) { 479 logger.logSQL(sql, Collections.singletonList(nodeId)); 480 } 481 column.setToPreparedStatement(ps, 1, nodeId); 482 int n = ps.executeUpdate(); 483 countExecute(); 484 if (logger.isLogEnabled()) { 485 logger.logCount(n); 486 } 487 } 488 } 489 490 @Override 491 public void insertClusterInvalidations(Serializable nodeId, Invalidations invalidations) { 492 String sql = dialect.getClusterInsertInvalidations(); 493 List<Column> columns = sqlInfo.getClusterInvalidationsColumns(); 494 try (PreparedStatement ps = connection.prepareStatement(sql)) { 495 int kind = Invalidations.MODIFIED; 496 while (true) { 497 Set<RowId> rowIds = invalidations.getKindSet(kind); 498 499 // reorganize by id 500 Map<Serializable, Set<String>> res = new HashMap<>(); 501 for (RowId rowId : rowIds) { 502 Set<String> tableNames = res.get(rowId.id); 503 if (tableNames == null) { 504 res.put(rowId.id, tableNames = new HashSet<>()); 505 } 506 tableNames.add(rowId.tableName); 507 } 508 509 // do inserts 510 for (Entry<Serializable, Set<String>> en : res.entrySet()) { 511 Serializable id = en.getKey(); 512 String fragments = join(en.getValue(), ' '); 513 if (logger.isLogEnabled()) { 514 logger.logSQL(sql, Arrays.<Serializable> asList(nodeId, id, fragments, Long.valueOf(kind))); 515 } 516 Serializable frags; 517 if (dialect.supportsArrays() && columns.get(2).getJdbcType() == Types.ARRAY) { 518 frags = fragments.split(" "); 519 } else { 520 frags = fragments; 521 } 522 columns.get(0).setToPreparedStatement(ps, 1, nodeId); 523 columns.get(1).setToPreparedStatement(ps, 2, id); 524 columns.get(2).setToPreparedStatement(ps, 3, frags); 525 columns.get(3).setToPreparedStatement(ps, 4, Long.valueOf(kind)); 526 ps.execute(); 527 countExecute(); 528 } 529 if (kind == Invalidations.MODIFIED) { 530 kind = Invalidations.DELETED; 531 } else { 532 break; 533 } 534 } 535 } catch (SQLException e) { 536 throw new NuxeoException("Could not invalidate", e); 537 } 538 } 539 540 // join that works on a set 541 protected static String join(Collection<String> strings, char sep) { 542 if (strings.isEmpty()) { 543 throw new RuntimeException(); 544 } 545 if (strings.size() == 1) { 546 return strings.iterator().next(); 547 } 548 int size = 0; 549 for (String word : strings) { 550 size += word.length() + 1; 551 } 552 StringBuilder buf = new StringBuilder(size); 553 for (String word : strings) { 554 buf.append(word); 555 buf.append(sep); 556 } 557 buf.setLength(size - 1); 558 return buf.toString(); 559 } 560 561 @Override 562 public Invalidations getClusterInvalidations(Serializable nodeId) { 563 Invalidations invalidations = new Invalidations(); 564 String sql = dialect.getClusterGetInvalidations(); 565 List<Column> columns = sqlInfo.getClusterInvalidationsColumns(); 566 if (logger.isLogEnabled()) { 567 logger.logSQL(sql, Collections.singletonList(nodeId)); 568 } 569 try (PreparedStatement ps = connection.prepareStatement(sql)) { 570 setToPreparedStatement(ps, 1, nodeId); 571 try (ResultSet rs = ps.executeQuery()) { 572 countExecute(); 573 while (rs.next()) { 574 // first column ignored, it's the node id 575 Serializable id = columns.get(1).getFromResultSet(rs, 1); 576 Serializable frags = columns.get(2).getFromResultSet(rs, 2); 577 int kind = ((Long) columns.get(3).getFromResultSet(rs, 3)).intValue(); 578 String[] fragments; 579 if (dialect.supportsArrays() && frags instanceof String[]) { 580 fragments = (String[]) frags; 581 } else { 582 fragments = ((String) frags).split(" "); 583 } 584 invalidations.add(id, fragments, kind); 585 } 586 } 587 if (logger.isLogEnabled()) { 588 // logCount(n); 589 logger.log(" -> " + invalidations); 590 } 591 if (dialect.isClusteringDeleteNeeded()) { 592 deleteClusterInvals(nodeId); 593 } 594 return invalidations; 595 } catch (SQLException e) { 596 throw new NuxeoException("Could not invalidate", e); 597 } 598 } 599 600 @Override 601 public Serializable getRootId(String repositoryId) { 602 String sql = sqlInfo.getSelectRootIdSql(); 603 if (logger.isLogEnabled()) { 604 logger.logSQL(sql, Collections.<Serializable> singletonList(repositoryId)); 605 } 606 try (PreparedStatement ps = connection.prepareStatement(sql)) { 607 ps.setString(1, repositoryId); 608 try (ResultSet rs = ps.executeQuery()) { 609 countExecute(); 610 if (!rs.next()) { 611 if (logger.isLogEnabled()) { 612 logger.log(" -> (none)"); 613 } 614 return null; 615 } 616 Column column = sqlInfo.getSelectRootIdWhatColumn(); 617 Serializable id = column.getFromResultSet(rs, 1); 618 if (logger.isLogEnabled()) { 619 logger.log(" -> " + Model.MAIN_KEY + '=' + id); 620 } 621 // check that we didn't get several rows 622 if (rs.next()) { 623 throw new NuxeoException("Row query for " + repositoryId + " returned several rows: " + sql); 624 } 625 return id; 626 } 627 } catch (SQLException e) { 628 throw new NuxeoException("Could not select: " + sql, e); 629 } 630 } 631 632 @Override 633 public void setRootId(Serializable repositoryId, Serializable id) { 634 String sql = sqlInfo.getInsertRootIdSql(); 635 try (PreparedStatement ps = connection.prepareStatement(sql)) { 636 List<Column> columns = sqlInfo.getInsertRootIdColumns(); 637 List<Serializable> debugValues = null; 638 if (logger.isLogEnabled()) { 639 debugValues = new ArrayList<>(2); 640 } 641 int i = 0; 642 for (Column column : columns) { 643 i++; 644 String key = column.getKey(); 645 Serializable v; 646 if (key.equals(Model.MAIN_KEY)) { 647 v = id; 648 } else if (key.equals(Model.REPOINFO_REPONAME_KEY)) { 649 v = repositoryId; 650 } else { 651 throw new RuntimeException(key); 652 } 653 column.setToPreparedStatement(ps, i, v); 654 if (debugValues != null) { 655 debugValues.add(v); 656 } 657 } 658 if (debugValues != null) { 659 logger.logSQL(sql, debugValues); 660 debugValues.clear(); 661 } 662 ps.execute(); 663 countExecute(); 664 } catch (SQLException e) { 665 throw new NuxeoException("Could not insert: " + sql, e); 666 } 667 } 668 669 protected QueryMaker findQueryMaker(String queryType) { 670 for (Class<? extends QueryMaker> klass : queryMakerService.getQueryMakers()) { 671 QueryMaker queryMaker; 672 try { 673 queryMaker = klass.newInstance(); 674 } catch (ReflectiveOperationException e) { 675 throw new NuxeoException(e); 676 } 677 if (queryMaker.accepts(queryType)) { 678 return queryMaker; 679 } 680 } 681 return null; 682 } 683 684 protected void prepareUserReadAcls(QueryFilter queryFilter) { 685 String sql = dialect.getPrepareUserReadAclsSql(); 686 Serializable principals = queryFilter.getPrincipals(); 687 if (sql == null || principals == null) { 688 return; 689 } 690 if (!dialect.supportsArrays()) { 691 principals = String.join(Dialect.ARRAY_SEP, (String[]) principals); 692 } 693 try (PreparedStatement ps = connection.prepareStatement(sql)) { 694 if (logger.isLogEnabled()) { 695 logger.logSQL(sql, Collections.singleton(principals)); 696 } 697 setToPreparedStatement(ps, 1, principals); 698 ps.execute(); 699 countExecute(); 700 } catch (SQLException e) { 701 throw new NuxeoException("Failed to prepare user read acl cache", e); 702 } 703 } 704 705 @Override 706 public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter, 707 boolean countTotal) { 708 return query(query, queryType, queryFilter, countTotal ? -1 : 0); 709 } 710 711 @Override 712 public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter, long countUpTo) { 713 PartialList<Serializable> result = queryProjection(query, queryType, queryFilter, countUpTo, 714 (info, rs) -> info.whatColumns.get(0).getFromResultSet(rs, 1)); 715 716 if (logger.isLogEnabled()) { 717 logger.logIds(result, countUpTo != 0, result.totalSize()); 718 } 719 720 return result; 721 } 722 723 // queryFilter used for principals and permissions 724 @Override 725 public IterableQueryResult queryAndFetch(String query, String queryType, QueryFilter queryFilter, 726 boolean distinctDocuments, Object... params) { 727 if (dialect.needsPrepareUserReadAcls()) { 728 prepareUserReadAcls(queryFilter); 729 } 730 QueryMaker queryMaker = findQueryMaker(queryType); 731 if (queryMaker == null) { 732 throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query); 733 } 734 query = computeDistinctDocuments(query, distinctDocuments); 735 try { 736 return new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this, params); 737 } catch (SQLException e) { 738 throw new NuxeoException("Invalid query: " + queryType + ": " + query, e); 739 } 740 } 741 742 @Override 743 public PartialList<Map<String, Serializable>> queryProjection(String query, String queryType, 744 QueryFilter queryFilter, boolean distinctDocuments, long countUpTo, Object... params) { 745 query = computeDistinctDocuments(query, distinctDocuments); 746 PartialList<Map<String, Serializable>> result = queryProjection(query, queryType, queryFilter, countUpTo, 747 (info, rs) -> info.mapMaker.makeMap(rs), params); 748 749 if (logger.isLogEnabled()) { 750 logger.logMaps(result, countUpTo != 0, result.totalSize()); 751 } 752 753 return result; 754 } 755 756 protected String computeDistinctDocuments(String query, boolean distinctDocuments) { 757 if (distinctDocuments) { 758 String q = query.toLowerCase(); 759 if (q.startsWith("select ") && !q.startsWith("select distinct ")) { 760 // Replace "select" by "select distinct", split at "select ".length() index 761 query = "SELECT DISTINCT " + query.substring(7); 762 } 763 } 764 return query; 765 } 766 767 protected <T> PartialList<T> queryProjection(String query, String queryType, QueryFilter queryFilter, 768 long countUpTo, BiFunctionSQLException<SQLInfoSelect, ResultSet, T> extractor, Object... params) { 769 if (dialect.needsPrepareUserReadAcls()) { 770 prepareUserReadAcls(queryFilter); 771 } 772 QueryMaker queryMaker = findQueryMaker(queryType); 773 if (queryMaker == null) { 774 throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query); 775 } 776 QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter, params); 777 778 if (q == null) { 779 logger.log("Query cannot return anything due to conflicting clauses"); 780 return new PartialList<>(Collections.emptyList(), 0); 781 } 782 long limit = queryFilter.getLimit(); 783 long offset = queryFilter.getOffset(); 784 785 if (logger.isLogEnabled()) { 786 String sql = q.selectInfo.sql; 787 if (limit != 0) { 788 sql += " -- LIMIT " + limit + " OFFSET " + offset; 789 } 790 if (countUpTo != 0) { 791 sql += " -- COUNT TOTAL UP TO " + countUpTo; 792 } 793 logger.logSQL(sql, q.selectParams); 794 } 795 796 String sql = q.selectInfo.sql; 797 798 if (countUpTo == 0 && limit > 0 && dialect.supportsPaging()) { 799 // full result set not needed for counting 800 sql = dialect.addPagingClause(sql, limit, offset); 801 limit = 0; 802 offset = 0; 803 } else if (countUpTo > 0 && dialect.supportsPaging()) { 804 // ask one more row 805 sql = dialect.addPagingClause(sql, Math.max(countUpTo + 1, limit + offset), 0); 806 } 807 808 try (PreparedStatement ps = connection.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, 809 ResultSet.CONCUR_READ_ONLY)) { 810 int i = 1; 811 for (Serializable object : q.selectParams) { 812 setToPreparedStatement(ps, i++, object); 813 } 814 try (ResultSet rs = ps.executeQuery()) { 815 countExecute(); 816 817 // limit/offset 818 long totalSize = -1; 819 boolean available; 820 if ((limit == 0) || (offset == 0)) { 821 available = rs.first(); 822 if (!available) { 823 totalSize = 0; 824 } 825 if (limit == 0) { 826 limit = -1; // infinite 827 } 828 } else { 829 available = rs.absolute((int) offset + 1); 830 } 831 832 List<T> projections = new LinkedList<>(); 833 int rowNum = 0; 834 while (available && (limit != 0)) { 835 try { 836 T projection = extractor.apply(q.selectInfo, rs); 837 projections.add(projection); 838 rowNum = rs.getRow(); 839 available = rs.next(); 840 limit--; 841 } catch (SQLDataException e) { 842 // actually no data available, MariaDB Connector/J lied, stop now 843 available = false; 844 } 845 } 846 847 // total size 848 if (countUpTo != 0 && (totalSize == -1)) { 849 if (!available && (rowNum != 0)) { 850 // last row read was the actual last 851 totalSize = rowNum; 852 } else { 853 // available if limit reached with some left 854 // rowNum == 0 if skipped too far 855 rs.last(); 856 totalSize = rs.getRow(); 857 } 858 if (countUpTo > 0 && totalSize > countUpTo) { 859 // the result where truncated we don't know the total size 860 totalSize = -2; 861 } 862 } 863 864 return new PartialList<>(projections, totalSize); 865 } 866 } catch (SQLException e) { 867 throw new NuxeoException("Invalid query: " + query, e); 868 } 869 } 870 871 public int setToPreparedStatement(PreparedStatement ps, int i, Serializable object) throws SQLException { 872 if (object instanceof Calendar) { 873 dialect.setToPreparedStatementTimestamp(ps, i, object, null); 874 } else if (object instanceof java.sql.Date) { 875 ps.setDate(i, (java.sql.Date) object); 876 } else if (object instanceof Long) { 877 ps.setLong(i, ((Long) object).longValue()); 878 } else if (object instanceof WrappedId) { 879 dialect.setId(ps, i, object.toString()); 880 } else if (object instanceof Object[]) { 881 int jdbcType; 882 if (object instanceof String[]) { 883 jdbcType = dialect.getJDBCTypeAndString(ColumnType.STRING).jdbcType; 884 } else if (object instanceof Boolean[]) { 885 jdbcType = dialect.getJDBCTypeAndString(ColumnType.BOOLEAN).jdbcType; 886 } else if (object instanceof Long[]) { 887 jdbcType = dialect.getJDBCTypeAndString(ColumnType.LONG).jdbcType; 888 } else if (object instanceof Double[]) { 889 jdbcType = dialect.getJDBCTypeAndString(ColumnType.DOUBLE).jdbcType; 890 } else if (object instanceof java.sql.Date[]) { 891 jdbcType = Types.DATE; 892 } else if (object instanceof java.sql.Clob[]) { 893 jdbcType = Types.CLOB; 894 } else if (object instanceof Calendar[]) { 895 jdbcType = dialect.getJDBCTypeAndString(ColumnType.TIMESTAMP).jdbcType; 896 object = dialect.getTimestampFromCalendar((Calendar[]) object); 897 } else if (object instanceof Integer[]) { 898 jdbcType = dialect.getJDBCTypeAndString(ColumnType.INTEGER).jdbcType; 899 } else { 900 jdbcType = dialect.getJDBCTypeAndString(ColumnType.CLOB).jdbcType; 901 } 902 Array array = dialect.createArrayOf(jdbcType, (Object[]) object, connection); 903 ps.setArray(i, array); 904 } else { 905 ps.setObject(i, object); 906 } 907 return i; 908 } 909 910 @Override 911 public ScrollResult<String> scroll(String query, int batchSize, int keepAliveSeconds) { 912 if (!dialect.supportsScroll()) { 913 return defaultScroll(query); 914 } 915 checkForTimedoutScroll(); 916 QueryFilter queryFilter = new QueryFilter(null, null, null, null, Collections.emptyList(), 0, 0); 917 return scrollSearch(query, queryFilter, batchSize, keepAliveSeconds); 918 } 919 920 @Override 921 public ScrollResult<String> scroll(String query, QueryFilter queryFilter, int batchSize, int keepAliveSeconds) { 922 if (!dialect.supportsScroll()) { 923 return defaultScroll(query); 924 } 925 if (dialect.needsPrepareUserReadAcls()) { 926 prepareUserReadAcls(queryFilter); 927 } 928 checkForTimedoutScroll(); 929 return scrollSearch(query, queryFilter, batchSize, keepAliveSeconds); 930 } 931 932 protected void checkForTimedoutScroll() { 933 cursorResults.forEach((id, cursor) -> cursor.timedOut(id)); 934 } 935 936 protected ScrollResult<String> scrollSearch(String query, QueryFilter queryFilter, int batchSize, int keepAliveSeconds) { 937 QueryMaker queryMaker = findQueryMaker("NXQL"); 938 QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter); 939 if (q == null) { 940 logger.log("Query cannot return anything due to conflicting clauses"); 941 throw new NuxeoException("Query cannot return anything due to conflicting clauses"); 942 } 943 if (logger.isLogEnabled()) { 944 logger.logSQL(q.selectInfo.sql, q.selectParams); 945 } 946 try { 947 if (connection.getAutoCommit()) { 948 throw new NuxeoException("Scroll should be done inside a transaction"); 949 } 950 // ps MUST NOT be auto-closed because it's referenced by a cursor 951 PreparedStatement ps = connection.prepareStatement(q.selectInfo.sql, ResultSet.TYPE_FORWARD_ONLY, 952 ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT); 953 ps.setFetchSize(batchSize); 954 int i = 1; 955 for (Serializable object : q.selectParams) { 956 setToPreparedStatement(ps, i++, object); 957 } 958 // rs MUST NOT be auto-closed because it's referenced by a cursor 959 ResultSet rs = ps.executeQuery(); 960 String scrollId = UUID.randomUUID().toString(); 961 registerCursor(scrollId, ps, rs, batchSize, keepAliveSeconds); 962 return scroll(scrollId); 963 } catch (SQLException e) { 964 throw new NuxeoException("Error on query", e); 965 } 966 } 967 968 protected class CursorResult { 969 protected final int keepAliveSeconds; 970 971 protected final PreparedStatement preparedStatement; 972 973 protected final ResultSet resultSet; 974 975 protected final int batchSize; 976 977 protected long lastCallTimestamp; 978 979 CursorResult(PreparedStatement preparedStatement, ResultSet resultSet, int batchSize, int keepAliveSeconds) { 980 this.preparedStatement = preparedStatement; 981 this.resultSet = resultSet; 982 this.batchSize = batchSize; 983 this.keepAliveSeconds = keepAliveSeconds; 984 lastCallTimestamp = System.currentTimeMillis(); 985 } 986 987 boolean timedOut(String scrollId) { 988 long now = System.currentTimeMillis(); 989 if (now - lastCallTimestamp > (keepAliveSeconds * 1000)) { 990 if (unregisterCursor(scrollId)) { 991 log.warn("Scroll " + scrollId + " timed out"); 992 } 993 return true; 994 } 995 return false; 996 } 997 998 void touch() { 999 lastCallTimestamp = System.currentTimeMillis(); 1000 } 1001 1002 synchronized void close() throws SQLException { 1003 if (resultSet != null) { 1004 resultSet.close(); 1005 } 1006 if (preparedStatement != null) { 1007 preparedStatement.close(); 1008 } 1009 } 1010 } 1011 1012 protected void registerCursor(String scrollId, PreparedStatement ps, ResultSet rs, int batchSize, 1013 int keepAliveSeconds) { 1014 cursorResults.put(scrollId, new CursorResult(ps, rs, batchSize, keepAliveSeconds)); 1015 } 1016 1017 protected boolean unregisterCursor(String scrollId) { 1018 CursorResult cursor = cursorResults.remove(scrollId); 1019 if (cursor != null) { 1020 try { 1021 cursor.close(); 1022 return true; 1023 } catch (SQLException e) { 1024 log.error("Failed to close cursor for scroll: " + scrollId, e); 1025 // do not propagate exception on cleaning 1026 } 1027 } 1028 return false; 1029 } 1030 1031 protected ScrollResult<String> defaultScroll(String query) { 1032 // the database has no proper support for cursor just return everything in one batch 1033 QueryMaker queryMaker = findQueryMaker("NXQL"); 1034 List<String> ids; 1035 QueryFilter queryFilter = new QueryFilter(null, null, null, null, Collections.emptyList(), 0, 0); 1036 try (IterableQueryResult ret = new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this)) { 1037 ids = new ArrayList<>((int) ret.size()); 1038 for (Map<String, Serializable> map : ret) { 1039 ids.add(map.get("ecm:uuid").toString()); 1040 } 1041 } catch (SQLException e) { 1042 throw new NuxeoException("Invalid scroll query: " + query, e); 1043 } 1044 return new ScrollResultImpl<>(NOSCROLL_ID, ids); 1045 } 1046 1047 @Override 1048 public ScrollResult<String> scroll(String scrollId) { 1049 if (NOSCROLL_ID.equals(scrollId) || !dialect.supportsScroll()) { 1050 // there is only one batch in this case 1051 return emptyResult(); 1052 } 1053 CursorResult cursorResult = cursorResults.get(scrollId); 1054 if (cursorResult == null) { 1055 throw new NuxeoException("Unknown or timed out scrollId"); 1056 } else if (cursorResult.timedOut(scrollId)) { 1057 throw new NuxeoException("Timed out scrollId"); 1058 } 1059 cursorResult.touch(); 1060 List<String> ids = new ArrayList<>(cursorResult.batchSize); 1061 synchronized (cursorResult) { 1062 try { 1063 if (cursorResult.resultSet == null || cursorResult.resultSet.isClosed()) { 1064 unregisterCursor(scrollId); 1065 return emptyResult(); 1066 } 1067 while (ids.size() < cursorResult.batchSize) { 1068 if (cursorResult.resultSet.next()) { 1069 ids.add(cursorResult.resultSet.getString(1)); 1070 } else { 1071 cursorResult.close(); 1072 if (ids.isEmpty()) { 1073 unregisterCursor(scrollId); 1074 } 1075 break; 1076 } 1077 } 1078 } catch (SQLException e) { 1079 throw new NuxeoException("Error during scroll", e); 1080 } 1081 } 1082 return new ScrollResultImpl<>(scrollId, ids); 1083 } 1084 1085 @Override 1086 public Set<Serializable> getAncestorsIds(Collection<Serializable> ids) { 1087 SQLInfoSelect select = sqlInfo.getSelectAncestorsIds(); 1088 if (select == null) { 1089 return getAncestorsIdsIterative(ids); 1090 } 1091 Serializable whereIds = newIdArray(ids); 1092 Set<Serializable> res = new HashSet<>(); 1093 if (logger.isLogEnabled()) { 1094 logger.logSQL(select.sql, Collections.singleton(whereIds)); 1095 } 1096 Column what = select.whatColumns.get(0); 1097 try (PreparedStatement ps = connection.prepareStatement(select.sql)) { 1098 setToPreparedStatementIdArray(ps, 1, whereIds); 1099 try (ResultSet rs = ps.executeQuery()) { 1100 countExecute(); 1101 List<Serializable> debugIds = null; 1102 if (logger.isLogEnabled()) { 1103 debugIds = new LinkedList<>(); 1104 } 1105 while (rs.next()) { 1106 if (dialect.supportsArraysReturnInsteadOfRows()) { 1107 Serializable[] resultIds = dialect.getArrayResult(rs.getArray(1)); 1108 for (Serializable id : resultIds) { 1109 if (id != null) { 1110 res.add(id); 1111 if (logger.isLogEnabled()) { 1112 debugIds.add(id); 1113 } 1114 } 1115 } 1116 } else { 1117 Serializable id = what.getFromResultSet(rs, 1); 1118 if (id != null) { 1119 res.add(id); 1120 if (logger.isLogEnabled()) { 1121 debugIds.add(id); 1122 } 1123 } 1124 } 1125 } 1126 if (logger.isLogEnabled()) { 1127 logger.logIds(debugIds, false, 0); 1128 } 1129 } 1130 return res; 1131 } catch (SQLException e) { 1132 throw new NuxeoException("Failed to get ancestors ids", e); 1133 } 1134 } 1135 1136 /** 1137 * Uses iterative parentid selection. 1138 */ 1139 protected Set<Serializable> getAncestorsIdsIterative(Collection<Serializable> ids) { 1140 try { 1141 LinkedList<Serializable> todo = new LinkedList<>(ids); 1142 Set<Serializable> done = new HashSet<>(); 1143 Set<Serializable> res = new HashSet<>(); 1144 while (!todo.isEmpty()) { 1145 done.addAll(todo); 1146 SQLInfoSelect select = sqlInfo.getSelectParentIds(todo.size()); 1147 if (logger.isLogEnabled()) { 1148 logger.logSQL(select.sql, todo); 1149 } 1150 Column what = select.whatColumns.get(0); 1151 Column where = select.whereColumns.get(0); 1152 try (PreparedStatement ps = connection.prepareStatement(select.sql)) { 1153 int i = 1; 1154 for (Serializable id : todo) { 1155 where.setToPreparedStatement(ps, i++, id); 1156 } 1157 try (ResultSet rs = ps.executeQuery()) { 1158 countExecute(); 1159 todo = new LinkedList<>(); 1160 List<Serializable> debugIds = null; 1161 if (logger.isLogEnabled()) { 1162 debugIds = new LinkedList<>(); 1163 } 1164 while (rs.next()) { 1165 Serializable id = what.getFromResultSet(rs, 1); 1166 if (id != null) { 1167 res.add(id); 1168 if (!done.contains(id)) { 1169 todo.add(id); 1170 } 1171 if (logger.isLogEnabled()) { 1172 debugIds.add(id); // NOSONAR 1173 } 1174 } 1175 } 1176 if (logger.isLogEnabled()) { 1177 logger.logIds(debugIds, false, 0); 1178 } 1179 } 1180 } 1181 } 1182 return res; 1183 } catch (SQLException e) { 1184 throw new NuxeoException("Failed to get ancestors ids", e); 1185 } 1186 } 1187 1188 @Override 1189 public void updateReadAcls() { 1190 if (!dialect.supportsReadAcl()) { 1191 return; 1192 } 1193 if (log.isDebugEnabled()) { 1194 log.debug("updateReadAcls: updating"); 1195 } 1196 try (Statement st = connection.createStatement()) { 1197 String sql = dialect.getUpdateReadAclsSql(); 1198 if (logger.isLogEnabled()) { 1199 logger.log(sql); 1200 } 1201 st.execute(sql); 1202 countExecute(); 1203 } catch (SQLException e) { 1204 checkConcurrentUpdate(e); 1205 throw new NuxeoException("Failed to update read acls", e); 1206 } 1207 if (log.isDebugEnabled()) { 1208 log.debug("updateReadAcls: done."); 1209 } 1210 } 1211 1212 @Override 1213 public void rebuildReadAcls() { 1214 if (!dialect.supportsReadAcl()) { 1215 return; 1216 } 1217 log.debug("rebuildReadAcls: rebuilding ..."); 1218 try (Statement st = connection.createStatement()) { 1219 String sql = dialect.getRebuildReadAclsSql(); 1220 logger.log(sql); 1221 st.execute(sql); 1222 countExecute(); 1223 } catch (SQLException e) { 1224 throw new NuxeoException("Failed to rebuild read acls", e); 1225 } 1226 log.debug("rebuildReadAcls: done."); 1227 } 1228 1229 /* 1230 * ----- Locking ----- 1231 */ 1232 1233 @Override 1234 public Lock getLock(Serializable id) { 1235 if (log.isDebugEnabled()) { 1236 try { 1237 log.debug("getLock " + id + " while autoCommit=" + connection.getAutoCommit()); 1238 } catch (SQLException e) { 1239 throw new RuntimeException(e); 1240 } 1241 } 1242 RowId rowId = new RowId(Model.LOCK_TABLE_NAME, id); 1243 Row row = readSimpleRow(rowId); 1244 return row == null ? null 1245 : new Lock((String) row.get(Model.LOCK_OWNER_KEY), (Calendar) row.get(Model.LOCK_CREATED_KEY)); 1246 } 1247 1248 @Override 1249 public Lock setLock(final Serializable id, final Lock lock) { 1250 if (log.isDebugEnabled()) { 1251 log.debug("setLock " + id + " owner=" + lock.getOwner()); 1252 } 1253 Lock oldLock = getLock(id); 1254 if (oldLock == null) { 1255 Row row = new Row(Model.LOCK_TABLE_NAME, id); 1256 row.put(Model.LOCK_OWNER_KEY, lock.getOwner()); 1257 row.put(Model.LOCK_CREATED_KEY, lock.getCreated()); 1258 insertSimpleRows(Model.LOCK_TABLE_NAME, Collections.singletonList(row)); 1259 } 1260 return oldLock; 1261 } 1262 1263 @Override 1264 public Lock removeLock(final Serializable id, final String owner, final boolean force) { 1265 if (log.isDebugEnabled()) { 1266 log.debug("removeLock " + id + " owner=" + owner + " force=" + force); 1267 } 1268 Lock oldLock = force ? null : getLock(id); 1269 if (!force && owner != null) { 1270 if (oldLock == null) { 1271 // not locked, nothing to do 1272 return null; 1273 } 1274 if (!LockManager.canLockBeRemoved(oldLock.getOwner(), owner)) { 1275 // existing mismatched lock, flag failure 1276 return new Lock(oldLock, true); 1277 } 1278 } 1279 if (force || oldLock != null) { 1280 deleteRows(Model.LOCK_TABLE_NAME, Collections.singleton(id)); 1281 } 1282 return oldLock; 1283 } 1284 1285 @Override 1286 public void markReferencedBinaries() { 1287 log.debug("Starting binaries GC mark"); 1288 DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class); 1289 String repositoryName = getRepositoryName(); 1290 try (Statement st = connection.createStatement()) { 1291 int i = -1; 1292 for (String sql : sqlInfo.getBinariesSql) { 1293 i++; 1294 Column col = sqlInfo.getBinariesColumns.get(i); 1295 if (logger.isLogEnabled()) { 1296 logger.log(sql); 1297 } 1298 try (ResultSet rs = st.executeQuery(sql)) { 1299 countExecute(); 1300 int n = 0; 1301 while (rs.next()) { 1302 n++; 1303 String key = (String) col.getFromResultSet(rs, 1); 1304 if (key != null) { 1305 blobManager.markReferencedBinary(key, repositoryName); 1306 } 1307 } 1308 if (logger.isLogEnabled()) { 1309 logger.logCount(n); 1310 } 1311 } 1312 } 1313 } catch (SQLException e) { 1314 throw new RuntimeException("Failed to mark binaries for gC", e); 1315 } 1316 log.debug("End of binaries GC mark"); 1317 } 1318 1319 /* 1320 * ----- XAResource ----- 1321 */ 1322 1323 protected static String systemToString(Object o) { 1324 return o.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(o)); 1325 } 1326 1327 @Override 1328 public void start(Xid xid, int flags) throws XAException { 1329 try { 1330 xaresource.start(xid, flags); 1331 if (logger.isLogEnabled()) { 1332 logger.log("XA start on " + systemToString(xid)); 1333 } 1334 } catch (NuxeoException e) { 1335 throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e); 1336 } catch (XAException e) { 1337 logger.error("XA start error on " + systemToString(xid), e); 1338 throw e; 1339 } 1340 } 1341 1342 @Override 1343 public void end(Xid xid, int flags) throws XAException { 1344 try { 1345 xaresource.end(xid, flags); 1346 if (logger.isLogEnabled()) { 1347 logger.log("XA end on " + systemToString(xid)); 1348 } 1349 } catch (NullPointerException e) { 1350 // H2 when no active transaction 1351 logger.error("XA end error on " + systemToString(xid), e); 1352 throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e); 1353 } catch (XAException e) { 1354 if (flags != XAResource.TMFAIL) { 1355 logger.error("XA end error on " + systemToString(xid), e); 1356 } 1357 throw e; 1358 } 1359 } 1360 1361 @Override 1362 public int prepare(Xid xid) throws XAException { 1363 try { 1364 return xaresource.prepare(xid); 1365 } catch (XAException e) { 1366 logger.error("XA prepare error on " + systemToString(xid), e); 1367 throw e; 1368 } 1369 } 1370 1371 @Override 1372 public void commit(Xid xid, boolean onePhase) throws XAException { 1373 try { 1374 xaresource.commit(xid, onePhase); 1375 } catch (XAException e) { 1376 logger.error("XA commit error on " + systemToString(xid), e); 1377 throw e; 1378 } 1379 } 1380 1381 // rollback interacts with caches so is in RowMapper 1382 1383 @Override 1384 public void forget(Xid xid) throws XAException { 1385 xaresource.forget(xid); 1386 } 1387 1388 @Override 1389 public Xid[] recover(int flag) throws XAException { 1390 return xaresource.recover(flag); 1391 } 1392 1393 @Override 1394 public boolean setTransactionTimeout(int seconds) throws XAException { 1395 return xaresource.setTransactionTimeout(seconds); 1396 } 1397 1398 @Override 1399 public int getTransactionTimeout() throws XAException { 1400 return xaresource.getTransactionTimeout(); 1401 } 1402 1403 @Override 1404 public boolean isSameRM(XAResource xares) throws XAException { 1405 throw new UnsupportedOperationException(); 1406 } 1407 1408 @Override 1409 public boolean isConnected() { 1410 return connection != null; 1411 } 1412 1413 @Override 1414 public void connect(boolean noSharing) { 1415 openConnections(noSharing); 1416 } 1417 1418 @Override 1419 public void disconnect() { 1420 closeConnections(); 1421 } 1422 1423 /** 1424 * @since 7.10-HF25, 8.10-HF06, 9.2 1425 */ 1426 @FunctionalInterface 1427 protected interface BiFunctionSQLException<T, U, R> { 1428 1429 /** 1430 * Applies this function to the given arguments. 1431 * 1432 * @param t the first function argument 1433 * @param u the second function argument 1434 * @return the function result 1435 */ 1436 R apply(T t, U u) throws SQLException; 1437 1438 } 1439 1440}