001/*
002 * (C) Copyright 2006-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Benoit Delbosc
019 */
020package org.nuxeo.ecm.core.storage.sql.jdbc;
021
import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.sql.Array;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.common.Environment;
import org.nuxeo.ecm.core.api.IterableQueryResult;
import org.nuxeo.ecm.core.api.Lock;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.PartialList;
import org.nuxeo.ecm.core.api.ScrollResult;
import org.nuxeo.ecm.core.api.ScrollResultImpl;
import org.nuxeo.ecm.core.blob.BlobManager;
import org.nuxeo.ecm.core.model.LockManager;
import org.nuxeo.ecm.core.query.QueryFilter;
import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator;
import org.nuxeo.ecm.core.storage.sql.ColumnType;
import org.nuxeo.ecm.core.storage.sql.ColumnType.WrappedId;
import org.nuxeo.ecm.core.storage.sql.Invalidations;
import org.nuxeo.ecm.core.storage.sql.Mapper;
import org.nuxeo.ecm.core.storage.sql.Model;
import org.nuxeo.ecm.core.storage.sql.RepositoryDescriptor;
import org.nuxeo.ecm.core.storage.sql.RepositoryImpl;
import org.nuxeo.ecm.core.storage.sql.Row;
import org.nuxeo.ecm.core.storage.sql.RowId;
import org.nuxeo.ecm.core.storage.sql.Session.PathResolver;
import org.nuxeo.ecm.core.storage.sql.jdbc.SQLInfo.SQLInfoSelect;
import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
import org.nuxeo.ecm.core.storage.sql.jdbc.db.Database;
import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table;
import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectOracle;
import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.SQLStatement.ListCollector;
import org.nuxeo.runtime.api.Framework;
090
091/**
092 * A {@link JDBCMapper} maps objects to and from a JDBC database. It is specific to a given database connection, as it
093 * computes statements.
094 * <p>
095 * The {@link JDBCMapper} does the mapping according to the policy defined by a {@link Model}, and generates SQL
096 * statements recorded in the {@link SQLInfo}.
097 */
098public class JDBCMapper extends JDBCRowMapper implements Mapper {
099
    private static final Log log = LogFactory.getLog(JDBCMapper.class);

    // test-only hooks: keys are the TEST_UPGRADE* constants below; checked by createTables to exercise upgrade paths
    public static Map<String, Serializable> testProps = new HashMap<>();

    // shared across all mappers; NOTE(review): presumably maps scroll ids to live cursors — usage not visible in this chunk, confirm
    protected static Map<String, CursorResult> cursorResults = new ConcurrentHashMap<>();

    public static final String TEST_UPGRADE = "testUpgrade";

    // property in sql.txt file
    public static final String TEST_UPGRADE_VERSIONS = "testUpgradeVersions";

    public static final String TEST_UPGRADE_LAST_CONTRIBUTOR = "testUpgradeLastContributor";

    public static final String TEST_UPGRADE_LOCKS = "testUpgradeLocks";

    // configured in the constructor; invoked by upgradeTable when createTables adds missing columns
    protected TableUpgrader tableUpgrader;

    private final QueryMakerService queryMakerService;

    // used for startswith queries (see constructor)
    private final PathResolver pathResolver;

    private final RepositoryImpl repository;

    // true when a ClusterInvalidator was passed to the constructor
    protected boolean clusteringEnabled;

    // NOTE(review): sentinel scroll id — the scroll API using it is outside this chunk, confirm semantics there
    protected static final String NOSCROLL_ID = "noscroll";
126
    /**
     * Creates a new Mapper.
     *
     * @param model the model
     * @param pathResolver the path resolver (used for startswith queries)
     * @param sqlInfo the sql info
     * @param clusterInvalidator the cluster invalidator, or {@code null} when clustering is disabled
     * @param repository the repository
     */
    public JDBCMapper(Model model, PathResolver pathResolver, SQLInfo sqlInfo, ClusterInvalidator clusterInvalidator,
            RepositoryImpl repository) {
        super(model, sqlInfo, clusterInvalidator, repository.getInvalidationsPropagator());
        this.pathResolver = pathResolver;
        this.repository = repository;
        clusteringEnabled = clusterInvalidator != null;
        queryMakerService = Framework.getService(QueryMakerService.class);

        // register the per-table upgrade steps run by upgradeTable when createTables adds missing columns
        tableUpgrader = new TableUpgrader(this);
        tableUpgrader.add(Model.VERSION_TABLE_NAME, Model.VERSION_IS_LATEST_KEY, "upgradeVersions",
                TEST_UPGRADE_VERSIONS);
        tableUpgrader.add("dublincore", "lastContributor", "upgradeLastContributor", TEST_UPGRADE_LAST_CONTRIBUTOR);
        tableUpgrader.add(Model.LOCK_TABLE_NAME, Model.LOCK_OWNER_KEY, "upgradeLocks", TEST_UPGRADE_LOCKS);

    }
152
153    @Override
154    public int getTableSize(String tableName) {
155        return sqlInfo.getDatabase().getTable(tableName).getColumns().size();
156    }
157
158    /*
159     * ----- Root -----
160     */
161
162    @Override
163    public void createDatabase(String ddlMode) {
164        // some databases (SQL Server) can't create tables/indexes/etc in a transaction, so suspend it
165        try {
166            if (connection.getAutoCommit() == false) {
167                throw new NuxeoException("connection should not run in transactional mode for DDL operations");
168            }
169            createTables(ddlMode);
170        } catch (SQLException e) {
171            throw new NuxeoException(e);
172        }
173    }
174
175    protected String getTableName(String origName) {
176
177        if (dialect instanceof DialectOracle) {
178            if (origName.length() > 30) {
179
180                StringBuilder sb = new StringBuilder(origName.length());
181
182                try {
183                    MessageDigest digest = MessageDigest.getInstance("MD5");
184                    sb.append(origName.substring(0, 15));
185                    sb.append('_');
186
187                    digest.update(origName.getBytes());
188                    sb.append(Dialect.toHexString(digest.digest()).substring(0, 12));
189
190                    return sb.toString();
191
192                } catch (NoSuchAlgorithmException e) {
193                    throw new RuntimeException("Error", e);
194                }
195            }
196        }
197
198        return origName;
199    }
200
    /**
     * Creates the tables of the database model, adds missing columns to existing tables, then executes, dumps, or
     * aborts on the collected DDL depending on {@code ddlMode} (see the inline mode list below).
     *
     * @param ddlMode the DDL mode flags; may combine ignore, dump, execute, abort, compat
     * @throws SQLException on database error
     */
    protected void createTables(String ddlMode) throws SQLException {
        // DDL is collected here and executed/dumped at the end, rather than executed piecemeal
        ListCollector ddlCollector = new ListCollector();

        sqlInfo.executeSQLStatements(null, ddlMode, connection, logger, ddlCollector); // for missing category
        sqlInfo.executeSQLStatements("first", ddlMode, connection, logger, ddlCollector);
        sqlInfo.executeSQLStatements("beforeTableCreation", ddlMode, connection, logger, ddlCollector);
        if (testProps.containsKey(TEST_UPGRADE)) {
            // create "old" tables
            sqlInfo.executeSQLStatements("testUpgrade", ddlMode, connection, logger, null); // do not collect
        }

        // inspect the existing schema to decide what must be created or altered
        String schemaName = dialect.getConnectionSchema(connection);
        DatabaseMetaData metadata = connection.getMetaData();
        Set<String> tableNames = findTableNames(metadata, schemaName);
        Database database = sqlInfo.getDatabase();
        // table key -> columns just added, or null if the whole table was just created
        Map<String, List<Column>> added = new HashMap<>();

        for (Table table : database.getTables()) {
            String tableName = getTableName(table.getPhysicalName());
            if (!tableNames.contains(tableName.toUpperCase())) {

                /*
                 * Create missing table.
                 */

                ddlCollector.add(table.getCreateSql());
                ddlCollector.addAll(table.getPostCreateSqls(model));

                added.put(table.getKey(), null); // null = table created

                sqlInfo.sqlStatementsProperties.put("create_table_" + tableName.toLowerCase(), Boolean.TRUE);

            } else {

                /*
                 * Get existing columns.
                 */

                Map<String, Integer> columnTypes = new HashMap<>();
                Map<String, String> columnTypeNames = new HashMap<>();
                Map<String, Integer> columnTypeSizes = new HashMap<>();
                try (ResultSet rs = metadata.getColumns(null, schemaName, tableName, "%")) {
                    while (rs.next()) {
                        String schema = rs.getString("TABLE_SCHEM");
                        if (schema != null) { // null for MySQL, doh!
                            if ("INFORMATION_SCHEMA".equals(schema.toUpperCase())) {
                                // H2 returns some system tables (locks)
                                continue;
                            }
                        }
                        String columnName = rs.getString("COLUMN_NAME").toUpperCase();
                        columnTypes.put(columnName, Integer.valueOf(rs.getInt("DATA_TYPE")));
                        columnTypeNames.put(columnName, rs.getString("TYPE_NAME"));
                        columnTypeSizes.put(columnName, Integer.valueOf(rs.getInt("COLUMN_SIZE")));
                    }
                }

                /*
                 * Update types and create missing columns.
                 */

                List<Column> addedColumns = new LinkedList<>();
                for (Column column : table.getColumns()) {
                    String upperName = column.getPhysicalName().toUpperCase();
                    // remove as we match, so leftovers are columns unknown to the model
                    Integer type = columnTypes.remove(upperName);
                    if (type == null) {
                        log.warn("Adding missing column in database: " + column.getFullQuotedName());
                        ddlCollector.add(table.getAddColumnSql(column));
                        ddlCollector.addAll(table.getPostAddSqls(column, model));
                        addedColumns.add(column);
                    } else {
                        int expected = column.getJdbcType();
                        int actual = type.intValue();
                        String actualName = columnTypeNames.get(upperName);
                        Integer actualSize = columnTypeSizes.get(upperName);
                        if (!column.setJdbcType(actual, actualName, actualSize.intValue())) {
                            log.error(String.format("SQL type mismatch for %s: expected %s, database has %s / %s (%s)",
                                    column.getFullQuotedName(), Integer.valueOf(expected), type, actualName,
                                    actualSize));
                        }
                    }
                }
                for (String col : dialect.getIgnoredColumns(table)) {
                    columnTypes.remove(col.toUpperCase());
                }
                if (!columnTypes.isEmpty()) {
                    log.warn("Database contains additional unused columns for table " + table.getQuotedName() + ": "
                            + String.join(", ", columnTypes.keySet()));
                }
                if (!addedColumns.isEmpty()) {
                    // a table is visited once, so it cannot already be in the map
                    if (added.containsKey(table.getKey())) {
                        throw new AssertionError();
                    }
                    added.put(table.getKey(), addedColumns);
                }
            }
        }

        if (testProps.containsKey(TEST_UPGRADE)) {
            // create "old" content in tables
            sqlInfo.executeSQLStatements("testUpgradeOldTables", ddlMode, connection, logger, ddlCollector);
        }

        // run upgrade for each table if added columns or test
        for (Entry<String, List<Column>> en : added.entrySet()) {
            List<Column> addedColumns = en.getValue();
            String tableKey = en.getKey();
            upgradeTable(tableKey, addedColumns, ddlMode, ddlCollector);
        }

        sqlInfo.executeSQLStatements("afterTableCreation", ddlMode, connection, logger, ddlCollector);
        sqlInfo.executeSQLStatements("last", ddlMode, connection, logger, ddlCollector);

        // aclr_permission check for PostgreSQL
        dialect.performAdditionalStatements(connection);

        /*
         * Execute all the collected DDL, or dump it if requested, depending on ddlMode
         */

        // ddlMode may be:
        // ignore (not treated here, nothing done)
        // dump (implies execute)
        // dump,execute
        // dump,ignore (no execute)
        // execute
        // abort (implies dump)
        // compat can be used instead of execute to always recreate stored procedures

        List<String> ddl = ddlCollector.getStrings();
        boolean ignore = ddlMode.contains(RepositoryDescriptor.DDL_MODE_IGNORE);
        boolean dump = ddlMode.contains(RepositoryDescriptor.DDL_MODE_DUMP);
        boolean abort = ddlMode.contains(RepositoryDescriptor.DDL_MODE_ABORT);
        if (dump || abort) {

            /*
             * Dump DDL if not empty.
             */

            if (!ddl.isEmpty()) {
                File dumpFile = new File(Environment.getDefault().getLog(), "ddl-vcs-" + repository.getName() + ".sql");
                try (OutputStream out = new FileOutputStream(dumpFile); PrintStream ps = new PrintStream(out)) {
                    for (String sql : dialect.getDumpStart()) {
                        ps.println(sql);
                    }
                    for (String sql : ddl) {
                        sql = sql.trim();
                        if (sql.endsWith(";")) {
                            sql = sql.substring(0, sql.length() - 1);
                        }
                        ps.println(dialect.getSQLForDump(sql));
                    }
                    for (String sql : dialect.getDumpStop()) {
                        ps.println(sql);
                    }
                } catch (IOException e) {
                    throw new NuxeoException(e);
                }

                /*
                 * Abort if requested.
                 */

                if (abort) {
                    log.error("Dumped DDL to: " + dumpFile);
                    throw new NuxeoException("Database initialization failed for: " + repository.getName()
                            + ", DDL must be executed: " + dumpFile);
                }
            }
        }
        if (!ignore) {

            /*
             * Execute DDL.
             */

            try (Statement st = connection.createStatement()) {
                for (String sql : ddl) {
                    logger.log(sql.replace("\n", "\n    ")); // indented
                    try {
                        st.execute(sql);
                    } catch (SQLException e) {
                        throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e);
                    }
                    countExecute();
                }
            }

            /*
             * Execute post-DDL stuff.
             */

            try (Statement st = connection.createStatement()) {
                for (String sql : dialect.getStartupSqls(model, sqlInfo.database)) {
                    logger.log(sql.replace("\n", "\n    ")); // indented
                    try {
                        st.execute(sql);
                    } catch (SQLException e) {
                        throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e);
                    }
                    countExecute();
                }
            }
        }
    }
