001/* 002 * (C) Copyright 2006-2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 * Benoit Delbosc 019 */ 020package org.nuxeo.ecm.core.storage.sql.jdbc; 021 022import static javax.servlet.http.HttpServletResponse.SC_BAD_REQUEST; 023 024import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult; 025 026import java.io.Serializable; 027import java.sql.Array; 028import java.sql.PreparedStatement; 029import java.sql.ResultSet; 030import java.sql.SQLDataException; 031import java.sql.SQLException; 032import java.sql.Statement; 033import java.sql.Types; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.Calendar; 037import java.util.Collection; 038import java.util.Collections; 039import java.util.HashMap; 040import java.util.HashSet; 041import java.util.LinkedList; 042import java.util.List; 043import java.util.Map; 044import java.util.Map.Entry; 045import java.util.Set; 046import java.util.UUID; 047import java.util.concurrent.ConcurrentHashMap; 048 049import org.apache.commons.logging.Log; 050import org.apache.commons.logging.LogFactory; 051import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 052import org.nuxeo.ecm.core.api.IterableQueryResult; 053import org.nuxeo.ecm.core.api.NuxeoException; 054import org.nuxeo.ecm.core.api.PartialList; 055import org.nuxeo.ecm.core.api.ScrollResult; 056import org.nuxeo.ecm.core.api.ScrollResultImpl; 057import org.nuxeo.ecm.core.blob.DocumentBlobManager; 058import org.nuxeo.ecm.core.query.QueryFilter; 059import org.nuxeo.ecm.core.storage.sql.ColumnType; 060import org.nuxeo.ecm.core.storage.sql.ColumnType.WrappedId; 061import org.nuxeo.ecm.core.storage.sql.Mapper; 062import org.nuxeo.ecm.core.storage.sql.Model; 063import org.nuxeo.ecm.core.storage.sql.RepositoryImpl; 064import org.nuxeo.ecm.core.storage.sql.RowId; 065import org.nuxeo.ecm.core.storage.sql.Session.PathResolver; 066import org.nuxeo.ecm.core.storage.sql.VCSClusterInvalidator; 067import org.nuxeo.ecm.core.storage.sql.VCSInvalidations; 068import org.nuxeo.ecm.core.storage.sql.jdbc.SQLInfo.SQLInfoSelect; 069import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column; 070import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect; 071import org.nuxeo.runtime.api.Framework; 072 073/** 074 * A {@link JDBCMapper} maps objects to and from a JDBC database. It is specific to a given database connection, as it 075 * computes statements. 076 * <p> 077 * The {@link JDBCMapper} does the mapping according to the policy defined by a {@link Model}, and generates SQL 078 * statements recorded in the {@link SQLInfo}. 079 */ 080public class JDBCMapper extends JDBCRowMapper implements Mapper { 081 082 private static final Log log = LogFactory.getLog(JDBCMapper.class); 083 084 protected static Map<String, CursorResult> cursorResults = new ConcurrentHashMap<>(); 085 086 private final QueryMakerService queryMakerService; 087 088 private final PathResolver pathResolver; 089 090 private final RepositoryImpl repository; 091 092 protected static final String NOSCROLL_ID = "noscroll"; 093 094 /** 095 * Creates a new Mapper. 096 * 097 * @param model the model 098 * @param pathResolver the path resolver (used for startswith queries) 099 * @param sqlInfo the sql info 100 * @param clusterInvalidator the cluster invalidator 101 * @param repository the repository 102 */ 103 public JDBCMapper(Model model, PathResolver pathResolver, SQLInfo sqlInfo, VCSClusterInvalidator clusterInvalidator, 104 RepositoryImpl repository) { 105 super(model, sqlInfo, clusterInvalidator, repository.getInvalidationsPropagator()); 106 this.pathResolver = pathResolver; 107 this.repository = repository; 108 queryMakerService = Framework.getService(QueryMakerService.class); 109 } 110 111 @Override 112 public void close() { 113 closeConnection(); 114 } 115 116 @Override 117 public int getTableSize(String tableName) { 118 return sqlInfo.getDatabase().getTable(tableName).getColumns().size(); 119 } 120 121 @Override 122 public int getClusterNodeIdType() { 123 return sqlInfo.getClusterNodeIdType(); 124 } 125 126 @Override 127 public void createClusterNode(Serializable nodeId) { 128 Calendar now = Calendar.getInstance(); 129 String sql = sqlInfo.getCreateClusterNodeSql(); 130 List<Column> columns = sqlInfo.getCreateClusterNodeColumns(); 131 try (PreparedStatement ps = connection.prepareStatement(sql)) { 132 if (logger.isLogEnabled()) { 133 logger.logSQL(sql, Arrays.asList(nodeId, now)); 134 } 135 columns.get(0).setToPreparedStatement(ps, 1, nodeId); 136 columns.get(1).setToPreparedStatement(ps, 2, now); 137 ps.execute(); 138 139 } catch (SQLException e) { 140 try { 141 checkConcurrentUpdate(e); 142 } catch (ConcurrentUpdateException cue) { 143 cue.addInfo("Duplicate cluster node with id: " + nodeId 144 + " (a crashed node must be cleaned up, or the cluster configuration fixed)"); 145 throw cue; 146 } 147 throw new NuxeoException(e); 148 } 149 } 150 151 @Override 152 public void removeClusterNode(Serializable nodeId) { 153 // delete from cluster_nodes 154 String sql = sqlInfo.getDeleteClusterNodeSql(); 155 Column column = sqlInfo.getDeleteClusterNodeColumn(); 156 try (PreparedStatement ps = connection.prepareStatement(sql)) { 157 if (logger.isLogEnabled()) { 158 logger.logSQL(sql, Collections.singletonList(nodeId)); 159 } 160 column.setToPreparedStatement(ps, 1, nodeId); 161 ps.execute(); 162 // delete un-processed invals from cluster_invals 163 deleteClusterInvals(nodeId); 164 } catch (SQLException e) { 165 throw new NuxeoException(e); 166 } 167 } 168 169 protected void deleteClusterInvals(Serializable nodeId) throws SQLException { 170 String sql = sqlInfo.getDeleteClusterInvalsSql(); 171 Column column = sqlInfo.getDeleteClusterInvalsColumn(); 172 try (PreparedStatement ps = connection.prepareStatement(sql)) { 173 if (logger.isLogEnabled()) { 174 logger.logSQL(sql, Collections.singletonList(nodeId)); 175 } 176 column.setToPreparedStatement(ps, 1, nodeId); 177 int n = ps.executeUpdate(); 178 countExecute(); 179 if (logger.isLogEnabled()) { 180 logger.logCount(n); 181 } 182 } 183 } 184 185 @Override 186 public void insertClusterInvalidations(Serializable nodeId, VCSInvalidations invalidations) { 187 String sql = dialect.getClusterInsertInvalidations(); 188 List<Column> columns = sqlInfo.getClusterInvalidationsColumns(); 189 try (PreparedStatement ps = connection.prepareStatement(sql)) { 190 int kind = VCSInvalidations.MODIFIED; 191 while (true) { 192 Set<RowId> rowIds = invalidations.getKindSet(kind); 193 194 // reorganize by id 195 Map<Serializable, Set<String>> res = new HashMap<>(); 196 for (RowId rowId : rowIds) { 197 Set<String> tableNames = res.get(rowId.id); 198 if (tableNames == null) { 199 res.put(rowId.id, tableNames = new HashSet<>()); 200 } 201 tableNames.add(rowId.tableName); 202 } 203 204 // do inserts 205 for (Entry<Serializable, Set<String>> en : res.entrySet()) { 206 Serializable id = en.getKey(); 207 String fragments = join(en.getValue(), ' '); 208 if (logger.isLogEnabled()) { 209 logger.logSQL(sql, Arrays.<Serializable> asList(nodeId, id, fragments, Long.valueOf(kind))); 210 } 211 Serializable frags; 212 if (dialect.supportsArrays() && columns.get(2).getJdbcType() == Types.ARRAY) { 213 frags = fragments.split(" "); 214 } else { 215 frags = fragments; 216 } 217 columns.get(0).setToPreparedStatement(ps, 1, nodeId); 218 columns.get(1).setToPreparedStatement(ps, 2, id); 219 columns.get(2).setToPreparedStatement(ps, 3, frags); 220 columns.get(3).setToPreparedStatement(ps, 4, Long.valueOf(kind)); 221 ps.execute(); 222 countExecute(); 223 } 224 if (kind == VCSInvalidations.MODIFIED) { 225 kind = VCSInvalidations.DELETED; 226 } else { 227 break; 228 } 229 } 230 } catch (SQLException e) { 231 throw new NuxeoException("Could not invalidate", e); 232 } 233 } 234 235 // join that works on a set 236 protected static String join(Collection<String> strings, char sep) { 237 if (strings.isEmpty()) { 238 throw new RuntimeException(); 239 } 240 if (strings.size() == 1) { 241 return strings.iterator().next(); 242 } 243 int size = 0; 244 for (String word : strings) { 245 size += word.length() + 1; 246 } 247 StringBuilder sb = new StringBuilder(size); 248 for (String word : strings) { 249 sb.append(word); 250 sb.append(sep); 251 } 252 sb.setLength(size - 1); 253 return sb.toString(); 254 } 255 256 @Override 257 public VCSInvalidations getClusterInvalidations(Serializable nodeId) { 258 VCSInvalidations invalidations = new VCSInvalidations(); 259 String sql = dialect.getClusterGetInvalidations(); 260 List<Column> columns = sqlInfo.getClusterInvalidationsColumns(); 261 if (logger.isLogEnabled()) { 262 logger.logSQL(sql, Collections.singletonList(nodeId)); 263 } 264 try (PreparedStatement ps = connection.prepareStatement(sql)) { 265 setToPreparedStatement(ps, 1, nodeId); 266 try (ResultSet rs = ps.executeQuery()) { 267 countExecute(); 268 while (rs.next()) { 269 // first column ignored, it's the node id 270 Serializable id = columns.get(1).getFromResultSet(rs, 1); 271 Serializable frags = columns.get(2).getFromResultSet(rs, 2); 272 int kind = ((Long) columns.get(3).getFromResultSet(rs, 3)).intValue(); 273 String[] fragments; 274 if (dialect.supportsArrays() && frags instanceof String[]) { 275 fragments = (String[]) frags; 276 } else { 277 fragments = ((String) frags).split(" "); 278 } 279 invalidations.add(id, fragments, kind); 280 } 281 } 282 if (logger.isLogEnabled()) { 283 // logCount(n); 284 logger.log(" -> " + invalidations); 285 } 286 if (dialect.isClusteringDeleteNeeded()) { 287 deleteClusterInvals(nodeId); 288 } 289 return invalidations; 290 } catch (SQLException e) { 291 throw new NuxeoException("Could not invalidate", e); 292 } 293 } 294 295 @Override 296 public Serializable getRootId(String repositoryId) { 297 String sql = sqlInfo.getSelectRootIdSql(); 298 if (logger.isLogEnabled()) { 299 logger.logSQL(sql, Collections.<Serializable> singletonList(repositoryId)); 300 } 301 try (PreparedStatement ps = connection.prepareStatement(sql)) { 302 ps.setString(1, repositoryId); 303 try (ResultSet rs = ps.executeQuery()) { 304 countExecute(); 305 if (!rs.next()) { 306 if (logger.isLogEnabled()) { 307 logger.log(" -> (none)"); 308 } 309 return null; 310 } 311 Column column = sqlInfo.getSelectRootIdWhatColumn(); 312 Serializable id = column.getFromResultSet(rs, 1); 313 if (logger.isLogEnabled()) { 314 logger.log(" -> " + Model.MAIN_KEY + '=' + id); 315 } 316 // check that we didn't get several rows 317 if (rs.next()) { 318 throw new NuxeoException("Row query for " + repositoryId + " returned several rows: " + sql); 319 } 320 return id; 321 } 322 } catch (SQLException e) { 323 throw new NuxeoException("Could not select: " + sql, e); 324 } 325 } 326 327 @Override 328 public void setRootId(Serializable repositoryId, Serializable id) { 329 String sql = sqlInfo.getInsertRootIdSql(); 330 try (PreparedStatement ps = connection.prepareStatement(sql)) { 331 List<Column> columns = sqlInfo.getInsertRootIdColumns(); 332 List<Serializable> debugValues = null; 333 if (logger.isLogEnabled()) { 334 debugValues = new ArrayList<>(2); 335 } 336 int i = 0; 337 for (Column column : columns) { 338 i++; 339 String key = column.getKey(); 340 Serializable v; 341 if (key.equals(Model.MAIN_KEY)) { 342 v = id; 343 } else if (key.equals(Model.REPOINFO_REPONAME_KEY)) { 344 v = repositoryId; 345 } else { 346 throw new RuntimeException(key); 347 } 348 column.setToPreparedStatement(ps, i, v); 349 if (debugValues != null) { 350 debugValues.add(v); 351 } 352 } 353 if (debugValues != null) { 354 logger.logSQL(sql, debugValues); 355 debugValues.clear(); 356 } 357 ps.execute(); 358 countExecute(); 359 } catch (SQLException e) { 360 throw new NuxeoException("Could not insert: " + sql, e); 361 } 362 } 363 364 protected QueryMaker findQueryMaker(String queryType) { 365 for (Class<? extends QueryMaker> klass : queryMakerService.getQueryMakers()) { 366 QueryMaker queryMaker; 367 try { 368 queryMaker = klass.getDeclaredConstructor().newInstance(); 369 } catch (ReflectiveOperationException e) { 370 throw new NuxeoException(e); 371 } 372 if (queryMaker.accepts(queryType)) { 373 return queryMaker; 374 } 375 } 376 return null; 377 } 378 379 protected void prepareUserReadAcls(QueryFilter queryFilter) { 380 String sql = dialect.getPrepareUserReadAclsSql(); 381 Serializable principals = queryFilter.getPrincipals(); 382 if (sql == null || principals == null) { 383 return; 384 } 385 if (!dialect.supportsArrays()) { 386 principals = String.join(Dialect.ARRAY_SEP, (String[]) principals); 387 } 388 try (PreparedStatement ps = connection.prepareStatement(sql)) { 389 if (logger.isLogEnabled()) { 390 logger.logSQL(sql, Collections.singleton(principals)); 391 } 392 setToPreparedStatement(ps, 1, principals); 393 ps.execute(); 394 countExecute(); 395 } catch (SQLException e) { 396 throw new NuxeoException("Failed to prepare user read acl cache", e); 397 } 398 } 399 400 @Override 401 public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter, 402 boolean countTotal) { 403 return query(query, queryType, queryFilter, countTotal ? -1 : 0); 404 } 405 406 @Override 407 public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter, long countUpTo) { 408 PartialList<Serializable> result = queryProjection(query, queryType, queryFilter, countUpTo, 409 (info, rs) -> info.whatColumns.get(0).getFromResultSet(rs, 1)); 410 411 if (logger.isLogEnabled()) { 412 logger.logIds(result, countUpTo != 0, result.totalSize()); 413 } 414 415 return result; 416 } 417 418 // queryFilter used for principals and permissions 419 @Override 420 public IterableQueryResult queryAndFetch(String query, String queryType, QueryFilter queryFilter, 421 boolean distinctDocuments, Object... params) { 422 if (dialect.needsPrepareUserReadAcls()) { 423 prepareUserReadAcls(queryFilter); 424 } 425 QueryMaker queryMaker = findQueryMaker(queryType); 426 if (queryMaker == null) { 427 throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query); 428 } 429 query = computeDistinctDocuments(query, distinctDocuments); 430 try { 431 return new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this, params); 432 } catch (SQLException e) { 433 throw new NuxeoException("Invalid query: " + queryType + ": " + query, e, SC_BAD_REQUEST); 434 } 435 } 436 437 @Override 438 public PartialList<Map<String, Serializable>> queryProjection(String query, String queryType, 439 QueryFilter queryFilter, boolean distinctDocuments, long countUpTo, Object... params) { 440 query = computeDistinctDocuments(query, distinctDocuments); 441 PartialList<Map<String, Serializable>> result = queryProjection(query, queryType, queryFilter, countUpTo, 442 (info, rs) -> info.mapMaker.makeMap(rs), params); 443 444 if (logger.isLogEnabled()) { 445 logger.logMaps(result, countUpTo != 0, result.totalSize()); 446 } 447 448 return result; 449 } 450 451 protected String computeDistinctDocuments(String query, boolean distinctDocuments) { 452 if (distinctDocuments) { 453 String q = query.toLowerCase(); 454 if (q.startsWith("select ") && !q.startsWith("select distinct ")) { 455 // Replace "select" by "select distinct", split at "select ".length() index 456 query = "SELECT DISTINCT " + query.substring(7); 457 } 458 } 459 return query; 460 } 461 462 protected <T> PartialList<T> queryProjection(String query, String queryType, QueryFilter queryFilter, 463 long countUpTo, BiFunctionSQLException<SQLInfoSelect, ResultSet, T> extractor, Object... params) { 464 if (dialect.needsPrepareUserReadAcls()) { 465 prepareUserReadAcls(queryFilter); 466 } 467 QueryMaker queryMaker = findQueryMaker(queryType); 468 if (queryMaker == null) { 469 throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query); 470 } 471 QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter, params); 472 473 if (q == null) { 474 logger.log("Query cannot return anything due to conflicting clauses"); 475 return new PartialList<>(Collections.emptyList(), 0); 476 } 477 long limit = queryFilter.getLimit(); 478 long offset = queryFilter.getOffset(); 479 480 if (logger.isLogEnabled()) { 481 String sql = q.selectInfo.sql; 482 if (limit != 0) { 483 sql += " -- LIMIT " + limit + " OFFSET " + offset; 484 } 485 if (countUpTo != 0) { 486 sql += " -- COUNT TOTAL UP TO " + countUpTo; 487 } 488 logger.logSQL(sql, q.selectParams); 489 } 490 491 String sql = q.selectInfo.sql; 492 493 if (countUpTo == 0 && limit > 0 && dialect.supportsPaging()) { 494 // full result set not needed for counting 495 sql = dialect.addPagingClause(sql, limit, offset); 496 limit = 0; 497 offset = 0; 498 } else if (countUpTo > 0 && dialect.supportsPaging()) { 499 // ask one more row 500 sql = dialect.addPagingClause(sql, Math.max(countUpTo + 1, limit + offset), 0); 501 } 502 503 try (PreparedStatement ps = connection.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, 504 ResultSet.CONCUR_READ_ONLY)) { 505 int i = 1; 506 for (Serializable object : q.selectParams) { 507 setToPreparedStatement(ps, i++, object); 508 } 509 try (ResultSet rs = ps.executeQuery()) { 510 countExecute(); 511 512 // limit/offset 513 long totalSize = -1; 514 boolean available; 515 if ((limit == 0) || (offset == 0)) { 516 available = rs.first(); 517 if (!available) { 518 totalSize = 0; 519 } 520 if (limit == 0) { 521 limit = -1; // infinite 522 } 523 } else { 524 available = rs.absolute((int) offset + 1); 525 } 526 527 List<T> projections = new LinkedList<>(); 528 int rowNum = 0; 529 while (available && (limit != 0)) { 530 try { 531 T projection = extractor.apply(q.selectInfo, rs); 532 projections.add(projection); 533 rowNum = rs.getRow(); 534 available = rs.next(); 535 limit--; 536 } catch (SQLDataException e) { 537 // actually no data available, MariaDB Connector/J lied, stop now 538 available = false; 539 } 540 } 541 542 // total size 543 if (countUpTo != 0 && (totalSize == -1)) { 544 if (!available && (rowNum != 0)) { 545 // last row read was the actual last 546 totalSize = rowNum; 547 } else { 548 // available if limit reached with some left 549 // rowNum == 0 if skipped too far 550 rs.last(); 551 totalSize = rs.getRow(); 552 } 553 if (countUpTo > 0 && totalSize > countUpTo) { 554 // the result where truncated we don't know the total size 555 totalSize = -2; 556 } 557 } 558 559 return new PartialList<>(projections, totalSize); 560 } 561 } catch (SQLException e) { 562 throw new NuxeoException("Invalid query: " + query, e, SC_BAD_REQUEST); 563 } 564 } 565 566 public int setToPreparedStatement(PreparedStatement ps, int i, Serializable object) throws SQLException { 567 if (object instanceof Calendar) { 568 dialect.setToPreparedStatementTimestamp(ps, i, object, null); 569 } else if (object instanceof java.sql.Date) { 570 ps.setDate(i, (java.sql.Date) object); 571 } else if (object instanceof Long) { 572 ps.setLong(i, ((Long) object).longValue()); 573 } else if (object instanceof WrappedId) { 574 dialect.setId(ps, i, object.toString()); 575 } else if (object instanceof Object[]) { 576 int jdbcType; 577 if (object instanceof String[]) { 578 jdbcType = dialect.getJDBCTypeAndString(ColumnType.STRING).jdbcType; 579 } else if (object instanceof Boolean[]) { 580 jdbcType = dialect.getJDBCTypeAndString(ColumnType.BOOLEAN).jdbcType; 581 } else if (object instanceof Long[]) { 582 jdbcType = dialect.getJDBCTypeAndString(ColumnType.LONG).jdbcType; 583 } else if (object instanceof Double[]) { 584 jdbcType = dialect.getJDBCTypeAndString(ColumnType.DOUBLE).jdbcType; 585 } else if (object instanceof java.sql.Date[]) { 586 jdbcType = Types.DATE; 587 } else if (object instanceof java.sql.Clob[]) { 588 jdbcType = Types.CLOB; 589 } else if (object instanceof Calendar[]) { 590 jdbcType = dialect.getJDBCTypeAndString(ColumnType.TIMESTAMP).jdbcType; 591 object = dialect.getTimestampFromCalendar((Calendar[]) object); 592 } else if (object instanceof Integer[]) { 593 jdbcType = dialect.getJDBCTypeAndString(ColumnType.INTEGER).jdbcType; 594 } else { 595 jdbcType = dialect.getJDBCTypeAndString(ColumnType.CLOB).jdbcType; 596 } 597 Array array = dialect.createArrayOf(jdbcType, (Object[]) object, connection); 598 ps.setArray(i, array); 599 } else { 600 ps.setObject(i, object); 601 } 602 return i; 603 } 604 605 @Override 606 public ScrollResult<String> scroll(String query, int batchSize, int keepAliveSeconds) { 607 if (!dialect.supportsScroll()) { 608 return defaultScroll(query); 609 } 610 checkForTimedoutScroll(); 611 QueryFilter queryFilter = new QueryFilter(null, null, null, null, Collections.emptyList(), 0, 0); 612 return scrollSearch(query, queryFilter, batchSize, keepAliveSeconds); 613 } 614 615 @Override 616 public ScrollResult<String> scroll(String query, QueryFilter queryFilter, int batchSize, int keepAliveSeconds) { 617 if (!dialect.supportsScroll()) { 618 return defaultScroll(query); 619 } 620 if (dialect.needsPrepareUserReadAcls()) { 621 prepareUserReadAcls(queryFilter); 622 } 623 checkForTimedoutScroll(); 624 return scrollSearch(query, queryFilter, batchSize, keepAliveSeconds); 625 } 626 627 protected void checkForTimedoutScroll() { 628 cursorResults.forEach((id, cursor) -> cursor.timedOut(id)); 629 } 630 631 @SuppressWarnings("resource") // PreparedStatement + ResultSet for cursor, must not be closed 632 protected ScrollResult<String> scrollSearch(String query, QueryFilter queryFilter, int batchSize, 633 int keepAliveSeconds) { 634 QueryMaker queryMaker = findQueryMaker("NXQL"); 635 QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter); 636 if (q == null) { 637 logger.log("Query cannot return anything due to conflicting clauses"); 638 throw new NuxeoException("Query cannot return anything due to conflicting clauses"); 639 } 640 if (logger.isLogEnabled()) { 641 logger.logSQL(q.selectInfo.sql, q.selectParams); 642 } 643 try { 644 if (connection.getAutoCommit()) { 645 throw new NuxeoException("Scroll should be done inside a transaction"); 646 } 647 // ps MUST NOT be auto-closed because it's referenced by a cursor 648 PreparedStatement ps = connection.prepareStatement(q.selectInfo.sql, ResultSet.TYPE_FORWARD_ONLY, 649 ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT); 650 ps.setFetchSize(batchSize); 651 int i = 1; 652 for (Serializable object : q.selectParams) { 653 setToPreparedStatement(ps, i++, object); 654 } 655 // rs MUST NOT be auto-closed because it's referenced by a cursor 656 ResultSet rs = ps.executeQuery(); 657 String scrollId = UUID.randomUUID().toString(); 658 registerCursor(scrollId, ps, rs, batchSize, keepAliveSeconds); 659 return scroll(scrollId); 660 } catch (SQLException e) { 661 throw new NuxeoException("Error on query", e); 662 } 663 } 664 665 protected class CursorResult { 666 protected final int keepAliveSeconds; 667 668 protected final PreparedStatement preparedStatement; 669 670 protected final ResultSet resultSet; 671 672 protected final int batchSize; 673 674 protected long lastCallTimestamp; 675 676 CursorResult(PreparedStatement preparedStatement, ResultSet resultSet, int batchSize, int keepAliveSeconds) { 677 this.preparedStatement = preparedStatement; 678 this.resultSet = resultSet; 679 this.batchSize = batchSize; 680 this.keepAliveSeconds = keepAliveSeconds; 681 lastCallTimestamp = System.currentTimeMillis(); 682 } 683 684 boolean timedOut(String scrollId) { 685 long now = System.currentTimeMillis(); 686 if (now - lastCallTimestamp > (keepAliveSeconds * 1000)) { 687 if (unregisterCursor(scrollId)) { 688 log.warn("Scroll " + scrollId + " timed out"); 689 } 690 return true; 691 } 692 return false; 693 } 694 695 void touch() { 696 lastCallTimestamp = System.currentTimeMillis(); 697 } 698 699 synchronized void close() throws SQLException { 700 if (resultSet != null) { 701 resultSet.close(); 702 } 703 if (preparedStatement != null) { 704 preparedStatement.close(); 705 } 706 } 707 } 708 709 protected void registerCursor(String scrollId, PreparedStatement ps, ResultSet rs, int batchSize, 710 int keepAliveSeconds) { 711 cursorResults.put(scrollId, new CursorResult(ps, rs, batchSize, keepAliveSeconds)); 712 } 713 714 protected boolean unregisterCursor(String scrollId) { 715 CursorResult cursor = cursorResults.remove(scrollId); 716 if (cursor != null) { 717 try { 718 cursor.close(); 719 return true; 720 } catch (SQLException e) { 721 log.error("Failed to close cursor for scroll: " + scrollId, e); 722 // do not propagate exception on cleaning 723 } 724 } 725 return false; 726 } 727 728 protected ScrollResult<String> defaultScroll(String query) { 729 // the database has no proper support for cursor just return everything in one batch 730 QueryMaker queryMaker = findQueryMaker("NXQL"); 731 List<String> ids; 732 QueryFilter queryFilter = new QueryFilter(null, null, null, null, Collections.emptyList(), 0, 0); 733 try (IterableQueryResult ret = new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this)) { 734 ids = new ArrayList<>((int) ret.size()); 735 for (Map<String, Serializable> map : ret) { 736 ids.add(map.get("ecm:uuid").toString()); 737 } 738 } catch (SQLException e) { 739 throw new NuxeoException("Invalid scroll query: " + query, e); 740 } 741 return new ScrollResultImpl<>(NOSCROLL_ID, ids); 742 } 743 744 @Override 745 public ScrollResult<String> scroll(String scrollId) { 746 if (NOSCROLL_ID.equals(scrollId) || !dialect.supportsScroll()) { 747 // there is only one batch in this case 748 return emptyResult(); 749 } 750 CursorResult cursorResult = cursorResults.get(scrollId); 751 if (cursorResult == null) { 752 throw new NuxeoException("Unknown or timed out scrollId"); 753 } else if (cursorResult.timedOut(scrollId)) { 754 throw new NuxeoException("Timed out scrollId"); 755 } 756 cursorResult.touch(); 757 List<String> ids = new ArrayList<>(cursorResult.batchSize); 758 synchronized (cursorResult) { 759 try { 760 if (cursorResult.resultSet == null || cursorResult.resultSet.isClosed()) { 761 unregisterCursor(scrollId); 762 return emptyResult(); 763 } 764 while (ids.size() < cursorResult.batchSize) { 765 if (cursorResult.resultSet.next()) { 766 ids.add(cursorResult.resultSet.getString(1)); 767 } else { 768 cursorResult.close(); 769 if (ids.isEmpty()) { 770 unregisterCursor(scrollId); 771 } 772 break; 773 } 774 } 775 } catch (SQLException e) { 776 throw new NuxeoException("Error during scroll", e); 777 } 778 } 779 return new ScrollResultImpl<>(scrollId, ids); 780 } 781 782 @Override 783 public Set<Serializable> getAncestorsIds(Collection<Serializable> ids) { 784 SQLInfoSelect select = sqlInfo.getSelectAncestorsIds(); 785 if (select == null) { 786 return getAncestorsIdsIterative(ids); 787 } 788 Serializable whereIds = newIdArray(ids); 789 Set<Serializable> res = new HashSet<>(); 790 if (logger.isLogEnabled()) { 791 logger.logSQL(select.sql, Collections.singleton(whereIds)); 792 } 793 Column what = select.whatColumns.get(0); 794 try (PreparedStatement ps = connection.prepareStatement(select.sql)) { 795 setToPreparedStatementIdArray(ps, 1, whereIds); 796 try (ResultSet rs = ps.executeQuery()) { 797 countExecute(); 798 List<Serializable> debugIds = null; 799 if (logger.isLogEnabled()) { 800 debugIds = new LinkedList<>(); 801 } 802 while (rs.next()) { 803 if (dialect.supportsArraysReturnInsteadOfRows()) { 804 Serializable[] resultIds = dialect.getArrayResult(rs.getArray(1)); 805 for (Serializable id : resultIds) { 806 if (id != null) { 807 res.add(id); 808 if (logger.isLogEnabled()) { 809 debugIds.add(id); 810 } 811 } 812 } 813 } else { 814 Serializable id = what.getFromResultSet(rs, 1); 815 if (id != null) { 816 res.add(id); 817 if (logger.isLogEnabled()) { 818 debugIds.add(id); 819 } 820 } 821 } 822 } 823 if (logger.isLogEnabled()) { 824 logger.logIds(debugIds, false, 0); 825 } 826 } 827 return res; 828 } catch (SQLException e) { 829 throw new NuxeoException("Failed to get ancestors ids", e); 830 } 831 } 832 833 /** 834 * Uses iterative parentid selection. 835 */ 836 protected Set<Serializable> getAncestorsIdsIterative(Collection<Serializable> ids) { 837 try { 838 LinkedList<Serializable> todo = new LinkedList<>(ids); 839 Set<Serializable> done = new HashSet<>(); 840 Set<Serializable> res = new HashSet<>(); 841 while (!todo.isEmpty()) { 842 done.addAll(todo); 843 SQLInfoSelect select = sqlInfo.getSelectParentIds(todo.size()); 844 if (logger.isLogEnabled()) { 845 logger.logSQL(select.sql, todo); 846 } 847 Column what = select.whatColumns.get(0); 848 Column where = select.whereColumns.get(0); 849 try (PreparedStatement ps = connection.prepareStatement(select.sql)) { 850 int i = 1; 851 for (Serializable id : todo) { 852 where.setToPreparedStatement(ps, i++, id); 853 } 854 try (ResultSet rs = ps.executeQuery()) { 855 countExecute(); 856 todo = new LinkedList<>(); 857 List<Serializable> debugIds = null; 858 if (logger.isLogEnabled()) { 859 debugIds = new LinkedList<>(); 860 } 861 while (rs.next()) { 862 Serializable id = what.getFromResultSet(rs, 1); 863 if (id != null) { 864 res.add(id); 865 if (!done.contains(id)) { 866 todo.add(id); 867 } 868 if (logger.isLogEnabled()) { 869 debugIds.add(id); // NOSONAR 870 } 871 } 872 } 873 if (logger.isLogEnabled()) { 874 logger.logIds(debugIds, false, 0); 875 } 876 } 877 } 878 } 879 return res; 880 } catch (SQLException e) { 881 throw new NuxeoException("Failed to get ancestors ids", e); 882 } 883 } 884 885 @Override 886 public void updateReadAcls() { 887 if (!dialect.supportsReadAcl()) { 888 return; 889 } 890 if (log.isDebugEnabled()) { 891 log.debug("updateReadAcls: updating"); 892 } 893 try (Statement st = connection.createStatement()) { 894 String sql = dialect.getUpdateReadAclsSql(); 895 if (logger.isLogEnabled()) { 896 logger.log(sql); 897 } 898 st.execute(sql); 899 countExecute(); 900 } catch (SQLException e) { 901 checkConcurrentUpdate(e); 902 throw new NuxeoException("Failed to update read acls", e); 903 } 904 if (log.isDebugEnabled()) { 905 log.debug("updateReadAcls: done."); 906 } 907 } 908 909 @Override 910 public void rebuildReadAcls() { 911 if (!dialect.supportsReadAcl()) { 912 return; 913 } 914 log.debug("rebuildReadAcls: rebuilding ..."); 915 try (Statement st = connection.createStatement()) { 916 String sql = dialect.getRebuildReadAclsSql(); 917 logger.log(sql); 918 st.execute(sql); 919 countExecute(); 920 } catch (SQLException e) { 921 throw new NuxeoException("Failed to rebuild read acls", e); 922 } 923 log.debug("rebuildReadAcls: done."); 924 } 925 926 @Override 927 public void markReferencedBinaries() { 928 log.debug("Starting binaries GC mark"); 929 DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class); 930 String repositoryName = getRepositoryName(); 931 try (Statement st = connection.createStatement()) { 932 int i = -1; 933 for (String sql : sqlInfo.getBinariesSql) { 934 i++; 935 Column col = sqlInfo.getBinariesColumns.get(i); 936 if (logger.isLogEnabled()) { 937 logger.log(sql); 938 } 939 try (ResultSet rs = st.executeQuery(sql)) { 940 countExecute(); 941 int n = 0; 942 while (rs.next()) { 943 n++; 944 String key = (String) col.getFromResultSet(rs, 1); 945 if (key != null) { 946 blobManager.markReferencedBinary(key, repositoryName); 947 } 948 } 949 if (logger.isLogEnabled()) { 950 logger.logCount(n); 951 } 952 } 953 } 954 } catch (SQLException e) { 955 throw new RuntimeException("Failed to mark binaries for gC", e); 956 } 957 log.debug("End of binaries GC mark"); 958 } 959 960 /** 961 * @since 7.10-HF25, 8.10-HF06, 9.2 962 */ 963 @FunctionalInterface 964 protected interface BiFunctionSQLException<T, U, R> { 965 966 /** 967 * Applies this function to the given arguments. 968 * 969 * @param t the first function argument 970 * @param u the second function argument 971 * @return the function result 972 */ 973 R apply(T t, U u) throws SQLException; 974 975 } 976 977}