001/*
002 * (C) Copyright 2006-2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Benoit Delbosc
019 */
020package org.nuxeo.ecm.core.storage.sql.jdbc;
021
022import static javax.servlet.http.HttpServletResponse.SC_BAD_REQUEST;
023
024import static org.nuxeo.ecm.core.api.ScrollResultImpl.emptyResult;
025
026import java.io.Serializable;
027import java.sql.Array;
028import java.sql.PreparedStatement;
029import java.sql.ResultSet;
030import java.sql.SQLDataException;
031import java.sql.SQLException;
032import java.sql.Statement;
033import java.sql.Types;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Calendar;
037import java.util.Collection;
038import java.util.Collections;
039import java.util.HashMap;
040import java.util.HashSet;
041import java.util.LinkedList;
042import java.util.List;
043import java.util.Map;
044import java.util.Map.Entry;
045import java.util.Set;
046import java.util.UUID;
047import java.util.concurrent.ConcurrentHashMap;
048
049import org.apache.commons.logging.Log;
050import org.apache.commons.logging.LogFactory;
051import org.nuxeo.ecm.core.api.ConcurrentUpdateException;
052import org.nuxeo.ecm.core.api.IterableQueryResult;
053import org.nuxeo.ecm.core.api.NuxeoException;
054import org.nuxeo.ecm.core.api.PartialList;
055import org.nuxeo.ecm.core.api.ScrollResult;
056import org.nuxeo.ecm.core.api.ScrollResultImpl;
057import org.nuxeo.ecm.core.blob.DocumentBlobManager;
058import org.nuxeo.ecm.core.query.QueryFilter;
059import org.nuxeo.ecm.core.storage.sql.ColumnType;
060import org.nuxeo.ecm.core.storage.sql.ColumnType.WrappedId;
061import org.nuxeo.ecm.core.storage.sql.Mapper;
062import org.nuxeo.ecm.core.storage.sql.Model;
063import org.nuxeo.ecm.core.storage.sql.RepositoryImpl;
064import org.nuxeo.ecm.core.storage.sql.RowId;
065import org.nuxeo.ecm.core.storage.sql.Session.PathResolver;
066import org.nuxeo.ecm.core.storage.sql.VCSClusterInvalidator;
067import org.nuxeo.ecm.core.storage.sql.VCSInvalidations;
068import org.nuxeo.ecm.core.storage.sql.jdbc.SQLInfo.SQLInfoSelect;
069import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
070import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
071import org.nuxeo.runtime.api.Framework;
072
073/**
074 * A {@link JDBCMapper} maps objects to and from a JDBC database. It is specific to a given database connection, as it
075 * computes statements.
076 * <p>
077 * The {@link JDBCMapper} does the mapping according to the policy defined by a {@link Model}, and generates SQL
078 * statements recorded in the {@link SQLInfo}.
079 */
080public class JDBCMapper extends JDBCRowMapper implements Mapper {
081
082    private static final Log log = LogFactory.getLog(JDBCMapper.class);
083
084    protected static Map<String, CursorResult> cursorResults = new ConcurrentHashMap<>();
085
086    private final QueryMakerService queryMakerService;
087
088    private final PathResolver pathResolver;
089
090    private final RepositoryImpl repository;
091
092    protected static final String NOSCROLL_ID = "noscroll";
093
094    /**
095     * Creates a new Mapper.
096     *
097     * @param model the model
098     * @param pathResolver the path resolver (used for startswith queries)
099     * @param sqlInfo the sql info
100     * @param clusterInvalidator the cluster invalidator
101     * @param repository the repository
102     */
103    public JDBCMapper(Model model, PathResolver pathResolver, SQLInfo sqlInfo, VCSClusterInvalidator clusterInvalidator,
104            RepositoryImpl repository) {
105        super(model, sqlInfo, clusterInvalidator, repository.getInvalidationsPropagator());
106        this.pathResolver = pathResolver;
107        this.repository = repository;
108        queryMakerService = Framework.getService(QueryMakerService.class);
109    }
110
111    @Override
112    public void close() {
113        closeConnection();
114    }
115
116    @Override
117    public int getTableSize(String tableName) {
118        return sqlInfo.getDatabase().getTable(tableName).getColumns().size();
119    }
120
121    @Override
122    public int getClusterNodeIdType() {
123        return sqlInfo.getClusterNodeIdType();
124    }
125
126    @Override
127    public void createClusterNode(Serializable nodeId) {
128        Calendar now = Calendar.getInstance();
129        String sql = sqlInfo.getCreateClusterNodeSql();
130        List<Column> columns = sqlInfo.getCreateClusterNodeColumns();
131        try (PreparedStatement ps = connection.prepareStatement(sql)) {
132            if (logger.isLogEnabled()) {
133                logger.logSQL(sql, Arrays.asList(nodeId, now));
134            }
135            columns.get(0).setToPreparedStatement(ps, 1, nodeId);
136            columns.get(1).setToPreparedStatement(ps, 2, now);
137            ps.execute();
138
139        } catch (SQLException e) {
140            try {
141                checkConcurrentUpdate(e);
142            } catch (ConcurrentUpdateException cue) {
143                cue.addInfo("Duplicate cluster node with id: " + nodeId
144                        + " (a crashed node must be cleaned up, or the cluster configuration fixed)");
145                throw cue;
146            }
147            throw new NuxeoException(e);
148        }
149    }
150
151    @Override
152    public void removeClusterNode(Serializable nodeId) {
153        // delete from cluster_nodes
154        String sql = sqlInfo.getDeleteClusterNodeSql();
155        Column column = sqlInfo.getDeleteClusterNodeColumn();
156        try (PreparedStatement ps = connection.prepareStatement(sql)) {
157            if (logger.isLogEnabled()) {
158                logger.logSQL(sql, Collections.singletonList(nodeId));
159            }
160            column.setToPreparedStatement(ps, 1, nodeId);
161            ps.execute();
162            // delete un-processed invals from cluster_invals
163            deleteClusterInvals(nodeId);
164        } catch (SQLException e) {
165            throw new NuxeoException(e);
166        }
167    }
168
169    protected void deleteClusterInvals(Serializable nodeId) throws SQLException {
170        String sql = sqlInfo.getDeleteClusterInvalsSql();
171        Column column = sqlInfo.getDeleteClusterInvalsColumn();
172        try (PreparedStatement ps = connection.prepareStatement(sql)) {
173            if (logger.isLogEnabled()) {
174                logger.logSQL(sql, Collections.singletonList(nodeId));
175            }
176            column.setToPreparedStatement(ps, 1, nodeId);
177            int n = ps.executeUpdate();
178            countExecute();
179            if (logger.isLogEnabled()) {
180                logger.logCount(n);
181            }
182        }
183    }
184
185    @Override
186    public void insertClusterInvalidations(Serializable nodeId, VCSInvalidations invalidations) {
187        String sql = dialect.getClusterInsertInvalidations();
188        List<Column> columns = sqlInfo.getClusterInvalidationsColumns();
189        try (PreparedStatement ps = connection.prepareStatement(sql)) {
190            int kind = VCSInvalidations.MODIFIED;
191            while (true) {
192                Set<RowId> rowIds = invalidations.getKindSet(kind);
193
194                // reorganize by id
195                Map<Serializable, Set<String>> res = new HashMap<>();
196                for (RowId rowId : rowIds) {
197                    Set<String> tableNames = res.get(rowId.id);
198                    if (tableNames == null) {
199                        res.put(rowId.id, tableNames = new HashSet<>());
200                    }
201                    tableNames.add(rowId.tableName);
202                }
203
204                // do inserts
205                for (Entry<Serializable, Set<String>> en : res.entrySet()) {
206                    Serializable id = en.getKey();
207                    String fragments = join(en.getValue(), ' ');
208                    if (logger.isLogEnabled()) {
209                        logger.logSQL(sql, Arrays.<Serializable> asList(nodeId, id, fragments, Long.valueOf(kind)));
210                    }
211                    Serializable frags;
212                    if (dialect.supportsArrays() && columns.get(2).getJdbcType() == Types.ARRAY) {
213                        frags = fragments.split(" ");
214                    } else {
215                        frags = fragments;
216                    }
217                    columns.get(0).setToPreparedStatement(ps, 1, nodeId);
218                    columns.get(1).setToPreparedStatement(ps, 2, id);
219                    columns.get(2).setToPreparedStatement(ps, 3, frags);
220                    columns.get(3).setToPreparedStatement(ps, 4, Long.valueOf(kind));
221                    ps.execute();
222                    countExecute();
223                }
224                if (kind == VCSInvalidations.MODIFIED) {
225                    kind = VCSInvalidations.DELETED;
226                } else {
227                    break;
228                }
229            }
230        } catch (SQLException e) {
231            throw new NuxeoException("Could not invalidate", e);
232        }
233    }
234
235    // join that works on a set
236    protected static String join(Collection<String> strings, char sep) {
237        if (strings.isEmpty()) {
238            throw new RuntimeException();
239        }
240        if (strings.size() == 1) {
241            return strings.iterator().next();
242        }
243        int size = 0;
244        for (String word : strings) {
245            size += word.length() + 1;
246        }
247        StringBuilder sb = new StringBuilder(size);
248        for (String word : strings) {
249            sb.append(word);
250            sb.append(sep);
251        }
252        sb.setLength(size - 1);
253        return sb.toString();
254    }
255
256    @Override
257    public VCSInvalidations getClusterInvalidations(Serializable nodeId) {
258        VCSInvalidations invalidations = new VCSInvalidations();
259        String sql = dialect.getClusterGetInvalidations();
260        List<Column> columns = sqlInfo.getClusterInvalidationsColumns();
261        if (logger.isLogEnabled()) {
262            logger.logSQL(sql, Collections.singletonList(nodeId));
263        }
264        try (PreparedStatement ps = connection.prepareStatement(sql)) {
265            setToPreparedStatement(ps, 1, nodeId);
266            try (ResultSet rs = ps.executeQuery()) {
267                countExecute();
268                while (rs.next()) {
269                    // first column ignored, it's the node id
270                    Serializable id = columns.get(1).getFromResultSet(rs, 1);
271                    Serializable frags = columns.get(2).getFromResultSet(rs, 2);
272                    int kind = ((Long) columns.get(3).getFromResultSet(rs, 3)).intValue();
273                    String[] fragments;
274                    if (dialect.supportsArrays() && frags instanceof String[]) {
275                        fragments = (String[]) frags;
276                    } else {
277                        fragments = ((String) frags).split(" ");
278                    }
279                    invalidations.add(id, fragments, kind);
280                }
281            }
282            if (logger.isLogEnabled()) {
283                // logCount(n);
284                logger.log("  -> " + invalidations);
285            }
286            if (dialect.isClusteringDeleteNeeded()) {
287                deleteClusterInvals(nodeId);
288            }
289            return invalidations;
290        } catch (SQLException e) {
291            throw new NuxeoException("Could not invalidate", e);
292        }
293    }
294
295    @Override
296    public Serializable getRootId(String repositoryId) {
297        String sql = sqlInfo.getSelectRootIdSql();
298        if (logger.isLogEnabled()) {
299            logger.logSQL(sql, Collections.<Serializable> singletonList(repositoryId));
300        }
301        try (PreparedStatement ps = connection.prepareStatement(sql)) {
302            ps.setString(1, repositoryId);
303            try (ResultSet rs = ps.executeQuery()) {
304                countExecute();
305                if (!rs.next()) {
306                    if (logger.isLogEnabled()) {
307                        logger.log("  -> (none)");
308                    }
309                    return null;
310                }
311                Column column = sqlInfo.getSelectRootIdWhatColumn();
312                Serializable id = column.getFromResultSet(rs, 1);
313                if (logger.isLogEnabled()) {
314                    logger.log("  -> " + Model.MAIN_KEY + '=' + id);
315                }
316                // check that we didn't get several rows
317                if (rs.next()) {
318                    throw new NuxeoException("Row query for " + repositoryId + " returned several rows: " + sql);
319                }
320                return id;
321            }
322        } catch (SQLException e) {
323            throw new NuxeoException("Could not select: " + sql, e);
324        }
325    }
326
327    @Override
328    public void setRootId(Serializable repositoryId, Serializable id) {
329        String sql = sqlInfo.getInsertRootIdSql();
330        try (PreparedStatement ps = connection.prepareStatement(sql)) {
331            List<Column> columns = sqlInfo.getInsertRootIdColumns();
332            List<Serializable> debugValues = null;
333            if (logger.isLogEnabled()) {
334                debugValues = new ArrayList<>(2);
335            }
336            int i = 0;
337            for (Column column : columns) {
338                i++;
339                String key = column.getKey();
340                Serializable v;
341                if (key.equals(Model.MAIN_KEY)) {
342                    v = id;
343                } else if (key.equals(Model.REPOINFO_REPONAME_KEY)) {
344                    v = repositoryId;
345                } else {
346                    throw new RuntimeException(key);
347                }
348                column.setToPreparedStatement(ps, i, v);
349                if (debugValues != null) {
350                    debugValues.add(v);
351                }
352            }
353            if (debugValues != null) {
354                logger.logSQL(sql, debugValues);
355                debugValues.clear();
356            }
357            ps.execute();
358            countExecute();
359        } catch (SQLException e) {
360            throw new NuxeoException("Could not insert: " + sql, e);
361        }
362    }
363
364    protected QueryMaker findQueryMaker(String queryType) {
365        for (Class<? extends QueryMaker> klass : queryMakerService.getQueryMakers()) {
366            QueryMaker queryMaker;
367            try {
368                queryMaker = klass.getDeclaredConstructor().newInstance();
369            } catch (ReflectiveOperationException e) {
370                throw new NuxeoException(e);
371            }
372            if (queryMaker.accepts(queryType)) {
373                return queryMaker;
374            }
375        }
376        return null;
377    }
378
379    protected void prepareUserReadAcls(QueryFilter queryFilter) {
380        String sql = dialect.getPrepareUserReadAclsSql();
381        Serializable principals = queryFilter.getPrincipals();
382        if (sql == null || principals == null) {
383            return;
384        }
385        if (!dialect.supportsArrays()) {
386            principals = String.join(Dialect.ARRAY_SEP, (String[]) principals);
387        }
388        try (PreparedStatement ps = connection.prepareStatement(sql)) {
389            if (logger.isLogEnabled()) {
390                logger.logSQL(sql, Collections.singleton(principals));
391            }
392            setToPreparedStatement(ps, 1, principals);
393            ps.execute();
394            countExecute();
395        } catch (SQLException e) {
396            throw new NuxeoException("Failed to prepare user read acl cache", e);
397        }
398    }
399
400    @Override
401    public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter,
402            boolean countTotal) {
403        return query(query, queryType, queryFilter, countTotal ? -1 : 0);
404    }
405
406    @Override
407    public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter, long countUpTo) {
408        PartialList<Serializable> result = queryProjection(query, queryType, queryFilter, countUpTo,
409                (info, rs) -> info.whatColumns.get(0).getFromResultSet(rs, 1));
410
411        if (logger.isLogEnabled()) {
412            logger.logIds(result, countUpTo != 0, result.totalSize());
413        }
414
415        return result;
416    }
417
418    // queryFilter used for principals and permissions
419    @Override
420    public IterableQueryResult queryAndFetch(String query, String queryType, QueryFilter queryFilter,
421            boolean distinctDocuments, Object... params) {
422        if (dialect.needsPrepareUserReadAcls()) {
423            prepareUserReadAcls(queryFilter);
424        }
425        QueryMaker queryMaker = findQueryMaker(queryType);
426        if (queryMaker == null) {
427            throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query);
428        }
429        query = computeDistinctDocuments(query, distinctDocuments);
430        try {
431            return new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this, params);
432        } catch (SQLException e) {
433            throw new NuxeoException("Invalid query: " + queryType + ": " + query, e, SC_BAD_REQUEST);
434        }
435    }
436
437    @Override
438    public PartialList<Map<String, Serializable>> queryProjection(String query, String queryType,
439            QueryFilter queryFilter, boolean distinctDocuments, long countUpTo, Object... params) {
440        query = computeDistinctDocuments(query, distinctDocuments);
441        PartialList<Map<String, Serializable>> result = queryProjection(query, queryType, queryFilter, countUpTo,
442                (info, rs) -> info.mapMaker.makeMap(rs), params);
443
444        if (logger.isLogEnabled()) {
445            logger.logMaps(result, countUpTo != 0, result.totalSize());
446        }
447
448        return result;
449    }
450
451    protected String computeDistinctDocuments(String query, boolean distinctDocuments) {
452        if (distinctDocuments) {
453            String q = query.toLowerCase();
454            if (q.startsWith("select ") && !q.startsWith("select distinct ")) {
455                // Replace "select" by "select distinct", split at "select ".length() index
456                query = "SELECT DISTINCT " + query.substring(7);
457            }
458        }
459        return query;
460    }
461
462    protected <T> PartialList<T> queryProjection(String query, String queryType, QueryFilter queryFilter,
463            long countUpTo, BiFunctionSQLException<SQLInfoSelect, ResultSet, T> extractor, Object... params) {
464        if (dialect.needsPrepareUserReadAcls()) {
465            prepareUserReadAcls(queryFilter);
466        }
467        QueryMaker queryMaker = findQueryMaker(queryType);
468        if (queryMaker == null) {
469            throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query);
470        }
471        QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter, params);
472
473        if (q == null) {
474            logger.log("Query cannot return anything due to conflicting clauses");
475            return new PartialList<>(Collections.emptyList(), 0);
476        }
477        long limit = queryFilter.getLimit();
478        long offset = queryFilter.getOffset();
479
480        if (logger.isLogEnabled()) {
481            String sql = q.selectInfo.sql;
482            if (limit != 0) {
483                sql += " -- LIMIT " + limit + " OFFSET " + offset;
484            }
485            if (countUpTo != 0) {
486                sql += " -- COUNT TOTAL UP TO " + countUpTo;
487            }
488            logger.logSQL(sql, q.selectParams);
489        }
490
491        String sql = q.selectInfo.sql;
492
493        if (countUpTo == 0 && limit > 0 && dialect.supportsPaging()) {
494            // full result set not needed for counting
495            sql = dialect.addPagingClause(sql, limit, offset);
496            limit = 0;
497            offset = 0;
498        } else if (countUpTo > 0 && dialect.supportsPaging()) {
499            // ask one more row
500            sql = dialect.addPagingClause(sql, Math.max(countUpTo + 1, limit + offset), 0);
501        }
502
503        try (PreparedStatement ps = connection.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE,
504                ResultSet.CONCUR_READ_ONLY)) {
505            int i = 1;
506            for (Serializable object : q.selectParams) {
507                setToPreparedStatement(ps, i++, object);
508            }
509            try (ResultSet rs = ps.executeQuery()) {
510                countExecute();
511
512                // limit/offset
513                long totalSize = -1;
514                boolean available;
515                if ((limit == 0) || (offset == 0)) {
516                    available = rs.first();
517                    if (!available) {
518                        totalSize = 0;
519                    }
520                    if (limit == 0) {
521                        limit = -1; // infinite
522                    }
523                } else {
524                    available = rs.absolute((int) offset + 1);
525                }
526
527                List<T> projections = new LinkedList<>();
528                int rowNum = 0;
529                while (available && (limit != 0)) {
530                    try {
531                        T projection = extractor.apply(q.selectInfo, rs);
532                        projections.add(projection);
533                        rowNum = rs.getRow();
534                        available = rs.next();
535                        limit--;
536                    } catch (SQLDataException e) {
537                        // actually no data available, MariaDB Connector/J lied, stop now
538                        available = false;
539                    }
540                }
541
542                // total size
543                if (countUpTo != 0 && (totalSize == -1)) {
544                    if (!available && (rowNum != 0)) {
545                        // last row read was the actual last
546                        totalSize = rowNum;
547                    } else {
548                        // available if limit reached with some left
549                        // rowNum == 0 if skipped too far
550                        rs.last();
551                        totalSize = rs.getRow();
552                    }
553                    if (countUpTo > 0 && totalSize > countUpTo) {
554                        // the result where truncated we don't know the total size
555                        totalSize = -2;
556                    }
557                }
558
559                return new PartialList<>(projections, totalSize);
560            }
561        } catch (SQLException e) {
562            throw new NuxeoException("Invalid query: " + query, e, SC_BAD_REQUEST);
563        }
564    }
565
566    public int setToPreparedStatement(PreparedStatement ps, int i, Serializable object) throws SQLException {
567        if (object instanceof Calendar) {
568            dialect.setToPreparedStatementTimestamp(ps, i, object, null);
569        } else if (object instanceof java.sql.Date) {
570            ps.setDate(i, (java.sql.Date) object);
571        } else if (object instanceof Long) {
572            ps.setLong(i, ((Long) object).longValue());
573        } else if (object instanceof WrappedId) {
574            dialect.setId(ps, i, object.toString());
575        } else if (object instanceof Object[]) {
576            int jdbcType;
577            if (object instanceof String[]) {
578                jdbcType = dialect.getJDBCTypeAndString(ColumnType.STRING).jdbcType;
579            } else if (object instanceof Boolean[]) {
580                jdbcType = dialect.getJDBCTypeAndString(ColumnType.BOOLEAN).jdbcType;
581            } else if (object instanceof Long[]) {
582                jdbcType = dialect.getJDBCTypeAndString(ColumnType.LONG).jdbcType;
583            } else if (object instanceof Double[]) {
584                jdbcType = dialect.getJDBCTypeAndString(ColumnType.DOUBLE).jdbcType;
585            } else if (object instanceof java.sql.Date[]) {
586                jdbcType = Types.DATE;
587            } else if (object instanceof java.sql.Clob[]) {
588                jdbcType = Types.CLOB;
589            } else if (object instanceof Calendar[]) {
590                jdbcType = dialect.getJDBCTypeAndString(ColumnType.TIMESTAMP).jdbcType;
591                object = dialect.getTimestampFromCalendar((Calendar[]) object);
592            } else if (object instanceof Integer[]) {
593                jdbcType = dialect.getJDBCTypeAndString(ColumnType.INTEGER).jdbcType;
594            } else {
595                jdbcType = dialect.getJDBCTypeAndString(ColumnType.CLOB).jdbcType;
596            }
597            Array array = dialect.createArrayOf(jdbcType, (Object[]) object, connection);
598            ps.setArray(i, array);
599        } else {
600            ps.setObject(i, object);
601        }
602        return i;
603    }
604
605    @Override
606    public ScrollResult<String> scroll(String query, int batchSize, int keepAliveSeconds) {
607        if (!dialect.supportsScroll()) {
608            return defaultScroll(query);
609        }
610        checkForTimedoutScroll();
611        QueryFilter queryFilter = new QueryFilter(null, null, null, null, Collections.emptyList(), 0, 0);
612        return scrollSearch(query, queryFilter, batchSize, keepAliveSeconds);
613    }
614
615    @Override
616    public ScrollResult<String> scroll(String query, QueryFilter queryFilter, int batchSize, int keepAliveSeconds) {
617        if (!dialect.supportsScroll()) {
618            return defaultScroll(query);
619        }
620        if (dialect.needsPrepareUserReadAcls()) {
621            prepareUserReadAcls(queryFilter);
622        }
623        checkForTimedoutScroll();
624        return scrollSearch(query, queryFilter, batchSize, keepAliveSeconds);
625    }
626
627    protected void checkForTimedoutScroll() {
628        cursorResults.forEach((id, cursor) -> cursor.timedOut(id));
629    }
630
631    @SuppressWarnings("resource") // PreparedStatement + ResultSet for cursor, must not be closed
632    protected ScrollResult<String> scrollSearch(String query, QueryFilter queryFilter, int batchSize,
633            int keepAliveSeconds) {
634        QueryMaker queryMaker = findQueryMaker("NXQL");
635        QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter);
636        if (q == null) {
637            logger.log("Query cannot return anything due to conflicting clauses");
638            throw new NuxeoException("Query cannot return anything due to conflicting clauses");
639        }
640        if (logger.isLogEnabled()) {
641            logger.logSQL(q.selectInfo.sql, q.selectParams);
642        }
643        try {
644            if (connection.getAutoCommit()) {
645                throw new NuxeoException("Scroll should be done inside a transaction");
646            }
647            // ps MUST NOT be auto-closed because it's referenced by a cursor
648            PreparedStatement ps = connection.prepareStatement(q.selectInfo.sql, ResultSet.TYPE_FORWARD_ONLY,
649                    ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
650            ps.setFetchSize(batchSize);
651            int i = 1;
652            for (Serializable object : q.selectParams) {
653                setToPreparedStatement(ps, i++, object);
654            }
655            // rs MUST NOT be auto-closed because it's referenced by a cursor
656            ResultSet rs = ps.executeQuery();
657            String scrollId = UUID.randomUUID().toString();
658            registerCursor(scrollId, ps, rs, batchSize, keepAliveSeconds);
659            return scroll(scrollId);
660        } catch (SQLException e) {
661            throw new NuxeoException("Error on query", e);
662        }
663    }
664
665    protected class CursorResult {
666        protected final int keepAliveSeconds;
667
668        protected final PreparedStatement preparedStatement;
669
670        protected final ResultSet resultSet;
671
672        protected final int batchSize;
673
674        protected long lastCallTimestamp;
675
676        CursorResult(PreparedStatement preparedStatement, ResultSet resultSet, int batchSize, int keepAliveSeconds) {
677            this.preparedStatement = preparedStatement;
678            this.resultSet = resultSet;
679            this.batchSize = batchSize;
680            this.keepAliveSeconds = keepAliveSeconds;
681            lastCallTimestamp = System.currentTimeMillis();
682        }
683
684        boolean timedOut(String scrollId) {
685            long now = System.currentTimeMillis();
686            if (now - lastCallTimestamp > (keepAliveSeconds * 1000)) {
687                if (unregisterCursor(scrollId)) {
688                    log.warn("Scroll " + scrollId + " timed out");
689                }
690                return true;
691            }
692            return false;
693        }
694
695        void touch() {
696            lastCallTimestamp = System.currentTimeMillis();
697        }
698
699        synchronized void close() throws SQLException {
700            if (resultSet != null) {
701                resultSet.close();
702            }
703            if (preparedStatement != null) {
704                preparedStatement.close();
705            }
706        }
707    }
708
709    protected void registerCursor(String scrollId, PreparedStatement ps, ResultSet rs, int batchSize,
710            int keepAliveSeconds) {
711        cursorResults.put(scrollId, new CursorResult(ps, rs, batchSize, keepAliveSeconds));
712    }
713
714    protected boolean unregisterCursor(String scrollId) {
715        CursorResult cursor = cursorResults.remove(scrollId);
716        if (cursor != null) {
717            try {
718                cursor.close();
719                return true;
720            } catch (SQLException e) {
721                log.error("Failed to close cursor for scroll: " + scrollId, e);
722                // do not propagate exception on cleaning
723            }
724        }
725        return false;
726    }
727
728    protected ScrollResult<String> defaultScroll(String query) {
729        // the database has no proper support for cursor just return everything in one batch
730        QueryMaker queryMaker = findQueryMaker("NXQL");
731        List<String> ids;
732        QueryFilter queryFilter = new QueryFilter(null, null, null, null, Collections.emptyList(), 0, 0);
733        try (IterableQueryResult ret = new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this)) {
734            ids = new ArrayList<>((int) ret.size());
735            for (Map<String, Serializable> map : ret) {
736                ids.add(map.get("ecm:uuid").toString());
737            }
738        } catch (SQLException e) {
739            throw new NuxeoException("Invalid scroll query: " + query, e);
740        }
741        return new ScrollResultImpl<>(NOSCROLL_ID, ids);
742    }
743
744    @Override
745    public ScrollResult<String> scroll(String scrollId) {
746        if (NOSCROLL_ID.equals(scrollId) || !dialect.supportsScroll()) {
747            // there is only one batch in this case
748            return emptyResult();
749        }
750        CursorResult cursorResult = cursorResults.get(scrollId);
751        if (cursorResult == null) {
752            throw new NuxeoException("Unknown or timed out scrollId");
753        } else if (cursorResult.timedOut(scrollId)) {
754            throw new NuxeoException("Timed out scrollId");
755        }
756        cursorResult.touch();
757        List<String> ids = new ArrayList<>(cursorResult.batchSize);
758        synchronized (cursorResult) {
759            try {
760                if (cursorResult.resultSet == null || cursorResult.resultSet.isClosed()) {
761                    unregisterCursor(scrollId);
762                    return emptyResult();
763                }
764                while (ids.size() < cursorResult.batchSize) {
765                    if (cursorResult.resultSet.next()) {
766                        ids.add(cursorResult.resultSet.getString(1));
767                    } else {
768                        cursorResult.close();
769                        if (ids.isEmpty()) {
770                            unregisterCursor(scrollId);
771                        }
772                        break;
773                    }
774                }
775            } catch (SQLException e) {
776                throw new NuxeoException("Error during scroll", e);
777            }
778        }
779        return new ScrollResultImpl<>(scrollId, ids);
780    }
781
782    @Override
783    public Set<Serializable> getAncestorsIds(Collection<Serializable> ids) {
784        SQLInfoSelect select = sqlInfo.getSelectAncestorsIds();
785        if (select == null) {
786            return getAncestorsIdsIterative(ids);
787        }
788        Serializable whereIds = newIdArray(ids);
789        Set<Serializable> res = new HashSet<>();
790        if (logger.isLogEnabled()) {
791            logger.logSQL(select.sql, Collections.singleton(whereIds));
792        }
793        Column what = select.whatColumns.get(0);
794        try (PreparedStatement ps = connection.prepareStatement(select.sql)) {
795            setToPreparedStatementIdArray(ps, 1, whereIds);
796            try (ResultSet rs = ps.executeQuery()) {
797                countExecute();
798                List<Serializable> debugIds = null;
799                if (logger.isLogEnabled()) {
800                    debugIds = new LinkedList<>();
801                }
802                while (rs.next()) {
803                    if (dialect.supportsArraysReturnInsteadOfRows()) {
804                        Serializable[] resultIds = dialect.getArrayResult(rs.getArray(1));
805                        for (Serializable id : resultIds) {
806                            if (id != null) {
807                                res.add(id);
808                                if (logger.isLogEnabled()) {
809                                    debugIds.add(id);
810                                }
811                            }
812                        }
813                    } else {
814                        Serializable id = what.getFromResultSet(rs, 1);
815                        if (id != null) {
816                            res.add(id);
817                            if (logger.isLogEnabled()) {
818                                debugIds.add(id);
819                            }
820                        }
821                    }
822                }
823                if (logger.isLogEnabled()) {
824                    logger.logIds(debugIds, false, 0);
825                }
826            }
827            return res;
828        } catch (SQLException e) {
829            throw new NuxeoException("Failed to get ancestors ids", e);
830        }
831    }
832
833    /**
834     * Uses iterative parentid selection.
835     */
836    protected Set<Serializable> getAncestorsIdsIterative(Collection<Serializable> ids) {
837        try {
838            LinkedList<Serializable> todo = new LinkedList<>(ids);
839            Set<Serializable> done = new HashSet<>();
840            Set<Serializable> res = new HashSet<>();
841            while (!todo.isEmpty()) {
842                done.addAll(todo);
843                SQLInfoSelect select = sqlInfo.getSelectParentIds(todo.size());
844                if (logger.isLogEnabled()) {
845                    logger.logSQL(select.sql, todo);
846                }
847                Column what = select.whatColumns.get(0);
848                Column where = select.whereColumns.get(0);
849                try (PreparedStatement ps = connection.prepareStatement(select.sql)) {
850                    int i = 1;
851                    for (Serializable id : todo) {
852                        where.setToPreparedStatement(ps, i++, id);
853                    }
854                    try (ResultSet rs = ps.executeQuery()) {
855                        countExecute();
856                        todo = new LinkedList<>();
857                        List<Serializable> debugIds = null;
858                        if (logger.isLogEnabled()) {
859                            debugIds = new LinkedList<>();
860                        }
861                        while (rs.next()) {
862                            Serializable id = what.getFromResultSet(rs, 1);
863                            if (id != null) {
864                                res.add(id);
865                                if (!done.contains(id)) {
866                                    todo.add(id);
867                                }
868                                if (logger.isLogEnabled()) {
869                                    debugIds.add(id); // NOSONAR
870                                }
871                            }
872                        }
873                        if (logger.isLogEnabled()) {
874                            logger.logIds(debugIds, false, 0);
875                        }
876                    }
877                }
878            }
879            return res;
880        } catch (SQLException e) {
881            throw new NuxeoException("Failed to get ancestors ids", e);
882        }
883    }
884
885    @Override
886    public void updateReadAcls() {
887        if (!dialect.supportsReadAcl()) {
888            return;
889        }
890        if (log.isDebugEnabled()) {
891            log.debug("updateReadAcls: updating");
892        }
893        try (Statement st = connection.createStatement()) {
894            String sql = dialect.getUpdateReadAclsSql();
895            if (logger.isLogEnabled()) {
896                logger.log(sql);
897            }
898            st.execute(sql);
899            countExecute();
900        } catch (SQLException e) {
901            checkConcurrentUpdate(e);
902            throw new NuxeoException("Failed to update read acls", e);
903        }
904        if (log.isDebugEnabled()) {
905            log.debug("updateReadAcls: done.");
906        }
907    }
908
909    @Override
910    public void rebuildReadAcls() {
911        if (!dialect.supportsReadAcl()) {
912            return;
913        }
914        log.debug("rebuildReadAcls: rebuilding ...");
915        try (Statement st = connection.createStatement()) {
916            String sql = dialect.getRebuildReadAclsSql();
917            logger.log(sql);
918            st.execute(sql);
919            countExecute();
920        } catch (SQLException e) {
921            throw new NuxeoException("Failed to rebuild read acls", e);
922        }
923        log.debug("rebuildReadAcls: done.");
924    }
925
926    @Override
927    public void markReferencedBinaries() {
928        log.debug("Starting binaries GC mark");
929        DocumentBlobManager blobManager = Framework.getService(DocumentBlobManager.class);
930        String repositoryName = getRepositoryName();
931        try (Statement st = connection.createStatement()) {
932            int i = -1;
933            for (String sql : sqlInfo.getBinariesSql) {
934                i++;
935                Column col = sqlInfo.getBinariesColumns.get(i);
936                if (logger.isLogEnabled()) {
937                    logger.log(sql);
938                }
939                try (ResultSet rs = st.executeQuery(sql)) {
940                    countExecute();
941                    int n = 0;
942                    while (rs.next()) {
943                        n++;
944                        String key = (String) col.getFromResultSet(rs, 1);
945                        if (key != null) {
946                            blobManager.markReferencedBinary(key, repositoryName);
947                        }
948                    }
949                    if (logger.isLogEnabled()) {
950                        logger.logCount(n);
951                    }
952                }
953            }
954        } catch (SQLException e) {
955            throw new RuntimeException("Failed to mark binaries for gC", e);
956        }
957        log.debug("End of binaries GC mark");
958    }
959
960    /**
961     * @since 7.10-HF25, 8.10-HF06, 9.2
962     */
963    @FunctionalInterface
964    protected interface BiFunctionSQLException<T, U, R> {
965
966        /**
967         * Applies this function to the given arguments.
968         *
969         * @param t the first function argument
970         * @param u the second function argument
971         * @return the function result
972         */
973        R apply(T t, U u) throws SQLException;
974
975    }
976
977}