406
    /**
     * Runs the upgrade steps registered for a table, after it was created or columns were added to it.
     *
     * @param tableKey the table key in the model
     * @param addedColumns the columns just added, or {@code null} if the whole table was just created
     * @param ddlMode the DDL mode
     * @param ddlCollector the collector receiving the upgrade DDL
     * @throws SQLException on database error
     */
    protected void upgradeTable(String tableKey, List<Column> addedColumns, String ddlMode, ListCollector ddlCollector)
            throws SQLException {
        tableUpgrader.upgrade(tableKey, addedColumns, ddlMode, ddlCollector);
    }
411
412    /** Finds uppercase table names. */
413    protected static Set<String> findTableNames(DatabaseMetaData metadata, String schemaName) throws SQLException {
414        Set<String> tableNames = new HashSet<>();
415        ResultSet rs = metadata.getTables(null, schemaName, "%", new String[] { "TABLE" });
416        while (rs.next()) {
417            String tableName = rs.getString("TABLE_NAME");
418            tableNames.add(tableName.toUpperCase());
419        }
420        rs.close();
421        return tableNames;
422    }
423
    /** Returns the id type used for cluster node ids, as defined by the SQL info. */
    @Override
    public int getClusterNodeIdType() {
        return sqlInfo.getClusterNodeIdType();
    }
