001/*
002 * (C) Copyright 2006-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Benoit Delbosc
019 */
020package org.nuxeo.ecm.core.storage.sql.jdbc;
021
022import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult;
023
024import java.io.File;
025import java.io.FileOutputStream;
026import java.io.IOException;
027import java.io.OutputStream;
028import java.io.PrintStream;
029import java.io.Serializable;
030import java.security.MessageDigest;
031import java.security.NoSuchAlgorithmException;
032import java.sql.Array;
033import java.sql.DatabaseMetaData;
034import java.sql.PreparedStatement;
035import java.sql.ResultSet;
036import java.sql.SQLDataException;
037import java.sql.SQLException;
038import java.sql.Statement;
039import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
054
055import javax.transaction.xa.XAException;
056import javax.transaction.xa.XAResource;
057import javax.transaction.xa.Xid;
058
059import org.apache.commons.logging.Log;
060import org.apache.commons.logging.LogFactory;
061import org.nuxeo.common.Environment;
062import org.nuxeo.ecm.core.api.IterableQueryResult;
063import org.nuxeo.ecm.core.api.Lock;
064import org.nuxeo.ecm.core.api.NuxeoException;
065import org.nuxeo.ecm.core.api.PartialList;
066import org.nuxeo.ecm.core.api.ScrollResult;
067import org.nuxeo.ecm.core.api.ScrollResultImpl;
068import org.nuxeo.ecm.core.blob.DocumentBlobManager;
069import org.nuxeo.ecm.core.model.LockManager;
070import org.nuxeo.ecm.core.query.QueryFilter;
071import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator;
072import org.nuxeo.ecm.core.storage.sql.ColumnType;
073import org.nuxeo.ecm.core.storage.sql.ColumnType.WrappedId;
074import org.nuxeo.ecm.core.storage.sql.Invalidations;
075import org.nuxeo.ecm.core.storage.sql.Mapper;
076import org.nuxeo.ecm.core.storage.sql.Model;
077import org.nuxeo.ecm.core.storage.sql.RepositoryDescriptor;
078import org.nuxeo.ecm.core.storage.sql.RepositoryImpl;
079import org.nuxeo.ecm.core.storage.sql.Row;
080import org.nuxeo.ecm.core.storage.sql.RowId;
081import org.nuxeo.ecm.core.storage.sql.Session.PathResolver;
082import org.nuxeo.ecm.core.storage.sql.jdbc.SQLInfo.SQLInfoSelect;
083import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
084import org.nuxeo.ecm.core.storage.sql.jdbc.db.Database;
085import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table;
086import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
087import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectOracle;
088import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.SQLStatement.ListCollector;
089import org.nuxeo.runtime.api.Framework;
090
091/**
092 * A {@link JDBCMapper} maps objects to and from a JDBC database. It is specific to a given database connection, as it
093 * computes statements.
094 * <p>
095 * The {@link JDBCMapper} does the mapping according to the policy defined by a {@link Model}, and generates SQL
096 * statements recorded in the {@link SQLInfo}.
097 */
public class JDBCMapper extends JDBCRowMapper implements Mapper {

    private static final Log log = LogFactory.getLog(JDBCMapper.class);

    // Test-only hook: when keys such as TEST_UPGRADE are present, createTables() also runs the
    // "old schema" DDL categories so that upgrade paths can be exercised.
    public static Map<String, Serializable> testProps = new HashMap<>();

    // Open scroll cursors, keyed by scroll id; static, so shared across all mapper instances.
    protected static Map<String, CursorResult> cursorResults = new ConcurrentHashMap<>();

    public static final String TEST_UPGRADE = "testUpgrade";

    // property in sql.txt file
    public static final String TEST_UPGRADE_VERSIONS = "testUpgradeVersions";

    public static final String TEST_UPGRADE_LAST_CONTRIBUTOR = "testUpgradeLastContributor";

    public static final String TEST_UPGRADE_LOCKS = "testUpgradeLocks";

    public static final String TEST_UPGRADE_SYS_CHANGE_TOKEN = "testUpgradeSysChangeToken";

    // runs the per-table upgrade procedures registered in the constructor (see createTables)
    protected TableUpgrader tableUpgrader;

    private final QueryMakerService queryMakerService;

    private final PathResolver pathResolver;

    private final RepositoryImpl repository;

    // true when a ClusterInvalidator was supplied to the constructor
    protected boolean clusteringEnabled;

