001/*
002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 *     Benoit Delbosc
019 */
020package org.nuxeo.ecm.core.storage.sql.jdbc;
021
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.sql.Array;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
053
054import javax.sql.XADataSource;
055import javax.transaction.xa.XAException;
056import javax.transaction.xa.XAResource;
057import javax.transaction.xa.Xid;
058
059import org.apache.commons.logging.Log;
060import org.apache.commons.logging.LogFactory;
061import org.nuxeo.common.Environment;
062import org.nuxeo.common.utils.StringUtils;
063import org.nuxeo.ecm.core.api.IterableQueryResult;
064import org.nuxeo.ecm.core.api.Lock;
065import org.nuxeo.ecm.core.api.NuxeoException;
066import org.nuxeo.ecm.core.api.PartialList;
067import org.nuxeo.ecm.core.blob.BlobManager;
068import org.nuxeo.ecm.core.model.LockManager;
069import org.nuxeo.ecm.core.query.QueryFilter;
070import org.nuxeo.ecm.core.storage.sql.ClusterInvalidator;
071import org.nuxeo.ecm.core.storage.sql.ColumnType;
072import org.nuxeo.ecm.core.storage.sql.ColumnType.WrappedId;
073import org.nuxeo.ecm.core.storage.sql.Invalidations;
074import org.nuxeo.ecm.core.storage.sql.Mapper;
075import org.nuxeo.ecm.core.storage.sql.Model;
076import org.nuxeo.ecm.core.storage.sql.RepositoryDescriptor;
077import org.nuxeo.ecm.core.storage.sql.RepositoryImpl;
078import org.nuxeo.ecm.core.storage.sql.Row;
079import org.nuxeo.ecm.core.storage.sql.RowId;
080import org.nuxeo.ecm.core.storage.sql.Session.PathResolver;
081import org.nuxeo.ecm.core.storage.sql.jdbc.SQLInfo.SQLInfoSelect;
082import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
083import org.nuxeo.ecm.core.storage.sql.jdbc.db.Database;
084import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table;
085import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
086import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.DialectOracle;
087import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.SQLStatement.ListCollector;
088import org.nuxeo.runtime.api.Framework;
089
090/**
091 * A {@link JDBCMapper} maps objects to and from a JDBC database. It is specific to a given database connection, as it
092 * computes statements.
093 * <p>
094 * The {@link JDBCMapper} does the mapping according to the policy defined by a {@link Model}, and generates SQL
095 * statements recorded in the {@link SQLInfo}.
096 */
public class JDBCMapper extends JDBCRowMapper implements Mapper {

    private static final Log log = LogFactory.getLog(JDBCMapper.class);

    // Test-only hook: presence of the TEST_UPGRADE* keys makes createTables
    // create "old" schema content so the upgrade procedures can be exercised.
    public static Map<String, Serializable> testProps = new HashMap<String, Serializable>();

    public static final String TEST_UPGRADE = "testUpgrade";

    // property in sql.txt file
    public static final String TEST_UPGRADE_VERSIONS = "testUpgradeVersions";

    public static final String TEST_UPGRADE_LAST_CONTRIBUTOR = "testUpgradeLastContributor";

    public static final String TEST_UPGRADE_LOCKS = "testUpgradeLocks";

    // Runs the per-table upgrade procedures registered in the constructor.
    protected TableUpgrader tableUpgrader;

    private final QueryMakerService queryMakerService;

    // Used by query makers for path-based (STARTSWITH) queries.
    private final PathResolver pathResolver;

    private final RepositoryImpl repository;

