001/*
002 * (C) Copyright 2006-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     H2 Group
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.core.storage.sql.db;
021
022import java.io.IOException;
023import java.io.Reader;
024import java.lang.reflect.Constructor;
025import java.nio.file.Paths;
026import java.sql.Clob;
027import java.sql.Connection;
028import java.sql.DatabaseMetaData;
029import java.sql.PreparedStatement;
030import java.sql.ResultSet;
031import java.sql.SQLException;
032import java.sql.Statement;
033import java.sql.Types;
034import java.util.Arrays;
035import java.util.Collections;
036import java.util.HashMap;
037import java.util.HashSet;
038import java.util.List;
039import java.util.Map;
040import java.util.Set;
041import java.util.concurrent.ConcurrentHashMap;
042
043import org.apache.commons.logging.Log;
044import org.apache.commons.logging.LogFactory;
045import org.apache.lucene.analysis.Analyzer;
046import org.apache.lucene.document.Document;
047import org.apache.lucene.document.Field;
048import org.apache.lucene.document.StringField;
049import org.apache.lucene.document.TextField;
050import org.apache.lucene.index.DirectoryReader;
051import org.apache.lucene.index.IndexReader;
052import org.apache.lucene.index.IndexWriter;
053import org.apache.lucene.index.IndexWriterConfig;
054import org.apache.lucene.index.IndexWriterConfig.OpenMode;
055import org.apache.lucene.index.LeafReaderContext;
056import org.apache.lucene.index.Term;
057import org.apache.lucene.queryparser.classic.ParseException;
058import org.apache.lucene.queryparser.classic.QueryParser;
059import org.apache.lucene.search.BooleanClause;
060import org.apache.lucene.search.BooleanQuery;
061import org.apache.lucene.search.Collector;
062import org.apache.lucene.search.IndexSearcher;
063import org.apache.lucene.search.LeafCollector;
064import org.apache.lucene.search.Scorer;
065import org.apache.lucene.store.Directory;
066import org.apache.lucene.store.FSDirectory;
067import org.apache.lucene.store.LockObtainFailedException;
068import org.apache.lucene.store.RAMDirectory;
069import org.h2.message.DbException;
070import org.h2.store.fs.FileUtils;
071import org.h2.tools.SimpleResultSet;
072import org.h2.util.IOUtils;
073import org.h2.util.StringUtils;
074
075/**
076 * An optimized Lucene-based fulltext indexing trigger and search.
077 */
078public class H2Fulltext {
079
080    private static final Map<String, Analyzer> analyzers = new ConcurrentHashMap<>();
081
082    private static final Map<String, IndexWriter> indexWriters = new ConcurrentHashMap<>();
083
084    private static final String FT_SCHEMA = "NXFT";
085
086    private static final String FT_TABLE = FT_SCHEMA + ".INDEXES";
087
088    private static final String PREFIX = "NXFT_";
089
090    private static final String FIELD_KEY = "KEY";
091
092    private static final String FIELD_TEXT = "TEXT";
093
094    private static final String DEFAULT_INDEX_NAME = "PUBLIC_FULLTEXT_default";
095
096    private static final String COL_KEY = "KEY";
097
098    // Utility class.
099    private H2Fulltext() {
100    }
101
102    /**
103     * Initializes fulltext search functionality for this database. This adds the following Java functions to the
104     * database:
105     * <ul>
106     * <li>NXFT_CREATE_INDEX(nameString, schemaString, tableString, columnListString, analyzerString)</li>
107     * <li>NXFT_REINDEX()</li>
108     * <li>NXFT_DROP_ALL()</li>
109     * <li>NXFT_SEARCH(queryString, limitInt, offsetInt): result set</li>
110     * </ul>
111     * It also adds a schema NXFT to the database where bookkeeping information is stored. This function may be called
112     * from a Java application, or by using the SQL statements:
113     *
114     * <pre>
115     *  CREATE ALIAS IF NOT EXISTS NXFT_INIT FOR
116     *      &quot;org.nuxeo.ecm.core.storage.sql.db.H2Fulltext.init&quot;;
117     *  CALL NXFT_INIT();
118     * </pre>
119     */
120    public static void init(Connection conn) throws SQLException {
121        try (Statement st = conn.createStatement()) {
122            st.execute("CREATE SCHEMA IF NOT EXISTS " + FT_SCHEMA);
123            st.execute("CREATE TABLE IF NOT EXISTS " + FT_TABLE
124                    + "(NAME VARCHAR, SCHEMA VARCHAR, TABLE VARCHAR, COLUMNS VARCHAR, "
125                    + "ANALYZER VARCHAR, PRIMARY KEY(NAME))");
126            // BBB migrate old table without the "NAME" column
127            try (ResultSet rs = st.executeQuery("SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE " + "TABLE_SCHEMA = '"
128                    + FT_SCHEMA + "' AND TABLE_NAME = 'INDEXES' AND COLUMN_NAME = 'NAME'")) {
129                if (!rs.next()) {
130                    // BBB no NAME column, alter table to create it
131                    st.execute("ALTER TABLE " + FT_TABLE + " ADD COLUMN NAME VARCHAR");
132                    st.execute("UPDATE " + FT_TABLE + " SET NAME = '" + DEFAULT_INDEX_NAME + "'");
133                }
134            }
135
136            String className = H2Fulltext.class.getName();
137            st.execute("CREATE ALIAS IF NOT EXISTS " + PREFIX + "CREATE_INDEX FOR \"" + className + ".createIndex\"");
138            st.execute("CREATE ALIAS IF NOT EXISTS " + PREFIX + "REINDEX FOR \"" + className + ".reindex\"");
139            st.execute("CREATE ALIAS IF NOT EXISTS " + PREFIX + "DROP_ALL FOR \"" + className + ".dropAll\"");
140            st.execute("CREATE ALIAS IF NOT EXISTS " + PREFIX + "SEARCH FOR \"" + className + ".search\"");
141        }
142    }
143
144    // ----- static methods called directly to initialize fulltext -----
145
146    /**
147     * Creates a fulltext index for a table and column list.
148     * <p>
149     * A table may have any number of indexes at a time, but the index name must be unique. If the index already exists,
150     * nothing is done, otherwise the index is created and populated from existing data.
151     * <p>
152     * Usually called through:
153     *
154     * <pre>
155     *   CALL NXFT_CREATE_INDEX('indexname', 'myschema', 'mytable', ('col1', 'col2'), 'lucene.analyzer');
156     * </pre>
157     *
158     * @param conn the connection
159     * @param indexName the index name
160     * @param schema the schema name of the table
161     * @param table the table name
162     * @param columns the column list
163     * @param analyzer the Lucene fulltext analyzer class
164     */
165    public static void createIndex(Connection conn, String indexName, String schema, String table, String columns,
166            String analyzer) throws SQLException {
167        if (indexName == null) {
168            indexName = DEFAULT_INDEX_NAME;
169        }
170        columns = columns.replace("(", "").replace(")", "").replace(" ", "");
171        try (PreparedStatement ps = conn.prepareStatement("DELETE FROM " + FT_TABLE + " WHERE NAME = ?")) {
172            ps.setString(1, indexName);
173            ps.execute();
174        }
175        try (PreparedStatement ps = conn.prepareStatement(
176                "INSERT INTO " + FT_TABLE + "(NAME, SCHEMA, TABLE, COLUMNS, ANALYZER) VALUES(?, ?, ?, ?, ?)")) {
177            ps.setString(1, indexName);
178            ps.setString(2, schema);
179            ps.setString(3, table);
180            ps.setString(4, columns);
181            ps.setString(5, analyzer);
182            ps.execute();
183        }
184        createTrigger(conn, schema, table);
185    }
186
187    /**
188     * Re-creates the fulltext index for this database.
189     */
190    public static void reindex(Connection conn) throws SQLException {
191        removeAllTriggers(conn);
192        removeIndexFiles(conn);
193        try (Statement st = conn.createStatement()) {
194            try (ResultSet rs = st.executeQuery("SELECT * FROM " + FT_TABLE)) {
195                Set<String> done = new HashSet<>();
196                while (rs.next()) {
197                    String schema = rs.getString("SCHEMA");
198                    String table = rs.getString("TABLE");
199                    String key = schema + '.' + table;
200                    if (!done.add(key)) {
201                        continue;
202                    }
203                    createTrigger(conn, schema, table);
204                    indexExistingRows(conn, schema, table);
205                }
206            }
207        }
208    }
209
210    private static void indexExistingRows(Connection conn, String schema, String table) throws SQLException {
211        Trigger trigger = new Trigger();
212        trigger.init(conn, schema, null, table, false, org.h2.api.Trigger.INSERT);
213        try (Statement st = conn.createStatement()) {
214            try (ResultSet rs = st.executeQuery("SELECT * FROM " + StringUtils.quoteIdentifier(schema) + '.'
215                    + StringUtils.quoteIdentifier(table))) {
216                int n = rs.getMetaData().getColumnCount();
217                while (rs.next()) {
218                    Object[] row = new Object[n];
219                    for (int i = 0; i < n; i++) {
220                        row[i] = rs.getObject(i + 1);
221                    }
222                    trigger.fire(conn, null, row);
223                }
224            }
225        }
226    }
227
228    /**
229     * Creates a trigger for the indexes on a table.
230     * <p>
231     * Usually called through:
232     *
233     * <pre>
234     *   CALL NXFT_CREATE_TRIGGERS('myschema', 'mytable');
235     * </pre>
236     *
237     * @param conn the connection
238     * @param schema the schema name of the table
239     * @param table the table name
240     */
241    private static void createTrigger(Connection conn, String schema, String table) throws SQLException {
242        try (Statement st = conn.createStatement()) {
243            schema = StringUtils.quoteIdentifier(schema);
244            String trigger = schema + '.' + StringUtils.quoteIdentifier(PREFIX + table);
245            st.execute("DROP TRIGGER IF EXISTS " + trigger);
246            st.execute(String.format(
247                    "CREATE TRIGGER %s " + "AFTER INSERT, UPDATE, DELETE ON %s.%s " + "FOR EACH ROW CALL \"%s\"",
248                    trigger, schema, StringUtils.quoteIdentifier(table), H2Fulltext.Trigger.class.getName()));
249        }
250    }
251
252    private static void removeAllTriggers(Connection conn) throws SQLException {
253        try (Statement st = conn.createStatement()) {
254            try (ResultSet rs = st.executeQuery("SELECT * FROM INFORMATION_SCHEMA.TRIGGERS")) {
255                try (Statement st2 = conn.createStatement()) {
256                    while (rs.next()) {
257                        String trigger = rs.getString("TRIGGER_NAME");
258                        if (trigger.startsWith(PREFIX)) {
259                            st2.execute("DROP TRIGGER " + StringUtils.quoteIdentifier(rs.getString("TRIGGER_SCHEMA"))
260                                    + "." + trigger);
261                        }
262                    }
263                }
264            }
265        }
266    }
267
268    /**
269     * Drops all fulltext indexes from the database.
270     */
271    public static void dropAll(Connection conn) throws SQLException {
272        try (Statement st = conn.createStatement()) {
273            st.execute("DROP SCHEMA IF EXISTS " + FT_SCHEMA);
274        }
275        removeAllTriggers(conn);
276        removeIndexFiles(conn);
277    }
278
279    private static String fieldForIndex(String indexName) {
280        if (DEFAULT_INDEX_NAME.equals(indexName)) {
281            return FIELD_TEXT;
282        } else {
283            return FIELD_TEXT + '_' + indexName;
284        }
285    }
286
287    /**
288     * Searches from the given full text index. The returned result set has a single ID column which holds the keys for
289     * the matching rows.
290     * <p>
291     * Usually called through:
292     *
293     * <pre>
294     *   SELECT * FROM NXFT_SEARCH(name, 'text');
295     * </pre>
296     *
297     * @param conn the connection
298     * @param indexName the index name
299     * @param text the search query
300     * @return the result set
301     */
302    public static ResultSet search(Connection conn, String indexName, String text) throws SQLException {
303        DatabaseMetaData meta = conn.getMetaData();
304        if (indexName == null) {
305            indexName = DEFAULT_INDEX_NAME;
306        }
307
308        String schema;
309        String table;
310        String analyzerName;
311
312        // find schema, table and analyzer
313        try (PreparedStatement ps = conn.prepareStatement(
314                "SELECT SCHEMA, TABLE, ANALYZER FROM " + FT_TABLE + " WHERE NAME = ?")) {
315            ps.setString(1, indexName);
316            try (ResultSet res = ps.executeQuery()) {
317                if (!res.next()) {
318                    throw new SQLException("No such index: " + indexName);
319                }
320                schema = res.getString(1);
321                table = res.getString(2);
322                analyzerName = res.getString(3);
323            }
324        }
325
326        int type = getPrimaryKeyType(meta, schema, table);
327        SimpleResultSet rs = new SimpleResultSet();
328        rs.addColumn(COL_KEY, type, 0, 0);
329
330        if (meta.getURL().startsWith("jdbc:columnlist:")) {
331            // this is just to query the result set columns
332            return rs;
333        }
334
335        // flush changes
336        final IndexWriter writer = getIndexWriter(getIndexName(conn), getIndexPath(conn), analyzerName);
337        if (writer.hasUncommittedChanges()) {
338            try {
339                writer.commit();
340            } catch (IOException cause) {
341                throw convertException(cause);
342            }
343        }
344
345        // search index
346        try {
347            BooleanQuery.Builder queryBuilder = new BooleanQuery.Builder();
348            String defaultField = fieldForIndex(indexName);
349            Analyzer analyzer = getAnalyzer(analyzerName);
350            QueryParser parser = new QueryParser(defaultField, analyzer);
351            queryBuilder.add(parser.parse(text), BooleanClause.Occur.MUST);
352
353            try (IndexReader reader = DirectoryReader.open(writer.getDirectory())) {
354                IndexSearcher searcher = new IndexSearcher(reader);
355                Collector collector = new ResultSetCollector(rs, reader, type);
356                searcher.search(queryBuilder.build(), collector);
357            }
358        } catch (SQLException | ParseException | IOException e) {
359            throw convertException(e);
360        }
361        return rs;
362    }
363
    /**
     * Lucene collector that adds the stored KEY field of every matching document to the H2 result set,
     * converted back to the primary key's SQL type.
     */
    protected static class ResultSetCollector implements Collector, LeafCollector {
        protected final SimpleResultSet rs;

        protected IndexReader reader;

        // SQL type of the primary key column (java.sql.Types constant)
        protected int type;

        // docBase of the current leaf (segment); added to segment-local doc IDs
        protected int docBase;

        public ResultSetCollector(SimpleResultSet rs, IndexReader reader, int type) {
            this.rs = rs;
            this.reader = reader;
            this.type = type;
        }

        @Override
        public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
            // remember the segment's base so collect() can compute absolute doc IDs
            docBase = context.docBase;
            return this;
        }

        @Override
        public void setScorer(Scorer scorer) {
            // scores are not used
        }

        @Override
        public boolean needsScores() {
            return false;
        }

        @Override
        public void collect(int docID) throws IOException {
            docID += docBase;
            // load only the KEY field of the matching document
            Document doc = reader.document(docID, Collections.singleton(FIELD_KEY));
            Object key;
            try {
                key = asObject(doc.get(FIELD_KEY), type);
                rs.addRow(key);
            } catch (SQLException e) {
                // the Collector API only allows IOException
                throw new IOException(e);
            }
        }
    }