    // sentinel scroll id -- NOTE(review): its use is not visible in this chunk; presumably returned
    // when scrolling is unsupported or exhausted, verify against the scroll methods
    protected static final String NOSCROLL_ID = "noscroll";
128
129    /**
130     * Creates a new Mapper.
131     *
132     * @param model the model
133     * @param pathResolver the path resolver (used for startswith queries)
134     * @param sqlInfo the sql info
135     * @param clusterInvalidator the cluster invalidator
136     * @param repository the repository
137     */
138    public JDBCMapper(Model model, PathResolver pathResolver, SQLInfo sqlInfo, ClusterInvalidator clusterInvalidator,
139            RepositoryImpl repository) {
140        super(model, sqlInfo, clusterInvalidator, repository.getInvalidationsPropagator());
141        this.pathResolver = pathResolver;
142        this.repository = repository;
143        clusteringEnabled = clusterInvalidator != null;
144        queryMakerService = Framework.getService(QueryMakerService.class);
145
146        tableUpgrader = new TableUpgrader(this);
147        tableUpgrader.add(Model.VERSION_TABLE_NAME, Model.VERSION_IS_LATEST_KEY, "upgradeVersions",
148                TEST_UPGRADE_VERSIONS);
149        tableUpgrader.add("dublincore", "lastContributor", "upgradeLastContributor", TEST_UPGRADE_LAST_CONTRIBUTOR);
150        tableUpgrader.add(Model.LOCK_TABLE_NAME, Model.LOCK_OWNER_KEY, "upgradeLocks", TEST_UPGRADE_LOCKS);
151        tableUpgrader.add(Model.HIER_TABLE_NAME, Model.MAIN_SYS_CHANGE_TOKEN_KEY, "upgradeSysChangeToken",
152                TEST_UPGRADE_SYS_CHANGE_TOKEN);
153    }
154
155    @Override
156    public int getTableSize(String tableName) {
157        return sqlInfo.getDatabase().getTable(tableName).getColumns().size();
158    }
159
160    /*
161     * ----- Root -----
162     */
163
164    @Override
165    public void createDatabase(String ddlMode) {
166        // some databases (SQL Server) can't create tables/indexes/etc in a transaction, so suspend it
167        try {
168            if (!connection.getAutoCommit()) {
169                throw new NuxeoException("connection should not run in transactional mode for DDL operations");
170            }
171            createTables(ddlMode);
172        } catch (SQLException e) {
173            throw new NuxeoException(e);
174        }
175    }
176
177    protected String getTableName(String origName) {
178
179        if (dialect instanceof DialectOracle) {
180            if (origName.length() > 30) {
181
182                StringBuilder sb = new StringBuilder(origName.length());
183
184                try {
185                    MessageDigest digest = MessageDigest.getInstance("MD5");
186                    sb.append(origName.substring(0, 15));
187                    sb.append('_');
188
189                    digest.update(origName.getBytes());
190                    sb.append(Dialect.toHexString(digest.digest()).substring(0, 12));
191
192                    return sb.toString();
193
194                } catch (NoSuchAlgorithmException e) {
195                    throw new RuntimeException("Error", e);
196                }
197            }
198        }
199
200        return origName;
201    }
202
    /**
     * Creates missing tables and columns, collecting the needed DDL, then executes or dumps that DDL
     * according to {@code ddlMode}.
     * <p>
     * Existing tables are inspected through JDBC metadata; missing columns are added and the relevant
     * upgrade procedures are run. Depending on {@code ddlMode}, the collected DDL is executed, dumped
     * to a {@code ddl-vcs-&lt;repository&gt;.sql} file in the log directory, or both; "abort" stops
     * startup after the dump.
     *
     * @param ddlMode the DDL mode (see {@link RepositoryDescriptor} {@code DDL_MODE_*} constants)
     */
    protected void createTables(String ddlMode) throws SQLException {
        ListCollector ddlCollector = new ListCollector();

        sqlInfo.executeSQLStatements(null, ddlMode, connection, logger, ddlCollector); // for missing category
        sqlInfo.executeSQLStatements("first", ddlMode, connection, logger, ddlCollector);
        sqlInfo.executeSQLStatements("beforeTableCreation", ddlMode, connection, logger, ddlCollector);
        if (testProps.containsKey(TEST_UPGRADE)) {
            // create "old" tables
            sqlInfo.executeSQLStatements("testUpgrade", ddlMode, connection, logger, null); // do not collect
        }

        String schemaName = dialect.getConnectionSchema(connection);
        DatabaseMetaData metadata = connection.getMetaData();
        Set<String> tableNames = findTableNames(metadata, schemaName);
        Database database = sqlInfo.getDatabase();
        // table key -> columns added during upgrade; a null value means the table was just created
        Map<String, List<Column>> added = new HashMap<>();

        for (Table table : database.getTables()) {
            String tableName = getTableName(table.getPhysicalName());
            if (!tableNames.contains(tableName.toUpperCase())) {

                /*
                 * Create missing table.
                 */

                ddlCollector.add(table.getCreateSql());
                ddlCollector.addAll(table.getPostCreateSqls(model));

                added.put(table.getKey(), null); // null = table created

                sqlInfo.sqlStatementsProperties.put("create_table_" + tableName.toLowerCase(), Boolean.TRUE);

            } else {

                /*
                 * Get existing columns.
                 */

                Map<String, Integer> columnTypes = new HashMap<>();
                Map<String, String> columnTypeNames = new HashMap<>();
                Map<String, Integer> columnTypeSizes = new HashMap<>();
                try (ResultSet rs = metadata.getColumns(null, schemaName, tableName, "%")) {
                    while (rs.next()) {
                        String schema = rs.getString("TABLE_SCHEM");
                        if (schema != null) { // null for MySQL, doh!
                            if ("INFORMATION_SCHEMA".equals(schema.toUpperCase())) {
                                // H2 returns some system tables (locks)
                                continue;
                            }
                        }
                        String columnName = rs.getString("COLUMN_NAME").toUpperCase();
                        columnTypes.put(columnName, Integer.valueOf(rs.getInt("DATA_TYPE")));
                        columnTypeNames.put(columnName, rs.getString("TYPE_NAME"));
                        columnTypeSizes.put(columnName, Integer.valueOf(rs.getInt("COLUMN_SIZE")));
                    }
                }

                /*
                 * Update types and create missing columns.
                 */

                List<Column> addedColumns = new LinkedList<>();
                for (Column column : table.getColumns()) {
                    String upperName = column.getPhysicalName().toUpperCase();
                    // remove from columnTypes so that what remains at the end is the unused columns
                    Integer type = columnTypes.remove(upperName);
                    if (type == null) {
                        log.warn("Adding missing column in database: " + column.getFullQuotedName());
                        ddlCollector.add(table.getAddColumnSql(column));
                        ddlCollector.addAll(table.getPostAddSqls(column, model));
                        addedColumns.add(column);
                    } else {
                        int expected = column.getJdbcType();
                        int actual = type.intValue();
                        String actualName = columnTypeNames.get(upperName);
                        Integer actualSize = columnTypeSizes.get(upperName);
                        if (!column.setJdbcType(actual, actualName, actualSize.intValue())) {
                            log.error(String.format("SQL type mismatch for %s: expected %s, database has %s / %s (%s)",
                                    column.getFullQuotedName(), Integer.valueOf(expected), type, actualName,
                                    actualSize));
                        }
                    }
                }
                // dialect-specific columns must not be reported as unused
                for (String col : dialect.getIgnoredColumns(table)) {
                    columnTypes.remove(col.toUpperCase());
                }
                if (!columnTypes.isEmpty()) {
                    log.warn("Database contains additional unused columns for table " + table.getQuotedName() + ": "
                            + String.join(", ", columnTypes.keySet()));
                }
                if (!addedColumns.isEmpty()) {
                    if (added.containsKey(table.getKey())) {
                        throw new AssertionError();
                    }
                    added.put(table.getKey(), addedColumns);
                }
            }
        }

        if (testProps.containsKey(TEST_UPGRADE)) {
            // create "old" content in tables
            sqlInfo.executeSQLStatements("testUpgradeOldTables", ddlMode, connection, logger, ddlCollector);
        }

        // run upgrade for each table if added columns or test
        for (Entry<String, List<Column>> en : added.entrySet()) {
            List<Column> addedColumns = en.getValue();
            String tableKey = en.getKey();
            upgradeTable(tableKey, addedColumns, ddlMode, ddlCollector);
        }

        sqlInfo.executeSQLStatements("afterTableCreation", ddlMode, connection, logger, ddlCollector);
        sqlInfo.executeSQLStatements("last", ddlMode, connection, logger, ddlCollector);

        // aclr_permission check for PostgreSQL
        dialect.performAdditionalStatements(connection);

        /*
         * Execute all the collected DDL, or dump it if requested, depending on ddlMode
         */

        // ddlMode may be:
        // ignore (not treated here, nothing done)
        // dump (implies execute)
        // dump,execute
        // dump,ignore (no execute)
        // execute
        // abort (implies dump)
        // compat can be used instead of execute to always recreate stored procedures

        List<String> ddl = ddlCollector.getStrings();
        boolean ignore = ddlMode.contains(RepositoryDescriptor.DDL_MODE_IGNORE);
        boolean dump = ddlMode.contains(RepositoryDescriptor.DDL_MODE_DUMP);
        boolean abort = ddlMode.contains(RepositoryDescriptor.DDL_MODE_ABORT);
        if (dump || abort) {

            /*
             * Dump DDL if not empty.
             */

            if (!ddl.isEmpty()) {
                File dumpFile = new File(Environment.getDefault().getLog(), "ddl-vcs-" + repository.getName() + ".sql");
                try (OutputStream out = new FileOutputStream(dumpFile); PrintStream ps = new PrintStream(out)) {
                    for (String sql : dialect.getDumpStart()) {
                        ps.println(sql);
                    }
                    for (String sql : ddl) {
                        // strip trailing semicolons; the dialect re-adds its own statement terminator
                        sql = sql.trim();
                        if (sql.endsWith(";")) {
                            sql = sql.substring(0, sql.length() - 1);
                        }
                        ps.println(dialect.getSQLForDump(sql));
                    }
                    for (String sql : dialect.getDumpStop()) {
                        ps.println(sql);
                    }
                } catch (IOException e) {
                    throw new NuxeoException(e);
                }

                /*
                 * Abort if requested.
                 */

                if (abort) {
                    log.error("Dumped DDL to: " + dumpFile);
                    throw new NuxeoException("Database initialization failed for: " + repository.getName()
                            + ", DDL must be executed: " + dumpFile);
                }
            }
        }
        if (!ignore) {

            /*
             * Execute DDL.
             */

            try (Statement st = connection.createStatement()) {
                for (String sql : ddl) {
                    logger.log(sql.replace("\n", "\n    ")); // indented
                    try {
                        st.execute(sql);
                    } catch (SQLException e) {
                        // rethrow with the offending statement in the message
                        throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e);
                    }
                    countExecute();
                }
            }

            /*
             * Execute post-DDL stuff.
             */

            try (Statement st = connection.createStatement()) {
                for (String sql : dialect.getStartupSqls(model, sqlInfo.database)) {
                    logger.log(sql.replace("\n", "\n    ")); // indented
                    try {
                        st.execute(sql);
                    } catch (SQLException e) {
                        throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e);
                    }
                    countExecute();
                }
            }
        }
    }
