/*
 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.storage.sql.kv;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.commons.lang3.StringUtils.defaultIfBlank;
import static org.apache.commons.lang3.StringUtils.isBlank;

import java.io.Serializable;
import java.nio.charset.CharacterCodingException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.stream.Stream;

import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.storage.sql.ColumnType;
import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCLogger;
import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
import org.nuxeo.ecm.core.storage.sql.jdbc.db.TableImpl;
import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectOracle;
import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectPostgreSQL;
import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectSQLServer;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.datasource.ConnectionHelper;
import org.nuxeo.runtime.kv.AbstractKeyValueStoreProvider;
import org.nuxeo.runtime.kv.KeyValueStoreDescriptor;
import org.nuxeo.runtime.transaction.TransactionHelper;

/**
 * SQL implementation of a Key/Value Store Provider.
 * <p>
 * The following configuration properties are available:
 * <ul>
 * <li>datasource: the datasource to use.
 * <li>table: the table to use. The default is the Store name.
 * </ul>
 * If both a table and a namespace are specified, the namespace is appended to the table name as a suffix
 * ({@code table_namespace}). If no table is specified, the namespace is used as the table name, or the store name when
 * there is no namespace either.
 * <p>
 * This implementation uses a table with a KEY column (unique and not NULL), and for the value one of these three
 * columns is used: LONG, STRING, BYTES. If possible LONG is used, then STRING, otherwise BYTES.
 * <p>
 * The TTL is stored as an expiration time (seconds since epoch) in its own column. Expiration is done by a thread
 * running a cleanup DELETE query every 60 seconds.
 *
 * @since 10.10
 */
public class SQLKeyValueStore extends AbstractKeyValueStoreProvider {

    private static final Logger log = LogManager.getLogger(SQLKeyValueStore.class);

    /** Datasource configuration property. */
    public static final String DATASOURCE_PROP = "datasource";

    /** Table configuration property. Default is the store name. The namespace is also used for disambiguation. */
    public static final String TABLE_PROP = "table";

    /** Key column, a short string. */
    public static final String KEY_COL = "key";

    /** Long column, or NULL if the value is not representable as a Long. */
    public static final String LONG_COL = "long";

    /** String column, or NULL if the value is representable as a Long or not representable as a String. */
    public static final String STRING_COL = "string";

    /** Bytes column, or NULL if the value is representable as a Long or String. */
    public static final String BYTES_COL = "bytes";

    /** TTL column, holding expiration date in seconds since epoch, or NULL if there is no expiration. */
    public static final String TTL_COL = "ttl";

    protected static final int TTL_EXPIRATION_FREQUENCY_MS = 60_000; // 60 seconds

    // maximum number of retries in case of concurrency
    protected static final int MAX_RETRY = 5;

    protected JDBCLogger logger;

    protected String dataSourceName;

    protected Dialect dialect;

    protected TableImpl table;

    protected Column keyCol;

    protected Column longCol;

    protected Column stringCol;

    protected Column bytesCol;

    protected Column ttlCol;

    // quoted names, cached once by getTable() and used to build the SQL statements

    protected String tableName;

    protected String keyColName;

    protected String longColName;

    protected String stringColName;

    protected String bytesColName;

    protected String ttlColName;

    protected Thread ttlThread;

    // SQL statements, built once by prepareSQL()

    protected String getSQL;

    protected String getMultiSQL;

    protected String getLongSQL;

    protected String deleteAllSQL;

    protected String deleteSQL;

    protected String deleteIfLongSQL;

    protected String deleteIfStringSQL;

    protected String deleteIfBytesSQL;

    protected String expireSQL;

    protected String keyStreamSQL;

    protected String keyStreamPrefixSQL;

    protected String setTTLSQL;

    protected String existsSQL;

    protected String insertSQL;

    protected String insertLongSQL;

    protected String updateLongSQL;

    protected String updateReturningPostgreSQLSql;

    protected String updateReturningOracleSql;

    protected String updateReturningSQLServerSql;

    /**
     * Reads the configuration, resolves the table name, creates the table if needed, prepares the SQL statements and
     * starts the TTL expiration thread.
     *
     * @param descriptor the store descriptor holding the properties and namespace
     * @throws NuxeoException if the datasource property is missing or the database cannot be accessed
     */
    @Override
    public void initialize(KeyValueStoreDescriptor descriptor) {
        super.initialize(descriptor);
        logger = new JDBCLogger(name);
        Map<String, String> properties = descriptor.properties;
        dataSourceName = properties.get(DATASOURCE_PROP);
        if (StringUtils.isAllBlank(dataSourceName)) {
            throw new NuxeoException("Missing " + DATASOURCE_PROP + " property in configuration");
        }
        String tableProp = properties.get(TABLE_PROP);
        String namespace = descriptor.namespace;
        String tbl;
        if (isBlank(tableProp)) {
            tbl = defaultIfBlank(namespace, name).trim();
        } else if (isBlank(namespace)) {
            tbl = tableProp.trim();
        } else {
            tbl = tableProp.trim() + "_" + namespace.trim();
        }
        // check connection, get dialect and create table if needed
        runWithConnection(connection -> {
            dialect = Dialect.createDialect(connection, null);
            getTable(connection, tbl);
        });
        prepareSQL();
        startTTLThread();
    }

    /** Stops the TTL expiration thread. */
    @Override
    public void close() {
        stopTTLThread();
    }

