001/*
002 * (C) Copyright 2006-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Benoit Delbosc
019 */
020package org.nuxeo.ecm.core.storage.sql.jdbc;
021
022import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult;
023
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.sql.Array;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
054
055import javax.transaction.xa.XAException;
056import javax.transaction.xa.XAResource;
057import javax.transaction.xa.Xid;
058
059import org.apache.commons.logging.Log;
060import org.apache.commons.logging.LogFactory;
061import org.nuxeo.common.Environment;
062import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
063import org.nuxeo.ecm.core.api.IterableQueryResult;
064import org.nuxeo.ecm.core.api.Lock;
065import org.nuxeo.ecm.core.api.NuxeoException;
066import org.nuxeo.ecm.core.api.PartialList;
067import org.nuxeo.ecm.core.api.ScrollResult;
068import org.nuxeo.ecm.core.api.ScrollResultImpl;
069import org.nuxeo.ecm.core.blob.DocumentBlobManager;
070import org.nuxeo.ecm.core.model.LockManager;
071import org.nuxeo.ecm.core.query.QueryFilter;
072import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator;
073import org.nuxeo.ecm.core.storage.sql.ColumnType;
074import org.nuxeo.ecm.core.storage.sql.ColumnType.WrappedId;
075import org.nuxeo.ecm.core.storage.sql.Invalidations;
076import org.nuxeo.ecm.core.storage.sql.Mapper;
077import org.nuxeo.ecm.core.storage.sql.Model;
078import org.nuxeo.ecm.core.storage.sql.RepositoryDescriptor;
079import org.nuxeo.ecm.core.storage.sql.RepositoryImpl;
080import org.nuxeo.ecm.core.storage.sql.Row;
081import org.nuxeo.ecm.core.storage.sql.RowId;
082import org.nuxeo.ecm.core.storage.sql.Session.PathResolver;
083import org.nuxeo.ecm.core.storage.sql.jdbc.SQLInfo.SQLInfoSelect;
084import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
085import org.nuxeo.ecm.core.storage.sql.jdbc.db.Database;
086import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table;
087import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
088import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectOracle;
089import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.SQLStatement.ListCollector;
090import org.nuxeo.runtime.api.Framework;
091
092/**
093 * A {@link JDBCMapper} maps objects to and from a JDBC database. It is specific to a given database connection, as it
094 * computes statements.
095 * <p>
096 * The {@link JDBCMapper} does the mapping according to the policy defined by a {@link Model}, and generates SQL
097 * statements recorded in the {@link SQLInfo}.
098 */
public class JDBCMapper extends JDBCRowMapper implements Mapper {

    private static final Log log = LogFactory.getLog(JDBCMapper.class);

    // test-only properties; tests put TEST_UPGRADE* keys here to trigger the upgrade paths in createTables
    public static Map<String, Serializable> testProps = new HashMap<>();

    // open cursor results, shared (static) across mapper instances
    // NOTE(review): keys appear to be scroll ids (see NOSCROLL_ID) — confirm against the scroll() implementation
    protected static Map<String, CursorResult> cursorResults = new ConcurrentHashMap<>();

    public static final String TEST_UPGRADE = "testUpgrade";

    // property in sql.txt file
    public static final String TEST_UPGRADE_VERSIONS = "testUpgradeVersions";

    public static final String TEST_UPGRADE_LAST_CONTRIBUTOR = "testUpgradeLastContributor";

    public static final String TEST_UPGRADE_LOCKS = "testUpgradeLocks";

    public static final String TEST_UPGRADE_SYS_CHANGE_TOKEN = "testUpgradeSysChangeToken";

    // runs per-table upgrade procedures when missing columns are added (see createTables / upgradeTable)
    protected TableUpgrader tableUpgrader;

    // provides the registered QueryMaker implementations (see findQueryMaker)
    private final QueryMakerService queryMakerService;

    // resolves paths, used for startswith queries (see constructor javadoc)
    private final PathResolver pathResolver;

    private final RepositoryImpl repository;

    // true when a ClusterInvalidator was passed to the constructor
    protected boolean clusteringEnabled;