428
429    @Override
430    public void createClusterNode(Serializable nodeId) {
431        Calendar now = Calendar.getInstance();
432        String sql = sqlInfo.getCreateClusterNodeSql();
433        List<Column> columns = sqlInfo.getCreateClusterNodeColumns();
434        try (PreparedStatement ps = connection.prepareStatement(sql)) {
435            if (logger.isLogEnabled()) {
436                logger.logSQL(sql, Arrays.asList(nodeId, now));
437            }
438            columns.get(0).setToPreparedStatement(ps, 1, nodeId);
439            columns.get(1).setToPreparedStatement(ps, 2, now);
440            ps.execute();
441
442        } catch (SQLException e) {
443            throw new NuxeoException(e);
444        }
445    }
446
447    @Override
448    public void removeClusterNode(Serializable nodeId) {
449        // delete from cluster_nodes
450        String sql = sqlInfo.getDeleteClusterNodeSql();
451        Column column = sqlInfo.getDeleteClusterNodeColumn();
452        try (PreparedStatement ps = connection.prepareStatement(sql)) {
453            if (logger.isLogEnabled()) {
454                logger.logSQL(sql, Collections.singletonList(nodeId));
455            }
456            column.setToPreparedStatement(ps, 1, nodeId);
457            ps.execute();
458            // delete un-processed invals from cluster_invals
459            deleteClusterInvals(nodeId);
460        } catch (SQLException e) {
461            throw new NuxeoException(e);
462        }
463    }
464
465    protected void deleteClusterInvals(Serializable nodeId) throws SQLException {
466        String sql = sqlInfo.getDeleteClusterInvalsSql();
467        Column column = sqlInfo.getDeleteClusterInvalsColumn();
468        try (PreparedStatement ps = connection.prepareStatement(sql)) {
469            if (logger.isLogEnabled()) {
470                logger.logSQL(sql, Collections.singletonList(nodeId));
471            }
472            column.setToPreparedStatement(ps, 1, nodeId);
473            int n = ps.executeUpdate();
474            countExecute();
475            if (logger.isLogEnabled()) {
476                logger.logCount(n);
477            }
478        }
479    }
480
481    @Override
482    public void insertClusterInvalidations(Serializable nodeId, Invalidations invalidations) {
483        String sql = dialect.getClusterInsertInvalidations();
484        List<Column> columns = sqlInfo.getClusterInvalidationsColumns();
485        try (PreparedStatement ps = connection.prepareStatement(sql)) {
486            int kind = Invalidations.MODIFIED;
487            while (true) {
488                Set<RowId> rowIds = invalidations.getKindSet(kind);
489
490                // reorganize by id
491                Map<Serializable, Set<String>> res = new HashMap<>();
492                for (RowId rowId : rowIds) {
493                    Set<String> tableNames = res.get(rowId.id);
494                    if (tableNames == null) {
495                        res.put(rowId.id, tableNames = new HashSet<>());
496                    }
497                    tableNames.add(rowId.tableName);
498                }
499
500                // do inserts
501                for (Entry<Serializable, Set<String>> en : res.entrySet()) {
502                    Serializable id = en.getKey();
503                    String fragments = join(en.getValue(), ' ');
504                    if (logger.isLogEnabled()) {
505                        logger.logSQL(sql, Arrays.<Serializable> asList(nodeId, id, fragments, Long.valueOf(kind)));
506                    }
507                    Serializable frags;
508                    if (dialect.supportsArrays() && columns.get(2).getJdbcType() == Types.ARRAY) {
509                        frags = fragments.split(" ");
510                    } else {
511                        frags = fragments;
512                    }
513                    columns.get(0).setToPreparedStatement(ps, 1, nodeId);
514                    columns.get(1).setToPreparedStatement(ps, 2, id);
515                    columns.get(2).setToPreparedStatement(ps, 3, frags);
516                    columns.get(3).setToPreparedStatement(ps, 4, Long.valueOf(kind));
517                    ps.execute();
518                    countExecute();
519                }
520                if (kind == Invalidations.MODIFIED) {
521                    kind = Invalidations.DELETED;
522                } else {
523                    break;
524                }
525            }
526        } catch (SQLException e) {
527            throw new NuxeoException("Could not invalidate", e);
528        }
529    }
530
531    // join that works on a set
532    protected static String join(Collection<String> strings, char sep) {
533        if (strings.isEmpty()) {
534            throw new RuntimeException();
535        }
536        if (strings.size() == 1) {
537            return strings.iterator().next();
538        }
539        int size = 0;
540        for (String word : strings) {
541            size += word.length() + 1;
542        }
543        StringBuilder buf = new StringBuilder(size);
544        for (String word : strings) {
545            buf.append(word);
546            buf.append(sep);
547        }
548        buf.setLength(size - 1);
549        return buf.toString();
550    }
551
552    @Override
553    public Invalidations getClusterInvalidations(Serializable nodeId) {
554        Invalidations invalidations = new Invalidations();
555        String sql = dialect.getClusterGetInvalidations();
556        List<Column> columns = sqlInfo.getClusterInvalidationsColumns();
557        if (logger.isLogEnabled()) {
558            logger.logSQL(sql, Collections.singletonList(nodeId));
559        }
560        try (PreparedStatement ps = connection.prepareStatement(sql)) {
561            setToPreparedStatement(ps, 1, nodeId);
562            try (ResultSet rs = ps.executeQuery()) {
563                countExecute();
564                while (rs.next()) {
565                    // first column ignored, it's the node id
566                    Serializable id = columns.get(1).getFromResultSet(rs, 1);
567                    Serializable frags = columns.get(2).getFromResultSet(rs, 2);
568                    int kind = ((Long) columns.get(3).getFromResultSet(rs, 3)).intValue();
569                    String[] fragments;
570                    if (dialect.supportsArrays() && frags instanceof String[]) {
571                        fragments = (String[]) frags;
572                    } else {
573                        fragments = ((String) frags).split(" ");
574                    }
575                    invalidations.add(id, fragments, kind);
576                }
577            }
578            if (logger.isLogEnabled()) {
579                // logCount(n);
580                logger.log("  -> " + invalidations);
581            }
582            if (dialect.isClusteringDeleteNeeded()) {
583                deleteClusterInvals(nodeId);
584            }
585            return invalidations;
586        } catch (SQLException e) {
587            throw new NuxeoException("Could not invalidate", e);
588        }
589    }
590
591    @Override
592    public Serializable getRootId(String repositoryId) {
593        String sql = sqlInfo.getSelectRootIdSql();
594        if (logger.isLogEnabled()) {
595            logger.logSQL(sql, Collections.<Serializable> singletonList(repositoryId));
596        }
597        try (PreparedStatement ps = connection.prepareStatement(sql)) {
598            ps.setString(1, repositoryId);
599            try (ResultSet rs = ps.executeQuery()) {
600                countExecute();
601                if (!rs.next()) {
602                    if (logger.isLogEnabled()) {
603                        logger.log("  -> (none)");
604                    }
605                    return null;
606                }
607                Column column = sqlInfo.getSelectRootIdWhatColumn();
608                Serializable id = column.getFromResultSet(rs, 1);
609                if (logger.isLogEnabled()) {
610                    logger.log("  -> " + Model.MAIN_KEY + '=' + id);
611                }
612                // check that we didn't get several rows
613                if (rs.next()) {
614                    throw new NuxeoException("Row query for " + repositoryId + " returned several rows: " + sql);
615                }
616                return id;
617            }
618        } catch (SQLException e) {
619            throw new NuxeoException("Could not select: " + sql, e);
620        }
621    }
622
623    @Override
624    public void setRootId(Serializable repositoryId, Serializable id) {
625        String sql = sqlInfo.getInsertRootIdSql();
626        try (PreparedStatement ps = connection.prepareStatement(sql)) {
627            List<Column> columns = sqlInfo.getInsertRootIdColumns();
628            List<Serializable> debugValues = null;
629            if (logger.isLogEnabled()) {
630                debugValues = new ArrayList<>(2);
631            }
632            int i = 0;
633            for (Column column : columns) {
634                i++;
635                String key = column.getKey();
636                Serializable v;
637                if (key.equals(Model.MAIN_KEY)) {
638                    v = id;
639                } else if (key.equals(Model.REPOINFO_REPONAME_KEY)) {
640                    v = repositoryId;
641                } else {
642                    throw new RuntimeException(key);
643                }
644                column.setToPreparedStatement(ps, i, v);
645                if (debugValues != null) {
646                    debugValues.add(v);
647                }
648            }
649            if (debugValues != null) {
650                logger.logSQL(sql, debugValues);
651                debugValues.clear();
652            }
653            ps.execute();
654            countExecute();
655        } catch (SQLException e) {
656            throw new NuxeoException("Could not insert: " + sql, e);
657        }
658    }
659
660    protected QueryMaker findQueryMaker(String queryType) {
661        for (Class<? extends QueryMaker> klass : queryMakerService.getQueryMakers()) {
662            QueryMaker queryMaker;
663            try {
664                queryMaker = klass.newInstance();
665            } catch (ReflectiveOperationException e) {
666                throw new NuxeoException(e);
667            }
668            if (queryMaker.accepts(queryType)) {
669                return queryMaker;
670            }
671        }
672        return null;
673    }
674
675    protected void prepareUserReadAcls(QueryFilter queryFilter) {
676        String sql = dialect.getPrepareUserReadAclsSql();
677        Serializable principals = queryFilter.getPrincipals();
678        if (sql == null || principals == null) {
679            return;
680        }
681        if (!dialect.supportsArrays()) {
682            principals = String.join(Dialect.ARRAY_SEP, (String[]) principals);
683        }
684        try (PreparedStatement ps = connection.prepareStatement(sql)) {
685            if (logger.isLogEnabled()) {
686                logger.logSQL(sql, Collections.singleton(principals));
687            }
688            setToPreparedStatement(ps, 1, principals);
689            ps.execute();
690            countExecute();
691        } catch (SQLException e) {
692            throw new NuxeoException("Failed to prepare user read acl cache", e);
693        }
694    }
695
    /**
     * Executes a query, delegating to {@link #query(String, String, QueryFilter, long)}: {@code countTotal == true}
     * maps to {@code countUpTo == -1} (count everything), {@code false} to {@code 0} (no counting).
     */
    @Override
    public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter,
            boolean countTotal) {
        return query(query, queryType, queryFilter, countTotal ? -1 : 0);
    }
