/*
 * Copyright (c) 2006-2014 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     H2 Group
 *     Florent Guillaume
 */
013package org.nuxeo.ecm.core.storage.sql.db;
014
015import java.io.File;
016import java.io.IOException;
017import java.io.Reader;
018import java.lang.reflect.Constructor;
019import java.sql.Clob;
020import java.sql.Connection;
021import java.sql.DatabaseMetaData;
022import java.sql.PreparedStatement;
023import java.sql.ResultSet;
024import java.sql.SQLException;
025import java.sql.Statement;
026import java.sql.Types;
027import java.util.Arrays;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.List;
032import java.util.Map;
033import java.util.Set;
034import java.util.concurrent.ConcurrentHashMap;
035
036import org.apache.commons.logging.Log;
037import org.apache.commons.logging.LogFactory;
038import org.apache.lucene.analysis.Analyzer;
039import org.apache.lucene.document.Document;
040import org.apache.lucene.document.Field;
041import org.apache.lucene.document.StringField;
042import org.apache.lucene.document.TextField;
043import org.apache.lucene.index.AtomicReaderContext;
044import org.apache.lucene.index.DirectoryReader;
045import org.apache.lucene.index.IndexReader;
046import org.apache.lucene.index.IndexWriter;
047import org.apache.lucene.index.IndexWriterConfig;
048import org.apache.lucene.index.IndexWriterConfig.OpenMode;
049import org.apache.lucene.index.Term;
050import org.apache.lucene.queryparser.classic.ParseException;
051import org.apache.lucene.queryparser.classic.QueryParser;
052import org.apache.lucene.search.BooleanClause;
053import org.apache.lucene.search.BooleanQuery;
054import org.apache.lucene.search.Collector;
055import org.apache.lucene.search.IndexSearcher;
056import org.apache.lucene.search.Scorer;
057import org.apache.lucene.store.Directory;
058import org.apache.lucene.store.FSDirectory;
059import org.apache.lucene.store.LockObtainFailedException;
060import org.apache.lucene.store.RAMDirectory;
061import org.apache.lucene.util.Version;
062import org.h2.message.DbException;
063import org.h2.store.fs.FileUtils;
064import org.h2.tools.SimpleResultSet;
065import org.h2.util.IOUtils;
066import org.h2.util.StringUtils;
067
/**
 * An optimized Lucene-based fulltext indexing trigger and search.
 */