    // sentinel scroll id
    // NOTE(review): presumably marks scrolls that need no server-side cursor — confirm in scroll()
    protected static final String NOSCROLL_ID = "noscroll";
129
130    /**
131     * Creates a new Mapper.
132     *
133     * @param model the model
134     * @param pathResolver the path resolver (used for startswith queries)
135     * @param sqlInfo the sql info
136     * @param clusterInvalidator the cluster invalidator
137     * @param repository the repository
138     */
139    public JDBCMapper(Model model, PathResolver pathResolver, SQLInfo sqlInfo, ClusterInvalidator clusterInvalidator,
140            RepositoryImpl repository) {
141        super(model, sqlInfo, clusterInvalidator, repository.getInvalidationsPropagator());
142        this.pathResolver = pathResolver;
143        this.repository = repository;
144        clusteringEnabled = clusterInvalidator != null;
145        queryMakerService = Framework.getService(QueryMakerService.class);
146
147        tableUpgrader = new TableUpgrader(this);
148        tableUpgrader.add(Model.VERSION_TABLE_NAME, Model.VERSION_IS_LATEST_KEY, "upgradeVersions",
149                TEST_UPGRADE_VERSIONS);
150        tableUpgrader.add("dublincore", "lastContributor", "upgradeLastContributor", TEST_UPGRADE_LAST_CONTRIBUTOR);
151        tableUpgrader.add(Model.LOCK_TABLE_NAME, Model.LOCK_OWNER_KEY, "upgradeLocks", TEST_UPGRADE_LOCKS);
152        tableUpgrader.add(Model.HIER_TABLE_NAME, Model.MAIN_SYS_CHANGE_TOKEN_KEY, "upgradeSysChangeToken",
153                TEST_UPGRADE_SYS_CHANGE_TOKEN);
154    }
155
156    @Override
157    public int getTableSize(String tableName) {
158        return sqlInfo.getDatabase().getTable(tableName).getColumns().size();
159    }
160
161    /*
162     * ----- Root -----
163     */
164
165    @Override
166    public void createDatabase(String ddlMode) {
167        // some databases (SQL Server) can't create tables/indexes/etc in a transaction, so suspend it
168        try {
169            if (!connection.getAutoCommit()) {
170                throw new NuxeoException("connection should not run in transactional mode for DDL operations");
171            }
172            createTables(ddlMode);
173        } catch (SQLException e) {
174            throw new NuxeoException(e);
175        }
176    }
177
178    protected String getTableName(String origName) {
179
180        if (dialect instanceof DialectOracle) {
181            if (origName.length() > 30) {
182
183                StringBuilder sb = new StringBuilder(origName.length());
184
185                try {
186                    MessageDigest digest = MessageDigest.getInstance("MD5");
187                    sb.append(origName.substring(0, 15));
188                    sb.append('_');
189
190                    digest.update(origName.getBytes());
191                    sb.append(Dialect.toHexString(digest.digest()).substring(0, 12));
192
193                    return sb.toString();
194
195                } catch (NoSuchAlgorithmException e) {
196                    throw new RuntimeException("Error", e);
197                }
198            }
199        }
200
201        return origName;
202    }
203
    /**
     * Creates or upgrades the database tables to match the model, then executes the collected DDL.
     * <p>
     * The work happens in two phases: first, all needed DDL is collected (missing tables created, missing columns
     * added, type mismatches logged, upgrade procedures queued); second, depending on {@code ddlMode}, the collected
     * DDL is executed and/or dumped to a file, or startup is aborted so an administrator can run the dump manually.
     *
     * @param ddlMode the DDL mode, a combination of {@link RepositoryDescriptor} DDL_MODE_* values (see the inline
     *            comments below for the accepted combinations)
     * @throws SQLException if a collected DDL statement fails to execute
     */
    protected void createTables(String ddlMode) throws SQLException {
        ListCollector ddlCollector = new ListCollector();

        sqlInfo.executeSQLStatements(null, ddlMode, connection, logger, ddlCollector); // for missing category
        sqlInfo.executeSQLStatements("first", ddlMode, connection, logger, ddlCollector);
        sqlInfo.executeSQLStatements("beforeTableCreation", ddlMode, connection, logger, ddlCollector);
        if (testProps.containsKey(TEST_UPGRADE)) {
            // create "old" tables
            sqlInfo.executeSQLStatements("testUpgrade", ddlMode, connection, logger, null); // do not collect
        }

        // introspect the tables currently present in the database schema
        String schemaName = dialect.getConnectionSchema(connection);
        DatabaseMetaData metadata = connection.getMetaData();
        Set<String> tableNames = findTableNames(metadata, schemaName);
        Database database = sqlInfo.getDatabase();
        // key = table key; value = null if the table was just created, else the list of added columns
        Map<String, List<Column>> added = new HashMap<>();

        for (Table table : database.getTables()) {
            String tableName = getTableName(table.getPhysicalName());
            if (!tableNames.contains(tableName.toUpperCase())) {

                /*
                 * Create missing table.
                 */

                ddlCollector.add(table.getCreateSql());
                ddlCollector.addAll(table.getPostCreateSqls(model));

                added.put(table.getKey(), null); // null = table created

                sqlInfo.sqlStatementsProperties.put("create_table_" + tableName.toLowerCase(), Boolean.TRUE);

            } else {

                /*
                 * Get existing columns.
                 */

                Map<String, Integer> columnTypes = new HashMap<>();
                Map<String, String> columnTypeNames = new HashMap<>();
                Map<String, Integer> columnTypeSizes = new HashMap<>();
                try (ResultSet rs = metadata.getColumns(null, schemaName, tableName, "%")) {
                    while (rs.next()) {
                        String schema = rs.getString("TABLE_SCHEM");
                        if (schema != null) { // null for MySQL, doh!
                            if ("INFORMATION_SCHEMA".equals(schema.toUpperCase())) {
                                // H2 returns some system tables (locks)
                                continue;
                            }
                        }
                        String columnName = rs.getString("COLUMN_NAME").toUpperCase();
                        columnTypes.put(columnName, Integer.valueOf(rs.getInt("DATA_TYPE")));
                        columnTypeNames.put(columnName, rs.getString("TYPE_NAME"));
                        columnTypeSizes.put(columnName, Integer.valueOf(rs.getInt("COLUMN_SIZE")));
                    }
                }

                /*
                 * Update types and create missing columns.
                 */

                List<Column> addedColumns = new LinkedList<>();
                for (Column column : table.getColumns()) {
                    String upperName = column.getPhysicalName().toUpperCase();
                    Integer type = columnTypes.remove(upperName);
                    if (type == null) {
                        log.warn("Adding missing column in database: " + column.getFullQuotedName());
                        ddlCollector.add(table.getAddColumnSql(column));
                        ddlCollector.addAll(table.getPostAddSqls(column, model));
                        addedColumns.add(column);
                    } else {
                        // column already exists: check its actual JDBC type against the model's expectation
                        int expected = column.getJdbcType();
                        int actual = type.intValue();
                        String actualName = columnTypeNames.get(upperName);
                        Integer actualSize = columnTypeSizes.get(upperName);
                        if (!column.setJdbcType(actual, actualName, actualSize.intValue())) {
                            log.error(String.format("SQL type mismatch for %s: expected %s, database has %s / %s (%s)",
                                    column.getFullQuotedName(), Integer.valueOf(expected), type, actualName,
                                    actualSize));
                        }
                    }
                }
                // dialect-specific columns that must not be reported as unused
                for (String col : dialect.getIgnoredColumns(table)) {
                    columnTypes.remove(col.toUpperCase());
                }
                if (!columnTypes.isEmpty()) {
                    log.warn("Database contains additional unused columns for table " + table.getQuotedName() + ": "
                            + String.join(", ", columnTypes.keySet()));
                }
                if (!addedColumns.isEmpty()) {
                    if (added.containsKey(table.getKey())) {
                        throw new AssertionError();
                    }
                    added.put(table.getKey(), addedColumns);
                }
            }
        }

        if (testProps.containsKey(TEST_UPGRADE)) {
            // create "old" content in tables
            sqlInfo.executeSQLStatements("testUpgradeOldTables", ddlMode, connection, logger, ddlCollector);
        }

        // run upgrade for each table if added columns or test
        for (Entry<String, List<Column>> en : added.entrySet()) {
            List<Column> addedColumns = en.getValue();
            String tableKey = en.getKey();
            upgradeTable(tableKey, addedColumns, ddlMode, ddlCollector);
        }

        sqlInfo.executeSQLStatements("afterTableCreation", ddlMode, connection, logger, ddlCollector);
        sqlInfo.executeSQLStatements("last", ddlMode, connection, logger, ddlCollector);

        // aclr_permission check for PostgreSQL
        dialect.performAdditionalStatements(connection);

        /*
         * Execute all the collected DDL, or dump it if requested, depending on ddlMode
         */

        // ddlMode may be:
        // ignore (not treated here, nothing done)
        // dump (implies execute)
        // dump,execute
        // dump,ignore (no execute)
        // execute
        // abort (implies dump)
        // compat can be used instead of execute to always recreate stored procedures

        List<String> ddl = ddlCollector.getStrings();
        boolean ignore = ddlMode.contains(RepositoryDescriptor.DDL_MODE_IGNORE);
        boolean dump = ddlMode.contains(RepositoryDescriptor.DDL_MODE_DUMP);
        boolean abort = ddlMode.contains(RepositoryDescriptor.DDL_MODE_ABORT);
        if (dump || abort) {

            /*
             * Dump DDL if not empty.
             */

            if (!ddl.isEmpty()) {
                // write the DDL to a "ddl-vcs-<repository>.sql" file in the log directory
                File dumpFile = new File(Environment.getDefault().getLog(), "ddl-vcs-" + repository.getName() + ".sql");
                try (OutputStream out = new FileOutputStream(dumpFile); PrintStream ps = new PrintStream(out)) {
                    for (String sql : dialect.getDumpStart()) {
                        ps.println(sql);
                    }
                    for (String sql : ddl) {
                        sql = sql.trim();
                        if (sql.endsWith(";")) {
                            sql = sql.substring(0, sql.length() - 1);
                        }
                        ps.println(dialect.getSQLForDump(sql));
                    }
                    for (String sql : dialect.getDumpStop()) {
                        ps.println(sql);
                    }
                } catch (IOException e) {
                    throw new NuxeoException(e);
                }

                /*
                 * Abort if requested.
                 */

                if (abort) {
                    log.error("Dumped DDL to: " + dumpFile);
                    throw new NuxeoException("Database initialization failed for: " + repository.getName()
                            + ", DDL must be executed: " + dumpFile);
                }
            }
        }
        if (!ignore) {

            /*
             * Execute DDL.
             */

            try (Statement st = connection.createStatement()) {
                for (String sql : ddl) {
                    logger.log(sql.replace("\n", "\n    ")); // indented
                    try {
                        st.execute(sql);
                    } catch (SQLException e) {
                        // rethrow with the offending statement included for diagnosis
                        throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e);
                    }
                    countExecute();
                }
            }

            /*
             * Execute post-DDL stuff.
             */

            try (Statement st = connection.createStatement()) {
                for (String sql : dialect.getStartupSqls(model, sqlInfo.database)) {
                    logger.log(sql.replace("\n", "\n    ")); // indented
                    try {
                        st.execute(sql);
                    } catch (SQLException e) {
                        throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e);
                    }
                    countExecute();
                }
            }
        }
    }
