001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.storage.sql.kv;
020
021import static java.nio.charset.StandardCharsets.UTF_8;
022import static org.apache.commons.lang3.StringUtils.defaultIfBlank;
023import static org.apache.commons.lang3.StringUtils.isBlank;
024
025import java.io.Serializable;
026import java.nio.charset.CharacterCodingException;
027import java.sql.Connection;
028import java.sql.DatabaseMetaData;
029import java.sql.PreparedStatement;
030import java.sql.ResultSet;
031import java.sql.SQLException;
032import java.sql.Statement;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Collection;
036import java.util.HashMap;
037import java.util.List;
038import java.util.Map;
039import java.util.function.BiConsumer;
040import java.util.stream.Stream;
041
042import org.apache.commons.lang3.StringUtils;
043import org.apache.logging.log4j.LogManager;
044import org.apache.logging.log4j.Logger;
045import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
046import org.nuxeo.ecm.core.api.NuxeoException;
047import org.nuxeo.ecm.core.storage.sql.ColumnType;
048import org.nuxeo.ecm.core.storage.sql.jdbc.JDBCLogger;
049import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
050import org.nuxeo.ecm.core.storage.sql.jdbc.db.TableImpl;
051import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
052import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectOracle;
053import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectPostgreSQL;
054import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectSQLServer;
055import org.nuxeo.runtime.api.Framework;
056import org.nuxeo.runtime.datasource.ConnectionHelper;
057import org.nuxeo.runtime.kv.AbstractKeyValueStoreProvider;
058import org.nuxeo.runtime.kv.KeyValueStoreDescriptor;
059import org.nuxeo.runtime.transaction.TransactionHelper;
060
061/**
062 * SQL implementation of a Key/Value Store Provider.
063 * <p>
064 * The following configuration properties are available:
065 * <ul>
066 * <li>datasource: the datasource to use.
067 * <li>table: the table to use. The default is the Store name.
068 * </ul>
069 * If a namespace is specified, it is used as a table name suffix, otherwise of the store name.
070 * <p>
071 * This implementation uses a table with a KEY column (unique and not NULL), and for the value one of these three
072 * columns is used: LONG, STRING, BYTES. If possible LONG is used, then STRING, otherwise BYTES.
073 * <p>
074 * The TTL is stored as an expiration time (seconds since epoch) in its own column. Expiration is done by a thread
075 * running a cleanup DELETE query every 60 seconds.
076 *
077 * @since 10.10
078 */
079public class SQLKeyValueStore extends AbstractKeyValueStoreProvider {
080
081    private static final Logger log = LogManager.getLogger(SQLKeyValueStore.class);
082
083    /** Datasource configuration property. */
084    public static final String DATASOURCE_PROP = "datasource";
085
086    /** Table configuration property. Default is the store name. The namespace is also used for disambiguation. */
087    public static final String TABLE_PROP = "table";
088
089    /** Key column, a short string. */
090    public static final String KEY_COL = "key";
091
092    /** Long column, or NULL if the value is not representable as a Long. */
093    public static final String LONG_COL = "long";
094
095    /** String column, or NULL if the value is representable as a Long or not representable as a String. */
096    public static final String STRING_COL = "string";
097
098    /** Bytes column, or NULL if the value is representable as a Long or String. */
099    public static final String BYTES_COL = "bytes";
100
101    /** TTL column, holding expiration date in seconds since epoch, or NULL if there is no expiration. */
102    public static final String TTL_COL = "ttl";
103
104    protected static final int TTL_EXPIRATION_FREQUENCY_MS = 60_000; // 60 seconds
105
106    // maximum number of retries in case of concurrency
107    protected static final int MAX_RETRY = 5;
108
109    protected JDBCLogger logger;
110
111    protected String dataSourceName;
112
113    protected Dialect dialect;
114
115    protected TableImpl table;
116
117    protected Column keyCol;
118
119    protected Column longCol;
120
121    protected Column stringCol;
122
123    protected Column bytesCol;
124
125    protected Column ttlCol;
126
127    protected String tableName;
128
129    protected String keyColName;
130
131    protected String longColName;
132
133    protected String stringColName;
134
135    protected String bytesColName;
136
137    protected String ttlColName;
138
139    protected Thread ttlThread;
140
141    protected String getSQL;
142
143    protected String getMultiSQL;
144
145    protected String getLongSQL;
146
147    protected String deleteAllSQL;
148
149    protected String deleteSQL;
150
151    protected String deleteIfLongSQL;
152
153    protected String deleteIfStringSQL;
154
155    protected String deleteIfBytesSQL;
156
157    protected String expireSQL;
158
159    protected String keyStreamSQL;
160
161    protected String keyStreamPrefixSQL;
162
163    protected String setTTLSQL;
164
165    protected String existsSQL;
166
167    protected String insertSQL;
168
169    protected String insertLongSQL;
170
171    protected String updateLongSQL;
172
173    protected String updateReturningPostgreSQLSql;
174
175    protected String updateReturningOracleSql;
176
177    protected String updateReturningSQLServerSql;
178
179    @Override
180    public void initialize(KeyValueStoreDescriptor descriptor) {
181        super.initialize(descriptor);
182        logger = new JDBCLogger(name);
183        Map<String, String> properties = descriptor.properties;
184        dataSourceName = properties.get(DATASOURCE_PROP);
185        if (StringUtils.isAllBlank(dataSourceName)) {
186            throw new NuxeoException("Missing " + DATASOURCE_PROP + " property in configuration");
187        }
188        String tableProp = properties.get(TABLE_PROP);
189        String namespace = descriptor.namespace;
190        String tbl;
191        if (isBlank(tableProp)) {
192            tbl = defaultIfBlank(namespace, name).trim();
193        } else if (isBlank(namespace)) {
194            tbl = tableProp.trim();
195        } else {
196            tbl = tableProp.trim() + "_" + namespace.trim();
197        }
198        // check connection, get dialect and create table if needed
199        runWithConnection(connection -> {
200            dialect = Dialect.createDialect(connection, null);
201            getTable(connection, tbl);
202        });
203        prepareSQL();
204        startTTLThread();
205    }
206
207    @Override
208    public void close() {
209        stopTTLThread();
210    }
211
212    protected void getTable(Connection connection, String tbl) throws SQLException {
213        String tablePhysicalName = dialect.getTableName(tbl);
214        table = new TableImpl(dialect, tablePhysicalName, tablePhysicalName);
215        keyCol = addColumn(KEY_COL, ColumnType.SYSNAME);
216        keyCol.setPrimary(true);
217        keyCol.setNullable(false);
218        longCol = addColumn(LONG_COL, ColumnType.LONG);
219        stringCol = addColumn(STRING_COL, ColumnType.CLOB);
220        bytesCol = addColumn(BYTES_COL, ColumnType.BLOB);
221        ttlCol = addColumn(TTL_COL, ColumnType.LONG);
222        table.addIndex(TTL_COL);
223        tableName = table.getQuotedName();
224        keyColName = keyCol.getQuotedName();
225        longColName = longCol.getQuotedName();
226        stringColName = stringCol.getQuotedName();
227        bytesColName = bytesCol.getQuotedName();
228        ttlColName = ttlCol.getQuotedName();
229        if (!tableExists(connection)) {
230            createTable(connection);
231        }
232        checkColumns(connection);
233    }
234
235    protected Column addColumn(String columnName, ColumnType type) {
236        String colPhysicalName = dialect.getColumnName(columnName);
237        Column column = new Column(table, colPhysicalName, type, columnName);
238        return table.addColumn(column.getKey(), column);
239    }
240
241    protected void prepareSQL() {
242        getSQL = "SELECT " + longColName + ", " + stringColName + ", " + bytesColName + " FROM " + tableName + " WHERE "
243                + keyColName + " = ?";
244        getMultiSQL = "SELECT " + keyColName + ", " + longColName + ", " + stringColName + ", " + bytesColName
245                + " FROM " + tableName + " WHERE " + keyColName + " IN (%s)";
246        getLongSQL = "SELECT " + longColName + " FROM " + tableName + " WHERE " + keyColName + " = ?";
247        deleteAllSQL = "DELETE FROM " + tableName;
248        deleteSQL = "DELETE FROM " + tableName + " WHERE " + keyColName + " = ?";
249        deleteIfLongSQL = deleteSQL + " AND " + longColName + " = ?";
250        deleteIfStringSQL = deleteSQL + " AND " + dialect.getQuotedNameForExpression(stringCol) + " = ?";
251        deleteIfBytesSQL = deleteSQL + " AND " + bytesColName + " = ?";
252        expireSQL = "DELETE FROM " + tableName + " WHERE " + ttlColName + " < ?";
253        keyStreamSQL = "SELECT " + keyColName + " FROM " + tableName;
254        keyStreamPrefixSQL = keyStreamSQL + " WHERE " + keyColName + " LIKE ?";
255        String esc = dialect.getLikeEscaping();
256        if (esc != null) {
257            keyStreamPrefixSQL += esc;
258        }
259        setTTLSQL = "UPDATE " + tableName + " SET " + ttlColName + " = ? WHERE " + keyColName + " = ?";
260        existsSQL = "SELECT 1 FROM " + tableName + " WHERE " + keyColName + " = ?";
261        insertSQL = "INSERT INTO " + tableName + "(" + keyColName + ", " + longColName + ", " + stringColName + ", "
262                + bytesColName + ", " + ttlColName + ") VALUES (?, ?, ?, ?, ?)";
263        insertLongSQL = "INSERT INTO " + tableName + "(" + keyColName + ", " + longColName + ") VALUES (?, ?)";
264        updateLongSQL = "UPDATE " + tableName + " SET " + longColName + " = ? WHERE " + keyColName + " = ? AND "
265                + longColName + " = ?";
266        updateReturningPostgreSQLSql = "UPDATE " + tableName + " SET " + longColName + " = " + longColName
267                + " + ? WHERE " + keyColName + " = ? AND " + stringColName + " IS NULL AND " + bytesColName
268                + " IS NULL RETURNING " + longColName;
269        updateReturningOracleSql = "UPDATE " + tableName + " SET " + longColName + " = " + longColName + " + ? WHERE "
270                + keyColName + " = ? AND " + stringColName + " IS NULL AND " + bytesColName + " IS NULL RETURNING "
271                + longColName + " INTO ?";
272        updateReturningSQLServerSql = "UPDATE " + tableName + " SET " + longColName + " = " + longColName
273                + " + ? OUTPUT INSERTED." + longColName + " WHERE " + keyColName + " = ? AND " + stringColName
274                + " IS NULL AND " + bytesColName + " IS NULL";
275    }
276
277    protected void startTTLThread() {
278        ttlThread = new Thread(this::expireTTLThread);
279        ttlThread.setName("Nuxeo-Expire-KeyValueStore-" + name);
280        ttlThread.setDaemon(true);
281        ttlThread.start();
282    }
283
284    protected void stopTTLThread() {
285        if (ttlThread == null) {
286            return;
287        }
288        ttlThread.interrupt();
289        ttlThread = null;
290    }
291
292    /**
293     * Runs in a thread to do TTL expiration.
294     */
295    protected void expireTTLThread() {
296        log.debug("Starting TTL expiration thread for KeyValueStore: {}", name);
297        try {
298            // for the initial wait, use a random duration to avoid thundering herd problems
299            Thread.sleep((long) (TTL_EXPIRATION_FREQUENCY_MS * Math.random()));
300            for (;;) {
301                if (Thread.currentThread().isInterrupted()) {
302                    break;
303                }
304                Thread.sleep(TTL_EXPIRATION_FREQUENCY_MS);
305                expireTTLOnce();
306            }
307        } catch (InterruptedException e) {
308            Thread.currentThread().interrupt();
309        }
310        log.debug("Stopping TTL expiration thread for KeyValueStore: {}", name);
311    }
312
313    protected String escapeLike(String prefix) {
314        return prefix.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_");
315    }
316
317    /**
318     * Canonicalizes value for the database: use a String or a Long if possible.
319     */
320    protected Object toStorage(Object value) {
321        // try to convert byte array to UTF-8 string
322        if (value instanceof byte[]) {
323            try {
324                value = bytesToString((byte[]) value);
325            } catch (CharacterCodingException e) {
326                // ignore
327            }
328        }
329        // try to convert String to Long
330        if (value instanceof String) {
331            try {
332                value = Long.valueOf((String) value);
333            } catch (NumberFormatException e) {
334                // ignore
335            }
336        }
337        return value;
338    }
339
340    protected byte[] toBytes(Object value) {
341        if (value instanceof String) {
342            return ((String) value).getBytes(UTF_8);
343        } else if (value instanceof Long) {
344            return ((Long) value).toString().getBytes(UTF_8);
345        } else if (value instanceof byte[]) {
346            return (byte[]) value;
347        }
348        return null;
349    }
350
351    protected String toString(Object value) {
352        if (value instanceof String) {
353            return (String) value;
354        } else if (value instanceof Long) {
355            return ((Long) value).toString();
356        } else if (value instanceof byte[]) {
357            byte[] bytes = (byte[]) value;
358            try {
359                return bytesToString(bytes);
360            } catch (CharacterCodingException e) {
361                return null;
362            }
363        }
364        return null;
365    }
366
367    protected Long toLong(Object value) throws NumberFormatException { // NOSONAR
368        if (value instanceof Long) {
369            return (Long) value;
370        } else if (value instanceof String) {
371            return Long.valueOf((String) value);
372        } else if (value instanceof byte[]) {
373            byte[] bytes = (byte[]) value;
374            return bytesToLong(bytes);
375        }
376        return null;
377    }
378
379    /** A {@link java.util.function.Consumer Consumer} that can throw {@link SQLException}. */
380    @FunctionalInterface
381    protected interface SQLConsumer<T> {
382
383        /**
384         * Performs this operation on the given argument.
385         *
386         * @param t the input argument
387         * @throws SQLException
388         */
389        void accept(T t) throws SQLException;
390    }
391
392    /** A {@link java.util.function.Function Function} that can throw {@link SQLException}. */
393    @FunctionalInterface
394    protected interface SQLFunction<T, R> {
395
396        /**
397         * Applies this function to the given argument.
398         *
399         * @param t the function argument
400         * @return the function result
401         * @throws SQLException
402         */
403        R apply(T t) throws SQLException;
404    }
405
406    protected void runWithConnection(SQLConsumer<Connection> consumer) {
407        TransactionHelper.runWithoutTransaction(() -> {
408            try (Connection connection = getConnection()) {
409                consumer.accept(connection);
410            } catch (SQLException e) {
411                throw new NuxeoException(e);
412            }
413        });
414    }
415
416    protected <R> R runWithConnection(SQLFunction<Connection, R> function) {
417        return TransactionHelper.runWithoutTransaction(() -> {
418            try (Connection connection = getConnection()) {
419                return function.apply(connection);
420            } catch (SQLException e) {
421                throw new NuxeoException(e);
422            }
423        });
424    }
425
426    protected Connection getConnection() throws SQLException {
427        return ConnectionHelper.getConnection(dataSourceName);
428    }
429
430    protected void setToPreparedStatement(String sql, PreparedStatement ps, Column column, Serializable value)
431            throws SQLException {
432        setToPreparedStatement(sql, ps, Arrays.asList(column), Arrays.asList(value));
433    }
434
435    protected void setToPreparedStatement(String sql, PreparedStatement ps, Column column1, Serializable value1,
436            Column column2, Serializable value2) throws SQLException {
437        setToPreparedStatement(sql, ps, Arrays.asList(column1, column2), Arrays.asList(value1, value2));
438    }
439
440    protected void setToPreparedStatement(String sql, PreparedStatement ps, Column column1, Serializable value1,
441            Column column2, Serializable value2, Column column3, Serializable value3) throws SQLException {
442        setToPreparedStatement(sql, ps, Arrays.asList(column1, column2, column3),
443                Arrays.asList(value1, value2, value3));
444    }
445
446    protected void setToPreparedStatement(String sql, PreparedStatement ps, List<Column> columns,
447            List<? extends Serializable> values) throws SQLException {
448        if (columns.size() != values.size()) {
449            throw new IllegalStateException();
450        }
451        for (int i = 0; i < columns.size(); i++) {
452            columns.get(i).setToPreparedStatement(ps, i + 1, values.get(i));
453        }
454        if (logger.isLogEnabled()) {
455            logger.logSQL(sql, values);
456        }
457    }
458
459    protected boolean tableExists(Connection connection) throws SQLException {
460        DatabaseMetaData metadata = connection.getMetaData();
461        String schemaName = getDatabaseSchemaName(connection);
462        try (ResultSet rs = metadata.getTables(null, schemaName, table.getPhysicalName(), new String[] { "TABLE" })) {
463            boolean exists = rs.next();
464            log.trace("Checking if table {} exists: {}", table.getPhysicalName(), exists);
465            return exists;
466        }
467    }
468
469    protected void createTable(Connection connection) throws SQLException {
470        try (Statement st = connection.createStatement()) {
471            String createSQL = table.getCreateSql();
472            logger.log(createSQL);
473            st.execute(createSQL);
474            for (String sql : table.getPostCreateSqls(null)) {
475                logger.log(sql);
476                st.execute(sql);
477            }
478        }
479    }
480
481    /**
482     * Checks that columns have expected JDBC types.
483     */
484    protected void checkColumns(Connection connection) throws SQLException {
485        DatabaseMetaData metadata = connection.getMetaData();
486        String schemaName = getDatabaseSchemaName(connection);
487        try (ResultSet rs = metadata.getColumns(null, schemaName, table.getPhysicalName(), "%")) {
488            while (rs.next()) {
489                String schema = rs.getString("TABLE_SCHEM");
490                if (schema != null) { // null for MySQL, doh!
491                    if ("INFORMATION_SCHEMA".equals(schema.toUpperCase())) {
492                        // H2 returns some system tables (locks)
493                        continue;
494                    }
495                }
496                String columnName = rs.getString("COLUMN_NAME").toUpperCase();
497                int actual = rs.getInt("DATA_TYPE");
498                String actualName = rs.getString("TYPE_NAME");
499                int actualSize = rs.getInt("COLUMN_SIZE");
500                Column column = null;
501                for (Column c : table.getColumns()) {
502                    String upperName = c.getPhysicalName().toUpperCase();
503                    if (upperName.equals(columnName)) {
504                        column = c;
505                    }
506                }
507                if (column == null) {
508                    log.error("Column not found: {} in table: {}", columnName, tableName);
509                    continue;
510                }
511                String message = column.checkJdbcType(actual, actualName, actualSize);
512                if (message != null) {
513                    log.error(message);
514                    Framework.getRuntime().getMessageHandler().addError(message);
515                }
516            }
517        }
518    }
519
520    protected String getDatabaseSchemaName(Connection connection) throws SQLException {
521        String schemaName = null;
522        if (dialect instanceof DialectOracle) {
523            try (Statement st = connection.createStatement()) {
524                String sql = "SELECT SYS_CONTEXT('USERENV', 'SESSION_USER') FROM DUAL";
525                logger.log(sql);
526                try (ResultSet rs = st.executeQuery(sql)) {
527                    if (rs.next()) {
528                        schemaName = rs.getString(1);
529                        logger.log("  -> schema: " + schemaName);
530                    }
531                }
532            }
533        }
534        return schemaName;
535    }
536
537    protected void expireTTLOnce() {
538        runWithConnection(connection -> {
539            try {
540                try (PreparedStatement ps = connection.prepareStatement(expireSQL)) {
541                    Long ttlDeadline = getTTLValue(0);
542                    setToPreparedStatement(expireSQL, ps, ttlCol, ttlDeadline);
543                    int count = ps.executeUpdate();
544                    logger.logCount(count);
545                }
546            } catch (SQLException e) {
547                if (dialect.isConcurrentUpdateException(e)) {
548                    // ignore
549                    return;
550                }
551                log.debug("Exception during TTL expiration", e);
552            }
553        });
554    }
555
556    @Override
557    public void clear() {
558        runWithConnection(connection -> {
559            try (Statement st = connection.createStatement()) {
560                logger.log(deleteAllSQL);
561                st.execute(deleteAllSQL);
562            }
563        });
564    }
565
566    @Override
567    public Stream<String> keyStream() {
568        return runWithConnection((Connection connection) -> keyStream(connection, null));
569    }
570
571    @Override
572    public Stream<String> keyStream(String prefix) {
573        return runWithConnection((Connection connection) -> keyStream(connection, prefix));
574    }
575
576    protected Stream<String> keyStream(Connection connection, String prefix) throws SQLException {
577        String sql = prefix == null ? keyStreamSQL : keyStreamPrefixSQL;
578        List<String> keys = new ArrayList<>();
579        try (PreparedStatement ps = connection.prepareStatement(sql)) {
580            if (prefix != null) {
581                String like = escapeLike(prefix) + "%";
582                setToPreparedStatement(sql, ps, keyCol, like);
583            }
584            try (ResultSet rs = ps.executeQuery()) {
585                while (rs.next()) {
586                    String key = (String) keyCol.getFromResultSet(rs, 1);
587                    keys.add(key);
588                }
589            }
590        }
591        return keys.stream();
592    }
593
594    @Override
595    public byte[] get(String key) {
596        Object value = getObject(key);
597        if (value == null) {
598            return null;
599        }
600        byte[] bytes = toBytes(value);
601        if (bytes != null) {
602            return bytes;
603        }
604        throw new UnsupportedOperationException(value.getClass().getName());
605    }
606
607    @Override
608    public String getString(String key) {
609        Object value = getObject(key);
610        if (value == null) {
611            return null;
612        }
613        String string = toString(value);
614        if (string != null) {
615            return string;
616        }
617        throw new IllegalArgumentException("Value is not a String for key: " + key);
618    }
619
620    @Override
621    public Long getLong(String key) throws NumberFormatException { // NOSONAR
622        Object value = getObject(key);
623        if (value == null) {
624            return null;
625        }
626        Long longValue = toLong(value);
627        if (longValue != null) {
628            return longValue;
629        }
630        throw new NumberFormatException("Value is not a Long for key: " + key);
631    }
632
633    @Override
634    public Map<String, byte[]> get(Collection<String> keys) {
635        Map<String, byte[]> map = new HashMap<>(keys.size());
636        getObjects(keys, (key, value) -> {
637            byte[] bytes = toBytes(value);
638            if (bytes == null) {
639                throw new UnsupportedOperationException(String.format("Value of class %s is not supported for key: %s",
640                        value.getClass().getName(), key));
641            }
642            map.put(key, bytes);
643        });
644        return map;
645    }
646
647    @Override
648    public Map<String, String> getStrings(Collection<String> keys) {
649        Map<String, String> map = new HashMap<>(keys.size());
650        getObjects(keys, (key, value) -> {
651            String string = toString(value);
652            if (string == null) {
653                throw new IllegalArgumentException("Value is not a String for key: " + key);
654            }
655            map.put(key, string);
656        });
657        return map;
658    }
659
660    @Override
661    public Map<String, Long> getLongs(Collection<String> keys) throws NumberFormatException { // NOSONAR
662        Map<String, Long> map = new HashMap<>(keys.size());
663        getObjects(keys, (key, value) -> {
664            Long longValue = toLong(value);
665            if (longValue == null) {
666                throw new IllegalArgumentException("Value is not a Long for key: " + key);
667            }
668            map.put(key, longValue);
669        });
670        return map;
671    }
672
673    protected Object getObject(String key) {
674        return runWithConnection(connection -> {
675            try (PreparedStatement ps = connection.prepareStatement(getSQL)) {
676                setToPreparedStatement(getSQL, ps, keyCol, key);
677                try (ResultSet rs = ps.executeQuery()) {
678                    if (!rs.next()) {
679                        if (logger.isLogEnabled()) {
680                            logger.log("  -> null");
681                        }
682                        return null;
683                    }
684                    Long longValue = (Long) longCol.getFromResultSet(rs, 1);
685                    String string = (String) stringCol.getFromResultSet(rs, 2);
686                    byte[] bytes = (byte[]) bytesCol.getFromResultSet(rs, 3);
687                    if (logger.isLogEnabled()) {
688                        logger.logResultSet(rs, Arrays.asList(longCol, stringCol, bytesCol));
689                    }
690                    if (string != null) {
691                        return string;
692                    } else if (longValue != null) {
693                        return longValue;
694                    } else {
695                        return bytes;
696                    }
697                }
698            }
699        });
700    }
701
702    protected void getObjects(Collection<String> keys, BiConsumer<String, Object> consumer) {
703        if (keys.isEmpty()) {
704            return;
705        }
706        runWithConnection((Connection connection) -> {
707            String sql = String.format(getMultiSQL, nParams(keys.size()));
708            logger.logSQL(sql, keys);
709            try (PreparedStatement ps = connection.prepareStatement(sql)) {
710                int i = 1;
711                for (String key : keys) {
712                    keyCol.setToPreparedStatement(ps, i++, key);
713                }
714                try (ResultSet rs = ps.executeQuery()) {
715                    while (rs.next()) {
716                        String key = (String) keyCol.getFromResultSet(rs, 1);
717                        Long longVal = (Long) longCol.getFromResultSet(rs, 2);
718                        String string = (String) stringCol.getFromResultSet(rs, 3);
719                        byte[] bytes = (byte[]) bytesCol.getFromResultSet(rs, 4);
720                        if (logger.isLogEnabled()) {
721                            logger.logResultSet(rs, Arrays.asList(keyCol, longCol, stringCol, bytesCol));
722                        }
723                        Object value;
724                        if (string != null) {
725                            value = string;
726                        } else if (longVal != null) {
727                            value = longVal;
728                        } else {
729                            value = bytes;
730                        }
731                        if (value != null) {
732                            consumer.accept(key, value);
733                        }
734                    }
735                }
736            }
737        });
738    }
739
740    protected String nParams(int n) {
741        StringBuilder sb = new StringBuilder();
742        for (int i = 0; i < n; i++) {
743            if (i != 0) {
744                sb.append(", ");
745            }
746            sb.append('?');
747        }
748        return sb.toString();
749    }
750
751    protected Long ttlToStorage(long ttl) {
752        return ttl == 0 ? null : getTTLValue(ttl);
753    }
754
755    protected Long getTTLValue(long ttl) {
756        return Long.valueOf(System.currentTimeMillis() / 1000 + ttl);
757    }
758
759    @Override
760    public void put(String key, byte[] bytes) {
761        put(key, toStorage(bytes), 0);
762    }
763
764    @Override
765    public void put(String key, byte[] bytes, long ttl) {
766        put(key, toStorage(bytes), ttl);
767    }
768
769    @Override
770    public void put(String key, String string) {
771        put(key, toStorage(string), 0);
772    }
773
774    @Override
775    public void put(String key, String string, long ttl) {
776        put(key, toStorage(string), ttl);
777    }
778
779    @Override
780    public void put(String key, Long value) {
781        put(key, (Object) value, 0);
782    }
783
784    @Override
785    public void put(String key, Long value, long ttl) {
786        put(key, (Object) value, ttl);
787    }
788
789    protected void put(String key, Object value, long ttl) {
790        runWithConnection((Connection connection) -> {
791            if (value == null) {
792                // delete
793                try (PreparedStatement ps = connection.prepareStatement(deleteSQL)) {
794                    setToPreparedStatement(deleteSQL, ps, keyCol, key);
795                    ps.execute();
796                }
797            } else {
798                // upsert (update or insert)
799                Long longValue = value instanceof Long ? (Long) value : null;
800                String stringValue = value instanceof String ? (String) value : null;
801                byte[] bytesValue = value instanceof byte[] ? (byte[]) value : null;
802                Long ttlValue = ttlToStorage(ttl);
803                List<Column> psColumns = new ArrayList<>();
804                List<Serializable> psValues = new ArrayList<>();
805                String sql = dialect.getUpsertSql(Arrays.asList(keyCol, longCol, stringCol, bytesCol, ttlCol),
806                        Arrays.asList(key, longValue, stringValue, bytesValue, ttlValue), psColumns, psValues);
807                for (int retry = 0; retry < MAX_RETRY; retry++) {
808                    try {
809                        try (PreparedStatement ps = connection.prepareStatement(sql)) {
810                            setToPreparedStatement(sql, ps, psColumns, psValues);
811                            ps.execute();
812                        }
813                        return;
814                    } catch (SQLException e) {
815                        if (!dialect.isConcurrentUpdateException(e)) {
816                            throw e;
817                        }
818                        // Oracle MERGE can throw DUP_VAL_ON_INDEX (ORA-0001) or NO_DATA_FOUND (ORA-01403)
819                        // in that case retry a few times
820                    }
821                    sleepBeforeRetry();
822                }
823                throw new ConcurrentUpdateException("Failed to do atomic put for key: " + key);
824            }
825        });
826    }
827
828    @Override
829    public boolean setTTL(String key, long ttl) {
830        return runWithConnection((Connection connection) -> {
831            try (PreparedStatement ps = connection.prepareStatement(setTTLSQL)) {
832                setToPreparedStatement(setTTLSQL, ps, ttlCol, ttlToStorage(ttl), keyCol, key);
833                int count = ps.executeUpdate();
834                boolean set = count == 1;
835                return set;
836            }
837        }).booleanValue();
838    }
839
840    @Override
841    public boolean compareAndSet(String key, byte[] expected, byte[] value, long ttl) {
842        return compareAndSet(key, toStorage(expected), toStorage(value), ttl);
843    }
844
845    @Override
846    public boolean compareAndSet(String key, String expected, String value, long ttl) {
847        return compareAndSet(key, toStorage(expected), toStorage(value), ttl);
848    }
849
850    protected boolean compareAndSet(String key, Object expected, Object value, long ttl) {
851        return runWithConnection((Connection connection) -> {
852            if (expected == null && value == null) {
853                // check that document doesn't exist
854                try (PreparedStatement ps = connection.prepareStatement(existsSQL)) {
855                    setToPreparedStatement(existsSQL, ps, keyCol, key);
856                    try (ResultSet rs = ps.executeQuery()) {
857                        boolean set = !rs.next();
858                        if (logger.isLogEnabled()) {
859                            logger.log("  -> " + (set ? "NOP" : "FAILED"));
860                        }
861                        return set;
862                    }
863                }
864            } else if (expected == null) {
865                // set value if no document already exists: regular insert
866                try (PreparedStatement ps = connection.prepareStatement(insertSQL)) {
867                    Long longValue = value instanceof Long ? (Long) value : null;
868                    String stringValue = value instanceof String ? (String) value : null;
869                    byte[] bytesValue = value instanceof byte[] ? (byte[]) value : null;
870                    setToPreparedStatement(insertSQL, ps, Arrays.asList(keyCol, longCol, stringCol, bytesCol, ttlCol),
871                            Arrays.asList(key, longValue, stringValue, bytesValue, ttlToStorage(ttl)));
872                    boolean set;
873                    try {
874                        ps.executeUpdate();
875                        set = true;
876                    } catch (SQLException e) {
877                        if (!dialect.isConcurrentUpdateException(e)) {
878                            throw e;
879                        }
880                        set = false;
881                    }
882                    if (logger.isLogEnabled()) {
883                        logger.log("  -> " + (set ? "SET" : "FAILED"));
884                    }
885                    return set;
886                }
887            } else if (value == null) {
888                // delete if previous value exists
889                String sql;
890                Column col;
891                if (expected instanceof Long) {
892                    sql = deleteIfLongSQL;
893                    col = longCol;
894                } else if (expected instanceof String) {
895                    sql = deleteIfStringSQL;
896                    col = stringCol;
897                } else {
898                    sql = deleteIfBytesSQL;
899                    col = bytesCol;
900                }
901                try (PreparedStatement ps = connection.prepareStatement(sql)) {
902                    setToPreparedStatement(sql, ps, keyCol, key, col, (Serializable) expected);
903                    int count = ps.executeUpdate();
904                    boolean set = count == 1;
905                    if (logger.isLogEnabled()) {
906                        logger.log("  -> " + (set ? "DEL" : "FAILED"));
907                    }
908                    return set;
909                }
910            } else {
911                // replace if previous value exists
912                Column expectedCol = expected instanceof Long ? longCol
913                        : expected instanceof String ? stringCol : bytesCol;
914                Column valueCol = value instanceof Long ? longCol : value instanceof String ? stringCol : bytesCol;
915                if (expectedCol != valueCol) {
916                    throw new NuxeoException("TODO expected and value have different types");
917                    // TODO in that case we must set to null the old value column
918                }
919                String sql = "UPDATE " + tableName + " SET " + valueCol.getQuotedName() + " = ?, " + ttlColName
920                        + " = ? WHERE " + keyColName + " = ? AND " + dialect.getQuotedNameForExpression(expectedCol)
921                        + " = ?";
922                try (PreparedStatement ps = connection.prepareStatement(sql)) {
923                    setToPreparedStatement(sql, ps, Arrays.asList(valueCol, ttlCol, keyCol, expectedCol),
924                            Arrays.asList((Serializable) value, ttlToStorage(ttl), key, (Serializable) expected));
925                    int count = ps.executeUpdate();
926                    boolean set = count == 1;
927                    if (logger.isLogEnabled()) {
928                        logger.log("  -> " + (set ? "SET" : "FAILED"));
929                    }
930                    return set;
931                }
932            }
933        }).booleanValue();
934    }
935
936    @Override
937    public long addAndGet(String key, long delta) throws NumberFormatException { // NOSONAR
938        return runWithConnection((Connection connection) -> {
939            for (int retry = 0; retry < MAX_RETRY; retry++) {
940                String updateReturningSql;
941                boolean useReturnResultSet = false;
942                if (dialect instanceof DialectPostgreSQL) {
943                    updateReturningSql = updateReturningPostgreSQLSql;
944                } else if (dialect instanceof DialectOracle) {
945                    updateReturningSql = updateReturningOracleSql;
946                    useReturnResultSet = true;
947                } else if (dialect instanceof DialectSQLServer) {
948                    updateReturningSql = updateReturningSQLServerSql;
949                } else {
950                    updateReturningSql = null;
951                }
952                if (updateReturningSql != null) {
953                    List<Column> psColumns = Arrays.asList(longCol, keyCol);
954                    List<Serializable> psValues = Arrays.asList(Long.valueOf(delta), key);
955                    try (PreparedStatement ps = connection.prepareStatement(updateReturningSql)) {
956                        setToPreparedStatement(updateReturningSql, ps, psColumns, psValues);
957                        if (useReturnResultSet) {
958                            dialect.registerReturnParameter(ps, 3, longCol.getJdbcType());
959                        }
960                        boolean hasResultSet;
961                        if (useReturnResultSet) {
962                            int count = ps.executeUpdate();
963                            hasResultSet = count > 0;
964                        } else {
965                            hasResultSet = true;
966                        }
967                        if (hasResultSet) {
968                            ResultSet rs;
969                            if (useReturnResultSet) {
970                                rs = dialect.getReturnResultSet(ps);
971                            } else {
972                                rs = ps.executeQuery();
973                            }
974                            try {
975                                if (rs.next()) {
976                                    Long longValue = (Long) longCol.getFromResultSet(rs, 1);
977                                    // We may get NULL here, because if the value is an empty string
978                                    // a normal database would not match any row, but Oracle treats
979                                    // "" as NULL and we end up trying to increment the long field
980                                    // which is also NULL.
981                                    if (longValue == null) {
982                                        throw new NumberFormatException("Value is not a Long for key: " + key);
983                                    }
984                                    return longValue;
985                                }
986                            } finally {
987                                rs.close();
988                            }
989                        }
990                    }
991                }
992                // the dialect doesn't support UPDATE RETURNING, or
993                // there was no row for this key, or
994                // the row didn't contain a long
995                // -> retry using a full transaction doing check + insert
996                // start transaction
997                connection.setAutoCommit(false);
998                try {
999                    // check value
1000                    Long currentLong;
1001                    try (PreparedStatement ps = connection.prepareStatement(getLongSQL)) {
1002                        setToPreparedStatement(getLongSQL, ps, keyCol, key);
1003                        try (ResultSet rs = ps.executeQuery()) {
1004                            if (rs.next()) {
1005                                currentLong = (Long) longCol.getFromResultSet(rs, 1);
1006                                if (logger.isLogEnabled()) {
1007                                    logger.logResultSet(rs, Arrays.asList(longCol));
1008                                }
1009                                if (currentLong == null) {
1010                                    throw new NumberFormatException("Value is not a Long for key: " + key);
1011                                }
1012                            } else {
1013                                currentLong = null;
1014                            }
1015                        }
1016                    }
1017                    if (currentLong == null) {
1018                        // try insert
1019                        try (PreparedStatement ps = connection.prepareStatement(insertLongSQL)) {
1020                            setToPreparedStatement(insertLongSQL, ps, keyCol, key, longCol, Long.valueOf(delta));
1021                            try {
1022                                ps.executeUpdate();
1023                                return delta;
1024                            } catch (SQLException e) {
1025                                if (!dialect.isConcurrentUpdateException(e)) {
1026                                    throw e;
1027                                }
1028                                // if concurrent update, retry
1029                            }
1030                        }
1031                    } else {
1032                        // update existing value
1033                        Long newLong = Long.valueOf(currentLong.longValue() + delta);
1034                        try (PreparedStatement ps = connection.prepareStatement(updateLongSQL)) {
1035                            setToPreparedStatement(updateLongSQL, ps, longCol, newLong, keyCol, key, longCol,
1036                                    currentLong);
1037                            int count = ps.executeUpdate();
1038                            if (count == 1) {
1039                                return newLong;
1040                            }
1041                            // else the value changed...
1042                            // concurrent update, retry
1043                        }
1044                    }
1045                } finally {
1046                    connection.commit();
1047                    connection.setAutoCommit(true);
1048                }
1049                // concurrent update on insert or update, retry a few times
1050                sleepBeforeRetry();
1051            }
1052            throw new ConcurrentUpdateException("Failed to do atomic addAndGet for key: " + key);
1053        }).longValue();
1054    }
1055
1056    protected void sleepBeforeRetry() {
1057        try {
1058            Thread.sleep(5);
1059        } catch (InterruptedException e) {
1060            Thread.currentThread().interrupt();
1061            throw new NuxeoException(e);
1062        }
1063    }
1064
1065    // works on any representation that can be converted to a Long
1066    protected long addAndGetGeneric(String key, long delta) throws NumberFormatException { // NOSONAR
1067        for (;;) {
1068            Object value = getObject(key);
1069            long result;
1070            if (value == null) {
1071                result = delta;
1072            } else {
1073                Long base = toLong(value);
1074                if (base == null) {
1075                    throw new NumberFormatException("Value is not a Long for key: " + key);
1076                }
1077                result = base.longValue() + delta;
1078            }
1079            Object newValue = Long.valueOf(result);
1080            if (compareAndSet(key, value, newValue, 0)) {
1081                return result;
1082            }
1083            // else loop to try again
1084        }
1085    }
1086
1087    @Override
1088    public String toString() {
1089        return getClass().getSimpleName() + "(" + name + ")";
1090    }
1091
1092}