001/*
002 * (C) Copyright 2006-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Benoit Delbosc
019 */
020package org.nuxeo.ecm.core.storage.sql.jdbc;
021
import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.sql.Array;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.common.Environment;
import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
import org.nuxeo.ecm.core.api.IterableQueryResult;
import org.nuxeo.ecm.core.api.Lock;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.PartialList;
import org.nuxeo.ecm.core.api.ScrollResult;
import org.nuxeo.ecm.core.api.ScrollResultImpl;
import org.nuxeo.ecm.core.blob.DocumentBlobManager;
import org.nuxeo.ecm.core.model.LockManager;
import org.nuxeo.ecm.core.query.QueryFilter;
import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator;
import org.nuxeo.ecm.core.storage.sql.ColumnType;
import org.nuxeo.ecm.core.storage.sql.ColumnType.WrappedId;
import org.nuxeo.ecm.core.storage.sql.Invalidations;
import org.nuxeo.ecm.core.storage.sql.Mapper;
import org.nuxeo.ecm.core.storage.sql.Model;
import org.nuxeo.ecm.core.storage.sql.RepositoryDescriptor;
import org.nuxeo.ecm.core.storage.sql.RepositoryImpl;
import org.nuxeo.ecm.core.storage.sql.Row;
import org.nuxeo.ecm.core.storage.sql.RowId;
import org.nuxeo.ecm.core.storage.sql.Session.PathResolver;
import org.nuxeo.ecm.core.storage.sql.jdbc.SQLInfo.SQLInfoSelect;
import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
import org.nuxeo.ecm.core.storage.sql.jdbc.db.Database;
import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table;
import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectOracle;
import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.SQLStatement.ListCollector;
import org.nuxeo.runtime.api.Framework;
091
092/**
093 * A {@link JDBCMapper} maps objects to and from a JDBC database. It is specific to a given database connection, as it
094 * computes statements.
095 * <p>
096 * The {@link JDBCMapper} does the mapping according to the policy defined by a {@link Model}, and generates SQL
097 * statements recorded in the {@link SQLInfo}.
098 */
099public class JDBCMapper extends JDBCRowMapper implements Mapper {
100
101    private static final Log log = LogFactory.getLog(JDBCMapper.class);
102
103    public static Map<String, Serializable> testProps = new HashMap<>();
104
105    protected static Map<String, CursorResult> cursorResults = new ConcurrentHashMap<>();
106
107    public static final String TEST_UPGRADE = "testUpgrade";
108
109    // property in sql.txt file
110    public static final String TEST_UPGRADE_VERSIONS = "testUpgradeVersions";
111
112    public static final String TEST_UPGRADE_LAST_CONTRIBUTOR = "testUpgradeLastContributor";
113
114    public static final String TEST_UPGRADE_LOCKS = "testUpgradeLocks";
115
116    public static final String TEST_UPGRADE_SYS_CHANGE_TOKEN = "testUpgradeSysChangeToken";
117
118    protected TableUpgrader tableUpgrader;
119
120    private final QueryMakerService queryMakerService;
121
122    private final PathResolver pathResolver;
123
124    private final RepositoryImpl repository;
125
126    protected boolean clusteringEnabled;
127
128    protected static final String NOSCROLL_ID = "noscroll";
129
130    /**
131     * Creates a new Mapper.
132     *
133     * @param model the model
134     * @param pathResolver the path resolver (used for startswith queries)
135     * @param sqlInfo the sql info
136     * @param clusterInvalidator the cluster invalidator
137     * @param repository the repository
138     */
139    public JDBCMapper(Model model, PathResolver pathResolver, SQLInfo sqlInfo, ClusterInvalidator clusterInvalidator,
140            RepositoryImpl repository) {
141        super(model, sqlInfo, clusterInvalidator, repository.getInvalidationsPropagator());
142        this.pathResolver = pathResolver;
143        this.repository = repository;
144        clusteringEnabled = clusterInvalidator != null;
145        queryMakerService = Framework.getService(QueryMakerService.class);
146
147        tableUpgrader = new TableUpgrader(this);
148        tableUpgrader.add(Model.VERSION_TABLE_NAME, Model.VERSION_IS_LATEST_KEY, "upgradeVersions",
149                TEST_UPGRADE_VERSIONS);
150        tableUpgrader.add("dublincore", "lastContributor", "upgradeLastContributor", TEST_UPGRADE_LAST_CONTRIBUTOR);
151        tableUpgrader.add(Model.LOCK_TABLE_NAME, Model.LOCK_OWNER_KEY, "upgradeLocks", TEST_UPGRADE_LOCKS);
152        tableUpgrader.add(Model.HIER_TABLE_NAME, Model.MAIN_SYS_CHANGE_TOKEN_KEY, "upgradeSysChangeToken",
153                TEST_UPGRADE_SYS_CHANGE_TOKEN);
154    }
155
156    @Override
157    public int getTableSize(String tableName) {
158        return sqlInfo.getDatabase().getTable(tableName).getColumns().size();
159    }
160
161    /*
162     * ----- Root -----
163     */
164
165    @Override
166    public void createDatabase(String ddlMode) {
167        // some databases (SQL Server) can't create tables/indexes/etc in a transaction, so suspend it
168        try {
169            if (!connection.getAutoCommit()) {
170                throw new NuxeoException("connection should not run in transactional mode for DDL operations");
171            }
172            createTables(ddlMode);
173        } catch (SQLException e) {
174            throw new NuxeoException(e);
175        }
176    }
177
178    protected String getTableName(String origName) {
179
180        if (dialect instanceof DialectOracle) {
181            if (origName.length() > 30) {
182
183                StringBuilder sb = new StringBuilder(origName.length());
184
185                try {
186                    MessageDigest digest = MessageDigest.getInstance("MD5");
187                    sb.append(origName.substring(0, 15));
188                    sb.append('_');
189
190                    digest.update(origName.getBytes());
191                    sb.append(Dialect.toHexString(digest.digest()).substring(0, 12));
192
193                    return sb.toString();
194
195                } catch (NoSuchAlgorithmException e) {
196                    throw new RuntimeException("Error", e);
197                }
198            }
199        }
200
201        return origName;
202    }
203
    /**
     * Creates or upgrades the database schema: runs the configured SQL statement categories in order, creates tables
     * and columns that the model expects but the database lacks, runs registered table upgrades, then executes or
     * dumps the collected DDL depending on {@code ddlMode} (see the mode list inline below).
     *
     * @param ddlMode the DDL mode from the repository descriptor
     */
    protected void createTables(String ddlMode) throws SQLException {
        ListCollector ddlCollector = new ListCollector();

        sqlInfo.executeSQLStatements(null, ddlMode, connection, logger, ddlCollector); // for missing category
        sqlInfo.executeSQLStatements("first", ddlMode, connection, logger, ddlCollector);
        sqlInfo.executeSQLStatements("beforeTableCreation", ddlMode, connection, logger, ddlCollector);
        if (testProps.containsKey(TEST_UPGRADE)) {
            // create "old" tables
            sqlInfo.executeSQLStatements("testUpgrade", ddlMode, connection, logger, null); // do not collect
        }

        // compare what actually exists in the database (via JDBC metadata) against the expected model
        String schemaName = dialect.getConnectionSchema(connection);
        DatabaseMetaData metadata = connection.getMetaData();
        Set<String> tableNames = findTableNames(metadata, schemaName);
        Database database = sqlInfo.getDatabase();
        Map<String, List<Column>> added = new HashMap<>();

        for (Table table : database.getTables()) {
            String tableName = getTableName(table.getPhysicalName());
            // comparison is done on uppercased names, matching findTableNames()
            if (!tableNames.contains(tableName.toUpperCase())) {

                /*
                 * Create missing table.
                 */

                ddlCollector.add(table.getCreateSql());
                ddlCollector.addAll(table.getPostCreateSqls(model));

                added.put(table.getKey(), null); // null = table created

                sqlInfo.sqlStatementsProperties.put("create_table_" + tableName.toLowerCase(), Boolean.TRUE);

            } else {

                /*
                 * Get existing columns.
                 */

                Map<String, Integer> columnTypes = new HashMap<>();
                Map<String, String> columnTypeNames = new HashMap<>();
                Map<String, Integer> columnTypeSizes = new HashMap<>();
                try (ResultSet rs = metadata.getColumns(null, schemaName, tableName, "%")) {
                    while (rs.next()) {
                        String schema = rs.getString("TABLE_SCHEM");
                        if (schema != null) { // null for MySQL, doh!
                            if ("INFORMATION_SCHEMA".equals(schema.toUpperCase())) {
                                // H2 returns some system tables (locks)
                                continue;
                            }
                        }
                        String columnName = rs.getString("COLUMN_NAME").toUpperCase();
                        columnTypes.put(columnName, Integer.valueOf(rs.getInt("DATA_TYPE")));
                        columnTypeNames.put(columnName, rs.getString("TYPE_NAME"));
                        columnTypeSizes.put(columnName, Integer.valueOf(rs.getInt("COLUMN_SIZE")));
                    }
                }

                /*
                 * Update types and create missing columns.
                 */

                List<Column> addedColumns = new LinkedList<>();
                for (Column column : table.getColumns()) {
                    String upperName = column.getPhysicalName().toUpperCase();
                    // remove() so that whatever remains in columnTypes afterwards is unknown to the model
                    Integer type = columnTypes.remove(upperName);
                    if (type == null) {
                        log.warn("Adding missing column in database: " + column.getFullQuotedName());
                        ddlCollector.add(table.getAddColumnSql(column));
                        ddlCollector.addAll(table.getPostAddSqls(column, model));
                        addedColumns.add(column);
                    } else {
                        // column exists: check its JDBC type/size against the model, report mismatches only
                        String actualName = columnTypeNames.get(upperName);
                        Integer actualSize = columnTypeSizes.get(upperName);
                        String message = column.checkJdbcType(type, actualName, actualSize);
                        if (message != null) {
                            log.error(message);
                            Framework.getRuntime().getMessageHandler().addError(message);
                        }
                    }
                }
                for (String col : dialect.getIgnoredColumns(table)) {
                    columnTypes.remove(col.toUpperCase());
                }
                if (!columnTypes.isEmpty()) {
                    log.warn("Database contains additional unused columns for table " + table.getQuotedName() + ": "
                            + String.join(", ", columnTypes.keySet()));
                }
                if (!addedColumns.isEmpty()) {
                    if (added.containsKey(table.getKey())) {
                        throw new AssertionError();
                    }
                    added.put(table.getKey(), addedColumns);
                }
            }
        }

        if (testProps.containsKey(TEST_UPGRADE)) {
            // create "old" content in tables
            sqlInfo.executeSQLStatements("testUpgradeOldTables", ddlMode, connection, logger, ddlCollector);
        }

        // run upgrade for each table if added columns or test
        for (Entry<String, List<Column>> en : added.entrySet()) {
            List<Column> addedColumns = en.getValue();
            String tableKey = en.getKey();
            upgradeTable(tableKey, addedColumns, ddlMode, ddlCollector);
        }

        sqlInfo.executeSQLStatements("afterTableCreation", ddlMode, connection, logger, ddlCollector);
        sqlInfo.executeSQLStatements("last", ddlMode, connection, logger, ddlCollector);

        // aclr_permission check for PostgreSQL
        dialect.performAdditionalStatements(connection);

        /*
         * Execute all the collected DDL, or dump it if requested, depending on ddlMode
         */

        // ddlMode may be:
        // ignore (not treated here, nothing done)
        // dump (implies execute)
        // dump,execute
        // dump,ignore (no execute)
        // execute
        // abort (implies dump)
        // compat can be used instead of execute to always recreate stored procedures

        List<String> ddl = ddlCollector.getStrings();
        boolean ignore = ddlMode.contains(RepositoryDescriptor.DDL_MODE_IGNORE);
        boolean dump = ddlMode.contains(RepositoryDescriptor.DDL_MODE_DUMP);
        boolean abort = ddlMode.contains(RepositoryDescriptor.DDL_MODE_ABORT);
        if (dump || abort) {

            /*
             * Dump DDL if not empty.
             */

            if (!ddl.isEmpty()) {
                File dumpFile = new File(Environment.getDefault().getLog(), "ddl-vcs-" + repository.getName() + ".sql");
                try (OutputStream out = new FileOutputStream(dumpFile); PrintStream ps = new PrintStream(out)) {
                    for (String sql : dialect.getDumpStart()) {
                        ps.println(sql);
                    }
                    for (String sql : ddl) {
                        sql = sql.trim();
                        if (sql.endsWith(";")) {
                            sql = sql.substring(0, sql.length() - 1);
                        }
                        ps.println(dialect.getSQLForDump(sql));
                    }
                    for (String sql : dialect.getDumpStop()) {
                        ps.println(sql);
                    }
                } catch (IOException e) {
                    throw new NuxeoException(e);
                }

                /*
                 * Abort if requested.
                 */

                if (abort) {
                    log.error("Dumped DDL to: " + dumpFile);
                    throw new NuxeoException("Database initialization failed for: " + repository.getName()
                            + ", DDL must be executed: " + dumpFile);
                }
            }
        }
        if (!ignore) {

            /*
             * Execute DDL.
             */

            try (Statement st = connection.createStatement()) {
                for (String sql : ddl) {
                    logger.log(sql.replace("\n", "\n    ")); // indented
                    try {
                        st.execute(sql);
                    } catch (SQLException e) {
                        throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e);
                    }
                    countExecute();
                }
            }

            /*
             * Execute post-DDL stuff.
             */

            try (Statement st = connection.createStatement()) {
                for (String sql : dialect.getStartupSqls(model, sqlInfo.database)) {
                    logger.log(sql.replace("\n", "\n    ")); // indented
                    try {
                        st.execute(sql);
                    } catch (SQLException e) {
                        throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e);
                    }
                    countExecute();
                }
            }
        }
    }