409
    /**
     * Runs the upgrade procedure registered for the given table, if any (see the registrations in the constructor).
     *
     * @param tableKey the table key
     * @param addedColumns the columns just added, or {@code null} if the whole table was just created
     * @param ddlMode the DDL mode
     * @param ddlCollector the collector receiving the upgrade SQL
     */
    protected void upgradeTable(String tableKey, List<Column> addedColumns, String ddlMode, ListCollector ddlCollector)
            throws SQLException {
        tableUpgrader.upgrade(tableKey, addedColumns, ddlMode, ddlCollector);
    }
414
415    /** Finds uppercase table names. */
416    protected static Set<String> findTableNames(DatabaseMetaData metadata, String schemaName) throws SQLException {
417        Set<String> tableNames = new HashSet<>();
418        ResultSet rs = metadata.getTables(null, schemaName, "%", new String[] { "TABLE" });
419        while (rs.next()) {
420            String tableName = rs.getString("TABLE_NAME");
421            tableNames.add(tableName.toUpperCase());
422        }
423        rs.close();
424        return tableNames;
425    }
426
    /** Returns the JDBC type used for cluster node ids (delegates to the SQLInfo). */
    @Override
    public int getClusterNodeIdType() {
        return sqlInfo.getClusterNodeIdType();
    }
431
432    @Override
433    public void createClusterNode(Serializable nodeId) {
434        Calendar now = Calendar.getInstance();
435        String sql = sqlInfo.getCreateClusterNodeSql();
436        List<Column> columns = sqlInfo.getCreateClusterNodeColumns();
437        try (PreparedStatement ps = connection.prepareStatement(sql)) {
438            if (logger.isLogEnabled()) {
439                logger.logSQL(sql, Arrays.asList(nodeId, now));
440            }
441            columns.get(0).setToPreparedStatement(ps, 1, nodeId);
442            columns.get(1).setToPreparedStatement(ps, 2, now);
443            ps.execute();
444
445        } catch (SQLException e) {
446            try {
447                checkConcurrentUpdate(e);
448            } catch (ConcurrentUpdateException cue) {
449                cue.addInfo(
450                        "Duplicate cluster node with id: " + nodeId
451                                + " (a crashed node must be cleaned up, or the repository.clustering.id configuration fixed)");
452                throw cue;
453            }
454            throw new NuxeoException(e);
455        }
456    }
457
458    @Override
459    public void removeClusterNode(Serializable nodeId) {
460        // delete from cluster_nodes
461        String sql = sqlInfo.getDeleteClusterNodeSql();
462        Column column = sqlInfo.getDeleteClusterNodeColumn();
463        try (PreparedStatement ps = connection.prepareStatement(sql)) {
464            if (logger.isLogEnabled()) {
465                logger.logSQL(sql, Collections.singletonList(nodeId));
466            }
467            column.setToPreparedStatement(ps, 1, nodeId);
468            ps.execute();
469            // delete un-processed invals from cluster_invals
470            deleteClusterInvals(nodeId);
471        } catch (SQLException e) {
472            throw new NuxeoException(e);
473        }
474    }
475
476    protected void deleteClusterInvals(Serializable nodeId) throws SQLException {
477        String sql = sqlInfo.getDeleteClusterInvalsSql();
478        Column column = sqlInfo.getDeleteClusterInvalsColumn();
479        try (PreparedStatement ps = connection.prepareStatement(sql)) {
480            if (logger.isLogEnabled()) {
481                logger.logSQL(sql, Collections.singletonList(nodeId));
482            }
483            column.setToPreparedStatement(ps, 1, nodeId);
484            int n = ps.executeUpdate();
485            countExecute();
486            if (logger.isLogEnabled()) {
487                logger.logCount(n);
488            }
489        }
490    }
491
492    @Override
493    public void insertClusterInvalidations(Serializable nodeId, Invalidations invalidations) {
494        String sql = dialect.getClusterInsertInvalidations();
495        List<Column> columns = sqlInfo.getClusterInvalidationsColumns();
496        try (PreparedStatement ps = connection.prepareStatement(sql)) {
497            int kind = Invalidations.MODIFIED;
498            while (true) {
499                Set<RowId> rowIds = invalidations.getKindSet(kind);
500
501                // reorganize by id
502                Map<Serializable, Set<String>> res = new HashMap<>();
503                for (RowId rowId : rowIds) {
504                    Set<String> tableNames = res.get(rowId.id);
505                    if (tableNames == null) {
506                        res.put(rowId.id, tableNames = new HashSet<>());
507                    }
508                    tableNames.add(rowId.tableName);
509                }
510
511                // do inserts
512                for (Entry<Serializable, Set<String>> en : res.entrySet()) {
513                    Serializable id = en.getKey();
514                    String fragments = join(en.getValue(), ' ');
515                    if (logger.isLogEnabled()) {
516                        logger.logSQL(sql, Arrays.<Serializable> asList(nodeId, id, fragments, Long.valueOf(kind)));
517                    }
518                    Serializable frags;
519                    if (dialect.supportsArrays() && columns.get(2).getJdbcType() == Types.ARRAY) {
520                        frags = fragments.split(" ");
521                    } else {
522                        frags = fragments;
523                    }
524                    columns.get(0).setToPreparedStatement(ps, 1, nodeId);
525                    columns.get(1).setToPreparedStatement(ps, 2, id);
526                    columns.get(2).setToPreparedStatement(ps, 3, frags);
527                    columns.get(3).setToPreparedStatement(ps, 4, Long.valueOf(kind));
528                    ps.execute();
529                    countExecute();
530                }
531                if (kind == Invalidations.MODIFIED) {
532                    kind = Invalidations.DELETED;
533                } else {
534                    break;
535                }
536            }
537        } catch (SQLException e) {
538            throw new NuxeoException("Could not invalidate", e);
539        }
540    }
541
542    // join that works on a set
543    protected static String join(Collection<String> strings, char sep) {
544        if (strings.isEmpty()) {
545            throw new RuntimeException();
546        }
547        if (strings.size() == 1) {
548            return strings.iterator().next();
549        }
550        int size = 0;
551        for (String word : strings) {
552            size += word.length() + 1;
553        }
554        StringBuilder buf = new StringBuilder(size);
555        for (String word : strings) {
556            buf.append(word);
557            buf.append(sep);
558        }
559        buf.setLength(size - 1);
560        return buf.toString();
561    }
562
    /**
     * Reads the invalidations queued for the given cluster node, then deletes them if the dialect requires an
     * explicit cleanup.
     *
     * @param nodeId this cluster node id
     * @return the invalidations read (never {@code null}, possibly empty)
     */
    @Override
    public Invalidations getClusterInvalidations(Serializable nodeId) {
        Invalidations invalidations = new Invalidations();
        String sql = dialect.getClusterGetInvalidations();
        List<Column> columns = sqlInfo.getClusterInvalidationsColumns();
        if (logger.isLogEnabled()) {
            logger.logSQL(sql, Collections.singletonList(nodeId));
        }
        try (PreparedStatement ps = connection.prepareStatement(sql)) {
            setToPreparedStatement(ps, 1, nodeId);
            try (ResultSet rs = ps.executeQuery()) {
                countExecute();
                while (rs.next()) {
                    // first column ignored, it's the node id
                    Serializable id = columns.get(1).getFromResultSet(rs, 1);
                    Serializable frags = columns.get(2).getFromResultSet(rs, 2);
                    int kind = ((Long) columns.get(3).getFromResultSet(rs, 3)).intValue();
                    String[] fragments;
                    // fragments come back as a SQL array or as a space-separated string, depending on the dialect
                    // (mirrors the encoding done in insertClusterInvalidations)
                    if (dialect.supportsArrays() && frags instanceof String[]) {
                        fragments = (String[]) frags;
                    } else {
                        fragments = ((String) frags).split(" ");
                    }
                    invalidations.add(id, fragments, kind);
                }
            }
            if (logger.isLogEnabled()) {
                // logCount(n);
                logger.log("  -> " + invalidations);
            }
            // some dialects consume the rows on read; others need an explicit delete afterwards
            if (dialect.isClusteringDeleteNeeded()) {
                deleteClusterInvals(nodeId);
            }
            return invalidations;
        } catch (SQLException e) {
            throw new NuxeoException("Could not invalidate", e);
        }
    }