    /**
     * Builds the in-memory table model (key, long, string, bytes and ttl columns, plus an index on ttl), caches the
     * quoted names, creates the table in the database if it does not exist, and checks the column types.
     *
     * @param connection the connection to use
     * @param tbl the logical table name
     */
    protected void getTable(Connection connection, String tbl) throws SQLException {
        String tablePhysicalName = dialect.getTableName(tbl);
        table = new TableImpl(dialect, tablePhysicalName, tablePhysicalName);
        keyCol = addColumn(KEY_COL, ColumnType.SYSNAME);
        keyCol.setPrimary(true);
        keyCol.setNullable(false);
        longCol = addColumn(LONG_COL, ColumnType.LONG);
        stringCol = addColumn(STRING_COL, ColumnType.CLOB);
        bytesCol = addColumn(BYTES_COL, ColumnType.BLOB);
        ttlCol = addColumn(TTL_COL, ColumnType.LONG);
        table.addIndex(TTL_COL);
        tableName = table.getQuotedName();
        keyColName = keyCol.getQuotedName();
        longColName = longCol.getQuotedName();
        stringColName = stringCol.getQuotedName();
        bytesColName = bytesCol.getQuotedName();
        ttlColName = ttlCol.getQuotedName();
        if (!tableExists(connection)) {
            createTable(connection);
        }
        checkColumns(connection);
    }

    /**
     * Adds a column to the table model, using the dialect to compute its physical name.
     *
     * @param columnName the logical column name
     * @param type the column type
     * @return the column as registered in the table
     */
    protected Column addColumn(String columnName, ColumnType type) {
        String colPhysicalName = dialect.getColumnName(columnName);
        Column column = new Column(table, colPhysicalName, type, columnName);
        return table.addColumn(column.getKey(), column);
    }

    /**
     * Builds all the SQL statements used by this store from the quoted table and column names.
     * <p>
     * {@link #getMultiSQL} keeps a {@code %s} placeholder which is replaced at query time by the list of {@code ?}
     * parameters.
     */
    protected void prepareSQL() {
        getSQL = "SELECT " + longColName + ", " + stringColName + ", " + bytesColName + " FROM " + tableName + " WHERE "
                + keyColName + " = ?";
        getMultiSQL = "SELECT " + keyColName + ", " + longColName + ", " + stringColName + ", " + bytesColName
                + " FROM " + tableName + " WHERE " + keyColName + " IN (%s)";
        getLongSQL = "SELECT " + longColName + " FROM " + tableName + " WHERE " + keyColName + " = ?";
        deleteAllSQL = "DELETE FROM " + tableName;
        deleteSQL = "DELETE FROM " + tableName + " WHERE " + keyColName + " = ?";
        deleteIfLongSQL = deleteSQL + " AND " + longColName + " = ?";
        deleteIfStringSQL = deleteSQL + " AND " + dialect.getQuotedNameForExpression(stringCol) + " = ?";
        deleteIfBytesSQL = deleteSQL + " AND " + bytesColName + " = ?";
        expireSQL = "DELETE FROM " + tableName + " WHERE " + ttlColName + " < ?";
        keyStreamSQL = "SELECT " + keyColName + " FROM " + tableName;
        keyStreamPrefixSQL = keyStreamSQL + " WHERE " + keyColName + " LIKE ?";
        String esc = dialect.getLikeEscaping();
        if (esc != null) {
            keyStreamPrefixSQL += esc;
        }
        setTTLSQL = "UPDATE " + tableName + " SET " + ttlColName + " = ? WHERE " + keyColName + " = ?";
        existsSQL = "SELECT 1 FROM " + tableName + " WHERE " + keyColName + " = ?";
        insertSQL = "INSERT INTO " + tableName + "(" + keyColName + ", " + longColName + ", " + stringColName + ", "
                + bytesColName + ", " + ttlColName + ") VALUES (?, ?, ?, ?, ?)";
        insertLongSQL = "INSERT INTO " + tableName + "(" + keyColName + ", " + longColName + ") VALUES (?, ?)";
        updateLongSQL = "UPDATE " + tableName + " SET " + longColName + " = ? WHERE " + keyColName + " = ? AND "
                + longColName + " = ?";
        updateReturningPostgreSQLSql = "UPDATE " + tableName + " SET " + longColName + " = " + longColName
                + " + ? WHERE " + keyColName + " = ? AND " + stringColName + " IS NULL AND " + bytesColName
                + " IS NULL RETURNING " + longColName;
        updateReturningOracleSql = "UPDATE " + tableName + " SET " + longColName + " = " + longColName + " + ? WHERE "
                + keyColName + " = ? AND " + stringColName + " IS NULL AND " + bytesColName + " IS NULL RETURNING "
                + longColName + " INTO ?";
        updateReturningSQLServerSql = "UPDATE " + tableName + " SET " + longColName + " = " + longColName
                + " + ? OUTPUT INSERTED." + longColName + " WHERE " + keyColName + " = ? AND " + stringColName
                + " IS NULL AND " + bytesColName + " IS NULL";
    }

    /** Starts the daemon thread doing TTL expiration. */
    protected void startTTLThread() {
        ttlThread = new Thread(this::expireTTLThread);
        ttlThread.setName("Nuxeo-Expire-KeyValueStore-" + name);
        ttlThread.setDaemon(true);
        ttlThread.start();
    }

    /** Interrupts the TTL expiration thread, if there is one. */
    protected void stopTTLThread() {
        if (ttlThread == null) {
            return;
        }
        ttlThread.interrupt();
        ttlThread = null;
    }