407
408    private static int getPrimaryKeyType(DatabaseMetaData meta, String schema, String table) throws SQLException {
409        // find primary key name
410        String primaryKeyName = null;
411        try (ResultSet rs = meta.getPrimaryKeys(null, schema, table)) {
412            while (rs.next()) {
413                if (primaryKeyName != null) {
414                    throw new SQLException("Can only index primary keys on one column for " + schema + '.' + table);
415                }
416                primaryKeyName = rs.getString("COLUMN_NAME");
417            }
418            if (primaryKeyName == null) {
419                throw new SQLException("No primary key for " + schema + '.' + table);
420            }
421        }
422        // find primary key type
423        try (ResultSet rs = meta.getColumns(null, schema, table, primaryKeyName)) {
424            if (!rs.next()) {
425                throw new SQLException("Could not find primary key");
426            }
427            return rs.getInt("DATA_TYPE");
428        }
429    }
430
431    private static Analyzer getAnalyzer(String analyzerName) throws SQLException {
432        Analyzer analyzer = analyzers.get(analyzerName);
433        if (analyzer == null) {
434            try {
435                Class<?> klass = Class.forName(analyzerName);
436                Constructor<?> constructor = klass.getConstructor();
437                analyzer = (Analyzer) constructor.newInstance();
438            } catch (ReflectiveOperationException e) {
439                throw new SQLException(e.toString());
440            }
441            analyzers.put(analyzerName, analyzer);
442        }
443        return analyzer;
444    }
445
446    protected static String getIndexName(Connection conn) throws SQLException {
447        String catalog = conn.getCatalog();
448        if (catalog == null) {
449            catalog = "default";
450        }
451        return catalog;
452    }
453
454    protected static String getIndexPath(Connection conn) throws SQLException {
455        try (Statement st = conn.createStatement()) {
456            try (ResultSet rs = st.executeQuery("CALL DATABASE_PATH()")) {
457                rs.next();
458                String path = rs.getString(1);
459                if (path == null) {
460                    return null;
461                }
462                return path + ".lucene";
463            }
464        }
465
466    }
467
    /**
     * Gets the cached index writer for the given index name, creating and caching it if absent.
     * Creation is guarded by a lock-free lookup followed by a synchronized re-check.
     *
     * @param name the index name (cache key, see {@link #getIndexName})
     * @param path the index files path, or {@code null} for an in-memory index
     * @param analyzer the analyzer class name
     */
    private static IndexWriter getIndexWriter(String name, String path, String analyzer) throws SQLException {
        IndexWriter indexWriter = indexWriters.get(name);
        if (indexWriter != null) {
            return indexWriter;
        }
        synchronized (indexWriters) {
            // re-check under the lock: another thread may have created it meanwhile
            indexWriter = indexWriters.get(name);
            if (indexWriter != null) {
                return indexWriter;
            }
            try {
                // null path means an in-memory database: keep the Lucene index in RAM as well
                Directory dir = path == null ? new RAMDirectory() : FSDirectory.open(Paths.get(path));
                Analyzer an = getAnalyzer(analyzer);
                IndexWriterConfig iwc = new IndexWriterConfig(an);
                iwc.setOpenMode(OpenMode.CREATE_OR_APPEND);
                indexWriter = new IndexWriter(dir, iwc);
            } catch (LockObtainFailedException e) {
                // another process (or a leaked writer) already holds the Lucene write lock
                throw convertException("Cannot open fulltext index " + path, e);
            } catch (IOException e) {
                throw convertException(e);
            }
            indexWriters.put(name, indexWriter);
            return indexWriter;
        }
    }
