001/* 002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 * Benoit Delbosc 019 */ 020package org.nuxeo.ecm.core.storage.sql.jdbc; 021 022import java.io.File; 023import java.io.FileOutputStream; 024import java.io.IOException; 025import java.io.OutputStream; 026import java.io.PrintStream; 027import java.io.Serializable; 028import java.security.MessageDigest; 029import java.security.NoSuchAlgorithmException; 030import java.sql.Array; 031import java.sql.Connection; 032import java.sql.DatabaseMetaData; 033import java.sql.PreparedStatement; 034import java.sql.ResultSet; 035import java.sql.SQLException; 036import java.sql.Statement; 037import java.sql.Types; 038import java.text.SimpleDateFormat; 039import java.util.ArrayList; 040import java.util.Arrays; 041import java.util.Calendar; 042import java.util.Collection; 043import java.util.Collections; 044import java.util.Date; 045import java.util.HashMap; 046import java.util.HashSet; 047import java.util.LinkedList; 048import java.util.List; 049import java.util.Map; 050import java.util.Map.Entry; 051import java.util.Set; 052import java.util.concurrent.Callable; 053 054import javax.sql.XADataSource; 055import javax.transaction.xa.XAException; 056import javax.transaction.xa.XAResource; 057import javax.transaction.xa.Xid; 058 059import org.apache.commons.logging.Log; 060import org.apache.commons.logging.LogFactory; 061import org.nuxeo.common.Environment; 062import org.nuxeo.common.utils.StringUtils; 063import org.nuxeo.ecm.core.api.IterableQueryResult; 064import org.nuxeo.ecm.core.api.Lock; 065import org.nuxeo.ecm.core.api.NuxeoException; 066import org.nuxeo.ecm.core.api.PartialList; 067import org.nuxeo.ecm.core.blob.BlobManager; 068import org.nuxeo.ecm.core.model.LockManager; 069import org.nuxeo.ecm.core.query.QueryFilter; 070import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator; 071import org.nuxeo.ecm.core.storage.sql.ColumnType; 072import org.nuxeo.ecm.core.storage.sql.ColumnType.WrappedId; 073import org.nuxeo.ecm.core.storage.sql.Invalidations; 074import org.nuxeo.ecm.core.storage.sql.Mapper; 075import org.nuxeo.ecm.core.storage.sql.Model; 076import org.nuxeo.ecm.core.storage.sql.RepositoryDescriptor; 077import org.nuxeo.ecm.core.storage.sql.RepositoryImpl; 078import org.nuxeo.ecm.core.storage.sql.Row; 079import org.nuxeo.ecm.core.storage.sql.RowId; 080import org.nuxeo.ecm.core.storage.sql.Session.PathResolver; 081import org.nuxeo.ecm.core.storage.sql.jdbc.SQLInfo.SQLInfoSelect; 082import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column; 083import org.nuxeo.ecm.core.storage.sql.jdbc.db.Database; 084import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table; 085import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect; 086import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectOracle; 087import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.SQLStatement.ListCollector; 088import org.nuxeo.runtime.api.Framework; 089 090/** 091 * A {@link JDBCMapper} maps objects to and from a JDBC database. It is specific to a given database connection, as it 092 * computes statements. 093 * <p> 094 * The {@link JDBCMapper} does the mapping according to the policy defined by a {@link Model}, and generates SQL 095 * statements recorded in the {@link SQLInfo}. 096 */ 097public class JDBCMapper extends JDBCRowMapper implements Mapper { 098 099 private static final Log log = LogFactory.getLog(JDBCMapper.class); 100 101 public static Map<String, Serializable> testProps = new HashMap<String, Serializable>(); 102 103 public static final String TEST_UPGRADE = "testUpgrade"; 104 105 // property in sql.txt file 106 public static final String TEST_UPGRADE_VERSIONS = "testUpgradeVersions"; 107 108 public static final String TEST_UPGRADE_LAST_CONTRIBUTOR = "testUpgradeLastContributor"; 109 110 public static final String TEST_UPGRADE_LOCKS = "testUpgradeLocks"; 111 112 protected TableUpgrader tableUpgrader; 113 114 private final QueryMakerService queryMakerService; 115 116 private final PathResolver pathResolver; 117 118 private final RepositoryImpl repository; 119 120 protected boolean clusteringEnabled; 121 122 /** 123 * Creates a new Mapper. 124 * 125 * @param model the model 126 * @param pathResolver the path resolver (used for startswith queries) 127 * @param sqlInfo the sql info 128 * @param xadatasource the XA datasource to use to get connections 129 * @param clusterInvalidator the cluster invalidator 130 * @param repository 131 */ 132 public JDBCMapper(Model model, PathResolver pathResolver, SQLInfo sqlInfo, XADataSource xadatasource, 133 ClusterInvalidator clusterInvalidator, boolean noSharing, RepositoryImpl repository) { 134 super(model, sqlInfo, xadatasource, clusterInvalidator, repository.getInvalidationsPropagator(), noSharing); 135 this.pathResolver = pathResolver; 136 this.repository = repository; 137 clusteringEnabled = clusterInvalidator != null; 138 queryMakerService = Framework.getService(QueryMakerService.class); 139 140 tableUpgrader = new TableUpgrader(this); 141 tableUpgrader.add(Model.VERSION_TABLE_NAME, Model.VERSION_IS_LATEST_KEY, "upgradeVersions", 142 TEST_UPGRADE_VERSIONS); 143 tableUpgrader.add("dublincore", "lastContributor", "upgradeLastContributor", TEST_UPGRADE_LAST_CONTRIBUTOR); 144 tableUpgrader.add(Model.LOCK_TABLE_NAME, Model.LOCK_OWNER_KEY, "upgradeLocks", TEST_UPGRADE_LOCKS); 145 146 } 147 148 @Override 149 public int getTableSize(String tableName) { 150 return sqlInfo.getDatabase().getTable(tableName).getColumns().size(); 151 } 152 153 /* 154 * ----- Root ----- 155 */ 156 157 @Override 158 public void createDatabase(String ddlMode) { 159 try { 160 createTables(ddlMode); 161 } catch (SQLException e) { 162 throw new NuxeoException(e); 163 } 164 } 165 166 protected String getTableName(String origName) { 167 168 if (dialect instanceof DialectOracle) { 169 if (origName.length() > 30) { 170 171 StringBuilder sb = new StringBuilder(origName.length()); 172 173 try { 174 MessageDigest digest = MessageDigest.getInstance("MD5"); 175 sb.append(origName.substring(0, 15)); 176 sb.append('_'); 177 178 digest.update(origName.getBytes()); 179 sb.append(Dialect.toHexString(digest.digest()).substring(0, 12)); 180 181 return sb.toString(); 182 183 } catch (NoSuchAlgorithmException e) { 184 throw new RuntimeException("Error", e); 185 } 186 } 187 } 188 189 return origName; 190 } 191 192 protected void createTables(String ddlMode) throws SQLException { 193 ListCollector ddlCollector = new ListCollector(); 194 195 sqlInfo.executeSQLStatements(null, ddlMode, connection, logger, ddlCollector); // for missing category 196 sqlInfo.executeSQLStatements("first", ddlMode, connection, logger, ddlCollector); 197 sqlInfo.executeSQLStatements("beforeTableCreation", ddlMode, connection, logger, ddlCollector); 198 if (testProps.containsKey(TEST_UPGRADE)) { 199 // create "old" tables 200 sqlInfo.executeSQLStatements("testUpgrade", ddlMode, connection, logger, null); // do not collect 201 } 202 203 String schemaName = dialect.getConnectionSchema(connection); 204 DatabaseMetaData metadata = connection.getMetaData(); 205 Set<String> tableNames = findTableNames(metadata, schemaName); 206 Database database = sqlInfo.getDatabase(); 207 Map<String, List<Column>> added = new HashMap<String, List<Column>>(); 208 209 for (Table table : database.getTables()) { 210 String tableName = getTableName(table.getPhysicalName()); 211 if (!tableNames.contains(tableName.toUpperCase())) { 212 213 /* 214 * Create missing table. 215 */ 216 217 ddlCollector.add(table.getCreateSql()); 218 ddlCollector.addAll(table.getPostCreateSqls(model)); 219 220 added.put(table.getKey(), null); // null = table created 221 222 sqlInfo.sqlStatementsProperties.put("create_table_" + tableName.toLowerCase(), Boolean.TRUE); 223 224 } else { 225 226 /* 227 * Get existing columns. 228 */ 229 230 Map<String, Integer> columnTypes = new HashMap<String, Integer>(); 231 Map<String, String> columnTypeNames = new HashMap<String, String>(); 232 Map<String, Integer> columnTypeSizes = new HashMap<String, Integer>(); 233 try (ResultSet rs = metadata.getColumns(null, schemaName, tableName, "%")) { 234 while (rs.next()) { 235 String schema = rs.getString("TABLE_SCHEM"); 236 if (schema != null) { // null for MySQL, doh! 237 if ("INFORMATION_SCHEMA".equals(schema.toUpperCase())) { 238 // H2 returns some system tables (locks) 239 continue; 240 } 241 } 242 String columnName = rs.getString("COLUMN_NAME").toUpperCase(); 243 columnTypes.put(columnName, Integer.valueOf(rs.getInt("DATA_TYPE"))); 244 columnTypeNames.put(columnName, rs.getString("TYPE_NAME")); 245 columnTypeSizes.put(columnName, Integer.valueOf(rs.getInt("COLUMN_SIZE"))); 246 } 247 } 248 249 /* 250 * Update types and create missing columns. 251 */ 252 253 List<Column> addedColumns = new LinkedList<Column>(); 254 for (Column column : table.getColumns()) { 255 String upperName = column.getPhysicalName().toUpperCase(); 256 Integer type = columnTypes.remove(upperName); 257 if (type == null) { 258 log.warn("Adding missing column in database: " + column.getFullQuotedName()); 259 ddlCollector.add(table.getAddColumnSql(column)); 260 ddlCollector.addAll(table.getPostAddSqls(column, model)); 261 addedColumns.add(column); 262 } else { 263 int expected = column.getJdbcType(); 264 int actual = type.intValue(); 265 String actualName = columnTypeNames.get(upperName); 266 Integer actualSize = columnTypeSizes.get(upperName); 267 if (!column.setJdbcType(actual, actualName, actualSize.intValue())) { 268 log.error(String.format("SQL type mismatch for %s: expected %s, database has %s / %s (%s)", 269 column.getFullQuotedName(), Integer.valueOf(expected), type, actualName, 270 actualSize)); 271 } 272 } 273 } 274 for (String col : dialect.getIgnoredColumns(table)) { 275 columnTypes.remove(col.toUpperCase()); 276 } 277 if (!columnTypes.isEmpty()) { 278 log.warn("Database contains additional unused columns for table " + table.getQuotedName() + ": " 279 + StringUtils.join(new ArrayList<String>(columnTypes.keySet()), ", ")); 280 } 281 if (!addedColumns.isEmpty()) { 282 if (added.containsKey(table.getKey())) { 283 throw new AssertionError(); 284 } 285 added.put(table.getKey(), addedColumns); 286 } 287 } 288 } 289 290 if (testProps.containsKey(TEST_UPGRADE)) { 291 // create "old" content in tables 292 sqlInfo.executeSQLStatements("testUpgradeOldTables", ddlMode, connection, logger, ddlCollector); 293 } 294 295 // run upgrade for each table if added columns or test 296 for (Entry<String, List<Column>> en : added.entrySet()) { 297 List<Column> addedColumns = en.getValue(); 298 String tableKey = en.getKey(); 299 upgradeTable(tableKey, addedColumns, ddlMode, ddlCollector); 300 } 301 302 sqlInfo.executeSQLStatements("afterTableCreation", ddlMode, connection, logger, ddlCollector); 303 sqlInfo.executeSQLStatements("last", ddlMode, connection, logger, ddlCollector); 304 305 // aclr_permission check for PostgreSQL 306 dialect.performAdditionalStatements(connection); 307 308 /* 309 * Execute all the collected DDL, or dump it if requested, depending on ddlMode 310 */ 311 312 // ddlMode may be: 313 // ignore (not treated here, nothing done) 314 // dump (implies execute) 315 // dump,execute 316 // dump,ignore (no execute) 317 // execute 318 // abort (implies dump) 319 // compat can be used instead of execute to always recreate stored procedures 320 321 List<String> ddl = ddlCollector.getStrings(); 322 boolean ignore = ddlMode.contains(RepositoryDescriptor.DDL_MODE_IGNORE); 323 boolean dump = ddlMode.contains(RepositoryDescriptor.DDL_MODE_DUMP); 324 boolean abort = ddlMode.contains(RepositoryDescriptor.DDL_MODE_ABORT); 325 if (dump || abort) { 326 327 /* 328 * Dump DDL if not empty. 329 */ 330 331 if (!ddl.isEmpty()) { 332 File dumpFile = new File(Environment.getDefault().getLog(), "ddl-vcs-" + repository.getName() + ".sql"); 333 try (OutputStream out = new FileOutputStream(dumpFile); PrintStream ps = new PrintStream(out)) { 334 for (String sql : dialect.getDumpStart()) { 335 ps.println(sql); 336 } 337 for (String sql : ddl) { 338 sql = sql.trim(); 339 if (sql.endsWith(";")) { 340 sql = sql.substring(0, sql.length() - 1); 341 } 342 ps.println(dialect.getSQLForDump(sql)); 343 } 344 for (String sql : dialect.getDumpStop()) { 345 ps.println(sql); 346 } 347 } catch (IOException e) { 348 throw new NuxeoException(e); 349 } 350 351 /* 352 * Abort if requested. 353 */ 354 355 if (abort) { 356 log.error("Dumped DDL to: " + dumpFile); 357 throw new NuxeoException("Database initialization failed for: " + repository.getName() 358 + ", DDL must be executed: " + dumpFile); 359 } 360 } 361 } 362 if (!ignore) { 363 364 /* 365 * Execute DDL. 366 */ 367 368 try (Statement st = connection.createStatement()) { 369 for (String sql : ddl) { 370 logger.log(sql.replace("\n", "\n ")); // indented 371 try { 372 st.execute(sql); 373 } catch (SQLException e) { 374 throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e); 375 } 376 countExecute(); 377 } 378 } 379 380 /* 381 * Execute post-DDL stuff. 382 */ 383 384 try (Statement st = connection.createStatement()) { 385 for (String sql : dialect.getStartupSqls(model, sqlInfo.database)) { 386 logger.log(sql.replace("\n", "\n ")); // indented 387 try { 388 st.execute(sql); 389 } catch (SQLException e) { 390 throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e); 391 } 392 countExecute(); 393 } 394 } 395 } 396 } 397 398 protected void upgradeTable(String tableKey, List<Column> addedColumns, String ddlMode, ListCollector ddlCollector) 399 throws SQLException { 400 tableUpgrader.upgrade(tableKey, addedColumns, ddlMode, ddlCollector); 401 } 402 403 /** Finds uppercase table names. */ 404 protected static Set<String> findTableNames(DatabaseMetaData metadata, String schemaName) throws SQLException { 405 Set<String> tableNames = new HashSet<String>(); 406 ResultSet rs = metadata.getTables(null, schemaName, "%", new String[] { "TABLE" }); 407 while (rs.next()) { 408 String tableName = rs.getString("TABLE_NAME"); 409 tableNames.add(tableName.toUpperCase()); 410 } 411 rs.close(); 412 return tableNames; 413 } 414 415 @Override 416 public int getClusterNodeIdType() { 417 return sqlInfo.getClusterNodeIdType(); 418 } 419 420 @Override 421 public void createClusterNode(Serializable nodeId) { 422 Calendar now = Calendar.getInstance(); 423 try { 424 String sql = sqlInfo.getCreateClusterNodeSql(); 425 List<Column> columns = sqlInfo.getCreateClusterNodeColumns(); 426 PreparedStatement ps = connection.prepareStatement(sql); 427 try { 428 if (logger.isLogEnabled()) { 429 logger.logSQL(sql, Arrays.asList(nodeId, now)); 430 } 431 columns.get(0).setToPreparedStatement(ps, 1, nodeId); 432 columns.get(1).setToPreparedStatement(ps, 2, now); 433 ps.execute(); 434 } finally { 435 closeStatement(ps); 436 } 437 } catch (SQLException e) { 438 throw new NuxeoException(e); 439 } 440 } 441 442 @Override 443 public void removeClusterNode(Serializable nodeId) { 444 try { 445 // delete from cluster_nodes 446 String sql = sqlInfo.getDeleteClusterNodeSql(); 447 Column column = sqlInfo.getDeleteClusterNodeColumn(); 448 PreparedStatement ps = connection.prepareStatement(sql); 449 try { 450 if (logger.isLogEnabled()) { 451 logger.logSQL(sql, Arrays.asList(nodeId)); 452 } 453 column.setToPreparedStatement(ps, 1, nodeId); 454 ps.execute(); 455 } finally { 456 closeStatement(ps); 457 } 458 // delete un-processed invals from cluster_invals 459 deleteClusterInvals(nodeId); 460 } catch (SQLException e) { 461 throw new NuxeoException(e); 462 } 463 } 464 465 protected void deleteClusterInvals(Serializable nodeId) throws SQLException { 466 String sql = sqlInfo.getDeleteClusterInvalsSql(); 467 Column column = sqlInfo.getDeleteClusterInvalsColumn(); 468 PreparedStatement ps = connection.prepareStatement(sql); 469 try { 470 if (logger.isLogEnabled()) { 471 logger.logSQL(sql, Arrays.asList(nodeId)); 472 } 473 column.setToPreparedStatement(ps, 1, nodeId); 474 int n = ps.executeUpdate(); 475 countExecute(); 476 if (logger.isLogEnabled()) { 477 logger.logCount(n); 478 } 479 } finally { 480 try { 481 closeStatement(ps); 482 } catch (SQLException e) { 483 log.error("deleteClusterInvals: " + e.getMessage(), e); 484 } 485 } 486 } 487 488 @Override 489 public void insertClusterInvalidations(Serializable nodeId, Invalidations invalidations) { 490 String sql = dialect.getClusterInsertInvalidations(); 491 List<Column> columns = sqlInfo.getClusterInvalidationsColumns(); 492 PreparedStatement ps = null; 493 try { 494 ps = connection.prepareStatement(sql); 495 int kind = Invalidations.MODIFIED; 496 while (true) { 497 Set<RowId> rowIds = invalidations.getKindSet(kind); 498 499 // reorganize by id 500 Map<Serializable, Set<String>> res = new HashMap<Serializable, Set<String>>(); 501 for (RowId rowId : rowIds) { 502 Set<String> tableNames = res.get(rowId.id); 503 if (tableNames == null) { 504 res.put(rowId.id, tableNames = new HashSet<String>()); 505 } 506 tableNames.add(rowId.tableName); 507 } 508 509 // do inserts 510 for (Entry<Serializable, Set<String>> en : res.entrySet()) { 511 Serializable id = en.getKey(); 512 String fragments = join(en.getValue(), ' '); 513 if (logger.isLogEnabled()) { 514 logger.logSQL(sql, Arrays.<Serializable> asList(nodeId, id, fragments, Long.valueOf(kind))); 515 } 516 Serializable frags; 517 if (dialect.supportsArrays() && columns.get(2).getJdbcType() == Types.ARRAY) { 518 frags = fragments.split(" "); 519 } else { 520 frags = fragments; 521 } 522 columns.get(0).setToPreparedStatement(ps, 1, nodeId); 523 columns.get(1).setToPreparedStatement(ps, 2, id); 524 columns.get(2).setToPreparedStatement(ps, 3, frags); 525 columns.get(3).setToPreparedStatement(ps, 4, Long.valueOf(kind)); 526 ps.execute(); 527 countExecute(); 528 } 529 if (kind == Invalidations.MODIFIED) { 530 kind = Invalidations.DELETED; 531 } else { 532 break; 533 } 534 } 535 } catch (SQLException e) { 536 throw new NuxeoException("Could not invalidate", e); 537 } finally { 538 try { 539 closeStatement(ps); 540 } catch (SQLException e) { 541 log.error(e.getMessage(), e); 542 } 543 } 544 } 545 546 // join that works on a set 547 protected static final String join(Collection<String> strings, char sep) { 548 if (strings.isEmpty()) { 549 throw new RuntimeException(); 550 } 551 if (strings.size() == 1) { 552 return strings.iterator().next(); 553 } 554 int size = 0; 555 for (String word : strings) { 556 size += word.length() + 1; 557 } 558 StringBuilder buf = new StringBuilder(size); 559 for (String word : strings) { 560 buf.append(word); 561 buf.append(sep); 562 } 563 buf.setLength(size - 1); 564 return buf.toString(); 565 } 566 567 @Override 568 public Invalidations getClusterInvalidations(Serializable nodeId) { 569 Invalidations invalidations = new Invalidations(); 570 String sql = dialect.getClusterGetInvalidations(); 571 List<Column> columns = sqlInfo.getClusterInvalidationsColumns(); 572 try { 573 if (logger.isLogEnabled()) { 574 logger.logSQL(sql, Arrays.asList(nodeId)); 575 } 576 PreparedStatement ps = connection.prepareStatement(sql); 577 ResultSet rs = null; 578 try { 579 setToPreparedStatement(ps, 1, nodeId); 580 rs = ps.executeQuery(); 581 countExecute(); 582 while (rs.next()) { 583 // first column ignored, it's the node id 584 Serializable id = columns.get(1).getFromResultSet(rs, 1); 585 Serializable frags = columns.get(2).getFromResultSet(rs, 2); 586 int kind = ((Long) columns.get(3).getFromResultSet(rs, 3)).intValue(); 587 String[] fragments; 588 if (dialect.supportsArrays() && frags instanceof String[]) { 589 fragments = (String[]) frags; 590 } else { 591 fragments = ((String) frags).split(" "); 592 } 593 invalidations.add(id, fragments, kind); 594 } 595 } finally { 596 closeStatement(ps, rs); 597 } 598 if (logger.isLogEnabled()) { 599 // logCount(n); 600 logger.log(" -> " + invalidations); 601 } 602 if (dialect.isClusteringDeleteNeeded()) { 603 deleteClusterInvals(nodeId); 604 } 605 return invalidations; 606 } catch (SQLException e) { 607 throw new NuxeoException("Could not invalidate", e); 608 } 609 } 610 611 @Override 612 public Serializable getRootId(String repositoryId) { 613 String sql = sqlInfo.getSelectRootIdSql(); 614 try { 615 if (logger.isLogEnabled()) { 616 logger.logSQL(sql, Collections.<Serializable> singletonList(repositoryId)); 617 } 618 PreparedStatement ps = connection.prepareStatement(sql); 619 ResultSet rs = null; 620 try { 621 ps.setString(1, repositoryId); 622 rs = ps.executeQuery(); 623 countExecute(); 624 if (!rs.next()) { 625 if (logger.isLogEnabled()) { 626 logger.log(" -> (none)"); 627 } 628 return null; 629 } 630 Column column = sqlInfo.getSelectRootIdWhatColumn(); 631 Serializable id = column.getFromResultSet(rs, 1); 632 if (logger.isLogEnabled()) { 633 logger.log(" -> " + Model.MAIN_KEY + '=' + id); 634 } 635 // check that we didn't get several rows 636 if (rs.next()) { 637 throw new NuxeoException("Row query for " + repositoryId + " returned several rows: " + sql); 638 } 639 return id; 640 } finally { 641 closeStatement(ps, rs); 642 } 643 } catch (SQLException e) { 644 throw new NuxeoException("Could not select: " + sql, e); 645 } 646 } 647 648 @Override 649 public void setRootId(Serializable repositoryId, Serializable id) { 650 String sql = sqlInfo.getInsertRootIdSql(); 651 try { 652 PreparedStatement ps = connection.prepareStatement(sql); 653 try { 654 List<Column> columns = sqlInfo.getInsertRootIdColumns(); 655 List<Serializable> debugValues = null; 656 if (logger.isLogEnabled()) { 657 debugValues = new ArrayList<Serializable>(2); 658 } 659 int i = 0; 660 for (Column column : columns) { 661 i++; 662 String key = column.getKey(); 663 Serializable v; 664 if (key.equals(Model.MAIN_KEY)) { 665 v = id; 666 } else if (key.equals(Model.REPOINFO_REPONAME_KEY)) { 667 v = repositoryId; 668 } else { 669 throw new RuntimeException(key); 670 } 671 column.setToPreparedStatement(ps, i, v); 672 if (debugValues != null) { 673 debugValues.add(v); 674 } 675 } 676 if (debugValues != null) { 677 logger.logSQL(sql, debugValues); 678 debugValues.clear(); 679 } 680 ps.execute(); 681 countExecute(); 682 } finally { 683 closeStatement(ps); 684 } 685 } catch (SQLException e) { 686 throw new NuxeoException("Could not insert: " + sql, e); 687 } 688 } 689 690 protected QueryMaker findQueryMaker(String queryType) { 691 for (Class<? extends QueryMaker> klass : queryMakerService.getQueryMakers()) { 692 QueryMaker queryMaker; 693 try { 694 queryMaker = klass.newInstance(); 695 } catch (ReflectiveOperationException e) { 696 throw new NuxeoException(e); 697 } 698 if (queryMaker.accepts(queryType)) { 699 return queryMaker; 700 } 701 } 702 return null; 703 } 704 705 protected void prepareUserReadAcls(QueryFilter queryFilter) { 706 String sql = dialect.getPrepareUserReadAclsSql(); 707 Serializable principals = queryFilter.getPrincipals(); 708 if (sql == null || principals == null) { 709 return; 710 } 711 if (!dialect.supportsArrays()) { 712 principals = StringUtils.join((String[]) principals, Dialect.ARRAY_SEP); 713 } 714 PreparedStatement ps = null; 715 try { 716 ps = connection.prepareStatement(sql); 717 if (logger.isLogEnabled()) { 718 logger.logSQL(sql, Collections.singleton(principals)); 719 } 720 setToPreparedStatement(ps, 1, principals); 721 ps.execute(); 722 countExecute(); 723 } catch (SQLException e) { 724 throw new NuxeoException("Failed to prepare user read acl cache", e); 725 } finally { 726 try { 727 closeStatement(ps); 728 } catch (SQLException e) { 729 log.error(e.getMessage(), e); 730 } 731 } 732 } 733 734 @Override 735 public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter, 736 boolean countTotal) { 737 return query(query, queryType, queryFilter, countTotal ? -1 : 0); 738 } 739 740 @Override 741 public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter, long countUpTo) { 742 if (dialect.needsPrepareUserReadAcls()) { 743 prepareUserReadAcls(queryFilter); 744 } 745 QueryMaker queryMaker = findQueryMaker(queryType); 746 if (queryMaker == null) { 747 throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query); 748 } 749 QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter); 750 751 if (q == null) { 752 logger.log("Query cannot return anything due to conflicting clauses"); 753 return new PartialList<Serializable>(Collections.<Serializable> emptyList(), 0); 754 } 755 long limit = queryFilter.getLimit(); 756 long offset = queryFilter.getOffset(); 757 758 if (logger.isLogEnabled()) { 759 String sql = q.selectInfo.sql; 760 if (limit != 0) { 761 sql += " -- LIMIT " + limit + " OFFSET " + offset; 762 } 763 if (countUpTo != 0) { 764 sql += " -- COUNT TOTAL UP TO " + countUpTo; 765 } 766 logger.logSQL(sql, q.selectParams); 767 } 768 769 String sql = q.selectInfo.sql; 770 771 if (countUpTo == 0 && limit > 0 && dialect.supportsPaging()) { 772 // full result set not needed for counting 773 sql = dialect.addPagingClause(sql, limit, offset); 774 limit = 0; 775 offset = 0; 776 } else if (countUpTo > 0 && dialect.supportsPaging()) { 777 // ask one more row 778 sql = dialect.addPagingClause(sql, Math.max(countUpTo + 1, limit + offset), 0); 779 } 780 781 PreparedStatement ps = null; 782 ResultSet rs = null; 783 try { 784 ps = connection.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY); 785 int i = 1; 786 for (Serializable object : q.selectParams) { 787 setToPreparedStatement(ps, i++, object); 788 } 789 rs = ps.executeQuery(); 790 countExecute(); 791 792 // limit/offset 793 long totalSize = -1; 794 boolean available; 795 if ((limit == 0) || (offset == 0)) { 796 available = rs.first(); 797 if (!available) { 798 totalSize = 0; 799 } 800 if (limit == 0) { 801 limit = -1; // infinite 802 } 803 } else { 804 available = rs.absolute((int) offset + 1); 805 } 806 807 Column column = q.selectInfo.whatColumns.get(0); 808 List<Serializable> ids = new LinkedList<Serializable>(); 809 int rowNum = 0; 810 while (available && (limit != 0)) { 811 Serializable id = column.getFromResultSet(rs, 1); 812 ids.add(id); 813 rowNum = rs.getRow(); 814 available = rs.next(); 815 limit--; 816 } 817 818 // total size 819 if (countUpTo != 0 && (totalSize == -1)) { 820 if (!available && (rowNum != 0)) { 821 // last row read was the actual last 822 totalSize = rowNum; 823 } else { 824 // available if limit reached with some left 825 // rowNum == 0 if skipped too far 826 rs.last(); 827 totalSize = rs.getRow(); 828 } 829 if (countUpTo > 0 && totalSize > countUpTo) { 830 // the result where truncated we don't know the total size 831 totalSize = -2; 832 } 833 } 834 835 if (logger.isLogEnabled()) { 836 logger.logIds(ids, countUpTo != 0, totalSize); 837 } 838 839 return new PartialList<Serializable>(ids, totalSize); 840 } catch (SQLException e) { 841 throw new NuxeoException("Invalid query: " + query, e); 842 } finally { 843 try { 844 closeStatement(ps, rs); 845 } catch (SQLException e) { 846 log.error("Cannot close connection", e); 847 } 848 } 849 } 850 851 public int setToPreparedStatement(PreparedStatement ps, int i, Serializable object) throws SQLException { 852 if (object instanceof Calendar) { 853 Calendar cal = (Calendar) object; 854 ps.setTimestamp(i, dialect.getTimestampFromCalendar(cal), cal); 855 } else if (object instanceof java.sql.Date) { 856 ps.setDate(i, (java.sql.Date) object); 857 } else if (object instanceof Long) { 858 ps.setLong(i, ((Long) object).longValue()); 859 } else if (object instanceof WrappedId) { 860 dialect.setId(ps, i, object.toString()); 861 } else if (object instanceof Object[]) { 862 int jdbcType; 863 if (object instanceof String[]) { 864 jdbcType = dialect.getJDBCTypeAndString(ColumnType.STRING).jdbcType; 865 } else if (object instanceof Boolean[]) { 866 jdbcType = dialect.getJDBCTypeAndString(ColumnType.BOOLEAN).jdbcType; 867 } else if (object instanceof Long[]) { 868 jdbcType = dialect.getJDBCTypeAndString(ColumnType.LONG).jdbcType; 869 } else if (object instanceof Double[]) { 870 jdbcType = dialect.getJDBCTypeAndString(ColumnType.DOUBLE).jdbcType; 871 } else if (object instanceof java.sql.Date[]) { 872 jdbcType = Types.DATE; 873 } else if (object instanceof java.sql.Clob[]) { 874 jdbcType = Types.CLOB; 875 } else if (object instanceof Calendar[]) { 876 jdbcType = dialect.getJDBCTypeAndString(ColumnType.TIMESTAMP).jdbcType; 877 object = dialect.getTimestampFromCalendar((Calendar) object); 878 } else if (object instanceof Integer[]) { 879 jdbcType = dialect.getJDBCTypeAndString(ColumnType.INTEGER).jdbcType; 880 } else { 881 jdbcType = dialect.getJDBCTypeAndString(ColumnType.CLOB).jdbcType; 882 } 883 Array array = dialect.createArrayOf(jdbcType, (Object[]) object, connection); 884 ps.setArray(i, array); 885 } else { 886 ps.setObject(i, object); 887 } 888 return i; 889 } 890 891 // queryFilter used for principals and permissions 892 @Override 893 public IterableQueryResult queryAndFetch(String query, String queryType, QueryFilter queryFilter, 894 boolean distinctDocuments, Object... params) { 895 if (dialect.needsPrepareUserReadAcls()) { 896 prepareUserReadAcls(queryFilter); 897 } 898 QueryMaker queryMaker = findQueryMaker(queryType); 899 if (queryMaker == null) { 900 throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query); 901 } 902 if (distinctDocuments) { 903 String q = query.toLowerCase(); 904 if (q.startsWith("select ") && !q.startsWith("select distinct ")) { 905 query = "SELECT DISTINCT " + query.substring("SELECT ".length()); 906 } 907 } 908 try { 909 return new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this, params); 910 } catch (SQLException e) { 911 throw new NuxeoException("Invalid query: " + queryType + ": " + query, e); 912 } 913 } 914 915 @Override 916 public Set<Serializable> getAncestorsIds(Collection<Serializable> ids) { 917 SQLInfoSelect select = sqlInfo.getSelectAncestorsIds(); 918 if (select == null) { 919 return getAncestorsIdsIterative(ids); 920 } 921 Serializable whereIds = newIdArray(ids); 922 Set<Serializable> res = new HashSet<Serializable>(); 923 PreparedStatement ps = null; 924 ResultSet rs = null; 925 try { 926 if (logger.isLogEnabled()) { 927 logger.logSQL(select.sql, Collections.singleton(whereIds)); 928 } 929 Column what = select.whatColumns.get(0); 930 ps = connection.prepareStatement(select.sql); 931 setToPreparedStatementIdArray(ps, 1, whereIds); 932 rs = ps.executeQuery(); 933 countExecute(); 934 List<Serializable> debugIds = null; 935 if (logger.isLogEnabled()) { 936 debugIds = new LinkedList<Serializable>(); 937 } 938 while (rs.next()) { 939 if (dialect.supportsArraysReturnInsteadOfRows()) { 940 Serializable[] resultIds = dialect.getArrayResult(rs.getArray(1)); 941 for (Serializable id : resultIds) { 942 if (id != null) { 943 res.add(id); 944 if (logger.isLogEnabled()) { 945 debugIds.add(id); 946 } 947 } 948 } 949 } else { 950 Serializable id = what.getFromResultSet(rs, 1); 951 if (id != null) { 952 res.add(id); 953 if (logger.isLogEnabled()) { 954 debugIds.add(id); 955 } 956 } 957 } 958 } 959 if (logger.isLogEnabled()) { 960 logger.logIds(debugIds, false, 0); 961 } 962 return res; 963 } catch (SQLException e) { 964 throw new NuxeoException("Failed to get ancestors ids", e); 965 } finally { 966 try { 967 closeStatement(ps, rs); 968 } catch (SQLException e) { 969 log.error(e.getMessage(), e); 970 } 971 } 972 } 973 974 /** 975 * Uses iterative parentid selection. 976 */ 977 protected Set<Serializable> getAncestorsIdsIterative(Collection<Serializable> ids) { 978 PreparedStatement ps = null; 979 ResultSet rs = null; 980 try { 981 LinkedList<Serializable> todo = new LinkedList<Serializable>(ids); 982 Set<Serializable> done = new HashSet<Serializable>(); 983 Set<Serializable> res = new HashSet<Serializable>(); 984 while (!todo.isEmpty()) { 985 done.addAll(todo); 986 SQLInfoSelect select = sqlInfo.getSelectParentIds(todo.size()); 987 if (logger.isLogEnabled()) { 988 logger.logSQL(select.sql, todo); 989 } 990 Column what = select.whatColumns.get(0); 991 Column where = select.whereColumns.get(0); 992 ps = connection.prepareStatement(select.sql); 993 int i = 1; 994 for (Serializable id : todo) { 995 where.setToPreparedStatement(ps, i++, id); 996 } 997 rs = ps.executeQuery(); 998 countExecute(); 999 todo = new LinkedList<Serializable>(); 1000 List<Serializable> debugIds = null; 1001 if (logger.isLogEnabled()) { 1002 debugIds = new LinkedList<Serializable>(); 1003 } 1004 while (rs.next()) { 1005 Serializable id = what.getFromResultSet(rs, 1); 1006 if (id != null) { 1007 res.add(id); 1008 if (!done.contains(id)) { 1009 todo.add(id); 1010 } 1011 if (logger.isLogEnabled()) { 1012 debugIds.add(id); 1013 } 1014 } 1015 } 1016 if (logger.isLogEnabled()) { 1017 logger.logIds(debugIds, false, 0); 1018 } 1019 rs.close(); 1020 ps.close(); 1021 } 1022 return res; 1023 } catch (SQLException e) { 1024 throw new NuxeoException("Failed to get ancestors ids", e); 1025 } finally { 1026 try { 1027 closeStatement(ps, rs); 1028 } catch (SQLException e) { 1029 log.error(e.getMessage(), e); 1030 } 1031 } 1032 } 1033 1034 @Override 1035 public void updateReadAcls() { 1036 if (!dialect.supportsReadAcl()) { 1037 return; 1038 } 1039 if (log.isDebugEnabled()) { 1040 log.debug("updateReadAcls: updating"); 1041 } 1042 Statement st = null; 1043 try { 1044 st = connection.createStatement(); 1045 String sql = dialect.getUpdateReadAclsSql(); 1046 if (logger.isLogEnabled()) { 1047 logger.log(sql); 1048 } 1049 st.execute(sql); 1050 countExecute(); 1051 } catch (SQLException e) { 1052 throw new NuxeoException("Failed to update read acls", e); 1053 } finally { 1054 try { 1055 closeStatement(st); 1056 } catch (SQLException e) { 1057 log.error(e.getMessage(), e); 1058 } 1059 } 1060 if (log.isDebugEnabled()) { 1061 log.debug("updateReadAcls: done."); 1062 } 1063 } 1064 1065 @Override 1066 public void rebuildReadAcls() { 1067 if (!dialect.supportsReadAcl()) { 1068 return; 1069 } 1070 log.debug("rebuildReadAcls: rebuilding ..."); 1071 Statement st = null; 1072 try { 1073 st = connection.createStatement(); 1074 String sql = dialect.getRebuildReadAclsSql(); 1075 logger.log(sql); 1076 st.execute(sql); 1077 countExecute(); 1078 } catch (SQLException e) { 1079 throw new NuxeoException("Failed to rebuild read acls", e); 1080 } finally { 1081 try { 1082 closeStatement(st); 1083 } catch (SQLException e) { 1084 log.error(e.getMessage(), e); 1085 } 1086 } 1087 log.debug("rebuildReadAcls: done."); 1088 } 1089 1090 /* 1091 * ----- Locking ----- 1092 */ 1093 1094 protected Connection connection(boolean autocommit) { 1095 try { 1096 connection.setAutoCommit(autocommit); 1097 } catch (SQLException e) { 1098 throw new NuxeoException("Cannot set auto commit mode onto " + this + "'s connection", e); 1099 } 1100 return connection; 1101 } 1102 1103 /** 1104 * Calls the callable, inside a transaction if in cluster mode. 1105 * <p> 1106 * Called under {@link #serializationLock}. 1107 */ 1108 protected Lock callInTransaction(LockCallable callable, boolean tx) { 1109 boolean ok = false; 1110 try { 1111 if (log.isDebugEnabled()) { 1112 log.debug("callInTransaction setAutoCommit " + !tx); 1113 } 1114 connection.setAutoCommit(!tx); 1115 } catch (SQLException e) { 1116 throw new NuxeoException("Cannot set auto commit mode onto " + this + "'s connection", e); 1117 } 1118 try { 1119 Lock result = callable.call(); 1120 ok = true; 1121 return result; 1122 } finally { 1123 if (tx) { 1124 try { 1125 try { 1126 if (ok) { 1127 if (log.isDebugEnabled()) { 1128 log.debug("callInTransaction commit"); 1129 } 1130 connection.commit(); 1131 } else { 1132 if (log.isDebugEnabled()) { 1133 log.debug("callInTransaction rollback"); 1134 } 1135 connection.rollback(); 1136 } 1137 } finally { 1138 // restore autoCommit=true 1139 if (log.isDebugEnabled()) { 1140 log.debug("callInTransaction restoring autoCommit=true"); 1141 } 1142 connection.setAutoCommit(true); 1143 } 1144 } catch (SQLException e) { 1145 throw new NuxeoException(e); 1146 } 1147 } 1148 } 1149 } 1150 1151 public interface LockCallable extends Callable<Lock> { 1152 @Override 1153 public Lock call(); 1154 } 1155 1156 @Override 1157 public Lock getLock(Serializable id) { 1158 if (log.isDebugEnabled()) { 1159 try { 1160 log.debug("getLock " + id + " while autoCommit=" + connection.getAutoCommit()); 1161 } catch (SQLException e) { 1162 throw new RuntimeException(e); 1163 } 1164 } 1165 RowId rowId = new RowId(Model.LOCK_TABLE_NAME, id); 1166 Row row = readSimpleRow(rowId); 1167 return row == null ? null : new Lock((String) row.get(Model.LOCK_OWNER_KEY), 1168 (Calendar) row.get(Model.LOCK_CREATED_KEY)); 1169 } 1170 1171 @Override 1172 public Lock setLock(final Serializable id, final Lock lock) { 1173 if (log.isDebugEnabled()) { 1174 log.debug("setLock " + id + " owner=" + lock.getOwner()); 1175 } 1176 SetLock call = new SetLock(id, lock); 1177 return callInTransaction(call, clusteringEnabled); 1178 } 1179 1180 protected class SetLock implements LockCallable { 1181 protected final Serializable id; 1182 1183 protected final Lock lock; 1184 1185 protected SetLock(Serializable id, Lock lock) { 1186 super(); 1187 this.id = id; 1188 this.lock = lock; 1189 } 1190 1191 @Override 1192 public Lock call() { 1193 Lock oldLock = getLock(id); 1194 if (oldLock == null) { 1195 Row row = new Row(Model.LOCK_TABLE_NAME, id); 1196 row.put(Model.LOCK_OWNER_KEY, lock.getOwner()); 1197 row.put(Model.LOCK_CREATED_KEY, lock.getCreated()); 1198 insertSimpleRows(Model.LOCK_TABLE_NAME, Collections.singletonList(row)); 1199 } 1200 return oldLock; 1201 } 1202 } 1203 1204 @Override 1205 public Lock removeLock(final Serializable id, final String owner, final boolean force) { 1206 if (log.isDebugEnabled()) { 1207 log.debug("removeLock " + id + " owner=" + owner + " force=" + force); 1208 } 1209 RemoveLock call = new RemoveLock(id, owner, force); 1210 return callInTransaction(call, !force); 1211 } 1212 1213 protected class RemoveLock implements LockCallable { 1214 protected final Serializable id; 1215 1216 protected final String owner; 1217 1218 protected final boolean force; 1219 1220 protected RemoveLock(Serializable id, String owner, boolean force) { 1221 super(); 1222 this.id = id; 1223 this.owner = owner; 1224 this.force = force; 1225 } 1226 1227 @Override 1228 public Lock call() { 1229 Lock oldLock = force ? null : getLock(id); 1230 if (!force && owner != null) { 1231 if (oldLock == null) { 1232 // not locked, nothing to do 1233 return null; 1234 } 1235 if (!LockManager.canLockBeRemoved(oldLock.getOwner(), owner)) { 1236 // existing mismatched lock, flag failure 1237 return new Lock(oldLock, true); 1238 } 1239 } 1240 if (force || oldLock != null) { 1241 deleteRows(Model.LOCK_TABLE_NAME, Collections.singleton(id)); 1242 } 1243 return oldLock; 1244 } 1245 } 1246 1247 @Override 1248 public void markReferencedBinaries() { 1249 log.debug("Starting binaries GC mark"); 1250 Statement st = null; 1251 ResultSet rs = null; 1252 BlobManager blobManager = Framework.getService(BlobManager.class); 1253 String repositoryName = getRepositoryName(); 1254 try { 1255 st = connection.createStatement(); 1256 int i = -1; 1257 for (String sql : sqlInfo.getBinariesSql) { 1258 i++; 1259 Column col = sqlInfo.getBinariesColumns.get(i); 1260 if (logger.isLogEnabled()) { 1261 logger.log(sql); 1262 } 1263 rs = st.executeQuery(sql); 1264 countExecute(); 1265 int n = 0; 1266 while (rs.next()) { 1267 n++; 1268 String key = (String) col.getFromResultSet(rs, 1); 1269 if (key != null) { 1270 blobManager.markReferencedBinary(key, repositoryName); 1271 } 1272 } 1273 if (logger.isLogEnabled()) { 1274 logger.logCount(n); 1275 } 1276 rs.close(); 1277 } 1278 } catch (SQLException e) { 1279 throw new RuntimeException("Failed to mark binaries for gC", e); 1280 } finally { 1281 try { 1282 closeStatement(st, rs); 1283 } catch (SQLException e) { 1284 log.error(e.getMessage(), e); 1285 } 1286 } 1287 log.debug("End of binaries GC mark"); 1288 } 1289 1290 /* 1291 * ----- XAResource ----- 1292 */ 1293 1294 protected static String systemToString(Object o) { 1295 return o.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(o)); 1296 } 1297 1298 @Override 1299 public void start(Xid xid, int flags) throws XAException { 1300 try { 1301 xaresource.start(xid, flags); 1302 if (logger.isLogEnabled()) { 1303 logger.log("XA start on " + systemToString(xid)); 1304 } 1305 } catch (NuxeoException e) { 1306 throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e); 1307 } catch (XAException e) { 1308 logger.error("XA start error on " + systemToString(xid), e); 1309 throw e; 1310 } 1311 } 1312 1313 @Override 1314 public void end(Xid xid, int flags) throws XAException { 1315 try { 1316 xaresource.end(xid, flags); 1317 if (logger.isLogEnabled()) { 1318 logger.log("XA end on " + systemToString(xid)); 1319 } 1320 } catch (NullPointerException e) { 1321 // H2 when no active transaction 1322 logger.error("XA end error on " + systemToString(xid), e); 1323 throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e); 1324 } catch (XAException e) { 1325 if (flags != XAResource.TMFAIL) { 1326 logger.error("XA end error on " + systemToString(xid), e); 1327 } 1328 throw e; 1329 } 1330 } 1331 1332 @Override 1333 public int prepare(Xid xid) throws XAException { 1334 try { 1335 return xaresource.prepare(xid); 1336 } catch (XAException e) { 1337 logger.error("XA prepare error on " + systemToString(xid), e); 1338 throw e; 1339 } 1340 } 1341 1342 @Override 1343 public void commit(Xid xid, boolean onePhase) throws XAException { 1344 try { 1345 xaresource.commit(xid, onePhase); 1346 } catch (XAException e) { 1347 logger.error("XA commit error on " + systemToString(xid), e); 1348 throw e; 1349 } 1350 } 1351 1352 // rollback interacts with caches so is in RowMapper 1353 1354 @Override 1355 public void forget(Xid xid) throws XAException { 1356 xaresource.forget(xid); 1357 } 1358 1359 @Override 1360 public Xid[] recover(int flag) throws XAException { 1361 return xaresource.recover(flag); 1362 } 1363 1364 @Override 1365 public boolean setTransactionTimeout(int seconds) throws XAException { 1366 return xaresource.setTransactionTimeout(seconds); 1367 } 1368 1369 @Override 1370 public int getTransactionTimeout() throws XAException { 1371 return xaresource.getTransactionTimeout(); 1372 } 1373 1374 @Override 1375 public boolean isSameRM(XAResource xares) throws XAException { 1376 throw new UnsupportedOperationException(); 1377 } 1378 1379 @Override 1380 public boolean isConnected() { 1381 return connection != null; 1382 } 1383 1384 @Override 1385 public void connect() { 1386 openConnections(); 1387 } 1388 1389 @Override 1390 public void disconnect() { 1391 closeConnections(); 1392 } 1393 1394}