701
    /**
     * Runs an id-selecting query and returns a page of results, with an optional total size.
     * <p>
     * {@code countUpTo} semantics: {@code 0} means don't count the total size (returned as -1), {@code -1} means
     * always count it, and {@code n > 0} means count it only up to {@code n} (beyond which -2 is returned to mean
     * "truncated, unknown total").
     *
     * @param query the query string
     * @param queryType the query type, used to find an accepting {@link QueryMaker}
     * @param queryFilter the filter carrying security, limit and offset information
     * @param countUpTo how far to count the total size, see above
     * @return the matching ids plus the total size per the {@code countUpTo} contract
     */
    @Override
    public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter, long countUpTo) {
        if (dialect.needsPrepareUserReadAcls()) {
            prepareUserReadAcls(queryFilter);
        }
        QueryMaker queryMaker = findQueryMaker(queryType);
        if (queryMaker == null) {
            throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query);
        }
        QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter);

        if (q == null) {
            logger.log("Query cannot return anything due to conflicting clauses");
            return new PartialList<>(Collections.<Serializable> emptyList(), 0);
        }
        long limit = queryFilter.getLimit();
        long offset = queryFilter.getOffset();

        // log the SQL with limit/offset/count info appended as comments
        if (logger.isLogEnabled()) {
            String sql = q.selectInfo.sql;
            if (limit != 0) {
                sql += " -- LIMIT " + limit + " OFFSET " + offset;
            }
            if (countUpTo != 0) {
                sql += " -- COUNT TOTAL UP TO " + countUpTo;
            }
            logger.logSQL(sql, q.selectParams);
        }

        String sql = q.selectInfo.sql;

        if (countUpTo == 0 && limit > 0 && dialect.supportsPaging()) {
            // full result set not needed for counting, let the database do the paging
            sql = dialect.addPagingClause(sql, limit, offset);
            limit = 0;
            offset = 0;
        } else if (countUpTo > 0 && dialect.supportsPaging()) {
            // ask one more row than countUpTo so truncation can be detected
            sql = dialect.addPagingClause(sql, Math.max(countUpTo + 1, limit + offset), 0);
        }

        // scroll-insensitive cursor needed for absolute()/last() positioning below
        try (PreparedStatement ps = connection.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE,
                ResultSet.CONCUR_READ_ONLY)) {
            int i = 1;
            for (Serializable object : q.selectParams) {
                setToPreparedStatement(ps, i++, object);
            }
            try (ResultSet rs = ps.executeQuery()) {
                countExecute();

                // position the cursor according to limit/offset (when not already done in SQL)
                long totalSize = -1;
                boolean available;
                if ((limit == 0) || (offset == 0)) {
                    available = rs.first();
                    if (!available) {
                        totalSize = 0;
                    }
                    if (limit == 0) {
                        limit = -1; // infinite
                    }
                } else {
                    // skip to the first row of the requested page
                    available = rs.absolute((int) offset + 1);
                }

                Column column = q.selectInfo.whatColumns.get(0);
                List<Serializable> ids = new LinkedList<>();
                int rowNum = 0;
                while (available && (limit != 0)) {
                    Serializable id;
                    try {
                        id = column.getFromResultSet(rs, 1);
                    } catch (SQLDataException e) {
                        // actually no data available, MariaDB Connector/J lied, stop now
                        available = false;
                        break;
                    }
                    ids.add(id);
                    rowNum = rs.getRow();
                    available = rs.next();
                    limit--;
                }

                // total size, computed only when requested
                if (countUpTo != 0 && (totalSize == -1)) {
                    if (!available && (rowNum != 0)) {
                        // last row read was the actual last
                        totalSize = rowNum;
                    } else {
                        // available if limit reached with some rows left
                        // rowNum == 0 if skipped too far
                        rs.last();
                        totalSize = rs.getRow();
                    }
                    if (countUpTo > 0 && totalSize > countUpTo) {
                        // the results were truncated, we don't know the total size
                        totalSize = -2;
                    }
                }

                if (logger.isLogEnabled()) {
                    logger.logIds(ids, countUpTo != 0, totalSize);
                }

                return new PartialList<>(ids, totalSize);
            }
        } catch (SQLException e) {
            throw new NuxeoException("Invalid query: " + query, e);
        }
    }
