001/* 002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.core.storage.sql.kv; 020 021import static java.nio.charset.StandardCharsets.UTF_8; 022import static org.apache.commons.lang3.StringUtils.defaultIfBlank; 023import static org.apache.commons.lang3.StringUtils.isBlank; 024 025import java.io.Serializable; 026import java.nio.charset.CharacterCodingException; 027import java.sql.Connection; 028import java.sql.DatabaseMetaData; 029import java.sql.PreparedStatement; 030import java.sql.ResultSet; 031import java.sql.SQLException; 032import java.sql.Statement; 033import java.util.ArrayList; 034import java.util.Arrays; 035import java.util.Collection; 036import java.util.HashMap; 037import java.util.List; 038import java.util.Map; 039import java.util.function.BiConsumer; 040import java.util.stream.Stream; 041 042import org.apache.commons.lang3.StringUtils; 043import org.apache.logging.log4j.LogManager; 044import org.apache.logging.log4j.Logger; 045import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 046import org.nuxeo.ecm.core.api.NuxeoException; 047import org.nuxeo.ecm.core.storage.sql.ColumnType; 048import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCLogger; 049import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column; 050import org.nuxeo.ecm.core.storage.sql.jdbc.db.TableImpl; 051import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect; 052import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectOracle; 053import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectPostgreSQL; 054import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectSQLServer; 055import org.nuxeo.runtime.RuntimeMessage; 056import org.nuxeo.runtime.RuntimeMessage.Level; 057import org.nuxeo.runtime.RuntimeMessage.Source; 058import org.nuxeo.runtime.api.Framework; 059import org.nuxeo.runtime.datasource.ConnectionHelper; 060import org.nuxeo.runtime.kv.AbstractKeyValueStoreProvider; 061import org.nuxeo.runtime.kv.KeyValueStoreDescriptor; 062 063/** 064 * SQL implementation of a Key/Value Store Provider. 065 * <p> 066 * The following configuration properties are available: 067 * <ul> 068 * <li>datasource: the datasource to use. 069 * <li>table: the table to use. The default is the Store name. 070 * </ul> 071 * If a namespace is specified, it is used as a table name suffix, otherwise of the store name. 072 * <p> 073 * This implementation uses a table with a KEY column (unique and not NULL), and for the value one of these three 074 * columns is used: LONG, STRING, BYTES. If possible LONG is used, then STRING, otherwise BYTES. 075 * <p> 076 * The TTL is stored as an expiration time (seconds since epoch) in its own column. Expiration is done by a thread 077 * running a cleanup DELETE query every 60 seconds. 078 * 079 * @since 10.10 080 */ 081public class SQLKeyValueStore extends AbstractKeyValueStoreProvider { 082 083 private static final Logger log = LogManager.getLogger(SQLKeyValueStore.class); 084 085 /** Datasource configuration property. */ 086 public static final String DATASOURCE_PROP = "datasource"; 087 088 /** Table configuration property. Default is the store name. The namespace is also used for disambiguation. */ 089 public static final String TABLE_PROP = "table"; 090 091 /** Key column, a short string. */ 092 public static final String KEY_COL = "key"; 093 094 /** Long column, or NULL if the value is not representable as a Long. */ 095 public static final String LONG_COL = "long"; 096 097 /** String column, or NULL if the value is representable as a Long or not representable as a String. */ 098 public static final String STRING_COL = "string"; 099 100 /** Bytes column, or NULL if the value is representable as a Long or String. */ 101 public static final String BYTES_COL = "bytes"; 102 103 /** TTL column, holding expiration date in seconds since epoch, or NULL if there is no expiration. */ 104 public static final String TTL_COL = "ttl"; 105 106 protected static final int TTL_EXPIRATION_FREQUENCY_MS = 60_000; // 60 seconds 107 108 // maximum number of retries in case of concurrency 109 protected static final int MAX_RETRY = 5; 110 111 protected JDBCLogger logger; 112 113 protected String dataSourceName; 114 115 protected Dialect dialect; 116 117 protected TableImpl table; 118 119 protected Column keyCol; 120 121 protected Column longCol; 122 123 protected Column stringCol; 124 125 protected Column bytesCol; 126 127 protected Column ttlCol; 128 129 protected String tableName; 130 131 protected String keyColName; 132 133 protected String longColName; 134 135 protected String stringColName; 136 137 protected String bytesColName; 138 139 protected String ttlColName; 140 141 protected Thread ttlThread; 142 143 protected String getSQL; 144 145 protected String getMultiSQL; 146 147 protected String getLongSQL; 148 149 protected String deleteAllSQL; 150 151 protected String deleteSQL; 152 153 protected String deleteIfLongSQL; 154 155 protected String deleteIfStringSQL; 156 157 protected String deleteIfBytesSQL; 158 159 protected String expireSQL; 160 161 protected String keyStreamSQL; 162 163 protected String keyStreamPrefixSQL; 164 165 protected String setTTLSQL; 166 167 protected String existsSQL; 168 169 protected String insertSQL; 170 171 protected String insertLongSQL; 172 173 protected String updateLongSQL; 174 175 protected String updateReturningPostgreSQLSql; 176 177 protected String updateReturningOracleSql; 178 179 protected String updateReturningSQLServerSql; 180 181 @Override 182 public void initialize(KeyValueStoreDescriptor descriptor) { 183 super.initialize(descriptor); 184 logger = new JDBCLogger(name); 185 Map<String, String> properties = descriptor.properties; 186 dataSourceName = properties.get(DATASOURCE_PROP); 187 if (StringUtils.isAllBlank(dataSourceName)) { 188 throw new NuxeoException("Missing " + DATASOURCE_PROP + " property in configuration"); 189 } 190 String tableProp = properties.get(TABLE_PROP); 191 String namespace = descriptor.namespace; 192 String tbl; 193 if (isBlank(tableProp)) { 194 tbl = defaultIfBlank(namespace, name).trim(); 195 } else if (isBlank(namespace)) { 196 tbl = tableProp.trim(); 197 } else { 198 tbl = tableProp.trim() + "_" + namespace.trim(); 199 } 200 // check connection, get dialect and create table if needed 201 try (Connection connection = getConnection()) { 202 dialect = Dialect.createDialect(connection, null); 203 getTable(connection, tbl); 204 } catch (SQLException e) { 205 throw new NuxeoException(e); 206 } 207 prepareSQL(); 208 startTTLThread(); 209 } 210 211 @Override 212 public void close() { 213 stopTTLThread(); 214 } 215 216 protected void getTable(Connection connection, String tbl) throws SQLException { 217 String tablePhysicalName = dialect.getTableName(tbl); 218 table = new TableImpl(dialect, tablePhysicalName, tablePhysicalName); 219 keyCol = addColumn(KEY_COL, ColumnType.SYSNAME); 220 keyCol.setPrimary(true); 221 keyCol.setNullable(false); 222 longCol = addColumn(LONG_COL, ColumnType.LONG); 223 stringCol = addColumn(STRING_COL, ColumnType.CLOB); 224 bytesCol = addColumn(BYTES_COL, ColumnType.BLOB); 225 ttlCol = addColumn(TTL_COL, ColumnType.LONG); 226 table.addIndex(TTL_COL); 227 tableName = table.getQuotedName(); 228 keyColName = keyCol.getQuotedName(); 229 longColName = longCol.getQuotedName(); 230 stringColName = stringCol.getQuotedName(); 231 bytesColName = bytesCol.getQuotedName(); 232 ttlColName = ttlCol.getQuotedName(); 233 if (!tableExists(connection)) { 234 createTable(connection); 235 } 236 checkColumns(connection); 237 } 238 239 protected Column addColumn(String columnName, ColumnType type) { 240 String colPhysicalName = dialect.getColumnName(columnName); 241 Column column = new Column(table, colPhysicalName, type, columnName); 242 return table.addColumn(column.getKey(), column); 243 } 244 245 protected void prepareSQL() { 246 getSQL = "SELECT " + longColName + ", " + stringColName + ", " + bytesColName + " FROM " + tableName + " WHERE " 247 + keyColName + " = ?"; 248 getMultiSQL = "SELECT " + keyColName + ", " + longColName + ", " + stringColName + ", " + bytesColName 249 + " FROM " + tableName + " WHERE " + keyColName + " IN (%s)"; 250 getLongSQL = "SELECT " + longColName + " FROM " + tableName + " WHERE " + keyColName + " = ?"; 251 deleteAllSQL = "DELETE FROM " + tableName; 252 deleteSQL = "DELETE FROM " + tableName + " WHERE " + keyColName + " = ?"; 253 deleteIfLongSQL = deleteSQL + " AND " + longColName + " = ?"; 254 deleteIfStringSQL = deleteSQL + " AND " + dialect.getQuotedNameForExpression(stringCol) + " = ?"; 255 deleteIfBytesSQL = deleteSQL + " AND " + bytesColName + " = ?"; 256 expireSQL = "DELETE FROM " + tableName + " WHERE " + ttlColName + " < ?"; 257 keyStreamSQL = "SELECT " + keyColName + " FROM " + tableName; 258 keyStreamPrefixSQL = keyStreamSQL + " WHERE " + keyColName + " LIKE ?"; 259 String esc = dialect.getLikeEscaping(); 260 if (esc != null) { 261 keyStreamPrefixSQL += esc; 262 } 263 setTTLSQL = "UPDATE " + tableName + " SET " + ttlColName + " = ? WHERE " + keyColName + " = ?"; 264 existsSQL = "SELECT 1 FROM " + tableName + " WHERE " + keyColName + " = ?"; 265 insertSQL = "INSERT INTO " + tableName + "(" + keyColName + ", " + longColName + ", " + stringColName + ", " 266 + bytesColName + ", " + ttlColName + ") VALUES (?, ?, ?, ?, ?)"; 267 insertLongSQL = "INSERT INTO " + tableName + "(" + keyColName + ", " + longColName + ") VALUES (?, ?)"; 268 updateLongSQL = "UPDATE " + tableName + " SET " + longColName + " = ? WHERE " + keyColName + " = ? AND " 269 + longColName + " = ?"; 270 updateReturningPostgreSQLSql = "UPDATE " + tableName + " SET " + longColName + " = " + longColName 271 + " + ? WHERE " + keyColName + " = ? AND " + stringColName + " IS NULL AND " + bytesColName 272 + " IS NULL RETURNING " + longColName; 273 updateReturningOracleSql = "UPDATE " + tableName + " SET " + longColName + " = " + longColName + " + ? WHERE " 274 + keyColName + " = ? AND " + stringColName + " IS NULL AND " + bytesColName + " IS NULL RETURNING " 275 + longColName + " INTO ?"; 276 updateReturningSQLServerSql = "UPDATE " + tableName + " SET " + longColName + " = " + longColName 277 + " + ? OUTPUT INSERTED." + longColName + " WHERE " + keyColName + " = ? AND " + stringColName 278 + " IS NULL AND " + bytesColName + " IS NULL"; 279 } 280 281 protected void startTTLThread() { 282 ttlThread = new Thread(this::expireTTLThread); 283 ttlThread.setName("Nuxeo-Expire-KeyValueStore-" + name); 284 ttlThread.setDaemon(true); 285 ttlThread.start(); 286 } 287 288 protected void stopTTLThread() { 289 if (ttlThread == null) { 290 return; 291 } 292 ttlThread.interrupt(); 293 ttlThread = null; 294 } 295 296 /** 297 * Runs in a thread to do TTL expiration. 298 */ 299 protected void expireTTLThread() { 300 log.debug("Starting TTL expiration thread for KeyValueStore: {}", name); 301 try { 302 // for the initial wait, use a random duration to avoid thundering herd problems 303 Thread.sleep((long) (TTL_EXPIRATION_FREQUENCY_MS * Math.random())); 304 for (;;) { 305 if (Thread.currentThread().isInterrupted()) { 306 break; 307 } 308 Thread.sleep(TTL_EXPIRATION_FREQUENCY_MS); 309 expireTTLOnce(); 310 } 311 } catch (InterruptedException e) { 312 Thread.currentThread().interrupt(); 313 } 314 log.debug("Stopping TTL expiration thread for KeyValueStore: {}", name); 315 } 316 317 protected String escapeLike(String prefix) { 318 return prefix.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_"); 319 } 320 321 /** 322 * Canonicalizes value for the database: use a String or a Long if possible. 323 */ 324 protected Object toStorage(Object value) { 325 // try to convert byte array to UTF-8 string 326 if (value instanceof byte[]) { 327 try { 328 value = bytesToString((byte[]) value); 329 } catch (CharacterCodingException e) { 330 // ignore 331 } 332 } 333 // try to convert String to Long 334 if (value instanceof String) { 335 try { 336 value = Long.valueOf((String) value); 337 } catch (NumberFormatException e) { 338 // ignore 339 } 340 } 341 return value; 342 } 343 344 protected byte[] toBytes(Object value) { 345 if (value instanceof String) { 346 return ((String) value).getBytes(UTF_8); 347 } else if (value instanceof Long) { 348 return ((Long) value).toString().getBytes(UTF_8); 349 } else if (value instanceof byte[]) { 350 return (byte[]) value; 351 } 352 return null; 353 } 354 355 protected String toString(Object value) { 356 if (value instanceof String) { 357 return (String) value; 358 } else if (value instanceof Long) { 359 return ((Long) value).toString(); 360 } else if (value instanceof byte[]) { 361 byte[] bytes = (byte[]) value; 362 try { 363 return bytesToString(bytes); 364 } catch (CharacterCodingException e) { 365 return null; 366 } 367 } 368 return null; 369 } 370 371 protected Long toLong(Object value) throws NumberFormatException { // NOSONAR 372 if (value instanceof Long) { 373 return (Long) value; 374 } else if (value instanceof String) { 375 return Long.valueOf((String) value); 376 } else if (value instanceof byte[]) { 377 byte[] bytes = (byte[]) value; 378 return bytesToLong(bytes); 379 } 380 return null; 381 } 382 383 protected Connection getConnection() throws SQLException { 384 // open connection in noSharing mode 385 return ConnectionHelper.getConnection(dataSourceName, true); 386 } 387 388 protected void setToPreparedStatement(String sql, PreparedStatement ps, Column column, Serializable value) 389 throws SQLException { 390 setToPreparedStatement(sql, ps, Arrays.asList(column), Arrays.asList(value)); 391 } 392 393 protected void setToPreparedStatement(String sql, PreparedStatement ps, Column column1, Serializable value1, 394 Column column2, Serializable value2) throws SQLException { 395 setToPreparedStatement(sql, ps, Arrays.asList(column1, column2), Arrays.asList(value1, value2)); 396 } 397 398 protected void setToPreparedStatement(String sql, PreparedStatement ps, Column column1, Serializable value1, 399 Column column2, Serializable value2, Column column3, Serializable value3) throws SQLException { 400 setToPreparedStatement(sql, ps, Arrays.asList(column1, column2, column3), 401 Arrays.asList(value1, value2, value3)); 402 } 403 404 protected void setToPreparedStatement(String sql, PreparedStatement ps, List<Column> columns, 405 List<? extends Serializable> values) throws SQLException { 406 if (columns.size() != values.size()) { 407 throw new IllegalStateException(); 408 } 409 for (int i = 0; i < columns.size(); i++) { 410 columns.get(i).setToPreparedStatement(ps, i + 1, values.get(i)); 411 } 412 if (logger.isLogEnabled()) { 413 logger.logSQL(sql, values); 414 } 415 } 416 417 protected boolean tableExists(Connection connection) throws SQLException { 418 DatabaseMetaData metadata = connection.getMetaData(); 419 String schemaName = getDatabaseSchemaName(connection); 420 try (ResultSet rs = metadata.getTables(null, schemaName, table.getPhysicalName(), new String[] { "TABLE" })) { 421 boolean exists = rs.next(); 422 log.trace("Checking if table {} exists: {}", table.getPhysicalName(), exists); 423 return exists; 424 } 425 } 426 427 protected void createTable(Connection connection) throws SQLException { 428 try (Statement st = connection.createStatement()) { 429 String createSQL = table.getCreateSql(); 430 logger.log(createSQL); 431 st.execute(createSQL); 432 for (String sql : table.getPostCreateSqls(null)) { 433 logger.log(sql); 434 st.execute(sql); 435 } 436 } 437 } 438 439 /** 440 * Checks that columns have expected JDBC types. 441 */ 442 protected void checkColumns(Connection connection) throws SQLException { 443 DatabaseMetaData metadata = connection.getMetaData(); 444 String schemaName = getDatabaseSchemaName(connection); 445 try (ResultSet rs = metadata.getColumns(null, schemaName, table.getPhysicalName(), "%")) { 446 while (rs.next()) { 447 String schema = rs.getString("TABLE_SCHEM"); 448 if (schema != null) { // null for MySQL, doh! 449 if ("INFORMATION_SCHEMA".equals(schema.toUpperCase())) { 450 // H2 returns some system tables (locks) 451 continue; 452 } 453 } 454 String columnName = rs.getString("COLUMN_NAME").toUpperCase(); 455 int actual = rs.getInt("DATA_TYPE"); 456 String actualName = rs.getString("TYPE_NAME"); 457 int actualSize = rs.getInt("COLUMN_SIZE"); 458 Column column = null; 459 for (Column c : table.getColumns()) { 460 String upperName = c.getPhysicalName().toUpperCase(); 461 if (upperName.equals(columnName)) { 462 column = c; 463 } 464 } 465 if (column == null) { 466 log.error("Column not found: {} in table: {}", columnName, tableName); 467 continue; 468 } 469 String message = column.checkJdbcType(actual, actualName, actualSize); 470 if (message != null) { 471 log.error(message); 472 Framework.getRuntime() 473 .getMessageHandler() 474 .addMessage( 475 new RuntimeMessage(Level.ERROR, message, Source.CODE, this.getClass().getName())); 476 } 477 } 478 } 479 } 480 481 protected String getDatabaseSchemaName(Connection connection) throws SQLException { 482 String schemaName = null; 483 if (dialect instanceof DialectOracle) { 484 try (Statement st = connection.createStatement()) { 485 String sql = "SELECT SYS_CONTEXT('USERENV', 'SESSION_USER') FROM DUAL"; 486 logger.log(sql); 487 try (ResultSet rs = st.executeQuery(sql)) { 488 if (rs.next()) { 489 schemaName = rs.getString(1); 490 logger.log(" -> schema: " + schemaName); 491 } 492 } 493 } 494 } 495 return schemaName; 496 } 497 498 protected void expireTTLOnce() { 499 try (Connection connection = getConnection(); // 500 PreparedStatement ps = connection.prepareStatement(expireSQL)) { 501 Long ttlDeadline = getTTLValue(0); 502 setToPreparedStatement(expireSQL, ps, ttlCol, ttlDeadline); 503 int count = ps.executeUpdate(); 504 logger.logCount(count); 505 } catch (SQLException e) { 506 if (dialect.isConcurrentUpdateException(e)) { 507 // ignore 508 return; 509 } 510 log.debug("Exception during TTL expiration", e); 511 } 512 } 513 514 @Override 515 public void clear() { 516 try (Connection connection = getConnection(); // 517 Statement st = connection.createStatement()) { 518 logger.log(deleteAllSQL); 519 st.execute(deleteAllSQL); 520 } catch (SQLException e) { 521 throw new NuxeoException(e); 522 } 523 } 524 525 @Override 526 public Stream<String> keyStream() { 527 return keyStream(null); 528 } 529 530 @Override 531 public Stream<String> keyStream(String prefix) { 532 try (Connection connection = getConnection()) { 533 return keyStream(connection, prefix); 534 } catch (SQLException e) { 535 throw new NuxeoException(e); 536 } 537 } 538 539 protected Stream<String> keyStream(Connection connection, String prefix) throws SQLException { 540 String sql = prefix == null ? keyStreamSQL : keyStreamPrefixSQL; 541 List<String> keys = new ArrayList<>(); 542 try (PreparedStatement ps = connection.prepareStatement(sql)) { 543 if (prefix != null) { 544 String like = escapeLike(prefix) + "%"; 545 setToPreparedStatement(sql, ps, keyCol, like); 546 } 547 try (ResultSet rs = ps.executeQuery()) { 548 while (rs.next()) { 549 String key = (String) keyCol.getFromResultSet(rs, 1); 550 keys.add(key); 551 } 552 } 553 } 554 return keys.stream(); 555 } 556 557 @Override 558 public byte[] get(String key) { 559 Object value = getObject(key); 560 if (value == null) { 561 return null; 562 } 563 byte[] bytes = toBytes(value); 564 if (bytes != null) { 565 return bytes; 566 } 567 throw new UnsupportedOperationException(value.getClass().getName()); 568 } 569 570 @Override 571 public String getString(String key) { 572 Object value = getObject(key); 573 if (value == null) { 574 return null; 575 } 576 String string = toString(value); 577 if (string != null) { 578 return string; 579 } 580 throw new IllegalArgumentException("Value is not a String for key: " + key); 581 } 582 583 @Override 584 public Long getLong(String key) throws NumberFormatException { // NOSONAR 585 Object value = getObject(key); 586 if (value == null) { 587 return null; 588 } 589 Long longValue = toLong(value); 590 if (longValue != null) { 591 return longValue; 592 } 593 throw new NumberFormatException("Value is not a Long for key: " + key); 594 } 595 596 @Override 597 public Map<String, byte[]> get(Collection<String> keys) { 598 Map<String, byte[]> map = new HashMap<>(keys.size()); 599 getObjects(keys, (key, value) -> { 600 byte[] bytes = toBytes(value); 601 if (bytes == null) { 602 throw new UnsupportedOperationException(String.format("Value of class %s is not supported for key: %s", 603 value.getClass().getName(), key)); 604 } 605 map.put(key, bytes); 606 }); 607 return map; 608 } 609 610 @Override 611 public Map<String, String> getStrings(Collection<String> keys) { 612 Map<String, String> map = new HashMap<>(keys.size()); 613 getObjects(keys, (key, value) -> { 614 String string = toString(value); 615 if (string == null) { 616 throw new IllegalArgumentException("Value is not a String for key: " + key); 617 } 618 map.put(key, string); 619 }); 620 return map; 621 } 622 623 @Override 624 public Map<String, Long> getLongs(Collection<String> keys) throws NumberFormatException { // NOSONAR 625 Map<String, Long> map = new HashMap<>(keys.size()); 626 getObjects(keys, (key, value) -> { 627 Long longValue = toLong(value); 628 if (longValue == null) { 629 throw new IllegalArgumentException("Value is not a Long for key: " + key); 630 } 631 map.put(key, longValue); 632 }); 633 return map; 634 } 635 636 protected Object getObject(String key) { 637 try (Connection connection = getConnection(); // 638 PreparedStatement ps = connection.prepareStatement(getSQL)) { 639 setToPreparedStatement(getSQL, ps, keyCol, key); 640 try (ResultSet rs = ps.executeQuery()) { 641 if (!rs.next()) { 642 if (logger.isLogEnabled()) { 643 logger.log(" -> null"); 644 } 645 return null; 646 } 647 Long longValue = (Long) longCol.getFromResultSet(rs, 1); 648 String string = (String) stringCol.getFromResultSet(rs, 2); 649 byte[] bytes = (byte[]) bytesCol.getFromResultSet(rs, 3); 650 if (logger.isLogEnabled()) { 651 logger.logResultSet(rs, Arrays.asList(longCol, stringCol, bytesCol)); 652 } 653 if (string != null) { 654 return string; 655 } else if (longValue != null) { 656 return longValue; 657 } else { 658 return bytes; 659 } 660 } 661 } catch (SQLException e) { 662 throw new NuxeoException(e); 663 } 664 } 665 666 protected void getObjects(Collection<String> keys, BiConsumer<String, Object> consumer) { 667 if (keys.isEmpty()) { 668 return; 669 } 670 String sql = String.format(getMultiSQL, nParams(keys.size())); 671 logger.logSQL(sql, keys); 672 try (Connection connection = getConnection(); // 673 PreparedStatement ps = connection.prepareStatement(sql)) { 674 int i = 1; 675 for (String key : keys) { 676 keyCol.setToPreparedStatement(ps, i++, key); 677 } 678 try (ResultSet rs = ps.executeQuery()) { 679 while (rs.next()) { 680 String key = (String) keyCol.getFromResultSet(rs, 1); 681 Long longVal = (Long) longCol.getFromResultSet(rs, 2); 682 String string = (String) stringCol.getFromResultSet(rs, 3); 683 byte[] bytes = (byte[]) bytesCol.getFromResultSet(rs, 4); 684 if (logger.isLogEnabled()) { 685 logger.logResultSet(rs, Arrays.asList(keyCol, longCol, stringCol, bytesCol)); 686 } 687 Object value; 688 if (string != null) { 689 value = string; 690 } else if (longVal != null) { 691 value = longVal; 692 } else { 693 value = bytes; 694 } 695 if (value != null) { 696 consumer.accept(key, value); 697 } 698 } 699 } 700 } catch (SQLException e) { 701 throw new NuxeoException(e); 702 } 703 } 704 705 protected String nParams(int n) { 706 StringBuilder sb = new StringBuilder(); 707 for (int i = 0; i < n; i++) { 708 if (i != 0) { 709 sb.append(", "); 710 } 711 sb.append('?'); 712 } 713 return sb.toString(); 714 } 715 716 protected Long ttlToStorage(long ttl) { 717 return ttl == 0 ? null : getTTLValue(ttl); 718 } 719 720 protected Long getTTLValue(long ttl) { 721 return Long.valueOf(System.currentTimeMillis() / 1000 + ttl); 722 } 723 724 @Override 725 public void put(String key, byte[] bytes) { 726 put(key, toStorage(bytes), 0); 727 } 728 729 @Override 730 public void put(String key, byte[] bytes, long ttl) { 731 put(key, toStorage(bytes), ttl); 732 } 733 734 @Override 735 public void put(String key, String string) { 736 put(key, toStorage(string), 0); 737 } 738 739 @Override 740 public void put(String key, String string, long ttl) { 741 put(key, toStorage(string), ttl); 742 } 743 744 @Override 745 public void put(String key, Long value) { 746 put(key, (Object) value, 0); 747 } 748 749 @Override 750 public void put(String key, Long value, long ttl) { 751 put(key, (Object) value, ttl); 752 } 753 754 protected void put(String key, Object value, long ttl) { 755 try (Connection connection = getConnection()) { 756 if (value == null) { 757 // delete 758 try (PreparedStatement ps = connection.prepareStatement(deleteSQL)) { 759 setToPreparedStatement(deleteSQL, ps, keyCol, key); 760 ps.execute(); 761 } 762 } else { 763 // upsert (update or insert) 764 Long longValue = value instanceof Long ? (Long) value : null; 765 String stringValue = value instanceof String ? (String) value : null; 766 byte[] bytesValue = value instanceof byte[] ? (byte[]) value : null; 767 Long ttlValue = ttlToStorage(ttl); 768 List<Column> psColumns = new ArrayList<>(); 769 List<Serializable> psValues = new ArrayList<>(); 770 String sql = dialect.getUpsertSql(Arrays.asList(keyCol, longCol, stringCol, bytesCol, ttlCol), 771 Arrays.asList(key, longValue, stringValue, bytesValue, ttlValue), psColumns, psValues); 772 for (int retry = 0; retry < MAX_RETRY; retry++) { 773 try { 774 try (PreparedStatement ps = connection.prepareStatement(sql)) { 775 setToPreparedStatement(sql, ps, psColumns, psValues); 776 ps.execute(); 777 } 778 return; 779 } catch (SQLException e) { 780 if (!dialect.isConcurrentUpdateException(e)) { 781 throw e; 782 } 783 // Oracle MERGE can throw DUP_VAL_ON_INDEX (ORA-0001) or NO_DATA_FOUND (ORA-01403) 784 // in that case retry a few times 785 } 786 sleepBeforeRetry(); 787 } 788 throw new ConcurrentUpdateException("Failed to do atomic put for key: " + key); 789 } 790 } catch (SQLException e) { 791 throw new NuxeoException(e); 792 } 793 } 794 795 @Override 796 public boolean setTTL(String key, long ttl) { 797 try (Connection connection = getConnection(); // 798 PreparedStatement ps = connection.prepareStatement(setTTLSQL)) { 799 setToPreparedStatement(setTTLSQL, ps, ttlCol, ttlToStorage(ttl), keyCol, key); 800 int count = ps.executeUpdate(); 801 boolean set = count == 1; 802 return set; 803 } catch (SQLException e) { 804 throw new NuxeoException(e); 805 } 806 } 807 808 @Override 809 public boolean compareAndSet(String key, byte[] expected, byte[] value, long ttl) { 810 return compareAndSet(key, toStorage(expected), toStorage(value), ttl); 811 } 812 813 @Override 814 public boolean compareAndSet(String key, String expected, String value, long ttl) { 815 return compareAndSet(key, toStorage(expected), toStorage(value), ttl); 816 } 817 818 protected boolean compareAndSet(String key, Object expected, Object value, long ttl) { 819 try (Connection connection = getConnection()) { 820 if (expected == null && value == null) { 821 // check that document doesn't exist 822 try (PreparedStatement ps = connection.prepareStatement(existsSQL)) { 823 setToPreparedStatement(existsSQL, ps, keyCol, key); 824 try (ResultSet rs = ps.executeQuery()) { 825 boolean set = !rs.next(); 826 if (logger.isLogEnabled()) { 827 logger.log(" -> " + (set ? "NOP" : "FAILED")); 828 } 829 return set; 830 } 831 } 832 } else if (expected == null) { 833 // set value if no document already exists: regular insert detecting duplicate row 834 Long longValue = value instanceof Long ? (Long) value : null; 835 String stringValue = value instanceof String ? (String) value : null; 836 byte[] bytesValue = value instanceof byte[] ? (byte[]) value : null; 837 Long ttlValue = ttlToStorage(ttl); 838 List<Column> psColumns = new ArrayList<>(); 839 List<Serializable> psValues = new ArrayList<>(); 840 String insertOnConflictDoNothingSql = dialect.getInsertOnConflictDoNothingSql( 841 Arrays.asList(keyCol, longCol, stringCol, bytesCol, ttlCol), 842 Arrays.asList(key, longValue, stringValue, bytesValue, ttlValue), psColumns, psValues); 843 boolean set; 844 if (insertOnConflictDoNothingSql != null) { 845 try (PreparedStatement ps = connection.prepareStatement(insertOnConflictDoNothingSql)) { 846 setToPreparedStatement(insertOnConflictDoNothingSql, ps, psColumns, psValues); 847 int count = ps.executeUpdate(); 848 set = count == 1; 849 } 850 } else { 851 try (PreparedStatement ps = connection.prepareStatement(insertSQL)) { 852 setToPreparedStatement(insertSQL, ps, 853 Arrays.asList(keyCol, longCol, stringCol, bytesCol, ttlCol), 854 Arrays.asList(key, longValue, stringValue, bytesValue, ttlValue)); 855 try { 856 ps.executeUpdate(); 857 set = true; 858 } catch (SQLException e) { 859 if (!dialect.isConcurrentUpdateException(e)) { 860 throw e; 861 } 862 set = false; 863 } 864 } 865 } 866 if (logger.isLogEnabled()) { 867 logger.log(" -> " + (set ? "SET" : "FAILED")); 868 } 869 return set; 870 } else if (value == null) { 871 // delete if previous value exists 872 String sql; 873 Column col; 874 if (expected instanceof Long) { 875 sql = deleteIfLongSQL; 876 col = longCol; 877 } else if (expected instanceof String) { 878 sql = deleteIfStringSQL; 879 col = stringCol; 880 } else { 881 sql = deleteIfBytesSQL; 882 col = bytesCol; 883 } 884 try (PreparedStatement ps = connection.prepareStatement(sql)) { 885 setToPreparedStatement(sql, ps, keyCol, key, col, (Serializable) expected); 886 int count = ps.executeUpdate(); 887 boolean set = count == 1; 888 if (logger.isLogEnabled()) { 889 logger.log(" -> " + (set ? "DEL" : "FAILED")); 890 } 891 return set; 892 } 893 } else { 894 // replace if previous value exists 895 Column expectedCol = expected instanceof Long ? longCol 896 : expected instanceof String ? stringCol : bytesCol; 897 Column valueCol = value instanceof Long ? longCol : value instanceof String ? stringCol : bytesCol; 898 if (expectedCol != valueCol) { 899 throw new NuxeoException("TODO expected and value have different types"); 900 // TODO in that case we must set to null the old value column 901 } 902 // Prevent ORA-24816 by setting the long value at the end 903 String sql = "UPDATE " + tableName + " SET " + ttlColName + " = ?, " + valueCol.getQuotedName() 904 + " = ? WHERE " + keyColName + " = ? AND " + dialect.getQuotedNameForExpression(expectedCol) 905 + " = ?"; 906 try (PreparedStatement ps = connection.prepareStatement(sql)) { 907 setToPreparedStatement(sql, ps, Arrays.asList(ttlCol, valueCol, keyCol, expectedCol), 908 Arrays.asList(ttlToStorage(ttl), (Serializable) value, key, (Serializable) expected)); 909 int count = ps.executeUpdate(); 910 boolean set = count == 1; 911 if (logger.isLogEnabled()) { 912 logger.log(" -> " + (set ? "SET" : "FAILED")); 913 } 914 return set; 915 } 916 } 917 } catch (SQLException e) { 918 throw new NuxeoException(e); 919 } 920 } 921 922 @Override 923 public long addAndGet(String key, long delta) throws NumberFormatException { // NOSONAR 924 try (Connection connection = getConnection()) { 925 for (int retry = 0; retry < MAX_RETRY; retry++) { 926 String updateReturningSql; 927 boolean useReturnResultSet = false; 928 if (dialect instanceof DialectPostgreSQL) { 929 updateReturningSql = updateReturningPostgreSQLSql; 930 } else if (dialect instanceof DialectOracle) { 931 updateReturningSql = updateReturningOracleSql; 932 useReturnResultSet = true; 933 } else if (dialect instanceof DialectSQLServer) { 934 updateReturningSql = updateReturningSQLServerSql; 935 } else { 936 updateReturningSql = null; 937 } 938 if (updateReturningSql != null) { 939 List<Column> psColumns = Arrays.asList(longCol, keyCol); 940 List<Serializable> psValues = Arrays.asList(Long.valueOf(delta), key); 941 try (PreparedStatement ps = connection.prepareStatement(updateReturningSql)) { 942 setToPreparedStatement(updateReturningSql, ps, psColumns, psValues); 943 if (useReturnResultSet) { 944 dialect.registerReturnParameter(ps, 3, longCol.getJdbcType()); 945 } 946 boolean hasResultSet; 947 if (useReturnResultSet) { 948 int count = ps.executeUpdate(); 949 hasResultSet = count > 0; 950 } else { 951 hasResultSet = true; 952 } 953 if (hasResultSet) { 954 ResultSet rs; 955 if (useReturnResultSet) { 956 rs = dialect.getReturnResultSet(ps); 957 } else { 958 rs = ps.executeQuery(); 959 } 960 try { 961 if (rs.next()) { 962 Long longValue = (Long) longCol.getFromResultSet(rs, 1); 963 // We may get NULL here, because if the value is an empty string 964 // a normal database would not match any row, but Oracle treats 965 // "" as NULL and we end up trying to increment the long field 966 // which is also NULL. 967 if (longValue == null) { 968 throw new NumberFormatException("Value is not a Long for key: " + key); 969 } 970 return longValue; 971 } 972 } finally { 973 rs.close(); 974 } 975 } 976 } 977 } 978 // the dialect doesn't support UPDATE RETURNING, or 979 // there was no row for this key, or 980 // the row didn't contain a long 981 // -> retry using a full transaction doing check + insert 982 // start transaction 983 connection.setAutoCommit(false); 984 try { 985 // check value 986 Long currentLong; 987 try (PreparedStatement ps = connection.prepareStatement(getLongSQL)) { 988 setToPreparedStatement(getLongSQL, ps, keyCol, key); 989 try (ResultSet rs = ps.executeQuery()) { 990 if (rs.next()) { 991 currentLong = (Long) longCol.getFromResultSet(rs, 1); 992 if (logger.isLogEnabled()) { 993 logger.logResultSet(rs, Arrays.asList(longCol)); 994 } 995 if (currentLong == null) { 996 throw new NumberFormatException("Value is not a Long for key: " + key); 997 } 998 } else { 999 currentLong = null; 1000 } 1001 } 1002 } 1003 if (currentLong == null) { 1004 // try insert 1005 try (PreparedStatement ps = connection.prepareStatement(insertLongSQL)) { 1006 setToPreparedStatement(insertLongSQL, ps, keyCol, key, longCol, Long.valueOf(delta)); 1007 try { 1008 ps.executeUpdate(); 1009 return delta; 1010 } catch (SQLException e) { 1011 if (!dialect.isConcurrentUpdateException(e)) { 1012 throw e; 1013 } 1014 // if concurrent update, retry 1015 } 1016 } 1017 } else { 1018 // update existing value 1019 Long newLong = Long.valueOf(currentLong.longValue() + delta); 1020 try (PreparedStatement ps = connection.prepareStatement(updateLongSQL)) { 1021 setToPreparedStatement(updateLongSQL, ps, longCol, newLong, keyCol, key, longCol, 1022 currentLong); 1023 int count = ps.executeUpdate(); 1024 if (count == 1) { 1025 return newLong; 1026 } 1027 // else the value changed... 1028 // concurrent update, retry 1029 } 1030 } 1031 } finally { 1032 connection.commit(); 1033 connection.setAutoCommit(true); 1034 } 1035 // concurrent update on insert or update, retry a few times 1036 sleepBeforeRetry(); 1037 } 1038 throw new ConcurrentUpdateException("Failed to do atomic addAndGet for key: " + key); 1039 } catch (SQLException e) { 1040 throw new NuxeoException(e); 1041 } 1042 } 1043 1044 protected void sleepBeforeRetry() { 1045 try { 1046 Thread.sleep(5); 1047 } catch (InterruptedException e) { 1048 Thread.currentThread().interrupt(); 1049 throw new NuxeoException(e); 1050 } 1051 } 1052 1053 // works on any representation that can be converted to a Long 1054 protected long addAndGetGeneric(String key, long delta) throws NumberFormatException { // NOSONAR 1055 for (;;) { 1056 Object value = getObject(key); 1057 long result; 1058 if (value == null) { 1059 result = delta; 1060 } else { 1061 Long base = toLong(value); 1062 if (base == null) { 1063 throw new NumberFormatException("Value is not a Long for key: " + key); 1064 } 1065 result = base.longValue() + delta; 1066 } 1067 Object newValue = Long.valueOf(result); 1068 if (compareAndSet(key, value, newValue, 0)) { 1069 return result; 1070 } 1071 // else loop to try again 1072 } 1073 } 1074 1075 @Override 1076 public String toString() { 1077 return getClass().getSimpleName() + "(" + name + ")"; 1078 } 1079 1080}