408
    /**
     * Runs the upgrade procedure registered (in the constructor's TableUpgrader) for the given table,
     * collecting any generated DDL.
     *
     * @param tableKey the table key
     * @param addedColumns the columns just added, or {@code null} if the whole table was created
     * @param ddlMode the DDL mode
     * @param ddlCollector the collector receiving the upgrade DDL
     */
    protected void upgradeTable(String tableKey, List<Column> addedColumns, String ddlMode, ListCollector ddlCollector)
            throws SQLException {
        tableUpgrader.upgrade(tableKey, addedColumns, ddlMode, ddlCollector);
    }
413
414    /** Finds uppercase table names. */
415    protected static Set<String> findTableNames(DatabaseMetaData metadata, String schemaName) throws SQLException {
416        Set<String> tableNames = new HashSet<>();
417        ResultSet rs = metadata.getTables(null, schemaName, "%", new String[] { "TABLE" });
418        while (rs.next()) {
419            String tableName = rs.getString("TABLE_NAME");
420            tableNames.add(tableName.toUpperCase());
421        }
422        rs.close();
423        return tableNames;
424    }
425
    /**
     * Returns the type of the cluster node id column, as held by the SQL info.
     * <p>
     * NOTE(review): presumably a {@link java.sql.Types} constant -- confirm against SQLInfo.
     */
    @Override
    public int getClusterNodeIdType() {
        return sqlInfo.getClusterNodeIdType();
    }
430
431    @Override
432    public void createClusterNode(Serializable nodeId) {
433        Calendar now = Calendar.getInstance();
434        String sql = sqlInfo.getCreateClusterNodeSql();
435        List<Column> columns = sqlInfo.getCreateClusterNodeColumns();
436        try (PreparedStatement ps = connection.prepareStatement(sql)) {
437            if (logger.isLogEnabled()) {
438                logger.logSQL(sql, Arrays.asList(nodeId, now));
439            }
440            columns.get(0).setToPreparedStatement(ps, 1, nodeId);
441            columns.get(1).setToPreparedStatement(ps, 2, now);
442            ps.execute();
443
444        } catch (SQLException e) {
445            throw new NuxeoException(e);
446        }
447    }
448
449    @Override
450    public void removeClusterNode(Serializable nodeId) {
451        // delete from cluster_nodes
452        String sql = sqlInfo.getDeleteClusterNodeSql();
453        Column column = sqlInfo.getDeleteClusterNodeColumn();
454        try (PreparedStatement ps = connection.prepareStatement(sql)) {
455            if (logger.isLogEnabled()) {
456                logger.logSQL(sql, Collections.singletonList(nodeId));
457            }
458            column.setToPreparedStatement(ps, 1, nodeId);
459            ps.execute();
460            // delete un-processed invals from cluster_invals
461            deleteClusterInvals(nodeId);
462        } catch (SQLException e) {
463            throw new NuxeoException(e);
464        }
465    }
466
467    protected void deleteClusterInvals(Serializable nodeId) throws SQLException {
468        String sql = sqlInfo.getDeleteClusterInvalsSql();
469        Column column = sqlInfo.getDeleteClusterInvalsColumn();
470        try (PreparedStatement ps = connection.prepareStatement(sql)) {
471            if (logger.isLogEnabled()) {
472                logger.logSQL(sql, Collections.singletonList(nodeId));
473            }
474            column.setToPreparedStatement(ps, 1, nodeId);
475            int n = ps.executeUpdate();
476            countExecute();
477            if (logger.isLogEnabled()) {
478                logger.logCount(n);
479            }
480        }
481    }
482
483    @Override
484    public void insertClusterInvalidations(Serializable nodeId, Invalidations invalidations) {
485        String sql = dialect.getClusterInsertInvalidations();
486        List<Column> columns = sqlInfo.getClusterInvalidationsColumns();
487        try (PreparedStatement ps = connection.prepareStatement(sql)) {
488            int kind = Invalidations.MODIFIED;
489            while (true) {
490                Set<RowId> rowIds = invalidations.getKindSet(kind);
491
492                // reorganize by id
493                Map<Serializable, Set<String>> res = new HashMap<>();
494                for (RowId rowId : rowIds) {
495                    Set<String> tableNames = res.get(rowId.id);
496                    if (tableNames == null) {
497                        res.put(rowId.id, tableNames = new HashSet<>());
498                    }
499                    tableNames.add(rowId.tableName);
500                }
501
502                // do inserts
503                for (Entry<Serializable, Set<String>> en : res.entrySet()) {
504                    Serializable id = en.getKey();
505                    String fragments = join(en.getValue(), ' ');
506                    if (logger.isLogEnabled()) {
507                        logger.logSQL(sql, Arrays.<Serializable> asList(nodeId, id, fragments, Long.valueOf(kind)));
508                    }
509                    Serializable frags;
510                    if (dialect.supportsArrays() && columns.get(2).getJdbcType() == Types.ARRAY) {
511                        frags = fragments.split(" ");
512                    } else {
513                        frags = fragments;
514                    }
515                    columns.get(0).setToPreparedStatement(ps, 1, nodeId);
516                    columns.get(1).setToPreparedStatement(ps, 2, id);
517                    columns.get(2).setToPreparedStatement(ps, 3, frags);
518                    columns.get(3).setToPreparedStatement(ps, 4, Long.valueOf(kind));
519                    ps.execute();
520                    countExecute();
521                }
522                if (kind == Invalidations.MODIFIED) {
523                    kind = Invalidations.DELETED;
524                } else {
525                    break;
526                }
527            }
528        } catch (SQLException e) {
529            throw new NuxeoException("Could not invalidate", e);
530        }
531    }
532
533    // join that works on a set
534    protected static String join(Collection<String> strings, char sep) {
535        if (strings.isEmpty()) {
536            throw new RuntimeException();
537        }
538        if (strings.size() == 1) {
539            return strings.iterator().next();
540        }
541        int size = 0;
542        for (String word : strings) {
543            size += word.length() + 1;
544        }
545        StringBuilder buf = new StringBuilder(size);
546        for (String word : strings) {
547            buf.append(word);
548            buf.append(sep);
549        }
550        buf.setLength(size - 1);
551        return buf.toString();
552    }
553
    /**
     * Fetches the invalidations posted for this cluster node by other nodes, and cleans them up
     * afterwards when the dialect does not do so itself.
     *
     * @param nodeId this cluster node's id
     * @return the accumulated invalidations (possibly empty)
     */
    @Override
    public Invalidations getClusterInvalidations(Serializable nodeId) {
        Invalidations invalidations = new Invalidations();
        String sql = dialect.getClusterGetInvalidations();
        List<Column> columns = sqlInfo.getClusterInvalidationsColumns();
        if (logger.isLogEnabled()) {
            logger.logSQL(sql, Collections.singletonList(nodeId));
        }
        try (PreparedStatement ps = connection.prepareStatement(sql)) {
            setToPreparedStatement(ps, 1, nodeId);
            try (ResultSet rs = ps.executeQuery()) {
                countExecute();
                while (rs.next()) {
                    // first column ignored, it's the node id
                    Serializable id = columns.get(1).getFromResultSet(rs, 1);
                    Serializable frags = columns.get(2).getFromResultSet(rs, 2);
                    int kind = ((Long) columns.get(3).getFromResultSet(rs, 3)).intValue();
                    // fragments were stored either as a native array or as a space-separated string
                    // (see insertClusterInvalidations)
                    String[] fragments;
                    if (dialect.supportsArrays() && frags instanceof String[]) {
                        fragments = (String[]) frags;
                    } else {
                        fragments = ((String) frags).split(" ");
                    }
                    invalidations.add(id, fragments, kind);
                }
            }
            if (logger.isLogEnabled()) {
                // logCount(n);
                logger.log("  -> " + invalidations);
            }
            // some dialects need an explicit DELETE of the rows just read
            if (dialect.isClusteringDeleteNeeded()) {
                deleteClusterInvals(nodeId);
            }
            return invalidations;
        } catch (SQLException e) {
            throw new NuxeoException("Could not invalidate", e);
        }
    }
