001/* 002 * (C) Copyright 2006-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * H2 Group 018 * Florent Guillaume 019 */ 020package org.nuxeo.ecm.core.storage.sql.db; 021 022import java.io.IOException; 023import java.io.Reader; 024import java.lang.reflect.Constructor; 025import java.nio.file.Paths; 026import java.sql.Clob; 027import java.sql.Connection; 028import java.sql.DatabaseMetaData; 029import java.sql.PreparedStatement; 030import java.sql.ResultSet; 031import java.sql.SQLException; 032import java.sql.Statement; 033import java.sql.Types; 034import java.util.Arrays; 035import java.util.Collections; 036import java.util.HashMap; 037import java.util.HashSet; 038import java.util.List; 039import java.util.Map; 040import java.util.Set; 041import java.util.concurrent.ConcurrentHashMap; 042 043import org.apache.commons.logging.Log; 044import org.apache.commons.logging.LogFactory; 045import org.apache.lucene.analysis.Analyzer; 046import org.apache.lucene.document.Document; 047import org.apache.lucene.document.Field; 048import org.apache.lucene.document.StringField; 049import org.apache.lucene.document.TextField; 050import org.apache.lucene.index.DirectoryReader; 051import org.apache.lucene.index.IndexReader; 052import org.apache.lucene.index.IndexWriter; 053import org.apache.lucene.index.IndexWriterConfig; 054import org.apache.lucene.index.IndexWriterConfig.OpenMode; 055import org.apache.lucene.index.LeafReaderContext; 056import org.apache.lucene.index.Term; 057import org.apache.lucene.queryparser.classic.ParseException; 058import org.apache.lucene.queryparser.classic.QueryParser; 059import org.apache.lucene.search.BooleanClause; 060import org.apache.lucene.search.BooleanQuery; 061import org.apache.lucene.search.Collector; 062import org.apache.lucene.search.IndexSearcher; 063import org.apache.lucene.search.LeafCollector; 064import org.apache.lucene.search.Scorer; 065import org.apache.lucene.store.Directory; 066import org.apache.lucene.store.FSDirectory; 067import org.apache.lucene.store.LockObtainFailedException; 068import org.apache.lucene.store.RAMDirectory; 069import org.h2.message.DbException; 070import org.h2.store.fs.FileUtils; 071import org.h2.tools.SimpleResultSet; 072import org.h2.util.IOUtils; 073import org.h2.util.StringUtils; 074 075/** 076 * An optimized Lucene-based fulltext indexing trigger and search. 077 */ 078public class H2Fulltext { 079 080 private static final Map<String, Analyzer> analyzers = new ConcurrentHashMap<>(); 081 082 private static final Map<String, IndexWriter> indexWriters = new ConcurrentHashMap<>(); 083 084 private static final String FT_SCHEMA = "NXFT"; 085 086 private static final String FT_TABLE = FT_SCHEMA + ".INDEXES"; 087 088 private static final String PREFIX = "NXFT_"; 089 090 private static final String FIELD_KEY = "KEY"; 091 092 private static final String FIELD_TEXT = "TEXT"; 093 094 private static final String DEFAULT_INDEX_NAME = "PUBLIC_FULLTEXT_default"; 095 096 private static final String COL_KEY = "KEY"; 097 098 // Utility class. 099 private H2Fulltext() { 100 } 101 102 /** 103 * Initializes fulltext search functionality for this database. This adds the following Java functions to the 104 * database: 105 * <ul> 106 * <li>NXFT_CREATE_INDEX(nameString, schemaString, tableString, columnListString, analyzerString)</li> 107 * <li>NXFT_REINDEX()</li> 108 * <li>NXFT_DROP_ALL()</li> 109 * <li>NXFT_SEARCH(queryString, limitInt, offsetInt): result set</li> 110 * </ul> 111 * It also adds a schema NXFT to the database where bookkeeping information is stored. This function may be called 112 * from a Java application, or by using the SQL statements: 113 * 114 * <pre> 115 * CREATE ALIAS IF NOT EXISTS NXFT_INIT FOR 116 * "org.nuxeo.ecm.core.storage.sql.db.H2Fulltext.init"; 117 * CALL NXFT_INIT(); 118 * </pre> 119 */ 120 public static void init(Connection conn) throws SQLException { 121 try (Statement st = conn.createStatement()) { 122 st.execute("CREATE SCHEMA IF NOT EXISTS " + FT_SCHEMA); 123 st.execute("CREATE TABLE IF NOT EXISTS " + FT_TABLE 124 + "(NAME VARCHAR, SCHEMA VARCHAR, TABLE VARCHAR, COLUMNS VARCHAR, " 125 + "ANALYZER VARCHAR, PRIMARY KEY(NAME))"); 126 // BBB migrate old table without the "NAME" column 127 try (ResultSet rs = st.executeQuery("SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE " + "TABLE_SCHEMA = '" 128 + FT_SCHEMA + "' AND TABLE_NAME = 'INDEXES' AND COLUMN_NAME = 'NAME'")) { 129 if (!rs.next()) { 130 // BBB no NAME column, alter table to create it 131 st.execute("ALTER TABLE " + FT_TABLE + " ADD COLUMN NAME VARCHAR"); 132 st.execute("UPDATE " + FT_TABLE + " SET NAME = '" + DEFAULT_INDEX_NAME + "'"); 133 } 134 } 135 136 String className = H2Fulltext.class.getName(); 137 st.execute("CREATE ALIAS IF NOT EXISTS " + PREFIX + "CREATE_INDEX FOR \"" + className + ".createIndex\""); 138 st.execute("CREATE ALIAS IF NOT EXISTS " + PREFIX + "REINDEX FOR \"" + className + ".reindex\""); 139 st.execute("CREATE ALIAS IF NOT EXISTS " + PREFIX + "DROP_ALL FOR \"" + className + ".dropAll\""); 140 st.execute("CREATE ALIAS IF NOT EXISTS " + PREFIX + "SEARCH FOR \"" + className + ".search\""); 141 } 142 } 143 144 // ----- static methods called directly to initialize fulltext ----- 145 146 /** 147 * Creates a fulltext index for a table and column list. 148 * <p> 149 * A table may have any number of indexes at a time, but the index name must be unique. If the index already exists, 150 * nothing is done, otherwise the index is created and populated from existing data. 151 * <p> 152 * Usually called through: 153 * 154 * <pre> 155 * CALL NXFT_CREATE_INDEX('indexname', 'myschema', 'mytable', ('col1', 'col2'), 'lucene.analyzer'); 156 * </pre> 157 * 158 * @param conn the connection 159 * @param indexName the index name 160 * @param schema the schema name of the table 161 * @param table the table name 162 * @param columns the column list 163 * @param analyzer the Lucene fulltext analyzer class 164 */ 165 public static void createIndex(Connection conn, String indexName, String schema, String table, String columns, 166 String analyzer) throws SQLException { 167 if (indexName == null) { 168 indexName = DEFAULT_INDEX_NAME; 169 } 170 columns = columns.replace("(", "").replace(")", "").replace(" ", ""); 171 try (PreparedStatement ps = conn.prepareStatement("DELETE FROM " + FT_TABLE + " WHERE NAME = ?")) { 172 ps.setString(1, indexName); 173 ps.execute(); 174 } 175 try (PreparedStatement ps = conn.prepareStatement( 176 "INSERT INTO " + FT_TABLE + "(NAME, SCHEMA, TABLE, COLUMNS, ANALYZER) VALUES(?, ?, ?, ?, ?)")) { 177 ps.setString(1, indexName); 178 ps.setString(2, schema); 179 ps.setString(3, table); 180 ps.setString(4, columns); 181 ps.setString(5, analyzer); 182 ps.execute(); 183 } 184 createTrigger(conn, schema, table); 185 } 186 187 /** 188 * Re-creates the fulltext index for this database. 189 */ 190 public static void reindex(Connection conn) throws SQLException { 191 removeAllTriggers(conn); 192 removeIndexFiles(conn); 193 try (Statement st = conn.createStatement()) { 194 try (ResultSet rs = st.executeQuery("SELECT * FROM " + FT_TABLE)) { 195 Set<String> done = new HashSet<>(); 196 while (rs.next()) { 197 String schema = rs.getString("SCHEMA"); 198 String table = rs.getString("TABLE"); 199 String key = schema + '.' + table; 200 if (!done.add(key)) { 201 continue; 202 } 203 createTrigger(conn, schema, table); 204 indexExistingRows(conn, schema, table); 205 } 206 } 207 } 208 } 209 210 private static void indexExistingRows(Connection conn, String schema, String table) throws SQLException { 211 Trigger trigger = new Trigger(); 212 trigger.init(conn, schema, null, table, false, org.h2.api.Trigger.INSERT); 213 try (Statement st = conn.createStatement()) { 214 try (ResultSet rs = st.executeQuery("SELECT * FROM " + StringUtils.quoteIdentifier(schema) + '.' 215 + StringUtils.quoteIdentifier(table))) { 216 int n = rs.getMetaData().getColumnCount(); 217 while (rs.next()) { 218 Object[] row = new Object[n]; 219 for (int i = 0; i < n; i++) { 220 row[i] = rs.getObject(i + 1); 221 } 222 trigger.fire(conn, null, row); 223 } 224 } 225 } 226 } 227 228 /** 229 * Creates a trigger for the indexes on a table. 230 * <p> 231 * Usually called through: 232 * 233 * <pre> 234 * CALL NXFT_CREATE_TRIGGERS('myschema', 'mytable'); 235 * </pre> 236 * 237 * @param conn the connection 238 * @param schema the schema name of the table 239 * @param table the table name 240 */ 241 private static void createTrigger(Connection conn, String schema, String table) throws SQLException { 242 try (Statement st = conn.createStatement()) { 243 schema = StringUtils.quoteIdentifier(schema); 244 String trigger = schema + '.' + StringUtils.quoteIdentifier(PREFIX + table); 245 st.execute("DROP TRIGGER IF EXISTS " + trigger); 246 st.execute(String.format( 247 "CREATE TRIGGER %s " + "AFTER INSERT, UPDATE, DELETE ON %s.%s " + "FOR EACH ROW CALL \"%s\"", 248 trigger, schema, StringUtils.quoteIdentifier(table), H2Fulltext.Trigger.class.getName())); 249 } 250 } 251 252 private static void removeAllTriggers(Connection conn) throws SQLException { 253 try (Statement st = conn.createStatement()) { 254 try (ResultSet rs = st.executeQuery("SELECT * FROM INFORMATION_SCHEMA.TRIGGERS")) { 255 try (Statement st2 = conn.createStatement()) { 256 while (rs.next()) { 257 String trigger = rs.getString("TRIGGER_NAME"); 258 if (trigger.startsWith(PREFIX)) { 259 st2.execute("DROP TRIGGER " + StringUtils.quoteIdentifier(rs.getString("TRIGGER_SCHEMA")) 260 + "." + trigger); 261 } 262 } 263 } 264 } 265 } 266 } 267 268 /** 269 * Drops all fulltext indexes from the database. 270 */ 271 public static void dropAll(Connection conn) throws SQLException { 272 try (Statement st = conn.createStatement()) { 273 st.execute("DROP SCHEMA IF EXISTS " + FT_SCHEMA); 274 } 275 removeAllTriggers(conn); 276 removeIndexFiles(conn); 277 } 278 279 private static String fieldForIndex(String indexName) { 280 if (DEFAULT_INDEX_NAME.equals(indexName)) { 281 return FIELD_TEXT; 282 } else { 283 return FIELD_TEXT + '_' + indexName; 284 } 285 } 286 287 /** 288 * Searches from the given full text index. The returned result set has a single ID column which holds the keys for 289 * the matching rows. 290 * <p> 291 * Usually called through: 292 * 293 * <pre> 294 * SELECT * FROM NXFT_SEARCH(name, 'text'); 295 * </pre> 296 * 297 * @param conn the connection 298 * @param indexName the index name 299 * @param text the search query 300 * @return the result set 301 */ 302 public static ResultSet search(Connection conn, String indexName, String text) throws SQLException { 303 DatabaseMetaData meta = conn.getMetaData(); 304 if (indexName == null) { 305 indexName = DEFAULT_INDEX_NAME; 306 } 307 308 String schema; 309 String table; 310 String analyzerName; 311 312 // find schema, table and analyzer 313 try (PreparedStatement ps = conn.prepareStatement( 314 "SELECT SCHEMA, TABLE, ANALYZER FROM " + FT_TABLE + " WHERE NAME = ?")) { 315 ps.setString(1, indexName); 316 try (ResultSet res = ps.executeQuery()) { 317 if (!res.next()) { 318 throw new SQLException("No such index: " + indexName); 319 } 320 schema = res.getString(1); 321 table = res.getString(2); 322 analyzerName = res.getString(3); 323 } 324 } 325 326 int type = getPrimaryKeyType(meta, schema, table); 327 SimpleResultSet rs = new SimpleResultSet(); 328 rs.addColumn(COL_KEY, type, 0, 0); 329 330 if (meta.getURL().startsWith("jdbc:columnlist:")) { 331 // this is just to query the result set columns 332 return rs; 333 } 334 335 // flush changes 336 final IndexWriter writer = getIndexWriter(getIndexName(conn), getIndexPath(conn), analyzerName); 337 if (writer.hasUncommittedChanges()) { 338 try { 339 writer.commit(); 340 } catch (IOException cause) { 341 throw convertException(cause); 342 } 343 } 344 345 // search index 346 try { 347 BooleanQuery.Builder queryBuilder = new BooleanQuery.Builder(); 348 String defaultField = fieldForIndex(indexName); 349 Analyzer analyzer = getAnalyzer(analyzerName); 350 QueryParser parser = new QueryParser(defaultField, analyzer); 351 queryBuilder.add(parser.parse(text), BooleanClause.Occur.MUST); 352 353 try (IndexReader reader = DirectoryReader.open(writer.getDirectory())) { 354 IndexSearcher searcher = new IndexSearcher(reader); 355 Collector collector = new ResultSetCollector(rs, reader, type); 356 searcher.search(queryBuilder.build(), collector); 357 } 358 } catch (SQLException | ParseException | IOException e) { 359 throw convertException(e); 360 } 361 return rs; 362 } 363 364 protected static class ResultSetCollector implements Collector, LeafCollector { 365 protected final SimpleResultSet rs; 366 367 protected IndexReader reader; 368 369 protected int type; 370 371 protected int docBase; 372 373 public ResultSetCollector(SimpleResultSet rs, IndexReader reader, int type) { 374 this.rs = rs; 375 this.reader = reader; 376 this.type = type; 377 } 378 379 @Override 380 public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException { 381 docBase = context.docBase; 382 return this; 383 } 384 385 @Override 386 public void setScorer(Scorer scorer) { 387 } 388 389 @Override 390 public boolean needsScores() { 391 return false; 392 } 393 394 @Override 395 public void collect(int docID) throws IOException { 396 docID += docBase; 397 Document doc = reader.document(docID, Collections.singleton(FIELD_KEY)); 398 Object key; 399 try { 400 key = asObject(doc.get(FIELD_KEY), type); 401 rs.addRow(key); 402 } catch (SQLException e) { 403 throw new IOException(e); 404 } 405 } 406 } 407 408 private static int getPrimaryKeyType(DatabaseMetaData meta, String schema, String table) throws SQLException { 409 // find primary key name 410 String primaryKeyName = null; 411 try (ResultSet rs = meta.getPrimaryKeys(null, schema, table)) { 412 while (rs.next()) { 413 if (primaryKeyName != null) { 414 throw new SQLException("Can only index primary keys on one column for " + schema + '.' + table); 415 } 416 primaryKeyName = rs.getString("COLUMN_NAME"); 417 } 418 if (primaryKeyName == null) { 419 throw new SQLException("No primary key for " + schema + '.' + table); 420 } 421 } 422 // find primary key type 423 try (ResultSet rs = meta.getColumns(null, schema, table, primaryKeyName)) { 424 if (!rs.next()) { 425 throw new SQLException("Could not find primary key"); 426 } 427 return rs.getInt("DATA_TYPE"); 428 } 429 } 430 431 private static Analyzer getAnalyzer(String analyzerName) throws SQLException { 432 Analyzer analyzer = analyzers.get(analyzerName); 433 if (analyzer == null) { 434 try { 435 Class<?> klass = Class.forName(analyzerName); 436 Constructor<?> constructor = klass.getConstructor(); 437 analyzer = (Analyzer) constructor.newInstance(); 438 } catch (ReflectiveOperationException e) { 439 throw new SQLException(e.toString()); 440 } 441 analyzers.put(analyzerName, analyzer); 442 } 443 return analyzer; 444 } 445 446 protected static String getIndexName(Connection conn) throws SQLException { 447 String catalog = conn.getCatalog(); 448 if (catalog == null) { 449 catalog = "default"; 450 } 451 return catalog; 452 } 453 454 protected static String getIndexPath(Connection conn) throws SQLException { 455 try (Statement st = conn.createStatement()) { 456 try (ResultSet rs = st.executeQuery("CALL DATABASE_PATH()")) { 457 rs.next(); 458 String path = rs.getString(1); 459 if (path == null) { 460 return null; 461 } 462 return path + ".lucene"; 463 } 464 } 465 466 } 467 468 private static IndexWriter getIndexWriter(String name, String path, String analyzer) throws SQLException { 469 IndexWriter indexWriter = indexWriters.get(name); 470 if (indexWriter != null) { 471 return indexWriter; 472 } 473 synchronized (indexWriters) { 474 indexWriter = indexWriters.get(name); 475 if (indexWriter != null) { 476 return indexWriter; 477 } 478 try { 479 Directory dir = path == null ? new RAMDirectory() : FSDirectory.open(Paths.get(path)); 480 Analyzer an = getAnalyzer(analyzer); 481 IndexWriterConfig iwc = new IndexWriterConfig(an); 482 iwc.setOpenMode(OpenMode.CREATE_OR_APPEND); 483 indexWriter = new IndexWriter(dir, iwc); 484 } catch (LockObtainFailedException e) { 485 throw convertException("Cannot open fulltext index " + path, e); 486 } catch (IOException e) { 487 throw convertException(e); 488 } 489 indexWriters.put(name, indexWriter); 490 return indexWriter; 491 } 492 } 493 494 private static void removeIndexFiles(Connection conn) throws SQLException { 495 String path = getIndexPath(conn); 496 try { 497 IndexWriter index = indexWriters.remove(path); 498 if (index != null) { 499 try { 500 index.close(); 501 } catch (IOException e) { 502 throw convertException(e); 503 } 504 } 505 } finally { 506 FileUtils.deleteRecursive(path, false); 507 } 508 } 509 510 private static SQLException convertException(Exception e) { 511 return convertException("Error while indexing document", e); 512 } 513 514 private static SQLException convertException(String message, Exception e) { 515 SQLException e2 = new SQLException(message); 516 e2.initCause(e); 517 return e2; 518 } 519 520 protected static String asString(Object data, int type) throws SQLException { 521 if (data == null) { 522 return ""; 523 } 524 switch (type) { 525 case Types.BIT: 526 case Types.BOOLEAN: 527 case Types.INTEGER: 528 case Types.BIGINT: 529 case Types.DECIMAL: 530 case Types.DOUBLE: 531 case Types.FLOAT: 532 case Types.NUMERIC: 533 case Types.REAL: 534 case Types.SMALLINT: 535 case Types.TINYINT: 536 case Types.DATE: 537 case Types.TIME: 538 case Types.TIMESTAMP: 539 case Types.LONGVARCHAR: 540 case Types.CHAR: 541 case Types.VARCHAR: 542 return data.toString(); 543 case Types.CLOB: 544 try { 545 if (data instanceof Clob) { 546 data = ((Clob) data).getCharacterStream(); 547 } 548 return IOUtils.readStringAndClose((Reader) data, -1); 549 } catch (IOException e) { 550 throw DbException.convert(e); 551 } 552 case Types.VARBINARY: 553 case Types.LONGVARBINARY: 554 case Types.BINARY: 555 case Types.JAVA_OBJECT: 556 case Types.OTHER: 557 case Types.BLOB: 558 case Types.STRUCT: 559 case Types.REF: 560 case Types.NULL: 561 case Types.ARRAY: 562 case Types.DATALINK: 563 case Types.DISTINCT: 564 throw new SQLException("Unsupported column data type: " + type); 565 default: 566 return ""; 567 } 568 } 569 570 // simple cases only, used for primary key 571 private static Object asObject(String string, int type) throws SQLException { 572 switch (type) { 573 case Types.BIGINT: 574 return Long.valueOf(string); 575 case Types.INTEGER: 576 case Types.SMALLINT: 577 case Types.TINYINT: 578 return Integer.valueOf(string); 579 case Types.LONGVARCHAR: 580 case Types.CHAR: 581 case Types.VARCHAR: 582 return string; 583 default: 584 throw new SQLException("Unsupport data type for primary key: " + type); 585 } 586 } 587 588 /** 589 * Trigger used to update the lucene index upon row change. 590 */ 591 public static class Trigger implements org.h2.api.Trigger { 592 593 private static final Log log = LogFactory.getLog(Trigger.class); 594 595 private String indexName; 596 597 private String indexPath; 598 599 private IndexWriter indexWriter; 600 601 // DEBUG 602 private Exception lastIndexWriterClose; 603 604 // DEBUG 605 private String lastIndexWriterCloseThread; 606 607 /** Starting at 0. */ 608 private int primaryKeyIndex; 609 610 private int primaryKeyType; 611 612 private Map<String, int[]> columnTypes; 613 614 private Map<String, int[]> columnIndices; 615 616 /** 617 * Trigger interface: initialization. 618 */ 619 @Override 620 public void init(Connection conn, String schema, String triggerName, String table, boolean before, int opType) 621 throws SQLException { 622 DatabaseMetaData meta = conn.getMetaData(); 623 624 // find primary key name 625 String primaryKeyName = null; 626 try (ResultSet rs = meta.getPrimaryKeys(null, schema, table)) { 627 while (rs.next()) { 628 if (primaryKeyName != null) { 629 throw new SQLException( 630 "Can only index primary keys on one column for: " + schema + '.' + table); 631 } 632 primaryKeyName = rs.getString("COLUMN_NAME"); 633 } 634 if (primaryKeyName == null) { 635 throw new SQLException("No primary key for " + schema + '.' + table); 636 } 637 } 638 639 // find primary key type 640 try (ResultSet rs = meta.getColumns(null, schema, table, primaryKeyName)) { 641 if (!rs.next()) { 642 throw new SQLException("No primary key for: " + schema + '.' + table); 643 } 644 primaryKeyType = rs.getInt("DATA_TYPE"); 645 primaryKeyIndex = rs.getInt("ORDINAL_POSITION") - 1; 646 } 647 648 // find all columns info 649 Map<String, Integer> allColumnTypes = new HashMap<>(); 650 Map<String, Integer> allColumnIndices = new HashMap<>(); 651 try (ResultSet rs = meta.getColumns(null, schema, table, null)) { 652 while (rs.next()) { 653 String name = rs.getString("COLUMN_NAME"); 654 int type = rs.getInt("DATA_TYPE"); 655 int index = rs.getInt("ORDINAL_POSITION") - 1; 656 allColumnTypes.put(name, Integer.valueOf(type)); 657 allColumnIndices.put(name, Integer.valueOf(index)); 658 } 659 } 660 661 // find columns configured for indexing 662 try (PreparedStatement ps = conn.prepareStatement( 663 "SELECT NAME, COLUMNS, ANALYZER FROM " + FT_TABLE + " WHERE SCHEMA = ? AND TABLE = ?")) { 664 ps.setString(1, schema); 665 ps.setString(2, table); 666 try (ResultSet rs = ps.executeQuery()) { 667 columnTypes = new HashMap<>(); 668 columnIndices = new HashMap<>(); 669 while (rs.next()) { 670 String index = rs.getString(1); 671 String columns = rs.getString(2); 672 String analyzerName = rs.getString(3); 673 List<String> columnNames = Arrays.asList(columns.split(",")); 674 675 // find the columns' indices and types 676 int[] types = new int[columnNames.size()]; 677 int[] indices = new int[columnNames.size()]; 678 int i = 0; 679 for (String columnName : columnNames) { 680 types[i] = allColumnTypes.get(columnName).intValue(); 681 indices[i] = allColumnIndices.get(columnName).intValue(); 682 i++; 683 } 684 columnTypes.put(index, types); 685 columnIndices.put(index, indices); 686 // only one call actually needed for this: 687 indexName = getIndexName(conn); 688 indexPath = getIndexPath(conn); 689 indexWriter = getIndexWriter(indexName, indexPath, analyzerName); 690 } 691 692 } 693 } 694 } 695 696 /** 697 * Trigger interface. 698 */ 699 @Override 700 public void fire(Connection conn, Object[] oldRow, Object[] newRow) throws SQLException { 701 if (indexWriter == null) { 702 throw new SQLException("Fulltext index was not initialized"); 703 } 704 if (oldRow != null) { 705 delete(oldRow); 706 } 707 if (newRow != null) { 708 insert(newRow); 709 } 710 } 711 712 private void insert(Object[] row) throws SQLException { 713 Document doc = new Document(); 714 String key = asString(row[primaryKeyIndex], primaryKeyType); 715 // StringField is not tokenized 716 StringField keyField = new StringField(FIELD_KEY, key, Field.Store.YES); 717 doc.add(keyField); 718 719 // add fulltext for all indexes 720 for (String indexName : columnTypes.keySet()) { 721 int[] types = columnTypes.get(indexName); 722 int[] indices = columnIndices.get(indexName); 723 StringBuilder buf = new StringBuilder(); 724 for (int i = 0; i < types.length; i++) { 725 String data = asString(row[indices[i]], types[i]); 726 if (i > 0) { 727 buf.append(' '); 728 } 729 buf.append(data); 730 } 731 // TextField is tokenized 732 TextField textField = new TextField(fieldForIndex(indexName), buf.toString(), Field.Store.NO); 733 doc.add(textField); 734 } 735 736 try { 737 indexWriter.addDocument(doc); 738 } catch (IOException e) { 739 throw convertException(e); 740 } catch (org.apache.lucene.store.AlreadyClosedException e) { 741 // DEBUG 742 log.error("org.apache.lucene.store.AlreadyClosedException in thread " + Thread.currentThread().getName() 743 + ", last close was in thread " + lastIndexWriterCloseThread, lastIndexWriterClose); 744 throw e; 745 } 746 } 747 748 private void delete(Object[] row) throws SQLException { 749 String primaryKey = asString(row[primaryKeyIndex], primaryKeyType); 750 try { 751 indexWriter.deleteDocuments(new Term(FIELD_KEY, primaryKey)); 752 } catch (IOException e) { 753 throw convertException(e); 754 } 755 } 756 757 @Override 758 public void close() throws SQLException { 759 if (indexWriter != null) { 760 try { 761 // DEBUG 762 lastIndexWriterClose = new RuntimeException("debug stack trace"); 763 lastIndexWriterCloseThread = Thread.currentThread().getName(); 764 indexWriter.close(); 765 indexWriter = null; 766 } catch (IOException e) { 767 throw convertException(e); 768 } finally { 769 indexWriters.remove(indexName); 770 } 771 } 772 } 773 774 @Override 775 public void remove() { 776 // ignore 777 } 778 } 779 780}