812
813    public int setToPreparedStatement(PreparedStatement ps, int i, Serializable object) throws SQLException {
814        if (object instanceof Calendar) {
815            Calendar cal = (Calendar) object;
816            ps.setTimestamp(i, dialect.getTimestampFromCalendar(cal), cal);
817        } else if (object instanceof java.sql.Date) {
818            ps.setDate(i, (java.sql.Date) object);
819        } else if (object instanceof Long) {
820            ps.setLong(i, ((Long) object).longValue());
821        } else if (object instanceof WrappedId) {
822            dialect.setId(ps, i, object.toString());
823        } else if (object instanceof Object[]) {
824            int jdbcType;
825            if (object instanceof String[]) {
826                jdbcType = dialect.getJDBCTypeAndString(ColumnType.STRING).jdbcType;
827            } else if (object instanceof Boolean[]) {
828                jdbcType = dialect.getJDBCTypeAndString(ColumnType.BOOLEAN).jdbcType;
829            } else if (object instanceof Long[]) {
830                jdbcType = dialect.getJDBCTypeAndString(ColumnType.LONG).jdbcType;
831            } else if (object instanceof Double[]) {
832                jdbcType = dialect.getJDBCTypeAndString(ColumnType.DOUBLE).jdbcType;
833            } else if (object instanceof java.sql.Date[]) {
834                jdbcType = Types.DATE;
835            } else if (object instanceof java.sql.Clob[]) {
836                jdbcType = Types.CLOB;
837            } else if (object instanceof Calendar[]) {
838                jdbcType = dialect.getJDBCTypeAndString(ColumnType.TIMESTAMP).jdbcType;
839                object = dialect.getTimestampFromCalendar((Calendar) object);
840            } else if (object instanceof Integer[]) {
841                jdbcType = dialect.getJDBCTypeAndString(ColumnType.INTEGER).jdbcType;
842            } else {
843                jdbcType = dialect.getJDBCTypeAndString(ColumnType.CLOB).jdbcType;
844            }
845            Array array = dialect.createArrayOf(jdbcType, (Object[]) object, connection);
846            ps.setArray(i, array);
847        } else {
848            ps.setObject(i, object);
849        }
850        return i;
851    }
852
853    // queryFilter used for principals and permissions
854    @Override
855    public IterableQueryResult queryAndFetch(String query, String queryType, QueryFilter queryFilter,
856            boolean distinctDocuments, Object... params) {
857        if (dialect.needsPrepareUserReadAcls()) {
858            prepareUserReadAcls(queryFilter);
859        }
860        QueryMaker queryMaker = findQueryMaker(queryType);
861        if (queryMaker == null) {
862            throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query);
863        }
864        if (distinctDocuments) {
865            String q = query.toLowerCase();
866            if (q.startsWith("select ") && !q.startsWith("select distinct ")) {
867                query = "SELECT DISTINCT " + query.substring("SELECT ".length());
868            }
869        }
870        try {
871            return new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this, params);
872        } catch (SQLException e) {
873            throw new NuxeoException("Invalid query: " + queryType + ": " + query, e);
874        }
875    }
876
877    @Override
878    public ScrollResult scroll(String query, int batchSize, int keepAliveSeconds) {
879        if (!dialect.supportsScroll()) {
880            return defaultScroll(query);
881        }
882        checkForTimedoutScroll();
883        return scrollSearch(query, batchSize, keepAliveSeconds);
884    }
885
886    protected void checkForTimedoutScroll() {
887        cursorResults.forEach((id, cursor) -> cursor.timedOut(id));
888    }
889
890    protected ScrollResult scrollSearch(String query, int batchSize, int keepAliveSeconds) {
891        QueryMaker queryMaker = findQueryMaker("NXQL");
892        QueryFilter queryFilter = new QueryFilter(null, null, null, null, Collections.emptyList(), 0, 0);
893        QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter, null);
894        if (q == null) {
895            logger.log("Query cannot return anything due to conflicting clauses");
896            throw new NuxeoException("Query cannot return anything due to conflicting clauses");
897        }
898        if (logger.isLogEnabled()) {
899            logger.logSQL(q.selectInfo.sql, q.selectParams);
900        }
901        try {
902            if (connection.getAutoCommit()) {
903                throw new NuxeoException("Scroll should be done inside a transaction");
904            }
905            // ps MUST NOT be auto-closed because it's referenced by a cursor
906            PreparedStatement ps = connection.prepareStatement(q.selectInfo.sql, ResultSet.TYPE_FORWARD_ONLY,
907                    ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
908            ps.setFetchSize(batchSize);
909            int i = 1;
910            for (Serializable object : q.selectParams) {
911                setToPreparedStatement(ps, i++, object);
912            }
913            // rs MUST NOT be auto-closed because it's referenced by a cursor
914            ResultSet rs = ps.executeQuery();
915            String scrollId = UUID.randomUUID().toString();
916            registerCursor(scrollId, ps, rs, batchSize, keepAliveSeconds);
917            return scroll(scrollId);
918        } catch (SQLException e) {
919            throw new NuxeoException("Error on query", e);
920        }
921    }
922
923    protected class CursorResult {
924        protected final int keepAliveSeconds;
925
926        protected final PreparedStatement preparedStatement;
927
928        protected final ResultSet resultSet;
929
930        protected final int batchSize;
931
932        protected long lastCallTimestamp;
933
934        CursorResult(PreparedStatement preparedStatement, ResultSet resultSet, int batchSize, int keepAliveSeconds) {
935            this.preparedStatement = preparedStatement;
936            this.resultSet = resultSet;
937            this.batchSize = batchSize;
938            this.keepAliveSeconds = keepAliveSeconds;
939            lastCallTimestamp = System.currentTimeMillis();
940        }
941
942        boolean timedOut(String scrollId) {
943            long now = System.currentTimeMillis();
944            if (now - lastCallTimestamp > (keepAliveSeconds * 1000)) {
945                if (unregisterCursor(scrollId)) {
946                    log.warn("Scroll " + scrollId + " timed out");
947                }
948                return true;
949            }
950            return false;
951        }
952
953        void touch() {
954            lastCallTimestamp = System.currentTimeMillis();
955        }
956
957        synchronized void close() throws SQLException {
958            if (resultSet != null) {
959                resultSet.close();
960            }
961            if (preparedStatement != null) {
962                preparedStatement.close();
963            }
964        }
965    }
966
967    protected void registerCursor(String scrollId, PreparedStatement ps, ResultSet rs, int batchSize,
968            int keepAliveSeconds) {
969        cursorResults.put(scrollId, new CursorResult(ps, rs, batchSize, keepAliveSeconds));
970    }
971
972    protected boolean unregisterCursor(String scrollId) {
973        CursorResult cursor = cursorResults.remove(scrollId);
974        if (cursor != null) {
975            try {
976                cursor.close();
977                return true;
978            } catch (SQLException e) {
979                log.error("Failed to close cursor for scroll: " + scrollId, e);
980                // do not propagate exception on cleaning
981            }
982        }
983        return false;
984    }
985
986    protected ScrollResult defaultScroll(String query) {
987        // the database has no proper support for cursor just return everything in one batch
988        QueryMaker queryMaker = findQueryMaker("NXQL");
989        List<String> ids;
990        QueryFilter queryFilter = new QueryFilter(null, null, null, null, Collections.emptyList(), 0, 0);
991        try (IterableQueryResult ret = new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this,
992                null)) {
993            ids = new ArrayList<>((int) ret.size());
994            for (Map<String, Serializable> map : ret) {
995                ids.add(map.get("ecm:uuid").toString());
996            }
997        } catch (SQLException e) {
998            throw new NuxeoException("Invalid scroll query: " + query, e);
999        }
1000        return new ScrollResultImpl(NOSCROLL_ID, ids);
1001    }
1002
    /**
     * Returns the next batch of ids for a previously registered scroll.
     * <p>
     * When the underlying result set is exhausted it is closed; the cursor is unregistered once an empty batch is
     * returned.
     *
     * @throws NuxeoException if the scrollId is unknown or timed out
     */
    @Override
    public ScrollResult scroll(String scrollId) {
        if (NOSCROLL_ID.equals(scrollId) || !dialect.supportsScroll()) {
            // there is only one batch in this case
            return emptyResult();
        }
        CursorResult cursorResult = cursorResults.get(scrollId);
        if (cursorResult == null) {
            throw new NuxeoException("Unknown or timed out scrollId");
        } else if (cursorResult.timedOut(scrollId)) {
            throw new NuxeoException("Timed out scrollId");
        }
        // reset the keep-alive countdown before fetching
        cursorResult.touch();
        List<String> ids = new ArrayList<>(cursorResult.batchSize);
        // synchronize on the cursor so a concurrent timeout close cannot race with iteration
        synchronized (cursorResult) {
            try {
                if (cursorResult.resultSet == null || cursorResult.resultSet.isClosed()) {
                    unregisterCursor(scrollId);
                    return emptyResult();
                }
                while (ids.size() < cursorResult.batchSize) {
                    if (cursorResult.resultSet.next()) {
                        ids.add(cursorResult.resultSet.getString(1));
                    } else {
                        // exhausted: release resources; unregister only when the batch is empty,
                        // so the caller still receives the final partial batch first
                        cursorResult.close();
                        if (ids.isEmpty()) {
                            unregisterCursor(scrollId);
                        }
                        break;
                    }
                }
            } catch (SQLException e) {
                throw new NuxeoException("Error during scroll", e);
            }
        }
        return new ScrollResultImpl(scrollId, ids);
    }