592
593    @Override
594    public Serializable getRootId(String repositoryId) {
595        String sql = sqlInfo.getSelectRootIdSql();
596        if (logger.isLogEnabled()) {
597            logger.logSQL(sql, Collections.<Serializable> singletonList(repositoryId));
598        }
599        try (PreparedStatement ps = connection.prepareStatement(sql)) {
600            ps.setString(1, repositoryId);
601            try (ResultSet rs = ps.executeQuery()) {
602                countExecute();
603                if (!rs.next()) {
604                    if (logger.isLogEnabled()) {
605                        logger.log("  -> (none)");
606                    }
607                    return null;
608                }
609                Column column = sqlInfo.getSelectRootIdWhatColumn();
610                Serializable id = column.getFromResultSet(rs, 1);
611                if (logger.isLogEnabled()) {
612                    logger.log("  -> " + Model.MAIN_KEY + '=' + id);
613                }
614                // check that we didn't get several rows
615                if (rs.next()) {
616                    throw new NuxeoException("Row query for " + repositoryId + " returned several rows: " + sql);
617                }
618                return id;
619            }
620        } catch (SQLException e) {
621            throw new NuxeoException("Could not select: " + sql, e);
622        }
623    }
624
625    @Override
626    public void setRootId(Serializable repositoryId, Serializable id) {
627        String sql = sqlInfo.getInsertRootIdSql();
628        try (PreparedStatement ps = connection.prepareStatement(sql)) {
629            List<Column> columns = sqlInfo.getInsertRootIdColumns();
630            List<Serializable> debugValues = null;
631            if (logger.isLogEnabled()) {
632                debugValues = new ArrayList<>(2);
633            }
634            int i = 0;
635            for (Column column : columns) {
636                i++;
637                String key = column.getKey();
638                Serializable v;
639                if (key.equals(Model.MAIN_KEY)) {
640                    v = id;
641                } else if (key.equals(Model.REPOINFO_REPONAME_KEY)) {
642                    v = repositoryId;
643                } else {
644                    throw new RuntimeException(key);
645                }
646                column.setToPreparedStatement(ps, i, v);
647                if (debugValues != null) {
648                    debugValues.add(v);
649                }
650            }
651            if (debugValues != null) {
652                logger.logSQL(sql, debugValues);
653                debugValues.clear();
654            }
655            ps.execute();
656            countExecute();
657        } catch (SQLException e) {
658            throw new NuxeoException("Could not insert: " + sql, e);
659        }
660    }
661
662    protected QueryMaker findQueryMaker(String queryType) {
663        for (Class<? extends QueryMaker> klass : queryMakerService.getQueryMakers()) {
664            QueryMaker queryMaker;
665            try {
666                queryMaker = klass.newInstance();
667            } catch (ReflectiveOperationException e) {
668                throw new NuxeoException(e);
669            }
670            if (queryMaker.accepts(queryType)) {
671                return queryMaker;
672            }
673        }
674        return null;
675    }
676
677    protected void prepareUserReadAcls(QueryFilter queryFilter) {
678        String sql = dialect.getPrepareUserReadAclsSql();
679        Serializable principals = queryFilter.getPrincipals();
680        if (sql == null || principals == null) {
681            return;
682        }
683        if (!dialect.supportsArrays()) {
684            principals = String.join(Dialect.ARRAY_SEP, (String[]) principals);
685        }
686        try (PreparedStatement ps = connection.prepareStatement(sql)) {
687            if (logger.isLogEnabled()) {
688                logger.logSQL(sql, Collections.singleton(principals));
689            }
690            setToPreparedStatement(ps, 1, principals);
691            ps.execute();
692            countExecute();
693        } catch (SQLException e) {
694            throw new NuxeoException("Failed to prepare user read acl cache", e);
695        }
696    }
697
698    @Override
699    public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter,
700            boolean countTotal) {
701        return query(query, queryType, queryFilter, countTotal ? -1 : 0);
702    }
703
704    @Override
705    public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter, long countUpTo) {
706        PartialList<Serializable> result = queryProjection(query, queryType, queryFilter, countUpTo,
707                (info, rs) -> info.whatColumns.get(0).getFromResultSet(rs, 1));
708
709        if (logger.isLogEnabled()) {
710            logger.logIds(result.list, countUpTo != 0, result.totalSize);
711        }
712
713        return result;
714    }
715
716    // queryFilter used for principals and permissions
717    @Override
718    public IterableQueryResult queryAndFetch(String query, String queryType, QueryFilter queryFilter,
719            boolean distinctDocuments, Object... params) {
720        if (dialect.needsPrepareUserReadAcls()) {
721            prepareUserReadAcls(queryFilter);
722        }
723        QueryMaker queryMaker = findQueryMaker(queryType);
724        if (queryMaker == null) {
725            throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query);
726        }
727        query = computeDistinctDocuments(query, distinctDocuments);
728        try {
729            return new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this, params);
730        } catch (SQLException e) {
731            throw new NuxeoException("Invalid query: " + queryType + ": " + query, e);
732        }
733    }
734
735    @Override
736    public PartialList<Map<String, Serializable>> queryProjection(String query, String queryType,
737            QueryFilter queryFilter, boolean distinctDocuments, long countUpTo, Object... params) {
738        query = computeDistinctDocuments(query, distinctDocuments);
739        PartialList<Map<String, Serializable>> result = queryProjection(query, queryType, queryFilter, countUpTo,
740                (info, rs) -> info.mapMaker.makeMap(rs), params);
741
742        if (logger.isLogEnabled()) {
743            logger.logMaps(result.list, countUpTo != 0, result.totalSize);
744        }
745
746        return result;
747    }
748
749    protected String computeDistinctDocuments(String query, boolean distinctDocuments) {
750        if (distinctDocuments) {
751            String q = query.toLowerCase();
752            if (q.startsWith("select ") && !q.startsWith("select distinct ")) {
753                // Replace "select" by "select distinct", split at "select ".length() index
754                query = "SELECT DISTINCT " + query.substring(7);
755            }
756        }
757        return query;
758    }
759
    /**
     * Runs a projection query and builds each row into a result item through {@code extractor}.
     * <p>
     * {@code countUpTo} semantics: {@code 0} means no total count is computed (totalSize is -1, or 0 when nothing
     * matches); {@code -1} means count the full result; {@code n > 0} means count up to {@code n} and return a
     * totalSize of -2 when the actual total exceeds {@code n}.
     *
     * @param query the query string
     * @param queryType the query type, used to find a {@code QueryMaker}
     * @param queryFilter the filter, also carrying the limit and offset
     * @param countUpTo the counting mode, see above
     * @param extractor turns the current {@code ResultSet} row into a result item
     * @param params optional query parameters
     * @return the extracted items plus total size information
     */
    protected <T> PartialList<T> queryProjection(String query, String queryType, QueryFilter queryFilter,
            long countUpTo, BiFunctionSQLException<SQLInfoSelect, ResultSet, T> extractor, Object... params) {
        // some dialects require the user read ACLs to be materialized before querying
        if (dialect.needsPrepareUserReadAcls()) {
            prepareUserReadAcls(queryFilter);
        }
        QueryMaker queryMaker = findQueryMaker(queryType);
        if (queryMaker == null) {
            throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query);
        }
        QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter, params);

        if (q == null) {
            logger.log("Query cannot return anything due to conflicting clauses");
            return new PartialList<>(Collections.emptyList(), 0);
        }
        long limit = queryFilter.getLimit();
        long offset = queryFilter.getOffset();

        if (logger.isLogEnabled()) {
            String sql = q.selectInfo.sql;
            if (limit != 0) {
                sql += " -- LIMIT " + limit + " OFFSET " + offset;
            }
            if (countUpTo != 0) {
                sql += " -- COUNT TOTAL UP TO " + countUpTo;
            }
            logger.logSQL(sql, q.selectParams);
        }

        String sql = q.selectInfo.sql;

        if (countUpTo == 0 && limit > 0 && dialect.supportsPaging()) {
            // full result set not needed for counting
            // let the database do the paging, and neutralize the manual limit/offset handling below
            sql = dialect.addPagingClause(sql, limit, offset);
            limit = 0;
            offset = 0;
        } else if (countUpTo > 0 && dialect.supportsPaging()) {
            // ask one more row
            sql = dialect.addPagingClause(sql, Math.max(countUpTo + 1, limit + offset), 0);
        }

        // a scroll-insensitive result set is needed for the absolute()/first()/last() positioning below
        try (PreparedStatement ps = connection.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE,
                ResultSet.CONCUR_READ_ONLY)) {
            int i = 1;
            for (Serializable object : q.selectParams) {
                setToPreparedStatement(ps, i++, object);
            }
            try (ResultSet rs = ps.executeQuery()) {
                countExecute();

                // limit/offset
                long totalSize = -1;
                boolean available;
                if ((limit == 0) || (offset == 0)) {
                    available = rs.first();
                    if (!available) {
                        totalSize = 0;
                    }
                    if (limit == 0) {
                        limit = -1; // infinite
                    }
                } else {
                    // skip the first `offset` rows by positioning directly on row offset+1
                    available = rs.absolute((int) offset + 1);
                }

                List<T> projections = new LinkedList<>();
                int rowNum = 0;
                while (available && (limit != 0)) {
                    try {
                        T projection = extractor.apply(q.selectInfo, rs);
                        projections.add(projection);
                        rowNum = rs.getRow();
                        available = rs.next();
                        limit--;
                    } catch (SQLDataException e) {
                        // actually no data available, MariaDB Connector/J lied, stop now
                        available = false;
                    }
                }

                // total size
                if (countUpTo != 0 && (totalSize == -1)) {
                    if (!available && (rowNum != 0)) {
                        // last row read was the actual last
                        totalSize = rowNum;
                    } else {
                        // available if limit reached with some left
                        // rowNum == 0 if skipped too far
                        rs.last();
                        totalSize = rs.getRow();
                    }
                    if (countUpTo > 0 && totalSize > countUpTo) {
                        // the results were truncated, we don't know the total size
                        totalSize = -2;
                    }
                }

                return new PartialList<>(projections, totalSize);
            }
        } catch (SQLException e) {
            throw new NuxeoException("Invalid query: " + query, e);
        }
    }