493
494    private static void removeIndexFiles(Connection conn) throws SQLException {
495        String path = getIndexPath(conn);
496        try {
497            IndexWriter index = indexWriters.remove(path);
498            if (index != null) {
499                try {
500                    index.close();
501                } catch (IOException e) {
502                    throw convertException(e);
503                }
504            }
505        } finally {
506            FileUtils.deleteRecursive(path, false);
507        }
508    }
509
510    private static SQLException convertException(Exception e) {
511        return convertException("Error while indexing document", e);
512    }
513
514    private static SQLException convertException(String message, Exception e) {
515        SQLException e2 = new SQLException(message);
516        e2.initCause(e);
517        return e2;
518    }
519
    /**
     * Converts a column value to the string fed to the Lucene analyzer.
     * <p>
     * {@code null} values and SQL types not listed are indexed as the empty string; binary and
     * structured types are rejected.
     *
     * @param data the column value, may be {@code null}
     * @param type the column's SQL type ({@link Types} constant)
     * @return the text to index
     * @throws SQLException for unsupported (binary/structured) column types
     */
    protected static String asString(Object data, int type) throws SQLException {
        if (data == null) {
            return "";
        }
        switch (type) {
        case Types.BIT:
        case Types.BOOLEAN:
        case Types.INTEGER:
        case Types.BIGINT:
        case Types.DECIMAL:
        case Types.DOUBLE:
        case Types.FLOAT:
        case Types.NUMERIC:
        case Types.REAL:
        case Types.SMALLINT:
        case Types.TINYINT:
        case Types.DATE:
        case Types.TIME:
        case Types.TIMESTAMP:
        case Types.LONGVARCHAR:
        case Types.CHAR:
        case Types.VARCHAR:
            // all scalar types are indexed through their default string form
            return data.toString();
        case Types.CLOB:
            try {
                if (data instanceof Clob) {
                    data = ((Clob) data).getCharacterStream();
                }
                // -1: read the whole stream
                return IOUtils.readStringAndClose((Reader) data, -1);
            } catch (IOException e) {
                throw DbException.convert(e);
            }
        case Types.VARBINARY:
        case Types.LONGVARBINARY:
        case Types.BINARY:
        case Types.JAVA_OBJECT:
        case Types.OTHER:
        case Types.BLOB:
        case Types.STRUCT:
        case Types.REF:
        case Types.NULL:
        case Types.ARRAY:
        case Types.DATALINK:
        case Types.DISTINCT:
            throw new SQLException("Unsupported column data type: " + type);
        default:
            // unknown types are silently ignored (indexed as empty)
            return "";
        }
    }