1040
    /**
     * Returns the set of ancestor ids for the given document ids, using a single recursive database query when the
     * dialect provides one, otherwise falling back to {@link #getAncestorsIdsIterative}.
     */
    @Override
    public Set<Serializable> getAncestorsIds(Collection<Serializable> ids) {
        SQLInfoSelect select = sqlInfo.getSelectAncestorsIds();
        if (select == null) {
            // no recursive-query support in this dialect
            return getAncestorsIdsIterative(ids);
        }
        Serializable whereIds = newIdArray(ids);
        Set<Serializable> res = new HashSet<>();
        if (logger.isLogEnabled()) {
            logger.logSQL(select.sql, Collections.singleton(whereIds));
        }
        Column what = select.whatColumns.get(0);
        try (PreparedStatement ps = connection.prepareStatement(select.sql)) {
            setToPreparedStatementIdArray(ps, 1, whereIds);
            try (ResultSet rs = ps.executeQuery()) {
                countExecute();
                // debugIds is only allocated (and filled) when SQL logging is enabled
                List<Serializable> debugIds = null;
                if (logger.isLogEnabled()) {
                    debugIds = new LinkedList<>();
                }
                while (rs.next()) {
                    if (dialect.supportsArraysReturnInsteadOfRows()) {
                        // each row carries an array of ancestor ids
                        Serializable[] resultIds = dialect.getArrayResult(rs.getArray(1));
                        for (Serializable id : resultIds) {
                            if (id != null) {
                                res.add(id);
                                if (logger.isLogEnabled()) {
                                    debugIds.add(id);
                                }
                            }
                        }
                    } else {
                        // one ancestor id per row
                        Serializable id = what.getFromResultSet(rs, 1);
                        if (id != null) {
                            res.add(id);
                            if (logger.isLogEnabled()) {
                                debugIds.add(id);
                            }
                        }
                    }
                }
                if (logger.isLogEnabled()) {
                    logger.logIds(debugIds, false, 0);
                }
            }
            return res;
        } catch (SQLException e) {
            throw new NuxeoException("Failed to get ancestors ids", e);
        }
    }