407
    /**
     * Runs the upgrade procedure registered for a table (see the registrations in the constructor), if any.
     *
     * @param tableKey the table key in the model
     * @param addedColumns the columns that were just added, or {@code null} if the whole table was just created
     * @param ddlMode the DDL mode
     * @param ddlCollector the collector receiving upgrade DDL
     */
    protected void upgradeTable(String tableKey, List<Column> addedColumns, String ddlMode, ListCollector ddlCollector)
            throws SQLException {
        tableUpgrader.upgrade(tableKey, addedColumns, ddlMode, ddlCollector);
    }
412
413    /** Finds uppercase table names. */
414    protected static Set<String> findTableNames(DatabaseMetaData metadata, String schemaName) throws SQLException {
415        Set<String> tableNames = new HashSet<>();
416        ResultSet rs = metadata.getTables(null, schemaName, "%", new String[] { "TABLE" });
417        while (rs.next()) {
418            String tableName = rs.getString("TABLE_NAME");
419            tableNames.add(tableName.toUpperCase());
420        }
421        rs.close();
422        return tableNames;
423    }
424
    /** Returns the JDBC column type used for cluster node ids, as configured in the SQL info. */
    @Override
    public int getClusterNodeIdType() {
        return sqlInfo.getClusterNodeIdType();
    }