863
864    public int setToPreparedStatement(PreparedStatement ps, int i, Serializable object) throws SQLException {
865        if (object instanceof Calendar) {
866            Calendar cal = (Calendar) object;
867            ps.setTimestamp(i, dialect.getTimestampFromCalendar(cal), cal);
868        } else if (object instanceof java.sql.Date) {
869            ps.setDate(i, (java.sql.Date) object);
870        } else if (object instanceof Long) {
871            ps.setLong(i, ((Long) object).longValue());
872        } else if (object instanceof WrappedId) {
873            dialect.setId(ps, i, object.toString());
874        } else if (object instanceof Object[]) {
875            int jdbcType;
876            if (object instanceof String[]) {
877                jdbcType = dialect.getJDBCTypeAndString(ColumnType.STRING).jdbcType;
878            } else if (object instanceof Boolean[]) {
879                jdbcType = dialect.getJDBCTypeAndString(ColumnType.BOOLEAN).jdbcType;
880            } else if (object instanceof Long[]) {
881                jdbcType = dialect.getJDBCTypeAndString(ColumnType.LONG).jdbcType;
882            } else if (object instanceof Double[]) {
883                jdbcType = dialect.getJDBCTypeAndString(ColumnType.DOUBLE).jdbcType;
884            } else if (object instanceof java.sql.Date[]) {
885                jdbcType = Types.DATE;
886            } else if (object instanceof java.sql.Clob[]) {
887                jdbcType = Types.CLOB;
888            } else if (object instanceof Calendar[]) {
889                jdbcType = dialect.getJDBCTypeAndString(ColumnType.TIMESTAMP).jdbcType;
890                object = dialect.getTimestampFromCalendar((Calendar) object);
891            } else if (object instanceof Integer[]) {
892                jdbcType = dialect.getJDBCTypeAndString(ColumnType.INTEGER).jdbcType;
893            } else {
894                jdbcType = dialect.getJDBCTypeAndString(ColumnType.CLOB).jdbcType;
895            }
896            Array array = dialect.createArrayOf(jdbcType, (Object[]) object, connection);
897            ps.setArray(i, array);
898        } else {
899            ps.setObject(i, object);
900        }
901        return i;
902    }
903
904    @Override
905    public ScrollResult scroll(String query, int batchSize, int keepAliveSeconds) {
906        if (!dialect.supportsScroll()) {
907            return defaultScroll(query);
908        }
909        checkForTimedoutScroll();
910        return scrollSearch(query, batchSize, keepAliveSeconds);
911    }
912
913    protected void checkForTimedoutScroll() {
914        cursorResults.forEach((id, cursor) -> cursor.timedOut(id));
915    }
916
    /**
     * Opens a database cursor for the NXQL query and returns the first batch of results.
     * <p>
     * The statement and result set are deliberately kept open and registered under a newly generated scroll id;
     * subsequent batches are fetched through {@link #scroll(String)}.
     *
     * @throws NuxeoException when called in auto-commit mode, or on query error
     */
    protected ScrollResult scrollSearch(String query, int batchSize, int keepAliveSeconds) {
        QueryMaker queryMaker = findQueryMaker("NXQL");
        // filter with no restrictions and no limit/offset (constructor args assumed; see QueryFilter)
        QueryFilter queryFilter = new QueryFilter(null, null, null, null, Collections.emptyList(), 0, 0);
        QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter);
        if (q == null) {
            logger.log("Query cannot return anything due to conflicting clauses");
            throw new NuxeoException("Query cannot return anything due to conflicting clauses");
        }
        if (logger.isLogEnabled()) {
            logger.logSQL(q.selectInfo.sql, q.selectParams);
        }
        try {
            if (connection.getAutoCommit()) {
                throw new NuxeoException("Scroll should be done inside a transaction");
            }
            // ps MUST NOT be auto-closed because it's referenced by a cursor
            // HOLD_CURSORS_OVER_COMMIT keeps the result set usable across commits between scroll calls
            PreparedStatement ps = connection.prepareStatement(q.selectInfo.sql, ResultSet.TYPE_FORWARD_ONLY,
                    ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
            ps.setFetchSize(batchSize);
            int i = 1;
            for (Serializable object : q.selectParams) {
                setToPreparedStatement(ps, i++, object);
            }
            // rs MUST NOT be auto-closed because it's referenced by a cursor
            ResultSet rs = ps.executeQuery();
            String scrollId = UUID.randomUUID().toString();
            registerCursor(scrollId, ps, rs, batchSize, keepAliveSeconds);
            return scroll(scrollId);
        } catch (SQLException e) {
            throw new NuxeoException("Error on query", e);
        }
    }
