001/*
002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Benoit Delbosc
019 */
020package org.nuxeo.ecm.core.storage.sql.jdbc;
021
022import java.io.File;
023import java.io.FileOutputStream;
024import java.io.IOException;
025import java.io.OutputStream;
026import java.io.PrintStream;
027import java.io.Serializable;
028import java.security.MessageDigest;
029import java.security.NoSuchAlgorithmException;
030import java.sql.Array;
031import java.sql.Connection;
032import java.sql.DatabaseMetaData;
033import java.sql.PreparedStatement;
034import java.sql.ResultSet;
035import java.sql.SQLException;
036import java.sql.Statement;
037import java.sql.Types;
038import java.text.SimpleDateFormat;
039import java.util.ArrayList;
040import java.util.Arrays;
041import java.util.Calendar;
042import java.util.Collection;
043import java.util.Collections;
044import java.util.Date;
045import java.util.HashMap;
046import java.util.HashSet;
047import java.util.LinkedList;
048import java.util.List;
049import java.util.Map;
050import java.util.Map.Entry;
051import java.util.Set;
052import java.util.concurrent.Callable;
053
054import javax.sql.XADataSource;
055import javax.transaction.xa.XAException;
056import javax.transaction.xa.XAResource;
057import javax.transaction.xa.Xid;
058
059import org.apache.commons.logging.Log;
060import org.apache.commons.logging.LogFactory;
061import org.nuxeo.common.Environment;
062import org.nuxeo.common.utils.StringUtils;
063import org.nuxeo.ecm.core.api.IterableQueryResult;
064import org.nuxeo.ecm.core.api.Lock;
065import org.nuxeo.ecm.core.api.NuxeoException;
066import org.nuxeo.ecm.core.api.PartialList;
067import org.nuxeo.ecm.core.blob.BlobManager;
068import org.nuxeo.ecm.core.model.LockManager;
069import org.nuxeo.ecm.core.query.QueryFilter;
070import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator;
071import org.nuxeo.ecm.core.storage.sql.ColumnType;
072import org.nuxeo.ecm.core.storage.sql.ColumnType.WrappedId;
073import org.nuxeo.ecm.core.storage.sql.Invalidations;
074import org.nuxeo.ecm.core.storage.sql.Mapper;
075import org.nuxeo.ecm.core.storage.sql.Model;
076import org.nuxeo.ecm.core.storage.sql.RepositoryDescriptor;
077import org.nuxeo.ecm.core.storage.sql.RepositoryImpl;
078import org.nuxeo.ecm.core.storage.sql.Row;
079import org.nuxeo.ecm.core.storage.sql.RowId;
080import org.nuxeo.ecm.core.storage.sql.Session.PathResolver;
081import org.nuxeo.ecm.core.storage.sql.jdbc.SQLInfo.SQLInfoSelect;
082import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
083import org.nuxeo.ecm.core.storage.sql.jdbc.db.Database;
084import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table;
085import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
086import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectOracle;
087import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.SQLStatement.ListCollector;
088import org.nuxeo.runtime.api.Framework;
089
090/**
091 * A {@link JDBCMapper} maps objects to and from a JDBC database. It is specific to a given database connection, as it
092 * computes statements.
093 * <p>
094 * The {@link JDBCMapper} does the mapping according to the policy defined by a {@link Model}, and generates SQL
095 * statements recorded in the {@link SQLInfo}.
096 */
097public class JDBCMapper extends JDBCRowMapper implements Mapper {
098
    /** Commons-logging logger used for warnings and errors (schema mismatches, statement close failures). */
    private static final Log log = LogFactory.getLog(JDBCMapper.class);

    /**
     * Mutable, globally shared test properties. {@link #createTables} only checks whether the
     * {@link #TEST_UPGRADE} key is present.
     */
    public static Map<String, Serializable> testProps = new HashMap<String, Serializable>();

    /** Key looked up in {@link #testProps} to enable the test-upgrade SQL categories in {@link #createTables}. */
    public static final String TEST_UPGRADE = "testUpgrade";

    // property in sql.txt file
    public static final String TEST_UPGRADE_VERSIONS = "testUpgradeVersions";

    /** Test property name registered with the table upgrader for the dublincore lastContributor upgrade. */
    public static final String TEST_UPGRADE_LAST_CONTRIBUTOR = "testUpgradeLastContributor";

    /** Test property name registered with the table upgrader for the lock table upgrade. */
    public static final String TEST_UPGRADE_LOCKS = "testUpgradeLocks";

    /** Upgrader configured in the constructor and invoked by {@link #upgradeTable}. */
    protected TableUpgrader tableUpgrader;

    /** Service providing the candidate {@link QueryMaker} classes, looked up once in the constructor. */
    private final QueryMakerService queryMakerService;

    /** Path resolver handed to the query maker when building a query. */
    private final PathResolver pathResolver;

    /** Owning repository; its name is used for the DDL dump file name and error messages. */
    private final RepositoryImpl repository;

    /** {@code true} when a cluster invalidator was passed to the constructor. */
    protected boolean clusteringEnabled;
121
122    /**
123     * Creates a new Mapper.
124     *
125     * @param model the model
126     * @param pathResolver the path resolver (used for startswith queries)
127     * @param sqlInfo the sql info
128     * @param xadatasource the XA datasource to use to get connections
129     * @param clusterInvalidator the cluster invalidator
130     * @param repository
131     */
132    public JDBCMapper(Model model, PathResolver pathResolver, SQLInfo sqlInfo, XADataSource xadatasource,
133            ClusterInvalidator clusterInvalidator, boolean noSharing, RepositoryImpl repository) {
134        super(model, sqlInfo, xadatasource, clusterInvalidator, repository.getInvalidationsPropagator(), noSharing);
135        this.pathResolver = pathResolver;
136        this.repository = repository;
137        clusteringEnabled = clusterInvalidator != null;
138        queryMakerService = Framework.getService(QueryMakerService.class);
139
140        tableUpgrader = new TableUpgrader(this);
141        tableUpgrader.add(Model.VERSION_TABLE_NAME, Model.VERSION_IS_LATEST_KEY, "upgradeVersions",
142                TEST_UPGRADE_VERSIONS);
143        tableUpgrader.add("dublincore", "lastContributor", "upgradeLastContributor", TEST_UPGRADE_LAST_CONTRIBUTOR);
144        tableUpgrader.add(Model.LOCK_TABLE_NAME, Model.LOCK_OWNER_KEY, "upgradeLocks", TEST_UPGRADE_LOCKS);
145
146    }
147
148    @Override
149    public int getTableSize(String tableName) {
150        return sqlInfo.getDatabase().getTable(tableName).getColumns().size();
151    }
152
153    /*
154     * ----- Root -----
155     */
156
157    @Override
158    public void createDatabase(String ddlMode) {
159        try {
160            createTables(ddlMode);
161        } catch (SQLException e) {
162            throw new NuxeoException(e);
163        }
164    }
165
166    protected String getTableName(String origName) {
167
168        if (dialect instanceof DialectOracle) {
169            if (origName.length() > 30) {
170
171                StringBuilder sb = new StringBuilder(origName.length());
172
173                try {
174                    MessageDigest digest = MessageDigest.getInstance("MD5");
175                    sb.append(origName.substring(0, 15));
176                    sb.append('_');
177
178                    digest.update(origName.getBytes());
179                    sb.append(Dialect.toHexString(digest.digest()).substring(0, 12));
180
181                    return sb.toString();
182
183                } catch (NoSuchAlgorithmException e) {
184                    throw new RuntimeException("Error", e);
185                }
186            }
187        }
188
189        return origName;
190    }
191
192    protected void createTables(String ddlMode) throws SQLException {
193        ListCollector ddlCollector = new ListCollector();
194
195        sqlInfo.executeSQLStatements(null, ddlMode, connection, logger, ddlCollector); // for missing category
196        sqlInfo.executeSQLStatements("first", ddlMode, connection, logger, ddlCollector);
197        sqlInfo.executeSQLStatements("beforeTableCreation", ddlMode, connection, logger, ddlCollector);
198        if (testProps.containsKey(TEST_UPGRADE)) {
199            // create "old" tables
200            sqlInfo.executeSQLStatements("testUpgrade", ddlMode, connection, logger, null); // do not collect
201        }
202
203        String schemaName = dialect.getConnectionSchema(connection);
204        DatabaseMetaData metadata = connection.getMetaData();
205        Set<String> tableNames = findTableNames(metadata, schemaName);
206        Database database = sqlInfo.getDatabase();
207        Map<String, List<Column>> added = new HashMap<String, List<Column>>();
208
209        for (Table table : database.getTables()) {
210            String tableName = getTableName(table.getPhysicalName());
211            if (!tableNames.contains(tableName.toUpperCase())) {
212
213                /*
214                 * Create missing table.
215                 */
216
217                ddlCollector.add(table.getCreateSql());
218                ddlCollector.addAll(table.getPostCreateSqls(model));
219
220                added.put(table.getKey(), null); // null = table created
221
222                sqlInfo.sqlStatementsProperties.put("create_table_" + tableName.toLowerCase(), Boolean.TRUE);
223
224            } else {
225
226                /*
227                 * Get existing columns.
228                 */
229
230                Map<String, Integer> columnTypes = new HashMap<String, Integer>();
231                Map<String, String> columnTypeNames = new HashMap<String, String>();
232                Map<String, Integer> columnTypeSizes = new HashMap<String, Integer>();
233                try (ResultSet rs = metadata.getColumns(null, schemaName, tableName, "%")) {
234                    while (rs.next()) {
235                        String schema = rs.getString("TABLE_SCHEM");
236                        if (schema != null) { // null for MySQL, doh!
237                            if ("INFORMATION_SCHEMA".equals(schema.toUpperCase())) {
238                                // H2 returns some system tables (locks)
239                                continue;
240                            }
241                        }
242                        String columnName = rs.getString("COLUMN_NAME").toUpperCase();
243                        columnTypes.put(columnName, Integer.valueOf(rs.getInt("DATA_TYPE")));
244                        columnTypeNames.put(columnName, rs.getString("TYPE_NAME"));
245                        columnTypeSizes.put(columnName, Integer.valueOf(rs.getInt("COLUMN_SIZE")));
246                    }
247                }
248
249                /*
250                 * Update types and create missing columns.
251                 */
252
253                List<Column> addedColumns = new LinkedList<Column>();
254                for (Column column : table.getColumns()) {
255                    String upperName = column.getPhysicalName().toUpperCase();
256                    Integer type = columnTypes.remove(upperName);
257                    if (type == null) {
258                        log.warn("Adding missing column in database: " + column.getFullQuotedName());
259                        ddlCollector.add(table.getAddColumnSql(column));
260                        ddlCollector.addAll(table.getPostAddSqls(column, model));
261                        addedColumns.add(column);
262                    } else {
263                        int expected = column.getJdbcType();
264                        int actual = type.intValue();
265                        String actualName = columnTypeNames.get(upperName);
266                        Integer actualSize = columnTypeSizes.get(upperName);
267                        if (!column.setJdbcType(actual, actualName, actualSize.intValue())) {
268                            log.error(String.format("SQL type mismatch for %s: expected %s, database has %s / %s (%s)",
269                                    column.getFullQuotedName(), Integer.valueOf(expected), type, actualName,
270                                    actualSize));
271                        }
272                    }
273                }
274                for (String col : dialect.getIgnoredColumns(table)) {
275                    columnTypes.remove(col.toUpperCase());
276                }
277                if (!columnTypes.isEmpty()) {
278                    log.warn("Database contains additional unused columns for table " + table.getQuotedName() + ": "
279                            + StringUtils.join(new ArrayList<String>(columnTypes.keySet()), ", "));
280                }
281                if (!addedColumns.isEmpty()) {
282                    if (added.containsKey(table.getKey())) {
283                        throw new AssertionError();
284                    }
285                    added.put(table.getKey(), addedColumns);
286                }
287            }
288        }
289
290        if (testProps.containsKey(TEST_UPGRADE)) {
291            // create "old" content in tables
292            sqlInfo.executeSQLStatements("testUpgradeOldTables", ddlMode, connection, logger, ddlCollector);
293        }
294
295        // run upgrade for each table if added columns or test
296        for (Entry<String, List<Column>> en : added.entrySet()) {
297            List<Column> addedColumns = en.getValue();
298            String tableKey = en.getKey();
299            upgradeTable(tableKey, addedColumns, ddlMode, ddlCollector);
300        }
301
302        sqlInfo.executeSQLStatements("afterTableCreation", ddlMode, connection, logger, ddlCollector);
303        sqlInfo.executeSQLStatements("last", ddlMode, connection, logger, ddlCollector);
304
305        // aclr_permission check for PostgreSQL
306        dialect.performAdditionalStatements(connection);
307
308        /*
309         * Execute all the collected DDL, or dump it if requested, depending on ddlMode
310         */
311
312        // ddlMode may be:
313        // ignore (not treated here, nothing done)
314        // dump (implies execute)
315        // dump,execute
316        // dump,ignore (no execute)
317        // execute
318        // abort (implies dump)
319        // compat can be used instead of execute to always recreate stored procedures
320
321        List<String> ddl = ddlCollector.getStrings();
322        boolean ignore = ddlMode.contains(RepositoryDescriptor.DDL_MODE_IGNORE);
323        boolean dump = ddlMode.contains(RepositoryDescriptor.DDL_MODE_DUMP);
324        boolean abort = ddlMode.contains(RepositoryDescriptor.DDL_MODE_ABORT);
325        if (dump || abort) {
326
327            /*
328             * Dump DDL if not empty.
329             */
330
331            if (!ddl.isEmpty()) {
332                File dumpFile = new File(Environment.getDefault().getLog(), "ddl-vcs-" + repository.getName() + ".sql");
333                try (OutputStream out = new FileOutputStream(dumpFile); PrintStream ps = new PrintStream(out)) {
334                    for (String sql : dialect.getDumpStart()) {
335                        ps.println(sql);
336                    }
337                    for (String sql : ddl) {
338                        sql = sql.trim();
339                        if (sql.endsWith(";")) {
340                            sql = sql.substring(0, sql.length() - 1);
341                        }
342                        ps.println(dialect.getSQLForDump(sql));
343                    }
344                    for (String sql : dialect.getDumpStop()) {
345                        ps.println(sql);
346                    }
347                } catch (IOException e) {
348                    throw new NuxeoException(e);
349                }
350
351                /*
352                 * Abort if requested.
353                 */
354
355                if (abort) {
356                    log.error("Dumped DDL to: " + dumpFile);
357                    throw new NuxeoException("Database initialization failed for: " + repository.getName()
358                            + ", DDL must be executed: " + dumpFile);
359                }
360            }
361        }
362        if (!ignore) {
363
364            /*
365             * Execute DDL.
366             */
367
368            try (Statement st = connection.createStatement()) {
369                for (String sql : ddl) {
370                    logger.log(sql.replace("\n", "\n    ")); // indented
371                    try {
372                        st.execute(sql);
373                    } catch (SQLException e) {
374                        throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e);
375                    }
376                    countExecute();
377                }
378            }
379
380            /*
381             * Execute post-DDL stuff.
382             */
383
384            try (Statement st = connection.createStatement()) {
385                for (String sql : dialect.getStartupSqls(model, sqlInfo.database)) {
386                    logger.log(sql.replace("\n", "\n    ")); // indented
387                    try {
388                        st.execute(sql);
389                    } catch (SQLException e) {
390                        throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e);
391                    }
392                    countExecute();
393                }
394            }
395        }
396    }
397
398    protected void upgradeTable(String tableKey, List<Column> addedColumns, String ddlMode, ListCollector ddlCollector)
399            throws SQLException {
400        tableUpgrader.upgrade(tableKey, addedColumns, ddlMode, ddlCollector);
401    }
402
403    /** Finds uppercase table names. */
404    protected static Set<String> findTableNames(DatabaseMetaData metadata, String schemaName) throws SQLException {
405        Set<String> tableNames = new HashSet<String>();
406        ResultSet rs = metadata.getTables(null, schemaName, "%", new String[] { "TABLE" });
407        while (rs.next()) {
408            String tableName = rs.getString("TABLE_NAME");
409            tableNames.add(tableName.toUpperCase());
410        }
411        rs.close();
412        return tableNames;
413    }
414
415    @Override
416    public int getClusterNodeIdType() {
417        return sqlInfo.getClusterNodeIdType();
418    }
419
420    @Override
421    public void createClusterNode(Serializable nodeId) {
422        Calendar now = Calendar.getInstance();
423        try {
424            String sql = sqlInfo.getCreateClusterNodeSql();
425            List<Column> columns = sqlInfo.getCreateClusterNodeColumns();
426            PreparedStatement ps = connection.prepareStatement(sql);
427            try {
428                if (logger.isLogEnabled()) {
429                    logger.logSQL(sql, Arrays.asList(nodeId, now));
430                }
431                columns.get(0).setToPreparedStatement(ps, 1, nodeId);
432                columns.get(1).setToPreparedStatement(ps, 2, now);
433                ps.execute();
434            } finally {
435                closeStatement(ps);
436            }
437        } catch (SQLException e) {
438            throw new NuxeoException(e);
439        }
440    }
441
442    @Override
443    public void removeClusterNode(Serializable nodeId) {
444        try {
445            // delete from cluster_nodes
446            String sql = sqlInfo.getDeleteClusterNodeSql();
447            Column column = sqlInfo.getDeleteClusterNodeColumn();
448            PreparedStatement ps = connection.prepareStatement(sql);
449            try {
450                if (logger.isLogEnabled()) {
451                    logger.logSQL(sql, Arrays.asList(nodeId));
452                }
453                column.setToPreparedStatement(ps, 1, nodeId);
454                ps.execute();
455            } finally {
456                closeStatement(ps);
457            }
458            // delete un-processed invals from cluster_invals
459            deleteClusterInvals(nodeId);
460        } catch (SQLException e) {
461            throw new NuxeoException(e);
462        }
463    }
464
465    protected void deleteClusterInvals(Serializable nodeId) throws SQLException {
466        String sql = sqlInfo.getDeleteClusterInvalsSql();
467        Column column = sqlInfo.getDeleteClusterInvalsColumn();
468        PreparedStatement ps = connection.prepareStatement(sql);
469        try {
470            if (logger.isLogEnabled()) {
471                logger.logSQL(sql, Arrays.asList(nodeId));
472            }
473            column.setToPreparedStatement(ps, 1, nodeId);
474            int n = ps.executeUpdate();
475            countExecute();
476            if (logger.isLogEnabled()) {
477                logger.logCount(n);
478            }
479        } finally {
480            try {
481                closeStatement(ps);
482            } catch (SQLException e) {
483                log.error("deleteClusterInvals: " + e.getMessage(), e);
484            }
485        }
486    }
487
488    @Override
489    public void insertClusterInvalidations(Serializable nodeId, Invalidations invalidations) {
490        String sql = dialect.getClusterInsertInvalidations();
491        List<Column> columns = sqlInfo.getClusterInvalidationsColumns();
492        PreparedStatement ps = null;
493        try {
494            ps = connection.prepareStatement(sql);
495            int kind = Invalidations.MODIFIED;
496            while (true) {
497                Set<RowId> rowIds = invalidations.getKindSet(kind);
498
499                // reorganize by id
500                Map<Serializable, Set<String>> res = new HashMap<Serializable, Set<String>>();
501                for (RowId rowId : rowIds) {
502                    Set<String> tableNames = res.get(rowId.id);
503                    if (tableNames == null) {
504                        res.put(rowId.id, tableNames = new HashSet<String>());
505                    }
506                    tableNames.add(rowId.tableName);
507                }
508
509                // do inserts
510                for (Entry<Serializable, Set<String>> en : res.entrySet()) {
511                    Serializable id = en.getKey();
512                    String fragments = join(en.getValue(), ' ');
513                    if (logger.isLogEnabled()) {
514                        logger.logSQL(sql, Arrays.<Serializable> asList(nodeId, id, fragments, Long.valueOf(kind)));
515                    }
516                    Serializable frags;
517                    if (dialect.supportsArrays() && columns.get(2).getJdbcType() == Types.ARRAY) {
518                        frags = fragments.split(" ");
519                    } else {
520                        frags = fragments;
521                    }
522                    columns.get(0).setToPreparedStatement(ps, 1, nodeId);
523                    columns.get(1).setToPreparedStatement(ps, 2, id);
524                    columns.get(2).setToPreparedStatement(ps, 3, frags);
525                    columns.get(3).setToPreparedStatement(ps, 4, Long.valueOf(kind));
526                    ps.execute();
527                    countExecute();
528                }
529                if (kind == Invalidations.MODIFIED) {
530                    kind = Invalidations.DELETED;
531                } else {
532                    break;
533                }
534            }
535        } catch (SQLException e) {
536            throw new NuxeoException("Could not invalidate", e);
537        } finally {
538            try {
539                closeStatement(ps);
540            } catch (SQLException e) {
541                log.error(e.getMessage(), e);
542            }
543        }
544    }
545
546    // join that works on a set
547    protected static final String join(Collection<String> strings, char sep) {
548        if (strings.isEmpty()) {
549            throw new RuntimeException();
550        }
551        if (strings.size() == 1) {
552            return strings.iterator().next();
553        }
554        int size = 0;
555        for (String word : strings) {
556            size += word.length() + 1;
557        }
558        StringBuilder buf = new StringBuilder(size);
559        for (String word : strings) {
560            buf.append(word);
561            buf.append(sep);
562        }
563        buf.setLength(size - 1);
564        return buf.toString();
565    }
566
567    @Override
568    public Invalidations getClusterInvalidations(Serializable nodeId) {
569        Invalidations invalidations = new Invalidations();
570        String sql = dialect.getClusterGetInvalidations();
571        List<Column> columns = sqlInfo.getClusterInvalidationsColumns();
572        try {
573            if (logger.isLogEnabled()) {
574                logger.logSQL(sql, Arrays.asList(nodeId));
575            }
576            PreparedStatement ps = connection.prepareStatement(sql);
577            ResultSet rs = null;
578            try {
579                setToPreparedStatement(ps, 1, nodeId);
580                rs = ps.executeQuery();
581                countExecute();
582                while (rs.next()) {
583                    // first column ignored, it's the node id
584                    Serializable id = columns.get(1).getFromResultSet(rs, 1);
585                    Serializable frags = columns.get(2).getFromResultSet(rs, 2);
586                    int kind = ((Long) columns.get(3).getFromResultSet(rs, 3)).intValue();
587                    String[] fragments;
588                    if (dialect.supportsArrays() && frags instanceof String[]) {
589                        fragments = (String[]) frags;
590                    } else {
591                        fragments = ((String) frags).split(" ");
592                    }
593                    invalidations.add(id, fragments, kind);
594                }
595            } finally {
596                closeStatement(ps, rs);
597            }
598            if (logger.isLogEnabled()) {
599                // logCount(n);
600                logger.log("  -> " + invalidations);
601            }
602            if (dialect.isClusteringDeleteNeeded()) {
603                deleteClusterInvals(nodeId);
604            }
605            return invalidations;
606        } catch (SQLException e) {
607            throw new NuxeoException("Could not invalidate", e);
608        }
609    }
610
611    @Override
612    public Serializable getRootId(String repositoryId) {
613        String sql = sqlInfo.getSelectRootIdSql();
614        try {
615            if (logger.isLogEnabled()) {
616                logger.logSQL(sql, Collections.<Serializable> singletonList(repositoryId));
617            }
618            PreparedStatement ps = connection.prepareStatement(sql);
619            ResultSet rs = null;
620            try {
621                ps.setString(1, repositoryId);
622                rs = ps.executeQuery();
623                countExecute();
624                if (!rs.next()) {
625                    if (logger.isLogEnabled()) {
626                        logger.log("  -> (none)");
627                    }
628                    return null;
629                }
630                Column column = sqlInfo.getSelectRootIdWhatColumn();
631                Serializable id = column.getFromResultSet(rs, 1);
632                if (logger.isLogEnabled()) {
633                    logger.log("  -> " + Model.MAIN_KEY + '=' + id);
634                }
635                // check that we didn't get several rows
636                if (rs.next()) {
637                    throw new NuxeoException("Row query for " + repositoryId + " returned several rows: " + sql);
638                }
639                return id;
640            } finally {
641                closeStatement(ps, rs);
642            }
643        } catch (SQLException e) {
644            throw new NuxeoException("Could not select: " + sql, e);
645        }
646    }
647
648    @Override
649    public void setRootId(Serializable repositoryId, Serializable id) {
650        String sql = sqlInfo.getInsertRootIdSql();
651        try {
652            PreparedStatement ps = connection.prepareStatement(sql);
653            try {
654                List<Column> columns = sqlInfo.getInsertRootIdColumns();
655                List<Serializable> debugValues = null;
656                if (logger.isLogEnabled()) {
657                    debugValues = new ArrayList<Serializable>(2);
658                }
659                int i = 0;
660                for (Column column : columns) {
661                    i++;
662                    String key = column.getKey();
663                    Serializable v;
664                    if (key.equals(Model.MAIN_KEY)) {
665                        v = id;
666                    } else if (key.equals(Model.REPOINFO_REPONAME_KEY)) {
667                        v = repositoryId;
668                    } else {
669                        throw new RuntimeException(key);
670                    }
671                    column.setToPreparedStatement(ps, i, v);
672                    if (debugValues != null) {
673                        debugValues.add(v);
674                    }
675                }
676                if (debugValues != null) {
677                    logger.logSQL(sql, debugValues);
678                    debugValues.clear();
679                }
680                ps.execute();
681                countExecute();
682            } finally {
683                closeStatement(ps);
684            }
685        } catch (SQLException e) {
686            throw new NuxeoException("Could not insert: " + sql, e);
687        }
688    }
689
690    protected QueryMaker findQueryMaker(String queryType) {
691        for (Class<? extends QueryMaker> klass : queryMakerService.getQueryMakers()) {
692            QueryMaker queryMaker;
693            try {
694                queryMaker = klass.newInstance();
695            } catch (ReflectiveOperationException e) {
696                throw new NuxeoException(e);
697            }
698            if (queryMaker.accepts(queryType)) {
699                return queryMaker;
700            }
701        }
702        return null;
703    }
704
705    protected void prepareUserReadAcls(QueryFilter queryFilter) {
706        String sql = dialect.getPrepareUserReadAclsSql();
707        Serializable principals = queryFilter.getPrincipals();
708        if (sql == null || principals == null) {
709            return;
710        }
711        if (!dialect.supportsArrays()) {
712            principals = StringUtils.join((String[]) principals, Dialect.ARRAY_SEP);
713        }
714        PreparedStatement ps = null;
715        try {
716            ps = connection.prepareStatement(sql);
717            if (logger.isLogEnabled()) {
718                logger.logSQL(sql, Collections.singleton(principals));
719            }
720            setToPreparedStatement(ps, 1, principals);
721            ps.execute();
722            countExecute();
723        } catch (SQLException e) {
724            throw new NuxeoException("Failed to prepare user read acl cache", e);
725        } finally {
726            try {
727                closeStatement(ps);
728            } catch (SQLException e) {
729                log.error(e.getMessage(), e);
730            }
731        }
732    }
733
734    @Override
735    public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter,
736            boolean countTotal) {
737        return query(query, queryType, queryFilter, countTotal ? -1 : 0);
738    }
739
    /**
     * Runs a query and returns the first selected column of the matching rows, together with a total size.
     * <p>
     * The limit and offset are read from the query filter. The total size of the returned list is:
     * <ul>
     * <li>{@code 0} when the result set has no first row,</li>
     * <li>{@code -1} when no total was computed ({@code countUpTo == 0} and rows were found),</li>
     * <li>{@code -2} when {@code countUpTo > 0} and the computed total exceeds {@code countUpTo},</li>
     * <li>the row count derived from the result set otherwise.</li>
     * </ul>
     *
     * @param query the query string
     * @param queryType the query type used to look up a {@link QueryMaker}
     * @param queryFilter provides the limit and offset, and is passed on to the query maker
     * @param countUpTo {@code 0} to skip computing the total size, a positive value to count up to that many rows, a
     *            negative value to count without an upper bound
     * @return the ids read and the total size as described above; an empty list with total {@code 0} when the query
     *         maker returns no query
     * @throws NuxeoException if no query maker accepts the query type, or if a SQL error occurs
     */
    @Override
    public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter, long countUpTo) {
        if (dialect.needsPrepareUserReadAcls()) {
            prepareUserReadAcls(queryFilter);
        }
        QueryMaker queryMaker = findQueryMaker(queryType);
        if (queryMaker == null) {
            throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query);
        }
        QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter);

        if (q == null) {
            // the query maker signals with null that the query cannot match anything
            logger.log("Query cannot return anything due to conflicting clauses");
            return new PartialList<Serializable>(Collections.<Serializable> emptyList(), 0);
        }
        long limit = queryFilter.getLimit();
        long offset = queryFilter.getOffset();

        if (logger.isLogEnabled()) {
            // the paging and counting parameters are appended as SQL comments, for the log only
            String sql = q.selectInfo.sql;
            if (limit != 0) {
                sql += " -- LIMIT " + limit + " OFFSET " + offset;
            }
            if (countUpTo != 0) {
                sql += " -- COUNT TOTAL UP TO " + countUpTo;
            }
            logger.logSQL(sql, q.selectParams);
        }

        String sql = q.selectInfo.sql;

        if (countUpTo == 0 && limit > 0 && dialect.supportsPaging()) {
            // full result set not needed for counting
            sql = dialect.addPagingClause(sql, limit, offset);
            // paging is now done by the SQL itself, so no cursor-side limit/offset remains
            limit = 0;
            offset = 0;
        } else if (countUpTo > 0 && dialect.supportsPaging()) {
            // ask one more row than countUpTo, so that a total exceeding countUpTo can be detected below
            sql = dialect.addPagingClause(sql, Math.max(countUpTo + 1, limit + offset), 0);
        }

        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            // a scrollable result set is requested because first(), absolute() and last() are used below
            ps = connection.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
            int i = 1;
            for (Serializable object : q.selectParams) {
                setToPreparedStatement(ps, i++, object);
            }
            rs = ps.executeQuery();
            countExecute();

            // limit/offset
            long totalSize = -1;
            boolean available;
            if ((limit == 0) || (offset == 0)) {
                // start from the first row; note that the offset is not applied when limit == 0
                available = rs.first();
                if (!available) {
                    totalSize = 0;
                }
                if (limit == 0) {
                    limit = -1; // infinite
                }
            } else {
                // move the cursor to the first requested row (offset rows are skipped)
                available = rs.absolute((int) offset + 1);
            }

            Column column = q.selectInfo.whatColumns.get(0);
            List<Serializable> ids = new LinkedList<Serializable>();
            int rowNum = 0;
            // read rows until the result set is exhausted or the limit is used up
            // (a limit of -1 never reaches 0, so it reads everything)
            while (available && (limit != 0)) {
                Serializable id = column.getFromResultSet(rs, 1);
                ids.add(id);
                rowNum = rs.getRow();
                available = rs.next();
                limit--;
            }

            // total size
            if (countUpTo != 0 && (totalSize == -1)) {
                if (!available && (rowNum != 0)) {
                    // last row read was the actual last
                    totalSize = rowNum;
                } else {
                    // available if limit reached with some left
                    // rowNum == 0 if skipped too far
                    rs.last();
                    totalSize = rs.getRow();
                }
                if (countUpTo > 0 && totalSize > countUpTo) {
                    // the results were truncated, we don't know the total size
                    totalSize = -2;
                }
            }

            if (logger.isLogEnabled()) {
                logger.logIds(ids, countUpTo != 0, totalSize);
            }

            return new PartialList<Serializable>(ids, totalSize);
        } catch (SQLException e) {
            throw new NuxeoException("Invalid query: " + query, e);
        } finally {
            try {
                closeStatement(ps, rs);
            } catch (SQLException e) {
                // a close failure is only logged, it does not replace the result or the original exception
                log.error("Cannot close connection", e);
            }
        }
    }