429
430    @Override
431    public void createClusterNode(Serializable nodeId) {
432        Calendar now = Calendar.getInstance();
433        String sql = sqlInfo.getCreateClusterNodeSql();
434        List<Column> columns = sqlInfo.getCreateClusterNodeColumns();
435        try (PreparedStatement ps = connection.prepareStatement(sql)) {
436            if (logger.isLogEnabled()) {
437                logger.logSQL(sql, Arrays.asList(nodeId, now));
438            }
439            columns.get(0).setToPreparedStatement(ps, 1, nodeId);
440            columns.get(1).setToPreparedStatement(ps, 2, now);
441            ps.execute();
442
443        } catch (SQLException e) {
444            try {
445                checkConcurrentUpdate(e);
446            } catch (ConcurrentUpdateException cue) {
447                cue.addInfo(
448                        "Duplicate cluster node with id: " + nodeId
449                                + " (a crashed node must be cleaned up, or the repository.clustering.id configuration fixed)");
450                throw cue;
451            }
452            throw new NuxeoException(e);
453        }
454    }
455
456    @Override
457    public void removeClusterNode(Serializable nodeId) {
458        // delete from cluster_nodes
459        String sql = sqlInfo.getDeleteClusterNodeSql();
460        Column column = sqlInfo.getDeleteClusterNodeColumn();
461        try (PreparedStatement ps = connection.prepareStatement(sql)) {
462            if (logger.isLogEnabled()) {
463                logger.logSQL(sql, Collections.singletonList(nodeId));
464            }
465            column.setToPreparedStatement(ps, 1, nodeId);
466            ps.execute();
467            // delete un-processed invals from cluster_invals
468            deleteClusterInvals(nodeId);
469        } catch (SQLException e) {
470            throw new NuxeoException(e);
471        }
472    }
473
474    protected void deleteClusterInvals(Serializable nodeId) throws SQLException {
475        String sql = sqlInfo.getDeleteClusterInvalsSql();
476        Column column = sqlInfo.getDeleteClusterInvalsColumn();
477        try (PreparedStatement ps = connection.prepareStatement(sql)) {
478            if (logger.isLogEnabled()) {
479                logger.logSQL(sql, Collections.singletonList(nodeId));
480            }
481            column.setToPreparedStatement(ps, 1, nodeId);
482            int n = ps.executeUpdate();
483            countExecute();
484            if (logger.isLogEnabled()) {
485                logger.logCount(n);
486            }
487        }
488    }
489
490    @Override
491    public void insertClusterInvalidations(Serializable nodeId, Invalidations invalidations) {
492        String sql = dialect.getClusterInsertInvalidations();
493        List<Column> columns = sqlInfo.getClusterInvalidationsColumns();
494        try (PreparedStatement ps = connection.prepareStatement(sql)) {
495            int kind = Invalidations.MODIFIED;
496            while (true) {
497                Set<RowId> rowIds = invalidations.getKindSet(kind);
498
499                // reorganize by id
500                Map<Serializable, Set<String>> res = new HashMap<>();
501                for (RowId rowId : rowIds) {
502                    Set<String> tableNames = res.get(rowId.id);
503                    if (tableNames == null) {
504                        res.put(rowId.id, tableNames = new HashSet<>());
505                    }
506                    tableNames.add(rowId.tableName);
507                }
508
509                // do inserts
510                for (Entry<Serializable, Set<String>> en : res.entrySet()) {
511                    Serializable id = en.getKey();
512                    String fragments = join(en.getValue(), ' ');
513                    if (logger.isLogEnabled()) {
514                        logger.logSQL(sql, Arrays.<Serializable> asList(nodeId, id, fragments, Long.valueOf(kind)));
515                    }
516                    Serializable frags;
517                    if (dialect.supportsArrays() && columns.get(2).getJdbcType() == Types.ARRAY) {
518                        frags = fragments.split(" ");
519                    } else {
520                        frags = fragments;
521                    }
522                    columns.get(0).setToPreparedStatement(ps, 1, nodeId);
523                    columns.get(1).setToPreparedStatement(ps, 2, id);
524                    columns.get(2).setToPreparedStatement(ps, 3, frags);
525                    columns.get(3).setToPreparedStatement(ps, 4, Long.valueOf(kind));
526                    ps.execute();
527                    countExecute();
528                }
529                if (kind == Invalidations.MODIFIED) {
530                    kind = Invalidations.DELETED;
531                } else {
532                    break;
533                }
534            }
535        } catch (SQLException e) {
536            throw new NuxeoException("Could not invalidate", e);
537        }
538    }
539
540    // join that works on a set
541    protected static String join(Collection<String> strings, char sep) {
542        if (strings.isEmpty()) {
543            throw new RuntimeException();
544        }
545        if (strings.size() == 1) {
546            return strings.iterator().next();
547        }
548        int size = 0;
549        for (String word : strings) {
550            size += word.length() + 1;
551        }
552        StringBuilder buf = new StringBuilder(size);
553        for (String word : strings) {
554            buf.append(word);
555            buf.append(sep);
556        }
557        buf.setLength(size - 1);
558        return buf.toString();
559    }
560
561    @Override
562    public Invalidations getClusterInvalidations(Serializable nodeId) {
563        Invalidations invalidations = new Invalidations();
564        String sql = dialect.getClusterGetInvalidations();
565        List<Column> columns = sqlInfo.getClusterInvalidationsColumns();
566        if (logger.isLogEnabled()) {
567            logger.logSQL(sql, Collections.singletonList(nodeId));
568        }
569        try (PreparedStatement ps = connection.prepareStatement(sql)) {
570            setToPreparedStatement(ps, 1, nodeId);
571            try (ResultSet rs = ps.executeQuery()) {
572                countExecute();
573                while (rs.next()) {
574                    // first column ignored, it's the node id
575                    Serializable id = columns.get(1).getFromResultSet(rs, 1);
576                    Serializable frags = columns.get(2).getFromResultSet(rs, 2);
577                    int kind = ((Long) columns.get(3).getFromResultSet(rs, 3)).intValue();
578                    String[] fragments;
579                    if (dialect.supportsArrays() && frags instanceof String[]) {
580                        fragments = (String[]) frags;
581                    } else {
582                        fragments = ((String) frags).split(" ");
583                    }
584                    invalidations.add(id, fragments, kind);
585                }
586            }
587            if (logger.isLogEnabled()) {
588                // logCount(n);
589                logger.log("  -> " + invalidations);
590            }
591            if (dialect.isClusteringDeleteNeeded()) {
592                deleteClusterInvals(nodeId);
593            }
594            return invalidations;
595        } catch (SQLException e) {
596            throw new NuxeoException("Could not invalidate", e);
597        }
598    }
599
600    @Override
601    public Serializable getRootId(String repositoryId) {
602        String sql = sqlInfo.getSelectRootIdSql();
603        if (logger.isLogEnabled()) {
604            logger.logSQL(sql, Collections.<Serializable> singletonList(repositoryId));
605        }
606        try (PreparedStatement ps = connection.prepareStatement(sql)) {
607            ps.setString(1, repositoryId);
608            try (ResultSet rs = ps.executeQuery()) {
609                countExecute();
610                if (!rs.next()) {
611                    if (logger.isLogEnabled()) {
612                        logger.log("  -> (none)");
613                    }
614                    return null;
615                }
616                Column column = sqlInfo.getSelectRootIdWhatColumn();
617                Serializable id = column.getFromResultSet(rs, 1);
618                if (logger.isLogEnabled()) {
619                    logger.log("  -> " + Model.MAIN_KEY + '=' + id);
620                }
621                // check that we didn't get several rows
622                if (rs.next()) {
623                    throw new NuxeoException("Row query for " + repositoryId + " returned several rows: " + sql);
624                }
625                return id;
626            }
627        } catch (SQLException e) {
628            throw new NuxeoException("Could not select: " + sql, e);
629        }
630    }
631
632    @Override
633    public void setRootId(Serializable repositoryId, Serializable id) {
634        String sql = sqlInfo.getInsertRootIdSql();
635        try (PreparedStatement ps = connection.prepareStatement(sql)) {
636            List<Column> columns = sqlInfo.getInsertRootIdColumns();
637            List<Serializable> debugValues = null;
638            if (logger.isLogEnabled()) {
639                debugValues = new ArrayList<>(2);
640            }
641            int i = 0;
642            for (Column column : columns) {
643                i++;
644                String key = column.getKey();
645                Serializable v;
646                if (key.equals(Model.MAIN_KEY)) {
647                    v = id;
648                } else if (key.equals(Model.REPOINFO_REPONAME_KEY)) {
649                    v = repositoryId;
650                } else {
651                    throw new RuntimeException(key);
652                }
653                column.setToPreparedStatement(ps, i, v);
654                if (debugValues != null) {
655                    debugValues.add(v);
656                }
657            }
658            if (debugValues != null) {
659                logger.logSQL(sql, debugValues);
660                debugValues.clear();
661            }
662            ps.execute();
663            countExecute();
664        } catch (SQLException e) {
665            throw new NuxeoException("Could not insert: " + sql, e);
666        }
667    }
668
669    protected QueryMaker findQueryMaker(String queryType) {
670        for (Class<? extends QueryMaker> klass : queryMakerService.getQueryMakers()) {
671            QueryMaker queryMaker;
672            try {
673                queryMaker = klass.newInstance();
674            } catch (ReflectiveOperationException e) {
675                throw new NuxeoException(e);
676            }
677            if (queryMaker.accepts(queryType)) {
678                return queryMaker;
679            }
680        }
681        return null;
682    }
683
684    protected void prepareUserReadAcls(QueryFilter queryFilter) {
685        String sql = dialect.getPrepareUserReadAclsSql();
686        Serializable principals = queryFilter.getPrincipals();
687        if (sql == null || principals == null) {
688            return;
689        }
690        if (!dialect.supportsArrays()) {
691            principals = String.join(Dialect.ARRAY_SEP, (String[]) principals);
692        }
693        try (PreparedStatement ps = connection.prepareStatement(sql)) {
694            if (logger.isLogEnabled()) {
695                logger.logSQL(sql, Collections.singleton(principals));
696            }
697            setToPreparedStatement(ps, 1, principals);
698            ps.execute();
699            countExecute();
700        } catch (SQLException e) {
701            throw new NuxeoException("Failed to prepare user read acl cache", e);
702        }
703    }
704
    /**
     * {@inheritDoc}
     * <p>
     * Delegates to {@link #query(String, String, QueryFilter, long)}: {@code countTotal=true} maps to
     * {@code countUpTo=-1}, {@code false} to {@code 0}.
     */
    @Override
    public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter,
            boolean countTotal) {
        return query(query, queryType, queryFilter, countTotal ? -1 : 0);
    }