1091
1092    /**
1093     * Uses iterative parentid selection.
1094     */
1095    protected Set<Serializable> getAncestorsIdsIterative(Collection<Serializable> ids) {
1096        try {
1097            LinkedList<Serializable> todo = new LinkedList<>(ids);
1098            Set<Serializable> done = new HashSet<>();
1099            Set<Serializable> res = new HashSet<>();
1100            while (!todo.isEmpty()) {
1101                done.addAll(todo);
1102                SQLInfoSelect select = sqlInfo.getSelectParentIds(todo.size());
1103                if (logger.isLogEnabled()) {
1104                    logger.logSQL(select.sql, todo);
1105                }
1106                Column what = select.whatColumns.get(0);
1107                Column where = select.whereColumns.get(0);
1108                try (PreparedStatement ps = connection.prepareStatement(select.sql)) {
1109                    int i = 1;
1110                    for (Serializable id : todo) {
1111                        where.setToPreparedStatement(ps, i++, id);
1112                    }
1113                    try (ResultSet rs = ps.executeQuery()) {
1114                        countExecute();
1115                        todo = new LinkedList<>();
1116                        List<Serializable> debugIds = null;
1117                        if (logger.isLogEnabled()) {
1118                            debugIds = new LinkedList<>();
1119                        }
1120                        while (rs.next()) {
1121                            Serializable id = what.getFromResultSet(rs, 1);
1122                            if (id != null) {
1123                                res.add(id);
1124                                if (!done.contains(id)) {
1125                                    todo.add(id);
1126                                }
1127                                if (logger.isLogEnabled()) {
1128                                    debugIds.add(id);
1129                                }
1130                            }
1131                        }
1132                        if (logger.isLogEnabled()) {
1133                            logger.logIds(debugIds, false, 0);
1134                        }
1135                    }
1136                }
1137            }
1138            return res;
1139        } catch (SQLException e) {
1140            throw new NuxeoException("Failed to get ancestors ids", e);
1141        }
1142    }
1143
1144    @Override
1145    public void updateReadAcls() {
1146        if (!dialect.supportsReadAcl()) {
1147            return;
1148        }
1149        if (log.isDebugEnabled()) {
1150            log.debug("updateReadAcls: updating");
1151        }
1152        try (Statement st = connection.createStatement()) {
1153            String sql = dialect.getUpdateReadAclsSql();
1154            if (logger.isLogEnabled()) {
1155                logger.log(sql);
1156            }
1157            st.execute(sql);
1158            countExecute();
1159        } catch (SQLException e) {
1160            checkConcurrentUpdate(e);
1161            throw new NuxeoException("Failed to update read acls", e);
1162        }
1163        if (log.isDebugEnabled()) {
1164            log.debug("updateReadAcls: done.");
1165        }
1166    }
1167
1168    @Override
1169    public void rebuildReadAcls() {
1170        if (!dialect.supportsReadAcl()) {
1171            return;
1172        }
1173        log.debug("rebuildReadAcls: rebuilding ...");
1174        try (Statement st = connection.createStatement()) {
1175            String sql = dialect.getRebuildReadAclsSql();
1176            logger.log(sql);
1177            st.execute(sql);
1178            countExecute();
1179        } catch (SQLException e) {
1180            throw new NuxeoException("Failed to rebuild read acls", e);
1181        }
1182        log.debug("rebuildReadAcls: done.");
1183    }
1184
1185    /*
1186     * ----- Locking -----
1187     */
1188
1189    @Override
1190    public Lock getLock(Serializable id) {
1191        if (log.isDebugEnabled()) {
1192            try {
1193                log.debug("getLock " + id + " while autoCommit=" + connection.getAutoCommit());
1194            } catch (SQLException e) {
1195                throw new RuntimeException(e);
1196            }
1197        }
1198        RowId rowId = new RowId(Model.LOCK_TABLE_NAME, id);
1199        Row row = readSimpleRow(rowId);
1200        return row == null ? null
1201                : new Lock((String) row.get(Model.LOCK_OWNER_KEY), (Calendar) row.get(Model.LOCK_CREATED_KEY));
1202    }
1203
    /**
     * Sets a lock on the document unless one is already present.
     * <p>
     * NOTE(review): the check-then-act between {@link #getLock} and the insert is not atomic at this level;
     * presumably a higher-level lock manager serializes concurrent callers — confirm with callers.
     *
     * @return the pre-existing lock if the document was already locked (nothing is written then), or {@code null} if
     *         the new lock was set
     */
    @Override
    public Lock setLock(final Serializable id, final Lock lock) {
        if (log.isDebugEnabled()) {
            log.debug("setLock " + id + " owner=" + lock.getOwner());
        }
        Lock oldLock = getLock(id);
        if (oldLock == null) {
            Row row = new Row(Model.LOCK_TABLE_NAME, id);
            row.put(Model.LOCK_OWNER_KEY, lock.getOwner());
            row.put(Model.LOCK_CREATED_KEY, lock.getCreated());
            insertSimpleRows(Model.LOCK_TABLE_NAME, Collections.singletonList(row));
        }
        return oldLock;
    }