601
602    @Override
603    public Serializable getRootId(String repositoryId) {
604        String sql = sqlInfo.getSelectRootIdSql();
605        if (logger.isLogEnabled()) {
606            logger.logSQL(sql, Collections.<Serializable> singletonList(repositoryId));
607        }
608        try (PreparedStatement ps = connection.prepareStatement(sql)) {
609            ps.setString(1, repositoryId);
610            try (ResultSet rs = ps.executeQuery()) {
611                countExecute();
612                if (!rs.next()) {
613                    if (logger.isLogEnabled()) {
614                        logger.log("  -> (none)");
615                    }
616                    return null;
617                }
618                Column column = sqlInfo.getSelectRootIdWhatColumn();
619                Serializable id = column.getFromResultSet(rs, 1);
620                if (logger.isLogEnabled()) {
621                    logger.log("  -> " + Model.MAIN_KEY + '=' + id);
622                }
623                // check that we didn't get several rows
624                if (rs.next()) {
625                    throw new NuxeoException("Row query for " + repositoryId + " returned several rows: " + sql);
626                }
627                return id;
628            }
629        } catch (SQLException e) {
630            throw new NuxeoException("Could not select: " + sql, e);
631        }
632    }
633
634    @Override
635    public void setRootId(Serializable repositoryId, Serializable id) {
636        String sql = sqlInfo.getInsertRootIdSql();
637        try (PreparedStatement ps = connection.prepareStatement(sql)) {
638            List<Column> columns = sqlInfo.getInsertRootIdColumns();
639            List<Serializable> debugValues = null;
640            if (logger.isLogEnabled()) {
641                debugValues = new ArrayList<>(2);
642            }
643            int i = 0;
644            for (Column column : columns) {
645                i++;
646                String key = column.getKey();
647                Serializable v;
648                if (key.equals(Model.MAIN_KEY)) {
649                    v = id;
650                } else if (key.equals(Model.REPOINFO_REPONAME_KEY)) {
651                    v = repositoryId;
652                } else {
653                    throw new RuntimeException(key);
654                }
655                column.setToPreparedStatement(ps, i, v);
656                if (debugValues != null) {
657                    debugValues.add(v);
658                }
659            }
660            if (debugValues != null) {
661                logger.logSQL(sql, debugValues);
662                debugValues.clear();
663            }
664            ps.execute();
665            countExecute();
666        } catch (SQLException e) {
667            throw new NuxeoException("Could not insert: " + sql, e);
668        }
669    }
670
671    protected QueryMaker findQueryMaker(String queryType) {
672        for (Class<? extends QueryMaker> klass : queryMakerService.getQueryMakers()) {
673            QueryMaker queryMaker;
674            try {
675                queryMaker = klass.newInstance();
676            } catch (ReflectiveOperationException e) {
677                throw new NuxeoException(e);
678            }
679            if (queryMaker.accepts(queryType)) {
680                return queryMaker;
681            }
682        }
683        return null;
684    }
685
686    protected void prepareUserReadAcls(QueryFilter queryFilter) {
687        String sql = dialect.getPrepareUserReadAclsSql();
688        Serializable principals = queryFilter.getPrincipals();
689        if (sql == null || principals == null) {
690            return;
691        }
692        if (!dialect.supportsArrays()) {
693            principals = String.join(Dialect.ARRAY_SEP, (String[]) principals);
694        }
695        try (PreparedStatement ps = connection.prepareStatement(sql)) {
696            if (logger.isLogEnabled()) {
697                logger.logSQL(sql, Collections.singleton(principals));
698            }
699            setToPreparedStatement(ps, 1, principals);
700            ps.execute();
701            countExecute();
702        } catch (SQLException e) {
703            throw new NuxeoException("Failed to prepare user read acl cache", e);
704        }
705    }
706
    /**
     * {@inheritDoc}
     * <p>
     * {@code countTotal == true} maps to {@code countUpTo == -1} (count all results), {@code false} to {@code 0} (no
     * counting).
     */
    @Override
    public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter,
            boolean countTotal) {
        return query(query, queryType, queryFilter, countTotal ? -1 : 0);
    }
712
    /**
     * {@inheritDoc}
     * <p>
     * Runs the query as a projection extracting the first selected column of each row, then logs the resulting ids.
     */
    @Override
    public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter, long countUpTo) {
        PartialList<Serializable> result = queryProjection(query, queryType, queryFilter, countUpTo,
                (info, rs) -> info.whatColumns.get(0).getFromResultSet(rs, 1));

        if (logger.isLogEnabled()) {
            logger.logIds(result, countUpTo != 0, result.totalSize());
        }

        return result;
    }