710
711    @Override
712    public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter, long countUpTo) {
713        PartialList<Serializable> result = queryProjection(query, queryType, queryFilter, countUpTo,
714                (info, rs) -> info.whatColumns.get(0).getFromResultSet(rs, 1));
715
716        if (logger.isLogEnabled()) {
717            logger.logIds(result, countUpTo != 0, result.totalSize());
718        }
719
720        return result;
721    }
722
723    // queryFilter used for principals and permissions
724    @Override
725    public IterableQueryResult queryAndFetch(String query, String queryType, QueryFilter queryFilter,
726            boolean distinctDocuments, Object... params) {
727        if (dialect.needsPrepareUserReadAcls()) {
728            prepareUserReadAcls(queryFilter);
729        }
730        QueryMaker queryMaker = findQueryMaker(queryType);
731        if (queryMaker == null) {
732            throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query);
733        }
734        query = computeDistinctDocuments(query, distinctDocuments);
735        try {
736            return new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this, params);
737        } catch (SQLException e) {
738            throw new NuxeoException("Invalid query: " + queryType + ": " + query, e);
739        }
740    }
741
742    @Override
743    public PartialList<Map<String, Serializable>> queryProjection(String query, String queryType,
744            QueryFilter queryFilter, boolean distinctDocuments, long countUpTo, Object... params) {
745        query = computeDistinctDocuments(query, distinctDocuments);
746        PartialList<Map<String, Serializable>> result = queryProjection(query, queryType, queryFilter, countUpTo,
747                (info, rs) -> info.mapMaker.makeMap(rs), params);
748
749        if (logger.isLogEnabled()) {
750            logger.logMaps(result, countUpTo != 0, result.totalSize());
751        }
752
753        return result;
754    }
755
756    protected String computeDistinctDocuments(String query, boolean distinctDocuments) {
757        if (distinctDocuments) {
758            String q = query.toLowerCase();
759            if (q.startsWith("select ") && !q.startsWith("select distinct ")) {
760                // Replace "select" by "select distinct", split at "select ".length() index
761                query = "SELECT DISTINCT " + query.substring(7);
762            }
763        }
764        return query;
765    }
766
    /**
     * Executes the query and extracts one projection value per row via {@code extractor}, applying the filter's
     * limit/offset and optionally computing a total size.
     *
     * @param query the query text
     * @param queryType the query type, used to find a {@link QueryMaker}
     * @param queryFilter the filter (principals/permissions, limit, offset)
     * @param countUpTo {@code 0} for no total, {@code -1} for an exact total, {@code n > 0} to count up to n
     * @param extractor maps the current result-set row to a projection value
     * @param params optional query parameters
     * @return the projections with a total size ({@code -1} = unknown, {@code -2} = truncated beyond countUpTo)
     */
    protected <T> PartialList<T> queryProjection(String query, String queryType, QueryFilter queryFilter,
            long countUpTo, BiFunctionSQLException<SQLInfoSelect, ResultSet, T> extractor, Object... params) {
        if (dialect.needsPrepareUserReadAcls()) {
            prepareUserReadAcls(queryFilter);
        }
        QueryMaker queryMaker = findQueryMaker(queryType);
        if (queryMaker == null) {
            throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query);
        }
        QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter, params);

        if (q == null) {
            logger.log("Query cannot return anything due to conflicting clauses");
            return new PartialList<>(Collections.emptyList(), 0);
        }
        long limit = queryFilter.getLimit();
        long offset = queryFilter.getOffset();

        // log the SQL with pseudo-comments describing the paging/counting that will be applied
        if (logger.isLogEnabled()) {
            String sql = q.selectInfo.sql;
            if (limit != 0) {
                sql += " -- LIMIT " + limit + " OFFSET " + offset;
            }
            if (countUpTo != 0) {
                sql += " -- COUNT TOTAL UP TO " + countUpTo;
            }
            logger.logSQL(sql, q.selectParams);
        }

        String sql = q.selectInfo.sql;

        if (countUpTo == 0 && limit > 0 && dialect.supportsPaging()) {
            // full result set not needed for counting: push LIMIT/OFFSET down to the database
            sql = dialect.addPagingClause(sql, limit, offset);
            limit = 0;
            offset = 0;
        } else if (countUpTo > 0 && dialect.supportsPaging()) {
            // ask one more row than countUpTo so truncation can be detected below
            sql = dialect.addPagingClause(sql, Math.max(countUpTo + 1, limit + offset), 0);
        }

        // scroll-insensitive result set: absolute()/last() are used for offsetting and counting
        try (PreparedStatement ps = connection.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE,
                ResultSet.CONCUR_READ_ONLY)) {
            int i = 1;
            for (Serializable object : q.selectParams) {
                setToPreparedStatement(ps, i++, object);
            }
            try (ResultSet rs = ps.executeQuery()) {
                countExecute();

                // limit/offset: position the cursor on the first row to read
                long totalSize = -1;
                boolean available;
                if ((limit == 0) || (offset == 0)) {
                    available = rs.first();
                    if (!available) {
                        totalSize = 0;
                    }
                    if (limit == 0) {
                        limit = -1; // infinite
                    }
                } else {
                    // skip `offset` rows; available is false if we skipped past the end
                    available = rs.absolute((int) offset + 1);
                }

                List<T> projections = new LinkedList<>();
                int rowNum = 0;
                while (available && (limit != 0)) {
                    try {
                        T projection = extractor.apply(q.selectInfo, rs);
                        projections.add(projection);
                        rowNum = rs.getRow();
                        available = rs.next();
                        limit--;
                    } catch (SQLDataException e) {
                        // actually no data available, MariaDB Connector/J lied, stop now
                        available = false;
                    }
                }

                // total size (only when counting was requested and not already known)
                if (countUpTo != 0 && (totalSize == -1)) {
                    if (!available && (rowNum != 0)) {
                        // last row read was the actual last
                        totalSize = rowNum;
                    } else {
                        // available if limit reached with some left
                        // rowNum == 0 if skipped too far
                        rs.last();
                        totalSize = rs.getRow();
                    }
                    if (countUpTo > 0 && totalSize > countUpTo) {
                        // the result where truncated we don't know the total size
                        totalSize = -2;
                    }
                }

                return new PartialList<>(projections, totalSize);
            }
        } catch (SQLException e) {
            throw new NuxeoException("Invalid query: " + query, e);
        }
    }