850
851    public int setToPreparedStatement(PreparedStatement ps, int i, Serializable object) throws SQLException {
852        if (object instanceof Calendar) {
853            Calendar cal = (Calendar) object;
854            ps.setTimestamp(i, dialect.getTimestampFromCalendar(cal), cal);
855        } else if (object instanceof java.sql.Date) {
856            ps.setDate(i, (java.sql.Date) object);
857        } else if (object instanceof Long) {
858            ps.setLong(i, ((Long) object).longValue());
859        } else if (object instanceof WrappedId) {
860            dialect.setId(ps, i, object.toString());
861        } else if (object instanceof Object[]) {
862            int jdbcType;
863            if (object instanceof String[]) {
864                jdbcType = dialect.getJDBCTypeAndString(ColumnType.STRING).jdbcType;
865            } else if (object instanceof Boolean[]) {
866                jdbcType = dialect.getJDBCTypeAndString(ColumnType.BOOLEAN).jdbcType;
867            } else if (object instanceof Long[]) {
868                jdbcType = dialect.getJDBCTypeAndString(ColumnType.LONG).jdbcType;
869            } else if (object instanceof Double[]) {
870                jdbcType = dialect.getJDBCTypeAndString(ColumnType.DOUBLE).jdbcType;
871            } else if (object instanceof java.sql.Date[]) {
872                jdbcType = Types.DATE;
873            } else if (object instanceof java.sql.Clob[]) {
874                jdbcType = Types.CLOB;
875            } else if (object instanceof Calendar[]) {
876                jdbcType = dialect.getJDBCTypeAndString(ColumnType.TIMESTAMP).jdbcType;
877                object = dialect.getTimestampFromCalendar((Calendar) object);
878            } else if (object instanceof Integer[]) {
879                jdbcType = dialect.getJDBCTypeAndString(ColumnType.INTEGER).jdbcType;
880            } else {
881                jdbcType = dialect.getJDBCTypeAndString(ColumnType.CLOB).jdbcType;
882            }
883            Array array = dialect.createArrayOf(jdbcType, (Object[]) object, connection);
884            ps.setArray(i, array);
885        } else {
886            ps.setObject(i, object);
887        }
888        return i;
889    }
890
891    // queryFilter used for principals and permissions
892    @Override
893    public IterableQueryResult queryAndFetch(String query, String queryType, QueryFilter queryFilter,
894            Object... params) {
895        if (dialect.needsPrepareUserReadAcls()) {
896            prepareUserReadAcls(queryFilter);
897        }
898        QueryMaker queryMaker = findQueryMaker(queryType);
899        if (queryMaker == null) {
900            throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query);
901        }
902        try {
903            return new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this, params);
904        } catch (SQLException e) {
905            throw new NuxeoException("Invalid query: " + queryType + ": " + query, e);
906        }
907    }
908
909    @Override
910    public Set<Serializable> getAncestorsIds(Collection<Serializable> ids) {
911        SQLInfoSelect select = sqlInfo.getSelectAncestorsIds();
912        if (select == null) {
913            return getAncestorsIdsIterative(ids);
914        }
915        Serializable whereIds = newIdArray(ids);
916        Set<Serializable> res = new HashSet<Serializable>();
917        PreparedStatement ps = null;
918        ResultSet rs = null;
919        try {
920            if (logger.isLogEnabled()) {
921                logger.logSQL(select.sql, Collections.singleton(whereIds));
922            }
923            Column what = select.whatColumns.get(0);
924            ps = connection.prepareStatement(select.sql);
925            setToPreparedStatementIdArray(ps, 1, whereIds);
926            rs = ps.executeQuery();
927            countExecute();
928            List<Serializable> debugIds = null;
929            if (logger.isLogEnabled()) {
930                debugIds = new LinkedList<Serializable>();
931            }
932            while (rs.next()) {
933                if (dialect.supportsArraysReturnInsteadOfRows()) {
934                    Serializable[] resultIds = dialect.getArrayResult(rs.getArray(1));
935                    for (Serializable id : resultIds) {
936                        if (id != null) {
937                            res.add(id);
938                            if (logger.isLogEnabled()) {
939                                debugIds.add(id);
940                            }
941                        }
942                    }
943                } else {
944                    Serializable id = what.getFromResultSet(rs, 1);
945                    if (id != null) {
946                        res.add(id);
947                        if (logger.isLogEnabled()) {
948                            debugIds.add(id);
949                        }
950                    }
951                }
952            }
953            if (logger.isLogEnabled()) {
954                logger.logIds(debugIds, false, 0);
955            }
956            return res;
957        } catch (SQLException e) {
958            throw new NuxeoException("Failed to get ancestors ids", e);
959        } finally {
960            try {
961                closeStatement(ps, rs);
962            } catch (SQLException e) {
963                log.error(e.getMessage(), e);
964            }
965        }
966    }
967
968    /**
969     * Uses iterative parentid selection.
970     */
971    protected Set<Serializable> getAncestorsIdsIterative(Collection<Serializable> ids) {
972        PreparedStatement ps = null;
973        ResultSet rs = null;
974        try {
975            LinkedList<Serializable> todo = new LinkedList<Serializable>(ids);
976            Set<Serializable> done = new HashSet<Serializable>();
977            Set<Serializable> res = new HashSet<Serializable>();
978            while (!todo.isEmpty()) {
979                done.addAll(todo);
980                SQLInfoSelect select = sqlInfo.getSelectParentIds(todo.size());
981                if (logger.isLogEnabled()) {
982                    logger.logSQL(select.sql, todo);
983                }
984                Column what = select.whatColumns.get(0);
985                Column where = select.whereColumns.get(0);
986                ps = connection.prepareStatement(select.sql);
987                int i = 1;
988                for (Serializable id : todo) {
989                    where.setToPreparedStatement(ps, i++, id);
990                }
991                rs = ps.executeQuery();
992                countExecute();
993                todo = new LinkedList<Serializable>();
994                List<Serializable> debugIds = null;
995                if (logger.isLogEnabled()) {
996                    debugIds = new LinkedList<Serializable>();
997                }
998                while (rs.next()) {
999                    Serializable id = what.getFromResultSet(rs, 1);
1000                    if (id != null) {
1001                        res.add(id);
1002                        if (!done.contains(id)) {
1003                            todo.add(id);
1004                        }
1005                        if (logger.isLogEnabled()) {
1006                            debugIds.add(id);
1007                        }
1008                    }
1009                }
1010                if (logger.isLogEnabled()) {
1011                    logger.logIds(debugIds, false, 0);
1012                }
1013                rs.close();
1014                ps.close();
1015            }
1016            return res;
1017        } catch (SQLException e) {
1018            throw new NuxeoException("Failed to get ancestors ids", e);
1019        } finally {
1020            try {
1021                closeStatement(ps, rs);
1022            } catch (SQLException e) {
1023                log.error(e.getMessage(), e);
1024            }
1025        }
1026    }
1027
1028    @Override
1029    public void updateReadAcls() {
1030        if (!dialect.supportsReadAcl()) {
1031            return;
1032        }
1033        if (log.isDebugEnabled()) {
1034            log.debug("updateReadAcls: updating");
1035        }
1036        Statement st = null;
1037        try {
1038            st = connection.createStatement();
1039            String sql = dialect.getUpdateReadAclsSql();
1040            if (logger.isLogEnabled()) {
1041                logger.log(sql);
1042            }
1043            st.execute(sql);
1044            countExecute();
1045        } catch (SQLException e) {
1046            throw new NuxeoException("Failed to update read acls", e);
1047        } finally {
1048            try {
1049                closeStatement(st);
1050            } catch (SQLException e) {
1051                log.error(e.getMessage(), e);
1052            }
1053        }
1054        if (log.isDebugEnabled()) {
1055            log.debug("updateReadAcls: done.");
1056        }
1057    }
1058
1059    @Override
1060    public void rebuildReadAcls() {
1061        if (!dialect.supportsReadAcl()) {
1062            return;
1063        }
1064        log.debug("rebuildReadAcls: rebuilding ...");
1065        Statement st = null;
1066        try {
1067            st = connection.createStatement();
1068            String sql = dialect.getRebuildReadAclsSql();
1069            logger.log(sql);
1070            st.execute(sql);
1071            countExecute();
1072        } catch (SQLException e) {
1073            throw new NuxeoException("Failed to rebuild read acls", e);
1074        } finally {
1075            try {
1076                closeStatement(st);
1077            } catch (SQLException e) {
1078                log.error(e.getMessage(), e);
1079            }
1080        }
1081        log.debug("rebuildReadAcls: done.");
1082    }
1083
1084    /*
1085     * ----- Locking -----
1086     */
1087
1088    protected Connection connection(boolean autocommit) {
1089        try {
1090            connection.setAutoCommit(autocommit);
1091        } catch (SQLException e) {
1092            throw new NuxeoException("Cannot set auto commit mode onto " + this + "'s connection", e);
1093        }
1094        return connection;
1095    }
1096
1097    /**
1098     * Calls the callable, inside a transaction if in cluster mode.
1099     * <p>
1100     * Called under {@link #serializationLock}.
1101     */
1102    protected Lock callInTransaction(LockCallable callable, boolean tx) {
1103        boolean ok = false;
1104        try {
1105            if (log.isDebugEnabled()) {
1106                log.debug("callInTransaction setAutoCommit " + !tx);
1107            }
1108            connection.setAutoCommit(!tx);
1109        } catch (SQLException e) {
1110            throw new NuxeoException("Cannot set auto commit mode onto " + this + "'s connection", e);
1111        }
1112        try {
1113            Lock result = callable.call();
1114            ok = true;
1115            return result;
1116        } finally {
1117            if (tx) {
1118                try {
1119                    try {
1120                        if (ok) {
1121                            if (log.isDebugEnabled()) {
1122                                log.debug("callInTransaction commit");
1123                            }
1124                            connection.commit();
1125                        } else {
1126                            if (log.isDebugEnabled()) {
1127                                log.debug("callInTransaction rollback");
1128                            }
1129                            connection.rollback();
1130                        }
1131                    } finally {
1132                        // restore autoCommit=true
1133                        if (log.isDebugEnabled()) {
1134                            log.debug("callInTransaction restoring autoCommit=true");
1135                        }
1136                        connection.setAutoCommit(true);
1137                    }
1138                } catch (SQLException e) {
1139                    throw new NuxeoException(e);
1140                }
1141            }
1142        }
1143    }
1144
1145    public interface LockCallable extends Callable<Lock> {
1146        @Override
1147        public Lock call();
1148    }
1149
1150    @Override
1151    public Lock getLock(Serializable id) {
1152        if (log.isDebugEnabled()) {
1153            try {
1154                log.debug("getLock " + id + " while autoCommit=" + connection.getAutoCommit());
1155            } catch (SQLException e) {
1156                throw new RuntimeException(e);
1157            }
1158        }
1159        RowId rowId = new RowId(Model.LOCK_TABLE_NAME, id);
1160        Row row = readSimpleRow(rowId);
1161        return row == null ? null : new Lock((String) row.get(Model.LOCK_OWNER_KEY),
1162                (Calendar) row.get(Model.LOCK_CREATED_KEY));
1163    }
1164
1165    @Override
1166    public Lock setLock(final Serializable id, final Lock lock) {
1167        if (log.isDebugEnabled()) {
1168            log.debug("setLock " + id + " owner=" + lock.getOwner());
1169        }
1170        SetLock call = new SetLock(id, lock);
1171        return callInTransaction(call, clusteringEnabled);
1172    }
1173
1174    protected class SetLock implements LockCallable {
1175        protected final Serializable id;
1176
1177        protected final Lock lock;
1178
1179        protected SetLock(Serializable id, Lock lock) {
1180            super();
1181            this.id = id;
1182            this.lock = lock;
1183        }
1184
1185        @Override
1186        public Lock call() {
1187            Lock oldLock = getLock(id);
1188            if (oldLock == null) {
1189                Row row = new Row(Model.LOCK_TABLE_NAME, id);
1190                row.put(Model.LOCK_OWNER_KEY, lock.getOwner());
1191                row.put(Model.LOCK_CREATED_KEY, lock.getCreated());
1192                insertSimpleRows(Model.LOCK_TABLE_NAME, Collections.singletonList(row));
1193            }
1194            return oldLock;
1195        }
1196    }
1197
1198    @Override
1199    public Lock removeLock(final Serializable id, final String owner, final boolean force) {
1200        if (log.isDebugEnabled()) {
1201            log.debug("removeLock " + id + " owner=" + owner + " force=" + force);
1202        }
1203        RemoveLock call = new RemoveLock(id, owner, force);
1204        return callInTransaction(call, !force);
1205    }
1206
1207    protected class RemoveLock implements LockCallable {
1208        protected final Serializable id;
1209
1210        protected final String owner;
1211
1212        protected final boolean force;
1213
1214        protected RemoveLock(Serializable id, String owner, boolean force) {
1215            super();
1216            this.id = id;
1217            this.owner = owner;
1218            this.force = force;
1219        }
1220
1221        @Override
1222        public Lock call() {
1223            Lock oldLock = force ? null : getLock(id);
1224            if (!force && owner != null) {
1225                if (oldLock == null) {
1226                    // not locked, nothing to do
1227                    return null;
1228                }
1229                if (!LockManager.canLockBeRemoved(oldLock.getOwner(), owner)) {
1230                    // existing mismatched lock, flag failure
1231                    return new Lock(oldLock, true);
1232                }
1233            }
1234            if (force || oldLock != null) {
1235                deleteRows(Model.LOCK_TABLE_NAME, Collections.singleton(id));
1236            }
1237            return oldLock;
1238        }
1239    }
1240
1241    @Override
1242    public void markReferencedBinaries() {
1243        log.debug("Starting binaries GC mark");
1244        Statement st = null;
1245        ResultSet rs = null;
1246        BlobManager blobManager = Framework.getService(BlobManager.class);
1247        String repositoryName = getRepositoryName();
1248        try {
1249            st = connection.createStatement();
1250            int i = -1;
1251            for (String sql : sqlInfo.getBinariesSql) {
1252                i++;
1253                Column col = sqlInfo.getBinariesColumns.get(i);
1254                if (logger.isLogEnabled()) {
1255                    logger.log(sql);
1256                }
1257                rs = st.executeQuery(sql);
1258                countExecute();
1259                int n = 0;
1260                while (rs.next()) {
1261                    n++;
1262                    String key = (String) col.getFromResultSet(rs, 1);
1263                    if (key != null) {
1264                        blobManager.markReferencedBinary(key, repositoryName);
1265                    }
1266                }
1267                if (logger.isLogEnabled()) {
1268                    logger.logCount(n);
1269                }
1270                rs.close();
1271            }
1272        } catch (SQLException e) {
1273            throw new RuntimeException("Failed to mark binaries for gC", e);
1274        } finally {
1275            try {
1276                closeStatement(st, rs);
1277            } catch (SQLException e) {
1278                log.error(e.getMessage(), e);
1279            }
1280        }
1281        log.debug("End of binaries GC mark");
1282    }
1283
1284    /*
1285     * ----- XAResource -----
1286     */
1287
1288    protected static String systemToString(Object o) {
1289        return o.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(o));
1290    }
1291
1292    @Override
1293    public void start(Xid xid, int flags) throws XAException {
1294        try {
1295            xaresource.start(xid, flags);
1296            if (logger.isLogEnabled()) {
1297                logger.log("XA start on " + systemToString(xid));
1298            }
1299        } catch (NuxeoException e) {
1300            throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e);
1301        } catch (XAException e) {
1302            logger.error("XA start error on " + systemToString(xid), e);
1303            throw e;
1304        }
1305    }
1306
1307    @Override
1308    public void end(Xid xid, int flags) throws XAException {
1309        try {
1310            xaresource.end(xid, flags);
1311            if (logger.isLogEnabled()) {
1312                logger.log("XA end on " + systemToString(xid));
1313            }
1314        } catch (NullPointerException e) {
1315            // H2 when no active transaction
1316            logger.error("XA end error on " + systemToString(xid), e);
1317            throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e);
1318        } catch (XAException e) {
1319            if (flags != XAResource.TMFAIL) {
1320                logger.error("XA end error on " + systemToString(xid), e);
1321            }
1322            throw e;
1323        }
1324    }
1325
1326    @Override
1327    public int prepare(Xid xid) throws XAException {
1328        try {
1329            return xaresource.prepare(xid);
1330        } catch (XAException e) {
1331            logger.error("XA prepare error on  " + systemToString(xid), e);
1332            throw e;
1333        }
1334    }
1335
    /**
     * Delegates the XA commit to the underlying XA resource.
     *
     * @param xid the transaction id, passed through unchanged
     * @param onePhase passed through unchanged to the underlying resource
     * @throws XAException the exception of the underlying resource, after logging it
     */
    @Override
    public void commit(Xid xid, boolean onePhase) throws XAException {
        try {
            xaresource.commit(xid, onePhase);
        } catch (XAException e) {
            // log for diagnosis, then let the caller see the original exception
            logger.error("XA commit error on  " + systemToString(xid), e);
            throw e;
        }
    }
1345
1346    // rollback interacts with caches so is in RowMapper
1347
    /**
     * Delegates to the underlying XA resource, without any logging.
     *
     * @throws XAException if the underlying resource throws it
     */
    @Override
    public void forget(Xid xid) throws XAException {
        xaresource.forget(xid);
    }
1352
1353    @Override
1354    public Xid[] recover(int flag) throws XAException {
1355        return xaresource.recover(flag);
1356    }
1357
1358    @Override
1359    public boolean setTransactionTimeout(int seconds) throws XAException {
1360        return xaresource.setTransactionTimeout(seconds);
1361    }
1362
1363    @Override
1364    public int getTransactionTimeout() throws XAException {
1365        return xaresource.getTransactionTimeout();
1366    }
1367
    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public boolean isSameRM(XAResource xares) throws XAException {
        throw new UnsupportedOperationException();
    }
1372
1373    @Override
1374    public boolean isConnected() {
1375        return connection != null;
1376    }
1377
    /**
     * Connects by delegating to {@code openConnections()}.
     */
    @Override
    public void connect() {
        openConnections();
    }
1382
    /**
     * Disconnects by delegating to {@code closeConnections()}.
     */
    @Override
    public void disconnect() {
        closeConnections();
    }
1387
1388}