001/*
002 * (C) Copyright 2006-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Benoit Delbosc
019 */
020package org.nuxeo.ecm.core.storage.sql.jdbc;
021
022import java.io.File;
023import java.io.FileOutputStream;
024import java.io.IOException;
025import java.io.OutputStream;
026import java.io.PrintStream;
027import java.io.Serializable;
028import java.security.MessageDigest;
029import java.security.NoSuchAlgorithmException;
030import java.sql.Array;
031import java.sql.Connection;
032import java.sql.DatabaseMetaData;
033import java.sql.PreparedStatement;
034import java.sql.ResultSet;
035import java.sql.SQLException;
036import java.sql.Statement;
037import java.sql.Types;
038import java.util.ArrayList;
039import java.util.Arrays;
040import java.util.Calendar;
041import java.util.Collection;
042import java.util.Collections;
043import java.util.HashMap;
044import java.util.HashSet;
045import java.util.LinkedList;
046import java.util.List;
047import java.util.Map;
048import java.util.Map.Entry;
049import java.util.Set;
050import java.util.UUID;
051import java.util.concurrent.Callable;
052import java.util.concurrent.ConcurrentHashMap;
053
054import javax.transaction.xa.XAException;
055import javax.transaction.xa.XAResource;
056import javax.transaction.xa.Xid;
057
058import org.apache.commons.logging.Log;
059import org.apache.commons.logging.LogFactory;
060import org.nuxeo.common.Environment;
061import org.nuxeo.common.utils.StringUtils;
062import org.nuxeo.ecm.core.api.IterableQueryResult;
063import org.nuxeo.ecm.core.api.Lock;
064import org.nuxeo.ecm.core.api.NuxeoException;
065import org.nuxeo.ecm.core.api.PartialList;
066import org.nuxeo.ecm.core.api.ScrollResult;
067import org.nuxeo.ecm.core.api.ScrollResultImpl;
068import org.nuxeo.ecm.core.blob.BlobManager;
069import org.nuxeo.ecm.core.model.LockManager;
070import org.nuxeo.ecm.core.query.QueryFilter;
071import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator;
072import org.nuxeo.ecm.core.storage.sql.ColumnType;
073import org.nuxeo.ecm.core.storage.sql.ColumnType.WrappedId;
074import org.nuxeo.ecm.core.storage.sql.Invalidations;
075import org.nuxeo.ecm.core.storage.sql.Mapper;
076import org.nuxeo.ecm.core.storage.sql.Model;
077import org.nuxeo.ecm.core.storage.sql.RepositoryDescriptor;
078import org.nuxeo.ecm.core.storage.sql.RepositoryImpl;
079import org.nuxeo.ecm.core.storage.sql.Row;
080import org.nuxeo.ecm.core.storage.sql.RowId;
081import org.nuxeo.ecm.core.storage.sql.Session.PathResolver;
082import org.nuxeo.ecm.core.storage.sql.jdbc.SQLInfo.SQLInfoSelect;
083import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
084import org.nuxeo.ecm.core.storage.sql.jdbc.db.Database;
085import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table;
086import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
087import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectOracle;
088import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.SQLStatement.ListCollector;
089import org.nuxeo.runtime.api.Framework;
090
091import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult;
092
093/**
094 * A {@link JDBCMapper} maps objects to and from a JDBC database. It is specific to a given database connection, as it
095 * computes statements.
096 * <p>
097 * The {@link JDBCMapper} does the mapping according to the policy defined by a {@link Model}, and generates SQL
098 * statements recorded in the {@link SQLInfo}.
099 */
100public class JDBCMapper extends JDBCRowMapper implements Mapper {
101
102    private static final Log log = LogFactory.getLog(JDBCMapper.class);
103
104    public static Map<String, Serializable> testProps = new HashMap<String, Serializable>();
105
106    protected static Map<String, CursorResult> cursorResults = new ConcurrentHashMap<>();
107
108    public static final String TEST_UPGRADE = "testUpgrade";
109
110    // property in sql.txt file
111    public static final String TEST_UPGRADE_VERSIONS = "testUpgradeVersions";
112
113    public static final String TEST_UPGRADE_LAST_CONTRIBUTOR = "testUpgradeLastContributor";
114
115    public static final String TEST_UPGRADE_LOCKS = "testUpgradeLocks";
116
117    protected TableUpgrader tableUpgrader;
118
119    private final QueryMakerService queryMakerService;
120
121    private final PathResolver pathResolver;
122
123    private final RepositoryImpl repository;
124
125    protected boolean clusteringEnabled;
126
127    protected static final String NOSCROLL_ID = "noscroll";
128
129    /**
130     * Creates a new Mapper.
131     *
132     * @param model the model
133     * @param pathResolver the path resolver (used for startswith queries)
134     * @param sqlInfo the sql info
135     * @param clusterInvalidator the cluster invalidator
136     * @param noSharing whether to use no-sharing mode for the connection
137     * @param repository
138     */
139    public JDBCMapper(Model model, PathResolver pathResolver, SQLInfo sqlInfo, ClusterInvalidator clusterInvalidator,
140            boolean noSharing, RepositoryImpl repository) {
141        super(model, sqlInfo, clusterInvalidator, repository.getInvalidationsPropagator(), noSharing);
142        this.pathResolver = pathResolver;
143        this.repository = repository;
144        clusteringEnabled = clusterInvalidator != null;
145        queryMakerService = Framework.getService(QueryMakerService.class);
146
147        tableUpgrader = new TableUpgrader(this);
148        tableUpgrader.add(Model.VERSION_TABLE_NAME, Model.VERSION_IS_LATEST_KEY, "upgradeVersions",
149                TEST_UPGRADE_VERSIONS);
150        tableUpgrader.add("dublincore", "lastContributor", "upgradeLastContributor", TEST_UPGRADE_LAST_CONTRIBUTOR);
151        tableUpgrader.add(Model.LOCK_TABLE_NAME, Model.LOCK_OWNER_KEY, "upgradeLocks", TEST_UPGRADE_LOCKS);
152
153    }
154
155    @Override
156    public int getTableSize(String tableName) {
157        return sqlInfo.getDatabase().getTable(tableName).getColumns().size();
158    }
159
160    /*
161     * ----- Root -----
162     */
163
164    @Override
165    public void createDatabase(String ddlMode) {
166        // some databases (SQL Server) can't create tables/indexes/etc in a transaction, so suspend it
167        try {
168            boolean suspend = !connection.getAutoCommit();
169            try {
170                if (suspend) {
171                    connection.setAutoCommit(true);
172                }
173                createTables(ddlMode);
174            } finally {
175                if (suspend) {
176                    connection.setAutoCommit(false);
177                }
178            }
179        } catch (SQLException e) {
180            throw new NuxeoException(e);
181        }
182    }
183
184    protected String getTableName(String origName) {
185
186        if (dialect instanceof DialectOracle) {
187            if (origName.length() > 30) {
188
189                StringBuilder sb = new StringBuilder(origName.length());
190
191                try {
192                    MessageDigest digest = MessageDigest.getInstance("MD5");
193                    sb.append(origName.substring(0, 15));
194                    sb.append('_');
195
196                    digest.update(origName.getBytes());
197                    sb.append(Dialect.toHexString(digest.digest()).substring(0, 12));
198
199                    return sb.toString();
200
201                } catch (NoSuchAlgorithmException e) {
202                    throw new RuntimeException("Error", e);
203                }
204            }
205        }
206
207        return origName;
208    }
209
210    protected void createTables(String ddlMode) throws SQLException {
211        ListCollector ddlCollector = new ListCollector();
212
213        sqlInfo.executeSQLStatements(null, ddlMode, connection, logger, ddlCollector); // for missing category
214        sqlInfo.executeSQLStatements("first", ddlMode, connection, logger, ddlCollector);
215        sqlInfo.executeSQLStatements("beforeTableCreation", ddlMode, connection, logger, ddlCollector);
216        if (testProps.containsKey(TEST_UPGRADE)) {
217            // create "old" tables
218            sqlInfo.executeSQLStatements("testUpgrade", ddlMode, connection, logger, null); // do not collect
219        }
220
221        String schemaName = dialect.getConnectionSchema(connection);
222        DatabaseMetaData metadata = connection.getMetaData();
223        Set<String> tableNames = findTableNames(metadata, schemaName);
224        Database database = sqlInfo.getDatabase();
225        Map<String, List<Column>> added = new HashMap<String, List<Column>>();
226
227        for (Table table : database.getTables()) {
228            String tableName = getTableName(table.getPhysicalName());
229            if (!tableNames.contains(tableName.toUpperCase())) {
230
231                /*
232                 * Create missing table.
233                 */
234
235                ddlCollector.add(table.getCreateSql());
236                ddlCollector.addAll(table.getPostCreateSqls(model));
237
238                added.put(table.getKey(), null); // null = table created
239
240                sqlInfo.sqlStatementsProperties.put("create_table_" + tableName.toLowerCase(), Boolean.TRUE);
241
242            } else {
243
244                /*
245                 * Get existing columns.
246                 */
247
248                Map<String, Integer> columnTypes = new HashMap<String, Integer>();
249                Map<String, String> columnTypeNames = new HashMap<String, String>();
250                Map<String, Integer> columnTypeSizes = new HashMap<String, Integer>();
251                try (ResultSet rs = metadata.getColumns(null, schemaName, tableName, "%")) {
252                    while (rs.next()) {
253                        String schema = rs.getString("TABLE_SCHEM");
254                        if (schema != null) { // null for MySQL, doh!
255                            if ("INFORMATION_SCHEMA".equals(schema.toUpperCase())) {
256                                // H2 returns some system tables (locks)
257                                continue;
258                            }
259                        }
260                        String columnName = rs.getString("COLUMN_NAME").toUpperCase();
261                        columnTypes.put(columnName, Integer.valueOf(rs.getInt("DATA_TYPE")));
262                        columnTypeNames.put(columnName, rs.getString("TYPE_NAME"));
263                        columnTypeSizes.put(columnName, Integer.valueOf(rs.getInt("COLUMN_SIZE")));
264                    }
265                }
266
267                /*
268                 * Update types and create missing columns.
269                 */
270
271                List<Column> addedColumns = new LinkedList<Column>();
272                for (Column column : table.getColumns()) {
273                    String upperName = column.getPhysicalName().toUpperCase();
274                    Integer type = columnTypes.remove(upperName);
275                    if (type == null) {
276                        log.warn("Adding missing column in database: " + column.getFullQuotedName());
277                        ddlCollector.add(table.getAddColumnSql(column));
278                        ddlCollector.addAll(table.getPostAddSqls(column, model));
279                        addedColumns.add(column);
280                    } else {
281                        int expected = column.getJdbcType();
282                        int actual = type.intValue();
283                        String actualName = columnTypeNames.get(upperName);
284                        Integer actualSize = columnTypeSizes.get(upperName);
285                        if (!column.setJdbcType(actual, actualName, actualSize.intValue())) {
286                            log.error(String.format("SQL type mismatch for %s: expected %s, database has %s / %s (%s)",
287                                    column.getFullQuotedName(), Integer.valueOf(expected), type, actualName,
288                                    actualSize));
289                        }
290                    }
291                }
292                for (String col : dialect.getIgnoredColumns(table)) {
293                    columnTypes.remove(col.toUpperCase());
294                }
295                if (!columnTypes.isEmpty()) {
296                    log.warn("Database contains additional unused columns for table " + table.getQuotedName() + ": "
297                            + StringUtils.join(new ArrayList<String>(columnTypes.keySet()), ", "));
298                }
299                if (!addedColumns.isEmpty()) {
300                    if (added.containsKey(table.getKey())) {
301                        throw new AssertionError();
302                    }
303                    added.put(table.getKey(), addedColumns);
304                }
305            }
306        }
307
308        if (testProps.containsKey(TEST_UPGRADE)) {
309            // create "old" content in tables
310            sqlInfo.executeSQLStatements("testUpgradeOldTables", ddlMode, connection, logger, ddlCollector);
311        }
312
313        // run upgrade for each table if added columns or test
314        for (Entry<String, List<Column>> en : added.entrySet()) {
315            List<Column> addedColumns = en.getValue();
316            String tableKey = en.getKey();
317            upgradeTable(tableKey, addedColumns, ddlMode, ddlCollector);
318        }
319
320        sqlInfo.executeSQLStatements("afterTableCreation", ddlMode, connection, logger, ddlCollector);
321        sqlInfo.executeSQLStatements("last", ddlMode, connection, logger, ddlCollector);
322
323        // aclr_permission check for PostgreSQL
324        dialect.performAdditionalStatements(connection);
325
326        /*
327         * Execute all the collected DDL, or dump it if requested, depending on ddlMode
328         */
329
330        // ddlMode may be:
331        // ignore (not treated here, nothing done)
332        // dump (implies execute)
333        // dump,execute
334        // dump,ignore (no execute)
335        // execute
336        // abort (implies dump)
337        // compat can be used instead of execute to always recreate stored procedures
338
339        List<String> ddl = ddlCollector.getStrings();
340        boolean ignore = ddlMode.contains(RepositoryDescriptor.DDL_MODE_IGNORE);
341        boolean dump = ddlMode.contains(RepositoryDescriptor.DDL_MODE_DUMP);
342        boolean abort = ddlMode.contains(RepositoryDescriptor.DDL_MODE_ABORT);
343        if (dump || abort) {
344
345            /*
346             * Dump DDL if not empty.
347             */
348
349            if (!ddl.isEmpty()) {
350                File dumpFile = new File(Environment.getDefault().getLog(), "ddl-vcs-" + repository.getName() + ".sql");
351                try (OutputStream out = new FileOutputStream(dumpFile); PrintStream ps = new PrintStream(out)) {
352                    for (String sql : dialect.getDumpStart()) {
353                        ps.println(sql);
354                    }
355                    for (String sql : ddl) {
356                        sql = sql.trim();
357                        if (sql.endsWith(";")) {
358                            sql = sql.substring(0, sql.length() - 1);
359                        }
360                        ps.println(dialect.getSQLForDump(sql));
361                    }
362                    for (String sql : dialect.getDumpStop()) {
363                        ps.println(sql);
364                    }
365                } catch (IOException e) {
366                    throw new NuxeoException(e);
367                }
368
369                /*
370                 * Abort if requested.
371                 */
372
373                if (abort) {
374                    log.error("Dumped DDL to: " + dumpFile);
375                    throw new NuxeoException("Database initialization failed for: " + repository.getName()
376                            + ", DDL must be executed: " + dumpFile);
377                }
378            }
379        }
380        if (!ignore) {
381
382            /*
383             * Execute DDL.
384             */
385
386            try (Statement st = connection.createStatement()) {
387                for (String sql : ddl) {
388                    logger.log(sql.replace("\n", "\n    ")); // indented
389                    try {
390                        st.execute(sql);
391                    } catch (SQLException e) {
392                        throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e);
393                    }
394                    countExecute();
395                }
396            }
397
398            /*
399             * Execute post-DDL stuff.
400             */
401
402            try (Statement st = connection.createStatement()) {
403                for (String sql : dialect.getStartupSqls(model, sqlInfo.database)) {
404                    logger.log(sql.replace("\n", "\n    ")); // indented
405                    try {
406                        st.execute(sql);
407                    } catch (SQLException e) {
408                        throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e);
409                    }
410                    countExecute();
411                }
412            }
413        }
414    }
415
416    protected void upgradeTable(String tableKey, List<Column> addedColumns, String ddlMode, ListCollector ddlCollector)
417            throws SQLException {
418        tableUpgrader.upgrade(tableKey, addedColumns, ddlMode, ddlCollector);
419    }
420
421    /** Finds uppercase table names. */
422    protected static Set<String> findTableNames(DatabaseMetaData metadata, String schemaName) throws SQLException {
423        Set<String> tableNames = new HashSet<String>();
424        ResultSet rs = metadata.getTables(null, schemaName, "%", new String[] { "TABLE" });
425        while (rs.next()) {
426            String tableName = rs.getString("TABLE_NAME");
427            tableNames.add(tableName.toUpperCase());
428        }
429        rs.close();
430        return tableNames;
431    }
432
433    @Override
434    public int getClusterNodeIdType() {
435        return sqlInfo.getClusterNodeIdType();
436    }
437
438    @Override
439    public void createClusterNode(Serializable nodeId) {
440        Calendar now = Calendar.getInstance();
441        try {
442            String sql = sqlInfo.getCreateClusterNodeSql();
443            List<Column> columns = sqlInfo.getCreateClusterNodeColumns();
444            PreparedStatement ps = connection.prepareStatement(sql);
445            try {
446                if (logger.isLogEnabled()) {
447                    logger.logSQL(sql, Arrays.asList(nodeId, now));
448                }
449                columns.get(0).setToPreparedStatement(ps, 1, nodeId);
450                columns.get(1).setToPreparedStatement(ps, 2, now);
451                ps.execute();
452            } finally {
453                closeStatement(ps);
454            }
455        } catch (SQLException e) {
456            throw new NuxeoException(e);
457        }
458    }
459
460    @Override
461    public void removeClusterNode(Serializable nodeId) {
462        try {
463            // delete from cluster_nodes
464            String sql = sqlInfo.getDeleteClusterNodeSql();
465            Column column = sqlInfo.getDeleteClusterNodeColumn();
466            PreparedStatement ps = connection.prepareStatement(sql);
467            try {
468                if (logger.isLogEnabled()) {
469                    logger.logSQL(sql, Arrays.asList(nodeId));
470                }
471                column.setToPreparedStatement(ps, 1, nodeId);
472                ps.execute();
473            } finally {
474                closeStatement(ps);
475            }
476            // delete un-processed invals from cluster_invals
477            deleteClusterInvals(nodeId);
478        } catch (SQLException e) {
479            throw new NuxeoException(e);
480        }
481    }
482
483    protected void deleteClusterInvals(Serializable nodeId) throws SQLException {
484        String sql = sqlInfo.getDeleteClusterInvalsSql();
485        Column column = sqlInfo.getDeleteClusterInvalsColumn();
486        PreparedStatement ps = connection.prepareStatement(sql);
487        try {
488            if (logger.isLogEnabled()) {
489                logger.logSQL(sql, Arrays.asList(nodeId));
490            }
491            column.setToPreparedStatement(ps, 1, nodeId);
492            int n = ps.executeUpdate();
493            countExecute();
494            if (logger.isLogEnabled()) {
495                logger.logCount(n);
496            }
497        } finally {
498            try {
499                closeStatement(ps);
500            } catch (SQLException e) {
501                log.error("deleteClusterInvals: " + e.getMessage(), e);
502            }
503        }
504    }
505
506    @Override
507    public void insertClusterInvalidations(Serializable nodeId, Invalidations invalidations) {
508        String sql = dialect.getClusterInsertInvalidations();
509        List<Column> columns = sqlInfo.getClusterInvalidationsColumns();
510        PreparedStatement ps = null;
511        try {
512            ps = connection.prepareStatement(sql);
513            int kind = Invalidations.MODIFIED;
514            while (true) {
515                Set<RowId> rowIds = invalidations.getKindSet(kind);
516
517                // reorganize by id
518                Map<Serializable, Set<String>> res = new HashMap<Serializable, Set<String>>();
519                for (RowId rowId : rowIds) {
520                    Set<String> tableNames = res.get(rowId.id);
521                    if (tableNames == null) {
522                        res.put(rowId.id, tableNames = new HashSet<String>());
523                    }
524                    tableNames.add(rowId.tableName);
525                }
526
527                // do inserts
528                for (Entry<Serializable, Set<String>> en : res.entrySet()) {
529                    Serializable id = en.getKey();
530                    String fragments = join(en.getValue(), ' ');
531                    if (logger.isLogEnabled()) {
532                        logger.logSQL(sql, Arrays.<Serializable> asList(nodeId, id, fragments, Long.valueOf(kind)));
533                    }
534                    Serializable frags;
535                    if (dialect.supportsArrays() && columns.get(2).getJdbcType() == Types.ARRAY) {
536                        frags = fragments.split(" ");
537                    } else {
538                        frags = fragments;
539                    }
540                    columns.get(0).setToPreparedStatement(ps, 1, nodeId);
541                    columns.get(1).setToPreparedStatement(ps, 2, id);
542                    columns.get(2).setToPreparedStatement(ps, 3, frags);
543                    columns.get(3).setToPreparedStatement(ps, 4, Long.valueOf(kind));
544                    ps.execute();
545                    countExecute();
546                }
547                if (kind == Invalidations.MODIFIED) {
548                    kind = Invalidations.DELETED;
549                } else {
550                    break;
551                }
552            }
553        } catch (SQLException e) {
554            throw new NuxeoException("Could not invalidate", e);
555        } finally {
556            try {
557                closeStatement(ps);
558            } catch (SQLException e) {
559                log.error(e.getMessage(), e);
560            }
561        }
562    }
563
564    // join that works on a set
565    protected static final String join(Collection<String> strings, char sep) {
566        if (strings.isEmpty()) {
567            throw new RuntimeException();
568        }
569        if (strings.size() == 1) {
570            return strings.iterator().next();
571        }
572        int size = 0;
573        for (String word : strings) {
574            size += word.length() + 1;
575        }
576        StringBuilder buf = new StringBuilder(size);
577        for (String word : strings) {
578            buf.append(word);
579            buf.append(sep);
580        }
581        buf.setLength(size - 1);
582        return buf.toString();
583    }
584
585    @Override
586    public Invalidations getClusterInvalidations(Serializable nodeId) {
587        Invalidations invalidations = new Invalidations();
588        String sql = dialect.getClusterGetInvalidations();
589        List<Column> columns = sqlInfo.getClusterInvalidationsColumns();
590        try {
591            if (logger.isLogEnabled()) {
592                logger.logSQL(sql, Arrays.asList(nodeId));
593            }
594            PreparedStatement ps = connection.prepareStatement(sql);
595            ResultSet rs = null;
596            try {
597                setToPreparedStatement(ps, 1, nodeId);
598                rs = ps.executeQuery();
599                countExecute();
600                while (rs.next()) {
601                    // first column ignored, it's the node id
602                    Serializable id = columns.get(1).getFromResultSet(rs, 1);
603                    Serializable frags = columns.get(2).getFromResultSet(rs, 2);
604                    int kind = ((Long) columns.get(3).getFromResultSet(rs, 3)).intValue();
605                    String[] fragments;
606                    if (dialect.supportsArrays() && frags instanceof String[]) {
607                        fragments = (String[]) frags;
608                    } else {
609                        fragments = ((String) frags).split(" ");
610                    }
611                    invalidations.add(id, fragments, kind);
612                }
613            } finally {
614                closeStatement(ps, rs);
615            }
616            if (logger.isLogEnabled()) {
617                // logCount(n);
618                logger.log("  -> " + invalidations);
619            }
620            if (dialect.isClusteringDeleteNeeded()) {
621                deleteClusterInvals(nodeId);
622            }
623            return invalidations;
624        } catch (SQLException e) {
625            throw new NuxeoException("Could not invalidate", e);
626        }
627    }
628
629    @Override
630    public Serializable getRootId(String repositoryId) {
631        String sql = sqlInfo.getSelectRootIdSql();
632        try {
633            if (logger.isLogEnabled()) {
634                logger.logSQL(sql, Collections.<Serializable> singletonList(repositoryId));
635            }
636            PreparedStatement ps = connection.prepareStatement(sql);
637            ResultSet rs = null;
638            try {
639                ps.setString(1, repositoryId);
640                rs = ps.executeQuery();
641                countExecute();
642                if (!rs.next()) {
643                    if (logger.isLogEnabled()) {
644                        logger.log("  -> (none)");
645                    }
646                    return null;
647                }
648                Column column = sqlInfo.getSelectRootIdWhatColumn();
649                Serializable id = column.getFromResultSet(rs, 1);
650                if (logger.isLogEnabled()) {
651                    logger.log("  -> " + Model.MAIN_KEY + '=' + id);
652                }
653                // check that we didn't get several rows
654                if (rs.next()) {
655                    throw new NuxeoException("Row query for " + repositoryId + " returned several rows: " + sql);
656                }
657                return id;
658            } finally {
659                closeStatement(ps, rs);
660            }
661        } catch (SQLException e) {
662            throw new NuxeoException("Could not select: " + sql, e);
663        }
664    }
665
666    @Override
667    public void setRootId(Serializable repositoryId, Serializable id) {
668        String sql = sqlInfo.getInsertRootIdSql();
669        try {
670            PreparedStatement ps = connection.prepareStatement(sql);
671            try {
672                List<Column> columns = sqlInfo.getInsertRootIdColumns();
673                List<Serializable> debugValues = null;
674                if (logger.isLogEnabled()) {
675                    debugValues = new ArrayList<Serializable>(2);
676                }
677                int i = 0;
678                for (Column column : columns) {
679                    i++;
680                    String key = column.getKey();
681                    Serializable v;
682                    if (key.equals(Model.MAIN_KEY)) {
683                        v = id;
684                    } else if (key.equals(Model.REPOINFO_REPONAME_KEY)) {
685                        v = repositoryId;
686                    } else {
687                        throw new RuntimeException(key);
688                    }
689                    column.setToPreparedStatement(ps, i, v);
690                    if (debugValues != null) {
691                        debugValues.add(v);
692                    }
693                }
694                if (debugValues != null) {
695                    logger.logSQL(sql, debugValues);
696                    debugValues.clear();
697                }
698                ps.execute();
699                countExecute();
700            } finally {
701                closeStatement(ps);
702            }
703        } catch (SQLException e) {
704            throw new NuxeoException("Could not insert: " + sql, e);
705        }
706    }
707
708    protected QueryMaker findQueryMaker(String queryType) {
709        for (Class<? extends QueryMaker> klass : queryMakerService.getQueryMakers()) {
710            QueryMaker queryMaker;
711            try {
712                queryMaker = klass.newInstance();
713            } catch (ReflectiveOperationException e) {
714                throw new NuxeoException(e);
715            }
716            if (queryMaker.accepts(queryType)) {
717                return queryMaker;
718            }
719        }
720        return null;
721    }
722
723    protected void prepareUserReadAcls(QueryFilter queryFilter) {
724        String sql = dialect.getPrepareUserReadAclsSql();
725        Serializable principals = queryFilter.getPrincipals();
726        if (sql == null || principals == null) {
727            return;
728        }
729        if (!dialect.supportsArrays()) {
730            principals = StringUtils.join((String[]) principals, Dialect.ARRAY_SEP);
731        }
732        PreparedStatement ps = null;
733        try {
734            ps = connection.prepareStatement(sql);
735            if (logger.isLogEnabled()) {
736                logger.logSQL(sql, Collections.singleton(principals));
737            }
738            setToPreparedStatement(ps, 1, principals);
739            ps.execute();
740            countExecute();
741        } catch (SQLException e) {
742            throw new NuxeoException("Failed to prepare user read acl cache", e);
743        } finally {
744            try {
745                closeStatement(ps);
746            } catch (SQLException e) {
747                log.error(e.getMessage(), e);
748            }
749        }
750    }
751
752    @Override
753    public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter,
754            boolean countTotal) {
755        return query(query, queryType, queryFilter, countTotal ? -1 : 0);
756    }
757
758    @Override
759    public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter, long countUpTo) {
760        if (dialect.needsPrepareUserReadAcls()) {
761            prepareUserReadAcls(queryFilter);
762        }
763        QueryMaker queryMaker = findQueryMaker(queryType);
764        if (queryMaker == null) {
765            throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query);
766        }
767        QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter);
768
769        if (q == null) {
770            logger.log("Query cannot return anything due to conflicting clauses");
771            return new PartialList<Serializable>(Collections.<Serializable> emptyList(), 0);
772        }
773        long limit = queryFilter.getLimit();
774        long offset = queryFilter.getOffset();
775
776        if (logger.isLogEnabled()) {
777            String sql = q.selectInfo.sql;
778            if (limit != 0) {
779                sql += " -- LIMIT " + limit + " OFFSET " + offset;
780            }
781            if (countUpTo != 0) {
782                sql += " -- COUNT TOTAL UP TO " + countUpTo;
783            }
784            logger.logSQL(sql, q.selectParams);
785        }
786
787        String sql = q.selectInfo.sql;
788
789        if (countUpTo == 0 && limit > 0 && dialect.supportsPaging()) {
790            // full result set not needed for counting
791            sql = dialect.addPagingClause(sql, limit, offset);
792            limit = 0;
793            offset = 0;
794        } else if (countUpTo > 0 && dialect.supportsPaging()) {
795            // ask one more row
796            sql = dialect.addPagingClause(sql, Math.max(countUpTo + 1, limit + offset), 0);
797        }
798
799        PreparedStatement ps = null;
800        ResultSet rs = null;
801        try {
802            ps = connection.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
803            int i = 1;
804            for (Serializable object : q.selectParams) {
805                setToPreparedStatement(ps, i++, object);
806            }
807            rs = ps.executeQuery();
808            countExecute();
809
810            // limit/offset
811            long totalSize = -1;
812            boolean available;
813            if ((limit == 0) || (offset == 0)) {
814                available = rs.first();
815                if (!available) {
816                    totalSize = 0;
817                }
818                if (limit == 0) {
819                    limit = -1; // infinite
820                }
821            } else {
822                available = rs.absolute((int) offset + 1);
823            }
824
825            Column column = q.selectInfo.whatColumns.get(0);
826            List<Serializable> ids = new LinkedList<Serializable>();
827            int rowNum = 0;
828            while (available && (limit != 0)) {
829                Serializable id = column.getFromResultSet(rs, 1);
830                ids.add(id);
831                rowNum = rs.getRow();
832                available = rs.next();
833                limit--;
834            }
835
836            // total size
837            if (countUpTo != 0 && (totalSize == -1)) {
838                if (!available && (rowNum != 0)) {
839                    // last row read was the actual last
840                    totalSize = rowNum;
841                } else {
842                    // available if limit reached with some left
843                    // rowNum == 0 if skipped too far
844                    rs.last();
845                    totalSize = rs.getRow();
846                }
847                if (countUpTo > 0 && totalSize > countUpTo) {
848                    // the result where truncated we don't know the total size
849                    totalSize = -2;
850                }
851            }
852
853            if (logger.isLogEnabled()) {
854                logger.logIds(ids, countUpTo != 0, totalSize);
855            }
856
857            return new PartialList<Serializable>(ids, totalSize);
858        } catch (SQLException e) {
859            throw new NuxeoException("Invalid query: " + query, e);
860        } finally {
861            try {
862                closeStatement(ps, rs);
863            } catch (SQLException e) {
864                log.error("Cannot close connection", e);
865            }
866        }
867    }
868
869    public int setToPreparedStatement(PreparedStatement ps, int i, Serializable object) throws SQLException {
870        if (object instanceof Calendar) {
871            Calendar cal = (Calendar) object;
872            ps.setTimestamp(i, dialect.getTimestampFromCalendar(cal), cal);
873        } else if (object instanceof java.sql.Date) {
874            ps.setDate(i, (java.sql.Date) object);
875        } else if (object instanceof Long) {
876            ps.setLong(i, ((Long) object).longValue());
877        } else if (object instanceof WrappedId) {
878            dialect.setId(ps, i, object.toString());
879        } else if (object instanceof Object[]) {
880            int jdbcType;
881            if (object instanceof String[]) {
882                jdbcType = dialect.getJDBCTypeAndString(ColumnType.STRING).jdbcType;
883            } else if (object instanceof Boolean[]) {
884                jdbcType = dialect.getJDBCTypeAndString(ColumnType.BOOLEAN).jdbcType;
885            } else if (object instanceof Long[]) {
886                jdbcType = dialect.getJDBCTypeAndString(ColumnType.LONG).jdbcType;
887            } else if (object instanceof Double[]) {
888                jdbcType = dialect.getJDBCTypeAndString(ColumnType.DOUBLE).jdbcType;
889            } else if (object instanceof java.sql.Date[]) {
890                jdbcType = Types.DATE;
891            } else if (object instanceof java.sql.Clob[]) {
892                jdbcType = Types.CLOB;
893            } else if (object instanceof Calendar[]) {
894                jdbcType = dialect.getJDBCTypeAndString(ColumnType.TIMESTAMP).jdbcType;
895                object = dialect.getTimestampFromCalendar((Calendar) object);
896            } else if (object instanceof Integer[]) {
897                jdbcType = dialect.getJDBCTypeAndString(ColumnType.INTEGER).jdbcType;
898            } else {
899                jdbcType = dialect.getJDBCTypeAndString(ColumnType.CLOB).jdbcType;
900            }
901            Array array = dialect.createArrayOf(jdbcType, (Object[]) object, connection);
902            ps.setArray(i, array);
903        } else {
904            ps.setObject(i, object);
905        }
906        return i;
907    }
908
909    // queryFilter used for principals and permissions
910    @Override
911    public IterableQueryResult queryAndFetch(String query, String queryType, QueryFilter queryFilter,
912            boolean distinctDocuments, Object... params) {
913        if (dialect.needsPrepareUserReadAcls()) {
914            prepareUserReadAcls(queryFilter);
915        }
916        QueryMaker queryMaker = findQueryMaker(queryType);
917        if (queryMaker == null) {
918            throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query);
919        }
920        if (distinctDocuments) {
921            String q = query.toLowerCase();
922            if (q.startsWith("select ") && !q.startsWith("select distinct ")) {
923                query = "SELECT DISTINCT " + query.substring("SELECT ".length());
924            }
925        }
926        try {
927            return new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this, params);
928        } catch (SQLException e) {
929            throw new NuxeoException("Invalid query: " + queryType + ": " + query, e);
930        }
931    }
932
933    @Override
934    public ScrollResult scroll(String query, int batchSize, int keepAliveSeconds) {
935        if (!dialect.supportsScroll()) {
936            return defaultScroll(query);
937        }
938        checkForTimedoutScroll();
939        return scrollSearch(query, batchSize, keepAliveSeconds);
940    }
941
942    protected void checkForTimedoutScroll() {
943        cursorResults.forEach((id, cursor) -> cursor.timedOut(id));
944    }
945
946    protected ScrollResult scrollSearch(String query, int batchSize, int keepAliveSeconds) {
947        QueryMaker queryMaker = findQueryMaker("NXQL");
948        QueryFilter queryFilter = new QueryFilter(null, null, null, null, Collections.emptyList(), 0, 0);
949        QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter, null);
950        if (q == null) {
951            logger.log("Query cannot return anything due to conflicting clauses");
952            throw new NuxeoException("Query cannot return anything due to conflicting clauses");
953        }
954        if (logger.isLogEnabled()) {
955            logger.logSQL(q.selectInfo.sql, q.selectParams);
956        }
957        try {
958            if (connection.getAutoCommit()) {
959                throw new NuxeoException("Scroll should be done inside a transaction");
960            }
961            PreparedStatement ps = connection.prepareStatement(q.selectInfo.sql, ResultSet.TYPE_FORWARD_ONLY,
962                    ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
963            ps.setFetchSize(batchSize);
964            int i = 1;
965            for (Serializable object : q.selectParams) {
966                setToPreparedStatement(ps, i++, object);
967            }
968            ResultSet rs = ps.executeQuery();
969            String scrollId = UUID.randomUUID().toString();
970            registerCursor(scrollId, ps, rs, batchSize, keepAliveSeconds);
971            return scroll(scrollId);
972        } catch (SQLException e) {
973            throw new NuxeoException("Error on query", e);
974        }
975    }
976
977    protected class CursorResult {
978        protected final int keepAliveSeconds;
979        protected final PreparedStatement preparedStatement;
980        protected final ResultSet resultSet;
981        protected final int batchSize;
982        protected long lastCallTimestamp;
983
984        CursorResult(PreparedStatement preparedStatement, ResultSet resultSet, int batchSize, int keepAliveSeconds) {
985            this.preparedStatement = preparedStatement;
986            this.resultSet = resultSet;
987            this.batchSize = batchSize;
988            this.keepAliveSeconds = keepAliveSeconds;
989            lastCallTimestamp = System.currentTimeMillis();
990        }
991
992        boolean timedOut(String scrollId) {
993            long now = System.currentTimeMillis();
994            if (now - lastCallTimestamp > (keepAliveSeconds * 1000)) {
995                if (unregisterCursor(scrollId)) {
996                    log.warn("Scroll " + scrollId + " timed out");
997                }
998                return true;
999            }
1000            return false;
1001        }
1002
1003        void touch() {
1004            lastCallTimestamp = System.currentTimeMillis();
1005        }
1006
1007        synchronized void close() throws SQLException {
1008            if (resultSet != null) {
1009                resultSet.close();
1010            }
1011            if (preparedStatement != null) {
1012                preparedStatement.close();
1013            }
1014        }
1015    }
1016
1017    protected void registerCursor(String scrollId, PreparedStatement ps, ResultSet rs, int batchSize,
1018                                  int keepAliveSeconds) {
1019        cursorResults.put(scrollId, new CursorResult(ps, rs, batchSize, keepAliveSeconds));
1020    }
1021
1022    protected boolean unregisterCursor(String scrollId) {
1023        CursorResult cursor = cursorResults.remove(scrollId);
1024        if (cursor != null) {
1025            try {
1026                cursor.close();
1027                return true;
1028            } catch (SQLException e) {
1029                log.error("Failed to close cursor for scroll: " + scrollId, e);
1030                // do not propagate exception on cleaning
1031            }
1032        }
1033        return false;
1034    }
1035
1036    protected ScrollResult defaultScroll(String query) {
1037        // the database has no proper support for cursor just return everything in one batch
1038        QueryMaker queryMaker = findQueryMaker("NXQL");
1039        List<String> ids;
1040        QueryFilter queryFilter = new QueryFilter(null, null, null, null, Collections.emptyList(), 0, 0);
1041        try (IterableQueryResult  ret = new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this, null)) {
1042            ids = new ArrayList<>((int) ret.size());
1043            for (Map<String, Serializable> map : ret) {
1044                ids.add(map.get("ecm:uuid").toString());
1045            }
1046        } catch (SQLException e) {
1047            throw new NuxeoException("Invalid scroll query: " + query, e);
1048        }
1049        return new ScrollResultImpl(NOSCROLL_ID, ids);
1050    }
1051
1052    @Override
1053    public ScrollResult scroll(String scrollId) {
1054        if (NOSCROLL_ID.equals(scrollId) || !dialect.supportsScroll()) {
1055            // there is only one batch in this case
1056            return emptyResult();
1057        }
1058        CursorResult cursorResult = cursorResults.get(scrollId);
1059        if (cursorResult == null) {
1060            throw new NuxeoException("Unknown or timed out scrollId");
1061        } else if (cursorResult.timedOut(scrollId)) {
1062            throw new NuxeoException("Timed out scrollId");
1063        }
1064        cursorResult.touch();
1065        List<String> ids = new ArrayList<>(cursorResult.batchSize);
1066        synchronized (cursorResult) {
1067            try {
1068                if (cursorResult.resultSet == null || cursorResult.resultSet.isClosed()) {
1069                    unregisterCursor(scrollId);
1070                    return emptyResult();
1071                }
1072                while (ids.size() < cursorResult.batchSize) {
1073                    if (cursorResult.resultSet.next()) {
1074                        ids.add(cursorResult.resultSet.getString(1));
1075                    } else {
1076                        cursorResult.close();
1077                        if (ids.isEmpty()) {
1078                            unregisterCursor(scrollId);
1079                        }
1080                        break;
1081                    }
1082                }
1083            } catch (SQLException e) {
1084                throw new NuxeoException("Error during scroll", e);
1085            }
1086        }
1087        return new ScrollResultImpl(scrollId, ids);
1088    }
1089
1090    @Override
1091    public Set<Serializable> getAncestorsIds(Collection<Serializable> ids) {
1092        SQLInfoSelect select = sqlInfo.getSelectAncestorsIds();
1093        if (select == null) {
1094            return getAncestorsIdsIterative(ids);
1095        }
1096        Serializable whereIds = newIdArray(ids);
1097        Set<Serializable> res = new HashSet<Serializable>();
1098        PreparedStatement ps = null;
1099        ResultSet rs = null;
1100        try {
1101            if (logger.isLogEnabled()) {
1102                logger.logSQL(select.sql, Collections.singleton(whereIds));
1103            }
1104            Column what = select.whatColumns.get(0);
1105            ps = connection.prepareStatement(select.sql);
1106            setToPreparedStatementIdArray(ps, 1, whereIds);
1107            rs = ps.executeQuery();
1108            countExecute();
1109            List<Serializable> debugIds = null;
1110            if (logger.isLogEnabled()) {
1111                debugIds = new LinkedList<Serializable>();
1112            }
1113            while (rs.next()) {
1114                if (dialect.supportsArraysReturnInsteadOfRows()) {
1115                    Serializable[] resultIds = dialect.getArrayResult(rs.getArray(1));
1116                    for (Serializable id : resultIds) {
1117                        if (id != null) {
1118                            res.add(id);
1119                            if (logger.isLogEnabled()) {
1120                                debugIds.add(id);
1121                            }
1122                        }
1123                    }
1124                } else {
1125                    Serializable id = what.getFromResultSet(rs, 1);
1126                    if (id != null) {
1127                        res.add(id);
1128                        if (logger.isLogEnabled()) {
1129                            debugIds.add(id);
1130                        }
1131                    }
1132                }
1133            }
1134            if (logger.isLogEnabled()) {
1135                logger.logIds(debugIds, false, 0);
1136            }
1137            return res;
1138        } catch (SQLException e) {
1139            throw new NuxeoException("Failed to get ancestors ids", e);
1140        } finally {
1141            try {
1142                closeStatement(ps, rs);
1143            } catch (SQLException e) {
1144                log.error(e.getMessage(), e);
1145            }
1146        }
1147    }
1148
1149    /**
1150     * Uses iterative parentid selection.
1151     */
1152    protected Set<Serializable> getAncestorsIdsIterative(Collection<Serializable> ids) {
1153        PreparedStatement ps = null;
1154        ResultSet rs = null;
1155        try {
1156            LinkedList<Serializable> todo = new LinkedList<Serializable>(ids);
1157            Set<Serializable> done = new HashSet<Serializable>();
1158            Set<Serializable> res = new HashSet<Serializable>();
1159            while (!todo.isEmpty()) {
1160                done.addAll(todo);
1161                SQLInfoSelect select = sqlInfo.getSelectParentIds(todo.size());
1162                if (logger.isLogEnabled()) {
1163                    logger.logSQL(select.sql, todo);
1164                }
1165                Column what = select.whatColumns.get(0);
1166                Column where = select.whereColumns.get(0);
1167                ps = connection.prepareStatement(select.sql);
1168                int i = 1;
1169                for (Serializable id : todo) {
1170                    where.setToPreparedStatement(ps, i++, id);
1171                }
1172                rs = ps.executeQuery();
1173                countExecute();
1174                todo = new LinkedList<Serializable>();
1175                List<Serializable> debugIds = null;
1176                if (logger.isLogEnabled()) {
1177                    debugIds = new LinkedList<Serializable>();
1178                }
1179                while (rs.next()) {
1180                    Serializable id = what.getFromResultSet(rs, 1);
1181                    if (id != null) {
1182                        res.add(id);
1183                        if (!done.contains(id)) {
1184                            todo.add(id);
1185                        }
1186                        if (logger.isLogEnabled()) {
1187                            debugIds.add(id);
1188                        }
1189                    }
1190                }
1191                if (logger.isLogEnabled()) {
1192                    logger.logIds(debugIds, false, 0);
1193                }
1194                rs.close();
1195                ps.close();
1196            }
1197            return res;
1198        } catch (SQLException e) {
1199            throw new NuxeoException("Failed to get ancestors ids", e);
1200        } finally {
1201            try {
1202                closeStatement(ps, rs);
1203            } catch (SQLException e) {
1204                log.error(e.getMessage(), e);
1205            }
1206        }
1207    }
1208
1209    @Override
1210    public void updateReadAcls() {
1211        if (!dialect.supportsReadAcl()) {
1212            return;
1213        }
1214        if (log.isDebugEnabled()) {
1215            log.debug("updateReadAcls: updating");
1216        }
1217        Statement st = null;
1218        try {
1219            st = connection.createStatement();
1220            String sql = dialect.getUpdateReadAclsSql();
1221            if (logger.isLogEnabled()) {
1222                logger.log(sql);
1223            }
1224            st.execute(sql);
1225            countExecute();
1226        } catch (SQLException e) {
1227            checkConcurrentUpdate(e);
1228            throw new NuxeoException("Failed to update read acls", e);
1229        } finally {
1230            try {
1231                closeStatement(st);
1232            } catch (SQLException e) {
1233                log.error(e.getMessage(), e);
1234            }
1235        }
1236        if (log.isDebugEnabled()) {
1237            log.debug("updateReadAcls: done.");
1238        }
1239    }
1240
1241    @Override
1242    public void rebuildReadAcls() {
1243        if (!dialect.supportsReadAcl()) {
1244            return;
1245        }
1246        log.debug("rebuildReadAcls: rebuilding ...");
1247        Statement st = null;
1248        try {
1249            st = connection.createStatement();
1250            String sql = dialect.getRebuildReadAclsSql();
1251            logger.log(sql);
1252            st.execute(sql);
1253            countExecute();
1254        } catch (SQLException e) {
1255            throw new NuxeoException("Failed to rebuild read acls", e);
1256        } finally {
1257            try {
1258                closeStatement(st);
1259            } catch (SQLException e) {
1260                log.error(e.getMessage(), e);
1261            }
1262        }
1263        log.debug("rebuildReadAcls: done.");
1264    }
1265
1266    /*
1267     * ----- Locking -----
1268     */
1269
1270    protected Connection connection(boolean autocommit) {
1271        try {
1272            connection.setAutoCommit(autocommit);
1273        } catch (SQLException e) {
1274            throw new NuxeoException("Cannot set auto commit mode onto " + this + "'s connection", e);
1275        }
1276        return connection;
1277    }
1278
1279    /**
1280     * Calls the callable, inside a transaction if in cluster mode.
1281     * <p>
1282     * Called under {@link #serializationLock}.
1283     */
1284    protected Lock callInTransaction(LockCallable callable, boolean tx) {
1285        boolean ok = false;
1286        try {
1287            if (log.isDebugEnabled()) {
1288                log.debug("callInTransaction setAutoCommit " + !tx);
1289            }
1290            connection.setAutoCommit(!tx);
1291        } catch (SQLException e) {
1292            throw new NuxeoException("Cannot set auto commit mode onto " + this + "'s connection", e);
1293        }
1294        try {
1295            Lock result = callable.call();
1296            ok = true;
1297            return result;
1298        } finally {
1299            if (tx) {
1300                try {
1301                    try {
1302                        if (ok) {
1303                            if (log.isDebugEnabled()) {
1304                                log.debug("callInTransaction commit");
1305                            }
1306                            connection.commit();
1307                        } else {
1308                            if (log.isDebugEnabled()) {
1309                                log.debug("callInTransaction rollback");
1310                            }
1311                            connection.rollback();
1312                        }
1313                    } finally {
1314                        // restore autoCommit=true
1315                        if (log.isDebugEnabled()) {
1316                            log.debug("callInTransaction restoring autoCommit=true");
1317                        }
1318                        connection.setAutoCommit(true);
1319                    }
1320                } catch (SQLException e) {
1321                    throw new NuxeoException(e);
1322                }
1323            }
1324        }
1325    }
1326
1327    public interface LockCallable extends Callable<Lock> {
1328        @Override
1329        public Lock call();
1330    }
1331
1332    @Override
1333    public Lock getLock(Serializable id) {
1334        if (log.isDebugEnabled()) {
1335            try {
1336                log.debug("getLock " + id + " while autoCommit=" + connection.getAutoCommit());
1337            } catch (SQLException e) {
1338                throw new RuntimeException(e);
1339            }
1340        }
1341        RowId rowId = new RowId(Model.LOCK_TABLE_NAME, id);
1342        Row row = readSimpleRow(rowId);
1343        return row == null ? null : new Lock((String) row.get(Model.LOCK_OWNER_KEY),
1344                (Calendar) row.get(Model.LOCK_CREATED_KEY));
1345    }
1346
1347    @Override
1348    public Lock setLock(final Serializable id, final Lock lock) {
1349        if (log.isDebugEnabled()) {
1350            log.debug("setLock " + id + " owner=" + lock.getOwner());
1351        }
1352        SetLock call = new SetLock(id, lock);
1353        return callInTransaction(call, clusteringEnabled);
1354    }
1355
1356    protected class SetLock implements LockCallable {
1357        protected final Serializable id;
1358
1359        protected final Lock lock;
1360
1361        protected SetLock(Serializable id, Lock lock) {
1362            super();
1363            this.id = id;
1364            this.lock = lock;
1365        }
1366
1367        @Override
1368        public Lock call() {
1369            Lock oldLock = getLock(id);
1370            if (oldLock == null) {
1371                Row row = new Row(Model.LOCK_TABLE_NAME, id);
1372                row.put(Model.LOCK_OWNER_KEY, lock.getOwner());
1373                row.put(Model.LOCK_CREATED_KEY, lock.getCreated());
1374                insertSimpleRows(Model.LOCK_TABLE_NAME, Collections.singletonList(row));
1375            }
1376            return oldLock;
1377        }
1378    }
1379
1380    @Override
1381    public Lock removeLock(final Serializable id, final String owner, final boolean force) {
1382        if (log.isDebugEnabled()) {
1383            log.debug("removeLock " + id + " owner=" + owner + " force=" + force);
1384        }
1385        RemoveLock call = new RemoveLock(id, owner, force);
1386        return callInTransaction(call, !force);
1387    }
1388
1389    protected class RemoveLock implements LockCallable {
1390        protected final Serializable id;
1391
1392        protected final String owner;
1393
1394        protected final boolean force;
1395
1396        protected RemoveLock(Serializable id, String owner, boolean force) {
1397            super();
1398            this.id = id;
1399            this.owner = owner;
1400            this.force = force;
1401        }
1402
1403        @Override
1404        public Lock call() {
1405            Lock oldLock = force ? null : getLock(id);
1406            if (!force && owner != null) {
1407                if (oldLock == null) {
1408                    // not locked, nothing to do
1409                    return null;
1410                }
1411                if (!LockManager.canLockBeRemoved(oldLock.getOwner(), owner)) {
1412                    // existing mismatched lock, flag failure
1413                    return new Lock(oldLock, true);
1414                }
1415            }
1416            if (force || oldLock != null) {
1417                deleteRows(Model.LOCK_TABLE_NAME, Collections.singleton(id));
1418            }
1419            return oldLock;
1420        }
1421    }
1422
1423    @Override
1424    public void markReferencedBinaries() {
1425        log.debug("Starting binaries GC mark");
1426        Statement st = null;
1427        ResultSet rs = null;
1428        BlobManager blobManager = Framework.getService(BlobManager.class);
1429        String repositoryName = getRepositoryName();
1430        try {
1431            st = connection.createStatement();
1432            int i = -1;
1433            for (String sql : sqlInfo.getBinariesSql) {
1434                i++;
1435                Column col = sqlInfo.getBinariesColumns.get(i);
1436                if (logger.isLogEnabled()) {
1437                    logger.log(sql);
1438                }
1439                rs = st.executeQuery(sql);
1440                countExecute();
1441                int n = 0;
1442                while (rs.next()) {
1443                    n++;
1444                    String key = (String) col.getFromResultSet(rs, 1);
1445                    if (key != null) {
1446                        blobManager.markReferencedBinary(key, repositoryName);
1447                    }
1448                }
1449                if (logger.isLogEnabled()) {
1450                    logger.logCount(n);
1451                }
1452                rs.close();
1453            }
1454        } catch (SQLException e) {
1455            throw new RuntimeException("Failed to mark binaries for gC", e);
1456        } finally {
1457            try {
1458                closeStatement(st, rs);
1459            } catch (SQLException e) {
1460                log.error(e.getMessage(), e);
1461            }
1462        }
1463        log.debug("End of binaries GC mark");
1464    }
1465
1466    /*
1467     * ----- XAResource -----
1468     */
1469
1470    protected static String systemToString(Object o) {
1471        return o.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(o));
1472    }
1473
1474    @Override
1475    public void start(Xid xid, int flags) throws XAException {
1476        try {
1477            xaresource.start(xid, flags);
1478            if (logger.isLogEnabled()) {
1479                logger.log("XA start on " + systemToString(xid));
1480            }
1481        } catch (NuxeoException e) {
1482            throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e);
1483        } catch (XAException e) {
1484            logger.error("XA start error on " + systemToString(xid), e);
1485            throw e;
1486        }
1487    }
1488
1489    @Override
1490    public void end(Xid xid, int flags) throws XAException {
1491        try {
1492            xaresource.end(xid, flags);
1493            if (logger.isLogEnabled()) {
1494                logger.log("XA end on " + systemToString(xid));
1495            }
1496        } catch (NullPointerException e) {
1497            // H2 when no active transaction
1498            logger.error("XA end error on " + systemToString(xid), e);
1499            throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e);
1500        } catch (XAException e) {
1501            if (flags != XAResource.TMFAIL) {
1502                logger.error("XA end error on " + systemToString(xid), e);
1503            }
1504            throw e;
1505        }
1506    }
1507
1508    @Override
1509    public int prepare(Xid xid) throws XAException {
1510        try {
1511            return xaresource.prepare(xid);
1512        } catch (XAException e) {
1513            logger.error("XA prepare error on  " + systemToString(xid), e);
1514            throw e;
1515        }
1516    }
1517
1518    @Override
1519    public void commit(Xid xid, boolean onePhase) throws XAException {
1520        try {
1521            xaresource.commit(xid, onePhase);
1522        } catch (XAException e) {
1523            logger.error("XA commit error on  " + systemToString(xid), e);
1524            throw e;
1525        }
1526    }
1527
1528    // rollback interacts with caches so is in RowMapper
1529
1530    @Override
1531    public void forget(Xid xid) throws XAException {
1532        xaresource.forget(xid);
1533    }
1534
1535    @Override
1536    public Xid[] recover(int flag) throws XAException {
1537        return xaresource.recover(flag);
1538    }
1539
1540    @Override
1541    public boolean setTransactionTimeout(int seconds) throws XAException {
1542        return xaresource.setTransactionTimeout(seconds);
1543    }
1544
1545    @Override
1546    public int getTransactionTimeout() throws XAException {
1547        return xaresource.getTransactionTimeout();
1548    }
1549
1550    @Override
1551    public boolean isSameRM(XAResource xares) throws XAException {
1552        throw new UnsupportedOperationException();
1553    }
1554
1555    @Override
1556    public boolean isConnected() {
1557        return connection != null;
1558    }
1559
1560    @Override
1561    public void connect() {
1562        openConnections();
1563    }
1564
1565    @Override
1566    public void disconnect() {
1567        closeConnections();
1568    }
1569
1570}