    // True when a ClusterInvalidator was supplied, i.e. cluster mode is on.
    protected boolean clusteringEnabled;
121
122    /**
123     * Creates a new Mapper.
124     *
125     * @param model the model
126     * @param pathResolver the path resolver (used for startswith queries)
127     * @param sqlInfo the sql info
128     * @param xadatasource the XA datasource to use to get connections
129     * @param clusterInvalidator the cluster invalidator
130     * @param repository
131     */
132    public JDBCMapper(Model model, PathResolver pathResolver, SQLInfo sqlInfo, XADataSource xadatasource,
133            ClusterInvalidator clusterInvalidator, boolean noSharing, RepositoryImpl repository) {
134        super(model, sqlInfo, xadatasource, clusterInvalidator, repository.getInvalidationsPropagator(), noSharing);
135        this.pathResolver = pathResolver;
136        this.repository = repository;
137        clusteringEnabled = clusterInvalidator != null;
138        queryMakerService = Framework.getService(QueryMakerService.class);
139
140        tableUpgrader = new TableUpgrader(this);
141        tableUpgrader.add(Model.VERSION_TABLE_NAME, Model.VERSION_IS_LATEST_KEY, "upgradeVersions",
142                TEST_UPGRADE_VERSIONS);
143        tableUpgrader.add("dublincore", "lastContributor", "upgradeLastContributor", TEST_UPGRADE_LAST_CONTRIBUTOR);
144        tableUpgrader.add(Model.LOCK_TABLE_NAME, Model.LOCK_OWNER_KEY, "upgradeLocks", TEST_UPGRADE_LOCKS);
145
146    }
147
148    @Override
149    public int getTableSize(String tableName) {
150        return sqlInfo.getDatabase().getTable(tableName).getColumns().size();
151    }
152
153    /*
154     * ----- Root -----
155     */
156
157    @Override
158    public void createDatabase(String ddlMode) {
159        try {
160            createTables(ddlMode);
161        } catch (SQLException e) {
162            throw new NuxeoException(e);
163        }
164    }
165
166    protected String getTableName(String origName) {
167
168        if (dialect instanceof DialectOracle) {
169            if (origName.length() > 30) {
170
171                StringBuilder sb = new StringBuilder(origName.length());
172
173                try {
174                    MessageDigest digest = MessageDigest.getInstance("MD5");
175                    sb.append(origName.substring(0, 15));
176                    sb.append('_');
177
178                    digest.update(origName.getBytes());
179                    sb.append(Dialect.toHexString(digest.digest()).substring(0, 12));
180
181                    return sb.toString();
182
183                } catch (NoSuchAlgorithmException e) {
184                    throw new RuntimeException("Error", e);
185                }
186            }
187        }
188
189        return origName;
190    }
191
    /**
     * Creates missing tables and columns, collects the corresponding DDL, then executes or dumps it according to
     * {@code ddlMode} (see the mode comments below).
     *
     * @param ddlMode the DDL mode (comma-separated combination of ignore/dump/execute/abort/compat)
     */
    protected void createTables(String ddlMode) throws SQLException {
        ListCollector ddlCollector = new ListCollector();

        // run the categorized SQL statements from the sql info, collecting generated DDL
        sqlInfo.executeSQLStatements(null, ddlMode, connection, logger, ddlCollector); // for missing category
        sqlInfo.executeSQLStatements("first", ddlMode, connection, logger, ddlCollector);
        sqlInfo.executeSQLStatements("beforeTableCreation", ddlMode, connection, logger, ddlCollector);
        if (testProps.containsKey(TEST_UPGRADE)) {
            // create "old" tables
            sqlInfo.executeSQLStatements("testUpgrade", ddlMode, connection, logger, null); // do not collect
        }

        // discover what already exists in the database
        String schemaName = dialect.getConnectionSchema(connection);
        DatabaseMetaData metadata = connection.getMetaData();
        Set<String> tableNames = findTableNames(metadata, schemaName);
        Database database = sqlInfo.getDatabase();
        // table key -> columns just added (null value = table itself was just created)
        Map<String, List<Column>> added = new HashMap<String, List<Column>>();

        for (Table table : database.getTables()) {
            String tableName = getTableName(table.getPhysicalName());
            if (!tableNames.contains(tableName.toUpperCase())) {

                /*
                 * Create missing table.
                 */

                ddlCollector.add(table.getCreateSql());
                ddlCollector.addAll(table.getPostCreateSqls(model));

                added.put(table.getKey(), null); // null = table created

                sqlInfo.sqlStatementsProperties.put("create_table_" + tableName.toLowerCase(), Boolean.TRUE);

            } else {

                /*
                 * Get existing columns.
                 */

                // uppercase column name -> JDBC type / type name / size, as reported by the driver
                Map<String, Integer> columnTypes = new HashMap<String, Integer>();
                Map<String, String> columnTypeNames = new HashMap<String, String>();
                Map<String, Integer> columnTypeSizes = new HashMap<String, Integer>();
                try (ResultSet rs = metadata.getColumns(null, schemaName, tableName, "%")) {
                    while (rs.next()) {
                        String schema = rs.getString("TABLE_SCHEM");
                        if (schema != null) { // null for MySQL, doh!
                            if ("INFORMATION_SCHEMA".equals(schema.toUpperCase())) {
                                // H2 returns some system tables (locks)
                                continue;
                            }
                        }
                        String columnName = rs.getString("COLUMN_NAME").toUpperCase();
                        columnTypes.put(columnName, Integer.valueOf(rs.getInt("DATA_TYPE")));
                        columnTypeNames.put(columnName, rs.getString("TYPE_NAME"));
                        columnTypeSizes.put(columnName, Integer.valueOf(rs.getInt("COLUMN_SIZE")));
                    }
                }

                /*
                 * Update types and create missing columns.
                 */

                List<Column> addedColumns = new LinkedList<Column>();
                for (Column column : table.getColumns()) {
                    String upperName = column.getPhysicalName().toUpperCase();
                    // remove so that leftovers are the columns unknown to the model
                    Integer type = columnTypes.remove(upperName);
                    if (type == null) {
                        log.warn("Adding missing column in database: " + column.getFullQuotedName());
                        ddlCollector.add(table.getAddColumnSql(column));
                        ddlCollector.addAll(table.getPostAddSqls(column, model));
                        addedColumns.add(column);
                    } else {
                        // column exists: reconcile the model's JDBC type with the actual one
                        int expected = column.getJdbcType();
                        int actual = type.intValue();
                        String actualName = columnTypeNames.get(upperName);
                        Integer actualSize = columnTypeSizes.get(upperName);
                        if (!column.setJdbcType(actual, actualName, actualSize.intValue())) {
                            log.error(String.format("SQL type mismatch for %s: expected %s, database has %s / %s (%s)",
                                    column.getFullQuotedName(), Integer.valueOf(expected), type, actualName,
                                    actualSize));
                        }
                    }
                }
                // columns the dialect knows about but the model doesn't: not "unused"
                for (String col : dialect.getIgnoredColumns(table)) {
                    columnTypes.remove(col.toUpperCase());
                }
                if (!columnTypes.isEmpty()) {
                    log.warn("Database contains additional unused columns for table " + table.getQuotedName() + ": "
                            + StringUtils.join(new ArrayList<String>(columnTypes.keySet()), ", "));
                }
                if (!addedColumns.isEmpty()) {
                    if (added.containsKey(table.getKey())) {
                        // each table key is visited once; duplicate would be a programming error
                        throw new AssertionError();
                    }
                    added.put(table.getKey(), addedColumns);
                }
            }
        }

        if (testProps.containsKey(TEST_UPGRADE)) {
            // create "old" content in tables
            sqlInfo.executeSQLStatements("testUpgradeOldTables", ddlMode, connection, logger, ddlCollector);
        }

        // run upgrade for each table if added columns or test
        for (Entry<String, List<Column>> en : added.entrySet()) {
            List<Column> addedColumns = en.getValue();
            String tableKey = en.getKey();
            upgradeTable(tableKey, addedColumns, ddlMode, ddlCollector);
        }

        sqlInfo.executeSQLStatements("afterTableCreation", ddlMode, connection, logger, ddlCollector);
        sqlInfo.executeSQLStatements("last", ddlMode, connection, logger, ddlCollector);

        // aclr_permission check for PostgreSQL
        dialect.performAdditionalStatements(connection);

        /*
         * Execute all the collected DDL, or dump it if requested, depending on ddlMode
         */

        // ddlMode may be:
        // ignore (not treated here, nothing done)
        // dump (implies execute)
        // dump,execute
        // dump,ignore (no execute)
        // execute
        // abort (implies dump)
        // compat can be used instead of execute to always recreate stored procedures

        List<String> ddl = ddlCollector.getStrings();
        boolean ignore = ddlMode.contains(RepositoryDescriptor.DDL_MODE_IGNORE);
        boolean dump = ddlMode.contains(RepositoryDescriptor.DDL_MODE_DUMP);
        boolean abort = ddlMode.contains(RepositoryDescriptor.DDL_MODE_ABORT);
        if (dump || abort) {

            /*
             * Dump DDL if not empty.
             */

            if (!ddl.isEmpty()) {
                // dump to the log directory, one file per repository
                File dumpFile = new File(Environment.getDefault().getLog(), "ddl-vcs-" + repository.getName() + ".sql");
                try (OutputStream out = new FileOutputStream(dumpFile); PrintStream ps = new PrintStream(out)) {
                    for (String sql : dialect.getDumpStart()) {
                        ps.println(sql);
                    }
                    for (String sql : ddl) {
                        sql = sql.trim();
                        if (sql.endsWith(";")) {
                            sql = sql.substring(0, sql.length() - 1);
                        }
                        ps.println(dialect.getSQLForDump(sql));
                    }
                    for (String sql : dialect.getDumpStop()) {
                        ps.println(sql);
                    }
                } catch (IOException e) {
                    throw new NuxeoException(e);
                }

                /*
                 * Abort if requested.
                 */

                if (abort) {
                    log.error("Dumped DDL to: " + dumpFile);
                    throw new NuxeoException("Database initialization failed for: " + repository.getName()
                            + ", DDL must be executed: " + dumpFile);
                }
            }
        }
        if (!ignore) {

            /*
             * Execute DDL.
             */

            try (Statement st = connection.createStatement()) {
                for (String sql : ddl) {
                    logger.log(sql.replace("\n", "\n    ")); // indented
                    try {
                        st.execute(sql);
                    } catch (SQLException e) {
                        // add the offending statement to the error for diagnosis
                        throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e);
                    }
                    countExecute();
                }
            }

            /*
             * Execute post-DDL stuff.
             */

            try (Statement st = connection.createStatement()) {
                for (String sql : dialect.getStartupSqls(model, sqlInfo.database)) {
                    logger.log(sql.replace("\n", "\n    ")); // indented
                    try {
                        st.execute(sql);
                    } catch (SQLException e) {
                        throw new SQLException("Error executing: " + sql + " : " + e.getMessage(), e);
                    }
                    countExecute();
                }
            }
        }
    }