949
950    protected class CursorResult {
951        protected final int keepAliveSeconds;
952
953        protected final PreparedStatement preparedStatement;
954
955        protected final ResultSet resultSet;
956
957        protected final int batchSize;
958
959        protected long lastCallTimestamp;
960
961        CursorResult(PreparedStatement preparedStatement, ResultSet resultSet, int batchSize, int keepAliveSeconds) {
962            this.preparedStatement = preparedStatement;
963            this.resultSet = resultSet;
964            this.batchSize = batchSize;
965            this.keepAliveSeconds = keepAliveSeconds;
966            lastCallTimestamp = System.currentTimeMillis();
967        }
968
969        boolean timedOut(String scrollId) {
970            long now = System.currentTimeMillis();
971            if (now - lastCallTimestamp > (keepAliveSeconds * 1000)) {
972                if (unregisterCursor(scrollId)) {
973                    log.warn("Scroll " + scrollId + " timed out");
974                }
975                return true;
976            }
977            return false;
978        }
979
980        void touch() {
981            lastCallTimestamp = System.currentTimeMillis();
982        }
983
984        synchronized void close() throws SQLException {
985            if (resultSet != null) {
986                resultSet.close();
987            }
988            if (preparedStatement != null) {
989                preparedStatement.close();
990            }
991        }
992    }
993
994    protected void registerCursor(String scrollId, PreparedStatement ps, ResultSet rs, int batchSize,
995            int keepAliveSeconds) {
996        cursorResults.put(scrollId, new CursorResult(ps, rs, batchSize, keepAliveSeconds));
997    }
998
999    protected boolean unregisterCursor(String scrollId) {
1000        CursorResult cursor = cursorResults.remove(scrollId);
1001        if (cursor != null) {
1002            try {
1003                cursor.close();
1004                return true;
1005            } catch (SQLException e) {
1006                log.error("Failed to close cursor for scroll: " + scrollId, e);
1007                // do not propagate exception on cleaning
1008            }
1009        }
1010        return false;
1011    }
1012
1013    protected ScrollResult defaultScroll(String query) {
1014        // the database has no proper support for cursor just return everything in one batch
1015        QueryMaker queryMaker = findQueryMaker("NXQL");
1016        List<String> ids;
1017        QueryFilter queryFilter = new QueryFilter(null, null, null, null, Collections.emptyList(), 0, 0);
1018        try (IterableQueryResult ret = new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this)) {
1019            ids = new ArrayList<>((int) ret.size());
1020            for (Map<String, Serializable> map : ret) {
1021                ids.add(map.get("ecm:uuid").toString());
1022            }
1023        } catch (SQLException e) {
1024            throw new NuxeoException("Invalid scroll query: " + query, e);
1025        }
1026        return new ScrollResultImpl(NOSCROLL_ID, ids);
1027    }
1028
    /**
     * Fetches the next batch of ids for a previously registered scroll.
     * <p>
     * Returns an empty result for the sentinel {@code NOSCROLL_ID} (everything was already returned in one batch).
     * When the result set is exhausted, it is closed; the cursor itself is unregistered only when the final batch is
     * empty, so a last partial batch can still be delivered.
     *
     * @throws NuxeoException for an unknown or timed-out scroll id, or on SQL error
     */
    @Override
    public ScrollResult scroll(String scrollId) {
        if (NOSCROLL_ID.equals(scrollId) || !dialect.supportsScroll()) {
            // there is only one batch in this case
            return emptyResult();
        }
        CursorResult cursorResult = cursorResults.get(scrollId);
        if (cursorResult == null) {
            throw new NuxeoException("Unknown or timed out scrollId");
        } else if (cursorResult.timedOut(scrollId)) {
            throw new NuxeoException("Timed out scrollId");
        }
        cursorResult.touch();
        List<String> ids = new ArrayList<>(cursorResult.batchSize);
        // synchronized with CursorResult.close() (a synchronized method on the same monitor)
        // so we never read from a result set while it is being closed
        synchronized (cursorResult) {
            try {
                if (cursorResult.resultSet == null || cursorResult.resultSet.isClosed()) {
                    unregisterCursor(scrollId);
                    return emptyResult();
                }
                while (ids.size() < cursorResult.batchSize) {
                    if (cursorResult.resultSet.next()) {
                        ids.add(cursorResult.resultSet.getString(1));
                    } else {
                        // end of result set: close the JDBC resources now; only unregister when this batch is
                        // empty, so a non-empty final batch can still be returned under this scroll id
                        cursorResult.close();
                        if (ids.isEmpty()) {
                            unregisterCursor(scrollId);
                        }
                        break;
                    }
                }
            } catch (SQLException e) {
                throw new NuxeoException("Error during scroll", e);
            }
        }
        return new ScrollResultImpl(scrollId, ids);
    }