    /**
     * Runs in a thread to do TTL expiration.
     * <p>
     * The loop ends when the thread is interrupted. A failure of a single expiration pass is logged and does not end
     * the loop.
     */
    protected void expireTTLThread() {
        log.debug("Starting TTL expiration thread for KeyValueStore: {}", name);
        try {
            // for the initial wait, use a random duration to avoid thundering herd problems
            Thread.sleep((long) (TTL_EXPIRATION_FREQUENCY_MS * Math.random()));
            for (;;) {
                if (Thread.currentThread().isInterrupted()) {
                    break;
                }
                Thread.sleep(TTL_EXPIRATION_FREQUENCY_MS);
                try {
                    expireTTLOnce();
                } catch (RuntimeException e) {
                    // e.g. the connection could not be acquired: expiration is best-effort,
                    // don't let a transient failure kill the thread for good
                    log.warn("Exception during TTL expiration", e);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        log.debug("Stopping TTL expiration thread for KeyValueStore: {}", name);
    }

    /**
     * Escapes the LIKE special characters ({@code \}, {@code %} and {@code _}) with a backslash.
     *
     * @param prefix the string to escape
     * @return the escaped string
     */
    protected String escapeLike(String prefix) {
        return prefix.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_");
    }

    /**
     * Canonicalizes value for the database: use a String or a Long if possible.
     * <p>
     * Note that any string accepted by {@link Long#valueOf(String)} is turned into a Long, so a value like
     * {@code "007"} is stored as the Long 7.
     *
     * @param value the value, a byte array or a String (or {@code null})
     * @return a Long, a String, the original byte array, or {@code null}
     */
    protected Object toStorage(Object value) {
        // try to convert byte array to UTF-8 string
        if (value instanceof byte[]) {
            try {
                value = bytesToString((byte[]) value);
            } catch (CharacterCodingException e) {
                // ignore
            }
        }
        // try to convert String to Long
        if (value instanceof String) {
            try {
                value = Long.valueOf((String) value);
            } catch (NumberFormatException e) {
                // ignore
            }
        }
        return value;
    }

    /**
     * Converts a stored value to a byte array. Strings, and the decimal representation of Longs, are encoded as
     * UTF-8.
     *
     * @param value the stored value
     * @return the bytes, or {@code null} if the value is not a String, Long or byte array
     */
    protected byte[] toBytes(Object value) {
        if (value instanceof String) {
            return ((String) value).getBytes(UTF_8);
        } else if (value instanceof Long) {
            return ((Long) value).toString().getBytes(UTF_8);
        } else if (value instanceof byte[]) {
            return (byte[]) value;
        }
        return null;
    }

    /**
     * Converts a stored value to a String.
     *
     * @param value the stored value
     * @return the string, or {@code null} if the value is a byte array that cannot be decoded or is not a String, Long
     *         or byte array
     */
    protected String toString(Object value) {
        if (value instanceof String) {
            return (String) value;
        } else if (value instanceof Long) {
            return ((Long) value).toString();
        } else if (value instanceof byte[]) {
            byte[] bytes = (byte[]) value;
            try {
                return bytesToString(bytes);
            } catch (CharacterCodingException e) {
                return null;
            }
        }
        return null;
    }

    /**
     * Converts a stored value to a Long.
     *
     * @param value the stored value
     * @return the Long, or {@code null} if the value is not a Long, String or byte array
     * @throws NumberFormatException if a String value cannot be parsed as a Long
     */
    protected Long toLong(Object value) throws NumberFormatException { // NOSONAR
        if (value instanceof Long) {
            return (Long) value;
        } else if (value instanceof String) {
            return Long.valueOf((String) value);
        } else if (value instanceof byte[]) {
            byte[] bytes = (byte[]) value;
            return bytesToLong(bytes);
        }
        return null;
    }

    /** A {@link java.util.function.Consumer Consumer} that can throw {@link SQLException}. */
    @FunctionalInterface
    protected interface SQLConsumer<T> {

        /**
         * Performs this operation on the given argument.
         *
         * @param t the input argument
         * @throws SQLException
         */
        void accept(T t) throws SQLException;
    }

    /** A {@link java.util.function.Function Function} that can throw {@link SQLException}. */
    @FunctionalInterface
    protected interface SQLFunction<T, R> {

        /**
         * Applies this function to the given argument.
         *
         * @param t the function argument
         * @return the function result
         * @throws SQLException
         */
        R apply(T t) throws SQLException;
    }

    /**
     * Runs the consumer with a fresh connection, outside of any transaction. The connection is closed afterwards.
     *
     * @param consumer the code to run
     * @throws NuxeoException wrapping any {@link SQLException}
     */
    protected void runWithConnection(SQLConsumer<Connection> consumer) {
        TransactionHelper.runWithoutTransaction(() -> {
            try (Connection connection = getConnection()) {
                consumer.accept(connection);
            } catch (SQLException e) {
                throw new NuxeoException(e);
            }
        });
    }

    /**
     * Runs the function with a fresh connection, outside of any transaction. The connection is closed afterwards.
     *
     * @param function the code to run
     * @return the function result
     * @throws NuxeoException wrapping any {@link SQLException}
     */
    protected <R> R runWithConnection(SQLFunction<Connection, R> function) {
        return TransactionHelper.runWithoutTransaction(() -> {
            try (Connection connection = getConnection()) {
                return function.apply(connection);
            } catch (SQLException e) {
                throw new NuxeoException(e);
            }
        });
    }

    /** Gets a connection from the configured datasource. */
    protected Connection getConnection() throws SQLException {
        return ConnectionHelper.getConnection(dataSourceName);
    }

    /** Binds one value to the statement; see the list-based overload. */
    protected void setToPreparedStatement(String sql, PreparedStatement ps, Column column, Serializable value)
            throws SQLException {
        setToPreparedStatement(sql, ps, Arrays.asList(column), Arrays.asList(value));
    }

    /** Binds two values to the statement; see the list-based overload. */
    protected void setToPreparedStatement(String sql, PreparedStatement ps, Column column1, Serializable value1,
            Column column2, Serializable value2) throws SQLException {
        setToPreparedStatement(sql, ps, Arrays.asList(column1, column2), Arrays.asList(value1, value2));
    }

    /** Binds three values to the statement; see the list-based overload. */
    protected void setToPreparedStatement(String sql, PreparedStatement ps, Column column1, Serializable value1,
            Column column2, Serializable value2, Column column3, Serializable value3) throws SQLException {
        setToPreparedStatement(sql, ps, Arrays.asList(column1, column2, column3),
                Arrays.asList(value1, value2, value3));
    }

    /**
     * Binds the values to the statement parameters in order (the i-th value is bound to parameter i + 1 using the
     * i-th column), then logs the SQL if logging is enabled.
     *
     * @param sql the SQL, used for logging only
     * @param ps the prepared statement
     * @param columns the columns, giving the type of each parameter
     * @param values the values to bind
     * @throws IllegalStateException if the two lists don't have the same size
     */
    protected void setToPreparedStatement(String sql, PreparedStatement ps, List<Column> columns,
            List<? extends Serializable> values) throws SQLException {
        if (columns.size() != values.size()) {
            throw new IllegalStateException();
        }
        for (int i = 0; i < columns.size(); i++) {
            columns.get(i).setToPreparedStatement(ps, i + 1, values.get(i));
        }
        if (logger.isLogEnabled()) {
            logger.logSQL(sql, values);
        }
    }

    /** Checks through the JDBC metadata whether the table exists. */
    protected boolean tableExists(Connection connection) throws SQLException {
        DatabaseMetaData metadata = connection.getMetaData();
        String schemaName = getDatabaseSchemaName(connection);
        try (ResultSet rs = metadata.getTables(null, schemaName, table.getPhysicalName(), new String[] { "TABLE" })) {
            boolean exists = rs.next();
            log.trace("Checking if table {} exists: {}", table.getPhysicalName(), exists);
            return exists;
        }
    }

    /** Creates the table, then runs its post-create statements. */
    protected void createTable(Connection connection) throws SQLException {
        try (Statement st = connection.createStatement()) {
            String createSQL = table.getCreateSql();
            logger.log(createSQL);
            st.execute(createSQL);
            for (String sql : table.getPostCreateSqls(null)) {
                logger.log(sql);
                st.execute(sql);
            }
        }
    }

    /**
     * Checks that columns have expected JDBC types.
     * <p>
     * Names are compared case-insensitively, using locale-independent case conversion. Mismatches are logged and
     * registered as runtime errors; columns present in the database but unknown to the table model are only logged.
     */
    protected void checkColumns(Connection connection) throws SQLException {
        DatabaseMetaData metadata = connection.getMetaData();
        String schemaName = getDatabaseSchemaName(connection);
        try (ResultSet rs = metadata.getColumns(null, schemaName, table.getPhysicalName(), "%")) {
            while (rs.next()) {
                String schema = rs.getString("TABLE_SCHEM");
                if (schema != null) { // null for MySQL, doh!
                    if ("INFORMATION_SCHEMA".equals(schema.toUpperCase(Locale.ROOT))) {
                        // H2 returns some system tables (locks)
                        continue;
                    }
                }
                String columnName = rs.getString("COLUMN_NAME").toUpperCase(Locale.ROOT);
                int actual = rs.getInt("DATA_TYPE");
                String actualName = rs.getString("TYPE_NAME");
                int actualSize = rs.getInt("COLUMN_SIZE");
                Column column = null;
                for (Column c : table.getColumns()) {
                    String upperName = c.getPhysicalName().toUpperCase(Locale.ROOT);
                    if (upperName.equals(columnName)) {
                        column = c;
                    }
                }
                if (column == null) {
                    log.error("Column not found: {} in table: {}", columnName, tableName);
                    continue;
                }
                String message = column.checkJdbcType(actual, actualName, actualSize);
                if (message != null) {
                    log.error(message);
                    Framework.getRuntime().getMessageHandler().addError(message);
                }
            }
        }
    }

    /**
     * Gets the schema name to use for metadata lookups.
     *
     * @return the session user for Oracle, {@code null} for the other dialects
     */
    protected String getDatabaseSchemaName(Connection connection) throws SQLException {
        String schemaName = null;
        if (dialect instanceof DialectOracle) {
            try (Statement st = connection.createStatement()) {
                String sql = "SELECT SYS_CONTEXT('USERENV', 'SESSION_USER') FROM DUAL";
                logger.log(sql);
                try (ResultSet rs = st.executeQuery(sql)) {
                    if (rs.next()) {
                        schemaName = rs.getString(1);
                        logger.log("  -> schema: " + schemaName);
                    }
                }
            }
        }
        return schemaName;
    }

    /**
     * Does one expiration pass: deletes all the rows whose expiration time is before the current time.
     * <p>
     * A {@link SQLException} raised by the DELETE is never propagated: concurrent update exceptions are ignored, the
     * others are logged at debug level.
     */
    protected void expireTTLOnce() {
        runWithConnection(connection -> {
            try {
                try (PreparedStatement ps = connection.prepareStatement(expireSQL)) {
                    Long ttlDeadline = getTTLValue(0);
                    setToPreparedStatement(expireSQL, ps, ttlCol, ttlDeadline);
                    int count = ps.executeUpdate();
                    logger.logCount(count);
                }
            } catch (SQLException e) {
                if (dialect.isConcurrentUpdateException(e)) {
                    // ignore
                    return;
                }
                log.debug("Exception during TTL expiration", e);
            }
        });
    }

    /** Deletes all the rows of the table. */
    @Override
    public void clear() {
        runWithConnection(connection -> {
            try (Statement st = connection.createStatement()) {
                logger.log(deleteAllSQL);
                st.execute(deleteAllSQL);
            }
        });
    }

    @Override
    public Stream<String> keyStream() {
        return runWithConnection((Connection connection) -> keyStream(connection, null));
    }

    @Override
    public Stream<String> keyStream(String prefix) {
        return runWithConnection((Connection connection) -> keyStream(connection, prefix));
    }

    /**
     * Reads all the keys, or those starting with the given prefix, into memory and returns a stream over them. The
     * stream does not depend on the connection.
     *
     * @param connection the connection to use
     * @param prefix the key prefix, or {@code null} for all keys
     * @return a stream of keys
     */
    protected Stream<String> keyStream(Connection connection, String prefix) throws SQLException {
        String sql = prefix == null ? keyStreamSQL : keyStreamPrefixSQL;
        List<String> keys = new ArrayList<>();
        try (PreparedStatement ps = connection.prepareStatement(sql)) {
            if (prefix != null) {
                String like = escapeLike(prefix) + "%";
                setToPreparedStatement(sql, ps, keyCol, like);
            }
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    String key = (String) keyCol.getFromResultSet(rs, 1);
                    keys.add(key);
                }
            }
        }
        return keys.stream();
    }

    @Override
    public byte[] get(String key) {
        Object value = getObject(key);
        if (value == null) {
            return null;
        }
        byte[] bytes = toBytes(value);
        if (bytes != null) {
            return bytes;
        }
        throw new UnsupportedOperationException(value.getClass().getName());
    }

    @Override
    public String getString(String key) {
        Object value = getObject(key);
        if (value == null) {
            return null;
        }
        String string = toString(value);
        if (string != null) {
            return string;
        }
        throw new IllegalArgumentException("Value is not a String for key: " + key);
    }

    @Override
    public Long getLong(String key) throws NumberFormatException { // NOSONAR
        Object value = getObject(key);
        if (value == null) {
            return null;
        }
        Long longValue = toLong(value);
        if (longValue != null) {
            return longValue;
        }
        throw new NumberFormatException("Value is not a Long for key: " + key);
    }

    @Override
    public Map<String, byte[]> get(Collection<String> keys) {
        Map<String, byte[]> map = new HashMap<>(keys.size());
        getObjects(keys, (key, value) -> {
            byte[] bytes = toBytes(value);
            if (bytes == null) {
                throw new UnsupportedOperationException(String.format("Value of class %s is not supported for key: %s",
                        value.getClass().getName(), key));
            }
            map.put(key, bytes);
        });
        return map;
    }

    @Override
    public Map<String, String> getStrings(Collection<String> keys) {
        Map<String, String> map = new HashMap<>(keys.size());
        getObjects(keys, (key, value) -> {
            String string = toString(value);
            if (string == null) {
                throw new IllegalArgumentException("Value is not a String for key: " + key);
            }
            map.put(key, string);
        });
        return map;
    }

    @Override
    public Map<String, Long> getLongs(Collection<String> keys) throws NumberFormatException { // NOSONAR
        Map<String, Long> map = new HashMap<>(keys.size());
        getObjects(keys, (key, value) -> {
            Long longValue = toLong(value);
            if (longValue == null) {
                // same exception as getLong and as declared by this method
                // (NumberFormatException is a subclass of IllegalArgumentException)
                throw new NumberFormatException("Value is not a Long for key: " + key);
            }
            map.put(key, longValue);
        });
        return map;
    }

    /**
     * Reads the stored value for a key.
     *
     * @param key the key
     * @return the String, Long or byte array stored (checked in that order), or {@code null} if there is no row for
     *         the key or all the value columns are NULL
     */
    protected Object getObject(String key) {
        return runWithConnection(connection -> {
            try (PreparedStatement ps = connection.prepareStatement(getSQL)) {
                setToPreparedStatement(getSQL, ps, keyCol, key);
                try (ResultSet rs = ps.executeQuery()) {
                    if (!rs.next()) {
                        if (logger.isLogEnabled()) {
                            logger.log("  -> null");
                        }
                        return null;
                    }
                    Long longValue = (Long) longCol.getFromResultSet(rs, 1);
                    String string = (String) stringCol.getFromResultSet(rs, 2);
                    byte[] bytes = (byte[]) bytesCol.getFromResultSet(rs, 3);
                    if (logger.isLogEnabled()) {
                        logger.logResultSet(rs, Arrays.asList(longCol, stringCol, bytesCol));
                    }
                    if (string != null) {
                        return string;
                    } else if (longValue != null) {
                        return longValue;
                    } else {
                        return bytes;
                    }
                }
            }
        });
    }

    /**
     * Reads the stored values for several keys in one query, and passes each non-null value to the consumer.
     * <p>
     * Does nothing if the collection of keys is empty.
     *
     * @param keys the keys
     * @param consumer called with each key found and its value (a String, Long or byte array)
     */
    protected void getObjects(Collection<String> keys, BiConsumer<String, Object> consumer) {
        if (keys.isEmpty()) {
            return;
        }
        runWithConnection((Connection connection) -> {
            String sql = String.format(getMultiSQL, nParams(keys.size()));
            if (logger.isLogEnabled()) {
                logger.logSQL(sql, keys);
            }
            try (PreparedStatement ps = connection.prepareStatement(sql)) {
                int i = 1;
                for (String key : keys) {
                    keyCol.setToPreparedStatement(ps, i++, key);
                }
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        String key = (String) keyCol.getFromResultSet(rs, 1);
                        Long longVal = (Long) longCol.getFromResultSet(rs, 2);
                        String string = (String) stringCol.getFromResultSet(rs, 3);
                        byte[] bytes = (byte[]) bytesCol.getFromResultSet(rs, 4);
                        if (logger.isLogEnabled()) {
                            logger.logResultSet(rs, Arrays.asList(keyCol, longCol, stringCol, bytesCol));
                        }
                        Object value;
                        if (string != null) {
                            value = string;
                        } else if (longVal != null) {
                            value = longVal;
                        } else {
                            value = bytes;
                        }
                        if (value != null) {
                            consumer.accept(key, value);
                        }
                    }
                }
            }
        });
    }

    /**
     * Builds a comma-separated list of {@code n} SQL parameter placeholders.
     *
     * @param n the number of placeholders
     * @return a string like {@code "?, ?, ?"}, empty if {@code n} is 0 or less
     */
    protected String nParams(int n) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n; i++) {
            if (i != 0) {
                sb.append(", ");
            }
            sb.append('?');
        }
        return sb.toString();
    }

    /**
     * Converts a TTL to the expiration time to store.
     *
     * @param ttl the TTL in seconds, or 0 for no expiration
     * @return the expiration time in seconds since epoch, or {@code null} if the TTL is 0
     */
    protected Long ttlToStorage(long ttl) {
        return ttl == 0 ? null : getTTLValue(ttl);
    }

    /**
     * Computes an expiration time from the current time.
     *
     * @param ttl the TTL in seconds
     * @return the current time in seconds since epoch, plus the TTL
     */
    protected Long getTTLValue(long ttl) {
        return Long.valueOf(System.currentTimeMillis() / 1000 + ttl);
    }

    @Override
    public void put(String key, byte[] bytes) {
        put(key, toStorage(bytes), 0);
    }

    @Override
    public void put(String key, byte[] bytes, long ttl) {
        put(key, toStorage(bytes), ttl);
    }

    @Override
    public void put(String key, String string) {
        put(key, toStorage(string), 0);
    }

    @Override
    public void put(String key, String string, long ttl) {
        put(key, toStorage(string), ttl);
    }

    @Override
    public void put(String key, Long value) {
        put(key, (Object) value, 0);
    }

    @Override
    public void put(String key, Long value, long ttl) {
        put(key, (Object) value, ttl);
    }

    /**
     * Stores a value, or deletes the row if the value is {@code null}.
     * <p>
     * The store is done with the dialect's upsert statement, retried up to {@link #MAX_RETRY} times if the database
     * reports a concurrent update.
     *
     * @param key the key
     * @param value the canonicalized value (Long, String or byte array), or {@code null} to delete
     * @param ttl the TTL in seconds, or 0 for no expiration
     * @throws ConcurrentUpdateException if all the retries failed
     */
    protected void put(String key, Object value, long ttl) {
        runWithConnection((Connection connection) -> {
            if (value == null) {
                // delete
                try (PreparedStatement ps = connection.prepareStatement(deleteSQL)) {
                    setToPreparedStatement(deleteSQL, ps, keyCol, key);
                    ps.execute();
                }
            } else {
                // upsert (update or insert)
                Long longValue = value instanceof Long ? (Long) value : null;
                String stringValue = value instanceof String ? (String) value : null;
                byte[] bytesValue = value instanceof byte[] ? (byte[]) value : null;
                Long ttlValue = ttlToStorage(ttl);
                List<Column> psColumns = new ArrayList<>();
                List<Serializable> psValues = new ArrayList<>();
                String sql = dialect.getUpsertSql(Arrays.asList(keyCol, longCol, stringCol, bytesCol, ttlCol),
                        Arrays.asList(key, longValue, stringValue, bytesValue, ttlValue), psColumns, psValues);
                for (int retry = 0; retry < MAX_RETRY; retry++) {
                    try {
                        try (PreparedStatement ps = connection.prepareStatement(sql)) {
                            setToPreparedStatement(sql, ps, psColumns, psValues);
                            ps.execute();
                        }
                        return;
                    } catch (SQLException e) {
                        if (!dialect.isConcurrentUpdateException(e)) {
                            throw e;
                        }
                        // Oracle MERGE can throw DUP_VAL_ON_INDEX (ORA-0001) or NO_DATA_FOUND (ORA-01403)
                        // in that case retry a few times
                    }
                    sleepBeforeRetry();
                }
                throw new ConcurrentUpdateException("Failed to do atomic put for key: " + key);
            }
        });
    }

    @Override
    public boolean setTTL(String key, long ttl) {
        return runWithConnection((Connection connection) -> {
            try (PreparedStatement ps = connection.prepareStatement(setTTLSQL)) {
                setToPreparedStatement(setTTLSQL, ps, ttlCol, ttlToStorage(ttl), keyCol, key);
                int count = ps.executeUpdate();
                boolean set = count == 1;
                return set;
            }
        }).booleanValue();
    }

    @Override
    public boolean compareAndSet(String key, byte[] expected, byte[] value, long ttl) {
        return compareAndSet(key, toStorage(expected), toStorage(value), ttl);
    }

    @Override
    public boolean compareAndSet(String key, String expected, String value, long ttl) {
        return compareAndSet(key, toStorage(expected), toStorage(value), ttl);
    }

    /**
     * Atomically sets the value for a key if its current value is the expected one.
     * <p>
     * A {@code null} expected value means that the key must not exist, and a {@code null} new value means that the key
     * must be deleted.
     *
     * @param key the key
     * @param expected the canonicalized expected value (Long, String or byte array), or {@code null}
     * @param value the canonicalized new value (Long, String or byte array), or {@code null}
     * @param ttl the TTL in seconds, or 0 for no expiration
     * @return {@code true} if the value was set, {@code false} otherwise
     * @throws NuxeoException if the expected and new values are both non-null and stored in different columns, which
     *             is not implemented
     */
    protected boolean compareAndSet(String key, Object expected, Object value, long ttl) {
        return runWithConnection((Connection connection) -> {
            if (expected == null && value == null) {
                // check that document doesn't exist
                try (PreparedStatement ps = connection.prepareStatement(existsSQL)) {
                    setToPreparedStatement(existsSQL, ps, keyCol, key);
                    try (ResultSet rs = ps.executeQuery()) {
                        boolean set = !rs.next();
                        if (logger.isLogEnabled()) {
                            logger.log("  -> " + (set ? "NOP" : "FAILED"));
                        }
                        return set;
                    }
                }
            } else if (expected == null) {
                // set value if no document already exists: regular insert
                try (PreparedStatement ps = connection.prepareStatement(insertSQL)) {
                    Long longValue = value instanceof Long ? (Long) value : null;
                    String stringValue = value instanceof String ? (String) value : null;
                    byte[] bytesValue = value instanceof byte[] ? (byte[]) value : null;
                    setToPreparedStatement(insertSQL, ps, Arrays.asList(keyCol, longCol, stringCol, bytesCol, ttlCol),
                            Arrays.asList(key, longValue, stringValue, bytesValue, ttlToStorage(ttl)));
                    boolean set;
                    try {
                        ps.executeUpdate();
                        set = true;
                    } catch (SQLException e) {
                        if (!dialect.isConcurrentUpdateException(e)) {
                            throw e;
                        }
                        set = false;
                    }
                    if (logger.isLogEnabled()) {
                        logger.log("  -> " + (set ? "SET" : "FAILED"));
                    }
                    return set;
                }
            } else if (value == null) {
                // delete if previous value exists
                String sql;
                Column col;
                if (expected instanceof Long) {
                    sql = deleteIfLongSQL;
                    col = longCol;
                } else if (expected instanceof String) {
                    sql = deleteIfStringSQL;
                    col = stringCol;
                } else {
                    sql = deleteIfBytesSQL;
                    col = bytesCol;
                }
                try (PreparedStatement ps = connection.prepareStatement(sql)) {
                    setToPreparedStatement(sql, ps, keyCol, key, col, (Serializable) expected);
                    int count = ps.executeUpdate();
                    boolean set = count == 1;
                    if (logger.isLogEnabled()) {
                        logger.log("  -> " + (set ? "DEL" : "FAILED"));
                    }
                    return set;
                }
            } else {
                // replace if previous value exists
                Column expectedCol = expected instanceof Long ? longCol
                        : expected instanceof String ? stringCol : bytesCol;
                Column valueCol = value instanceof Long ? longCol : value instanceof String ? stringCol : bytesCol;
                if (expectedCol != valueCol) {
                    throw new NuxeoException("TODO expected and value have different types");
                    // TODO in that case we must set to null the old value column
                }
                String sql = "UPDATE " + tableName + " SET " + valueCol.getQuotedName() + " = ?, " + ttlColName
                        + " = ? WHERE " + keyColName + " = ? AND " + dialect.getQuotedNameForExpression(expectedCol)
                        + " = ?";
                try (PreparedStatement ps = connection.prepareStatement(sql)) {
                    setToPreparedStatement(sql, ps, Arrays.asList(valueCol, ttlCol, keyCol, expectedCol),
                            Arrays.asList((Serializable) value, ttlToStorage(ttl), key, (Serializable) expected));
                    int count = ps.executeUpdate();
                    boolean set = count == 1;
                    if (logger.isLogEnabled()) {
                        logger.log("  -> " + (set ? "SET" : "FAILED"));
                    }
                    return set;
                }
            }
        }).booleanValue();
    }

    /**
     * Atomically adds the delta to the Long value of a key, and returns the new value.
     * <p>
     * For PostgreSQL, Oracle and SQL Server a single UPDATE returning the new value is tried first. If this is not
     * available or did not match a row, a manual transaction doing a check followed by an insert or a conditional
     * update is used. The whole sequence is retried up to {@link #MAX_RETRY} times on concurrent updates.
     *
     * @param key the key
     * @param delta the delta to add
     * @return the new value
     * @throws NumberFormatException if the existing value is not a Long
     * @throws ConcurrentUpdateException if all the retries failed
     */
    @Override
    public long addAndGet(String key, long delta) throws NumberFormatException { // NOSONAR
        return runWithConnection((Connection connection) -> {
            for (int retry = 0; retry < MAX_RETRY; retry++) {
                String updateReturningSql;
                boolean useReturnResultSet = false;
                if (dialect instanceof DialectPostgreSQL) {
                    updateReturningSql = updateReturningPostgreSQLSql;
                } else if (dialect instanceof DialectOracle) {
                    updateReturningSql = updateReturningOracleSql;
                    useReturnResultSet = true;
                } else if (dialect instanceof DialectSQLServer) {
                    updateReturningSql = updateReturningSQLServerSql;
                } else {
                    updateReturningSql = null;
                }
                if (updateReturningSql != null) {
                    List<Column> psColumns = Arrays.asList(longCol, keyCol);
                    List<Serializable> psValues = Arrays.asList(Long.valueOf(delta), key);
                    try (PreparedStatement ps = connection.prepareStatement(updateReturningSql)) {
                        setToPreparedStatement(updateReturningSql, ps, psColumns, psValues);
                        if (useReturnResultSet) {
                            dialect.registerReturnParameter(ps, 3, longCol.getJdbcType());
                        }
                        boolean hasResultSet;
                        if (useReturnResultSet) {
                            int count = ps.executeUpdate();
                            hasResultSet = count > 0;
                        } else {
                            hasResultSet = true;
                        }
                        if (hasResultSet) {
                            ResultSet rs;
                            if (useReturnResultSet) {
                                rs = dialect.getReturnResultSet(ps);
                            } else {
                                rs = ps.executeQuery();
                            }
                            try {
                                if (rs.next()) {
                                    Long longValue = (Long) longCol.getFromResultSet(rs, 1);
                                    // We may get NULL here, because if the value is an empty string
                                    // a normal database would not match any row, but Oracle treats
                                    // "" as NULL and we end up trying to increment the long field
                                    // which is also NULL.
                                    if (longValue == null) {
                                        throw new NumberFormatException("Value is not a Long for key: " + key);
                                    }
                                    return longValue;
                                }
                            } finally {
                                rs.close();
                            }
                        }
                    }
                }
                // the dialect doesn't support UPDATE RETURNING, or
                // there was no row for this key, or
                // the row didn't contain a long
                // -> retry using a full transaction doing check + insert
                // start transaction
                connection.setAutoCommit(false);
                try {
                    // check value
                    Long currentLong;
                    try (PreparedStatement ps = connection.prepareStatement(getLongSQL)) {
                        setToPreparedStatement(getLongSQL, ps, keyCol, key);
                        try (ResultSet rs = ps.executeQuery()) {
                            if (rs.next()) {
                                currentLong = (Long) longCol.getFromResultSet(rs, 1);
                                if (logger.isLogEnabled()) {
                                    logger.logResultSet(rs, Arrays.asList(longCol));
                                }
                                if (currentLong == null) {
                                    throw new NumberFormatException("Value is not a Long for key: " + key);
                                }
                            } else {
                                currentLong = null;
                            }
                        }
                    }
                    if (currentLong == null) {
                        // try insert
                        try (PreparedStatement ps = connection.prepareStatement(insertLongSQL)) {
                            setToPreparedStatement(insertLongSQL, ps, keyCol, key, longCol, Long.valueOf(delta));
                            try {
                                ps.executeUpdate();
                                return delta;
                            } catch (SQLException e) {
                                if (!dialect.isConcurrentUpdateException(e)) {
                                    throw e;
                                }
                                // if concurrent update, retry
                            }
                        }
                    } else {
                        // update existing value
                        Long newLong = Long.valueOf(currentLong.longValue() + delta);
                        try (PreparedStatement ps = connection.prepareStatement(updateLongSQL)) {
                            setToPreparedStatement(updateLongSQL, ps, longCol, newLong, keyCol, key, longCol,
                                    currentLong);
                            int count = ps.executeUpdate();
                            if (count == 1) {
                                return newLong;
                            }
                            // else the value changed...
                            // concurrent update, retry
                        }
                    }
                } finally {
                    try {
                        connection.commit();
                    } finally {
                        // always restore auto-commit, even if the commit failed,
                        // so that the connection is not handed back in manual-commit mode
                        connection.setAutoCommit(true);
                    }
                }
                // concurrent update on insert or update, retry a few times
                sleepBeforeRetry();
            }
            throw new ConcurrentUpdateException("Failed to do atomic addAndGet for key: " + key);
        }).longValue();
    }

    /**
     * Sleeps for 5 ms before a retry.
     *
     * @throws NuxeoException if the thread is interrupted (the interrupt status is restored first)
     */
    protected void sleepBeforeRetry() {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new NuxeoException(e);
        }
    }

    // works on any representation that can be converted to a Long
    protected long addAndGetGeneric(String key, long delta) throws NumberFormatException { // NOSONAR
        for (;;) {
            Object value = getObject(key);
            long result;
            if (value == null) {
                result = delta;
            } else {
                Long base = toLong(value);
                if (base == null) {
                    throw new NumberFormatException("Value is not a Long for key: " + key);
                }
                result = base.longValue() + delta;
            }
            Object newValue = Long.valueOf(result);
            if (compareAndSet(key, value, newValue, 0)) {
                return result;
            }
            // else loop to try again
        }
    }

    @Override
    public String toString() {
        return getClass().getSimpleName() + "(" + name + ")";
    }

}