071public class H2Fulltext {
072
073    private static final Version LUCENE_VERSION = Version.LUCENE_4_10_4;
074
075    private static final Map<String, Analyzer> analyzers = new ConcurrentHashMap<String, Analyzer>();
076
077    private static final Map<String, IndexWriter> indexWriters = new ConcurrentHashMap<String, IndexWriter>();
078
079    private static final String FT_SCHEMA = "NXFT";
080
081    private static final String FT_TABLE = FT_SCHEMA + ".INDEXES";
082
083    private static final String PREFIX = "NXFT_";
084
085    private static final String FIELD_KEY = "KEY";
086
087    private static final String FIELD_TEXT = "TEXT";
088
089    private static final String DEFAULT_INDEX_NAME = "PUBLIC_FULLTEXT_default";
090
091    private static final String COL_KEY = "KEY";
092
093    // Utility class.
094    private H2Fulltext() {
095    }
096
097    /**
098     * Initializes fulltext search functionality for this database. This adds the following Java functions to the
099     * database:
100     * <ul>
101     * <li>NXFT_CREATE_INDEX(nameString, schemaString, tableString, columnListString, analyzerString)</li>
102     * <li>NXFT_REINDEX()</li>
103     * <li>NXFT_DROP_ALL()</li>
104     * <li>NXFT_SEARCH(queryString, limitInt, offsetInt): result set</li>
105     * </ul>
106     * It also adds a schema NXFT to the database where bookkeeping information is stored. This function may be called
107     * from a Java application, or by using the SQL statements:
108     *
109     * <pre>
110     *  CREATE ALIAS IF NOT EXISTS NXFT_INIT FOR
111     *      &quot;org.nuxeo.ecm.core.storage.sql.db.H2Fulltext.init&quot;;
112     *  CALL NXFT_INIT();
113     * </pre>
114     *
115     * @param conn
116     */
117    public static void init(Connection conn) throws SQLException {
118        try (Statement st = conn.createStatement()) {
119            st.execute("CREATE SCHEMA IF NOT EXISTS " + FT_SCHEMA);
120            st.execute("CREATE TABLE IF NOT EXISTS " + FT_TABLE
121                    + "(NAME VARCHAR, SCHEMA VARCHAR, TABLE VARCHAR, COLUMNS VARCHAR, "
122                    + "ANALYZER VARCHAR, PRIMARY KEY(NAME))");
123            // BBB migrate old table without the "NAME" column
124            try (ResultSet rs = st.executeQuery("SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE " + "TABLE_SCHEMA = '"
125                    + FT_SCHEMA + "' AND TABLE_NAME = 'INDEXES' AND COLUMN_NAME = 'NAME'")) {
126                if (!rs.next()) {
127                    // BBB no NAME column, alter table to create it
128                    st.execute("ALTER TABLE " + FT_TABLE + " ADD COLUMN NAME VARCHAR");
129                    st.execute("UPDATE " + FT_TABLE + " SET NAME = '" + DEFAULT_INDEX_NAME + "'");
130                }
131            }
132
133            String className = H2Fulltext.class.getName();
134            st.execute("CREATE ALIAS IF NOT EXISTS " + PREFIX + "CREATE_INDEX FOR \"" + className + ".createIndex\"");
135            st.execute("CREATE ALIAS IF NOT EXISTS " + PREFIX + "REINDEX FOR \"" + className + ".reindex\"");
136            st.execute("CREATE ALIAS IF NOT EXISTS " + PREFIX + "DROP_ALL FOR \"" + className + ".dropAll\"");
137            st.execute("CREATE ALIAS IF NOT EXISTS " + PREFIX + "SEARCH FOR \"" + className + ".search\"");
138        }
139    }
140
    // ----- static methods called directly to initialize fulltext -----
142
143    /**
144     * Creates a fulltext index for a table and column list.
145     * <p>
146     * A table may have any number of indexes at a time, but the index name must be unique. If the index already exists,
147     * nothing is done, otherwise the index is created and populated from existing data.
148     * <p>
149     * Usually called through:
150     *
151     * <pre>
152     *   CALL NXFT_CREATE_INDEX('indexname', 'myschema', 'mytable', ('col1', 'col2'), 'lucene.analyzer');
153     * </pre>
154     *
155     * @param conn the connection
156     * @param indexName the index name
157     * @param schema the schema name of the table
158     * @param table the table name
159     * @param columns the column list
160     * @param analyzer the Lucene fulltext analyzer class
161     */
162    public static void createIndex(Connection conn, String indexName, String schema, String table, String columns,
163            String analyzer) throws SQLException {
164        if (indexName == null) {
165            indexName = DEFAULT_INDEX_NAME;
166        }
167        columns = columns.replace("(", "").replace(")", "").replace(" ", "");
168        try (PreparedStatement ps = conn.prepareStatement("DELETE FROM " + FT_TABLE + " WHERE NAME = ?")) {
169            ps.setString(1, indexName);
170            ps.execute();
171        }
172        try (PreparedStatement ps = conn.prepareStatement("INSERT INTO " + FT_TABLE
173                + "(NAME, SCHEMA, TABLE, COLUMNS, ANALYZER) VALUES(?, ?, ?, ?, ?)")) {
174            ps.setString(1, indexName);
175            ps.setString(2, schema);
176            ps.setString(3, table);
177            ps.setString(4, columns);
178            ps.setString(5, analyzer);
179            ps.execute();
180        }
181        createTrigger(conn, schema, table);
182    }
183
184    /**
185     * Re-creates the fulltext index for this database.
186     */
187    public static void reindex(Connection conn) throws SQLException {
188        removeAllTriggers(conn);
189        removeIndexFiles(conn);
190        try (Statement st = conn.createStatement()) {
191            try (ResultSet rs = st.executeQuery("SELECT * FROM " + FT_TABLE)) {
192                Set<String> done = new HashSet<String>();
193                while (rs.next()) {
194                    String schema = rs.getString("SCHEMA");
195                    String table = rs.getString("TABLE");
196                    String key = schema + '.' + table;
197                    if (!done.add(key)) {
198                        continue;
199                    }
200                    createTrigger(conn, schema, table);
201                    indexExistingRows(conn, schema, table);
202                }
203            }
204        }
205    }
206
207    private static void indexExistingRows(Connection conn, String schema, String table) throws SQLException {
208        Trigger trigger = new Trigger();
209        trigger.init(conn, schema, null, table, false, org.h2.api.Trigger.INSERT);
210        try (Statement st = conn.createStatement()) {
211            try (ResultSet rs = st.executeQuery("SELECT * FROM " + StringUtils.quoteIdentifier(schema) + '.'
212                    + StringUtils.quoteIdentifier(table))) {
213                int n = rs.getMetaData().getColumnCount();
214                while (rs.next()) {
215                    Object[] row = new Object[n];
216                    for (int i = 0; i < n; i++) {
217                        row[i] = rs.getObject(i + 1);
218                    }
219                    trigger.fire(conn, null, row);
220                }
221            }
222        }
223    }
224
225    /**
226     * Creates a trigger for the indexes on a table.
227     * <p>
228     * Usually called through:
229     *
230     * <pre>
231     *   CALL NXFT_CREATE_TRIGGERS('myschema', 'mytable');
232     * </pre>
233     *
234     * @param conn the connection
235     * @param schema the schema name of the table
236     * @param table the table name
237     */
238    private static void createTrigger(Connection conn, String schema, String table) throws SQLException {
239        try (Statement st = conn.createStatement()) {
240            schema = StringUtils.quoteIdentifier(schema);
241            String trigger = schema + '.' + StringUtils.quoteIdentifier(PREFIX + table);
242            st.execute("DROP TRIGGER IF EXISTS " + trigger);
243            st.execute(String.format("CREATE TRIGGER %s " + "AFTER INSERT, UPDATE, DELETE ON %s.%s "
244                    + "FOR EACH ROW CALL \"%s\"", trigger, schema, StringUtils.quoteIdentifier(table),
245                    H2Fulltext.Trigger.class.getName()));
246        }
247    }
248
249    private static void removeAllTriggers(Connection conn) throws SQLException {
250        try (Statement st = conn.createStatement()) {
251            try (ResultSet rs = st.executeQuery("SELECT * FROM INFORMATION_SCHEMA.TRIGGERS")) {
252                try (Statement st2 = conn.createStatement()) {
253                    while (rs.next()) {
254                        String trigger = rs.getString("TRIGGER_NAME");
255                        if (trigger.startsWith(PREFIX)) {
256                            st2.execute("DROP TRIGGER " + StringUtils.quoteIdentifier(rs.getString("TRIGGER_SCHEMA"))
257                                    + "." + trigger);
258                        }
259                    }
260                }
261            }
262        }
263    }
264
265    /**
266     * Drops all fulltext indexes from the database.
267     */
268    public static void dropAll(Connection conn) throws SQLException {
269        try (Statement st = conn.createStatement()) {
270            st.execute("DROP SCHEMA IF EXISTS " + FT_SCHEMA);
271        }
272        removeAllTriggers(conn);
273        removeIndexFiles(conn);
274    }
275
276    private static String fieldForIndex(String indexName) {
277        if (DEFAULT_INDEX_NAME.equals(indexName)) {
278            return FIELD_TEXT;
279        } else {
280            return FIELD_TEXT + '_' + indexName;
281        }
282    }
283
284    /**
285     * Searches from the given full text index. The returned result set has a single ID column which holds the keys for
286     * the matching rows.
287     * <p>
288     * Usually called through:
289     *
290     * <pre>
291     *   SELECT * FROM NXFT_SEARCH(name, 'text');
292     * </pre>
293     *
294     * @param conn the connection
295     * @param indexName the index name
296     * @param text the search query
297     * @return the result set
298     */
299    public static ResultSet search(Connection conn, String indexName, String text) throws SQLException {
300        DatabaseMetaData meta = conn.getMetaData();
301        if (indexName == null) {
302            indexName = DEFAULT_INDEX_NAME;
303        }
304
305        String schema;
306        String table;
307        String analyzerName;
308
309        // find schema, table and analyzer
310        try (PreparedStatement ps = conn.prepareStatement("SELECT SCHEMA, TABLE, ANALYZER FROM " + FT_TABLE
311                + " WHERE NAME = ?")) {
312            ps.setString(1, indexName);
313            try (ResultSet res = ps.executeQuery()) {
314                if (!res.next()) {
315                    throw new SQLException("No such index: " + indexName);
316                }
317                schema = res.getString(1);
318                table = res.getString(2);
319                analyzerName = res.getString(3);
320            }
321        }
322
323        int type = getPrimaryKeyType(meta, schema, table);
324        SimpleResultSet rs = new SimpleResultSet();
325        rs.addColumn(COL_KEY, type, 0, 0);
326
327        if (meta.getURL().startsWith("jdbc:columnlist:")) {
328            // this is just to query the result set columns
329            return rs;
330        }
331
332        // flush changes
333        final IndexWriter writer = getIndexWriter(getIndexName(conn), getIndexPath(conn), analyzerName);
334        if (writer.hasUncommittedChanges()) {
335            try {
336                writer.commit();
337            } catch (IOException cause) {
338                throw convertException(cause);
339            }
340        }
341
342        // search index
343        try {
344            BooleanQuery query = new BooleanQuery();
345            String defaultField = fieldForIndex(indexName);
346            Analyzer analyzer = getAnalyzer(analyzerName);
347            QueryParser parser = new QueryParser(LUCENE_VERSION, defaultField, analyzer);
348            query.add(parser.parse(text), BooleanClause.Occur.MUST);
349
350            try (IndexReader reader = DirectoryReader.open(writer.getDirectory())) {
351                IndexSearcher searcher = new IndexSearcher(reader);
352                Collector collector = new ResultSetCollector(rs, reader, type);
353                searcher.search(query, collector);
354            }
355        } catch (SQLException | ParseException | IOException e) {
356            throw convertException(e);
357        }
358        return rs;
359    }
360
361    protected static class ResultSetCollector extends Collector {
362        protected final SimpleResultSet rs;
363
364        protected IndexReader reader;
365
366        protected int type;
367
368        protected int docBase;
369
370        public ResultSetCollector(SimpleResultSet rs, IndexReader reader, int type) {
371            this.rs = rs;
372            this.reader = reader;
373            this.type = type;
374        }
375
376        @Override
377        public void setNextReader(AtomicReaderContext context) {
378            docBase = context.docBase;
379        }
380
381        @Override
382        public void setScorer(Scorer scorer) {
383        }
384
385        @Override
386        public boolean acceptsDocsOutOfOrder() {
387            return true;
388        }
389
390        @Override
391        public void collect(int docID) throws IOException {
392            docID += docBase;
393            Document doc = reader.document(docID, Collections.singleton(FIELD_KEY));
394            Object key;
395            try {
396                key = asObject(doc.get(FIELD_KEY), type);
397                rs.addRow(new Object[] { key });
398            } catch (SQLException e) {
399                throw new IOException(e);
400            }
401        }
402    }
403
404    private static int getPrimaryKeyType(DatabaseMetaData meta, String schema, String table) throws SQLException {
405        // find primary key name
406        String primaryKeyName = null;
407        try (ResultSet rs = meta.getPrimaryKeys(null, schema, table)) {
408            while (rs.next()) {
409                if (primaryKeyName != null) {
410                    throw new SQLException("Can only index primary keys on one column for " + schema + '.' + table);
411                }
412                primaryKeyName = rs.getString("COLUMN_NAME");
413            }
414            if (primaryKeyName == null) {
415                throw new SQLException("No primary key for " + schema + '.' + table);
416            }
417        }
418        // find primary key type
419        try (ResultSet rs = meta.getColumns(null, schema, table, primaryKeyName)) {
420            if (!rs.next()) {
421                throw new SQLException("Could not find primary key");
422            }
423            return rs.getInt("DATA_TYPE");
424        }
425    }
426
427    private static Analyzer getAnalyzer(String analyzerName) throws SQLException {
428        Analyzer analyzer = analyzers.get(analyzerName);
429        if (analyzer == null) {
430            try {
431                Class<?> klass = Class.forName(analyzerName);
432                Constructor<?> constructor = klass.getConstructor(Version.class);
433                analyzer = (Analyzer) constructor.newInstance(LUCENE_VERSION);
434            } catch (ReflectiveOperationException e) {
435                throw new SQLException(e.toString());
436            }
437            analyzers.put(analyzerName, analyzer);
438        }
439        return analyzer;
440    }
441
442    protected static String getIndexName(Connection conn) throws SQLException {
443        String catalog = conn.getCatalog();
444        if (catalog == null) {
445            catalog = "default";
446        }
447        return catalog;
448    }
449
450    protected static String getIndexPath(Connection conn) throws SQLException {
451        try (Statement st = conn.createStatement()) {
452            try (ResultSet rs = st.executeQuery("CALL DATABASE_PATH()")) {
453                rs.next();
454                String path = rs.getString(1);
455                if (path == null) {
456                    return null;
457                }
458                return path + ".lucene";
459            }
460        }
461
462    }
463
464    private static IndexWriter getIndexWriter(String name, String path, String analyzer) throws SQLException {
465        IndexWriter indexWriter = indexWriters.get(name);
466        if (indexWriter != null) {
467            return indexWriter;
468        }
469        synchronized (indexWriters) {
470            indexWriter = indexWriters.get(name);
471            if (indexWriter != null) {
472                return indexWriter;
473            }
474            try {
475                Directory dir = path == null ? new RAMDirectory() : FSDirectory.open(new File(path));
476                Analyzer an = getAnalyzer(analyzer);
477                IndexWriterConfig iwc = new IndexWriterConfig(LUCENE_VERSION, an);
478                iwc.setOpenMode(OpenMode.CREATE_OR_APPEND);
479                indexWriter = new IndexWriter(dir, iwc);
480            } catch (LockObtainFailedException e) {
481                throw convertException("Cannot open fulltext index " + path, e);
482            } catch (IOException e) {
483                throw convertException(e);
484            }
485            indexWriters.put(name, indexWriter);
486            return indexWriter;
487        }
488    }
489
490    private static void removeIndexFiles(Connection conn) throws SQLException {
491        String path = getIndexPath(conn);
492        try {
493            IndexWriter index = indexWriters.remove(path);
494            if (index != null) {
495                try {
496                    index.close();
497                } catch (IOException e) {
498                    throw convertException(e);
499                }
500            }
501        } finally {
502            FileUtils.deleteRecursive(path, false);
503        }
504    }
505
506    private static SQLException convertException(Exception e) {
507        return convertException("Error while indexing document", e);
508    }
509
510    private static SQLException convertException(String message, Exception e) {
511        SQLException e2 = new SQLException(message);
512        e2.initCause(e);
513        return e2;
514    }
515
516    protected static String asString(Object data, int type) throws SQLException {
517        if (data == null) {
518            return "";
519        }
520        switch (type) {
521        case Types.BIT:
522        case Types.BOOLEAN:
523        case Types.INTEGER:
524        case Types.BIGINT:
525        case Types.DECIMAL:
526        case Types.DOUBLE:
527        case Types.FLOAT:
528        case Types.NUMERIC:
529        case Types.REAL:
530        case Types.SMALLINT:
531        case Types.TINYINT:
532        case Types.DATE:
533        case Types.TIME:
534        case Types.TIMESTAMP:
535        case Types.LONGVARCHAR:
536        case Types.CHAR:
537        case Types.VARCHAR:
538            return data.toString();
539        case Types.CLOB:
540            try {
541                if (data instanceof Clob) {
542                    data = ((Clob) data).getCharacterStream();
543                }
544                return IOUtils.readStringAndClose((Reader) data, -1);
545            } catch (IOException e) {
546                throw DbException.convert(e);
547            }
548        case Types.VARBINARY:
549        case Types.LONGVARBINARY:
550        case Types.BINARY:
551        case Types.JAVA_OBJECT:
552        case Types.OTHER:
553        case Types.BLOB:
554        case Types.STRUCT:
555        case Types.REF:
556        case Types.NULL:
557        case Types.ARRAY:
558        case Types.DATALINK:
559        case Types.DISTINCT:
560            throw new SQLException("Unsupported column data type: " + type);
561        default:
562            return "";
563        }
564    }
565
566    // simple cases only, used for primary key
567    private static Object asObject(String string, int type) throws SQLException {
568        switch (type) {
569        case Types.BIGINT:
570            return Long.valueOf(string);
571        case Types.INTEGER:
572        case Types.SMALLINT:
573        case Types.TINYINT:
574            return Integer.valueOf(string);
575        case Types.LONGVARCHAR:
576        case Types.CHAR:
577        case Types.VARCHAR:
578            return string;
579        default:
580            throw new SQLException("Unsupport data type for primary key: " + type);
581        }
582    }
583
584    /**
585     * Trigger used to update the lucene index upon row change.
586     */
587    public static class Trigger implements org.h2.api.Trigger {
588
589        private static final Log log = LogFactory.getLog(Trigger.class);
590
591        private String indexName;
592
593        private String indexPath;
594
595        private IndexWriter indexWriter;
596
597        // DEBUG
598        private Exception lastIndexWriterClose;
599
600        // DEBUG
601        private String lastIndexWriterCloseThread;
602
603        /** Starting at 0. */
604        private int primaryKeyIndex;
605
606        private int primaryKeyType;
607
608        private Map<String, int[]> columnTypes;
609
610        private Map<String, int[]> columnIndices;
611
612        /**
613         * Trigger interface: initialization.
614         */
615        @Override
616        public void init(Connection conn, String schema, String triggerName, String table, boolean before, int opType)
617                throws SQLException {
618            DatabaseMetaData meta = conn.getMetaData();
619
620            // find primary key name
621            String primaryKeyName = null;
622            try (ResultSet rs = meta.getPrimaryKeys(null, schema, table)) {
623                while (rs.next()) {
624                    if (primaryKeyName != null) {
625                        throw new SQLException("Can only index primary keys on one column for: " + schema + '.' + table);
626                    }
627                    primaryKeyName = rs.getString("COLUMN_NAME");
628                }
629                if (primaryKeyName == null) {
630                    throw new SQLException("No primary key for " + schema + '.' + table);
631                }
632            }
633
634            // find primary key type
635            try (ResultSet rs = meta.getColumns(null, schema, table, primaryKeyName)) {
636                if (!rs.next()) {
637                    throw new SQLException("No primary key for: " + schema + '.' + table);
638                }
639                primaryKeyType = rs.getInt("DATA_TYPE");
640                primaryKeyIndex = rs.getInt("ORDINAL_POSITION") - 1;
641            }
642
643            // find all columns info
644            Map<String, Integer> allColumnTypes = new HashMap<String, Integer>();
645            Map<String, Integer> allColumnIndices = new HashMap<String, Integer>();
646            try (ResultSet rs = meta.getColumns(null, schema, table, null)) {
647                while (rs.next()) {
648                    String name = rs.getString("COLUMN_NAME");
649                    int type = rs.getInt("DATA_TYPE");
650                    int index = rs.getInt("ORDINAL_POSITION") - 1;
651                    allColumnTypes.put(name, Integer.valueOf(type));
652                    allColumnIndices.put(name, Integer.valueOf(index));
653                }
654            }
655
656            // find columns configured for indexing
657            try (PreparedStatement ps = conn.prepareStatement("SELECT NAME, COLUMNS, ANALYZER FROM " + FT_TABLE
658                    + " WHERE SCHEMA = ? AND TABLE = ?")) {
659                ps.setString(1, schema);
660                ps.setString(2, table);
661                try (ResultSet rs = ps.executeQuery()) {
662                    columnTypes = new HashMap<String, int[]>();
663                    columnIndices = new HashMap<String, int[]>();
664                    while (rs.next()) {
665                        String index = rs.getString(1);
666                        String columns = rs.getString(2);
667                        String analyzerName = rs.getString(3);
668                        List<String> columnNames = Arrays.asList(columns.split(","));
669
670                        // find the columns' indices and types
671                        int[] types = new int[columnNames.size()];
672                        int[] indices = new int[columnNames.size()];
673                        int i = 0;
674                        for (String columnName : columnNames) {
675                            types[i] = allColumnTypes.get(columnName).intValue();
676                            indices[i] = allColumnIndices.get(columnName).intValue();
677                            i++;
678                        }
679                        columnTypes.put(index, types);
680                        columnIndices.put(index, indices);
681                        // only one call actually needed for this:
682                        indexName = getIndexName(conn);
683                        indexPath = getIndexPath(conn);
684                        indexWriter = getIndexWriter(indexName, indexPath, analyzerName);
685                    }
686
687                }
688            }
689        }
690
691        /**
692         * Trigger interface.
693         */
694        @Override
695        public void fire(Connection conn, Object[] oldRow, Object[] newRow) throws SQLException {
696            if (indexWriter == null) {
697                throw new SQLException("Fulltext index was not initialized");
698            }
699            if (oldRow != null) {
700                delete(oldRow);
701            }
702            if (newRow != null) {
703                insert(newRow);
704            }
705        }
706
707        private void insert(Object[] row) throws SQLException {
708            Document doc = new Document();
709            String key = asString(row[primaryKeyIndex], primaryKeyType);
710            // StringField is not tokenized
711            StringField keyField = new StringField(FIELD_KEY, key, Field.Store.YES);
712            doc.add(keyField);
713
714            // add fulltext for all indexes
715            for (String indexName : columnTypes.keySet()) {
716                int[] types = columnTypes.get(indexName);
717                int[] indices = columnIndices.get(indexName);
718                StringBuilder buf = new StringBuilder();
719                for (int i = 0; i < types.length; i++) {
720                    String data = asString(row[indices[i]], types[i]);
721                    if (i > 0) {
722                        buf.append(' ');
723                    }
724                    buf.append(data);
725                }
726                // TextField is tokenized
727                TextField textField = new TextField(fieldForIndex(indexName), buf.toString(), Field.Store.NO);
728                doc.add(textField);
729            }
730
731            try {
732                indexWriter.addDocument(doc);
733            } catch (IOException e) {
734                throw convertException(e);
735            } catch (org.apache.lucene.store.AlreadyClosedException e) {
736                // DEBUG
737                log.error(
738                        "org.apache.lucene.store.AlreadyClosedException in thread " + Thread.currentThread().getName()
739                                + ", last close was in thread " + lastIndexWriterCloseThread, lastIndexWriterClose);
740                throw e;
741            }
742        }
743
744        private void delete(Object[] row) throws SQLException {
745            String primaryKey = asString(row[primaryKeyIndex], primaryKeyType);
746            try {
747                indexWriter.deleteDocuments(new Term(FIELD_KEY, primaryKey));
748            } catch (IOException e) {
749                throw convertException(e);
750            }
751        }
752
753        @Override
754        public void close() throws SQLException {
755            if (indexWriter != null) {
756                try {
757                    // DEBUG
758                    lastIndexWriterClose = new RuntimeException("debug stack trace");
759                    lastIndexWriterCloseThread = Thread.currentThread().getName();
760                    indexWriter.close();
761                    indexWriter = null;
762                } catch (IOException e) {
763                    throw convertException(e);
764                } finally {
765                    indexWriters.remove(indexName);
766                }
767            }
768        }
769
770        @Override
771        public void remove() {
772            // ignore
773        }
774    }
775
776}