724
725    // queryFilter used for principals and permissions
726    @Override
727    public IterableQueryResult queryAndFetch(String query, String queryType, QueryFilter queryFilter,
728            boolean distinctDocuments, Object... params) {
729        if (dialect.needsPrepareUserReadAcls()) {
730            prepareUserReadAcls(queryFilter);
731        }
732        QueryMaker queryMaker = findQueryMaker(queryType);
733        if (queryMaker == null) {
734            throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query);
735        }
736        query = computeDistinctDocuments(query, distinctDocuments);
737        try {
738            return new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this, params);
739        } catch (SQLException e) {
740            throw new NuxeoException("Invalid query: " + queryType + ": " + query, e);
741        }
742    }
743
744    @Override
745    public PartialList<Map<String, Serializable>> queryProjection(String query, String queryType,
746            QueryFilter queryFilter, boolean distinctDocuments, long countUpTo, Object... params) {
747        query = computeDistinctDocuments(query, distinctDocuments);
748        PartialList<Map<String, Serializable>> result = queryProjection(query, queryType, queryFilter, countUpTo,
749                (info, rs) -> info.mapMaker.makeMap(rs), params);
750
751        if (logger.isLogEnabled()) {
752            logger.logMaps(result, countUpTo != 0, result.totalSize());
753        }
754
755        return result;
756    }
757
758    protected String computeDistinctDocuments(String query, boolean distinctDocuments) {
759        if (distinctDocuments) {
760            String q = query.toLowerCase();
761            if (q.startsWith("select ") && !q.startsWith("select distinct ")) {
762                // Replace "select" by "select distinct", split at "select ".length() index
763                query = "SELECT DISTINCT " + query.substring(7);
764            }
765        }
766        return query;
767    }
768
    /**
     * Builds and executes the query, extracting one projection object per row through {@code extractor}.
     * <p>
     * Takes care of user read-ACL preparation, dialect-side paging when supported, manual limit/offset
     * positioning on a scrollable result set otherwise, and optional total-size counting.
     *
     * @param query the query string
     * @param queryType the query type, used to select the {@link QueryMaker}
     * @param queryFilter the security and limit/offset filter
     * @param countUpTo {@code 0} for no total count, {@code -1} to count everything, {@code n > 0} to count up to
     *            {@code n} (beyond which the total size is reported as {@code -2})
     * @param extractor maps the result set's current row to a projection object
     * @param params optional query parameters
     * @return the projections with the computed total size
     */
    protected <T> PartialList<T> queryProjection(String query, String queryType, QueryFilter queryFilter,
            long countUpTo, BiFunctionSQLException<SQLInfoSelect, ResultSet, T> extractor, Object... params) {
        if (dialect.needsPrepareUserReadAcls()) {
            prepareUserReadAcls(queryFilter);
        }
        QueryMaker queryMaker = findQueryMaker(queryType);
        if (queryMaker == null) {
            throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query);
        }
        QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter, params);

        if (q == null) {
            logger.log("Query cannot return anything due to conflicting clauses");
            return new PartialList<>(Collections.emptyList(), 0);
        }
        long limit = queryFilter.getLimit();
        long offset = queryFilter.getOffset();

        if (logger.isLogEnabled()) {
            // log-only copy of the SQL, annotated with the paging and counting settings
            String sql = q.selectInfo.sql;
            if (limit != 0) {
                sql += " -- LIMIT " + limit + " OFFSET " + offset;
            }
            if (countUpTo != 0) {
                sql += " -- COUNT TOTAL UP TO " + countUpTo;
            }
            logger.logSQL(sql, q.selectParams);
        }

        String sql = q.selectInfo.sql;

        if (countUpTo == 0 && limit > 0 && dialect.supportsPaging()) {
            // full result set not needed for counting: let the database do the paging,
            // and zero out limit/offset so the manual positioning below is skipped
            sql = dialect.addPagingClause(sql, limit, offset);
            limit = 0;
            offset = 0;
        } else if (countUpTo > 0 && dialect.supportsPaging()) {
            // ask one more row than countUpTo so truncation can be detected
            sql = dialect.addPagingClause(sql, Math.max(countUpTo + 1, limit + offset), 0);
        }

        // scroll-insensitive result set, needed for absolute()/last() positioning below
        try (PreparedStatement ps = connection.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE,
                ResultSet.CONCUR_READ_ONLY)) {
            int i = 1;
            for (Serializable object : q.selectParams) {
                setToPreparedStatement(ps, i++, object);
            }
            try (ResultSet rs = ps.executeQuery()) {
                countExecute();

                // limit/offset handled manually when not delegated to the database above
                long totalSize = -1;
                boolean available;
                if ((limit == 0) || (offset == 0)) {
                    available = rs.first();
                    if (!available) {
                        totalSize = 0;
                    }
                    if (limit == 0) {
                        limit = -1; // infinite
                    }
                } else {
                    // position on the first requested row (rows are 1-based)
                    available = rs.absolute((int) offset + 1);
                }

                List<T> projections = new LinkedList<>();
                int rowNum = 0;
                while (available && (limit != 0)) {
                    try {
                        T projection = extractor.apply(q.selectInfo, rs);
                        projections.add(projection);
                        rowNum = rs.getRow();
                        available = rs.next();
                        limit--;
                    } catch (SQLDataException e) {
                        // actually no data available, MariaDB Connector/J lied, stop now
                        available = false;
                    }
                }

                // total size
                if (countUpTo != 0 && (totalSize == -1)) {
                    if (!available && (rowNum != 0)) {
                        // last row read was the actual last
                        totalSize = rowNum;
                    } else {
                        // available if limit reached with some left
                        // rowNum == 0 if skipped too far
                        rs.last();
                        totalSize = rs.getRow();
                    }
                    if (countUpTo > 0 && totalSize > countUpTo) {
                        // the results were truncated, we don't know the total size
                        totalSize = -2;
                    }
                }

                return new PartialList<>(projections, totalSize);
            }
        } catch (SQLException e) {
            throw new NuxeoException("Invalid query: " + query, e);
        }
    }
