001/* 002 * (C) Copyright 2006-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 * Benoit Delbosc 019 */ 020package org.nuxeo.ecm.core.storage.sql.jdbc; 021 022import java.io.File; 023import java.io.FileOutputStream; 024import java.io.IOException; 025import java.io.OutputStream; 026import java.io.PrintStream; 027import java.io.Serializable; 028import java.security.MessageDigest; 029import java.security.NoSuchAlgorithmException; 030import java.sql.Array; 031import java.sql.Connection; 032import java.sql.DatabaseMetaData; 033import java.sql.PreparedStatement; 034import java.sql.ResultSet; 035import java.sql.SQLException; 036import java.sql.Statement; 037import java.sql.Types; 038import java.util.ArrayList; 039import java.util.Arrays; 040import java.util.Calendar; 041import java.util.Collection; 042import java.util.Collections; 043import java.util.HashMap; 044import java.util.HashSet; 045import java.util.LinkedList; 046import java.util.List; 047import java.util.Map; 048import java.util.Map.Entry; 049import java.util.Set; 050import java.util.UUID; 051import java.util.concurrent.Callable; 052import java.util.concurrent.ConcurrentHashMap; 053 054import javax.transaction.xa.XAException; 055import javax.transaction.xa.XAResource; 056import javax.transaction.xa.Xid; 057 058import org.apache.commons.logging.Log; 059import org.apache.commons.logging.LogFactory; 060import org.nuxeo.common.Environment; 061import org.nuxeo.common.utils.StringUtils; 062import org.nuxeo.ecm.core.api.IterableQueryResult; 063import org.nuxeo.ecm.core.api.Lock; 064import org.nuxeo.ecm.core.api.NuxeoException; 065import org.nuxeo.ecm.core.api.PartialList; 066import org.nuxeo.ecm.core.api.ScrollResult; 067import org.nuxeo.ecm.core.api.ScrollResultImpl; 068import org.nuxeo.ecm.core.blob.BlobManager; 069import org.nuxeo.ecm.core.model.LockManager; 070import org.nuxeo.ecm.core.query.QueryFilter; 071import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator; 072import org.nuxeo.ecm.core.storage.sql.ColumnType; 073import org.nuxeo.ecm.core.storage.sql.ColumnType.WrappedId; 074import org.nuxeo.ecm.core.storage.sql.Invalidations; 075import org.nuxeo.ecm.core.storage.sql.Mapper; 076import org.nuxeo.ecm.core.storage.sql.Model; 077import org.nuxeo.ecm.core.storage.sql.RepositoryDescriptor; 078import org.nuxeo.ecm.core.storage.sql.RepositoryImpl; 079import org.nuxeo.ecm.core.storage.sql.Row; 080import org.nuxeo.ecm.core.storage.sql.RowId; 081import org.nuxeo.ecm.core.storage.sql.Session.PathResolver; 082import org.nuxeo.ecm.core.storage.sql.jdbc.SQLInfo.SQLInfoSelect; 083import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column; 084import org.nuxeo.ecm.core.storage.sql.jdbc.db.Database; 085import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table; 086import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect; 087import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectOracle; 088import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.SQLStatement.ListCollector; 089import org.nuxeo.runtime.api.Framework; 090 091import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult; 092 093/** 094 * A {@link JDBCMapper} maps objects to and from a JDBC database. It is specific to a given database connection, as it 095 * computes statements. 096 * <p> 097 * The {@link JDBCMapper} does the mapping according to the policy defined by a {@link Model}, and generates SQL 098 * statements recorded in the {@link SQLInfo}. 099 */ 100public class JDBCMapper extends JDBCRowMapper implements Mapper { 101 102 private static final Log log = LogFactory.getLog(JDBCMapper.class); 103 104 public static Map<String, Serializable> testProps = new HashMap<String, Serializable>(); 105 106 protected static Map<String, CursorResult> cursorResults = new ConcurrentHashMap<>(); 107 108 public static final String TEST_UPGRADE = "testUpgrade"; 109 110 // property in sql.txt file 111 public static final String TEST_UPGRADE_VERSIONS = "testUpgradeVersions"; 112 113 public static final String TEST_UPGRADE_LAST_CONTRIBUTOR = "testUpgradeLastContributor"; 114 115 public static final String TEST_UPGRADE_LOCKS = "testUpgradeLocks"; 116 117 protected TableUpgrader tableUpgrader; 118 119 private final QueryMakerService queryMakerService; 120 121 private final PathResolver pathResolver; 122 123 private final RepositoryImpl repository; 124 125 protected boolean clusteringEnabled; 126 127 protected static final String NOSCROLL_ID = "noscroll"; 128 129 /** 130 * Creates a new Mapper. 131 * 132 * @param model the model 133 * @param pathResolver the path resolver (used for startswith queries) 134 * @param sqlInfo the sql info 135 * @param clusterInvalidator the cluster invalidator 136 * @param noSharing whether to use no-sharing mode for the connection 137 * @param repository 138 */ 139 public JDBCMapper(Model model, PathResolver pathResolver, SQLInfo sqlInfo, ClusterInvalidator clusterInvalidator, 140 boolean noSharing, RepositoryImpl repository) { 141 super(model, sqlInfo, clusterInvalidator, repository.getInvalidationsPropagator(), noSharing); 142 this.pathResolver = pathResolver; 143 this.repository = repository; 144 clusteringEnabled = clusterInvalidator != null; 145 queryMakerService = Framework.getService(QueryMakerService.class); 146 147 tableUpgrader = new TableUpgrader(this); 148 tableUpgrader.add(Model.VERSION_TABLE_NAME, Model.VERSION_IS_LATEST_KEY, "upgradeVersions", 149 TEST_UPGRADE_VERSIONS); 150 tableUpgrader.add("dublincore", "lastContributor", "upgradeLastContributor", TEST_UPGRADE_LAST_CONTRIBUTOR); 151 tableUpgrader.add(Model.LOCK_TABLE_NAME, Model.LOCK_OWNER_KEY, "upgradeLocks", TEST_UPGRADE_LOCKS); 152 153 } 154 155 @Override 156 public int getTableSize(String tableName) { 157 return sqlInfo.getDatabase().getTable(tableName).getColumns().size(); 158 } 159 160 /* 161 * ----- Root ----- 162 */ 163 164 @Override 165 public void createDatabase(String ddlMode) { 166 // some databases (SQL Server) can't create tables/indexes/etc in a transaction, so suspend it 167 try { 168 boolean suspend = !connection.getAutoCommit(); 169 try { 170 if (suspend) { 171 connection.setAutoCommit(true); 172 } 173 createTables(ddlMode); 174 } finally { 175 if (suspend) { 176 connection.setAutoCommit(false); 177 } 178 } 179 } catch (SQLException e) { 180 throw new NuxeoException(e); 181 } 182 } 183 184 protected String getTableName(String origName) { 185 186 if (dialect instanceof DialectOracle) { 187 if (origName.length() > 30) { 188 189 StringBuilder sb = new StringBuilder(origName.length()); 190 191 try { 192 MessageDigest digest = MessageDigest.getInstance("MD5"); 193 sb.append(origName.substring(0, 15)); 194 sb.append('_'); 195 196 digest.update(origName.getBytes()); 197 sb.append(Dialect.toHexString(digest.digest()).substring(0, 12)); 198 199 return sb.toString(); 200 201 } catch (NoSuchAlgorithmException e) { 202 throw new RuntimeException("Error", e); 203 } 204 } 205 } 206 207 return origName; 208 } 209 210 protected void createTables(String ddlMode) throws SQLException { 211 ListCollector ddlCollector = new ListCollector(); 212 213 sqlInfo.executeSQLStatements(null, ddlMode, connection, logger, ddlCollector); // for missing category 214 sqlInfo.executeSQLStatements("first", ddlMode, connection, logger, ddlCollector); 215 sqlInfo.executeSQLStatements("beforeTableCreation", ddlMode, connection, logger, ddlCollector); 216 if (testProps.containsKey(TEST_UPGRADE)) { 217 // create "old" tables 218 sqlInfo.executeSQLStatements("testUpgrade", ddlMode, connection, logger, null); // do not collect 219 } 220 221 String schemaName = dialect.getConnectionSchema(connection); 222 DatabaseMetaData metadata = connection.getMetaData(); 223 Set<String> tableNames = findTableNames(metadata, schemaName); 224 Database database = sqlInfo.getDatabase(); 225 Map<String, List<Column>> added = new HashMap<String, List<Column>>(); 226 227 for (Table table : database.getTables()) { 228 String tableName = getTableName(table.getPhysicalName()); 229 if (!tableNames.contains(tableName.toUpperCase())) { 230 231 /* 232 * Create missing table. 233 */ 234 235 ddlCollector.add(table.getCreateSql()); 236 ddlCollector.addAll(table.getPostCreateSqls(model)); 237 238 added.put(table.getKey(), null); // null = table created 239 240 sqlInfo.sqlStatementsProperties.put("create_table_" + tableName.toLowerCase(), Boolean.TRUE); 241 242 } else { 243 244 /* 245 * Get existing columns. 246 */ 247 248 Map<String, Integer> columnTypes = new HashMap<String, Integer>(); 249 Map<String, String> columnTypeNames = new HashMap<String, String>(); 250 Map<String, Integer> columnTypeSizes = new HashMap<String, Integer>(); 251 try (ResultSet rs = metadata.getColumns(null, schemaName, tableName, "%")) { 252 while (rs.next()) { 253 String schema = rs.getString("TABLE_SCHEM"); 254 if (schema != null) { // null for MySQL, doh! 255 if ("INFORMATION_SCHEMA".equals(schema.toUpperCase())) { 256 // H2 returns some system tables (locks) 257 continue; 258 } 259 } 260 String columnName = rs.getString("COLUMN_NAME").toUpperCase(); 261 columnTypes.put(columnName, Integer.valueOf(rs.getInt("DATA_TYPE"))); 262 columnTypeNames.put(columnName, rs.getString("TYPE_NAME")); 263 columnTypeSizes.put(columnName, Integer.valueOf(rs.getInt("COLUMN_SIZE"))); 264 } 265 } 266 267 /* 268 * Update types and create missing columns. 269 */ 270 271 List<Column> addedColumns = new LinkedList<Column>(); 272 for (Column column : table.getColumns()) { 273 String upperName = column.getPhysicalName().toUpperCase(); 274 Integer type = columnTypes.remove(upperName); 275 if (type == null) { 276 log.warn("Adding missing column in database: " + column.getFullQuotedName()); 277 ddlCollector.add(table.getAddColumnSql(column)); 278 ddlCollector.addAll(table.getPostAddSqls(column, model)); 279 addedColumns.add(column); 280 } else { 281 int expected = column.getJdbcType(); 282 int actual = type.intValue(); 283 String actualName = columnTypeNames.get(upperName); 284 Integer actualSize = columnTypeSizes.get(upperName); 285 if (!column.setJdbcType(actual, actualName, actualSize.intValue())) { 286 log.error(String.format("SQL type mismatch for %s: expected %s, database has %s / %s (%s)", 287 column.getFullQuotedName(), Integer.valueOf(expected), type, actualName, 288 actualSize)); 289 } 290 } 291 } 292 for (String col : dialect.getIgnoredColumns(table)) { 293 columnTypes.remove(col.toUpperCase()); 294 } 295 if (!columnTypes.isEmpty()) { 296 log.warn("Database contains additional unused columns for table " + table.getQuotedName() + ": " 297 + StringUtils.join(new ArrayList<String>(columnTypes.keySet()), ", ")); 298 } 299 if (!addedColumns.isEmpty()) { 300 if (added.containsKey(table.getKey())) { 301 throw new AssertionError(); 302 } 303 added.put(table.getKey(), addedColumns); 304 } 305 } 306 } 307 308 if (testProps.containsKey(TEST_UPGRADE)) { 309 // create "old" content in tables 310 sqlInfo.executeSQLStatements("testUpgradeOldTables", ddlMode, connection, logger, ddlCollector); 311 } 312 313 // run upgrade for each table if added columns or test 314 for (Entry<String, List<Column>> en : added.entrySet()) { 315 List<Column> addedColumns = en.getValue(); 316 String tableKey = en.getKey(); 317 upgradeTable(tableKey, addedColumns, ddlMode, ddlCollector); 318 } 319 320 sqlInfo.executeSQLStatements("afterTableCreation", ddlMode, connection, logger, ddlCollector); 321 sqlInfo.executeSQLStatements("last", ddlMode, connection, logger, ddlCollector); 322 323 // aclr_permission check for PostgreSQL 324 dialect.performAdditionalStatements(connection); 325 326 /* 327 * Execute all the collected DDL, or dump it if requested, depending on ddlMode 328 */ 329 330 // ddlMode may be: 331 // ignore (not treated here, nothing done) 332 // dump (implies execute) 333 // dump,execute 334 // dump,ignore (no execute) 335 // execute 336 // abort (implies dump) 337 // compat can be used instead of execute to always recreate stored procedures 338 339 List<String> ddl = ddlCollector.getStrings(); 340 boolean ignore = ddlMode.contains(RepositoryDescriptor.DDL_MODE_IGNORE); 341 boolean dump = ddlMode.contains(RepositoryDescriptor.DDL_MODE_DUMP); 342 boolean abort = ddlMode.contains(RepositoryDescriptor.DDL_MODE_ABORT); 343 if (dump || abort) { 344 345 /* 346 * Dump DDL if not empty. 347 */ 348 349 if (!ddl.isEmpty()) { 350 File dumpFile = new File(Environment.getDefault().getLog(), "ddl-vcs-" + repository.getName() + ".sql"); 351 try (OutputStream out = new FileOutputStream(dumpFile); PrintStream ps = new PrintStream(out)) { 352 for (String sql : dialect.getDumpStart()) { 353 ps.println(sql); 354 } 355 for (String sql : ddl) { 356 sql = sql.trim(); 357 if (sql.endsWith(";")) { 358 sql = sql.substring(0, sql.length() - 1); 359 } 360 ps.println(dialect.getSQLForDump(sql)); 361 } 362 for (String sql : dialect.getDumpStop()) { 363 ps.println(sql); 364 } 365 } catch (IOException e) { 366 throw new NuxeoException(e); 367 } 368 369 /* 370 * Abort if requested. 371 */ 372 373 if (abort) { 374 log.error("Dumped DDL to: " + dumpFile); 375 throw new NuxeoException("Database initialization failed for: " + repository.getName() 376 + ", DDL must be executed: " + dumpFile); 377 } 378 } 379 } 380 if (!ignore) { 381 382 /* 383 * Execute DDL. 384 */ 385 386 try (Statement st = connection.createStatement()) { 387 for (String sql : ddl) { 388 logger.log(sql.replace("\n", "\n ")); // indented 389 try { 390 st.execute(sql); 391 } catch (SQLException e) { 392 throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e); 393 } 394 countExecute(); 395 } 396 } 397 398 /* 399 * Execute post-DDL stuff. 400 */ 401 402 try (Statement st = connection.createStatement()) { 403 for (String sql : dialect.getStartupSqls(model, sqlInfo.database)) { 404 logger.log(sql.replace("\n", "\n ")); // indented 405 try { 406 st.execute(sql); 407 } catch (SQLException e) { 408 throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e); 409 } 410 countExecute(); 411 } 412 } 413 } 414 } 415 416 protected void upgradeTable(String tableKey, List<Column> addedColumns, String ddlMode, ListCollector ddlCollector) 417 throws SQLException { 418 tableUpgrader.upgrade(tableKey, addedColumns, ddlMode, ddlCollector); 419 } 420 421 /** Finds uppercase table names. */ 422 protected static Set<String> findTableNames(DatabaseMetaData metadata, String schemaName) throws SQLException { 423 Set<String> tableNames = new HashSet<String>(); 424 ResultSet rs = metadata.getTables(null, schemaName, "%", new String[] { "TABLE" }); 425 while (rs.next()) { 426 String tableName = rs.getString("TABLE_NAME"); 427 tableNames.add(tableName.toUpperCase()); 428 } 429 rs.close(); 430 return tableNames; 431 } 432 433 @Override 434 public int getClusterNodeIdType() { 435 return sqlInfo.getClusterNodeIdType(); 436 } 437 438 @Override 439 public void createClusterNode(Serializable nodeId) { 440 Calendar now = Calendar.getInstance(); 441 try { 442 String sql = sqlInfo.getCreateClusterNodeSql(); 443 List<Column> columns = sqlInfo.getCreateClusterNodeColumns(); 444 PreparedStatement ps = connection.prepareStatement(sql); 445 try { 446 if (logger.isLogEnabled()) { 447 logger.logSQL(sql, Arrays.asList(nodeId, now)); 448 } 449 columns.get(0).setToPreparedStatement(ps, 1, nodeId); 450 columns.get(1).setToPreparedStatement(ps, 2, now); 451 ps.execute(); 452 } finally { 453 closeStatement(ps); 454 } 455 } catch (SQLException e) { 456 throw new NuxeoException(e); 457 } 458 } 459 460 @Override 461 public void removeClusterNode(Serializable nodeId) { 462 try { 463 // delete from cluster_nodes 464 String sql = sqlInfo.getDeleteClusterNodeSql(); 465 Column column = sqlInfo.getDeleteClusterNodeColumn(); 466 PreparedStatement ps = connection.prepareStatement(sql); 467 try { 468 if (logger.isLogEnabled()) { 469 logger.logSQL(sql, Arrays.asList(nodeId)); 470 } 471 column.setToPreparedStatement(ps, 1, nodeId); 472 ps.execute(); 473 } finally { 474 closeStatement(ps); 475 } 476 // delete un-processed invals from cluster_invals 477 deleteClusterInvals(nodeId); 478 } catch (SQLException e) { 479 throw new NuxeoException(e); 480 } 481 } 482 483 protected void deleteClusterInvals(Serializable nodeId) throws SQLException { 484 String sql = sqlInfo.getDeleteClusterInvalsSql(); 485 Column column = sqlInfo.getDeleteClusterInvalsColumn(); 486 PreparedStatement ps = connection.prepareStatement(sql); 487 try { 488 if (logger.isLogEnabled()) { 489 logger.logSQL(sql, Arrays.asList(nodeId)); 490 } 491 column.setToPreparedStatement(ps, 1, nodeId); 492 int n = ps.executeUpdate(); 493 countExecute(); 494 if (logger.isLogEnabled()) { 495 logger.logCount(n); 496 } 497 } finally { 498 try { 499 closeStatement(ps); 500 } catch (SQLException e) { 501 log.error("deleteClusterInvals: " + e.getMessage(), e); 502 } 503 } 504 } 505 506 @Override 507 public void insertClusterInvalidations(Serializable nodeId, Invalidations invalidations) { 508 String sql = dialect.getClusterInsertInvalidations(); 509 List<Column> columns = sqlInfo.getClusterInvalidationsColumns(); 510 PreparedStatement ps = null; 511 try { 512 ps = connection.prepareStatement(sql); 513 int kind = Invalidations.MODIFIED; 514 while (true) { 515 Set<RowId> rowIds = invalidations.getKindSet(kind); 516 517 // reorganize by id 518 Map<Serializable, Set<String>> res = new HashMap<Serializable, Set<String>>(); 519 for (RowId rowId : rowIds) { 520 Set<String> tableNames = res.get(rowId.id); 521 if (tableNames == null) { 522 res.put(rowId.id, tableNames = new HashSet<String>()); 523 } 524 tableNames.add(rowId.tableName); 525 } 526 527 // do inserts 528 for (Entry<Serializable, Set<String>> en : res.entrySet()) { 529 Serializable id = en.getKey(); 530 String fragments = join(en.getValue(), ' '); 531 if (logger.isLogEnabled()) { 532 logger.logSQL(sql, Arrays.<Serializable> asList(nodeId, id, fragments, Long.valueOf(kind))); 533 } 534 Serializable frags; 535 if (dialect.supportsArrays() && columns.get(2).getJdbcType() == Types.ARRAY) { 536 frags = fragments.split(" "); 537 } else { 538 frags = fragments; 539 } 540 columns.get(0).setToPreparedStatement(ps, 1, nodeId); 541 columns.get(1).setToPreparedStatement(ps, 2, id); 542 columns.get(2).setToPreparedStatement(ps, 3, frags); 543 columns.get(3).setToPreparedStatement(ps, 4, Long.valueOf(kind)); 544 ps.execute(); 545 countExecute(); 546 } 547 if (kind == Invalidations.MODIFIED) { 548 kind = Invalidations.DELETED; 549 } else { 550 break; 551 } 552 } 553 } catch (SQLException e) { 554 throw new NuxeoException("Could not invalidate", e); 555 } finally { 556 try { 557 closeStatement(ps); 558 } catch (SQLException e) { 559 log.error(e.getMessage(), e); 560 } 561 } 562 } 563 564 // join that works on a set 565 protected static final String join(Collection<String> strings, char sep) { 566 if (strings.isEmpty()) { 567 throw new RuntimeException(); 568 } 569 if (strings.size() == 1) { 570 return strings.iterator().next(); 571 } 572 int size = 0; 573 for (String word : strings) { 574 size += word.length() + 1; 575 } 576 StringBuilder buf = new StringBuilder(size); 577 for (String word : strings) { 578 buf.append(word); 579 buf.append(sep); 580 } 581 buf.setLength(size - 1); 582 return buf.toString(); 583 } 584 585 @Override 586 public Invalidations getClusterInvalidations(Serializable nodeId) { 587 Invalidations invalidations = new Invalidations(); 588 String sql = dialect.getClusterGetInvalidations(); 589 List<Column> columns = sqlInfo.getClusterInvalidationsColumns(); 590 try { 591 if (logger.isLogEnabled()) { 592 logger.logSQL(sql, Arrays.asList(nodeId)); 593 } 594 PreparedStatement ps = connection.prepareStatement(sql); 595 ResultSet rs = null; 596 try { 597 setToPreparedStatement(ps, 1, nodeId); 598 rs = ps.executeQuery(); 599 countExecute(); 600 while (rs.next()) { 601 // first column ignored, it's the node id 602 Serializable id = columns.get(1).getFromResultSet(rs, 1); 603 Serializable frags = columns.get(2).getFromResultSet(rs, 2); 604 int kind = ((Long) columns.get(3).getFromResultSet(rs, 3)).intValue(); 605 String[] fragments; 606 if (dialect.supportsArrays() && frags instanceof String[]) { 607 fragments = (String[]) frags; 608 } else { 609 fragments = ((String) frags).split(" "); 610 } 611 invalidations.add(id, fragments, kind); 612 } 613 } finally { 614 closeStatement(ps, rs); 615 } 616 if (logger.isLogEnabled()) { 617 // logCount(n); 618 logger.log(" -> " + invalidations); 619 } 620 if (dialect.isClusteringDeleteNeeded()) { 621 deleteClusterInvals(nodeId); 622 } 623 return invalidations; 624 } catch (SQLException e) { 625 throw new NuxeoException("Could not invalidate", e); 626 } 627 } 628 629 @Override 630 public Serializable getRootId(String repositoryId) { 631 String sql = sqlInfo.getSelectRootIdSql(); 632 try { 633 if (logger.isLogEnabled()) { 634 logger.logSQL(sql, Collections.<Serializable> singletonList(repositoryId)); 635 } 636 PreparedStatement ps = connection.prepareStatement(sql); 637 ResultSet rs = null; 638 try { 639 ps.setString(1, repositoryId); 640 rs = ps.executeQuery(); 641 countExecute(); 642 if (!rs.next()) { 643 if (logger.isLogEnabled()) { 644 logger.log(" -> (none)"); 645 } 646 return null; 647 } 648 Column column = sqlInfo.getSelectRootIdWhatColumn(); 649 Serializable id = column.getFromResultSet(rs, 1); 650 if (logger.isLogEnabled()) { 651 logger.log(" -> " + Model.MAIN_KEY + '=' + id); 652 } 653 // check that we didn't get several rows 654 if (rs.next()) { 655 throw new NuxeoException("Row query for " + repositoryId + " returned several rows: " + sql); 656 } 657 return id; 658 } finally { 659 closeStatement(ps, rs); 660 } 661 } catch (SQLException e) { 662 throw new NuxeoException("Could not select: " + sql, e); 663 } 664 } 665 666 @Override 667 public void setRootId(Serializable repositoryId, Serializable id) { 668 String sql = sqlInfo.getInsertRootIdSql(); 669 try { 670 PreparedStatement ps = connection.prepareStatement(sql); 671 try { 672 List<Column> columns = sqlInfo.getInsertRootIdColumns(); 673 List<Serializable> debugValues = null; 674 if (logger.isLogEnabled()) { 675 debugValues = new ArrayList<Serializable>(2); 676 } 677 int i = 0; 678 for (Column column : columns) { 679 i++; 680 String key = column.getKey(); 681 Serializable v; 682 if (key.equals(Model.MAIN_KEY)) { 683 v = id; 684 } else if (key.equals(Model.REPOINFO_REPONAME_KEY)) { 685 v = repositoryId; 686 } else { 687 throw new RuntimeException(key); 688 } 689 column.setToPreparedStatement(ps, i, v); 690 if (debugValues != null) { 691 debugValues.add(v); 692 } 693 } 694 if (debugValues != null) { 695 logger.logSQL(sql, debugValues); 696 debugValues.clear(); 697 } 698 ps.execute(); 699 countExecute(); 700 } finally { 701 closeStatement(ps); 702 } 703 } catch (SQLException e) { 704 throw new NuxeoException("Could not insert: " + sql, e); 705 } 706 } 707 708 protected QueryMaker findQueryMaker(String queryType) { 709 for (Class<? extends QueryMaker> klass : queryMakerService.getQueryMakers()) { 710 QueryMaker queryMaker; 711 try { 712 queryMaker = klass.newInstance(); 713 } catch (ReflectiveOperationException e) { 714 throw new NuxeoException(e); 715 } 716 if (queryMaker.accepts(queryType)) { 717 return queryMaker; 718 } 719 } 720 return null; 721 } 722 723 protected void prepareUserReadAcls(QueryFilter queryFilter) { 724 String sql = dialect.getPrepareUserReadAclsSql(); 725 Serializable principals = queryFilter.getPrincipals(); 726 if (sql == null || principals == null) { 727 return; 728 } 729 if (!dialect.supportsArrays()) { 730 principals = StringUtils.join((String[]) principals, Dialect.ARRAY_SEP); 731 } 732 PreparedStatement ps = null; 733 try { 734 ps = connection.prepareStatement(sql); 735 if (logger.isLogEnabled()) { 736 logger.logSQL(sql, Collections.singleton(principals)); 737 } 738 setToPreparedStatement(ps, 1, principals); 739 ps.execute(); 740 countExecute(); 741 } catch (SQLException e) { 742 throw new NuxeoException("Failed to prepare user read acl cache", e); 743 } finally { 744 try { 745 closeStatement(ps); 746 } catch (SQLException e) { 747 log.error(e.getMessage(), e); 748 } 749 } 750 } 751 752 @Override 753 public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter, 754 boolean countTotal) { 755 return query(query, queryType, queryFilter, countTotal ? -1 : 0); 756 } 757 758 @Override 759 public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter, long countUpTo) { 760 if (dialect.needsPrepareUserReadAcls()) { 761 prepareUserReadAcls(queryFilter); 762 } 763 QueryMaker queryMaker = findQueryMaker(queryType); 764 if (queryMaker == null) { 765 throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query); 766 } 767 QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter); 768 769 if (q == null) { 770 logger.log("Query cannot return anything due to conflicting clauses"); 771 return new PartialList<Serializable>(Collections.<Serializable> emptyList(), 0); 772 } 773 long limit = queryFilter.getLimit(); 774 long offset = queryFilter.getOffset(); 775 776 if (logger.isLogEnabled()) { 777 String sql = q.selectInfo.sql; 778 if (limit != 0) { 779 sql += " -- LIMIT " + limit + " OFFSET " + offset; 780 } 781 if (countUpTo != 0) { 782 sql += " -- COUNT TOTAL UP TO " + countUpTo; 783 } 784 logger.logSQL(sql, q.selectParams); 785 } 786 787 String sql = q.selectInfo.sql; 788 789 if (countUpTo == 0 && limit > 0 && dialect.supportsPaging()) { 790 // full result set not needed for counting 791 sql = dialect.addPagingClause(sql, limit, offset); 792 limit = 0; 793 offset = 0; 794 } else if (countUpTo > 0 && dialect.supportsPaging()) { 795 // ask one more row 796 sql = dialect.addPagingClause(sql, Math.max(countUpTo + 1, limit + offset), 0); 797 } 798 799 PreparedStatement ps = null; 800 ResultSet rs = null; 801 try { 802 ps = connection.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY); 803 int i = 1; 804 for (Serializable object : q.selectParams) { 805 setToPreparedStatement(ps, i++, object); 806 } 807 rs = ps.executeQuery(); 808 countExecute(); 809 810 // limit/offset 811 long totalSize = -1; 812 boolean available; 813 if ((limit == 0) || (offset == 0)) { 814 available = rs.first(); 815 if (!available) { 816 totalSize = 0; 817 } 818 if (limit == 0) { 819 limit = -1; // infinite 820 } 821 } else { 822 available = rs.absolute((int) offset + 1); 823 } 824 825 Column column = q.selectInfo.whatColumns.get(0); 826 List<Serializable> ids = new LinkedList<Serializable>(); 827 int rowNum = 0; 828 while (available && (limit != 0)) { 829 Serializable id = column.getFromResultSet(rs, 1); 830 ids.add(id); 831 rowNum = rs.getRow(); 832 available = rs.next(); 833 limit--; 834 } 835 836 // total size 837 if (countUpTo != 0 && (totalSize == -1)) { 838 if (!available && (rowNum != 0)) { 839 // last row read was the actual last 840 totalSize = rowNum; 841 } else { 842 // available if limit reached with some left 843 // rowNum == 0 if skipped too far 844 rs.last(); 845 totalSize = rs.getRow(); 846 } 847 if (countUpTo > 0 && totalSize > countUpTo) { 848 // the result where truncated we don't know the total size 849 totalSize = -2; 850 } 851 } 852 853 if (logger.isLogEnabled()) { 854 logger.logIds(ids, countUpTo != 0, totalSize); 855 } 856 857 return new PartialList<Serializable>(ids, totalSize); 858 } catch (SQLException e) { 859 throw new NuxeoException("Invalid query: " + query, e); 860 } finally { 861 try { 862 closeStatement(ps, rs); 863 } catch (SQLException e) { 864 log.error("Cannot close connection", e); 865 } 866 } 867 } 868 869 public int setToPreparedStatement(PreparedStatement ps, int i, Serializable object) throws SQLException { 870 if (object instanceof Calendar) { 871 Calendar cal = (Calendar) object; 872 ps.setTimestamp(i, dialect.getTimestampFromCalendar(cal), cal); 873 } else if (object instanceof java.sql.Date) { 874 ps.setDate(i, (java.sql.Date) object); 875 } else if (object instanceof Long) { 876 ps.setLong(i, ((Long) object).longValue()); 877 } else if (object instanceof WrappedId) { 878 dialect.setId(ps, i, object.toString()); 879 } else if (object instanceof Object[]) { 880 int jdbcType; 881 if (object instanceof String[]) { 882 jdbcType = dialect.getJDBCTypeAndString(ColumnType.STRING).jdbcType; 883 } else if (object instanceof Boolean[]) { 884 jdbcType = dialect.getJDBCTypeAndString(ColumnType.BOOLEAN).jdbcType; 885 } else if (object instanceof Long[]) { 886 jdbcType = dialect.getJDBCTypeAndString(ColumnType.LONG).jdbcType; 887 } else if (object instanceof Double[]) { 888 jdbcType = dialect.getJDBCTypeAndString(ColumnType.DOUBLE).jdbcType; 889 } else if (object instanceof java.sql.Date[]) { 890 jdbcType = Types.DATE; 891 } else if (object instanceof java.sql.Clob[]) { 892 jdbcType = Types.CLOB; 893 } else if (object instanceof Calendar[]) { 894 jdbcType = dialect.getJDBCTypeAndString(ColumnType.TIMESTAMP).jdbcType; 895 object = dialect.getTimestampFromCalendar((Calendar) object); 896 } else if (object instanceof Integer[]) { 897 jdbcType = dialect.getJDBCTypeAndString(ColumnType.INTEGER).jdbcType; 898 } else { 899 jdbcType = dialect.getJDBCTypeAndString(ColumnType.CLOB).jdbcType; 900 } 901 Array array = dialect.createArrayOf(jdbcType, (Object[]) object, connection); 902 ps.setArray(i, array); 903 } else { 904 ps.setObject(i, object); 905 } 906 return i; 907 } 908 909 // queryFilter used for principals and permissions 910 @Override 911 public IterableQueryResult queryAndFetch(String query, String queryType, QueryFilter queryFilter, 912 boolean distinctDocuments, Object... params) { 913 if (dialect.needsPrepareUserReadAcls()) { 914 prepareUserReadAcls(queryFilter); 915 } 916 QueryMaker queryMaker = findQueryMaker(queryType); 917 if (queryMaker == null) { 918 throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query); 919 } 920 if (distinctDocuments) { 921 String q = query.toLowerCase(); 922 if (q.startsWith("select ") && !q.startsWith("select distinct ")) { 923 query = "SELECT DISTINCT " + query.substring("SELECT ".length()); 924 } 925 } 926 try { 927 return new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this, params); 928 } catch (SQLException e) { 929 throw new NuxeoException("Invalid query: " + queryType + ": " + query, e); 930 } 931 } 932 933 @Override 934 public ScrollResult scroll(String query, int batchSize, int keepAliveSeconds) { 935 if (!dialect.supportsScroll()) { 936 return defaultScroll(query); 937 } 938 checkForTimedoutScroll(); 939 return scrollSearch(query, batchSize, keepAliveSeconds); 940 } 941 942 protected void checkForTimedoutScroll() { 943 cursorResults.forEach((id, cursor) -> cursor.timedOut(id)); 944 } 945 946 protected ScrollResult scrollSearch(String query, int batchSize, int keepAliveSeconds) { 947 QueryMaker queryMaker = findQueryMaker("NXQL"); 948 QueryFilter queryFilter = new QueryFilter(null, null, null, null, Collections.emptyList(), 0, 0); 949 QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter, null); 950 if (q == null) { 951 logger.log("Query cannot return anything due to conflicting clauses"); 952 throw new NuxeoException("Query cannot return anything due to conflicting clauses"); 953 } 954 if (logger.isLogEnabled()) { 955 logger.logSQL(q.selectInfo.sql, q.selectParams); 956 } 957 try { 958 if (connection.getAutoCommit()) { 959 throw new NuxeoException("Scroll should be done inside a transaction"); 960 } 961 PreparedStatement ps = connection.prepareStatement(q.selectInfo.sql, ResultSet.TYPE_FORWARD_ONLY, 962 ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT); 963 ps.setFetchSize(batchSize); 964 int i = 1; 965 for (Serializable object : q.selectParams) { 966 setToPreparedStatement(ps, i++, object); 967 } 968 ResultSet rs = ps.executeQuery(); 969 String scrollId = UUID.randomUUID().toString(); 970 registerCursor(scrollId, ps, rs, batchSize, keepAliveSeconds); 971 return scroll(scrollId); 972 } catch (SQLException e) { 973 throw new NuxeoException("Error on query", e); 974 } 975 } 976 977 protected class CursorResult { 978 protected final int keepAliveSeconds; 979 protected final PreparedStatement preparedStatement; 980 protected final ResultSet resultSet; 981 protected final int batchSize; 982 protected long lastCallTimestamp; 983 984 CursorResult(PreparedStatement preparedStatement, ResultSet resultSet, int batchSize, int keepAliveSeconds) { 985 this.preparedStatement = preparedStatement; 986 this.resultSet = resultSet; 987 this.batchSize = batchSize; 988 this.keepAliveSeconds = keepAliveSeconds; 989 lastCallTimestamp = System.currentTimeMillis(); 990 } 991 992 boolean timedOut(String scrollId) { 993 long now = System.currentTimeMillis(); 994 if (now - lastCallTimestamp > (keepAliveSeconds * 1000)) { 995 if (unregisterCursor(scrollId)) { 996 log.warn("Scroll " + scrollId + " timed out"); 997 } 998 return true; 999 } 1000 return false; 1001 } 1002 1003 void touch() { 1004 lastCallTimestamp = System.currentTimeMillis(); 1005 } 1006 1007 synchronized void close() throws SQLException { 1008 if (resultSet != null) { 1009 resultSet.close(); 1010 } 1011 if (preparedStatement != null) { 1012 preparedStatement.close(); 1013 } 1014 } 1015 } 1016 1017 protected void registerCursor(String scrollId, PreparedStatement ps, ResultSet rs, int batchSize, 1018 int keepAliveSeconds) { 1019 cursorResults.put(scrollId, new CursorResult(ps, rs, batchSize, keepAliveSeconds)); 1020 } 1021 1022 protected boolean unregisterCursor(String scrollId) { 1023 CursorResult cursor = cursorResults.remove(scrollId); 1024 if (cursor != null) { 1025 try { 1026 cursor.close(); 1027 return true; 1028 } catch (SQLException e) { 1029 log.error("Failed to close cursor for scroll: " + scrollId, e); 1030 // do not propagate exception on cleaning 1031 } 1032 } 1033 return false; 1034 } 1035 1036 protected ScrollResult defaultScroll(String query) { 1037 // the database has no proper support for cursor just return everything in one batch 1038 QueryMaker queryMaker = findQueryMaker("NXQL"); 1039 List<String> ids; 1040 QueryFilter queryFilter = new QueryFilter(null, null, null, null, Collections.emptyList(), 0, 0); 1041 try (IterableQueryResult ret = new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this, null)) { 1042 ids = new ArrayList<>((int) ret.size()); 1043 for (Map<String, Serializable> map : ret) { 1044 ids.add(map.get("ecm:uuid").toString()); 1045 } 1046 } catch (SQLException e) { 1047 throw new NuxeoException("Invalid scroll query: " + query, e); 1048 } 1049 return new ScrollResultImpl(NOSCROLL_ID, ids); 1050 } 1051 1052 @Override 1053 public ScrollResult scroll(String scrollId) { 1054 if (NOSCROLL_ID.equals(scrollId) || !dialect.supportsScroll()) { 1055 // there is only one batch in this case 1056 return emptyResult(); 1057 } 1058 CursorResult cursorResult = cursorResults.get(scrollId); 1059 if (cursorResult == null) { 1060 throw new NuxeoException("Unknown or timed out scrollId"); 1061 } else if (cursorResult.timedOut(scrollId)) { 1062 throw new NuxeoException("Timed out scrollId"); 1063 } 1064 cursorResult.touch(); 1065 List<String> ids = new ArrayList<>(cursorResult.batchSize); 1066 synchronized (cursorResult) { 1067 try { 1068 if (cursorResult.resultSet == null || cursorResult.resultSet.isClosed()) { 1069 unregisterCursor(scrollId); 1070 return emptyResult(); 1071 } 1072 while (ids.size() < cursorResult.batchSize) { 1073 if (cursorResult.resultSet.next()) { 1074 ids.add(cursorResult.resultSet.getString(1)); 1075 } else { 1076 cursorResult.close(); 1077 if (ids.isEmpty()) { 1078 unregisterCursor(scrollId); 1079 } 1080 break; 1081 } 1082 } 1083 } catch (SQLException e) { 1084 throw new NuxeoException("Error during scroll", e); 1085 } 1086 } 1087 return new ScrollResultImpl(scrollId, ids); 1088 } 1089 1090 @Override 1091 public Set<Serializable> getAncestorsIds(Collection<Serializable> ids) { 1092 SQLInfoSelect select = sqlInfo.getSelectAncestorsIds(); 1093 if (select == null) { 1094 return getAncestorsIdsIterative(ids); 1095 } 1096 Serializable whereIds = newIdArray(ids); 1097 Set<Serializable> res = new HashSet<Serializable>(); 1098 PreparedStatement ps = null; 1099 ResultSet rs = null; 1100 try { 1101 if (logger.isLogEnabled()) { 1102 logger.logSQL(select.sql, Collections.singleton(whereIds)); 1103 } 1104 Column what = select.whatColumns.get(0); 1105 ps = connection.prepareStatement(select.sql); 1106 setToPreparedStatementIdArray(ps, 1, whereIds); 1107 rs = ps.executeQuery(); 1108 countExecute(); 1109 List<Serializable> debugIds = null; 1110 if (logger.isLogEnabled()) { 1111 debugIds = new LinkedList<Serializable>(); 1112 } 1113 while (rs.next()) { 1114 if (dialect.supportsArraysReturnInsteadOfRows()) { 1115 Serializable[] resultIds = dialect.getArrayResult(rs.getArray(1)); 1116 for (Serializable id : resultIds) { 1117 if (id != null) { 1118 res.add(id); 1119 if (logger.isLogEnabled()) { 1120 debugIds.add(id); 1121 } 1122 } 1123 } 1124 } else { 1125 Serializable id = what.getFromResultSet(rs, 1); 1126 if (id != null) { 1127 res.add(id); 1128 if (logger.isLogEnabled()) { 1129 debugIds.add(id); 1130 } 1131 } 1132 } 1133 } 1134 if (logger.isLogEnabled()) { 1135 logger.logIds(debugIds, false, 0); 1136 } 1137 return res; 1138 } catch (SQLException e) { 1139 throw new NuxeoException("Failed to get ancestors ids", e); 1140 } finally { 1141 try { 1142 closeStatement(ps, rs); 1143 } catch (SQLException e) { 1144 log.error(e.getMessage(), e); 1145 } 1146 } 1147 } 1148 1149 /** 1150 * Uses iterative parentid selection. 1151 */ 1152 protected Set<Serializable> getAncestorsIdsIterative(Collection<Serializable> ids) { 1153 PreparedStatement ps = null; 1154 ResultSet rs = null; 1155 try { 1156 LinkedList<Serializable> todo = new LinkedList<Serializable>(ids); 1157 Set<Serializable> done = new HashSet<Serializable>(); 1158 Set<Serializable> res = new HashSet<Serializable>(); 1159 while (!todo.isEmpty()) { 1160 done.addAll(todo); 1161 SQLInfoSelect select = sqlInfo.getSelectParentIds(todo.size()); 1162 if (logger.isLogEnabled()) { 1163 logger.logSQL(select.sql, todo); 1164 } 1165 Column what = select.whatColumns.get(0); 1166 Column where = select.whereColumns.get(0); 1167 ps = connection.prepareStatement(select.sql); 1168 int i = 1; 1169 for (Serializable id : todo) { 1170 where.setToPreparedStatement(ps, i++, id); 1171 } 1172 rs = ps.executeQuery(); 1173 countExecute(); 1174 todo = new LinkedList<Serializable>(); 1175 List<Serializable> debugIds = null; 1176 if (logger.isLogEnabled()) { 1177 debugIds = new LinkedList<Serializable>(); 1178 } 1179 while (rs.next()) { 1180 Serializable id = what.getFromResultSet(rs, 1); 1181 if (id != null) { 1182 res.add(id); 1183 if (!done.contains(id)) { 1184 todo.add(id); 1185 } 1186 if (logger.isLogEnabled()) { 1187 debugIds.add(id); 1188 } 1189 } 1190 } 1191 if (logger.isLogEnabled()) { 1192 logger.logIds(debugIds, false, 0); 1193 } 1194 rs.close(); 1195 ps.close(); 1196 } 1197 return res; 1198 } catch (SQLException e) { 1199 throw new NuxeoException("Failed to get ancestors ids", e); 1200 } finally { 1201 try { 1202 closeStatement(ps, rs); 1203 } catch (SQLException e) { 1204 log.error(e.getMessage(), e); 1205 } 1206 } 1207 } 1208 1209 @Override 1210 public void updateReadAcls() { 1211 if (!dialect.supportsReadAcl()) { 1212 return; 1213 } 1214 if (log.isDebugEnabled()) { 1215 log.debug("updateReadAcls: updating"); 1216 } 1217 Statement st = null; 1218 try { 1219 st = connection.createStatement(); 1220 String sql = dialect.getUpdateReadAclsSql(); 1221 if (logger.isLogEnabled()) { 1222 logger.log(sql); 1223 } 1224 st.execute(sql); 1225 countExecute(); 1226 } catch (SQLException e) { 1227 checkConcurrentUpdate(e); 1228 throw new NuxeoException("Failed to update read acls", e); 1229 } finally { 1230 try { 1231 closeStatement(st); 1232 } catch (SQLException e) { 1233 log.error(e.getMessage(), e); 1234 } 1235 } 1236 if (log.isDebugEnabled()) { 1237 log.debug("updateReadAcls: done."); 1238 } 1239 } 1240 1241 @Override 1242 public void rebuildReadAcls() { 1243 if (!dialect.supportsReadAcl()) { 1244 return; 1245 } 1246 log.debug("rebuildReadAcls: rebuilding ..."); 1247 Statement st = null; 1248 try { 1249 st = connection.createStatement(); 1250 String sql = dialect.getRebuildReadAclsSql(); 1251 logger.log(sql); 1252 st.execute(sql); 1253 countExecute(); 1254 } catch (SQLException e) { 1255 throw new NuxeoException("Failed to rebuild read acls", e); 1256 } finally { 1257 try { 1258 closeStatement(st); 1259 } catch (SQLException e) { 1260 log.error(e.getMessage(), e); 1261 } 1262 } 1263 log.debug("rebuildReadAcls: done."); 1264 } 1265 1266 /* 1267 * ----- Locking ----- 1268 */ 1269 1270 protected Connection connection(boolean autocommit) { 1271 try { 1272 connection.setAutoCommit(autocommit); 1273 } catch (SQLException e) { 1274 throw new NuxeoException("Cannot set auto commit mode onto " + this + "'s connection", e); 1275 } 1276 return connection; 1277 } 1278 1279 /** 1280 * Calls the callable, inside a transaction if in cluster mode. 1281 * <p> 1282 * Called under {@link #serializationLock}. 1283 */ 1284 protected Lock callInTransaction(LockCallable callable, boolean tx) { 1285 boolean ok = false; 1286 try { 1287 if (log.isDebugEnabled()) { 1288 log.debug("callInTransaction setAutoCommit " + !tx); 1289 } 1290 connection.setAutoCommit(!tx); 1291 } catch (SQLException e) { 1292 throw new NuxeoException("Cannot set auto commit mode onto " + this + "'s connection", e); 1293 } 1294 try { 1295 Lock result = callable.call(); 1296 ok = true; 1297 return result; 1298 } finally { 1299 if (tx) { 1300 try { 1301 try { 1302 if (ok) { 1303 if (log.isDebugEnabled()) { 1304 log.debug("callInTransaction commit"); 1305 } 1306 connection.commit(); 1307 } else { 1308 if (log.isDebugEnabled()) { 1309 log.debug("callInTransaction rollback"); 1310 } 1311 connection.rollback(); 1312 } 1313 } finally { 1314 // restore autoCommit=true 1315 if (log.isDebugEnabled()) { 1316 log.debug("callInTransaction restoring autoCommit=true"); 1317 } 1318 connection.setAutoCommit(true); 1319 } 1320 } catch (SQLException e) { 1321 throw new NuxeoException(e); 1322 } 1323 } 1324 } 1325 } 1326 1327 public interface LockCallable extends Callable<Lock> { 1328 @Override 1329 public Lock call(); 1330 } 1331 1332 @Override 1333 public Lock getLock(Serializable id) { 1334 if (log.isDebugEnabled()) { 1335 try { 1336 log.debug("getLock " + id + " while autoCommit=" + connection.getAutoCommit()); 1337 } catch (SQLException e) { 1338 throw new RuntimeException(e); 1339 } 1340 } 1341 RowId rowId = new RowId(Model.LOCK_TABLE_NAME, id); 1342 Row row = readSimpleRow(rowId); 1343 return row == null ? null : new Lock((String) row.get(Model.LOCK_OWNER_KEY), 1344 (Calendar) row.get(Model.LOCK_CREATED_KEY)); 1345 } 1346 1347 @Override 1348 public Lock setLock(final Serializable id, final Lock lock) { 1349 if (log.isDebugEnabled()) { 1350 log.debug("setLock " + id + " owner=" + lock.getOwner()); 1351 } 1352 SetLock call = new SetLock(id, lock); 1353 return callInTransaction(call, clusteringEnabled); 1354 } 1355 1356 protected class SetLock implements LockCallable { 1357 protected final Serializable id; 1358 1359 protected final Lock lock; 1360 1361 protected SetLock(Serializable id, Lock lock) { 1362 super(); 1363 this.id = id; 1364 this.lock = lock; 1365 } 1366 1367 @Override 1368 public Lock call() { 1369 Lock oldLock = getLock(id); 1370 if (oldLock == null) { 1371 Row row = new Row(Model.LOCK_TABLE_NAME, id); 1372 row.put(Model.LOCK_OWNER_KEY, lock.getOwner()); 1373 row.put(Model.LOCK_CREATED_KEY, lock.getCreated()); 1374 insertSimpleRows(Model.LOCK_TABLE_NAME, Collections.singletonList(row)); 1375 } 1376 return oldLock; 1377 } 1378 } 1379 1380 @Override 1381 public Lock removeLock(final Serializable id, final String owner, final boolean force) { 1382 if (log.isDebugEnabled()) { 1383 log.debug("removeLock " + id + " owner=" + owner + " force=" + force); 1384 } 1385 RemoveLock call = new RemoveLock(id, owner, force); 1386 return callInTransaction(call, !force); 1387 } 1388 1389 protected class RemoveLock implements LockCallable { 1390 protected final Serializable id; 1391 1392 protected final String owner; 1393 1394 protected final boolean force; 1395 1396 protected RemoveLock(Serializable id, String owner, boolean force) { 1397 super(); 1398 this.id = id; 1399 this.owner = owner; 1400 this.force = force; 1401 } 1402 1403 @Override 1404 public Lock call() { 1405 Lock oldLock = force ? null : getLock(id); 1406 if (!force && owner != null) { 1407 if (oldLock == null) { 1408 // not locked, nothing to do 1409 return null; 1410 } 1411 if (!LockManager.canLockBeRemoved(oldLock.getOwner(), owner)) { 1412 // existing mismatched lock, flag failure 1413 return new Lock(oldLock, true); 1414 } 1415 } 1416 if (force || oldLock != null) { 1417 deleteRows(Model.LOCK_TABLE_NAME, Collections.singleton(id)); 1418 } 1419 return oldLock; 1420 } 1421 } 1422 1423 @Override 1424 public void markReferencedBinaries() { 1425 log.debug("Starting binaries GC mark"); 1426 Statement st = null; 1427 ResultSet rs = null; 1428 BlobManager blobManager = Framework.getService(BlobManager.class); 1429 String repositoryName = getRepositoryName(); 1430 try { 1431 st = connection.createStatement(); 1432 int i = -1; 1433 for (String sql : sqlInfo.getBinariesSql) { 1434 i++; 1435 Column col = sqlInfo.getBinariesColumns.get(i); 1436 if (logger.isLogEnabled()) { 1437 logger.log(sql); 1438 } 1439 rs = st.executeQuery(sql); 1440 countExecute(); 1441 int n = 0; 1442 while (rs.next()) { 1443 n++; 1444 String key = (String) col.getFromResultSet(rs, 1); 1445 if (key != null) { 1446 blobManager.markReferencedBinary(key, repositoryName); 1447 } 1448 } 1449 if (logger.isLogEnabled()) { 1450 logger.logCount(n); 1451 } 1452 rs.close(); 1453 } 1454 } catch (SQLException e) { 1455 throw new RuntimeException("Failed to mark binaries for gC", e); 1456 } finally { 1457 try { 1458 closeStatement(st, rs); 1459 } catch (SQLException e) { 1460 log.error(e.getMessage(), e); 1461 } 1462 } 1463 log.debug("End of binaries GC mark"); 1464 } 1465 1466 /* 1467 * ----- XAResource ----- 1468 */ 1469 1470 protected static String systemToString(Object o) { 1471 return o.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(o)); 1472 } 1473 1474 @Override 1475 public void start(Xid xid, int flags) throws XAException { 1476 try { 1477 xaresource.start(xid, flags); 1478 if (logger.isLogEnabled()) { 1479 logger.log("XA start on " + systemToString(xid)); 1480 } 1481 } catch (NuxeoException e) { 1482 throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e); 1483 } catch (XAException e) { 1484 logger.error("XA start error on " + systemToString(xid), e); 1485 throw e; 1486 } 1487 } 1488 1489 @Override 1490 public void end(Xid xid, int flags) throws XAException { 1491 try { 1492 xaresource.end(xid, flags); 1493 if (logger.isLogEnabled()) { 1494 logger.log("XA end on " + systemToString(xid)); 1495 } 1496 } catch (NullPointerException e) { 1497 // H2 when no active transaction 1498 logger.error("XA end error on " + systemToString(xid), e); 1499 throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e); 1500 } catch (XAException e) { 1501 if (flags != XAResource.TMFAIL) { 1502 logger.error("XA end error on " + systemToString(xid), e); 1503 } 1504 throw e; 1505 } 1506 } 1507 1508 @Override 1509 public int prepare(Xid xid) throws XAException { 1510 try { 1511 return xaresource.prepare(xid); 1512 } catch (XAException e) { 1513 logger.error("XA prepare error on " + systemToString(xid), e); 1514 throw e; 1515 } 1516 } 1517 1518 @Override 1519 public void commit(Xid xid, boolean onePhase) throws XAException { 1520 try { 1521 xaresource.commit(xid, onePhase); 1522 } catch (XAException e) { 1523 logger.error("XA commit error on " + systemToString(xid), e); 1524 throw e; 1525 } 1526 } 1527 1528 // rollback interacts with caches so is in RowMapper 1529 1530 @Override 1531 public void forget(Xid xid) throws XAException { 1532 xaresource.forget(xid); 1533 } 1534 1535 @Override 1536 public Xid[] recover(int flag) throws XAException { 1537 return xaresource.recover(flag); 1538 } 1539 1540 @Override 1541 public boolean setTransactionTimeout(int seconds) throws XAException { 1542 return xaresource.setTransactionTimeout(seconds); 1543 } 1544 1545 @Override 1546 public int getTransactionTimeout() throws XAException { 1547 return xaresource.getTransactionTimeout(); 1548 } 1549 1550 @Override 1551 public boolean isSameRM(XAResource xares) throws XAException { 1552 throw new UnsupportedOperationException(); 1553 } 1554 1555 @Override 1556 public boolean isConnected() { 1557 return connection != null; 1558 } 1559 1560 @Override 1561 public void connect() { 1562 openConnections(); 1563 } 1564 1565 @Override 1566 public void disconnect() { 1567 closeConnections(); 1568 } 1569 1570}