870
871    public int setToPreparedStatement(PreparedStatement ps, int i, Serializable object) throws SQLException {
872        if (object instanceof Calendar) {
873            dialect.setToPreparedStatementTimestamp(ps, i, object, null);
874        } else if (object instanceof java.sql.Date) {
875            ps.setDate(i, (java.sql.Date) object);
876        } else if (object instanceof Long) {
877            ps.setLong(i, ((Long) object).longValue());
878        } else if (object instanceof WrappedId) {
879            dialect.setId(ps, i, object.toString());
880        } else if (object instanceof Object[]) {
881            int jdbcType;
882            if (object instanceof String[]) {
883                jdbcType = dialect.getJDBCTypeAndString(ColumnType.STRING).jdbcType;
884            } else if (object instanceof Boolean[]) {
885                jdbcType = dialect.getJDBCTypeAndString(ColumnType.BOOLEAN).jdbcType;
886            } else if (object instanceof Long[]) {
887                jdbcType = dialect.getJDBCTypeAndString(ColumnType.LONG).jdbcType;
888            } else if (object instanceof Double[]) {
889                jdbcType = dialect.getJDBCTypeAndString(ColumnType.DOUBLE).jdbcType;
890            } else if (object instanceof java.sql.Date[]) {
891                jdbcType = Types.DATE;
892            } else if (object instanceof java.sql.Clob[]) {
893                jdbcType = Types.CLOB;
894            } else if (object instanceof Calendar[]) {
895                jdbcType = dialect.getJDBCTypeAndString(ColumnType.TIMESTAMP).jdbcType;
896                object = dialect.getTimestampFromCalendar((Calendar[]) object);
897            } else if (object instanceof Integer[]) {
898                jdbcType = dialect.getJDBCTypeAndString(ColumnType.INTEGER).jdbcType;
899            } else {
900                jdbcType = dialect.getJDBCTypeAndString(ColumnType.CLOB).jdbcType;
901            }
902            Array array = dialect.createArrayOf(jdbcType, (Object[]) object, connection);
903            ps.setArray(i, array);
904        } else {
905            ps.setObject(i, object);
906        }
907        return i;
908    }
909
910    @Override
911    public ScrollResult<String> scroll(String query, int batchSize, int keepAliveSeconds) {
912        if (!dialect.supportsScroll()) {
913            return defaultScroll(query);
914        }
915        checkForTimedoutScroll();
916        QueryFilter queryFilter = new QueryFilter(null, null, null, null, Collections.emptyList(), 0, 0);
917        return scrollSearch(query, queryFilter, batchSize, keepAliveSeconds);
918    }
919
    /**
     * Starts a scroll over the query results, filtered by the given queryFilter's principals/permissions.
     */
    @Override
    public ScrollResult<String> scroll(String query, QueryFilter queryFilter, int batchSize, int keepAliveSeconds) {
        if (!dialect.supportsScroll()) {
            // no cursor support in the database: return everything in a single batch
            return defaultScroll(query);
        }
        if (dialect.needsPrepareUserReadAcls()) {
            // populate the per-user read ACL cache before the security join
            prepareUserReadAcls(queryFilter);
        }
        checkForTimedoutScroll();
        return scrollSearch(query, queryFilter, batchSize, keepAliveSeconds);
    }