397
    /**
     * Runs the upgrade procedure registered for the given table, if any.
     *
     * @param tableKey the table key
     * @param addedColumns the columns just added, or {@code null} if the table was just created
     * @param ddlMode the DDL mode
     * @param ddlCollector the collector receiving the upgrade statements
     */
    protected void upgradeTable(String tableKey, List<Column> addedColumns, String ddlMode, ListCollector ddlCollector)
            throws SQLException {
        tableUpgrader.upgrade(tableKey, addedColumns, ddlMode, ddlCollector);
    }
402
403    /** Finds uppercase table names. */
404    protected static Set<String> findTableNames(DatabaseMetaData metadata, String schemaName) throws SQLException {
405        Set<String> tableNames = new HashSet<String>();
406        ResultSet rs = metadata.getTables(null, schemaName, "%", new String[] { "TABLE" });
407        while (rs.next()) {
408            String tableName = rs.getString("TABLE_NAME");
409            tableNames.add(tableName.toUpperCase());
410        }
411        rs.close();
412        return tableNames;
413    }
414
    /**
     * Returns the column type used for cluster node ids, as configured in the SQL info.
     */
    @Override
    public int getClusterNodeIdType() {
        return sqlInfo.getClusterNodeIdType();
    }
419
420    @Override
421    public void createClusterNode(Serializable nodeId) {
422        Calendar now = Calendar.getInstance();
423        try {
424            String sql = sqlInfo.getCreateClusterNodeSql();
425            List<Column> columns = sqlInfo.getCreateClusterNodeColumns();
426            PreparedStatement ps = connection.prepareStatement(sql);
427            try {
428                if (logger.isLogEnabled()) {
429                    logger.logSQL(sql, Arrays.asList(nodeId, now));
430                }
431                columns.get(0).setToPreparedStatement(ps, 1, nodeId);
432                columns.get(1).setToPreparedStatement(ps, 2, now);
433                ps.execute();
434            } finally {
435                closeStatement(ps);
436            }
437        } catch (SQLException e) {
438            throw new NuxeoException(e);
439        }
440    }
441
442    @Override
443    public void removeClusterNode(Serializable nodeId) {
444        try {
445            // delete from cluster_nodes
446            String sql = sqlInfo.getDeleteClusterNodeSql();
447            Column column = sqlInfo.getDeleteClusterNodeColumn();
448            PreparedStatement ps = connection.prepareStatement(sql);
449            try {
450                if (logger.isLogEnabled()) {
451                    logger.logSQL(sql, Arrays.asList(nodeId));
452                }
453                column.setToPreparedStatement(ps, 1, nodeId);
454                ps.execute();
455            } finally {
456                closeStatement(ps);
457            }
458            // delete un-processed invals from cluster_invals
459            deleteClusterInvals(nodeId);
460        } catch (SQLException e) {
461            throw new NuxeoException(e);
462        }
463    }
464
465    protected void deleteClusterInvals(Serializable nodeId) throws SQLException {
466        String sql = sqlInfo.getDeleteClusterInvalsSql();
467        Column column = sqlInfo.getDeleteClusterInvalsColumn();
468        PreparedStatement ps = connection.prepareStatement(sql);
469        try {
470            if (logger.isLogEnabled()) {
471                logger.logSQL(sql, Arrays.asList(nodeId));
472            }
473            column.setToPreparedStatement(ps, 1, nodeId);
474            int n = ps.executeUpdate();
475            countExecute();
476            if (logger.isLogEnabled()) {
477                logger.logCount(n);
478            }
479        } finally {
480            try {
481                closeStatement(ps);
482            } catch (SQLException e) {
483                log.error("deleteClusterInvals: " + e.getMessage(), e);
484            }
485        }
486    }
487
    /**
     * Records the given invalidations in the cluster invalidations table.
     * <p>
     * Each inserted row carries: node id, row id, space-joined fragment names, and the invalidation kind. The exact
     * targeting semantics (this node vs. other nodes) come from the dialect-provided insert SQL.
     */
    @Override
    public void insertClusterInvalidations(Serializable nodeId, Invalidations invalidations) {
        String sql = dialect.getClusterInsertInvalidations();
        List<Column> columns = sqlInfo.getClusterInvalidationsColumns();
        PreparedStatement ps = null;
        try {
            ps = connection.prepareStatement(sql);
            // two passes: first MODIFIED row ids, then DELETED row ids
            int kind = Invalidations.MODIFIED;
            while (true) {
                Set<RowId> rowIds = invalidations.getKindSet(kind);

                // reorganize by id
                Map<Serializable, Set<String>> res = new HashMap<Serializable, Set<String>>();
                for (RowId rowId : rowIds) {
                    Set<String> tableNames = res.get(rowId.id);
                    if (tableNames == null) {
                        res.put(rowId.id, tableNames = new HashSet<String>());
                    }
                    tableNames.add(rowId.tableName);
                }

                // do inserts
                for (Entry<Serializable, Set<String>> en : res.entrySet()) {
                    Serializable id = en.getKey();
                    // fragment (table) names joined with a single space
                    String fragments = join(en.getValue(), ' ');
                    if (logger.isLogEnabled()) {
                        logger.logSQL(sql, Arrays.<Serializable> asList(nodeId, id, fragments, Long.valueOf(kind)));
                    }
                    Serializable frags;
                    if (dialect.supportsArrays() && columns.get(2).getJdbcType() == Types.ARRAY) {
                        // dialect stores fragments as a SQL array rather than a joined string
                        frags = fragments.split(" ");
                    } else {
                        frags = fragments;
                    }
                    columns.get(0).setToPreparedStatement(ps, 1, nodeId);
                    columns.get(1).setToPreparedStatement(ps, 2, id);
                    columns.get(2).setToPreparedStatement(ps, 3, frags);
                    columns.get(3).setToPreparedStatement(ps, 4, Long.valueOf(kind));
                    ps.execute();
                    countExecute();
                }
                if (kind == Invalidations.MODIFIED) {
                    kind = Invalidations.DELETED;
                } else {
                    break;
                }
            }
        } catch (SQLException e) {
            throw new NuxeoException("Could not invalidate", e);
        } finally {
            try {
                closeStatement(ps);
            } catch (SQLException e) {
                log.error(e.getMessage(), e);
            }
        }
    }
