001/*
002 * (C) Copyright 2006-2011 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Olivier Grisel
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.directory.sql;
021
022import java.io.Serializable;
023import java.sql.Connection;
024import java.sql.PreparedStatement;
025import java.sql.ResultSet;
026import java.sql.SQLException;
027import java.util.Arrays;
028import java.util.Collections;
029import java.util.HashSet;
030import java.util.LinkedList;
031import java.util.List;
032import java.util.Map;
033import java.util.Set;
034import java.util.function.Consumer;
035
036import org.nuxeo.common.xmap.annotation.XNode;
037import org.nuxeo.common.xmap.annotation.XObject;
038import org.nuxeo.ecm.core.schema.types.SchemaImpl;
039import org.nuxeo.ecm.core.schema.types.primitives.StringType;
040import org.nuxeo.ecm.core.storage.sql.ColumnType;
041import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
042import org.nuxeo.ecm.core.storage.sql.jdbc.db.Delete;
043import org.nuxeo.ecm.core.storage.sql.jdbc.db.Insert;
044import org.nuxeo.ecm.core.storage.sql.jdbc.db.Select;
045import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table;
046import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table.IndexType;
047import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
048import org.nuxeo.ecm.directory.AbstractReference;
049import org.nuxeo.ecm.directory.BaseDirectoryDescriptor;
050import org.nuxeo.ecm.directory.Directory;
051import org.nuxeo.ecm.directory.DirectoryCSVLoader;
052import org.nuxeo.ecm.directory.DirectoryException;
053
054@XObject(value = "tableReference")
055public class TableReference extends AbstractReference implements Cloneable {
056
057    @XNode("@field")
058    public void setFieldName(String fieldName) {
059        this.fieldName = fieldName;
060    }
061
062    @Override
063    @XNode("@directory")
064    public void setTargetDirectoryName(String targetDirectoryName) {
065        this.targetDirectoryName = targetDirectoryName;
066    }
067
068    @XNode("@table")
069    protected String tableName;
070
071    @XNode("@sourceColumn")
072    protected String sourceColumn;
073
074    @XNode("@targetColumn")
075    protected String targetColumn;
076
077    @XNode("@dataFile")
078    protected String dataFileName;
079
080    private Table table;
081
082    private Dialect dialect;
083
084    private boolean initialized = false;
085
086    private SQLDirectory getSQLSourceDirectory() throws DirectoryException {
087        Directory dir = getSourceDirectory();
088        return (SQLDirectory) dir;
089    }
090
091    private void initialize(SQLSession sqlSession) throws DirectoryException {
092        Connection connection = sqlSession.sqlConnection;
093        SQLDirectory directory = getSQLSourceDirectory();
094        Table table = getTable();
095        SQLHelper helper = new SQLHelper(connection, table, directory.getDescriptor().getCreateTablePolicy());
096        boolean loadData = helper.setupTable();
097        if (loadData && dataFileName != null) {
098            // fake schema for DirectoryCSVLoader.loadData
099            SchemaImpl schema = new SchemaImpl(tableName, null);
100            schema.addField(sourceColumn, StringType.INSTANCE, null, 0, Collections.emptySet());
101            schema.addField(targetColumn, StringType.INSTANCE, null, 0, Collections.emptySet());
102            Insert insert = new Insert(table);
103            for (Column column : table.getColumns()) {
104                insert.addColumn(column);
105            }
106            try (PreparedStatement ps = connection.prepareStatement(insert.getStatement())) {
107                Consumer<Map<String, Object>> loader = new Consumer<Map<String, Object>>() {
108                    @Override
109                    public void accept(Map<String, Object> map) {
110                        try {
111                            ps.setString(1, (String) map.get(sourceColumn));
112                            ps.setString(2, (String) map.get(targetColumn));
113                            ps.execute();
114                        } catch (SQLException e) {
115                            throw new DirectoryException(e);
116                        }
117                    }
118                };
119                DirectoryCSVLoader.loadData(dataFileName, BaseDirectoryDescriptor.DEFAULT_DATA_FILE_CHARACTER_SEPARATOR,
120                        schema, loader);
121            } catch (SQLException e) {
122                throw new DirectoryException(String.format("Table '%s' initialization failed", tableName), e);
123            }
124        }
125    }
126
127    @Override
128    public void addLinks(String sourceId, List<String> targetIds) throws DirectoryException {
129        if (targetIds == null) {
130            return;
131        }
132        try (SQLSession session = getSQLSession()) {
133            addLinks(sourceId, targetIds, session);
134        }
135    }
136
137    @Override
138    public void addLinks(List<String> sourceIds, String targetId) throws DirectoryException {
139        if (sourceIds == null) {
140            return;
141        }
142        try (SQLSession session = getSQLSession()) {
143            addLinks(sourceIds, targetId, session);
144        }
145    }
146
147    public void addLinks(String sourceId, List<String> targetIds, SQLSession session) throws DirectoryException {
148        if (targetIds == null) {
149            return;
150        }
151        for (String targetId : targetIds) {
152            addLink(sourceId, targetId, session, true);
153        }
154    }
155
156    public void addLinks(List<String> sourceIds, String targetId, SQLSession session) throws DirectoryException {
157        if (sourceIds == null) {
158            return;
159        }
160        for (String sourceId : sourceIds) {
161            addLink(sourceId, targetId, session, true);
162        }
163    }
164
165    public boolean exists(String sourceId, String targetId, SQLSession session) throws DirectoryException {
166        // String selectSql = String.format(
167        // "SELECT COUNT(*) FROM %s WHERE %s = ? AND %s = ?", tableName,
168        // sourceColumn, targetColumn);
169
170        Table table = getTable();
171        Select select = new Select(table);
172        select.setFrom(table.getQuotedName());
173        select.setWhat("count(*)");
174        String whereString = String.format("%s = ? and %s = ?", table.getColumn(sourceColumn).getQuotedName(),
175                table.getColumn(targetColumn).getQuotedName());
176
177        select.setWhere(whereString);
178
179        String selectSql = select.getStatement();
180        if (session.logger.isLogEnabled()) {
181            session.logger.logSQL(selectSql, Arrays.<Serializable> asList(sourceId, targetId));
182        }
183
184        PreparedStatement ps = null;
185        ResultSet rs = null;
186        try {
187            ps = session.sqlConnection.prepareStatement(selectSql);
188            ps.setString(1, sourceId);
189            ps.setString(2, targetId);
190            rs = ps.executeQuery();
191            rs.next();
192            return rs.getInt(1) > 0;
193        } catch (SQLException e) {
194            throw new DirectoryException(String.format("error reading link from %s to %s", sourceId, targetId), e);
195        } finally {
196            try {
197                if (rs != null) {
198                    rs.close();
199                }
200                if (ps != null) {
201                    ps.close();
202                }
203            } catch (SQLException sqle) {
204                throw new DirectoryException(sqle);
205            }
206        }
207    }
208
209    public void addLink(String sourceId, String targetId, SQLSession session, boolean checkExisting)
210            throws DirectoryException {
211        // OG: the following query should have avoided the round trips but
212        // does not work for some reason that might be related to a bug in the
213        // JDBC driver:
214        //
215        // String sql = String.format(
216        // "INSERT INTO %s (%s, %s) (SELECT ?, ? FROM %s WHERE %s = ? AND %s =
217        // ? HAVING COUNT(*) = 0)", tableName, sourceColumn, targetColumn,
218        // tableName, sourceColumn, targetColumn);
219
220        // first step: check that this link does not exist yet
221        if (checkExisting && exists(sourceId, targetId, session)) {
222            return;
223        }
224
225        // second step: add the link
226
227        // String insertSql = String.format(
228        // "INSERT INTO %s (%s, %s) VALUES (?, ?)", tableName,
229        // sourceColumn, targetColumn);
230        Table table = getTable();
231        Insert insert = new Insert(table);
232        insert.addColumn(table.getColumn(sourceColumn));
233        insert.addColumn(table.getColumn(targetColumn));
234        String insertSql = insert.getStatement();
235        if (session.logger.isLogEnabled()) {
236            session.logger.logSQL(insertSql, Arrays.<Serializable> asList(sourceId, targetId));
237        }
238
239        PreparedStatement ps = null;
240        try {
241            ps = session.sqlConnection.prepareStatement(insertSql);
242            ps.setString(1, sourceId);
243            ps.setString(2, targetId);
244            ps.execute();
245        } catch (SQLException e) {
246            throw new DirectoryException(String.format("error adding link from %s to %s", sourceId, targetId), e);
247        } finally {
248            try {
249                if (ps != null) {
250                    ps.close();
251                }
252            } catch (SQLException sqle) {
253                throw new DirectoryException(sqle);
254            }
255        }
256    }
257
258    protected List<String> getIdsFor(String valueColumn, String filterColumn, String filterValue)
259            throws DirectoryException {
260        try (SQLSession session = getSQLSession()) {
261            // String sql = String.format("SELECT %s FROM %s WHERE %s = ?",
262            // table.getColumn(valueColumn), tableName, filterColumn);
263            Table table = getTable();
264            Select select = new Select(table);
265            select.setWhat(table.getColumn(valueColumn).getQuotedName());
266            select.setFrom(table.getQuotedName());
267            select.setWhere(table.getColumn(filterColumn).getQuotedName() + " = ?");
268
269            String sql = select.getStatement();
270            if (session.logger.isLogEnabled()) {
271                session.logger.logSQL(sql, Collections.<Serializable> singleton(filterValue));
272            }
273
274            List<String> ids = new LinkedList<String>();
275            try (PreparedStatement ps = session.sqlConnection.prepareStatement(sql)) {
276                ps.setString(1, filterValue);
277                try (ResultSet rs = ps.executeQuery()) {
278                    while (rs.next()) {
279                        ids.add(rs.getString(valueColumn));
280                    }
281                    return ids;
282                }
283            } catch (SQLException e) {
284                throw new DirectoryException("error fetching reference values: ", e);
285            }
286        }
287    }
288
289    @Override
290    public List<String> getSourceIdsForTarget(String targetId) throws DirectoryException {
291        return getIdsFor(sourceColumn, targetColumn, targetId);
292    }
293
294    @Override
295    public List<String> getTargetIdsForSource(String sourceId) throws DirectoryException {
296        return getIdsFor(targetColumn, sourceColumn, sourceId);
297    }
298
299    public void removeLinksFor(String column, String entryId, SQLSession session) throws DirectoryException {
300        Table table = getTable();
301        String sql = String.format("DELETE FROM %s WHERE %s = ?", table.getQuotedName(), table.getColumn(column)
302                                                                                              .getQuotedName());
303        if (session.logger.isLogEnabled()) {
304            session.logger.logSQL(sql, Collections.<Serializable> singleton(entryId));
305        }
306        PreparedStatement ps = null;
307        try {
308            ps = session.sqlConnection.prepareStatement(sql);
309            ps.setString(1, entryId);
310            ps.execute();
311        } catch (SQLException e) {
312            throw new DirectoryException("error remove links to " + entryId, e);
313        } finally {
314            try {
315                if (ps != null) {
316                    ps.close();
317                }
318            } catch (SQLException sqle) {
319                throw new DirectoryException(sqle);
320            }
321        }
322    }
323
324    public void removeLinksForSource(String sourceId, SQLSession session) throws DirectoryException {
325        maybeInitialize(session);
326        removeLinksFor(sourceColumn, sourceId, session);
327    }
328
329    public void removeLinksForTarget(String targetId, SQLSession session) throws DirectoryException {
330        maybeInitialize(session);
331        removeLinksFor(targetColumn, targetId, session);
332    }
333
334    @Override
335    public void removeLinksForSource(String sourceId) throws DirectoryException {
336        try (SQLSession session = getSQLSession()) {
337            removeLinksForSource(sourceId, session);
338        }
339    }
340
341    @Override
342    public void removeLinksForTarget(String targetId) throws DirectoryException {
343        try (SQLSession session = getSQLSession()) {
344            removeLinksForTarget(targetId, session);
345        }
346    }
347
348    public void setIdsFor(String idsColumn, List<String> ids, String filterColumn, String filterValue,
349            SQLSession session) throws DirectoryException {
350
351        List<String> idsToDelete = new LinkedList<String>();
352        Set<String> idsToAdd = new HashSet<String>();
353        if (ids != null) { // ids may be null
354            idsToAdd.addAll(ids);
355        }
356        Table table = getTable();
357
358        // iterate over existing links to find what to add and what to remove
359        String selectSql = String.format("SELECT %s FROM %s WHERE %s = ?", table.getColumn(idsColumn).getQuotedName(),
360                table.getQuotedName(), table.getColumn(filterColumn).getQuotedName());
361        PreparedStatement ps = null;
362        try {
363            ps = session.sqlConnection.prepareStatement(selectSql);
364            ps.setString(1, filterValue);
365            ResultSet rs = ps.executeQuery();
366            while (rs.next()) {
367                String existingId = rs.getString(1);
368                if (idsToAdd.contains(existingId)) {
369                    // to not add already existing ids
370                    idsToAdd.remove(existingId);
371                } else {
372                    // delete unwanted existing ids
373                    idsToDelete.add(existingId);
374                }
375            }
376            rs.close();
377        } catch (SQLException e) {
378            throw new DirectoryException("failed to fetch existing links for " + filterValue, e);
379        } finally {
380            try {
381                if (ps != null) {
382                    ps.close();
383                }
384            } catch (SQLException sqle) {
385                throw new DirectoryException(sqle);
386            }
387        }
388
389        if (!idsToDelete.isEmpty()) {
390            // remove unwanted links
391
392            // String deleteSql = String.format(
393            // "DELETE FROM %s WHERE %s = ? AND %s = ?", tableName,
394            // filterColumn, idsColumn);
395            Delete delete = new Delete(table);
396            String whereString = String.format("%s = ? AND %s = ?", table.getColumn(filterColumn).getQuotedName(),
397                    table.getColumn(idsColumn).getQuotedName());
398            delete.setWhere(whereString);
399            String deleteSql = delete.getStatement();
400
401            try {
402                ps = session.sqlConnection.prepareStatement(deleteSql);
403                for (String unwantedId : idsToDelete) {
404                    if (session.logger.isLogEnabled()) {
405                        session.logger.logSQL(deleteSql, Arrays.<Serializable> asList(filterValue, unwantedId));
406                    }
407                    ps.setString(1, filterValue);
408                    ps.setString(2, unwantedId);
409                    ps.execute();
410                }
411            } catch (SQLException e) {
412                throw new DirectoryException("failed to remove unwanted links for " + filterValue, e);
413            } finally {
414                try {
415                    if (ps != null) {
416                        ps.close();
417                    }
418                } catch (SQLException sqle) {
419                    throw new DirectoryException(sqle);
420                }
421            }
422        }
423
424        if (!idsToAdd.isEmpty()) {
425            // add missing links
426            if (filterColumn.equals(sourceColumn)) {
427                for (String missingId : idsToAdd) {
428                    addLink(filterValue, missingId, session, false);
429                }
430            } else {
431                for (String missingId : idsToAdd) {
432                    addLink(missingId, filterValue, session, false);
433                }
434            }
435        }
436    }
437
438    public void setSourceIdsForTarget(String targetId, List<String> sourceIds, SQLSession session)
439            throws DirectoryException {
440        setIdsFor(sourceColumn, sourceIds, targetColumn, targetId, session);
441    }
442
443    public void setTargetIdsForSource(String sourceId, List<String> targetIds, SQLSession session)
444            throws DirectoryException {
445        setIdsFor(targetColumn, targetIds, sourceColumn, sourceId, session);
446    }
447
448    @Override
449    public void setSourceIdsForTarget(String targetId, List<String> sourceIds) throws DirectoryException {
450        try (SQLSession session = getSQLSession()) {
451            setSourceIdsForTarget(targetId, sourceIds, session);
452        }
453    }
454
455    @Override
456    public void setTargetIdsForSource(String sourceId, List<String> targetIds) throws DirectoryException {
457        try (SQLSession session = getSQLSession()) {
458            setTargetIdsForSource(sourceId, targetIds, session);
459        }
460    }
461
462    // TODO add support for the ListDiff type
463
464    protected SQLSession getSQLSession() throws DirectoryException {
465        if (!initialized) {
466            try (SQLSession sqlSession = (SQLSession) getSourceDirectory().getSession()) {
467                initialize(sqlSession);
468                initialized = true;
469            }
470        }
471        return (SQLSession) getSourceDirectory().getSession();
472    }
473
474    /**
475     * Initialize if needed, using an existing session.
476     *
477     * @param sqlSession
478     * @throws DirectoryException
479     */
480    protected void maybeInitialize(SQLSession sqlSession) throws DirectoryException {
481        if (!initialized) {
482            initialize(sqlSession);
483            initialized = true;
484        }
485    }
486
487    public Table getTable() throws DirectoryException {
488        if (table == null) {
489            boolean nativeCase = getSQLSourceDirectory().useNativeCase();
490            table = SQLHelper.addTable(tableName, getDialect(), nativeCase);
491            SQLHelper.addColumn(table, sourceColumn, ColumnType.STRING, nativeCase);
492            SQLHelper.addColumn(table, targetColumn, ColumnType.STRING, nativeCase);
493            // index added for Azure
494            table.addIndex(null, IndexType.MAIN_NON_PRIMARY, sourceColumn);
495        }
496        return table;
497    }
498
499    private Dialect getDialect() throws DirectoryException {
500        if (dialect == null) {
501            dialect = getSQLSourceDirectory().getDialect();
502        }
503        return dialect;
504    }
505
506    public String getSourceColumn() {
507        return sourceColumn;
508    }
509
510    public String getTargetColumn() {
511        return targetColumn;
512    }
513
514    public String getTargetDirectoryName() {
515        return targetDirectoryName;
516    }
517
518    public String getTableName() {
519        return tableName;
520    }
521
522    public String getDataFileName() {
523        return dataFileName;
524    }
525
526    /**
527     * @since 5.6
528     */
529    @Override
530    public TableReference clone() {
531        TableReference clone = (TableReference) super.clone();
532        // basic fields are already copied by super.clone()
533        return clone;
534    }
535
536}