001/*
002 * (C) Copyright 2006-2014 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     H2 Group
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.core.storage.sql.db;
021
022import java.io.File;
023import java.io.IOException;
024import java.io.Reader;
025import java.lang.reflect.Constructor;
026import java.sql.Clob;
027import java.sql.Connection;
028import java.sql.DatabaseMetaData;
029import java.sql.PreparedStatement;
030import java.sql.ResultSet;
031import java.sql.SQLException;
032import java.sql.Statement;
033import java.sql.Types;
034import java.util.Arrays;
035import java.util.Collections;
036import java.util.HashMap;
037import java.util.HashSet;
038import java.util.List;
039import java.util.Map;
040import java.util.Set;
041import java.util.concurrent.ConcurrentHashMap;
042
043import org.apache.commons.logging.Log;
044import org.apache.commons.logging.LogFactory;
045import org.apache.lucene.analysis.Analyzer;
046import org.apache.lucene.document.Document;
047import org.apache.lucene.document.Field;
048import org.apache.lucene.document.StringField;
049import org.apache.lucene.document.TextField;
050import org.apache.lucene.index.AtomicReaderContext;
051import org.apache.lucene.index.DirectoryReader;
052import org.apache.lucene.index.IndexReader;
053import org.apache.lucene.index.IndexWriter;
054import org.apache.lucene.index.IndexWriterConfig;
055import org.apache.lucene.index.IndexWriterConfig.OpenMode;
056import org.apache.lucene.index.Term;
057import org.apache.lucene.queryparser.classic.ParseException;
058import org.apache.lucene.queryparser.classic.QueryParser;
059import org.apache.lucene.search.BooleanClause;
060import org.apache.lucene.search.BooleanQuery;
061import org.apache.lucene.search.Collector;
062import org.apache.lucene.search.IndexSearcher;
063import org.apache.lucene.search.Scorer;
064import org.apache.lucene.store.Directory;
065import org.apache.lucene.store.FSDirectory;
066import org.apache.lucene.store.LockObtainFailedException;
067import org.apache.lucene.store.RAMDirectory;
068import org.apache.lucene.util.Version;
069import org.h2.message.DbException;
070import org.h2.store.fs.FileUtils;
071import org.h2.tools.SimpleResultSet;
072import org.h2.util.IOUtils;
073import org.h2.util.StringUtils;
074
075/**
076 * An optimized Lucene-based fulltext indexing trigger and search.
077 */
078public class H2Fulltext {
079
080    private static final Version LUCENE_VERSION = Version.LUCENE_4_10_4;
081
082    private static final Map<String, Analyzer> analyzers = new ConcurrentHashMap<String, Analyzer>();
083
084    private static final Map<String, IndexWriter> indexWriters = new ConcurrentHashMap<String, IndexWriter>();
085
086    private static final String FT_SCHEMA = "NXFT";
087
088    private static final String FT_TABLE = FT_SCHEMA + ".INDEXES";
089
090    private static final String PREFIX = "NXFT_";
091
092    private static final String FIELD_KEY = "KEY";
093
094    private static final String FIELD_TEXT = "TEXT";
095
096    private static final String DEFAULT_INDEX_NAME = "PUBLIC_FULLTEXT_default";
097
098    private static final String COL_KEY = "KEY";
099
100    // Utility class.
101    private H2Fulltext() {
102    }
103
104    /**
105     * Initializes fulltext search functionality for this database. This adds the following Java functions to the
106     * database:
107     * <ul>
108     * <li>NXFT_CREATE_INDEX(nameString, schemaString, tableString, columnListString, analyzerString)</li>
109     * <li>NXFT_REINDEX()</li>
110     * <li>NXFT_DROP_ALL()</li>
111     * <li>NXFT_SEARCH(queryString, limitInt, offsetInt): result set</li>
112     * </ul>
113     * It also adds a schema NXFT to the database where bookkeeping information is stored. This function may be called
114     * from a Java application, or by using the SQL statements:
115     *
116     * <pre>
117     *  CREATE ALIAS IF NOT EXISTS NXFT_INIT FOR
118     *      &quot;org.nuxeo.ecm.core.storage.sql.db.H2Fulltext.init&quot;;
119     *  CALL NXFT_INIT();
120     * </pre>
121     *
122     * @param conn
123     */
124    public static void init(Connection conn) throws SQLException {
125        try (Statement st = conn.createStatement()) {
126            st.execute("CREATE SCHEMA IF NOT EXISTS " + FT_SCHEMA);
127            st.execute("CREATE TABLE IF NOT EXISTS " + FT_TABLE
128                    + "(NAME VARCHAR, SCHEMA VARCHAR, TABLE VARCHAR, COLUMNS VARCHAR, "
129                    + "ANALYZER VARCHAR, PRIMARY KEY(NAME))");
130            // BBB migrate old table without the "NAME" column
131            try (ResultSet rs = st.executeQuery("SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE " + "TABLE_SCHEMA = '"
132                    + FT_SCHEMA + "' AND TABLE_NAME = 'INDEXES' AND COLUMN_NAME = 'NAME'")) {
133                if (!rs.next()) {
134                    // BBB no NAME column, alter table to create it
135                    st.execute("ALTER TABLE " + FT_TABLE + " ADD COLUMN NAME VARCHAR");
136                    st.execute("UPDATE " + FT_TABLE + " SET NAME = '" + DEFAULT_INDEX_NAME + "'");
137                }
138            }
139
140            String className = H2Fulltext.class.getName();
141            st.execute("CREATE ALIAS IF NOT EXISTS " + PREFIX + "CREATE_INDEX FOR \"" + className + ".createIndex\"");
142            st.execute("CREATE ALIAS IF NOT EXISTS " + PREFIX + "REINDEX FOR \"" + className + ".reindex\"");
143            st.execute("CREATE ALIAS IF NOT EXISTS " + PREFIX + "DROP_ALL FOR \"" + className + ".dropAll\"");
144            st.execute("CREATE ALIAS IF NOT EXISTS " + PREFIX + "SEARCH FOR \"" + className + ".search\"");
145        }
146    }
147
148    // ----- static methods called directly to initialize fulltext -----
149
150    /**
151     * Creates a fulltext index for a table and column list.
152     * <p>
153     * A table may have any number of indexes at a time, but the index name must be unique. If the index already exists,
154     * nothing is done, otherwise the index is created and populated from existing data.
155     * <p>
156     * Usually called through:
157     *
158     * <pre>
159     *   CALL NXFT_CREATE_INDEX('indexname', 'myschema', 'mytable', ('col1', 'col2'), 'lucene.analyzer');
160     * </pre>
161     *
162     * @param conn the connection
163     * @param indexName the index name
164     * @param schema the schema name of the table
165     * @param table the table name
166     * @param columns the column list
167     * @param analyzer the Lucene fulltext analyzer class
168     */
169    public static void createIndex(Connection conn, String indexName, String schema, String table, String columns,
170            String analyzer) throws SQLException {
171        if (indexName == null) {
172            indexName = DEFAULT_INDEX_NAME;
173        }
174        columns = columns.replace("(", "").replace(")", "").replace(" ", "");
175        try (PreparedStatement ps = conn.prepareStatement("DELETE FROM " + FT_TABLE + " WHERE NAME = ?")) {
176            ps.setString(1, indexName);
177            ps.execute();
178        }
179        try (PreparedStatement ps = conn.prepareStatement("INSERT INTO " + FT_TABLE
180                + "(NAME, SCHEMA, TABLE, COLUMNS, ANALYZER) VALUES(?, ?, ?, ?, ?)")) {
181            ps.setString(1, indexName);
182            ps.setString(2, schema);
183            ps.setString(3, table);
184            ps.setString(4, columns);
185            ps.setString(5, analyzer);
186            ps.execute();
187        }
188        createTrigger(conn, schema, table);
189    }
190
191    /**
192     * Re-creates the fulltext index for this database.
193     */
194    public static void reindex(Connection conn) throws SQLException {
195        removeAllTriggers(conn);
196        removeIndexFiles(conn);
197        try (Statement st = conn.createStatement()) {
198            try (ResultSet rs = st.executeQuery("SELECT * FROM " + FT_TABLE)) {
199                Set<String> done = new HashSet<String>();
200                while (rs.next()) {
201                    String schema = rs.getString("SCHEMA");
202                    String table = rs.getString("TABLE");
203                    String key = schema + '.' + table;
204                    if (!done.add(key)) {
205                        continue;
206                    }
207                    createTrigger(conn, schema, table);
208                    indexExistingRows(conn, schema, table);
209                }
210            }
211        }
212    }
213
214    private static void indexExistingRows(Connection conn, String schema, String table) throws SQLException {
215        Trigger trigger = new Trigger();
216        trigger.init(conn, schema, null, table, false, org.h2.api.Trigger.INSERT);
217        try (Statement st = conn.createStatement()) {
218            try (ResultSet rs = st.executeQuery("SELECT * FROM " + StringUtils.quoteIdentifier(schema) + '.'
219                    + StringUtils.quoteIdentifier(table))) {
220                int n = rs.getMetaData().getColumnCount();
221                while (rs.next()) {
222                    Object[] row = new Object[n];
223                    for (int i = 0; i < n; i++) {
224                        row[i] = rs.getObject(i + 1);
225                    }
226                    trigger.fire(conn, null, row);
227                }
228            }
229        }
230    }
231
232    /**
233     * Creates a trigger for the indexes on a table.
234     * <p>
235     * Usually called through:
236     *
237     * <pre>
238     *   CALL NXFT_CREATE_TRIGGERS('myschema', 'mytable');
239     * </pre>
240     *
241     * @param conn the connection
242     * @param schema the schema name of the table
243     * @param table the table name
244     */
245    private static void createTrigger(Connection conn, String schema, String table) throws SQLException {
246        try (Statement st = conn.createStatement()) {
247            schema = StringUtils.quoteIdentifier(schema);
248            String trigger = schema + '.' + StringUtils.quoteIdentifier(PREFIX + table);
249            st.execute("DROP TRIGGER IF EXISTS " + trigger);
250            st.execute(String.format("CREATE TRIGGER %s " + "AFTER INSERT, UPDATE, DELETE ON %s.%s "
251                    + "FOR EACH ROW CALL \"%s\"", trigger, schema, StringUtils.quoteIdentifier(table),
252                    H2Fulltext.Trigger.class.getName()));
253        }
254    }
255
256    private static void removeAllTriggers(Connection conn) throws SQLException {
257        try (Statement st = conn.createStatement()) {
258            try (ResultSet rs = st.executeQuery("SELECT * FROM INFORMATION_SCHEMA.TRIGGERS")) {
259                try (Statement st2 = conn.createStatement()) {
260                    while (rs.next()) {
261                        String trigger = rs.getString("TRIGGER_NAME");
262                        if (trigger.startsWith(PREFIX)) {
263                            st2.execute("DROP TRIGGER " + StringUtils.quoteIdentifier(rs.getString("TRIGGER_SCHEMA"))
264                                    + "." + trigger);
265                        }
266                    }
267                }
268            }
269        }
270    }
271
272    /**
273     * Drops all fulltext indexes from the database.
274     */
275    public static void dropAll(Connection conn) throws SQLException {
276        try (Statement st = conn.createStatement()) {
277            st.execute("DROP SCHEMA IF EXISTS " + FT_SCHEMA);
278        }
279        removeAllTriggers(conn);
280        removeIndexFiles(conn);
281    }
282
283    private static String fieldForIndex(String indexName) {
284        if (DEFAULT_INDEX_NAME.equals(indexName)) {
285            return FIELD_TEXT;
286        } else {
287            return FIELD_TEXT + '_' + indexName;
288        }
289    }
290
291    /**
292     * Searches from the given full text index. The returned result set has a single ID column which holds the keys for
293     * the matching rows.
294     * <p>
295     * Usually called through:
296     *
297     * <pre>
298     *   SELECT * FROM NXFT_SEARCH(name, 'text');
299     * </pre>
300     *
301     * @param conn the connection
302     * @param indexName the index name
303     * @param text the search query
304     * @return the result set
305     */
306    public static ResultSet search(Connection conn, String indexName, String text) throws SQLException {
307        DatabaseMetaData meta = conn.getMetaData();
308        if (indexName == null) {
309            indexName = DEFAULT_INDEX_NAME;
310        }
311
312        String schema;
313        String table;
314        String analyzerName;
315
316        // find schema, table and analyzer
317        try (PreparedStatement ps = conn.prepareStatement("SELECT SCHEMA, TABLE, ANALYZER FROM " + FT_TABLE
318                + " WHERE NAME = ?")) {
319            ps.setString(1, indexName);
320            try (ResultSet res = ps.executeQuery()) {
321                if (!res.next()) {
322                    throw new SQLException("No such index: " + indexName);
323                }
324                schema = res.getString(1);
325                table = res.getString(2);
326                analyzerName = res.getString(3);
327            }
328        }
329
330        int type = getPrimaryKeyType(meta, schema, table);
331        SimpleResultSet rs = new SimpleResultSet();
332        rs.addColumn(COL_KEY, type, 0, 0);
333
334        if (meta.getURL().startsWith("jdbc:columnlist:")) {
335            // this is just to query the result set columns
336            return rs;
337        }
338
339        // flush changes
340        final IndexWriter writer = getIndexWriter(getIndexName(conn), getIndexPath(conn), analyzerName);
341        if (writer.hasUncommittedChanges()) {
342            try {
343                writer.commit();
344            } catch (IOException cause) {
345                throw convertException(cause);
346            }
347        }
348
349        // search index
350        try {
351            BooleanQuery query = new BooleanQuery();
352            String defaultField = fieldForIndex(indexName);
353            Analyzer analyzer = getAnalyzer(analyzerName);
354            QueryParser parser = new QueryParser(LUCENE_VERSION, defaultField, analyzer);
355            query.add(parser.parse(text), BooleanClause.Occur.MUST);
356
357            try (IndexReader reader = DirectoryReader.open(writer.getDirectory())) {
358                IndexSearcher searcher = new IndexSearcher(reader);
359                Collector collector = new ResultSetCollector(rs, reader, type);
360                searcher.search(query, collector);
361            }
362        } catch (SQLException | ParseException | IOException e) {
363            throw convertException(e);
364        }
365        return rs;
366    }
367
368    protected static class ResultSetCollector extends Collector {
369        protected final SimpleResultSet rs;
370
371        protected IndexReader reader;
372
373        protected int type;
374
375        protected int docBase;
376
377        public ResultSetCollector(SimpleResultSet rs, IndexReader reader, int type) {
378            this.rs = rs;
379            this.reader = reader;
380            this.type = type;
381        }
382
383        @Override
384        public void setNextReader(AtomicReaderContext context) {
385            docBase = context.docBase;
386        }
387
388        @Override
389        public void setScorer(Scorer scorer) {
390        }
391
392        @Override
393        public boolean acceptsDocsOutOfOrder() {
394            return true;
395        }
396
397        @Override
398        public void collect(int docID) throws IOException {
399            docID += docBase;
400            Document doc = reader.document(docID, Collections.singleton(FIELD_KEY));
401            Object key;
402            try {
403                key = asObject(doc.get(FIELD_KEY), type);
404                rs.addRow(new Object[] { key });
405            } catch (SQLException e) {
406                throw new IOException(e);
407            }
408        }
409    }
410
411    private static int getPrimaryKeyType(DatabaseMetaData meta, String schema, String table) throws SQLException {
412        // find primary key name
413        String primaryKeyName = null;
414        try (ResultSet rs = meta.getPrimaryKeys(null, schema, table)) {
415            while (rs.next()) {
416                if (primaryKeyName != null) {
417                    throw new SQLException("Can only index primary keys on one column for " + schema + '.' + table);
418                }
419                primaryKeyName = rs.getString("COLUMN_NAME");
420            }
421            if (primaryKeyName == null) {
422                throw new SQLException("No primary key for " + schema + '.' + table);
423            }
424        }
425        // find primary key type
426        try (ResultSet rs = meta.getColumns(null, schema, table, primaryKeyName)) {
427            if (!rs.next()) {
428                throw new SQLException("Could not find primary key");
429            }
430            return rs.getInt("DATA_TYPE");
431        }
432    }
433
434    private static Analyzer getAnalyzer(String analyzerName) throws SQLException {
435        Analyzer analyzer = analyzers.get(analyzerName);
436        if (analyzer == null) {
437            try {
438                Class<?> klass = Class.forName(analyzerName);
439                Constructor<?> constructor = klass.getConstructor(Version.class);
440                analyzer = (Analyzer) constructor.newInstance(LUCENE_VERSION);
441            } catch (ReflectiveOperationException e) {
442                throw new SQLException(e.toString());
443            }
444            analyzers.put(analyzerName, analyzer);
445        }
446        return analyzer;
447    }
448
449    protected static String getIndexName(Connection conn) throws SQLException {
450        String catalog = conn.getCatalog();
451        if (catalog == null) {
452            catalog = "default";
453        }
454        return catalog;
455    }
456
457    protected static String getIndexPath(Connection conn) throws SQLException {
458        try (Statement st = conn.createStatement()) {
459            try (ResultSet rs = st.executeQuery("CALL DATABASE_PATH()")) {
460                rs.next();
461                String path = rs.getString(1);
462                if (path == null) {
463                    return null;
464                }
465                return path + ".lucene";
466            }
467        }
468
469    }
470
471    private static IndexWriter getIndexWriter(String name, String path, String analyzer) throws SQLException {
472        IndexWriter indexWriter = indexWriters.get(name);
473        if (indexWriter != null) {
474            return indexWriter;
475        }
476        synchronized (indexWriters) {
477            indexWriter = indexWriters.get(name);
478            if (indexWriter != null) {
479                return indexWriter;
480            }
481            try {
482                Directory dir = path == null ? new RAMDirectory() : FSDirectory.open(new File(path));
483                Analyzer an = getAnalyzer(analyzer);
484                IndexWriterConfig iwc = new IndexWriterConfig(LUCENE_VERSION, an);
485                iwc.setOpenMode(OpenMode.CREATE_OR_APPEND);
486                indexWriter = new IndexWriter(dir, iwc);
487            } catch (LockObtainFailedException e) {
488                throw convertException("Cannot open fulltext index " + path, e);
489            } catch (IOException e) {
490                throw convertException(e);
491            }
492            indexWriters.put(name, indexWriter);
493            return indexWriter;
494        }
495    }
496
497    private static void removeIndexFiles(Connection conn) throws SQLException {
498        String path = getIndexPath(conn);
499        try {
500            IndexWriter index = indexWriters.remove(path);
501            if (index != null) {
502                try {
503                    index.close();
504                } catch (IOException e) {
505                    throw convertException(e);
506                }
507            }
508        } finally {
509            FileUtils.deleteRecursive(path, false);
510        }
511    }
512
513    private static SQLException convertException(Exception e) {
514        return convertException("Error while indexing document", e);
515    }
516
517    private static SQLException convertException(String message, Exception e) {
518        SQLException e2 = new SQLException(message);
519        e2.initCause(e);
520        return e2;
521    }
522
523    protected static String asString(Object data, int type) throws SQLException {
524        if (data == null) {
525            return "";
526        }
527        switch (type) {
528        case Types.BIT:
529        case Types.BOOLEAN:
530        case Types.INTEGER:
531        case Types.BIGINT:
532        case Types.DECIMAL:
533        case Types.DOUBLE:
534        case Types.FLOAT:
535        case Types.NUMERIC:
536        case Types.REAL:
537        case Types.SMALLINT:
538        case Types.TINYINT:
539        case Types.DATE:
540        case Types.TIME:
541        case Types.TIMESTAMP:
542        case Types.LONGVARCHAR:
543        case Types.CHAR:
544        case Types.VARCHAR:
545            return data.toString();
546        case Types.CLOB:
547            try {
548                if (data instanceof Clob) {
549                    data = ((Clob) data).getCharacterStream();
550                }
551                return IOUtils.readStringAndClose((Reader) data, -1);
552            } catch (IOException e) {
553                throw DbException.convert(e);
554            }
555        case Types.VARBINARY:
556        case Types.LONGVARBINARY:
557        case Types.BINARY:
558        case Types.JAVA_OBJECT:
559        case Types.OTHER:
560        case Types.BLOB:
561        case Types.STRUCT:
562        case Types.REF:
563        case Types.NULL:
564        case Types.ARRAY:
565        case Types.DATALINK:
566        case Types.DISTINCT:
567            throw new SQLException("Unsupported column data type: " + type);
568        default:
569            return "";
570        }
571    }
572
573    // simple cases only, used for primary key
574    private static Object asObject(String string, int type) throws SQLException {
575        switch (type) {
576        case Types.BIGINT:
577            return Long.valueOf(string);
578        case Types.INTEGER:
579        case Types.SMALLINT:
580        case Types.TINYINT:
581            return Integer.valueOf(string);
582        case Types.LONGVARCHAR:
583        case Types.CHAR:
584        case Types.VARCHAR:
585            return string;
586        default:
587            throw new SQLException("Unsupport data type for primary key: " + type);
588        }
589    }
590
591    /**
592     * Trigger used to update the lucene index upon row change.
593     */
594    public static class Trigger implements org.h2.api.Trigger {
595
596        private static final Log log = LogFactory.getLog(Trigger.class);
597
598        private String indexName;
599
600        private String indexPath;
601
602        private IndexWriter indexWriter;
603
604        // DEBUG
605        private Exception lastIndexWriterClose;
606
607        // DEBUG
608        private String lastIndexWriterCloseThread;
609
610        /** Starting at 0. */
611        private int primaryKeyIndex;
612
613        private int primaryKeyType;
614
615        private Map<String, int[]> columnTypes;
616
617        private Map<String, int[]> columnIndices;
618
619        /**
620         * Trigger interface: initialization.
621         */
622        @Override
623        public void init(Connection conn, String schema, String triggerName, String table, boolean before, int opType)
624                throws SQLException {
625            DatabaseMetaData meta = conn.getMetaData();
626
627            // find primary key name
628            String primaryKeyName = null;
629            try (ResultSet rs = meta.getPrimaryKeys(null, schema, table)) {
630                while (rs.next()) {
631                    if (primaryKeyName != null) {
632                        throw new SQLException("Can only index primary keys on one column for: " + schema + '.' + table);
633                    }
634                    primaryKeyName = rs.getString("COLUMN_NAME");
635                }
636                if (primaryKeyName == null) {
637                    throw new SQLException("No primary key for " + schema + '.' + table);
638                }
639            }
640
641            // find primary key type
642            try (ResultSet rs = meta.getColumns(null, schema, table, primaryKeyName)) {
643                if (!rs.next()) {
644                    throw new SQLException("No primary key for: " + schema + '.' + table);
645                }
646                primaryKeyType = rs.getInt("DATA_TYPE");
647                primaryKeyIndex = rs.getInt("ORDINAL_POSITION") - 1;
648            }
649
650            // find all columns info
651            Map<String, Integer> allColumnTypes = new HashMap<String, Integer>();
652            Map<String, Integer> allColumnIndices = new HashMap<String, Integer>();
653            try (ResultSet rs = meta.getColumns(null, schema, table, null)) {
654                while (rs.next()) {
655                    String name = rs.getString("COLUMN_NAME");
656                    int type = rs.getInt("DATA_TYPE");
657                    int index = rs.getInt("ORDINAL_POSITION") - 1;
658                    allColumnTypes.put(name, Integer.valueOf(type));
659                    allColumnIndices.put(name, Integer.valueOf(index));
660                }
661            }
662
663            // find columns configured for indexing
664            try (PreparedStatement ps = conn.prepareStatement("SELECT NAME, COLUMNS, ANALYZER FROM " + FT_TABLE
665                    + " WHERE SCHEMA = ? AND TABLE = ?")) {
666                ps.setString(1, schema);
667                ps.setString(2, table);
668                try (ResultSet rs = ps.executeQuery()) {
669                    columnTypes = new HashMap<String, int[]>();
670                    columnIndices = new HashMap<String, int[]>();
671                    while (rs.next()) {
672                        String index = rs.getString(1);
673                        String columns = rs.getString(2);
674                        String analyzerName = rs.getString(3);
675                        List<String> columnNames = Arrays.asList(columns.split(","));
676
677                        // find the columns' indices and types
678                        int[] types = new int[columnNames.size()];
679                        int[] indices = new int[columnNames.size()];
680                        int i = 0;
681                        for (String columnName : columnNames) {
682                            types[i] = allColumnTypes.get(columnName).intValue();
683                            indices[i] = allColumnIndices.get(columnName).intValue();
684                            i++;
685                        }
686                        columnTypes.put(index, types);
687                        columnIndices.put(index, indices);
688                        // only one call actually needed for this:
689                        indexName = getIndexName(conn);
690                        indexPath = getIndexPath(conn);
691                        indexWriter = getIndexWriter(indexName, indexPath, analyzerName);
692                    }
693
694                }
695            }
696        }
697
698        /**
699         * Trigger interface.
700         */
701        @Override
702        public void fire(Connection conn, Object[] oldRow, Object[] newRow) throws SQLException {
703            if (indexWriter == null) {
704                throw new SQLException("Fulltext index was not initialized");
705            }
706            if (oldRow != null) {
707                delete(oldRow);
708            }
709            if (newRow != null) {
710                insert(newRow);
711            }
712        }
713
714        private void insert(Object[] row) throws SQLException {
715            Document doc = new Document();
716            String key = asString(row[primaryKeyIndex], primaryKeyType);
717            // StringField is not tokenized
718            StringField keyField = new StringField(FIELD_KEY, key, Field.Store.YES);
719            doc.add(keyField);
720
721            // add fulltext for all indexes
722            for (String indexName : columnTypes.keySet()) {
723                int[] types = columnTypes.get(indexName);
724                int[] indices = columnIndices.get(indexName);
725                StringBuilder buf = new StringBuilder();
726                for (int i = 0; i < types.length; i++) {
727                    String data = asString(row[indices[i]], types[i]);
728                    if (i > 0) {
729                        buf.append(' ');
730                    }
731                    buf.append(data);
732                }
733                // TextField is tokenized
734                TextField textField = new TextField(fieldForIndex(indexName), buf.toString(), Field.Store.NO);
735                doc.add(textField);
736            }
737
738            try {
739                indexWriter.addDocument(doc);
740            } catch (IOException e) {
741                throw convertException(e);
742            } catch (org.apache.lucene.store.AlreadyClosedException e) {
743                // DEBUG
744                log.error(
745                        "org.apache.lucene.store.AlreadyClosedException in thread " + Thread.currentThread().getName()
746                                + ", last close was in thread " + lastIndexWriterCloseThread, lastIndexWriterClose);
747                throw e;
748            }
749        }
750
751        private void delete(Object[] row) throws SQLException {
752            String primaryKey = asString(row[primaryKeyIndex], primaryKeyType);
753            try {
754                indexWriter.deleteDocuments(new Term(FIELD_KEY, primaryKey));
755            } catch (IOException e) {
756                throw convertException(e);
757            }
758        }
759
760        @Override
761        public void close() throws SQLException {
762            if (indexWriter != null) {
763                try {
764                    // DEBUG
765                    lastIndexWriterClose = new RuntimeException("debug stack trace");
766                    lastIndexWriterCloseThread = Thread.currentThread().getName();
767                    indexWriter.close();
768                    indexWriter = null;
769                } catch (IOException e) {
770                    throw convertException(e);
771                } finally {
772                    indexWriters.remove(indexName);
773                }
774            }
775        }
776
777        @Override
778        public void remove() {
779            // ignore
780        }
781    }
782
783}