545
546    // join that works on a set
547    protected static final String join(Collection<String> strings, char sep) {
548        if (strings.isEmpty()) {
549            throw new RuntimeException();
550        }
551        if (strings.size() == 1) {
552            return strings.iterator().next();
553        }
554        int size = 0;
555        for (String word : strings) {
556            size += word.length() + 1;
557        }
558        StringBuilder buf = new StringBuilder(size);
559        for (String word : strings) {
560            buf.append(word);
561            buf.append(sep);
562        }
563        buf.setLength(size - 1);
564        return buf.toString();
565    }
566
    /**
     * Reads the pending invalidations recorded for the given node, then deletes them if the dialect requires explicit
     * cleanup.
     *
     * @param nodeId this cluster node's id
     * @return the invalidations read (possibly empty)
     */
    @Override
    public Invalidations getClusterInvalidations(Serializable nodeId) {
        Invalidations invalidations = new Invalidations();
        String sql = dialect.getClusterGetInvalidations();
        List<Column> columns = sqlInfo.getClusterInvalidationsColumns();
        try {
            if (logger.isLogEnabled()) {
                logger.logSQL(sql, Arrays.asList(nodeId));
            }
            PreparedStatement ps = connection.prepareStatement(sql);
            ResultSet rs = null;
            try {
                setToPreparedStatement(ps, 1, nodeId);
                rs = ps.executeQuery();
                countExecute();
                while (rs.next()) {
                    // first column ignored, it's the node id
                    Serializable id = columns.get(1).getFromResultSet(rs, 1);
                    Serializable frags = columns.get(2).getFromResultSet(rs, 2);
                    int kind = ((Long) columns.get(3).getFromResultSet(rs, 3)).intValue();
                    String[] fragments;
                    if (dialect.supportsArrays() && frags instanceof String[]) {
                        fragments = (String[]) frags;
                    } else {
                        // fragments stored as a space-joined string (see insertClusterInvalidations)
                        fragments = ((String) frags).split(" ");
                    }
                    invalidations.add(id, fragments, kind);
                }
            } finally {
                closeStatement(ps, rs);
            }
            if (logger.isLogEnabled()) {
                // logCount(n);
                logger.log("  -> " + invalidations);
            }
            if (dialect.isClusteringDeleteNeeded()) {
                deleteClusterInvals(nodeId);
            }
            return invalidations;
        } catch (SQLException e) {
            throw new NuxeoException("Could not invalidate", e);
        }
    }
610
    /**
     * Returns the root document id registered for the given repository id, or {@code null} if none is registered.
     *
     * @throws NuxeoException if the query returns more than one row
     */
    @Override
    public Serializable getRootId(String repositoryId) {
        String sql = sqlInfo.getSelectRootIdSql();
        try {
            if (logger.isLogEnabled()) {
                logger.logSQL(sql, Collections.<Serializable> singletonList(repositoryId));
            }
            PreparedStatement ps = connection.prepareStatement(sql);
            ResultSet rs = null;
            try {
                ps.setString(1, repositoryId);
                rs = ps.executeQuery();
                countExecute();
                if (!rs.next()) {
                    // no root registered yet for this repository
                    if (logger.isLogEnabled()) {
                        logger.log("  -> (none)");
                    }
                    return null;
                }
                Column column = sqlInfo.getSelectRootIdWhatColumn();
                Serializable id = column.getFromResultSet(rs, 1);
                if (logger.isLogEnabled()) {
                    logger.log("  -> " + Model.MAIN_KEY + '=' + id);
                }
                // check that we didn't get several rows
                if (rs.next()) {
                    throw new NuxeoException("Row query for " + repositoryId + " returned several rows: " + sql);
                }
                return id;
            } finally {
                closeStatement(ps, rs);
            }
        } catch (SQLException e) {
            throw new NuxeoException("Could not select: " + sql, e);
        }
    }
647
    /**
     * Registers the root document id for the given repository id, by inserting a row whose columns are filled from
     * {@link Model#MAIN_KEY} (the id) and {@link Model#REPOINFO_REPONAME_KEY} (the repository id).
     */
    @Override
    public void setRootId(Serializable repositoryId, Serializable id) {
        String sql = sqlInfo.getInsertRootIdSql();
        try {
            PreparedStatement ps = connection.prepareStatement(sql);
            try {
                List<Column> columns = sqlInfo.getInsertRootIdColumns();
                List<Serializable> debugValues = null;
                if (logger.isLogEnabled()) {
                    debugValues = new ArrayList<Serializable>(2);
                }
                int i = 0;
                for (Column column : columns) {
                    i++;
                    // map each column key to the corresponding value
                    String key = column.getKey();
                    Serializable v;
                    if (key.equals(Model.MAIN_KEY)) {
                        v = id;
                    } else if (key.equals(Model.REPOINFO_REPONAME_KEY)) {
                        v = repositoryId;
                    } else {
                        // unexpected column in the insert statement
                        throw new RuntimeException(key);
                    }
                    column.setToPreparedStatement(ps, i, v);
                    if (debugValues != null) {
                        debugValues.add(v);
                    }
                }
                if (debugValues != null) {
                    logger.logSQL(sql, debugValues);
                    debugValues.clear();
                }
                ps.execute();
                countExecute();
            } finally {
                closeStatement(ps);
            }
        } catch (SQLException e) {
            throw new NuxeoException("Could not insert: " + sql, e);
        }
    }
