001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.sql.kv;
020
021import static java.nio.charset.StandardCharsets.UTF_8;
022import static org.apache.commons.lang3.StringUtils.defaultIfBlank;
023import static org.apache.commons.lang3.StringUtils.isBlank;
024
025import java.io.Serializable;
026import java.nio.charset.CharacterCodingException;
027import java.sql.Connection;
028import java.sql.DatabaseMetaData;
029import java.sql.PreparedStatement;
030import java.sql.ResultSet;
031import java.sql.SQLException;
032import java.sql.Statement;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Collection;
036import java.util.HashMap;
037import java.util.List;
038import java.util.Map;
039import java.util.function.BiConsumer;
040import java.util.stream.Stream;
041
042import org.apache.commons.lang3.StringUtils;
043import org.apache.logging.log4j.LogManager;
044import org.apache.logging.log4j.Logger;
045import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
046import org.nuxeo.ecm.core.api.NuxeoException;
047import org.nuxeo.ecm.core.storage.sql.ColumnType;
048import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCLogger;
049import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
050import org.nuxeo.ecm.core.storage.sql.jdbc.db.TableImpl;
051import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
052import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectOracle;
053import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectPostgreSQL;
054import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectSQLServer;
055import org.nuxeo.runtime.RuntimeMessage;
056import org.nuxeo.runtime.RuntimeMessage.Level;
057import org.nuxeo.runtime.RuntimeMessage.Source;
058import org.nuxeo.runtime.api.Framework;
059import org.nuxeo.runtime.datasource.ConnectionHelper;
060import org.nuxeo.runtime.kv.AbstractKeyValueStoreProvider;
061import org.nuxeo.runtime.kv.KeyValueStoreDescriptor;
062
063/**
064 * SQL implementation of a Key/Value Store Provider.
065 * <p>
066 * The following configuration properties are available:
067 * <ul>
068 * <li>datasource: the datasource to use.
069 * <li>table: the table to use. The default is the Store name.
070 * </ul>
 * If a namespace is specified, it is appended as a suffix to the configured table name; when no table is configured,
 * the namespace is used as the table name instead of the store name.
072 * <p>
073 * This implementation uses a table with a KEY column (unique and not NULL), and for the value one of these three
074 * columns is used: LONG, STRING, BYTES. If possible LONG is used, then STRING, otherwise BYTES.
075 * <p>
076 * The TTL is stored as an expiration time (seconds since epoch) in its own column. Expiration is done by a thread
077 * running a cleanup DELETE query every 60 seconds.
078 *
079 * @since 10.10
080 */
081public class SQLKeyValueStore extends AbstractKeyValueStoreProvider {
082
083    private static final Logger log = LogManager.getLogger(SQLKeyValueStore.class);
084
085    /** Datasource configuration property. */
086    public static final String DATASOURCE_PROP = "datasource";
087
088    /** Table configuration property. Default is the store name. The namespace is also used for disambiguation. */
089    public static final String TABLE_PROP = "table";
090
091    /** Key column, a short string. */
092    public static final String KEY_COL = "key";
093
094    /** Long column, or NULL if the value is not representable as a Long. */
095    public static final String LONG_COL = "long";
096
097    /** String column, or NULL if the value is representable as a Long or not representable as a String. */
098    public static final String STRING_COL = "string";
099
100    /** Bytes column, or NULL if the value is representable as a Long or String. */
101    public static final String BYTES_COL = "bytes";
102
103    /** TTL column, holding expiration date in seconds since epoch, or NULL if there is no expiration. */
104    public static final String TTL_COL = "ttl";
105
106    protected static final int TTL_EXPIRATION_FREQUENCY_MS = 60_000; // 60 seconds
107
108    // maximum number of retries in case of concurrency
109    protected static final int MAX_RETRY = 5;
110
    /** JDBC logger used to trace SQL statements; created in {@link #initialize} from the store name. */
    protected JDBCLogger logger;

    /** Name of the datasource, read from the {@link #DATASOURCE_PROP} configuration property. */
    protected String dataSourceName;

    /** SQL dialect, created from a database connection during {@link #initialize}. */
    protected Dialect dialect;

    /** Definition of the storage table, built in {@link #getTable}. */
    protected TableImpl table;

    /** Column definition for {@link #KEY_COL} (primary, not nullable). */
    protected Column keyCol;

    /** Column definition for {@link #LONG_COL}. */
    protected Column longCol;

    /** Column definition for {@link #STRING_COL}. */
    protected Column stringCol;

    /** Column definition for {@link #BYTES_COL}. */
    protected Column bytesCol;

    /** Column definition for {@link #TTL_COL} (indexed). */
    protected Column ttlCol;

    /** Quoted table name, as used in the SQL statements below. */
    protected String tableName;

    /** Quoted name of the key column. */
    protected String keyColName;

    /** Quoted name of the long column. */
    protected String longColName;

    /** Quoted name of the string column. */
    protected String stringColName;

    /** Quoted name of the bytes column. */
    protected String bytesColName;

    /** Quoted name of the TTL column. */
    protected String ttlColName;

    /** Thread running the periodic TTL expiration; set back to {@code null} by {@link #stopTTLThread}. */
    protected Thread ttlThread;

    // The SQL statements below are all built once by prepareSQL().

    /** Selects the long, string and bytes values of one key. */
    protected String getSQL;

    /** Selects key and values for several keys; contains a {@code %s} placeholder for the IN parameters. */
    protected String getMultiSQL;

    /** Selects only the long value of one key. */
    protected String getLongSQL;

    /** Deletes all rows of the table. */
    protected String deleteAllSQL;

    /** Deletes the row of one key. */
    protected String deleteSQL;

    /** Deletes the row of one key if its long value equals the parameter. */
    protected String deleteIfLongSQL;

    /** Deletes the row of one key if its string value equals the parameter. */
    protected String deleteIfStringSQL;

    /** Deletes the row of one key if its bytes value equals the parameter. */
    protected String deleteIfBytesSQL;

    /** Deletes the rows whose TTL is lower than the parameter. */
    protected String expireSQL;

    /** Selects all keys. */
    protected String keyStreamSQL;

    /** Selects the keys matching a LIKE pattern (with the dialect's LIKE escaping clause, if any). */
    protected String keyStreamPrefixSQL;

    /** Updates the TTL of one key. */
    protected String setTTLSQL;

    /** Selects a constant if a row exists for one key. */
    protected String existsSQL;

    /** Inserts a full row (key, long, string, bytes, TTL). */
    protected String insertSQL;

    /** Inserts a row with only key and long value. */
    protected String insertLongSQL;

    /** Updates the long value of one key if its current long value equals the last parameter. */
    protected String updateLongSQL;

    /** Adds a delta to the long value of one key and returns the result, using {@code RETURNING}. */
    protected String updateReturningPostgreSQLSql;

    /** Adds a delta to the long value of one key and returns the result, using {@code RETURNING ... INTO ?}. */
    protected String updateReturningOracleSql;

    /** Adds a delta to the long value of one key and returns the result, using {@code OUTPUT INSERTED}. */
    protected String updateReturningSQLServerSql;
180
181    @Override
182    public void initialize(KeyValueStoreDescriptor descriptor) {
183        super.initialize(descriptor);
184        logger = new JDBCLogger(name);
185        Map<String, String> properties = descriptor.properties;
186        dataSourceName = properties.get(DATASOURCE_PROP);
187        if (StringUtils.isAllBlank(dataSourceName)) {
188            throw new NuxeoException("Missing " + DATASOURCE_PROP + " property in configuration");
189        }
190        String tableProp = properties.get(TABLE_PROP);
191        String namespace = descriptor.namespace;
192        String tbl;
193        if (isBlank(tableProp)) {
194            tbl = defaultIfBlank(namespace, name).trim();
195        } else if (isBlank(namespace)) {
196            tbl = tableProp.trim();
197        } else {
198            tbl = tableProp.trim() + "_" + namespace.trim();
199        }
200        // check connection, get dialect and create table if needed
201        try (Connection connection = getConnection()) {
202            dialect = Dialect.createDialect(connection, null);
203            getTable(connection, tbl);
204        } catch (SQLException e) {
205            throw new NuxeoException(e);
206        }
207        prepareSQL();
208        startTTLThread();
209    }
210
211    @Override
212    public void close() {
213        stopTTLThread();
214    }
215
216    protected void getTable(Connection connection, String tbl) throws SQLException {
217        String tablePhysicalName = dialect.getTableName(tbl);
218        table = new TableImpl(dialect, tablePhysicalName, tablePhysicalName);
219        keyCol = addColumn(KEY_COL, ColumnType.SYSNAME);
220        keyCol.setPrimary(true);
221        keyCol.setNullable(false);
222        longCol = addColumn(LONG_COL, ColumnType.LONG);
223        stringCol = addColumn(STRING_COL, ColumnType.CLOB);
224        bytesCol = addColumn(BYTES_COL, ColumnType.BLOB);
225        ttlCol = addColumn(TTL_COL, ColumnType.LONG);
226        table.addIndex(TTL_COL);
227        tableName = table.getQuotedName();
228        keyColName = keyCol.getQuotedName();
229        longColName = longCol.getQuotedName();
230        stringColName = stringCol.getQuotedName();
231        bytesColName = bytesCol.getQuotedName();
232        ttlColName = ttlCol.getQuotedName();
233        if (!tableExists(connection)) {
234            createTable(connection);
235        }
236        checkColumns(connection);
237    }
238
239    protected Column addColumn(String columnName, ColumnType type) {
240        String colPhysicalName = dialect.getColumnName(columnName);
241        Column column = new Column(table, colPhysicalName, type, columnName);
242        return table.addColumn(column.getKey(), column);
243    }
244
245    protected void prepareSQL() {
246        getSQL = "SELECT " + longColName + ", " + stringColName + ", " + bytesColName + " FROM " + tableName + " WHERE "
247                + keyColName + " = ?";
248        getMultiSQL = "SELECT " + keyColName + ", " + longColName + ", " + stringColName + ", " + bytesColName
249                + " FROM " + tableName + " WHERE " + keyColName + " IN (%s)";
250        getLongSQL = "SELECT " + longColName + " FROM " + tableName + " WHERE " + keyColName + " = ?";
251        deleteAllSQL = "DELETE FROM " + tableName;
252        deleteSQL = "DELETE FROM " + tableName + " WHERE " + keyColName + " = ?";
253        deleteIfLongSQL = deleteSQL + " AND " + longColName + " = ?";
254        deleteIfStringSQL = deleteSQL + " AND " + dialect.getQuotedNameForExpression(stringCol) + " = ?";
255        deleteIfBytesSQL = deleteSQL + " AND " + bytesColName + " = ?";
256        expireSQL = "DELETE FROM " + tableName + " WHERE " + ttlColName + " < ?";
257        keyStreamSQL = "SELECT " + keyColName + " FROM " + tableName;
258        keyStreamPrefixSQL = keyStreamSQL + " WHERE " + keyColName + " LIKE ?";
259        String esc = dialect.getLikeEscaping();
260        if (esc != null) {
261            keyStreamPrefixSQL += esc;
262        }
263        setTTLSQL = "UPDATE " + tableName + " SET " + ttlColName + " = ? WHERE " + keyColName + " = ?";
264        existsSQL = "SELECT 1 FROM " + tableName + " WHERE " + keyColName + " = ?";
265        insertSQL = "INSERT INTO " + tableName + "(" + keyColName + ", " + longColName + ", " + stringColName + ", "
266                + bytesColName + ", " + ttlColName + ") VALUES (?, ?, ?, ?, ?)";
267        insertLongSQL = "INSERT INTO " + tableName + "(" + keyColName + ", " + longColName + ") VALUES (?, ?)";
268        updateLongSQL = "UPDATE " + tableName + " SET " + longColName + " = ? WHERE " + keyColName + " = ? AND "
269                + longColName + " = ?";
270        updateReturningPostgreSQLSql = "UPDATE " + tableName + " SET " + longColName + " = " + longColName
271                + " + ? WHERE " + keyColName + " = ? AND " + stringColName + " IS NULL AND " + bytesColName
272                + " IS NULL RETURNING " + longColName;
273        updateReturningOracleSql = "UPDATE " + tableName + " SET " + longColName + " = " + longColName + " + ? WHERE "
274                + keyColName + " = ? AND " + stringColName + " IS NULL AND " + bytesColName + " IS NULL RETURNING "
275                + longColName + " INTO ?";
276        updateReturningSQLServerSql = "UPDATE " + tableName + " SET " + longColName + " = " + longColName
277                + " + ? OUTPUT INSERTED." + longColName + " WHERE " + keyColName + " = ? AND " + stringColName
278                + " IS NULL AND " + bytesColName + " IS NULL";
279    }
280
281    protected void startTTLThread() {
282        ttlThread = new Thread(this::expireTTLThread);
283        ttlThread.setName("Nuxeo-Expire-KeyValueStore-" + name);
284        ttlThread.setDaemon(true);
285        ttlThread.start();
286    }
287
288    protected void stopTTLThread() {
289        if (ttlThread == null) {
290            return;
291        }
292        ttlThread.interrupt();
293        ttlThread = null;
294    }
295
296    /**
297     * Runs in a thread to do TTL expiration.
298     */
299    protected void expireTTLThread() {
300        log.debug("Starting TTL expiration thread for KeyValueStore: {}", name);
301        try {
302            // for the initial wait, use a random duration to avoid thundering herd problems
303            Thread.sleep((long) (TTL_EXPIRATION_FREQUENCY_MS * Math.random()));
304            for (;;) {
305                if (Thread.currentThread().isInterrupted()) {
306                    break;
307                }
308                Thread.sleep(TTL_EXPIRATION_FREQUENCY_MS);
309                expireTTLOnce();
310            }
311        } catch (InterruptedException e) {
312            Thread.currentThread().interrupt();
313        }
314        log.debug("Stopping TTL expiration thread for KeyValueStore: {}", name);
315    }
316
317    protected String escapeLike(String prefix) {
318        return prefix.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_");
319    }
320
321    /**
322     * Canonicalizes value for the database: use a String or a Long if possible.
323     */
324    protected Object toStorage(Object value) {
325        // try to convert byte array to UTF-8 string
326        if (value instanceof byte[]) {
327            try {
328                value = bytesToString((byte[]) value);
329            } catch (CharacterCodingException e) {
330                // ignore
331            }
332        }
333        // try to convert String to Long
334        if (value instanceof String) {
335            try {
336                value = Long.valueOf((String) value);
337            } catch (NumberFormatException e) {
338                // ignore
339            }
340        }
341        return value;
342    }
343
344    protected byte[] toBytes(Object value) {
345        if (value instanceof String) {
346            return ((String) value).getBytes(UTF_8);
347        } else if (value instanceof Long) {
348            return ((Long) value).toString().getBytes(UTF_8);
349        } else if (value instanceof byte[]) {
350            return (byte[]) value;
351        }
352        return null;
353    }
354
355    protected String toString(Object value) {
356        if (value instanceof String) {
357            return (String) value;
358        } else if (value instanceof Long) {
359            return ((Long) value).toString();
360        } else if (value instanceof byte[]) {
361            byte[] bytes = (byte[]) value;
362            try {
363                return bytesToString(bytes);
364            } catch (CharacterCodingException e) {
365                return null;
366            }
367        }
368        return null;
369    }
370
371    protected Long toLong(Object value) throws NumberFormatException { // NOSONAR
372        if (value instanceof Long) {
373            return (Long) value;
374        } else if (value instanceof String) {
375            return Long.valueOf((String) value);
376        } else if (value instanceof byte[]) {
377            byte[] bytes = (byte[]) value;
378            return bytesToLong(bytes);
379        }
380        return null;
381    }
382
383    protected Connection getConnection() throws SQLException {
384        // open connection in noSharing mode
385        return ConnectionHelper.getConnection(dataSourceName, true);
386    }
387
388    protected void setToPreparedStatement(String sql, PreparedStatement ps, Column column, Serializable value)
389            throws SQLException {
390        setToPreparedStatement(sql, ps, Arrays.asList(column), Arrays.asList(value));
391    }
392
393    protected void setToPreparedStatement(String sql, PreparedStatement ps, Column column1, Serializable value1,
394            Column column2, Serializable value2) throws SQLException {
395        setToPreparedStatement(sql, ps, Arrays.asList(column1, column2), Arrays.asList(value1, value2));
396    }
397
398    protected void setToPreparedStatement(String sql, PreparedStatement ps, Column column1, Serializable value1,
399            Column column2, Serializable value2, Column column3, Serializable value3) throws SQLException {
400        setToPreparedStatement(sql, ps, Arrays.asList(column1, column2, column3),
401                Arrays.asList(value1, value2, value3));
402    }
403
404    protected void setToPreparedStatement(String sql, PreparedStatement ps, List<Column> columns,
405            List<? extends Serializable> values) throws SQLException {
406        if (columns.size() != values.size()) {
407            throw new IllegalStateException();
408        }
409        for (int i = 0; i < columns.size(); i++) {
410            columns.get(i).setToPreparedStatement(ps, i + 1, values.get(i));
411        }
412        if (logger.isLogEnabled()) {
413            logger.logSQL(sql, values);
414        }
415    }
416
417    protected boolean tableExists(Connection connection) throws SQLException {
418        DatabaseMetaData metadata = connection.getMetaData();
419        String schemaName = getDatabaseSchemaName(connection);
420        try (ResultSet rs = metadata.getTables(null, schemaName, table.getPhysicalName(), new String[] { "TABLE" })) {
421            boolean exists = rs.next();
422            log.trace("Checking if table {} exists: {}", table.getPhysicalName(), exists);
423            return exists;
424        }
425    }
426
427    protected void createTable(Connection connection) throws SQLException {
428        try (Statement st = connection.createStatement()) {
429            String createSQL = table.getCreateSql();
430            logger.log(createSQL);
431            st.execute(createSQL);
432            for (String sql : table.getPostCreateSqls(null)) {
433                logger.log(sql);
434                st.execute(sql);
435            }
436        }
437    }
438
439    /**
440     * Checks that columns have expected JDBC types.
441     */
442    protected void checkColumns(Connection connection) throws SQLException {
443        DatabaseMetaData metadata = connection.getMetaData();
444        String schemaName = getDatabaseSchemaName(connection);
445        try (ResultSet rs = metadata.getColumns(null, schemaName, table.getPhysicalName(), "%")) {
446            while (rs.next()) {
447                String schema = rs.getString("TABLE_SCHEM");
448                if (schema != null) { // null for MySQL, doh!
449                    if ("INFORMATION_SCHEMA".equals(schema.toUpperCase())) {
450                        // H2 returns some system tables (locks)
451                        continue;
452                    }
453                }
454                String columnName = rs.getString("COLUMN_NAME").toUpperCase();
455                int actual = rs.getInt("DATA_TYPE");
456                String actualName = rs.getString("TYPE_NAME");
457                int actualSize = rs.getInt("COLUMN_SIZE");
458                Column column = null;
459                for (Column c : table.getColumns()) {
460                    String upperName = c.getPhysicalName().toUpperCase();
461                    if (upperName.equals(columnName)) {
462                        column = c;
463                    }
464                }
465                if (column == null) {
466                    log.error("Column not found: {} in table: {}", columnName, tableName);
467                    continue;
468                }
469                String message = column.checkJdbcType(actual, actualName, actualSize);
470                if (message != null) {
471                    log.error(message);
472                    Framework.getRuntime()
473                             .getMessageHandler()
474                             .addMessage(
475                                     new RuntimeMessage(Level.ERROR, message, Source.CODE, this.getClass().getName()));
476                }
477            }
478        }
479    }
480
481    protected String getDatabaseSchemaName(Connection connection) throws SQLException {
482        String schemaName = null;
483        if (dialect instanceof DialectOracle) {
484            try (Statement st = connection.createStatement()) {
485                String sql = "SELECT SYS_CONTEXT('USERENV', 'SESSION_USER') FROM DUAL";
486                logger.log(sql);
487                try (ResultSet rs = st.executeQuery(sql)) {
488                    if (rs.next()) {
489                        schemaName = rs.getString(1);
490                        logger.log("  -> schema: " + schemaName);
491                    }
492                }
493            }
494        }
495        return schemaName;
496    }
497
498    protected void expireTTLOnce() {
499        try (Connection connection = getConnection(); //
500                PreparedStatement ps = connection.prepareStatement(expireSQL)) {
501            Long ttlDeadline = getTTLValue(0);
502            setToPreparedStatement(expireSQL, ps, ttlCol, ttlDeadline);
503            int count = ps.executeUpdate();
504            logger.logCount(count);
505        } catch (SQLException e) {
506            if (dialect.isConcurrentUpdateException(e)) {
507                // ignore
508                return;
509            }
510            log.debug("Exception during TTL expiration", e);
511        }
512    }
513
514    @Override
515    public void clear() {
516        try (Connection connection = getConnection(); //
517                Statement st = connection.createStatement()) {
518            logger.log(deleteAllSQL);
519            st.execute(deleteAllSQL);
520        } catch (SQLException e) {
521            throw new NuxeoException(e);
522        }
523    }
524
525    @Override
526    public Stream<String> keyStream() {
527        return keyStream(null);
528    }
529
530    @Override
531    public Stream<String> keyStream(String prefix) {
532        try (Connection connection = getConnection()) {
533            return keyStream(connection, prefix);
534        } catch (SQLException e) {
535            throw new NuxeoException(e);
536        }
537    }
538
539    protected Stream<String> keyStream(Connection connection, String prefix) throws SQLException {
540        String sql = prefix == null ? keyStreamSQL : keyStreamPrefixSQL;
541        List<String> keys = new ArrayList<>();
542        try (PreparedStatement ps = connection.prepareStatement(sql)) {
543            if (prefix != null) {
544                String like = escapeLike(prefix) + "%";
545                setToPreparedStatement(sql, ps, keyCol, like);
546            }
547            try (ResultSet rs = ps.executeQuery()) {
548                while (rs.next()) {
549                    String key = (String) keyCol.getFromResultSet(rs, 1);
550                    keys.add(key);
551                }
552            }
553        }
554        return keys.stream();
555    }
556
557    @Override
558    public byte[] get(String key) {
559        Object value = getObject(key);
560        if (value == null) {
561            return null;
562        }
563        byte[] bytes = toBytes(value);
564        if (bytes != null) {
565            return bytes;
566        }
567        throw new UnsupportedOperationException(value.getClass().getName());
568    }
569
570    @Override
571    public String getString(String key) {
572        Object value = getObject(key);
573        if (value == null) {
574            return null;
575        }
576        String string = toString(value);
577        if (string != null) {
578            return string;
579        }
580        throw new IllegalArgumentException("Value is not a String for key: " + key);
581    }
582
583    @Override
584    public Long getLong(String key) throws NumberFormatException { // NOSONAR
585        Object value = getObject(key);
586        if (value == null) {
587            return null;
588        }
589        Long longValue = toLong(value);
590        if (longValue != null) {
591            return longValue;
592        }
593        throw new NumberFormatException("Value is not a Long for key: " + key);
594    }
595
596    @Override
597    public Map<String, byte[]> get(Collection<String> keys) {
598        Map<String, byte[]> map = new HashMap<>(keys.size());
599        getObjects(keys, (key, value) -> {
600            byte[] bytes = toBytes(value);
601            if (bytes == null) {
602                throw new UnsupportedOperationException(String.format("Value of class %s is not supported for key: %s",
603                        value.getClass().getName(), key));
604            }
605            map.put(key, bytes);
606        });
607        return map;
608    }
609
610    @Override
611    public Map<String, String> getStrings(Collection<String> keys) {
612        Map<String, String> map = new HashMap<>(keys.size());
613        getObjects(keys, (key, value) -> {
614            String string = toString(value);
615            if (string == null) {
616                throw new IllegalArgumentException("Value is not a String for key: " + key);
617            }
618            map.put(key, string);
619        });
620        return map;
621    }
622
623    @Override
624    public Map<String, Long> getLongs(Collection<String> keys) throws NumberFormatException { // NOSONAR
625        Map<String, Long> map = new HashMap<>(keys.size());
626        getObjects(keys, (key, value) -> {
627            Long longValue = toLong(value);
628            if (longValue == null) {
629                throw new IllegalArgumentException("Value is not a Long for key: " + key);
630            }
631            map.put(key, longValue);
632        });
633        return map;
634    }
635
636    protected Object getObject(String key) {
637        try (Connection connection = getConnection(); //
638                PreparedStatement ps = connection.prepareStatement(getSQL)) {
639            setToPreparedStatement(getSQL, ps, keyCol, key);
640            try (ResultSet rs = ps.executeQuery()) {
641                if (!rs.next()) {
642                    if (logger.isLogEnabled()) {
643                        logger.log("  -> null");
644                    }
645                    return null;
646                }
647                Long longValue = (Long) longCol.getFromResultSet(rs, 1);
648                String string = (String) stringCol.getFromResultSet(rs, 2);
649                byte[] bytes = (byte[]) bytesCol.getFromResultSet(rs, 3);
650                if (logger.isLogEnabled()) {
651                    logger.logResultSet(rs, Arrays.asList(longCol, stringCol, bytesCol));
652                }
653                if (string != null) {
654                    return string;
655                } else if (longValue != null) {
656                    return longValue;
657                } else {
658                    return bytes;
659                }
660            }
661        } catch (SQLException e) {
662            throw new NuxeoException(e);
663        }
664    }
665
666    protected void getObjects(Collection<String> keys, BiConsumer<String, Object> consumer) {
667        if (keys.isEmpty()) {
668            return;
669        }
670        String sql = String.format(getMultiSQL, nParams(keys.size()));
671        logger.logSQL(sql, keys);
672        try (Connection connection = getConnection(); //
673                PreparedStatement ps = connection.prepareStatement(sql)) {
674            int i = 1;
675            for (String key : keys) {
676                keyCol.setToPreparedStatement(ps, i++, key);
677            }
678            try (ResultSet rs = ps.executeQuery()) {
679                while (rs.next()) {
680                    String key = (String) keyCol.getFromResultSet(rs, 1);
681                    Long longVal = (Long) longCol.getFromResultSet(rs, 2);
682                    String string = (String) stringCol.getFromResultSet(rs, 3);
683                    byte[] bytes = (byte[]) bytesCol.getFromResultSet(rs, 4);
684                    if (logger.isLogEnabled()) {
685                        logger.logResultSet(rs, Arrays.asList(keyCol, longCol, stringCol, bytesCol));
686                    }
687                    Object value;
688                    if (string != null) {
689                        value = string;
690                    } else if (longVal != null) {
691                        value = longVal;
692                    } else {
693                        value = bytes;
694                    }
695                    if (value != null) {
696                        consumer.accept(key, value);
697                    }
698                }
699            }
700        } catch (SQLException e) {
701            throw new NuxeoException(e);
702        }
703    }
704
705    protected String nParams(int n) {
706        StringBuilder sb = new StringBuilder();
707        for (int i = 0; i < n; i++) {
708            if (i != 0) {
709                sb.append(", ");
710            }
711            sb.append('?');
712        }
713        return sb.toString();
714    }
715
716    protected Long ttlToStorage(long ttl) {
717        return ttl == 0 ? null : getTTLValue(ttl);
718    }
719
720    protected Long getTTLValue(long ttl) {
721        return Long.valueOf(System.currentTimeMillis() / 1000 + ttl);
722    }
723
724    @Override
725    public void put(String key, byte[] bytes) {
726        put(key, toStorage(bytes), 0);
727    }
728
729    @Override
730    public void put(String key, byte[] bytes, long ttl) {
731        put(key, toStorage(bytes), ttl);
732    }
733
734    @Override
735    public void put(String key, String string) {
736        put(key, toStorage(string), 0);
737    }
738
739    @Override
740    public void put(String key, String string, long ttl) {
741        put(key, toStorage(string), ttl);
742    }
743
744    @Override
745    public void put(String key, Long value) {
746        put(key, (Object) value, 0);
747    }
748
749    @Override
750    public void put(String key, Long value, long ttl) {
751        put(key, (Object) value, ttl);
752    }
753
754    protected void put(String key, Object value, long ttl) {
755        try (Connection connection = getConnection()) {
756            if (value == null) {
757                // delete
758                try (PreparedStatement ps = connection.prepareStatement(deleteSQL)) {
759                    setToPreparedStatement(deleteSQL, ps, keyCol, key);
760                    ps.execute();
761                }
762            } else {
763                // upsert (update or insert)
764                Long longValue = value instanceof Long ? (Long) value : null;
765                String stringValue = value instanceof String ? (String) value : null;
766                byte[] bytesValue = value instanceof byte[] ? (byte[]) value : null;
767                Long ttlValue = ttlToStorage(ttl);
768                List<Column> psColumns = new ArrayList<>();
769                List<Serializable> psValues = new ArrayList<>();
770                String sql = dialect.getUpsertSql(Arrays.asList(keyCol, longCol, stringCol, bytesCol, ttlCol),
771                        Arrays.asList(key, longValue, stringValue, bytesValue, ttlValue), psColumns, psValues);
772                for (int retry = 0; retry < MAX_RETRY; retry++) {
773                    try {
774                        try (PreparedStatement ps = connection.prepareStatement(sql)) {
775                            setToPreparedStatement(sql, ps, psColumns, psValues);
776                            ps.execute();
777                        }
778                        return;
779                    } catch (SQLException e) {
780                        if (!dialect.isConcurrentUpdateException(e)) {
781                            throw e;
782                        }
783                        // Oracle MERGE can throw DUP_VAL_ON_INDEX (ORA-0001) or NO_DATA_FOUND (ORA-01403)
784                        // in that case retry a few times
785                    }
786                    sleepBeforeRetry();
787                }
788                throw new ConcurrentUpdateException("Failed to do atomic put for key: " + key);
789            }
790        } catch (SQLException e) {
791            throw new NuxeoException(e);
792        }
793    }
794
795    @Override
796    public boolean setTTL(String key, long ttl) {
797        try (Connection connection = getConnection(); //
798                PreparedStatement ps = connection.prepareStatement(setTTLSQL)) {
799            setToPreparedStatement(setTTLSQL, ps, ttlCol, ttlToStorage(ttl), keyCol, key);
800            int count = ps.executeUpdate();
801            boolean set = count == 1;
802            return set;
803        } catch (SQLException e) {
804            throw new NuxeoException(e);
805        }
806    }
807
808    @Override
809    public boolean compareAndSet(String key, byte[] expected, byte[] value, long ttl) {
810        return compareAndSet(key, toStorage(expected), toStorage(value), ttl);
811    }
812
813    @Override
814    public boolean compareAndSet(String key, String expected, String value, long ttl) {
815        return compareAndSet(key, toStorage(expected), toStorage(value), ttl);
816    }
817
818    protected boolean compareAndSet(String key, Object expected, Object value, long ttl) {
819        try (Connection connection = getConnection()) {
820            if (expected == null && value == null) {
821                // check that document doesn't exist
822                try (PreparedStatement ps = connection.prepareStatement(existsSQL)) {
823                    setToPreparedStatement(existsSQL, ps, keyCol, key);
824                    try (ResultSet rs = ps.executeQuery()) {
825                        boolean set = !rs.next();
826                        if (logger.isLogEnabled()) {
827                            logger.log("  -> " + (set ? "NOP" : "FAILED"));
828                        }
829                        return set;
830                    }
831                }
832            } else if (expected == null) {
833                // set value if no document already exists: regular insert detecting duplicate row
834                Long longValue = value instanceof Long ? (Long) value : null;
835                String stringValue = value instanceof String ? (String) value : null;
836                byte[] bytesValue = value instanceof byte[] ? (byte[]) value : null;
837                Long ttlValue = ttlToStorage(ttl);
838                List<Column> psColumns = new ArrayList<>();
839                List<Serializable> psValues = new ArrayList<>();
840                String insertOnConflictDoNothingSql = dialect.getInsertOnConflictDoNothingSql(
841                        Arrays.asList(keyCol, longCol, stringCol, bytesCol, ttlCol),
842                        Arrays.asList(key, longValue, stringValue, bytesValue, ttlValue), psColumns, psValues);
843                boolean set;
844                if (insertOnConflictDoNothingSql != null) {
845                    try (PreparedStatement ps = connection.prepareStatement(insertOnConflictDoNothingSql)) {
846                        setToPreparedStatement(insertOnConflictDoNothingSql, ps, psColumns, psValues);
847                        int count = ps.executeUpdate();
848                        set = count == 1;
849                    }
850                } else {
851                    try (PreparedStatement ps = connection.prepareStatement(insertSQL)) {
852                        setToPreparedStatement(insertSQL, ps,
853                                Arrays.asList(keyCol, longCol, stringCol, bytesCol, ttlCol),
854                                Arrays.asList(key, longValue, stringValue, bytesValue, ttlValue));
855                        try {
856                            ps.executeUpdate();
857                            set = true;
858                        } catch (SQLException e) {
859                            if (!dialect.isConcurrentUpdateException(e)) {
860                                throw e;
861                            }
862                            set = false;
863                        }
864                    }
865                }
866                if (logger.isLogEnabled()) {
867                    logger.log("  -> " + (set ? "SET" : "FAILED"));
868                }
869                return set;
870            } else if (value == null) {
871                // delete if previous value exists
872                String sql;
873                Column col;
874                if (expected instanceof Long) {
875                    sql = deleteIfLongSQL;
876                    col = longCol;
877                } else if (expected instanceof String) {
878                    sql = deleteIfStringSQL;
879                    col = stringCol;
880                } else {
881                    sql = deleteIfBytesSQL;
882                    col = bytesCol;
883                }
884                try (PreparedStatement ps = connection.prepareStatement(sql)) {
885                    setToPreparedStatement(sql, ps, keyCol, key, col, (Serializable) expected);
886                    int count = ps.executeUpdate();
887                    boolean set = count == 1;
888                    if (logger.isLogEnabled()) {
889                        logger.log("  -> " + (set ? "DEL" : "FAILED"));
890                    }
891                    return set;
892                }
893            } else {
894                // replace if previous value exists
895                Column expectedCol = expected instanceof Long ? longCol
896                        : expected instanceof String ? stringCol : bytesCol;
897                Column valueCol = value instanceof Long ? longCol : value instanceof String ? stringCol : bytesCol;
898                if (expectedCol != valueCol) {
899                    throw new NuxeoException("TODO expected and value have different types");
900                    // TODO in that case we must set to null the old value column
901                }
902                // Prevent ORA-24816 by setting the long value at the end
903                String sql = "UPDATE " + tableName + " SET " + ttlColName + " = ?, " + valueCol.getQuotedName()
904                        + " = ? WHERE " + keyColName + " = ? AND " + dialect.getQuotedNameForExpression(expectedCol)
905                        + " = ?";
906                try (PreparedStatement ps = connection.prepareStatement(sql)) {
907                    setToPreparedStatement(sql, ps, Arrays.asList(ttlCol, valueCol, keyCol, expectedCol),
908                            Arrays.asList(ttlToStorage(ttl), (Serializable) value, key, (Serializable) expected));
909                    int count = ps.executeUpdate();
910                    boolean set = count == 1;
911                    if (logger.isLogEnabled()) {
912                        logger.log("  -> " + (set ? "SET" : "FAILED"));
913                    }
914                    return set;
915                }
916            }
917        } catch (SQLException e) {
918            throw new NuxeoException(e);
919        }
920    }
921
922    @Override
923    public long addAndGet(String key, long delta) throws NumberFormatException { // NOSONAR
924        try (Connection connection = getConnection()) {
925            for (int retry = 0; retry < MAX_RETRY; retry++) {
926                String updateReturningSql;
927                boolean useReturnResultSet = false;
928                if (dialect instanceof DialectPostgreSQL) {
929                    updateReturningSql = updateReturningPostgreSQLSql;
930                } else if (dialect instanceof DialectOracle) {
931                    updateReturningSql = updateReturningOracleSql;
932                    useReturnResultSet = true;
933                } else if (dialect instanceof DialectSQLServer) {
934                    updateReturningSql = updateReturningSQLServerSql;
935                } else {
936                    updateReturningSql = null;
937                }
938                if (updateReturningSql != null) {
939                    List<Column> psColumns = Arrays.asList(longCol, keyCol);
940                    List<Serializable> psValues = Arrays.asList(Long.valueOf(delta), key);
941                    try (PreparedStatement ps = connection.prepareStatement(updateReturningSql)) {
942                        setToPreparedStatement(updateReturningSql, ps, psColumns, psValues);
943                        if (useReturnResultSet) {
944                            dialect.registerReturnParameter(ps, 3, longCol.getJdbcType());
945                        }
946                        boolean hasResultSet;
947                        if (useReturnResultSet) {
948                            int count = ps.executeUpdate();
949                            hasResultSet = count > 0;
950                        } else {
951                            hasResultSet = true;
952                        }
953                        if (hasResultSet) {
954                            ResultSet rs;
955                            if (useReturnResultSet) {
956                                rs = dialect.getReturnResultSet(ps);
957                            } else {
958                                rs = ps.executeQuery();
959                            }
960                            try {
961                                if (rs.next()) {
962                                    Long longValue = (Long) longCol.getFromResultSet(rs, 1);
963                                    // We may get NULL here, because if the value is an empty string
964                                    // a normal database would not match any row, but Oracle treats
965                                    // "" as NULL and we end up trying to increment the long field
966                                    // which is also NULL.
967                                    if (longValue == null) {
968                                        throw new NumberFormatException("Value is not a Long for key: " + key);
969                                    }
970                                    return longValue;
971                                }
972                            } finally {
973                                rs.close();
974                            }
975                        }
976                    }
977                }
978                // the dialect doesn't support UPDATE RETURNING, or
979                // there was no row for this key, or
980                // the row didn't contain a long
981                // -> retry using a full transaction doing check + insert
982                // start transaction
983                connection.setAutoCommit(false);
984                try {
985                    // check value
986                    Long currentLong;
987                    try (PreparedStatement ps = connection.prepareStatement(getLongSQL)) {
988                        setToPreparedStatement(getLongSQL, ps, keyCol, key);
989                        try (ResultSet rs = ps.executeQuery()) {
990                            if (rs.next()) {
991                                currentLong = (Long) longCol.getFromResultSet(rs, 1);
992                                if (logger.isLogEnabled()) {
993                                    logger.logResultSet(rs, Arrays.asList(longCol));
994                                }
995                                if (currentLong == null) {
996                                    throw new NumberFormatException("Value is not a Long for key: " + key);
997                                }
998                            } else {
999                                currentLong = null;
1000                            }
1001                        }
1002                    }
1003                    if (currentLong == null) {
1004                        // try insert
1005                        try (PreparedStatement ps = connection.prepareStatement(insertLongSQL)) {
1006                            setToPreparedStatement(insertLongSQL, ps, keyCol, key, longCol, Long.valueOf(delta));
1007                            try {
1008                                ps.executeUpdate();
1009                                return delta;
1010                            } catch (SQLException e) {
1011                                if (!dialect.isConcurrentUpdateException(e)) {
1012                                    throw e;
1013                                }
1014                                // if concurrent update, retry
1015                            }
1016                        }
1017                    } else {
1018                        // update existing value
1019                        Long newLong = Long.valueOf(currentLong.longValue() + delta);
1020                        try (PreparedStatement ps = connection.prepareStatement(updateLongSQL)) {
1021                            setToPreparedStatement(updateLongSQL, ps, longCol, newLong, keyCol, key, longCol,
1022                                    currentLong);
1023                            int count = ps.executeUpdate();
1024                            if (count == 1) {
1025                                return newLong;
1026                            }
1027                            // else the value changed...
1028                            // concurrent update, retry
1029                        }
1030                    }
1031                } finally {
1032                    connection.commit();
1033                    connection.setAutoCommit(true);
1034                }
1035                // concurrent update on insert or update, retry a few times
1036                sleepBeforeRetry();
1037            }
1038            throw new ConcurrentUpdateException("Failed to do atomic addAndGet for key: " + key);
1039        } catch (SQLException e) {
1040            throw new NuxeoException(e);
1041        }
1042    }
1043
1044    protected void sleepBeforeRetry() {
1045        try {
1046            Thread.sleep(5);
1047        } catch (InterruptedException e) {
1048            Thread.currentThread().interrupt();
1049            throw new NuxeoException(e);
1050        }
1051    }
1052
1053    // works on any representation that can be converted to a Long
1054    protected long addAndGetGeneric(String key, long delta) throws NumberFormatException { // NOSONAR
1055        for (;;) {
1056            Object value = getObject(key);
1057            long result;
1058            if (value == null) {
1059                result = delta;
1060            } else {
1061                Long base = toLong(value);
1062                if (base == null) {
1063                    throw new NumberFormatException("Value is not a Long for key: " + key);
1064                }
1065                result = base.longValue() + delta;
1066            }
1067            Object newValue = Long.valueOf(result);
1068            if (compareAndSet(key, value, newValue, 0)) {
1069                return result;
1070            }
1071            // else loop to try again
1072        }
1073    }
1074
1075    @Override
1076    public String toString() {
1077        return getClass().getSimpleName() + "(" + name + ")";
1078    }
1079
1080}