931
932    protected void checkForTimedoutScroll() {
933        cursorResults.forEach((id, cursor) -> cursor.timedOut(id));
934    }
935
936    protected ScrollResult<String> scrollSearch(String query, QueryFilter queryFilter, int batchSize, int keepAliveSeconds) {
937        QueryMaker queryMaker = findQueryMaker("NXQL");
938        QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter);
939        if (q == null) {
940            logger.log("Query cannot return anything due to conflicting clauses");
941            throw new NuxeoException("Query cannot return anything due to conflicting clauses");
942        }
943        if (logger.isLogEnabled()) {
944            logger.logSQL(q.selectInfo.sql, q.selectParams);
945        }
946        try {
947            if (connection.getAutoCommit()) {
948                throw new NuxeoException("Scroll should be done inside a transaction");
949            }
950            // ps MUST NOT be auto-closed because it's referenced by a cursor
951            PreparedStatement ps = connection.prepareStatement(q.selectInfo.sql, ResultSet.TYPE_FORWARD_ONLY,
952                    ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
953            ps.setFetchSize(batchSize);
954            int i = 1;
955            for (Serializable object : q.selectParams) {
956                setToPreparedStatement(ps, i++, object);
957            }
958            // rs MUST NOT be auto-closed because it's referenced by a cursor
959            ResultSet rs = ps.executeQuery();
960            String scrollId = UUID.randomUUID().toString();
961            registerCursor(scrollId, ps, rs, batchSize, keepAliveSeconds);
962            return scroll(scrollId);
963        } catch (SQLException e) {
964            throw new NuxeoException("Error on query", e);
965        }
966    }
967
968    protected class CursorResult {
969        protected final int keepAliveSeconds;
970
971        protected final PreparedStatement preparedStatement;
972
973        protected final ResultSet resultSet;
974
975        protected final int batchSize;
976
977        protected long lastCallTimestamp;
978
979        CursorResult(PreparedStatement preparedStatement, ResultSet resultSet, int batchSize, int keepAliveSeconds) {
980            this.preparedStatement = preparedStatement;
981            this.resultSet = resultSet;
982            this.batchSize = batchSize;
983            this.keepAliveSeconds = keepAliveSeconds;
984            lastCallTimestamp = System.currentTimeMillis();
985        }
986
987        boolean timedOut(String scrollId) {
988            long now = System.currentTimeMillis();
989            if (now - lastCallTimestamp > (keepAliveSeconds * 1000)) {
990                if (unregisterCursor(scrollId)) {
991                    log.warn("Scroll " + scrollId + " timed out");
992                }
993                return true;
994            }
995            return false;
996        }
997
998        void touch() {
999            lastCallTimestamp = System.currentTimeMillis();
1000        }
1001
1002        synchronized void close() throws SQLException {
1003            if (resultSet != null) {
1004                resultSet.close();
1005            }
1006            if (preparedStatement != null) {
1007                preparedStatement.close();
1008            }
1009        }
1010    }
1011
1012    protected void registerCursor(String scrollId, PreparedStatement ps, ResultSet rs, int batchSize,
1013            int keepAliveSeconds) {
1014        cursorResults.put(scrollId, new CursorResult(ps, rs, batchSize, keepAliveSeconds));
1015    }
1016
1017    protected boolean unregisterCursor(String scrollId) {
1018        CursorResult cursor = cursorResults.remove(scrollId);
1019        if (cursor != null) {
1020            try {
1021                cursor.close();
1022                return true;
1023            } catch (SQLException e) {
1024                log.error("Failed to close cursor for scroll: " + scrollId, e);
1025                // do not propagate exception on cleaning
1026            }
1027        }
1028        return false;
1029    }
1030
1031    protected ScrollResult<String> defaultScroll(String query) {
1032        // the database has no proper support for cursor just return everything in one batch
1033        QueryMaker queryMaker = findQueryMaker("NXQL");
1034        List<String> ids;
1035        QueryFilter queryFilter = new QueryFilter(null, null, null, null, Collections.emptyList(), 0, 0);
1036        try (IterableQueryResult ret = new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this)) {
1037            ids = new ArrayList<>((int) ret.size());
1038            for (Map<String, Serializable> map : ret) {
1039                ids.add(map.get("ecm:uuid").toString());
1040            }
1041        } catch (SQLException e) {
1042            throw new NuxeoException("Invalid scroll query: " + query, e);
1043        }
1044        return new ScrollResultImpl<>(NOSCROLL_ID, ids);
1045    }
1046
    /**
     * Fetches the next batch of document ids for a registered scroll cursor.
     *
     * @param scrollId the cursor id returned by a previous scroll call
     * @return the next batch of ids (possibly empty when the cursor is exhausted)
     * @throws NuxeoException if the scrollId is unknown or its cursor timed out
     */
    @Override
    public ScrollResult<String> scroll(String scrollId) {
        if (NOSCROLL_ID.equals(scrollId) || !dialect.supportsScroll()) {
            // there is only one batch in this case
            return emptyResult();
        }
        CursorResult cursorResult = cursorResults.get(scrollId);
        if (cursorResult == null) {
            throw new NuxeoException("Unknown or timed out scrollId");
        } else if (cursorResult.timedOut(scrollId)) {
            // timedOut() already unregistered and closed the cursor
            throw new NuxeoException("Timed out scrollId");
        }
        cursorResult.touch();
        List<String> ids = new ArrayList<>(cursorResult.batchSize);
        // synchronize with CursorResult.close() so the result set isn't closed mid-read
        synchronized (cursorResult) {
            try {
                if (cursorResult.resultSet == null || cursorResult.resultSet.isClosed()) {
                    unregisterCursor(scrollId);
                    return emptyResult();
                }
                while (ids.size() < cursorResult.batchSize) {
                    if (cursorResult.resultSet.next()) {
                        // first selected column is the document id
                        ids.add(cursorResult.resultSet.getString(1));
                    } else {
                        // exhausted: release JDBC resources now; unregister only if this batch is empty,
                        // otherwise keep the entry so the caller can make one final (empty) scroll call
                        cursorResult.close();
                        if (ids.isEmpty()) {
                            unregisterCursor(scrollId);
                        }
                        break;
                    }
                }
            } catch (SQLException e) {
                throw new NuxeoException("Error during scroll", e);
            }
        }
        return new ScrollResultImpl<>(scrollId, ids);
    }