689
690    protected QueryMaker findQueryMaker(String queryType) {
691        for (Class<? extends QueryMaker> klass : queryMakerService.getQueryMakers()) {
692            QueryMaker queryMaker;
693            try {
694                queryMaker = klass.newInstance();
695            } catch (ReflectiveOperationException e) {
696                throw new NuxeoException(e);
697            }
698            if (queryMaker.accepts(queryType)) {
699                return queryMaker;
700            }
701        }
702        return null;
703    }
704
705    protected void prepareUserReadAcls(QueryFilter queryFilter) {
706        String sql = dialect.getPrepareUserReadAclsSql();
707        Serializable principals = queryFilter.getPrincipals();
708        if (sql == null || principals == null) {
709            return;
710        }
711        if (!dialect.supportsArrays()) {
712            principals = StringUtils.join((String[]) principals, Dialect.ARRAY_SEP);
713        }
714        PreparedStatement ps = null;
715        try {
716            ps = connection.prepareStatement(sql);
717            if (logger.isLogEnabled()) {
718                logger.logSQL(sql, Collections.singleton(principals));
719            }
720            setToPreparedStatement(ps, 1, principals);
721            ps.execute();
722            countExecute();
723        } catch (SQLException e) {
724            throw new NuxeoException("Failed to prepare user read acl cache", e);
725        } finally {
726            try {
727                closeStatement(ps);
728            } catch (SQLException e) {
729                log.error(e.getMessage(), e);
730            }
731        }
732    }
733
734    @Override
735    public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter,
736            boolean countTotal) {
737        return query(query, queryType, queryFilter, countTotal ? -1 : 0);
738    }
739
    /**
     * Executes the query and returns a partial list of matching document ids.
     * <p>
     * {@code countUpTo} semantics: {@code 0} = no total size computed (size reported as -1); {@code -1} = compute the
     * exact total; {@code n > 0} = count up to {@code n} rows, and report -2 as the size when the total exceeds it.
     *
     * @throws NuxeoException if no QueryMaker accepts the query type, or wrapping a {@link SQLException}
     */
    @Override
    public PartialList<Serializable> query(String query, String queryType, QueryFilter queryFilter, long countUpTo) {
        if (dialect.needsPrepareUserReadAcls()) {
            prepareUserReadAcls(queryFilter);
        }
        QueryMaker queryMaker = findQueryMaker(queryType);
        if (queryMaker == null) {
            throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query);
        }
        QueryMaker.Query q = queryMaker.buildQuery(sqlInfo, model, pathResolver, query, queryFilter);

        if (q == null) {
            // the query maker determined statically that nothing can match
            logger.log("Query cannot return anything due to conflicting clauses");
            return new PartialList<Serializable>(Collections.<Serializable> emptyList(), 0);
        }
        long limit = queryFilter.getLimit();
        long offset = queryFilter.getOffset();

        if (logger.isLogEnabled()) {
            // log the SQL with the paging/counting intent appended as a comment
            String sql = q.selectInfo.sql;
            if (limit != 0) {
                sql += " -- LIMIT " + limit + " OFFSET " + offset;
            }
            if (countUpTo != 0) {
                sql += " -- COUNT TOTAL UP TO " + countUpTo;
            }
            logger.logSQL(sql, q.selectParams);
        }

        String sql = q.selectInfo.sql;

        if (countUpTo == 0 && limit > 0 && dialect.supportsPaging()) {
            // full result set not needed for counting
            sql = dialect.addPagingClause(sql, limit, offset);
            limit = 0;
            offset = 0;
        } else if (countUpTo > 0 && dialect.supportsPaging()) {
            // ask one more row
            sql = dialect.addPagingClause(sql, Math.max(countUpTo + 1, limit + offset), 0);
        }

        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            // scrollable result set so we can seek (first/absolute/last) for offset handling and counting
            ps = connection.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
            int i = 1;
            for (Serializable object : q.selectParams) {
                setToPreparedStatement(ps, i++, object);
            }
            rs = ps.executeQuery();
            countExecute();

            // limit/offset
            long totalSize = -1;
            boolean available;
            if ((limit == 0) || (offset == 0)) {
                available = rs.first();
                if (!available) {
                    totalSize = 0;
                }
                if (limit == 0) {
                    limit = -1; // infinite
                }
            } else {
                // skip to the first row after the offset
                available = rs.absolute((int) offset + 1);
            }

            Column column = q.selectInfo.whatColumns.get(0);
            List<Serializable> ids = new LinkedList<Serializable>();
            int rowNum = 0;
            while (available && (limit != 0)) {
                Serializable id = column.getFromResultSet(rs, 1);
                ids.add(id);
                rowNum = rs.getRow();
                available = rs.next();
                limit--;
            }

            // total size
            if (countUpTo != 0 && (totalSize == -1)) {
                if (!available && (rowNum != 0)) {
                    // last row read was the actual last
                    totalSize = rowNum;
                } else {
                    // available if limit reached with some left
                    // rowNum == 0 if skipped too far
                    rs.last();
                    totalSize = rs.getRow();
                }
                if (countUpTo > 0 && totalSize > countUpTo) {
                    // the result where truncated we don't know the total size
                    totalSize = -2;
                }
            }

            if (logger.isLogEnabled()) {
                logger.logIds(ids, countUpTo != 0, totalSize);
            }

            return new PartialList<Serializable>(ids, totalSize);
        } catch (SQLException e) {
            throw new NuxeoException("Invalid query: " + query, e);
        } finally {
            try {
                closeStatement(ps, rs);
            } catch (SQLException e) {
                log.error("Cannot close connection", e);
            }
        }
    }