1218
    /**
     * Removes the lock on a document.
     * <p>
     * With {@code force}, the lock row is deleted unconditionally and {@code null} is returned. Otherwise the owner
     * (when given) must match the existing lock's owner; on mismatch a failure-flagged copy of the existing lock is
     * returned and nothing is deleted.
     *
     * @return {@code null} if forced or not locked, the old lock if removed, or a failure-flagged lock on owner
     *         mismatch
     */
    @Override
    public Lock removeLock(final Serializable id, final String owner, final boolean force) {
        if (log.isDebugEnabled()) {
            log.debug("removeLock " + id + " owner=" + owner + " force=" + force);
        }
        // forced removal skips reading the current lock entirely
        Lock oldLock = force ? null : getLock(id);
        if (!force && owner != null) {
            if (oldLock == null) {
                // not locked, nothing to do
                return null;
            }
            if (!LockManager.canLockBeRemoved(oldLock.getOwner(), owner)) {
                // existing mismatched lock, flag failure
                return new Lock(oldLock, true);
            }
        }
        if (force || oldLock != null) {
            deleteRows(Model.LOCK_TABLE_NAME, Collections.singleton(id));
        }
        return oldLock;
    }
1240
1241    @Override
1242    public void markReferencedBinaries() {
1243        log.debug("Starting binaries GC mark");
1244        BlobManager blobManager = Framework.getService(BlobManager.class);
1245        String repositoryName = getRepositoryName();
1246        try (Statement st = connection.createStatement()) {
1247            int i = -1;
1248            for (String sql : sqlInfo.getBinariesSql) {
1249                i++;
1250                Column col = sqlInfo.getBinariesColumns.get(i);
1251                if (logger.isLogEnabled()) {
1252                    logger.log(sql);
1253                }
1254                try (ResultSet rs = st.executeQuery(sql)) {
1255                    countExecute();
1256                    int n = 0;
1257                    while (rs.next()) {
1258                        n++;
1259                        String key = (String) col.getFromResultSet(rs, 1);
1260                        if (key != null) {
1261                            blobManager.markReferencedBinary(key, repositoryName);
1262                        }
1263                    }
1264                    if (logger.isLogEnabled()) {
1265                        logger.logCount(n);
1266                    }
1267                }
1268            }
1269        } catch (SQLException e) {
1270            throw new RuntimeException("Failed to mark binaries for gC", e);
1271        }
1272        log.debug("End of binaries GC mark");
1273    }
1274
1275    /*
1276     * ----- XAResource -----
1277     */
1278
1279    protected static String systemToString(Object o) {
1280        return o.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(o));
1281    }
1282
1283    @Override
1284    public void start(Xid xid, int flags) throws XAException {
1285        try {
1286            xaresource.start(xid, flags);
1287            if (logger.isLogEnabled()) {
1288                logger.log("XA start on " + systemToString(xid));
1289            }
1290        } catch (NuxeoException e) {
1291            throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e);
1292        } catch (XAException e) {
1293            logger.error("XA start error on " + systemToString(xid), e);
1294            throw e;
1295        }
1296    }
1297
1298    @Override
1299    public void end(Xid xid, int flags) throws XAException {
1300        try {
1301            xaresource.end(xid, flags);
1302            if (logger.isLogEnabled()) {
1303                logger.log("XA end on " + systemToString(xid));
1304            }
1305        } catch (NullPointerException e) {
1306            // H2 when no active transaction
1307            logger.error("XA end error on " + systemToString(xid), e);
1308            throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e);
1309        } catch (XAException e) {
1310            if (flags != XAResource.TMFAIL) {
1311                logger.error("XA end error on " + systemToString(xid), e);
1312            }
1313            throw e;
1314        }
1315    }
1316
1317    @Override
1318    public int prepare(Xid xid) throws XAException {
1319        try {
1320            return xaresource.prepare(xid);
1321        } catch (XAException e) {
1322            logger.error("XA prepare error on  " + systemToString(xid), e);
1323            throw e;
1324        }
1325    }
1326
1327    @Override
1328    public void commit(Xid xid, boolean onePhase) throws XAException {
1329        try {
1330            xaresource.commit(xid, onePhase);
1331        } catch (XAException e) {
1332            logger.error("XA commit error on  " + systemToString(xid), e);
1333            throw e;
1334        }
1335    }
1336
1337    // rollback interacts with caches so is in RowMapper
1338
    /** Delegates XA forget straight to the underlying resource. */
    @Override
    public void forget(Xid xid) throws XAException {
        xaresource.forget(xid);
    }
1343
    /** Delegates XA recovery straight to the underlying resource. */
    @Override
    public Xid[] recover(int flag) throws XAException {
        return xaresource.recover(flag);
    }
1348
    /** Delegates the transaction timeout setting straight to the underlying resource. */
    @Override
    public boolean setTransactionTimeout(int seconds) throws XAException {
        return xaresource.setTransactionTimeout(seconds);
    }
1353
    /** Delegates the transaction timeout query straight to the underlying resource. */
    @Override
    public int getTransactionTimeout() throws XAException {
        return xaresource.getTransactionTimeout();
    }
1358
    /** Not supported: resource-manager identity comparison is never needed here. */
    @Override
    public boolean isSameRM(XAResource xares) throws XAException {
        throw new UnsupportedOperationException();
    }
1363
    /** Returns whether a physical connection is currently held. */
    @Override
    public boolean isConnected() {
        return connection != null;
    }
1368
    /**
     * Opens the physical connection(s).
     *
     * @param noSharing whether connection sharing is disallowed (passed through to {@code openConnections})
     */
    @Override
    public void connect(boolean noSharing) {
        openConnections(noSharing);
    }
1373
    /** Closes the physical connection(s). */
    @Override
    public void disconnect() {
        closeConnections();
    }
1378
1379}