872
    /**
     * Binds a value to the prepared statement, converting it to the appropriate JDBC form.
     * <p>
     * Calendars and wrapped ids go through the dialect; arrays are converted to SQL arrays using the dialect's
     * element-type mapping; anything else falls through to {@link PreparedStatement#setObject}.
     *
     * @param ps the prepared statement
     * @param i the 1-based parameter index
     * @param object the value to bind
     * @return the index {@code i} that was bound
     */
    public int setToPreparedStatement(PreparedStatement ps, int i, Serializable object) throws SQLException {
        if (object instanceof Calendar) {
            dialect.setToPreparedStatementTimestamp(ps, i, object, null);
        } else if (object instanceof java.sql.Date) {
            ps.setDate(i, (java.sql.Date) object);
        } else if (object instanceof Long) {
            ps.setLong(i, ((Long) object).longValue());
        } else if (object instanceof WrappedId) {
            dialect.setId(ps, i, object.toString());
        } else if (object instanceof Object[]) {
            // Pick the JDBC element type from the array's component type.
            // NOTE: the order of these instanceof checks is significant — each specific array type
            // must be tested before falling through to the generic CLOB default below.
            int jdbcType;
            if (object instanceof String[]) {
                jdbcType = dialect.getJDBCTypeAndString(ColumnType.STRING).jdbcType;
            } else if (object instanceof Boolean[]) {
                jdbcType = dialect.getJDBCTypeAndString(ColumnType.BOOLEAN).jdbcType;
            } else if (object instanceof Long[]) {
                jdbcType = dialect.getJDBCTypeAndString(ColumnType.LONG).jdbcType;
            } else if (object instanceof Double[]) {
                jdbcType = dialect.getJDBCTypeAndString(ColumnType.DOUBLE).jdbcType;
            } else if (object instanceof java.sql.Date[]) {
                jdbcType = Types.DATE;
            } else if (object instanceof java.sql.Clob[]) {
                jdbcType = Types.CLOB;
            } else if (object instanceof Calendar[]) {
                // calendars are converted to timestamps before building the SQL array
                jdbcType = dialect.getJDBCTypeAndString(ColumnType.TIMESTAMP).jdbcType;
                object = dialect.getTimestampFromCalendar((Calendar[]) object);
            } else if (object instanceof Integer[]) {
                jdbcType = dialect.getJDBCTypeAndString(ColumnType.INTEGER).jdbcType;
            } else {
                // unknown component type: use the dialect's CLOB mapping as a catch-all
                jdbcType = dialect.getJDBCTypeAndString(ColumnType.CLOB).jdbcType;
            }
            Array array = dialect.createArrayOf(jdbcType, (Object[]) object, connection);
            ps.setArray(i, array);
        } else {
            ps.setObject(i, object);
        }
        return i;
    }
911
912    @Override
913    public ScrollResult<String> scroll(String query, int batchSize, int keepAliveSeconds) {
914        if (!dialect.supportsScroll()) {
915            return defaultScroll(query);
916        }
917        checkForTimedoutScroll();
918        return scrollSearch(query, batchSize, keepAliveSeconds);
919    }
920
921    protected void checkForTimedoutScroll() {
922        cursorResults.forEach((id, cursor) -> cursor.timedOut(id));
923    }
924
925    protected ScrollResult<String> scrollSearch(String query, int batchSize, int keepAliveSeconds) {
926        QueryMaker queryMaker = findQueryMaker("NXQL");
927        QueryFilter queryFilter = new QueryFilter(null, null, null, null, Collections.emptyList(), 0, 0);
928        QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter);
929        if (q == null) {
930            logger.log("Query cannot return anything due to conflicting clauses");
931            throw new NuxeoException("Query cannot return anything due to conflicting clauses");
932        }
933        if (logger.isLogEnabled()) {
934            logger.logSQL(q.selectInfo.sql, q.selectParams);
935        }
936        try {
937            if (connection.getAutoCommit()) {
938                throw new NuxeoException("Scroll should be done inside a transaction");
939            }
940            // ps MUST NOT be auto-closed because it's referenced by a cursor
941            PreparedStatement ps = connection.prepareStatement(q.selectInfo.sql, ResultSet.TYPE_FORWARD_ONLY,
942                    ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
943            ps.setFetchSize(batchSize);
944            int i = 1;
945            for (Serializable object : q.selectParams) {
946                setToPreparedStatement(ps, i++, object);
947            }
948            // rs MUST NOT be auto-closed because it's referenced by a cursor
949            ResultSet rs = ps.executeQuery();
950            String scrollId = UUID.randomUUID().toString();
951            registerCursor(scrollId, ps, rs, batchSize, keepAliveSeconds);
952            return scroll(scrollId);
953        } catch (SQLException e) {
954            throw new NuxeoException("Error on query", e);
955        }
956    }
957
958    protected class CursorResult {
959        protected final int keepAliveSeconds;
960
961        protected final PreparedStatement preparedStatement;
962
963        protected final ResultSet resultSet;
964
965        protected final int batchSize;
966
967        protected long lastCallTimestamp;
968
969        CursorResult(PreparedStatement preparedStatement, ResultSet resultSet, int batchSize, int keepAliveSeconds) {
970            this.preparedStatement = preparedStatement;
971            this.resultSet = resultSet;
972            this.batchSize = batchSize;
973            this.keepAliveSeconds = keepAliveSeconds;
974            lastCallTimestamp = System.currentTimeMillis();
975        }
976
977        boolean timedOut(String scrollId) {
978            long now = System.currentTimeMillis();
979            if (now - lastCallTimestamp > (keepAliveSeconds * 1000)) {
980                if (unregisterCursor(scrollId)) {
981                    log.warn("Scroll " + scrollId + " timed out");
982                }
983                return true;
984            }
985            return false;
986        }
987
988        void touch() {
989            lastCallTimestamp = System.currentTimeMillis();
990        }
991
992        synchronized void close() throws SQLException {
993            if (resultSet != null) {
994                resultSet.close();
995            }
996            if (preparedStatement != null) {
997                preparedStatement.close();
998            }
999        }
1000    }
1001
1002    protected void registerCursor(String scrollId, PreparedStatement ps, ResultSet rs, int batchSize,
1003            int keepAliveSeconds) {
1004        cursorResults.put(scrollId, new CursorResult(ps, rs, batchSize, keepAliveSeconds));
1005    }
1006
1007    protected boolean unregisterCursor(String scrollId) {
1008        CursorResult cursor = cursorResults.remove(scrollId);
1009        if (cursor != null) {
1010            try {
1011                cursor.close();
1012                return true;
1013            } catch (SQLException e) {
1014                log.error("Failed to close cursor for scroll: " + scrollId, e);
1015                // do not propagate exception on cleaning
1016            }
1017        }
1018        return false;
1019    }
1020
1021    protected ScrollResult<String> defaultScroll(String query) {
1022        // the database has no proper support for cursor just return everything in one batch
1023        QueryMaker queryMaker = findQueryMaker("NXQL");
1024        List<String> ids;
1025        QueryFilter queryFilter = new QueryFilter(null, null, null, null, Collections.emptyList(), 0, 0);
1026        try (IterableQueryResult ret = new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this)) {
1027            ids = new ArrayList<>((int) ret.size());
1028            for (Map<String, Serializable> map : ret) {
1029                ids.add(map.get("ecm:uuid").toString());
1030            }
1031        } catch (SQLException e) {
1032            throw new NuxeoException("Invalid scroll query: " + query, e);
1033        }
1034        return new ScrollResultImpl<>(NOSCROLL_ID, ids);
1035    }
1036
    /**
     * Returns the next batch of ids for a previously-opened scroll.
     * <p>
     * A {@code NOSCROLL_ID} (or a dialect without scroll support) means everything was already returned in the
     * single first batch, so an empty result is returned. When the cursor is exhausted it is closed, and
     * unregistered if this call produced no ids at all.
     *
     * @param scrollId the scroll id returned by a previous call
     * @return the next batch of ids (possibly empty)
     */
    @Override
    public ScrollResult<String> scroll(String scrollId) {
        if (NOSCROLL_ID.equals(scrollId) || !dialect.supportsScroll()) {
            // there is only one batch in this case
            return emptyResult();
        }
        CursorResult cursorResult = cursorResults.get(scrollId);
        if (cursorResult == null) {
            throw new NuxeoException("Unknown or timed out scrollId");
        } else if (cursorResult.timedOut(scrollId)) {
            throw new NuxeoException("Timed out scrollId");
        }
        // reset the keep-alive countdown before reading
        cursorResult.touch();
        List<String> ids = new ArrayList<>(cursorResult.batchSize);
        // synchronize on the cursor: its result set is shared across scroll calls
        synchronized (cursorResult) {
            try {
                if (cursorResult.resultSet == null || cursorResult.resultSet.isClosed()) {
                    unregisterCursor(scrollId);
                    return emptyResult();
                }
                while (ids.size() < cursorResult.batchSize) {
                    if (cursorResult.resultSet.next()) {
                        // first column is the document id
                        ids.add(cursorResult.resultSet.getString(1));
                    } else {
                        // exhausted: release the JDBC resources now
                        cursorResult.close();
                        if (ids.isEmpty()) {
                            unregisterCursor(scrollId);
                        }
                        break;
                    }
                }
            } catch (SQLException e) {
                throw new NuxeoException("Error during scroll", e);
            }
        }
        return new ScrollResultImpl<>(scrollId, ids);
    }