850
851    public int setToPreparedStatement(PreparedStatement ps, int i, Serializable object) throws SQLException {
852        if (object instanceof Calendar) {
853            Calendar cal = (Calendar) object;
854            ps.setTimestamp(i, dialect.getTimestampFromCalendar(cal), cal);
855        } else if (object instanceof java.sql.Date) {
856            ps.setDate(i, (java.sql.Date) object);
857        } else if (object instanceof Long) {
858            ps.setLong(i, ((Long) object).longValue());
859        } else if (object instanceof WrappedId) {
860            dialect.setId(ps, i, object.toString());
861        } else if (object instanceof Object[]) {
862            int jdbcType;
863            if (object instanceof String[]) {
864                jdbcType = dialect.getJDBCTypeAndString(ColumnType.STRING).jdbcType;
865            } else if (object instanceof Boolean[]) {
866                jdbcType = dialect.getJDBCTypeAndString(ColumnType.BOOLEAN).jdbcType;
867            } else if (object instanceof Long[]) {
868                jdbcType = dialect.getJDBCTypeAndString(ColumnType.LONG).jdbcType;
869            } else if (object instanceof Double[]) {
870                jdbcType = dialect.getJDBCTypeAndString(ColumnType.DOUBLE).jdbcType;
871            } else if (object instanceof java.sql.Date[]) {
872                jdbcType = Types.DATE;
873            } else if (object instanceof java.sql.Clob[]) {
874                jdbcType = Types.CLOB;
875            } else if (object instanceof Calendar[]) {
876                jdbcType = dialect.getJDBCTypeAndString(ColumnType.TIMESTAMP).jdbcType;
877                object = dialect.getTimestampFromCalendar((Calendar) object);
878            } else if (object instanceof Integer[]) {
879                jdbcType = dialect.getJDBCTypeAndString(ColumnType.INTEGER).jdbcType;
880            } else {
881                jdbcType = dialect.getJDBCTypeAndString(ColumnType.CLOB).jdbcType;
882            }
883            Array array = dialect.createArrayOf(jdbcType, (Object[]) object, connection);
884            ps.setArray(i, array);
885        } else {
886            ps.setObject(i, object);
887        }
888        return i;
889    }
890
891    // queryFilter used for principals and permissions
892    @Override
893    public IterableQueryResult queryAndFetch(String query, String queryType, QueryFilter queryFilter,
894            boolean distinctDocuments, Object... params) {
895        if (dialect.needsPrepareUserReadAcls()) {
896            prepareUserReadAcls(queryFilter);
897        }
898        QueryMaker queryMaker = findQueryMaker(queryType);
899        if (queryMaker == null) {
900            throw new NuxeoException("No QueryMaker accepts query: " + queryType + ": " + query);
901        }
902        if (distinctDocuments) {
903            String q = query.toLowerCase();
904            if (q.startsWith("select ") && !q.startsWith("select distinct ")) {
905                query = "SELECT DISTINCT " + query.substring("SELECT ".length());
906            }
907        }
908        try {
909            return new ResultSetQueryResult(queryMaker, query, queryFilter, pathResolver, this, params);
910        } catch (SQLException e) {
911            throw new NuxeoException("Invalid query: " + queryType + ": " + query, e);
912        }
913    }
914
    /**
     * Returns the set of ancestor ids for the given document ids.
     * <p>
     * Uses a single dialect-provided ancestors query when available, otherwise falls back to iterative parent-id
     * lookups.
     *
     * @throws NuxeoException wrapping the {@link SQLException} on failure
     */
    @Override
    public Set<Serializable> getAncestorsIds(Collection<Serializable> ids) {
        SQLInfoSelect select = sqlInfo.getSelectAncestorsIds();
        if (select == null) {
            // dialect has no ancestors query; walk up parent by parent
            return getAncestorsIdsIterative(ids);
        }
        Serializable whereIds = newIdArray(ids);
        Set<Serializable> res = new HashSet<Serializable>();
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            if (logger.isLogEnabled()) {
                logger.logSQL(select.sql, Collections.singleton(whereIds));
            }
            Column what = select.whatColumns.get(0);
            ps = connection.prepareStatement(select.sql);
            setToPreparedStatementIdArray(ps, 1, whereIds);
            rs = ps.executeQuery();
            countExecute();
            List<Serializable> debugIds = null;
            if (logger.isLogEnabled()) {
                debugIds = new LinkedList<Serializable>();
            }
            while (rs.next()) {
                if (dialect.supportsArraysReturnInsteadOfRows()) {
                    // each row carries a whole array of ancestor ids
                    Serializable[] resultIds = dialect.getArrayResult(rs.getArray(1));
                    for (Serializable id : resultIds) {
                        if (id != null) {
                            res.add(id);
                            if (logger.isLogEnabled()) {
                                debugIds.add(id);
                            }
                        }
                    }
                } else {
                    // one ancestor id per row
                    Serializable id = what.getFromResultSet(rs, 1);
                    if (id != null) {
                        res.add(id);
                        if (logger.isLogEnabled()) {
                            debugIds.add(id);
                        }
                    }
                }
            }
            if (logger.isLogEnabled()) {
                logger.logIds(debugIds, false, 0);
            }
            return res;
        } catch (SQLException e) {
            throw new NuxeoException("Failed to get ancestors ids", e);
        } finally {
            try {
                closeStatement(ps, rs);
            } catch (SQLException e) {
                log.error(e.getMessage(), e);
            }
        }
    }