1066
    /**
     * Gets the ancestor ids of the given documents, using the dialect's dedicated ancestors query when available,
     * otherwise falling back to iterative parent-id selection.
     *
     * @param ids the ids whose ancestors are looked up
     * @return the set of ancestor ids, {@code null} values excluded
     */
    @Override
    public Set<Serializable> getAncestorsIds(Collection<Serializable> ids) {
        SQLInfoSelect select = sqlInfo.getSelectAncestorsIds();
        if (select == null) {
            // no single-query support in this dialect: walk up the parents level by level
            return getAncestorsIdsIterative(ids);
        }
        Serializable whereIds = newIdArray(ids);
        Set<Serializable> res = new HashSet<>();
        if (logger.isLogEnabled()) {
            logger.logSQL(select.sql, Collections.singleton(whereIds));
        }
        Column what = select.whatColumns.get(0);
        try (PreparedStatement ps = connection.prepareStatement(select.sql)) {
            setToPreparedStatementIdArray(ps, 1, whereIds);
            try (ResultSet rs = ps.executeQuery()) {
                countExecute();
                List<Serializable> debugIds = null;
                if (logger.isLogEnabled()) {
                    debugIds = new LinkedList<>();
                }
                while (rs.next()) {
                    if (dialect.supportsArraysReturnInsteadOfRows()) {
                        // each row carries a whole array of ancestor ids
                        Serializable[] resultIds = dialect.getArrayResult(rs.getArray(1));
                        for (Serializable id : resultIds) {
                            if (id != null) {
                                res.add(id);
                                if (logger.isLogEnabled()) {
                                    debugIds.add(id);
                                }
                            }
                        }
                    } else {
                        // one ancestor id per row
                        Serializable id = what.getFromResultSet(rs, 1);
                        if (id != null) {
                            res.add(id);
                            if (logger.isLogEnabled()) {
                                debugIds.add(id);
                            }
                        }
                    }
                }
                if (logger.isLogEnabled()) {
                    logger.logIds(debugIds, false, 0);
                }
            }
            return res;
        } catch (SQLException e) {
            throw new NuxeoException("Failed to get ancestors ids", e);
        }
    }
1117
1118    /**
1119     * Uses iterative parentid selection.
1120     */
1121    protected Set<Serializable> getAncestorsIdsIterative(Collection<Serializable> ids) {
1122        try {
1123            LinkedList<Serializable> todo = new LinkedList<>(ids);
1124            Set<Serializable> done = new HashSet<>();
1125            Set<Serializable> res = new HashSet<>();
1126            while (!todo.isEmpty()) {
1127                done.addAll(todo);
1128                SQLInfoSelect select = sqlInfo.getSelectParentIds(todo.size());
1129                if (logger.isLogEnabled()) {
1130                    logger.logSQL(select.sql, todo);
1131                }
1132                Column what = select.whatColumns.get(0);
1133                Column where = select.whereColumns.get(0);
1134                try (PreparedStatement ps = connection.prepareStatement(select.sql)) {
1135                    int i = 1;
1136                    for (Serializable id : todo) {
1137                        where.setToPreparedStatement(ps, i++, id);
1138                    }
1139                    try (ResultSet rs = ps.executeQuery()) {
1140                        countExecute();
1141                        todo = new LinkedList<>();
1142                        List<Serializable> debugIds = null;
1143                        if (logger.isLogEnabled()) {
1144                            debugIds = new LinkedList<>();
1145                        }
1146                        while (rs.next()) {
1147                            Serializable id = what.getFromResultSet(rs, 1);
1148                            if (id != null) {
1149                                res.add(id);
1150                                if (!done.contains(id)) {
1151                                    todo.add(id);
1152                                }
1153                                if (logger.isLogEnabled()) {
1154                                    debugIds.add(id);
1155                                }
1156                            }
1157                        }
1158                        if (logger.isLogEnabled()) {
1159                            logger.logIds(debugIds, false, 0);
1160                        }
1161                    }
1162                }
1163            }
1164            return res;
1165        } catch (SQLException e) {
1166            throw new NuxeoException("Failed to get ancestors ids", e);
1167        }
1168    }
1169
1170    @Override
1171    public void updateReadAcls() {
1172        if (!dialect.supportsReadAcl()) {
1173            return;
1174        }
1175        if (log.isDebugEnabled()) {
1176            log.debug("updateReadAcls: updating");
1177        }
1178        try (Statement st = connection.createStatement()) {
1179            String sql = dialect.getUpdateReadAclsSql();
1180            if (logger.isLogEnabled()) {
1181                logger.log(sql);
1182            }
1183            st.execute(sql);
1184            countExecute();
1185        } catch (SQLException e) {
1186            checkConcurrentUpdate(e);
1187            throw new NuxeoException("Failed to update read acls", e);
1188        }
1189        if (log.isDebugEnabled()) {
1190            log.debug("updateReadAcls: done.");
1191        }
1192    }
1193
1194    @Override
1195    public void rebuildReadAcls() {
1196        if (!dialect.supportsReadAcl()) {
1197            return;
1198        }
1199        log.debug("rebuildReadAcls: rebuilding ...");
1200        try (Statement st = connection.createStatement()) {
1201            String sql = dialect.getRebuildReadAclsSql();
1202            logger.log(sql);
1203            st.execute(sql);
1204            countExecute();
1205        } catch (SQLException e) {
1206            throw new NuxeoException("Failed to rebuild read acls", e);
1207        }
1208        log.debug("rebuildReadAcls: done.");
1209    }
1210
1211    /*
1212     * ----- Locking -----
1213     */
1214
1215    @Override
1216    public Lock getLock(Serializable id) {
1217        if (log.isDebugEnabled()) {
1218            try {
1219                log.debug("getLock " + id + " while autoCommit=" + connection.getAutoCommit());
1220            } catch (SQLException e) {
1221                throw new RuntimeException(e);
1222            }
1223        }
1224        RowId rowId = new RowId(Model.LOCK_TABLE_NAME, id);
1225        Row row = readSimpleRow(rowId);
1226        return row == null ? null
1227                : new Lock((String) row.get(Model.LOCK_OWNER_KEY), (Calendar) row.get(Model.LOCK_CREATED_KEY));
1228    }
1229
1230    @Override
1231    public Lock setLock(final Serializable id, final Lock lock) {
1232        if (log.isDebugEnabled()) {
1233            log.debug("setLock " + id + " owner=" + lock.getOwner());
1234        }
1235        Lock oldLock = getLock(id);
1236        if (oldLock == null) {
1237            Row row = new Row(Model.LOCK_TABLE_NAME, id);
1238            row.put(Model.LOCK_OWNER_KEY, lock.getOwner());
1239            row.put(Model.LOCK_CREATED_KEY, lock.getCreated());
1240            insertSimpleRows(Model.LOCK_TABLE_NAME, Collections.singletonList(row));
1241        }
1242        return oldLock;
1243    }
1244
1245    @Override
1246    public Lock removeLock(final Serializable id, final String owner, final boolean force) {
1247        if (log.isDebugEnabled()) {
1248            log.debug("removeLock " + id + " owner=" + owner + " force=" + force);
1249        }
1250        Lock oldLock = force ? null : getLock(id);
1251        if (!force && owner != null) {
1252            if (oldLock == null) {
1253                // not locked, nothing to do
1254                return null;
1255            }
1256            if (!LockManager.canLockBeRemoved(oldLock.getOwner(), owner)) {
1257                // existing mismatched lock, flag failure
1258                return new Lock(oldLock, true);
1259            }
1260        }
1261        if (force || oldLock != null) {
1262            deleteRows(Model.LOCK_TABLE_NAME, Collections.singleton(id));
1263        }
1264        return oldLock;
1265    }
1266
1267    @Override
1268    public void markReferencedBinaries() {
1269        log.debug("Starting binaries GC mark");
1270        DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class);
1271        String repositoryName = getRepositoryName();
1272        try (Statement st = connection.createStatement()) {
1273            int i = -1;
1274            for (String sql : sqlInfo.getBinariesSql) {
1275                i++;
1276                Column col = sqlInfo.getBinariesColumns.get(i);
1277                if (logger.isLogEnabled()) {
1278                    logger.log(sql);
1279                }
1280                try (ResultSet rs = st.executeQuery(sql)) {
1281                    countExecute();
1282                    int n = 0;
1283                    while (rs.next()) {
1284                        n++;
1285                        String key = (String) col.getFromResultSet(rs, 1);
1286                        if (key != null) {
1287                            blobManager.markReferencedBinary(key, repositoryName);
1288                        }
1289                    }
1290                    if (logger.isLogEnabled()) {
1291                        logger.logCount(n);
1292                    }
1293                }
1294            }
1295        } catch (SQLException e) {
1296            throw new RuntimeException("Failed to mark binaries for gC", e);
1297        }
1298        log.debug("End of binaries GC mark");
1299    }
1300
1301    /*
1302     * ----- XAResource -----
1303     */
1304
1305    protected static String systemToString(Object o) {
1306        return o.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(o));
1307    }
1308
    /**
     * Starts work on behalf of the given XA transaction branch.
     * <p>
     * Internal {@link NuxeoException}s are converted to an {@link XAException} with {@code XAER_RMERR} so that the
     * transaction manager sees a proper XA failure.
     */
    @Override
    public void start(Xid xid, int flags) throws XAException {
        try {
            xaresource.start(xid, flags);
            if (logger.isLogEnabled()) {
                logger.log("XA start on " + systemToString(xid));
            }
        } catch (NuxeoException e) {
            throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e);
        } catch (XAException e) {
            logger.error("XA start error on " + systemToString(xid), e);
            throw e;
        }
    }