569
570    // simple cases only, used for primary key
571    private static Object asObject(String string, int type) throws SQLException {
572        switch (type) {
573        case Types.BIGINT:
574            return Long.valueOf(string);
575        case Types.INTEGER:
576        case Types.SMALLINT:
577        case Types.TINYINT:
578            return Integer.valueOf(string);
579        case Types.LONGVARCHAR:
580        case Types.CHAR:
581        case Types.VARCHAR:
582            return string;
583        default:
584            throw new SQLException("Unsupport data type for primary key: " + type);
585        }
586    }
587
588    /**
589     * Trigger used to update the lucene index upon row change.
590     */
591    public static class Trigger implements org.h2.api.Trigger {
592
593        private static final Log log = LogFactory.getLog(Trigger.class);
594
595        private String indexName;
596
597        private String indexPath;
598
599        private IndexWriter indexWriter;
600
601        // DEBUG
602        private Exception lastIndexWriterClose;
603
604        // DEBUG
605        private String lastIndexWriterCloseThread;
606
607        /** Starting at 0. */
608        private int primaryKeyIndex;
609
610        private int primaryKeyType;
611
612        private Map<String, int[]> columnTypes;
613
614        private Map<String, int[]> columnIndices;
615
616        /**
617         * Trigger interface: initialization.
618         */
619        @Override
620        public void init(Connection conn, String schema, String triggerName, String table, boolean before, int opType)
621                throws SQLException {
622            DatabaseMetaData meta = conn.getMetaData();
623
624            // find primary key name
625            String primaryKeyName = null;
626            try (ResultSet rs = meta.getPrimaryKeys(null, schema, table)) {
627                while (rs.next()) {
628                    if (primaryKeyName != null) {
629                        throw new SQLException(
630                                "Can only index primary keys on one column for: " + schema + '.' + table);
631                    }
632                    primaryKeyName = rs.getString("COLUMN_NAME");
633                }
634                if (primaryKeyName == null) {
635                    throw new SQLException("No primary key for " + schema + '.' + table);
636                }
637            }
638
639            // find primary key type
640            try (ResultSet rs = meta.getColumns(null, schema, table, primaryKeyName)) {
641                if (!rs.next()) {
642                    throw new SQLException("No primary key for: " + schema + '.' + table);
643                }
644                primaryKeyType = rs.getInt("DATA_TYPE");
645                primaryKeyIndex = rs.getInt("ORDINAL_POSITION") - 1;
646            }
647
648            // find all columns info
649            Map<String, Integer> allColumnTypes = new HashMap<>();
650            Map<String, Integer> allColumnIndices = new HashMap<>();
651            try (ResultSet rs = meta.getColumns(null, schema, table, null)) {
652                while (rs.next()) {
653                    String name = rs.getString("COLUMN_NAME");
654                    int type = rs.getInt("DATA_TYPE");
655                    int index = rs.getInt("ORDINAL_POSITION") - 1;
656                    allColumnTypes.put(name, Integer.valueOf(type));
657                    allColumnIndices.put(name, Integer.valueOf(index));
658                }
659            }
660
661            // find columns configured for indexing
662            try (PreparedStatement ps = conn.prepareStatement(
663                    "SELECT NAME, COLUMNS, ANALYZER FROM " + FT_TABLE + " WHERE SCHEMA = ? AND TABLE = ?")) {
664                ps.setString(1, schema);
665                ps.setString(2, table);
666                try (ResultSet rs = ps.executeQuery()) {
667                    columnTypes = new HashMap<>();
668                    columnIndices = new HashMap<>();
669                    while (rs.next()) {
670                        String index = rs.getString(1);
671                        String columns = rs.getString(2);
672                        String analyzerName = rs.getString(3);
673                        List<String> columnNames = Arrays.asList(columns.split(","));
674
675                        // find the columns' indices and types
676                        int[] types = new int[columnNames.size()];
677                        int[] indices = new int[columnNames.size()];
678                        int i = 0;
679                        for (String columnName : columnNames) {
680                            types[i] = allColumnTypes.get(columnName).intValue();
681                            indices[i] = allColumnIndices.get(columnName).intValue();
682                            i++;
683                        }
684                        columnTypes.put(index, types);
685                        columnIndices.put(index, indices);
686                        // only one call actually needed for this:
687                        indexName = getIndexName(conn);
688                        indexPath = getIndexPath(conn);
689                        indexWriter = getIndexWriter(indexName, indexPath, analyzerName);
690                    }
691
692                }
693            }
694        }
695
696        /**
697         * Trigger interface.
698         */
699        @Override
700        public void fire(Connection conn, Object[] oldRow, Object[] newRow) throws SQLException {
701            if (indexWriter == null) {
702                throw new SQLException("Fulltext index was not initialized");
703            }
704            if (oldRow != null) {
705                delete(oldRow);
706            }
707            if (newRow != null) {
708                insert(newRow);
709            }
710        }
711
712        private void insert(Object[] row) throws SQLException {
713            Document doc = new Document();
714            String key = asString(row[primaryKeyIndex], primaryKeyType);
715            // StringField is not tokenized
716            StringField keyField = new StringField(FIELD_KEY, key, Field.Store.YES);
717            doc.add(keyField);
718
719            // add fulltext for all indexes
720            for (String indexName : columnTypes.keySet()) {
721                int[] types = columnTypes.get(indexName);
722                int[] indices = columnIndices.get(indexName);
723                StringBuilder buf = new StringBuilder();
724                for (int i = 0; i < types.length; i++) {
725                    String data = asString(row[indices[i]], types[i]);
726                    if (i > 0) {
727                        buf.append(' ');
728                    }
729                    buf.append(data);
730                }
731                // TextField is tokenized
732                TextField textField = new TextField(fieldForIndex(indexName), buf.toString(), Field.Store.NO);
733                doc.add(textField);
734            }
735
736            try {
737                indexWriter.addDocument(doc);
738            } catch (IOException e) {
739                throw convertException(e);
740            } catch (org.apache.lucene.store.AlreadyClosedException e) {
741                // DEBUG
742                log.error("org.apache.lucene.store.AlreadyClosedException in thread " + Thread.currentThread().getName()
743                        + ", last close was in thread " + lastIndexWriterCloseThread, lastIndexWriterClose);
744                throw e;
745            }
746        }
747
748        private void delete(Object[] row) throws SQLException {
749            String primaryKey = asString(row[primaryKeyIndex], primaryKeyType);
750            try {
751                indexWriter.deleteDocuments(new Term(FIELD_KEY, primaryKey));
752            } catch (IOException e) {
753                throw convertException(e);
754            }
755        }
756
757        @Override
758        public void close() throws SQLException {
759            if (indexWriter != null) {
760                try {
761                    // DEBUG
762                    lastIndexWriterClose = new RuntimeException("debug stack trace");
763                    lastIndexWriterCloseThread = Thread.currentThread().getName();
764                    indexWriter.close();
765                    indexWriter = null;
766                } catch (IOException e) {
767                    throw convertException(e);
768                } finally {
769                    indexWriters.remove(indexName);
770                }
771            }
772        }
773
774        @Override
775        public void remove() {
776            // ignore
777        }
778    }
779
780}