973
974    /**
975     * Uses iterative parentid selection.
976     */
977    protected Set<Serializable> getAncestorsIdsIterative(Collection<Serializable> ids) {
978        PreparedStatement ps = null;
979        ResultSet rs = null;
980        try {
981            LinkedList<Serializable> todo = new LinkedList<Serializable>(ids);
982            Set<Serializable> done = new HashSet<Serializable>();
983            Set<Serializable> res = new HashSet<Serializable>();
984            while (!todo.isEmpty()) {
985                done.addAll(todo);
986                SQLInfoSelect select = sqlInfo.getSelectParentIds(todo.size());
987                if (logger.isLogEnabled()) {
988                    logger.logSQL(select.sql, todo);
989                }
990                Column what = select.whatColumns.get(0);
991                Column where = select.whereColumns.get(0);
992                ps = connection.prepareStatement(select.sql);
993                int i = 1;
994                for (Serializable id : todo) {
995                    where.setToPreparedStatement(ps, i++, id);
996                }
997                rs = ps.executeQuery();
998                countExecute();
999                todo = new LinkedList<Serializable>();
1000                List<Serializable> debugIds = null;
1001                if (logger.isLogEnabled()) {
1002                    debugIds = new LinkedList<Serializable>();
1003                }
1004                while (rs.next()) {
1005                    Serializable id = what.getFromResultSet(rs, 1);
1006                    if (id != null) {
1007                        res.add(id);
1008                        if (!done.contains(id)) {
1009                            todo.add(id);
1010                        }
1011                        if (logger.isLogEnabled()) {
1012                            debugIds.add(id);
1013                        }
1014                    }
1015                }
1016                if (logger.isLogEnabled()) {
1017                    logger.logIds(debugIds, false, 0);
1018                }
1019                rs.close();
1020                ps.close();
1021            }
1022            return res;
1023        } catch (SQLException e) {
1024            throw new NuxeoException("Failed to get ancestors ids", e);
1025        } finally {
1026            try {
1027                closeStatement(ps, rs);
1028            } catch (SQLException e) {
1029                log.error(e.getMessage(), e);
1030            }
1031        }
1032    }
1033
1034    @Override
1035    public void updateReadAcls() {
1036        if (!dialect.supportsReadAcl()) {
1037            return;
1038        }
1039        if (log.isDebugEnabled()) {
1040            log.debug("updateReadAcls: updating");
1041        }
1042        Statement st = null;
1043        try {
1044            st = connection.createStatement();
1045            String sql = dialect.getUpdateReadAclsSql();
1046            if (logger.isLogEnabled()) {
1047                logger.log(sql);
1048            }
1049            st.execute(sql);
1050            countExecute();
1051        } catch (SQLException e) {
1052            throw new NuxeoException("Failed to update read acls", e);
1053        } finally {
1054            try {
1055                closeStatement(st);
1056            } catch (SQLException e) {
1057                log.error(e.getMessage(), e);
1058            }
1059        }
1060        if (log.isDebugEnabled()) {
1061            log.debug("updateReadAcls: done.");
1062        }
1063    }
1064
1065    @Override
1066    public void rebuildReadAcls() {
1067        if (!dialect.supportsReadAcl()) {
1068            return;
1069        }
1070        log.debug("rebuildReadAcls: rebuilding ...");
1071        Statement st = null;
1072        try {
1073            st = connection.createStatement();
1074            String sql = dialect.getRebuildReadAclsSql();
1075            logger.log(sql);
1076            st.execute(sql);
1077            countExecute();
1078        } catch (SQLException e) {
1079            throw new NuxeoException("Failed to rebuild read acls", e);
1080        } finally {
1081            try {
1082                closeStatement(st);
1083            } catch (SQLException e) {
1084                log.error(e.getMessage(), e);
1085            }
1086        }
1087        log.debug("rebuildReadAcls: done.");
1088    }
1089
1090    /*
1091     * ----- Locking -----
1092     */
1093
1094    protected Connection connection(boolean autocommit) {
1095        try {
1096            connection.setAutoCommit(autocommit);
1097        } catch (SQLException e) {
1098            throw new NuxeoException("Cannot set auto commit mode onto " + this + "'s connection", e);
1099        }
1100        return connection;
1101    }
1102
1103    /**
1104     * Calls the callable, inside a transaction if in cluster mode.
1105     * <p>
1106     * Called under {@link #serializationLock}.
1107     */
1108    protected Lock callInTransaction(LockCallable callable, boolean tx) {
1109        boolean ok = false;
1110        try {
1111            if (log.isDebugEnabled()) {
1112                log.debug("callInTransaction setAutoCommit " + !tx);
1113            }
1114            connection.setAutoCommit(!tx);
1115        } catch (SQLException e) {
1116            throw new NuxeoException("Cannot set auto commit mode onto " + this + "'s connection", e);
1117        }
1118        try {
1119            Lock result = callable.call();
1120            ok = true;
1121            return result;
1122        } finally {
1123            if (tx) {
1124                try {
1125                    try {
1126                        if (ok) {
1127                            if (log.isDebugEnabled()) {
1128                                log.debug("callInTransaction commit");
1129                            }
1130                            connection.commit();
1131                        } else {
1132                            if (log.isDebugEnabled()) {
1133                                log.debug("callInTransaction rollback");
1134                            }
1135                            connection.rollback();
1136                        }
1137                    } finally {
1138                        // restore autoCommit=true
1139                        if (log.isDebugEnabled()) {
1140                            log.debug("callInTransaction restoring autoCommit=true");
1141                        }
1142                        connection.setAutoCommit(true);
1143                    }
1144                } catch (SQLException e) {
1145                    throw new NuxeoException(e);
1146                }
1147            }
1148        }
1149    }
1150
    /** A {@link Callable} returning a {@link Lock}, narrowed so that {@code call()} throws no checked exception. */
    public interface LockCallable extends Callable<Lock> {
        @Override
        public Lock call();
    }