1084
1085    @Override
1086    public Set<Serializable> getAncestorsIds(Collection<Serializable> ids) {
1087        SQLInfoSelect select = sqlInfo.getSelectAncestorsIds();
1088        if (select == null) {
1089            return getAncestorsIdsIterative(ids);
1090        }
1091        Serializable whereIds = newIdArray(ids);
1092        Set<Serializable> res = new HashSet<>();
1093        if (logger.isLogEnabled()) {
1094            logger.logSQL(select.sql, Collections.singleton(whereIds));
1095        }
1096        Column what = select.whatColumns.get(0);
1097        try (PreparedStatement ps = connection.prepareStatement(select.sql)) {
1098            setToPreparedStatementIdArray(ps, 1, whereIds);
1099            try (ResultSet rs = ps.executeQuery()) {
1100                countExecute();
1101                List<Serializable> debugIds = null;
1102                if (logger.isLogEnabled()) {
1103                    debugIds = new LinkedList<>();
1104                }
1105                while (rs.next()) {
1106                    if (dialect.supportsArraysReturnInsteadOfRows()) {
1107                        Serializable[] resultIds = dialect.getArrayResult(rs.getArray(1));
1108                        for (Serializable id : resultIds) {
1109                            if (id != null) {
1110                                res.add(id);
1111                                if (logger.isLogEnabled()) {
1112                                    debugIds.add(id);
1113                                }
1114                            }
1115                        }
1116                    } else {
1117                        Serializable id = what.getFromResultSet(rs, 1);
1118                        if (id != null) {
1119                            res.add(id);
1120                            if (logger.isLogEnabled()) {
1121                                debugIds.add(id);
1122                            }
1123                        }
1124                    }
1125                }
1126                if (logger.isLogEnabled()) {
1127                    logger.logIds(debugIds, false, 0);
1128                }
1129            }
1130            return res;
1131        } catch (SQLException e) {
1132            throw new NuxeoException("Failed to get ancestors ids", e);
1133        }
1134    }
1135
1136    /**
1137     * Uses iterative parentid selection.
1138     */
1139    protected Set<Serializable> getAncestorsIdsIterative(Collection<Serializable> ids) {
1140        try {
1141            LinkedList<Serializable> todo = new LinkedList<>(ids);
1142            Set<Serializable> done = new HashSet<>();
1143            Set<Serializable> res = new HashSet<>();
1144            while (!todo.isEmpty()) {
1145                done.addAll(todo);
1146                SQLInfoSelect select = sqlInfo.getSelectParentIds(todo.size());
1147                if (logger.isLogEnabled()) {
1148                    logger.logSQL(select.sql, todo);
1149                }
1150                Column what = select.whatColumns.get(0);
1151                Column where = select.whereColumns.get(0);
1152                try (PreparedStatement ps = connection.prepareStatement(select.sql)) {
1153                    int i = 1;
1154                    for (Serializable id : todo) {
1155                        where.setToPreparedStatement(ps, i++, id);
1156                    }
1157                    try (ResultSet rs = ps.executeQuery()) {
1158                        countExecute();
1159                        todo = new LinkedList<>();
1160                        List<Serializable> debugIds = null;
1161                        if (logger.isLogEnabled()) {
1162                            debugIds = new LinkedList<>();
1163                        }
1164                        while (rs.next()) {
1165                            Serializable id = what.getFromResultSet(rs, 1);
1166                            if (id != null) {
1167                                res.add(id);
1168                                if (!done.contains(id)) {
1169                                    todo.add(id);
1170                                }
1171                                if (logger.isLogEnabled()) {
1172                                    debugIds.add(id); // NOSONAR
1173                                }
1174                            }
1175                        }
1176                        if (logger.isLogEnabled()) {
1177                            logger.logIds(debugIds, false, 0);
1178                        }
1179                    }
1180                }
1181            }
1182            return res;
1183        } catch (SQLException e) {
1184            throw new NuxeoException("Failed to get ancestors ids", e);
1185        }
1186    }
1187
1188    @Override
1189    public void updateReadAcls() {
1190        if (!dialect.supportsReadAcl()) {
1191            return;
1192        }
1193        if (log.isDebugEnabled()) {
1194            log.debug("updateReadAcls: updating");
1195        }
1196        try (Statement st = connection.createStatement()) {
1197            String sql = dialect.getUpdateReadAclsSql();
1198            if (logger.isLogEnabled()) {
1199                logger.log(sql);
1200            }
1201            st.execute(sql);
1202            countExecute();
1203        } catch (SQLException e) {
1204            checkConcurrentUpdate(e);
1205            throw new NuxeoException("Failed to update read acls", e);
1206        }
1207        if (log.isDebugEnabled()) {
1208            log.debug("updateReadAcls: done.");
1209        }
1210    }
1211
1212    @Override
1213    public void rebuildReadAcls() {
1214        if (!dialect.supportsReadAcl()) {
1215            return;
1216        }
1217        log.debug("rebuildReadAcls: rebuilding ...");
1218        try (Statement st = connection.createStatement()) {
1219            String sql = dialect.getRebuildReadAclsSql();
1220            logger.log(sql);
1221            st.execute(sql);
1222            countExecute();
1223        } catch (SQLException e) {
1224            throw new NuxeoException("Failed to rebuild read acls", e);
1225        }
1226        log.debug("rebuildReadAcls: done.");
1227    }
1228
1229    /*
1230     * ----- Locking -----
1231     */
1232
1233    @Override
1234    public Lock getLock(Serializable id) {
1235        if (log.isDebugEnabled()) {
1236            try {
1237                log.debug("getLock " + id + " while autoCommit=" + connection.getAutoCommit());
1238            } catch (SQLException e) {
1239                throw new RuntimeException(e);
1240            }
1241        }
1242        RowId rowId = new RowId(Model.LOCK_TABLE_NAME, id);
1243        Row row = readSimpleRow(rowId);
1244        return row == null ? null
1245                : new Lock((String) row.get(Model.LOCK_OWNER_KEY), (Calendar) row.get(Model.LOCK_CREATED_KEY));
1246    }
1247
1248    @Override
1249    public Lock setLock(final Serializable id, final Lock lock) {
1250        if (log.isDebugEnabled()) {
1251            log.debug("setLock " + id + " owner=" + lock.getOwner());
1252        }
1253        Lock oldLock = getLock(id);
1254        if (oldLock == null) {
1255            Row row = new Row(Model.LOCK_TABLE_NAME, id);
1256            row.put(Model.LOCK_OWNER_KEY, lock.getOwner());
1257            row.put(Model.LOCK_CREATED_KEY, lock.getCreated());
1258            insertSimpleRows(Model.LOCK_TABLE_NAME, Collections.singletonList(row));
1259        }
1260        return oldLock;
1261    }
1262
1263    @Override
1264    public Lock removeLock(final Serializable id, final String owner, final boolean force) {
1265        if (log.isDebugEnabled()) {
1266            log.debug("removeLock " + id + " owner=" + owner + " force=" + force);
1267        }
1268        Lock oldLock = force ? null : getLock(id);
1269        if (!force && owner != null) {
1270            if (oldLock == null) {
1271                // not locked, nothing to do
1272                return null;
1273            }
1274            if (!LockManager.canLockBeRemoved(oldLock.getOwner(), owner)) {
1275                // existing mismatched lock, flag failure
1276                return new Lock(oldLock, true);
1277            }
1278        }
1279        if (force || oldLock != null) {
1280            deleteRows(Model.LOCK_TABLE_NAME, Collections.singleton(id));
1281        }
1282        return oldLock;
1283    }
1284
1285    @Override
1286    public void markReferencedBinaries() {
1287        log.debug("Starting binaries GC mark");
1288        DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class);
1289        String repositoryName = getRepositoryName();
1290        try (Statement st = connection.createStatement()) {
1291            int i = -1;
1292            for (String sql : sqlInfo.getBinariesSql) {
1293                i++;
1294                Column col = sqlInfo.getBinariesColumns.get(i);
1295                if (logger.isLogEnabled()) {
1296                    logger.log(sql);
1297                }
1298                try (ResultSet rs = st.executeQuery(sql)) {
1299                    countExecute();
1300                    int n = 0;
1301                    while (rs.next()) {
1302                        n++;
1303                        String key = (String) col.getFromResultSet(rs, 1);
1304                        if (key != null) {
1305                            blobManager.markReferencedBinary(key, repositoryName);
1306                        }
1307                    }
1308                    if (logger.isLogEnabled()) {
1309                        logger.logCount(n);
1310                    }
1311                }
1312            }
1313        } catch (SQLException e) {
1314            throw new RuntimeException("Failed to mark binaries for gC", e);
1315        }
1316        log.debug("End of binaries GC mark");
1317    }
1318
1319    /*
1320     * ----- XAResource -----
1321     */
1322
1323    protected static String systemToString(Object o) {
1324        return o.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(o));
1325    }
1326
1327    @Override
1328    public void start(Xid xid, int flags) throws XAException {
1329        try {
1330            xaresource.start(xid, flags);
1331            if (logger.isLogEnabled()) {
1332                logger.log("XA start on " + systemToString(xid));
1333            }
1334        } catch (NuxeoException e) {
1335            throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e);
1336        } catch (XAException e) {
1337            logger.error("XA start error on " + systemToString(xid), e);
1338            throw e;
1339        }
1340    }
1341
    /**
     * Delegates XA end to the underlying XA resource, logging the transaction.
     * <p>
     * A {@link NullPointerException} from H2 (thrown when there is no active transaction) is converted into an
     * {@code XAER_RMERR} {@link XAException}; XA errors during a rollback-marked end ({@code TMFAIL}) are not logged.
     */
    @Override
    public void end(Xid xid, int flags) throws XAException {
        try {
            xaresource.end(xid, flags);
            if (logger.isLogEnabled()) {
                logger.log("XA end on " + systemToString(xid));
            }
        } catch (NullPointerException e) {
            // H2 when no active transaction
            logger.error("XA end error on " + systemToString(xid), e);
            throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e);
        } catch (XAException e) {
            // expected (and not worth logging) when the transaction is being failed on purpose
            if (flags != XAResource.TMFAIL) {
                logger.error("XA end error on " + systemToString(xid), e);
            }
            throw e;
        }
    }