1074
1075    @Override
1076    public Set<Serializable> getAncestorsIds(Collection<Serializable> ids) {
1077        SQLInfoSelect select = sqlInfo.getSelectAncestorsIds();
1078        if (select == null) {
1079            return getAncestorsIdsIterative(ids);
1080        }
1081        Serializable whereIds = newIdArray(ids);
1082        Set<Serializable> res = new HashSet<>();
1083        if (logger.isLogEnabled()) {
1084            logger.logSQL(select.sql, Collections.singleton(whereIds));
1085        }
1086        Column what = select.whatColumns.get(0);
1087        try (PreparedStatement ps = connection.prepareStatement(select.sql)) {
1088            setToPreparedStatementIdArray(ps, 1, whereIds);
1089            try (ResultSet rs = ps.executeQuery()) {
1090                countExecute();
1091                List<Serializable> debugIds = null;
1092                if (logger.isLogEnabled()) {
1093                    debugIds = new LinkedList<>();
1094                }
1095                while (rs.next()) {
1096                    if (dialect.supportsArraysReturnInsteadOfRows()) {
1097                        Serializable[] resultIds = dialect.getArrayResult(rs.getArray(1));
1098                        for (Serializable id : resultIds) {
1099                            if (id != null) {
1100                                res.add(id);
1101                                if (logger.isLogEnabled()) {
1102                                    debugIds.add(id);
1103                                }
1104                            }
1105                        }
1106                    } else {
1107                        Serializable id = what.getFromResultSet(rs, 1);
1108                        if (id != null) {
1109                            res.add(id);
1110                            if (logger.isLogEnabled()) {
1111                                debugIds.add(id);
1112                            }
1113                        }
1114                    }
1115                }
1116                if (logger.isLogEnabled()) {
1117                    logger.logIds(debugIds, false, 0);
1118                }
1119            }
1120            return res;
1121        } catch (SQLException e) {
1122            throw new NuxeoException("Failed to get ancestors ids", e);
1123        }
1124    }
1125
1126    /**
1127     * Uses iterative parentid selection.
1128     */
1129    protected Set<Serializable> getAncestorsIdsIterative(Collection<Serializable> ids) {
1130        try {
1131            LinkedList<Serializable> todo = new LinkedList<>(ids);
1132            Set<Serializable> done = new HashSet<>();
1133            Set<Serializable> res = new HashSet<>();
1134            while (!todo.isEmpty()) {
1135                done.addAll(todo);
1136                SQLInfoSelect select = sqlInfo.getSelectParentIds(todo.size());
1137                if (logger.isLogEnabled()) {
1138                    logger.logSQL(select.sql, todo);
1139                }
1140                Column what = select.whatColumns.get(0);
1141                Column where = select.whereColumns.get(0);
1142                try (PreparedStatement ps = connection.prepareStatement(select.sql)) {
1143                    int i = 1;
1144                    for (Serializable id : todo) {
1145                        where.setToPreparedStatement(ps, i++, id);
1146                    }
1147                    try (ResultSet rs = ps.executeQuery()) {
1148                        countExecute();
1149                        todo = new LinkedList<>();
1150                        List<Serializable> debugIds = null;
1151                        if (logger.isLogEnabled()) {
1152                            debugIds = new LinkedList<>();
1153                        }
1154                        while (rs.next()) {
1155                            Serializable id = what.getFromResultSet(rs, 1);
1156                            if (id != null) {
1157                                res.add(id);
1158                                if (!done.contains(id)) {
1159                                    todo.add(id);
1160                                }
1161                                if (logger.isLogEnabled()) {
1162                                    debugIds.add(id);
1163                                }
1164                            }
1165                        }
1166                        if (logger.isLogEnabled()) {
1167                            logger.logIds(debugIds, false, 0);
1168                        }
1169                    }
1170                }
1171            }
1172            return res;
1173        } catch (SQLException e) {
1174            throw new NuxeoException("Failed to get ancestors ids", e);
1175        }
1176    }
1177
1178    @Override
1179    public void updateReadAcls() {
1180        if (!dialect.supportsReadAcl()) {
1181            return;
1182        }
1183        if (log.isDebugEnabled()) {
1184            log.debug("updateReadAcls: updating");
1185        }
1186        try (Statement st = connection.createStatement()) {
1187            String sql = dialect.getUpdateReadAclsSql();
1188            if (logger.isLogEnabled()) {
1189                logger.log(sql);
1190            }
1191            st.execute(sql);
1192            countExecute();
1193        } catch (SQLException e) {
1194            checkConcurrentUpdate(e);
1195            throw new NuxeoException("Failed to update read acls", e);
1196        }
1197        if (log.isDebugEnabled()) {
1198            log.debug("updateReadAcls: done.");
1199        }
1200    }
1201
1202    @Override
1203    public void rebuildReadAcls() {
1204        if (!dialect.supportsReadAcl()) {
1205            return;
1206        }
1207        log.debug("rebuildReadAcls: rebuilding ...");
1208        try (Statement st = connection.createStatement()) {
1209            String sql = dialect.getRebuildReadAclsSql();
1210            logger.log(sql);
1211            st.execute(sql);
1212            countExecute();
1213        } catch (SQLException e) {
1214            throw new NuxeoException("Failed to rebuild read acls", e);
1215        }
1216        log.debug("rebuildReadAcls: done.");
1217    }
1218
1219    /*
1220     * ----- Locking -----
1221     */
1222
1223    @Override
1224    public Lock getLock(Serializable id) {
1225        if (log.isDebugEnabled()) {
1226            try {
1227                log.debug("getLock " + id + " while autoCommit=" + connection.getAutoCommit());
1228            } catch (SQLException e) {
1229                throw new RuntimeException(e);
1230            }
1231        }
1232        RowId rowId = new RowId(Model.LOCK_TABLE_NAME, id);
1233        Row row = readSimpleRow(rowId);
1234        return row == null ? null
1235                : new Lock((String) row.get(Model.LOCK_OWNER_KEY), (Calendar) row.get(Model.LOCK_CREATED_KEY));
1236    }
1237
1238    @Override
1239    public Lock setLock(final Serializable id, final Lock lock) {
1240        if (log.isDebugEnabled()) {
1241            log.debug("setLock " + id + " owner=" + lock.getOwner());
1242        }
1243        Lock oldLock = getLock(id);
1244        if (oldLock == null) {
1245            Row row = new Row(Model.LOCK_TABLE_NAME, id);
1246            row.put(Model.LOCK_OWNER_KEY, lock.getOwner());
1247            row.put(Model.LOCK_CREATED_KEY, lock.getCreated());
1248            insertSimpleRows(Model.LOCK_TABLE_NAME, Collections.singletonList(row));
1249        }
1250        return oldLock;
1251    }
1252
1253    @Override
1254    public Lock removeLock(final Serializable id, final String owner, final boolean force) {
1255        if (log.isDebugEnabled()) {
1256            log.debug("removeLock " + id + " owner=" + owner + " force=" + force);
1257        }
1258        Lock oldLock = force ? null : getLock(id);
1259        if (!force && owner != null) {
1260            if (oldLock == null) {
1261                // not locked, nothing to do
1262                return null;
1263            }
1264            if (!LockManager.canLockBeRemoved(oldLock.getOwner(), owner)) {
1265                // existing mismatched lock, flag failure
1266                return new Lock(oldLock, true);
1267            }
1268        }
1269        if (force || oldLock != null) {
1270            deleteRows(Model.LOCK_TABLE_NAME, Collections.singleton(id));
1271        }
1272        return oldLock;
1273    }
1274
1275    @Override
1276    public void markReferencedBinaries() {
1277        log.debug("Starting binaries GC mark");
1278        DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class);
1279        String repositoryName = getRepositoryName();
1280        try (Statement st = connection.createStatement()) {
1281            int i = -1;
1282            for (String sql : sqlInfo.getBinariesSql) {
1283                i++;
1284                Column col = sqlInfo.getBinariesColumns.get(i);
1285                if (logger.isLogEnabled()) {
1286                    logger.log(sql);
1287                }
1288                try (ResultSet rs = st.executeQuery(sql)) {
1289                    countExecute();
1290                    int n = 0;
1291                    while (rs.next()) {
1292                        n++;
1293                        String key = (String) col.getFromResultSet(rs, 1);
1294                        if (key != null) {
1295                            blobManager.markReferencedBinary(key, repositoryName);
1296                        }
1297                    }
1298                    if (logger.isLogEnabled()) {
1299                        logger.logCount(n);
1300                    }
1301                }
1302            }
1303        } catch (SQLException e) {
1304            throw new RuntimeException("Failed to mark binaries for gC", e);
1305        }
1306        log.debug("End of binaries GC mark");
1307    }
1308
1309    /*
1310     * ----- XAResource -----
1311     */
1312
1313    protected static String systemToString(Object o) {
1314        return o.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(o));
1315    }
1316
1317    @Override
1318    public void start(Xid xid, int flags) throws XAException {
1319        try {
1320            xaresource.start(xid, flags);
1321            if (logger.isLogEnabled()) {
1322                logger.log("XA start on " + systemToString(xid));
1323            }
1324        } catch (NuxeoException e) {
1325            throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e);
1326        } catch (XAException e) {
1327            logger.error("XA start error on " + systemToString(xid), e);
1328            throw e;
1329        }
1330    }
1331
    /**
     * Ends the XA work on the given transaction branch, delegating to the underlying JDBC XAResource.
     */
    @Override
    public void end(Xid xid, int flags) throws XAException {
        try {
            xaresource.end(xid, flags);
            if (logger.isLogEnabled()) {
                logger.log("XA end on " + systemToString(xid));
            }
        } catch (NullPointerException e) {
            // H2 when no active transaction
            // rewrap as a proper XAException so the transaction manager can handle it
            logger.error("XA end error on " + systemToString(xid), e);
            throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e);
        } catch (XAException e) {
            // TMFAIL means the transaction is already known to be failing: don't add noise to the logs
            if (flags != XAResource.TMFAIL) {
                logger.error("XA end error on " + systemToString(xid), e);
            }
            throw e;
        }
    }
