/*
 * Copyright (c) 2006-2011 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     Florent Guillaume
 *     Benoit Delbosc
 */
package org.nuxeo.ecm.core.storage.sql.jdbc;

import java.io.Serializable;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.sql.Array;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;

import javax.sql.XADataSource;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.common.utils.StringUtils;
import org.nuxeo.ecm.core.api.IterableQueryResult;
import org.nuxeo.ecm.core.api.Lock;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.PartialList;
import org.nuxeo.ecm.core.blob.BlobManager;
import org.nuxeo.ecm.core.model.LockManager;
import org.nuxeo.ecm.core.query.QueryFilter;
import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator;
import org.nuxeo.ecm.core.storage.sql.ColumnType;
import org.nuxeo.ecm.core.storage.sql.ColumnType.WrappedId;
import org.nuxeo.ecm.core.storage.sql.Invalidations;
import org.nuxeo.ecm.core.storage.sql.Mapper;
import org.nuxeo.ecm.core.storage.sql.Model;
import org.nuxeo.ecm.core.storage.sql.RepositoryImpl;
import org.nuxeo.ecm.core.storage.sql.Row;
import org.nuxeo.ecm.core.storage.sql.RowId;
import org.nuxeo.ecm.core.storage.sql.Session.PathResolver;
import org.nuxeo.ecm.core.storage.sql.jdbc.SQLInfo.SQLInfoSelect;
import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
import org.nuxeo.ecm.core.storage.sql.jdbc.db.Database;
import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table;
import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectOracle;
import org.nuxeo.runtime.api.Framework;

/**
 * A {@link JDBCMapper} maps objects to and from a JDBC database. It is specific to a given database connection, as it
 * computes statements.
 * <p>
 * The {@link JDBCMapper} does the mapping according to the policy defined by a {@link Model}, and generates SQL
 * statements recorded in the {@link SQLInfo}.
 */
public class JDBCMapper extends JDBCRowMapper implements Mapper {
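    /*
     * Illustrative only (not part of the original source): a minimal sketch of how this mapper's public API is
     * typically driven once the repository has constructed it -- create the schema, resolve the root id, then run
     * a query. The "NXQL" query type and the queryFilter value are assumptions of this example.
     *
     *   mapper.createDatabase();                                // create missing tables and columns
     *   Serializable rootId = mapper.getRootId(repositoryName); // null on a freshly created database
     *   PartialList<Serializable> ids = mapper.query(nxql, "NXQL", queryFilter, false);
     */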

    private static final Log log = LogFactory.getLog(JDBCMapper.class);

    public static Map<String, Serializable> testProps = new HashMap<String, Serializable>();

    public static final String TEST_UPGRADE = "testUpgrade";

    // property in sql.txt file
    public static final String TEST_UPGRADE_VERSIONS = "testUpgradeVersions";

    public static final String TEST_UPGRADE_LAST_CONTRIBUTOR = "testUpgradeLastContributor";

    public static final String TEST_UPGRADE_LOCKS = "testUpgradeLocks";

    public static final String TEST_UPGRADE_FULLTEXT = "testUpgradeFulltext";

    protected TableUpgrader tableUpgrader;

    private final QueryMakerService queryMakerService;

    private final PathResolver pathResolver;

    private final RepositoryImpl repository;

    protected boolean clusteringEnabled;

    /**
     * Creates a new Mapper.
     *
     * @param model the model
     * @param pathResolver the path resolver (used for startswith queries)
     * @param sqlInfo the SQL info
     * @param xadatasource the XA datasource to use to get connections
     * @param clusterInvalidator the cluster invalidator
     * @param noSharing whether to use no-sharing mode for the connection
     * @param repository the repository
     */
    public JDBCMapper(Model model, PathResolver pathResolver, SQLInfo sqlInfo, XADataSource xadatasource,
            ClusterInvalidator clusterInvalidator, boolean noSharing, RepositoryImpl repository) {
        super(model, sqlInfo, xadatasource, clusterInvalidator, repository.getInvalidationsPropagator(), noSharing);
        this.pathResolver = pathResolver;
        this.repository = repository;
        clusteringEnabled = clusterInvalidator != null;
        queryMakerService = Framework.getService(QueryMakerService.class);

        tableUpgrader = new TableUpgrader(this);
        tableUpgrader.add(Model.VERSION_TABLE_NAME, Model.VERSION_IS_LATEST_KEY, "upgradeVersions",
                TEST_UPGRADE_VERSIONS);
        tableUpgrader.add("dublincore", "lastContributor", "upgradeLastContributor", TEST_UPGRADE_LAST_CONTRIBUTOR);
        tableUpgrader.add(Model.LOCK_TABLE_NAME, Model.LOCK_OWNER_KEY, "upgradeLocks", TEST_UPGRADE_LOCKS);

    }

    @Override
    public int getTableSize(String tableName) {
        return sqlInfo.getDatabase().getTable(tableName).getColumns().size();
    }

    /*
     * ----- Root -----
     */

    @Override
    public void createDatabase() {
        try {
            createTables();
        } catch (SQLException e) {
            throw new NuxeoException(e);
        }
    }

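    /**
     * Gets the physical table name to use for a logical table name. On Oracle, names longer than 30 characters
     * (the maximum identifier length) are shortened to their first 15 characters, an underscore, and the first
     * 12 hex characters of the name's MD5 digest.
     */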
    protected String getTableName(String origName) {

        if (dialect instanceof DialectOracle) {
            if (origName.length() > 30) {

                StringBuilder sb = new StringBuilder(origName.length());

                try {
                    MessageDigest digest = MessageDigest.getInstance("MD5");
                    sb.append(origName.substring(0, 15));
                    sb.append('_');

                    digest.update(origName.getBytes());
                    sb.append(Dialect.toHexString(digest.digest()).substring(0, 12));

                    return sb.toString();

                } catch (NoSuchAlgorithmException e) {
                    throw new RuntimeException("MD5 digest algorithm not available", e);
                }
            }
        }

        return origName;
    }

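    /**
     * Creates the tables and columns described by the model if they are missing, checks existing columns against
     * the expected JDBC types, and runs the dialect's before/after creation statements and the table upgrades.
     */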
    protected void createTables() throws SQLException {
        sqlInfo.executeSQLStatements(null, this); // for missing category
        sqlInfo.executeSQLStatements("first", this);
        sqlInfo.executeSQLStatements("beforeTableCreation", this);
        if (testProps.containsKey(TEST_UPGRADE)) {
            // create "old" tables
            sqlInfo.executeSQLStatements("testUpgrade", this);
        }

        String schemaName = dialect.getConnectionSchema(connection);
        DatabaseMetaData metadata = connection.getMetaData();
        Set<String> tableNames = findTableNames(metadata, schemaName);
        Database database = sqlInfo.getDatabase();
        Map<String, List<Column>> added = new HashMap<String, List<Column>>();

        Statement st = null;
        ResultSet rs = null;
        try {
            st = connection.createStatement();
            for (Table table : database.getTables()) {
                String tableName = getTableName(table.getPhysicalName());
                if (tableNames.contains(tableName.toUpperCase())) {
                    dialect.existingTableDetected(connection, table, model, sqlInfo.database);
                } else {

                    /*
                     * Create missing table.
                     */

                    boolean create = dialect.preCreateTable(connection, table, model, sqlInfo.database);
                    if (!create) {
                        log.warn("Creation skipped for table: " + tableName);
                        continue;
                    }

                    String sql = table.getCreateSql();
                    logger.log(sql);
                    try {
                        st.execute(sql);
                        countExecute();
                    } catch (SQLException e) {
                        try {
                            closeStatement(st);
                        } finally {
                            throw new SQLException("Error creating table: " + sql + " : " + e.getMessage(), e);
                        }
                    }

                    for (String s : table.getPostCreateSqls(model)) {
                        logger.log(s);
                        try {
                            st.execute(s);
                            countExecute();
                        } catch (SQLException e) {
                            throw new SQLException("Error post creating table: " + s + " : " + e.getMessage(), e);
                        }
                    }
                    for (String s : dialect.getPostCreateTableSqls(table, model, sqlInfo.database)) {
                        logger.log(s);
                        try {
                            st.execute(s);
                            countExecute();
                        } catch (SQLException e) {
                            throw new SQLException("Error post creating table: " + s + " : " + e.getMessage(), e);
                        }
                    }
                    added.put(table.getKey(), null); // null = table created
                }

                /*
                 * Get existing columns.
                 */

                rs = metadata.getColumns(null, schemaName, tableName, "%");
                Map<String, Integer> columnTypes = new HashMap<String, Integer>();
                Map<String, String> columnTypeNames = new HashMap<String, String>();
                Map<String, Integer> columnTypeSizes = new HashMap<String, Integer>();
                while (rs.next()) {
                    String schema = rs.getString("TABLE_SCHEM");
                    if (schema != null) { // null for MySQL, doh!
                        if ("INFORMATION_SCHEMA".equals(schema.toUpperCase())) {
                            // H2 returns some system tables (locks)
                            continue;
                        }
                    }
                    String columnName = rs.getString("COLUMN_NAME").toUpperCase();
                    columnTypes.put(columnName, Integer.valueOf(rs.getInt("DATA_TYPE")));
                    columnTypeNames.put(columnName, rs.getString("TYPE_NAME"));
                    columnTypeSizes.put(columnName, Integer.valueOf(rs.getInt("COLUMN_SIZE")));
                }
                rs.close();
                /*
                 * Update types and create missing columns.
                 */

                List<Column> addedColumns = new LinkedList<Column>();
                for (Column column : table.getColumns()) {
                    String upperName = column.getPhysicalName().toUpperCase();
                    Integer type = columnTypes.remove(upperName);
                    if (type == null) {
                        log.warn("Adding missing column in database: " + column.getFullQuotedName());
                        String sql = table.getAddColumnSql(column);
                        logger.log(sql);
                        try {
                            st.execute(sql);
                            countExecute();
                        } catch (SQLException e) {
                            throw new SQLException("Error adding column: " + sql + " : " + e.getMessage(), e);
                        }
                        for (String s : table.getPostAddSqls(column, model)) {
                            logger.log(s);
                            try {
                                st.execute(s);
                                countExecute();
                            } catch (SQLException e) {
                                throw new SQLException("Error post adding column: " + s + " : " + e.getMessage(), e);
                            }
                        }
                        addedColumns.add(column);
                    } else {
                        int expected = column.getJdbcType();
                        int actual = type.intValue();
                        String actualName = columnTypeNames.get(upperName);
                        Integer actualSize = columnTypeSizes.get(upperName);
                        if (!column.setJdbcType(actual, actualName, actualSize.intValue())) {
                            log.error(String.format("SQL type mismatch for %s: expected %s, database has %s / %s (%s)",
                                    column.getFullQuotedName(), Integer.valueOf(expected), type, actualName, actualSize));
                        }
                    }
                }
                for (String col : dialect.getIgnoredColumns(table)) {
                    columnTypes.remove(col.toUpperCase());
                }
                if (!columnTypes.isEmpty()) {
                    log.warn("Database contains additional unused columns for table " + table.getQuotedName() + ": "
                            + StringUtils.join(new ArrayList<String>(columnTypes.keySet()), ", "));
                }
                if (!addedColumns.isEmpty()) {
                    if (added.containsKey(table.getKey())) {
                        throw new AssertionError();
                    }
                    added.put(table.getKey(), addedColumns);
                }
            }
        } finally {
            try {
                closeStatement(st, rs);
            } catch (SQLException e) {
                log.error(e.getMessage(), e);
            }
        }

        if (testProps.containsKey(TEST_UPGRADE)) {
            // create "old" content in tables
            sqlInfo.executeSQLStatements("testUpgradeOldTables", this);
        }

        // run upgrade for each table if added columns or test
        for (Entry<String, List<Column>> en : added.entrySet()) {
            List<Column> addedColumns = en.getValue();
            String tableKey = en.getKey();
            upgradeTable(tableKey, addedColumns);
        }
        sqlInfo.executeSQLStatements("afterTableCreation", this);
        sqlInfo.executeSQLStatements("last", this);
        dialect.performAdditionalStatements(connection);
    }

    protected void upgradeTable(String tableKey, List<Column> addedColumns) throws SQLException {
        tableUpgrader.upgrade(tableKey, addedColumns);
    }

    /** Finds uppercase table names. */
    protected static Set<String> findTableNames(DatabaseMetaData metadata, String schemaName) throws SQLException {
        Set<String> tableNames = new HashSet<String>();
        ResultSet rs = metadata.getTables(null, schemaName, "%", new String[] { "TABLE" });
        while (rs.next()) {
            String tableName = rs.getString("TABLE_NAME");
            tableNames.add(tableName.toUpperCase());
        }
        rs.close();
        return tableNames;
    }

    @Override
    public int getClusterNodeIdType() {
        return sqlInfo.getClusterNodeIdType();
    }

    @Override
    public void createClusterNode(Serializable nodeId) {
        Calendar now = Calendar.getInstance();
        try {
            String sql = sqlInfo.getCreateClusterNodeSql();
            List<Column> columns = sqlInfo.getCreateClusterNodeColumns();
            PreparedStatement ps = connection.prepareStatement(sql);
            try {
                if (logger.isLogEnabled()) {
                    logger.logSQL(sql, Arrays.asList(nodeId, now));
                }
                columns.get(0).setToPreparedStatement(ps, 1, nodeId);
                columns.get(1).setToPreparedStatement(ps, 2, now);
                ps.execute();
            } finally {
                closeStatement(ps);
            }
        } catch (SQLException e) {
            throw new NuxeoException(e);
        }
    }

    @Override
    public void removeClusterNode(Serializable nodeId) {
        try {
            // delete from cluster_nodes
            String sql = sqlInfo.getDeleteClusterNodeSql();
            Column column = sqlInfo.getDeleteClusterNodeColumn();
            PreparedStatement ps = connection.prepareStatement(sql);
            try {
                if (logger.isLogEnabled()) {
                    logger.logSQL(sql, Arrays.asList(nodeId));
                }
                column.setToPreparedStatement(ps, 1, nodeId);
                ps.execute();
            } finally {
                closeStatement(ps);
            }
            // delete un-processed invals from cluster_invals
            deleteClusterInvals(nodeId);
        } catch (SQLException e) {
            throw new NuxeoException(e);
        }
    }

    protected void deleteClusterInvals(Serializable nodeId) throws SQLException {
        String sql = sqlInfo.getDeleteClusterInvalsSql();
        Column column = sqlInfo.getDeleteClusterInvalsColumn();
        PreparedStatement ps = connection.prepareStatement(sql);
        try {
            if (logger.isLogEnabled()) {
                logger.logSQL(sql, Arrays.asList(nodeId));
            }
            column.setToPreparedStatement(ps, 1, nodeId);
            int n = ps.executeUpdate();
            countExecute();
            if (logger.isLogEnabled()) {
                logger.logCount(n);
            }
        } finally {
            try {
                closeStatement(ps);
            } catch (SQLException e) {
                log.error("deleteClusterInvals: " + e.getMessage(), e);
            }
        }
    }

    @Override
    public void insertClusterInvalidations(Serializable nodeId, Invalidations invalidations) {
        String sql = dialect.getClusterInsertInvalidations();
        List<Column> columns = sqlInfo.getClusterInvalidationsColumns();
        PreparedStatement ps = null;
        try {
            ps = connection.prepareStatement(sql);
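            // two passes over the invalidations: first the MODIFIED row ids, then the DELETED ones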
            int kind = Invalidations.MODIFIED;
            while (true) {
                Set<RowId> rowIds = invalidations.getKindSet(kind);

                // reorganize by id
                Map<Serializable, Set<String>> res = new HashMap<Serializable, Set<String>>();
                for (RowId rowId : rowIds) {
                    Set<String> tableNames = res.get(rowId.id);
                    if (tableNames == null) {
                        res.put(rowId.id, tableNames = new HashSet<String>());
                    }
                    tableNames.add(rowId.tableName);
                }

                // do inserts
                for (Entry<Serializable, Set<String>> en : res.entrySet()) {
                    Serializable id = en.getKey();
                    String fragments = join(en.getValue(), ' ');
                    if (logger.isLogEnabled()) {
                        logger.logSQL(sql, Arrays.<Serializable> asList(nodeId, id, fragments, Long.valueOf(kind)));
                    }
                    Serializable frags;
                    if (dialect.supportsArrays() && columns.get(2).getJdbcType() == Types.ARRAY) {
                        frags = fragments.split(" ");
                    } else {
                        frags = fragments;
                    }
                    columns.get(0).setToPreparedStatement(ps, 1, nodeId);
                    columns.get(1).setToPreparedStatement(ps, 2, id);
                    columns.get(2).setToPreparedStatement(ps, 3, frags);
                    columns.get(3).setToPreparedStatement(ps, 4, Long.valueOf(kind));
                    ps.execute();
                    countExecute();
                }
                if (kind == Invalidations.MODIFIED) {
                    kind = Invalidations.DELETED;
                } else {
                    break;
                }
            }
        } catch (SQLException e) {
            throw new NuxeoException("Could not invalidate", e);
        } finally {
            try {
                closeStatement(ps);
            } catch (SQLException e) {
                log.error(e.getMessage(), e);
            }
        }
    }

    // join that works on a set
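    // e.g. join(["dublincore", "common"], ' ') gives "dublincore common"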
    protected static final String join(Collection<String> strings, char sep) {
        if (strings.isEmpty()) {
            throw new RuntimeException();
        }
        if (strings.size() == 1) {
            return strings.iterator().next();
        }
        int size = 0;
        for (String word : strings) {
            size += word.length() + 1;
        }
        StringBuilder buf = new StringBuilder(size);
        for (String word : strings) {
            buf.append(word);
            buf.append(sep);
        }
        buf.setLength(size - 1);
        return buf.toString();
    }

    @Override
    public Invalidations getClusterInvalidations(Serializable nodeId) {
        Invalidations invalidations = new Invalidations();
        String sql = dialect.getClusterGetInvalidations();
        List<Column> columns = sqlInfo.getClusterInvalidationsColumns();
        try {
            if (logger.isLogEnabled()) {
                logger.logSQL(sql, Arrays.asList(nodeId));
            }
            PreparedStatement ps = connection.prepareStatement(sql);
            ResultSet rs = null;
            try {
                setToPreparedStatement(ps, 1, nodeId);
                rs = ps.executeQuery();
                countExecute();
                while (rs.next()) {
                    // first column ignored, it's the node id
                    Serializable id = columns.get(1).getFromResultSet(rs, 1);
                    Serializable frags = columns.get(2).getFromResultSet(rs, 2);
                    int kind = ((Long) columns.get(3).getFromResultSet(rs, 3)).intValue();
                    String[] fragments;
                    if (dialect.supportsArrays() && frags instanceof String[]) {
                        fragments = (String[]) frags;
                    } else {
                        fragments = ((String) frags).split(" ");
                    }
                    invalidations.add(id, fragments, kind);
                }
            } finally {
                closeStatement(ps, rs);
            }
            if (logger.isLogEnabled()) {
                // logCount(n);
                logger.log("  -> " + invalidations);
            }
            if (dialect.isClusteringDeleteNeeded()) {
                deleteClusterInvals(nodeId);
            }
            return invalidations;
        } catch (SQLException e) {
            throw new NuxeoException("Could not get invalidations", e);
        }
    }

    @Override
    public Serializable getRootId(String repositoryId) {
        String sql = sqlInfo.getSelectRootIdSql();
        try {
            if (logger.isLogEnabled()) {
                logger.logSQL(sql, Collections.<Serializable> singletonList(repositoryId));
            }
            PreparedStatement ps = connection.prepareStatement(sql);
            ResultSet rs = null;
            try {
                ps.setString(1, repositoryId);
                rs = ps.executeQuery();
                countExecute();
                if (!rs.next()) {
                    if (logger.isLogEnabled()) {
                        logger.log("  -> (none)");
                    }
                    return null;
                }
                Column column = sqlInfo.getSelectRootIdWhatColumn();
                Serializable id = column.getFromResultSet(rs, 1);
                if (logger.isLogEnabled()) {
                    logger.log("  -> " + Model.MAIN_KEY + '=' + id);
                }
                // check that we didn't get several rows
                if (rs.next()) {
                    throw new NuxeoException("Row query for " + repositoryId + " returned several rows: " + sql);
                }
                return id;
            } finally {
                closeStatement(ps, rs);
            }
        } catch (SQLException e) {
            throw new NuxeoException("Could not select: " + sql, e);
        }
    }

    @Override
    public void setRootId(Serializable repositoryId, Serializable id) {
        String sql = sqlInfo.getInsertRootIdSql();
        try {
            PreparedStatement ps = connection.prepareStatement(sql);
            try {
                List<Column> columns = sqlInfo.getInsertRootIdColumns();
                List<Serializable> debugValues = null;
                if (logger.isLogEnabled()) {
                    debugValues = new ArrayList<Serializable>(2);
                }
                int i = 0;
                for (Column column : columns) {
                    i++;
                    String key = column.getKey();
                    Serializable v;
                    if (key.equals(Model.MAIN_KEY)) {
                        v = id;
                    } else if (key.equals(Model.REPOINFO_REPONAME_KEY)) {
                        v = repositoryId;
                    } else {
                        throw new RuntimeException(key);
                    }
                    column.setToPreparedStatement(ps, i, v);
                    if (debugValues != null) {
                        debugValues.add(v);
                    }
                }
                if (debugValues != null) {
                    logger.logSQL(sql, debugValues);
                    debugValues.clear();
                }
                ps.execute();
                countExecute();
            } finally {
                closeStatement(ps);
            }
        } catch (SQLException e) {
            throw new NuxeoException("Could not insert: " + sql, e);
        }
    }

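    /**
     * Finds the first registered {@link QueryMaker} that accepts the given query type (such as NXQL), or
     * {@code null} if none does.
     */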
    protected QueryMaker findQueryMaker(String queryType) {
        for (Class<? extends QueryMaker> klass : queryMakerService.getQueryMakers()) {
            QueryMaker queryMaker;
            try {
                queryMaker = klass.newInstance();
            } catch (ReflectiveOperationException e) {
                throw new NuxeoException(e);
            }
            if (queryMaker.accepts(queryType)) {
                return queryMaker;
            }
        }
        return null;
    }

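    /**
     * Prepares the user read ACL cache for the principals of the query filter, on dialects that provide a
     * dedicated statement for it; a no-op otherwise.
     */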
    protected void prepareUserReadAcls(QueryFilter queryFilter) {
        String sql = dialect.getPrepareUserReadAclsSql();
        Serializable principals = queryFilter.getPrincipals();
        if (sql == null || principals == null) {
            return;
        }
        if (!dialect.supportsArrays()) {
            principals = StringUtils.join((String[]) principals, Dialect.ARRAY_SEP);
        }
        PreparedStatement ps = null;
        try {
            ps = connection.prepareStatement(sql);
            if (logger.isLogEnabled()) {
                logger.logSQL(sql, Collections.singleton(principals));
            }
            setToPreparedStatement(ps, 1, principals);
            ps.execute();
            countExecute();
        } catch (SQLException e) {
            throw new NuxeoException("Failed to prepare user read acl cache", e);
        } finally {
            try {
                closeStatement(ps);
            } catch (SQLException e) {
                log.error(e.getMessage(), e);
            }
        }
    }

    @Override
    public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter,
            boolean countTotal) {
        return query(query, queryType, queryFilter, countTotal ? -1 : 0);
    }

    @Override
    public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter, long countUpTo) {
        if (dialect.needsPrepareUserReadAcls()) {
            prepareUserReadAcls(queryFilter);
        }
        QueryMaker queryMaker = findQueryMaker(queryType);
        if (queryMaker == null) {
            throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query);
        }
        QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter);

        if (q == null) {
            logger.log("Query cannot return anything due to conflicting clauses");
            return new PartialList<Serializable>(Collections.<Serializable> emptyList(), 0);
        }
        long limit = queryFilter.getLimit();
        long offset = queryFilter.getOffset();

        if (logger.isLogEnabled()) {
            String sql = q.selectInfo.sql;
            if (limit != 0) {
                sql += " -- LIMIT " + limit + " OFFSET " + offset;
            }
            if (countUpTo != 0) {
                sql += " -- COUNT TOTAL UP TO " + countUpTo;
            }
            logger.logSQL(sql, q.selectParams);
        }

        String sql = q.selectInfo.sql;

        if (countUpTo == 0 && limit > 0 && dialect.supportsPaging()) {
            // full result set not needed for counting
            sql = dialect.addPagingClause(sql, limit, offset);
            limit = 0;
            offset = 0;
        } else if (countUpTo > 0 && dialect.supportsPaging()) {
            // ask one more row
            sql = dialect.addPagingClause(sql, Math.max(countUpTo + 1, limit + offset), 0);
        }

        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            ps = connection.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
            int i = 1;
            for (Serializable object : q.selectParams) {
                setToPreparedStatement(ps, i++, object);
            }
            rs = ps.executeQuery();
            countExecute();

            // limit/offset
            long totalSize = -1;
            boolean available;
            if ((limit == 0) || (offset == 0)) {
                available = rs.first();
                if (!available) {
                    totalSize = 0;
                }
                if (limit == 0) {
                    limit = -1; // infinite
                }
            } else {
                available = rs.absolute((int) offset + 1);
            }

            Column column = q.selectInfo.whatColumns.get(0);
            List<Serializable> ids = new LinkedList<Serializable>();
            int rowNum = 0;
            while (available && (limit != 0)) {
                Serializable id = column.getFromResultSet(rs, 1);
                ids.add(id);
                rowNum = rs.getRow();
                available = rs.next();
                limit--;
            }

            // total size
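            // totalSize: -1 = not computed, >= 0 = exact count, -2 = truncated (more than countUpTo rows)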
            if (countUpTo != 0 && (totalSize == -1)) {
                if (!available && (rowNum != 0)) {
                    // last row read was the actual last
                    totalSize = rowNum;
                } else {
                    // available if limit reached with some left
                    // rowNum == 0 if skipped too far
                    rs.last();
                    totalSize = rs.getRow();
                }
                if (countUpTo > 0 && totalSize > countUpTo) {
                    // the results were truncated, we don't know the total size
                    totalSize = -2;
                }
            }

            if (logger.isLogEnabled()) {
                logger.logIds(ids, countUpTo != 0, totalSize);
            }

            return new PartialList<Serializable>(ids, totalSize);
        } catch (SQLException e) {
            throw new NuxeoException("Invalid query: " + query, e);
        } finally {
            try {
                closeStatement(ps, rs);
            } catch (SQLException e) {
                log.error("Cannot close connection", e);
            }
        }
    }

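    /**
     * Sets a value into a prepared statement parameter, converting Calendars, wrapped ids and Java arrays into
     * the JDBC values expected by the dialect.
     */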
    public int setToPreparedStatement(PreparedStatement ps, int i, Serializable object) throws SQLException {
        if (object instanceof Calendar) {
            Calendar cal = (Calendar) object;
            ps.setTimestamp(i, dialect.getTimestampFromCalendar(cal), cal);
        } else if (object instanceof java.sql.Date) {
            ps.setDate(i, (java.sql.Date) object);
        } else if (object instanceof Long) {
            ps.setLong(i, ((Long) object).longValue());
        } else if (object instanceof WrappedId) {
            dialect.setId(ps, i, object.toString());
        } else if (object instanceof Object[]) {
            int jdbcType;
            if (object instanceof String[]) {
                jdbcType = dialect.getJDBCTypeAndString(ColumnType.STRING).jdbcType;
            } else if (object instanceof Boolean[]) {
                jdbcType = dialect.getJDBCTypeAndString(ColumnType.BOOLEAN).jdbcType;
            } else if (object instanceof Long[]) {
                jdbcType = dialect.getJDBCTypeAndString(ColumnType.LONG).jdbcType;
            } else if (object instanceof Double[]) {
                jdbcType = dialect.getJDBCTypeAndString(ColumnType.DOUBLE).jdbcType;
            } else if (object instanceof java.sql.Date[]) {
                jdbcType = Types.DATE;
            } else if (object instanceof java.sql.Clob[]) {
                jdbcType = Types.CLOB;
            } else if (object instanceof Calendar[]) {
                jdbcType = dialect.getJDBCTypeAndString(ColumnType.TIMESTAMP).jdbcType;
                // convert each Calendar element to a Timestamp; casting the whole array to a single Calendar would fail
                Calendar[] cals = (Calendar[]) object;
                java.sql.Timestamp[] timestamps = new java.sql.Timestamp[cals.length];
                for (int j = 0; j < cals.length; j++) {
                    timestamps[j] = cals[j] == null ? null : dialect.getTimestampFromCalendar(cals[j]);
                }
                object = timestamps;
            } else if (object instanceof Integer[]) {
                jdbcType = dialect.getJDBCTypeAndString(ColumnType.INTEGER).jdbcType;
            } else {
                jdbcType = dialect.getJDBCTypeAndString(ColumnType.CLOB).jdbcType;
            }
            Array array = dialect.createArrayOf(jdbcType, (Object[]) object, connection);
            ps.setArray(i, array);
        } else {
            ps.setObject(i, object);
        }
        return i;
    }

    // queryFilter used for principals and permissions
    @Override
    public IterableQueryResult queryAndFetch(String query, String queryType, QueryFilter queryFilter,
            Object... params) {
        if (dialect.needsPrepareUserReadAcls()) {
            prepareUserReadAcls(queryFilter);
        }
        QueryMaker queryMaker = findQueryMaker(queryType);
        if (queryMaker == null) {
            throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query);
        }
        try {
            return new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this, params);
        } catch (SQLException e) {
            throw new NuxeoException("Invalid query: " + queryType + ": " + query, e);
        }
    }

    @Override
    public Set<Serializable> getAncestorsIds(Collection<Serializable> ids) {
        SQLInfoSelect select = sqlInfo.getSelectAncestorsIds();
        if (select == null) {
            return getAncestorsIdsIterative(ids);
        }
        Serializable whereIds = newIdArray(ids);
        Set<Serializable> res = new HashSet<Serializable>();
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            if (logger.isLogEnabled()) {
                logger.logSQL(select.sql, Collections.singleton(whereIds));
            }
            Column what = select.whatColumns.get(0);
            ps = connection.prepareStatement(select.sql);
            setToPreparedStatementIdArray(ps, 1, whereIds);
            rs = ps.executeQuery();
            countExecute();
            List<Serializable> debugIds = null;
            if (logger.isLogEnabled()) {
                debugIds = new LinkedList<Serializable>();
            }
            while (rs.next()) {
                if (dialect.supportsArraysReturnInsteadOfRows()) {
                    Serializable[] resultIds = dialect.getArrayResult(rs.getArray(1));
                    for (Serializable id : resultIds) {
                        if (id != null) {
                            res.add(id);
                            if (logger.isLogEnabled()) {
                                debugIds.add(id);
                            }
                        }
                    }
                } else {
                    Serializable id = what.getFromResultSet(rs, 1);
                    if (id != null) {
                        res.add(id);
                        if (logger.isLogEnabled()) {
                            debugIds.add(id);
                        }
                    }
                }
            }
            if (logger.isLogEnabled()) {
                logger.logIds(debugIds, false, 0);
            }
            return res;
        } catch (SQLException e) {
            throw new NuxeoException("Failed to get ancestors ids", e);
        } finally {
            try {
                closeStatement(ps, rs);
            } catch (SQLException e) {
                log.error(e.getMessage(), e);
            }
        }
    }

    /**
     * Uses iterative parentid selection.
     */
    protected Set<Serializable> getAncestorsIdsIterative(Collection<Serializable> ids) {
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            LinkedList<Serializable> todo = new LinkedList<Serializable>(ids);
            Set<Serializable> done = new HashSet<Serializable>();
            Set<Serializable> res = new HashSet<Serializable>();
            while (!todo.isEmpty()) {
                done.addAll(todo);
                SQLInfoSelect select = sqlInfo.getSelectParentIds(todo.size());
                if (logger.isLogEnabled()) {
                    logger.logSQL(select.sql, todo);
                }
                Column what = select.whatColumns.get(0);
                Column where = select.whereColumns.get(0);
                ps = connection.prepareStatement(select.sql);
                int i = 1;
                for (Serializable id : todo) {
                    where.setToPreparedStatement(ps, i++, id);
                }
                rs = ps.executeQuery();
                countExecute();
                todo = new LinkedList<Serializable>();
                List<Serializable> debugIds = null;
                if (logger.isLogEnabled()) {
                    debugIds = new LinkedList<Serializable>();
                }
                while (rs.next()) {
                    Serializable id = what.getFromResultSet(rs, 1);
                    if (id != null) {
                        res.add(id);
                        if (!done.contains(id)) {
                            todo.add(id);
                        }
                        if (logger.isLogEnabled()) {
                            debugIds.add(id);
                        }
                    }
                }
                if (logger.isLogEnabled()) {
                    logger.logIds(debugIds, false, 0);
                }
                rs.close();
                ps.close();
            }
            return res;
        } catch (SQLException e) {
            throw new NuxeoException("Failed to get ancestors ids", e);
        } finally {
            try {
                closeStatement(ps, rs);
            } catch (SQLException e) {
                log.error(e.getMessage(), e);
            }
        }
    }

    @Override
    public void updateReadAcls() {
        if (!dialect.supportsReadAcl()) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("updateReadAcls: updating");
        }
        Statement st = null;
        try {
            st = connection.createStatement();
            String sql = dialect.getUpdateReadAclsSql();
            if (logger.isLogEnabled()) {
                logger.log(sql);
            }
            st.execute(sql);
            countExecute();
        } catch (SQLException e) {
            throw new NuxeoException("Failed to update read acls", e);
        } finally {
            try {
                closeStatement(st);
            } catch (SQLException e) {
                log.error(e.getMessage(), e);
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("updateReadAcls: done.");
        }
    }

    @Override
    public void rebuildReadAcls() {
        if (!dialect.supportsReadAcl()) {
            return;
        }
        log.debug("rebuildReadAcls: rebuilding ...");
        Statement st = null;
        try {
            st = connection.createStatement();
            String sql = dialect.getRebuildReadAclsSql();
            logger.log(sql);
            st.execute(sql);
            countExecute();
        } catch (SQLException e) {
            throw new NuxeoException("Failed to rebuild read acls", e);
        } finally {
            try {
                closeStatement(st);
            } catch (SQLException e) {
                log.error(e.getMessage(), e);
            }
        }
        log.debug("rebuildReadAcls: done.");
    }

    /*
     * ----- Locking -----
     */

    protected Connection connection(boolean autocommit) {
        try {
            connection.setAutoCommit(autocommit);
        } catch (SQLException e) {
            throw new NuxeoException("Cannot set auto commit mode onto " + this + "'s connection", e);
        }
        return connection;
    }

    /**
     * Calls the callable, inside a transaction if in cluster mode.
     * <p>
     * Called under {@link #serializationLock}.
     */
    protected Lock callInTransaction(LockCallable callable, boolean tx) {
        boolean ok = false;
        try {
            if (log.isDebugEnabled()) {
                log.debug("callInTransaction setAutoCommit " + !tx);
            }
            connection.setAutoCommit(!tx);
        } catch (SQLException e) {
            throw new NuxeoException("Cannot set auto commit mode onto " + this + "'s connection", e);
        }
        try {
            Lock result = callable.call();
            ok = true;
            return result;
        } finally {
            if (tx) {
                try {
                    try {
                        if (ok) {
                            if (log.isDebugEnabled()) {
                                log.debug("callInTransaction commit");
                            }
                            connection.commit();
                        } else {
                            if (log.isDebugEnabled()) {
                                log.debug("callInTransaction rollback");
                            }
                            connection.rollback();
                        }
                    } finally {
                        // restore autoCommit=true
                        if (log.isDebugEnabled()) {
                            log.debug("callInTransaction restoring autoCommit=true");
                        }
                        connection.setAutoCommit(true);
                    }
                } catch (SQLException e) {
                    throw new NuxeoException(e);
                }
            }
        }
    }

    public interface LockCallable extends Callable<Lock> {
        @Override
        public Lock call();
    }
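
    // Illustrative only: lock callables are run through callInTransaction so that, in cluster mode, the
    // read-check-write sequence executes in a single database transaction, e.g. (sketch):
    //   Lock oldLock = callInTransaction(new SetLock(id, lock), clusteringEnabled);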

    @Override
    public Lock getLock(Serializable id) {
        if (log.isDebugEnabled()) {
            try {
                log.debug("getLock " + id + " while autoCommit=" + connection.getAutoCommit());
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }
        RowId rowId = new RowId(Model.LOCK_TABLE_NAME, id);
        Row row = readSimpleRow(rowId);
        return row == null ? null : new Lock((String) row.get(Model.LOCK_OWNER_KEY),
                (Calendar) row.get(Model.LOCK_CREATED_KEY));
    }

    @Override
    public Lock setLock(final Serializable id, final Lock lock) {
        if (log.isDebugEnabled()) {
            log.debug("setLock " + id + " owner=" + lock.getOwner());
        }
        SetLock call = new SetLock(id, lock);
        return callInTransaction(call, clusteringEnabled);
    }

    protected class SetLock implements LockCallable {
        protected final Serializable id;

        protected final Lock lock;

        protected SetLock(Serializable id, Lock lock) {
            super();
            this.id = id;
            this.lock = lock;
        }

        @Override
        public Lock call() {
            Lock oldLock = getLock(id);
            if (oldLock == null) {
                Row row = new Row(Model.LOCK_TABLE_NAME, id);
                row.put(Model.LOCK_OWNER_KEY, lock.getOwner());
                row.put(Model.LOCK_CREATED_KEY, lock.getCreated());
                insertSimpleRows(Model.LOCK_TABLE_NAME, Collections.singletonList(row));
            }
            return oldLock;
        }
    }

    @Override
    public Lock removeLock(final Serializable id, final String owner, final boolean force) {
        if (log.isDebugEnabled()) {
            log.debug("removeLock " + id + " owner=" + owner + " force=" + force);
        }
        RemoveLock call = new RemoveLock(id, owner, force);
        return callInTransaction(call, !force);
    }

    protected class RemoveLock implements LockCallable {
        protected final Serializable id;

        protected final String owner;

        protected final boolean force;

        protected RemoveLock(Serializable id, String owner, boolean force) {
            super();
            this.id = id;
            this.owner = owner;
            this.force = force;
        }

        @Override
        public Lock call() {
            Lock oldLock = force ? null : getLock(id);
            if (!force && owner != null) {
                if (oldLock == null) {
                    // not locked, nothing to do
                    return null;
                }
                if (!LockManager.canLockBeRemoved(oldLock.getOwner(), owner)) {
                    // existing mismatched lock, flag failure
                    return new Lock(oldLock, true);
                }
            }
            if (force || oldLock != null) {
                deleteRows(Model.LOCK_TABLE_NAME, Collections.singleton(id));
            }
            return oldLock;
        }
    }

    @Override
    public void markReferencedBinaries() {
        log.debug("Starting binaries GC mark");
        Statement st = null;
        ResultSet rs = null;
        BlobManager blobManager = Framework.getService(BlobManager.class);
        String repositoryName = getRepositoryName();
        try {
            st = connection.createStatement();
            int i = -1;
            for (String sql : sqlInfo.getBinariesSql) {
                i++;
                Column col = sqlInfo.getBinariesColumns.get(i);
                if (logger.isLogEnabled()) {
                    logger.log(sql);
                }
                rs = st.executeQuery(sql);
                countExecute();
                int n = 0;
                while (rs.next()) {
                    n++;
                    String key = (String) col.getFromResultSet(rs, 1);
                    if (key != null) {
                        blobManager.markReferencedBinary(key, repositoryName);
                    }
                }
                if (logger.isLogEnabled()) {
                    logger.logCount(n);
                }
                rs.close();
            }
        } catch (SQLException e) {
            throw new RuntimeException("Failed to mark binaries for GC", e);
        } finally {
            try {
                closeStatement(st, rs);
            } catch (SQLException e) {
                log.error(e.getMessage(), e);
            }
        }
        log.debug("End of binaries GC mark");
    }

    /*
     * ----- XAResource -----
     */

    protected static String systemToString(Object o) {
        return o.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(o));
    }

    @Override
    public void start(Xid xid, int flags) throws XAException {
        try {
            xaresource.start(xid, flags);
            if (logger.isLogEnabled()) {
                logger.log("XA start on " + systemToString(xid));
            }
        } catch (NuxeoException e) {
            throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e);
        } catch (XAException e) {
            logger.error("XA start error on " + systemToString(xid), e);
            throw e;
        }
    }

    @Override
    public void end(Xid xid, int flags) throws XAException {
        try {
            xaresource.end(xid, flags);
            if (logger.isLogEnabled()) {
                logger.log("XA end on " + systemToString(xid));
            }
        } catch (NullPointerException e) {
            // H2 when no active transaction
            logger.error("XA end error on " + systemToString(xid), e);
            throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e);
        } catch (XAException e) {
            if (flags != XAResource.TMFAIL) {
                logger.error("XA end error on " + systemToString(xid), e);
            }
            throw e;
        }
    }

    @Override
    public int prepare(Xid xid) throws XAException {
        try {
            return xaresource.prepare(xid);
        } catch (XAException e) {
1277            logger.error("XA prepare error on  " + systemToString(xid), e);
            throw e;
        }
    }

    @Override
    public void commit(Xid xid, boolean onePhase) throws XAException {
        try {
            xaresource.commit(xid, onePhase);
        } catch (XAException e) {
1287            logger.error("XA commit error on  " + systemToString(xid), e);
            throw e;
        }
    }

    // rollback interacts with caches so is in RowMapper

    @Override
    public void forget(Xid xid) throws XAException {
        xaresource.forget(xid);
    }

    @Override
    public Xid[] recover(int flag) throws XAException {
        return xaresource.recover(flag);
    }

    @Override
    public boolean setTransactionTimeout(int seconds) throws XAException {
        return xaresource.setTransactionTimeout(seconds);
    }

    @Override
    public int getTransactionTimeout() throws XAException {
        return xaresource.getTransactionTimeout();
    }

    @Override
    public boolean isSameRM(XAResource xares) throws XAException {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean isConnected() {
        return connection != null;
    }

    @Override
    public void connect() {
        openConnections();
    }

    @Override
    public void disconnect() {
        closeConnections();
    }

}