1360
1361    @Override
1362    public int prepare(Xid xid) throws XAException {
1363        try {
1364            return xaresource.prepare(xid);
1365        } catch (XAException e) {
1366            logger.error("XA prepare error on  " + systemToString(xid), e);
1367            throw e;
1368        }
1369    }
1370
1371    @Override
1372    public void commit(Xid xid, boolean onePhase) throws XAException {
1373        try {
1374            xaresource.commit(xid, onePhase);
1375        } catch (XAException e) {
1376            logger.error("XA commit error on  " + systemToString(xid), e);
1377            throw e;
1378        }
1379    }
1380
1381    // rollback interacts with caches so is in RowMapper
1382
    /** Delegates XA forget to the underlying XA resource. */
    @Override
    public void forget(Xid xid) throws XAException {
        xaresource.forget(xid);
    }
1387
    /** Delegates XA recovery to the underlying XA resource. */
    @Override
    public Xid[] recover(int flag) throws XAException {
        return xaresource.recover(flag);
    }
1392
    /** Delegates the transaction timeout setting to the underlying XA resource. */
    @Override
    public boolean setTransactionTimeout(int seconds) throws XAException {
        return xaresource.setTransactionTimeout(seconds);
    }
1397
    /** Delegates the transaction timeout query to the underlying XA resource. */
    @Override
    public int getTransactionTimeout() throws XAException {
        return xaresource.getTransactionTimeout();
    }
1402
    /** Not supported by this mapper. */
    @Override
    public boolean isSameRM(XAResource xares) throws XAException {
        throw new UnsupportedOperationException();
    }
1407
    /** Returns {@code true} when a JDBC connection is currently held. */
    @Override
    public boolean isConnected() {
        return connection != null;
    }
1412
    /** Opens the JDBC connection(s); {@code noSharing} is forwarded to the connection-opening logic. */
    @Override
    public void connect(boolean noSharing) {
        openConnections(noSharing);
    }
1417
    /** Closes the JDBC connection(s). */
    @Override
    public void disconnect() {
        closeConnections();
    }
1422
1423    /**
1424     * @since 7.10-HF25, 8.10-HF06, 9.2
1425     */
1426    @FunctionalInterface
1427    protected interface BiFunctionSQLException<T, U, R> {
1428
1429        /**
1430         * Applies this function to the given arguments.
1431         *
1432         * @param t the first function argument
1433         * @param u the second function argument
1434         * @return the function result
1435         */
1436        R apply(T t, U u) throws SQLException;
1437
1438    }
1439
1440}