1323
    /**
     * Ends work on behalf of the given XA transaction branch.
     * <p>
     * An {@link NullPointerException} thrown by H2 when there is no active transaction is converted to an
     * {@link XAException} with {@code XAER_RMERR}. XA errors are only logged when the branch is not being ended
     * with {@code TMFAIL}.
     */
    @Override
    public void end(Xid xid, int flags) throws XAException {
        try {
            xaresource.end(xid, flags);
            if (logger.isLogEnabled()) {
                logger.log("XA end on " + systemToString(xid));
            }
        } catch (NullPointerException e) {
            // H2 when no active transaction
            logger.error("XA end error on " + systemToString(xid), e);
            throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e);
        } catch (XAException e) {
            // don't log the error when the branch is deliberately being failed
            if (flags != XAResource.TMFAIL) {
                logger.error("XA end error on " + systemToString(xid), e);
            }
            throw e;
        }
    }
1342
1343    @Override
1344    public int prepare(Xid xid) throws XAException {
1345        try {
1346            return xaresource.prepare(xid);
1347        } catch (XAException e) {
1348            logger.error("XA prepare error on  " + systemToString(xid), e);
1349            throw e;
1350        }
1351    }
1352
1353    @Override
1354    public void commit(Xid xid, boolean onePhase) throws XAException {
1355        try {
1356            xaresource.commit(xid, onePhase);
1357        } catch (XAException e) {
1358            logger.error("XA commit error on  " + systemToString(xid), e);
1359            throw e;
1360        }
1361    }
1362
1363    // rollback interacts with caches so is in RowMapper
1364
    /** Forgets a heuristically completed branch; delegates to the underlying JDBC XAResource. */
    @Override
    public void forget(Xid xid) throws XAException {
        xaresource.forget(xid);
    }
1369
    /** Returns the prepared/heuristic branches known to the underlying JDBC XAResource. */
    @Override
    public Xid[] recover(int flag) throws XAException {
        return xaresource.recover(flag);
    }
1374
    /** Delegates the transaction timeout setting to the underlying JDBC XAResource. */
    @Override
    public boolean setTransactionTimeout(int seconds) throws XAException {
        return xaresource.setTransactionTimeout(seconds);
    }
1379
    /** Delegates the transaction timeout lookup to the underlying JDBC XAResource. */
    @Override
    public int getTransactionTimeout() throws XAException {
        return xaresource.getTransactionTimeout();
    }
1384
    /** Not supported by this mapper. */
    @Override
    public boolean isSameRM(XAResource xares) throws XAException {
        throw new UnsupportedOperationException();
    }
1389
    /** Whether a JDBC connection is currently held. */
    @Override
    public boolean isConnected() {
        return connection != null;
    }
1394
    /** Opens the JDBC connection(s), optionally outside any shared transaction. */
    @Override
    public void connect(boolean noSharing) {
        openConnections(noSharing);
    }
1399
    /** Closes the JDBC connection(s). */
    @Override
    public void disconnect() {
        closeConnections();
    }
1404
1405    /**
1406     * @since 7.10-HF25, 8.10-HF06, 9.2
1407     */
1408    @FunctionalInterface
1409    protected interface BiFunctionSQLException<T, U, R> {
1410
1411        /**
1412         * Applies this function to the given arguments.
1413         *
1414         * @param t the first function argument
1415         * @param u the second function argument
1416         * @return the function result
1417         */
1418        R apply(T t, U u) throws SQLException;
1419
1420    }
1421
1422}