1350
1351    @Override
1352    public int prepare(Xid xid) throws XAException {
1353        try {
1354            return xaresource.prepare(xid);
1355        } catch (XAException e) {
1356            logger.error("XA prepare error on  " + systemToString(xid), e);
1357            throw e;
1358        }
1359    }
1360
1361    @Override
1362    public void commit(Xid xid, boolean onePhase) throws XAException {
1363        try {
1364            xaresource.commit(xid, onePhase);
1365        } catch (XAException e) {
1366            logger.error("XA commit error on  " + systemToString(xid), e);
1367            throw e;
1368        }
1369    }
1370
1371    // rollback interacts with caches so is in RowMapper
1372
    @Override
    public void forget(Xid xid) throws XAException {
        // delegates directly to the underlying JDBC XAResource
        xaresource.forget(xid);
    }
1377
    @Override
    public Xid[] recover(int flag) throws XAException {
        // delegates transaction recovery to the underlying JDBC XAResource
        return xaresource.recover(flag);
    }
1382
    @Override
    public boolean setTransactionTimeout(int seconds) throws XAException {
        // delegates directly to the underlying JDBC XAResource
        return xaresource.setTransactionTimeout(seconds);
    }
1387
    @Override
    public int getTransactionTimeout() throws XAException {
        // delegates directly to the underlying JDBC XAResource
        return xaresource.getTransactionTimeout();
    }
1392
    @Override
    public boolean isSameRM(XAResource xares) throws XAException {
        // resource-manager identity comparison is deliberately not implemented for this mapper
        throw new UnsupportedOperationException();
    }
1397
    @Override
    public boolean isConnected() {
        // connected iff a JDBC connection is currently held
        return connection != null;
    }
1402
    @Override
    public void connect(boolean noSharing) {
        // delegates connection acquisition to openConnections
        openConnections(noSharing);
    }
1407
    @Override
    public void disconnect() {
        // delegates connection release to closeConnections
        closeConnections();
    }
1412
1413    /**
1414     * @since 7.10-HF25, 8.10-HF06, 9.2
1415     */
1416    @FunctionalInterface
1417    protected interface BiFunctionSQLException<T, U, R> {
1418
1419        /**
1420         * Applies this function to the given arguments.
1421         *
1422         * @param t the first function argument
1423         * @param u the second function argument
1424         * @return the function result
1425         */
1426        R apply(T t, U u) throws SQLException;
1427
1428    }
1429
1430}