1155
1156    @Override
1157    public Lock getLock(Serializable id) {
1158        if (log.isDebugEnabled()) {
1159            try {
1160                log.debug("getLock " + id + " while autoCommit=" + connection.getAutoCommit());
1161            } catch (SQLException e) {
1162                throw new RuntimeException(e);
1163            }
1164        }
1165        RowId rowId = new RowId(Model.LOCK_TABLE_NAME, id);
1166        Row row = readSimpleRow(rowId);
1167        return row == null ? null : new Lock((String) row.get(Model.LOCK_OWNER_KEY),
1168                (Calendar) row.get(Model.LOCK_CREATED_KEY));
1169    }
1170
1171    @Override
1172    public Lock setLock(final Serializable id, final Lock lock) {
1173        if (log.isDebugEnabled()) {
1174            log.debug("setLock " + id + " owner=" + lock.getOwner());
1175        }
1176        SetLock call = new SetLock(id, lock);
1177        return callInTransaction(call, clusteringEnabled);
1178    }
1179
1180    protected class SetLock implements LockCallable {
1181        protected final Serializable id;
1182
1183        protected final Lock lock;
1184
1185        protected SetLock(Serializable id, Lock lock) {
1186            super();
1187            this.id = id;
1188            this.lock = lock;
1189        }
1190
1191        @Override
1192        public Lock call() {
1193            Lock oldLock = getLock(id);
1194            if (oldLock == null) {
1195                Row row = new Row(Model.LOCK_TABLE_NAME, id);
1196                row.put(Model.LOCK_OWNER_KEY, lock.getOwner());
1197                row.put(Model.LOCK_CREATED_KEY, lock.getCreated());
1198                insertSimpleRows(Model.LOCK_TABLE_NAME, Collections.singletonList(row));
1199            }
1200            return oldLock;
1201        }
1202    }
1203
1204    @Override
1205    public Lock removeLock(final Serializable id, final String owner, final boolean force) {
1206        if (log.isDebugEnabled()) {
1207            log.debug("removeLock " + id + " owner=" + owner + " force=" + force);
1208        }
1209        RemoveLock call = new RemoveLock(id, owner, force);
1210        return callInTransaction(call, !force);
1211    }
1212
1213    protected class RemoveLock implements LockCallable {
1214        protected final Serializable id;
1215
1216        protected final String owner;
1217
1218        protected final boolean force;
1219
1220        protected RemoveLock(Serializable id, String owner, boolean force) {
1221            super();
1222            this.id = id;
1223            this.owner = owner;
1224            this.force = force;
1225        }
1226
1227        @Override
1228        public Lock call() {
1229            Lock oldLock = force ? null : getLock(id);
1230            if (!force && owner != null) {
1231                if (oldLock == null) {
1232                    // not locked, nothing to do
1233                    return null;
1234                }
1235                if (!LockManager.canLockBeRemoved(oldLock.getOwner(), owner)) {
1236                    // existing mismatched lock, flag failure
1237                    return new Lock(oldLock, true);
1238                }
1239            }
1240            if (force || oldLock != null) {
1241                deleteRows(Model.LOCK_TABLE_NAME, Collections.singleton(id));
1242            }
1243            return oldLock;
1244        }
1245    }
1246
1247    @Override
1248    public void markReferencedBinaries() {
1249        log.debug("Starting binaries GC mark");
1250        Statement st = null;
1251        ResultSet rs = null;
1252        BlobManager blobManager = Framework.getService(BlobManager.class);
1253        String repositoryName = getRepositoryName();
1254        try {
1255            st = connection.createStatement();
1256            int i = -1;
1257            for (String sql : sqlInfo.getBinariesSql) {
1258                i++;
1259                Column col = sqlInfo.getBinariesColumns.get(i);
1260                if (logger.isLogEnabled()) {
1261                    logger.log(sql);
1262                }
1263                rs = st.executeQuery(sql);
1264                countExecute();
1265                int n = 0;
1266                while (rs.next()) {
1267                    n++;
1268                    String key = (String) col.getFromResultSet(rs, 1);
1269                    if (key != null) {
1270                        blobManager.markReferencedBinary(key, repositoryName);
1271                    }
1272                }
1273                if (logger.isLogEnabled()) {
1274                    logger.logCount(n);
1275                }
1276                rs.close();
1277            }
1278        } catch (SQLException e) {
1279            throw new RuntimeException("Failed to mark binaries for gC", e);
1280        } finally {
1281            try {
1282                closeStatement(st, rs);
1283            } catch (SQLException e) {
1284                log.error(e.getMessage(), e);
1285            }
1286        }
1287        log.debug("End of binaries GC mark");
1288    }
1289
1290    /*
1291     * ----- XAResource -----
1292     */
1293
1294    protected static String systemToString(Object o) {
1295        return o.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(o));
1296    }
1297
    /**
     * Starts work on an XA transaction branch, delegating to the underlying {@link XAResource}.
     *
     * @throws XAException with {@code XAER_RMERR} (cause preserved) when a NuxeoException occurs, or the
     *             delegate's own XAException
     */
    @Override
    public void start(Xid xid, int flags) throws XAException {
        try {
            xaresource.start(xid, flags);
            if (logger.isLogEnabled()) {
                logger.log("XA start on " + systemToString(xid));
            }
        } catch (NuxeoException e) {
            throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e);
        } catch (XAException e) {
            logger.error("XA start error on " + systemToString(xid), e);
            throw e;
        }
    }
1312
    /**
     * Ends work on an XA transaction branch, delegating to the underlying {@link XAResource}.
     *
     * @throws XAException with {@code XAER_RMERR} (cause preserved) on the H2 NPE case, or the delegate's own
     *             XAException
     */
    @Override
    public void end(Xid xid, int flags) throws XAException {
        try {
            xaresource.end(xid, flags);
            if (logger.isLogEnabled()) {
                logger.log("XA end on " + systemToString(xid));
            }
        } catch (NullPointerException e) {
            // H2 when no active transaction
            logger.error("XA end error on " + systemToString(xid), e);
            throw (XAException) new XAException(XAException.XAER_RMERR).initCause(e);
        } catch (XAException e) {
            // don't log when the caller is already aborting (TMFAIL)
            if (flags != XAResource.TMFAIL) {
                logger.error("XA end error on " + systemToString(xid), e);
            }
            throw e;
        }
    }
1331
    /** Prepares (phase one of two-phase commit) the XA transaction branch, logging and rethrowing failures. */
    @Override
    public int prepare(Xid xid) throws XAException {
        try {
            return xaresource.prepare(xid);
        } catch (XAException e) {
            logger.error("XA prepare error on  " + systemToString(xid), e);
            throw e;
        }
    }
1341
    /** Commits the XA transaction branch (one- or two-phase), logging and rethrowing failures. */
    @Override
    public void commit(Xid xid, boolean onePhase) throws XAException {
        try {
            xaresource.commit(xid, onePhase);
        } catch (XAException e) {
            logger.error("XA commit error on  " + systemToString(xid), e);
            throw e;
        }
    }
1351
1352    // rollback interacts with caches so is in RowMapper
1353
    /** Forgets a heuristically completed XA transaction branch (straight delegation). */
    @Override
    public void forget(Xid xid) throws XAException {
        xaresource.forget(xid);
    }
1358
    /** Returns the list of prepared or heuristically completed transaction branches (straight delegation). */
    @Override
    public Xid[] recover(int flag) throws XAException {
        return xaresource.recover(flag);
    }
1363
    /** Sets the transaction timeout on the underlying {@link XAResource} (straight delegation). */
    @Override
    public boolean setTransactionTimeout(int seconds) throws XAException {
        return xaresource.setTransactionTimeout(seconds);
    }
1368
    /** Returns the transaction timeout of the underlying {@link XAResource} (straight delegation). */
    @Override
    public int getTransactionTimeout() throws XAException {
        return xaresource.getTransactionTimeout();
    }
1373
    /**
     * Not implemented: always throws. NOTE(review): presumably same-RM grouping is never requested for this
     * resource in practice — confirm against the transaction manager configuration.
     */
    @Override
    public boolean isSameRM(XAResource xares) throws XAException {
        throw new UnsupportedOperationException();
    }
1378
    /** Returns {@code true} when a JDBC connection is currently held. */
    @Override
    public boolean isConnected() {
        return connection != null;
    }
1383
    /** Opens the underlying JDBC connection(s). */
    @Override
    public void connect() {
        openConnections();
    }
1388
    /** Closes the underlying JDBC connection(s). */
    @Override
    public void disconnect() {
        closeConnections();
    }
1393
1394}