/*
 * Copyright (c) 2006-2014 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     H2 Group
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.storage.sql.db;

import java.io.File;
import java.io.IOException;
import java.io.Reader;
import java.lang.reflect.Constructor;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.index.Term;
import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.lucene.queryparser.classic.QueryParser;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Version;
import org.h2.message.DbException;
import org.h2.store.fs.FileUtils;
import org.h2.tools.SimpleResultSet;
import org.h2.util.IOUtils;
import org.h2.util.StringUtils;

/**
 * An optimized Lucene-based fulltext indexing trigger and search.
 * <p>
 * Index registrations are kept in the {@code NXFT.INDEXES} table. Analyzers and index writers are cached in static
 * maps; index writers are keyed by the name returned by {@link #getIndexName(Connection)}.
 */
public class H2Fulltext {

    private static final Version LUCENE_VERSION = Version.LUCENE_4_10_4;

    /** Analyzer instances, keyed by analyzer class name. */
    private static final Map<String, Analyzer> analyzers = new ConcurrentHashMap<String, Analyzer>();

    /** Open index writers, keyed by index name (see {@link #getIndexName(Connection)}), NOT by index path. */
    private static final Map<String, IndexWriter> indexWriters = new ConcurrentHashMap<String, IndexWriter>();

    private static final String FT_SCHEMA = "NXFT";

    private static final String FT_TABLE = FT_SCHEMA + ".INDEXES";

    private static final String PREFIX = "NXFT_";

    private static final String FIELD_KEY = "KEY";

    private static final String FIELD_TEXT = "TEXT";

    private static final String DEFAULT_INDEX_NAME = "PUBLIC_FULLTEXT_default";

    private static final String COL_KEY = "KEY";

    // Utility class.
    private H2Fulltext() {
    }

    /**
     * Initializes fulltext search functionality for this database. This adds the following Java functions to the
     * database:
     * <ul>
     * <li>NXFT_CREATE_INDEX(nameString, schemaString, tableString, columnListString, analyzerString)</li>
     * <li>NXFT_REINDEX()</li>
     * <li>NXFT_DROP_ALL()</li>
     * <li>NXFT_SEARCH(nameString, queryString): result set</li>
     * </ul>
     * It also adds a schema NXFT to the database where bookkeeping information is stored. This function may be called
     * from a Java application, or by using the SQL statements:
     *
     * <pre>
     * CREATE ALIAS IF NOT EXISTS NXFT_INIT FOR
     *       "org.nuxeo.ecm.core.storage.sql.db.H2Fulltext.init";
     * CALL NXFT_INIT();
     * </pre>
     *
     * @param conn the connection
     * @throws SQLException if any of the DDL statements fails
     */
    public static void init(Connection conn) throws SQLException {
        try (Statement st = conn.createStatement()) {
            st.execute("CREATE SCHEMA IF NOT EXISTS " + FT_SCHEMA);
            st.execute("CREATE TABLE IF NOT EXISTS " + FT_TABLE
                    + "(NAME VARCHAR, SCHEMA VARCHAR, TABLE VARCHAR, COLUMNS VARCHAR, "
                    + "ANALYZER VARCHAR, PRIMARY KEY(NAME))");
            // BBB migrate old table without the "NAME" column
            try (ResultSet rs = st.executeQuery("SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE " + "TABLE_SCHEMA = '"
                    + FT_SCHEMA + "' AND TABLE_NAME = 'INDEXES' AND COLUMN_NAME = 'NAME'")) {
                if (!rs.next()) {
                    // BBB no NAME column, alter table to create it
                    st.execute("ALTER TABLE " + FT_TABLE + " ADD COLUMN NAME VARCHAR");
                    st.execute("UPDATE " + FT_TABLE + " SET NAME = '" + DEFAULT_INDEX_NAME + "'");
                }
            }

            String className = H2Fulltext.class.getName();
            st.execute("CREATE ALIAS IF NOT EXISTS " + PREFIX + "CREATE_INDEX FOR \"" + className + ".createIndex\"");
            st.execute("CREATE ALIAS IF NOT EXISTS " + PREFIX + "REINDEX FOR \"" + className + ".reindex\"");
            st.execute("CREATE ALIAS IF NOT EXISTS " + PREFIX + "DROP_ALL FOR \"" + className + ".dropAll\"");
            st.execute("CREATE ALIAS IF NOT EXISTS " + PREFIX + "SEARCH FOR \"" + className + ".search\"");
        }
    }

    // ----- static methods called directly to initialize fulltext -----

    /**
     * Registers a fulltext index for a table and column list, and (re)creates the indexing trigger on the table.
     * <p>
     * A table may have any number of indexes at a time, but the index name must be unique: any existing registration
     * with the same name is deleted and replaced by the new one. Existing rows are not indexed by this method; use
     * {@link #reindex(Connection)} for that.
     * <p>
     * Usually called through:
     *
     * <pre>
     * CALL NXFT_CREATE_INDEX('indexname', 'myschema', 'mytable', ('col1', 'col2'), 'lucene.analyzer');
     * </pre>
     *
     * @param conn the connection
     * @param indexName the index name, or {@code null} for the default index
     * @param schema the schema name of the table
     * @param table the table name
     * @param columns the column list, comma-separated; parentheses and spaces are stripped
     * @param analyzer the Lucene fulltext analyzer class
     * @throws SQLException if {@code columns} is {@code null} or a database error occurs
     */
    public static void createIndex(Connection conn, String indexName, String schema, String table, String columns,
            String analyzer) throws SQLException {
        if (indexName == null) {
            indexName = DEFAULT_INDEX_NAME;
        }
        if (columns == null) {
            // fail with a meaningful SQL error instead of a NullPointerException
            throw new SQLException("Missing column list for fulltext index: " + indexName);
        }
        columns = columns.replace("(", "").replace(")", "").replace(" ", "");
        try (PreparedStatement ps = conn.prepareStatement("DELETE FROM " + FT_TABLE + " WHERE NAME = ?")) {
            ps.setString(1, indexName);
            ps.execute();
        }
        try (PreparedStatement ps = conn.prepareStatement("INSERT INTO " + FT_TABLE
                + "(NAME, SCHEMA, TABLE, COLUMNS, ANALYZER) VALUES(?, ?, ?, ?, ?)")) {
            ps.setString(1, indexName);
            ps.setString(2, schema);
            ps.setString(3, table);
            ps.setString(4, columns);
            ps.setString(5, analyzer);
            ps.execute();
        }
        createTrigger(conn, schema, table);
    }

    /**
     * Re-creates the fulltext index for this database.
     * <p>
     * Drops all fulltext triggers and index files, then for each distinct indexed table re-creates the trigger and
     * indexes the rows currently in the table.
     *
     * @param conn the connection
     * @throws SQLException if a database or indexing error occurs
     */
    public static void reindex(Connection conn) throws SQLException {
        removeAllTriggers(conn);
        removeIndexFiles(conn);
        try (Statement st = conn.createStatement()) {
            try (ResultSet rs = st.executeQuery("SELECT * FROM " + FT_TABLE)) {
                // a table with several indexes appears several times but needs a single pass
                Set<String> done = new HashSet<String>();
                while (rs.next()) {
                    String schema = rs.getString("SCHEMA");
                    String table = rs.getString("TABLE");
                    String key = schema + '.' + table;
                    if (!done.add(key)) {
                        continue;
                    }
                    createTrigger(conn, schema, table);
                    indexExistingRows(conn, schema, table);
                }
            }
        }
    }

    /**
     * Feeds every row currently in the table to a locally created {@link Trigger}, as if each row had just been
     * inserted.
     */
    private static void indexExistingRows(Connection conn, String schema, String table) throws SQLException {
        Trigger trigger = new Trigger();
        trigger.init(conn, schema, null, table, false, org.h2.api.Trigger.INSERT);
        try (Statement st = conn.createStatement()) {
            try (ResultSet rs = st.executeQuery("SELECT * FROM " + StringUtils.quoteIdentifier(schema) + '.'
                    + StringUtils.quoteIdentifier(table))) {
                int n = rs.getMetaData().getColumnCount();
                while (rs.next()) {
                    Object[] row = new Object[n];
                    for (int i = 0; i < n; i++) {
                        row[i] = rs.getObject(i + 1);
                    }
                    trigger.fire(conn, null, row);
                }
            }
        }
    }

    /**
     * Creates the trigger that maintains the indexes on a table, dropping any previous one of the same name.
     * <p>
     * The trigger is named {@code NXFT_} followed by the table name and is created in the table's schema.
     *
     * @param conn the connection
     * @param schema the schema name of the table
     * @param table the table name
     */
    private static void createTrigger(Connection conn, String schema, String table) throws SQLException {
        try (Statement st = conn.createStatement()) {
            schema = StringUtils.quoteIdentifier(schema);
            String trigger = schema + '.' + StringUtils.quoteIdentifier(PREFIX + table);
            st.execute("DROP TRIGGER IF EXISTS " + trigger);
            st.execute(String.format("CREATE TRIGGER %s " + "AFTER INSERT, UPDATE, DELETE ON %s.%s "
                    + "FOR EACH ROW CALL \"%s\"", trigger, schema, StringUtils.quoteIdentifier(table),
                    H2Fulltext.Trigger.class.getName()));
        }
    }

    /**
     * Drops every trigger whose name starts with {@code NXFT_}.
     */
    private static void removeAllTriggers(Connection conn) throws SQLException {
        try (Statement st = conn.createStatement()) {
            try (ResultSet rs = st.executeQuery("SELECT * FROM INFORMATION_SCHEMA.TRIGGERS")) {
                try (Statement st2 = conn.createStatement()) {
                    while (rs.next()) {
                        String trigger = rs.getString("TRIGGER_NAME");
                        if (trigger.startsWith(PREFIX)) {
                            // quote the trigger name too: createTrigger creates it quoted, so an unquoted
                            // reference would not match names that are case-sensitive or contain special chars
                            st2.execute("DROP TRIGGER " + StringUtils.quoteIdentifier(rs.getString("TRIGGER_SCHEMA"))
                                    + "." + StringUtils.quoteIdentifier(trigger));
                        }
                    }
                }
            }
        }
    }

    /**
     * Drops all fulltext indexes from the database.
     * <p>
     * Drops the NXFT schema, all fulltext triggers and the index files.
     *
     * @param conn the connection
     * @throws SQLException if a database or indexing error occurs
     */
    public static void dropAll(Connection conn) throws SQLException {
        try (Statement st = conn.createStatement()) {
            st.execute("DROP SCHEMA IF EXISTS " + FT_SCHEMA);
        }
        removeAllTriggers(conn);
        removeIndexFiles(conn);
    }

    /**
     * Returns the Lucene field name holding the text of the given index: {@code TEXT} for the default index,
     * {@code TEXT_<indexName>} otherwise.
     */
    private static String fieldForIndex(String indexName) {
        if (DEFAULT_INDEX_NAME.equals(indexName)) {
            return FIELD_TEXT;
        } else {
            return FIELD_TEXT + '_' + indexName;
        }
    }

    /**
     * Searches from the given full text index. The returned result set has a single KEY column which holds the keys
     * for the matching rows.
     * <p>
     * Usually called through:
     *
     * <pre>
     * SELECT * FROM NXFT_SEARCH(name, 'text');
     * </pre>
     *
     * @param conn the connection
     * @param indexName the index name, or {@code null} for the default index
     * @param text the search query, in Lucene query parser syntax
     * @return the result set
     * @throws SQLException if the index does not exist, the query cannot be parsed or the index cannot be read
     */
    public static ResultSet search(Connection conn, String indexName, String text) throws SQLException {
        DatabaseMetaData meta = conn.getMetaData();
        if (indexName == null) {
            indexName = DEFAULT_INDEX_NAME;
        }

        String schema;
        String table;
        String analyzerName;

        // find schema, table and analyzer
        try (PreparedStatement ps = conn.prepareStatement("SELECT SCHEMA, TABLE, ANALYZER FROM " + FT_TABLE
                + " WHERE NAME = ?")) {
            ps.setString(1, indexName);
            try (ResultSet res = ps.executeQuery()) {
                if (!res.next()) {
                    throw new SQLException("No such index: " + indexName);
                }
                schema = res.getString(1);
                table = res.getString(2);
                analyzerName = res.getString(3);
            }
        }

        int type = getPrimaryKeyType(meta, schema, table);
        SimpleResultSet rs = new SimpleResultSet();
        rs.addColumn(COL_KEY, type, 0, 0);

        if (meta.getURL().startsWith("jdbc:columnlist:")) {
            // this is just to query the result set columns
            return rs;
        }

        // flush changes so that the reader opened below sees them
        final IndexWriter writer = getIndexWriter(getIndexName(conn), getIndexPath(conn), analyzerName);
        if (writer.hasUncommittedChanges()) {
            try {
                writer.commit();
            } catch (IOException cause) {
                throw convertException(cause);
            }
        }

        // search index
        try {
            BooleanQuery query = new BooleanQuery();
            String defaultField = fieldForIndex(indexName);
            Analyzer analyzer = getAnalyzer(analyzerName);
            QueryParser parser = new QueryParser(LUCENE_VERSION, defaultField, analyzer);
            query.add(parser.parse(text), BooleanClause.Occur.MUST);

            try (IndexReader reader = DirectoryReader.open(writer.getDirectory())) {
                IndexSearcher searcher = new IndexSearcher(reader);
                Collector collector = new ResultSetCollector(rs, reader, type);
                searcher.search(query, collector);
            }
        } catch (SQLException | ParseException | IOException e) {
            throw convertException(e);
        }
        return rs;
    }

    /**
     * Lucene collector that adds the stored key of each matching document as a row of a {@link SimpleResultSet}.
     */
    protected static class ResultSetCollector extends Collector {
        protected final SimpleResultSet rs;

        protected IndexReader reader;

        /** JDBC type of the key column, used to convert the stored string key. */
        protected int type;

        /** Offset of the current segment's doc ids in the top-level reader. */
        protected int docBase;

        public ResultSetCollector(SimpleResultSet rs, IndexReader reader, int type) {
            this.rs = rs;
            this.reader = reader;
            this.type = type;
        }

        @Override
        public void setNextReader(AtomicReaderContext context) {
            docBase = context.docBase;
        }

        @Override
        public void setScorer(Scorer scorer) {
        }

        @Override
        public boolean acceptsDocsOutOfOrder() {
            return true;
        }

        @Override
        public void collect(int docID) throws IOException {
            // docID is segment-relative, the reader is the top-level one
            docID += docBase;
            Document doc = reader.document(docID, Collections.singleton(FIELD_KEY));
            Object key;
            try {
                key = asObject(doc.get(FIELD_KEY), type);
                rs.addRow(new Object[] { key });
            } catch (SQLException e) {
                throw new IOException(e);
            }
        }
    }

    /**
     * Returns the JDBC type ({@code DATA_TYPE}) of the single-column primary key of the given table.
     *
     * @throws SQLException if the table has no primary key or a primary key on several columns
     */
    private static int getPrimaryKeyType(DatabaseMetaData meta, String schema, String table) throws SQLException {
        // find primary key name
        String primaryKeyName = null;
        try (ResultSet rs = meta.getPrimaryKeys(null, schema, table)) {
            while (rs.next()) {
                if (primaryKeyName != null) {
                    throw new SQLException("Can only index primary keys on one column for " + schema + '.' + table);
                }
                primaryKeyName = rs.getString("COLUMN_NAME");
            }
            if (primaryKeyName == null) {
                throw new SQLException("No primary key for " + schema + '.' + table);
            }
        }
        // find primary key type
        try (ResultSet rs = meta.getColumns(null, schema, table, primaryKeyName)) {
            if (!rs.next()) {
                throw new SQLException("Could not find primary key");
            }
            return rs.getInt("DATA_TYPE");
        }
    }

    /**
     * Returns the cached analyzer for the given class name, instantiating it through its {@code (Version)}
     * constructor on first use.
     *
     * @throws SQLException if the analyzer class cannot be loaded or instantiated
     */
    private static Analyzer getAnalyzer(String analyzerName) throws SQLException {
        Analyzer analyzer = analyzers.get(analyzerName);
        if (analyzer == null) {
            try {
                Class<?> klass = Class.forName(analyzerName);
                Constructor<?> constructor = klass.getConstructor(Version.class);
                analyzer = (Analyzer) constructor.newInstance(LUCENE_VERSION);
            } catch (ReflectiveOperationException e) {
                // keep the reflective exception as cause so that the stack trace is not lost
                throw new SQLException(e.toString(), e);
            }
            analyzers.put(analyzerName, analyzer);
        }
        return analyzer;
    }

    /**
     * Returns the name under which the index writer of this connection's database is cached: the connection's
     * catalog, or {@code "default"} if there is none.
     */
    protected static String getIndexName(Connection conn) throws SQLException {
        String catalog = conn.getCatalog();
        if (catalog == null) {
            catalog = "default";
        }
        return catalog;
    }

    /**
     * Returns the filesystem path of the Lucene index ({@code DATABASE_PATH()} + {@code ".lucene"}), or {@code null}
     * when {@code DATABASE_PATH()} is null, in which case the index is kept in memory.
     */
    protected static String getIndexPath(Connection conn) throws SQLException {
        try (Statement st = conn.createStatement()) {
            try (ResultSet rs = st.executeQuery("CALL DATABASE_PATH()")) {
                rs.next();
                String path = rs.getString(1);
                if (path == null) {
                    return null;
                }
                return path + ".lucene";
            }
        }

    }

    /**
     * Returns the cached index writer for the given name, creating it if needed.
     * <p>
     * The path and analyzer are only used when the writer is created; a cached writer is returned as is.
     *
     * @param name the index name, used as cache key
     * @param path the index directory path, or {@code null} for an in-memory index
     * @param analyzer the analyzer class name
     */
    private static IndexWriter getIndexWriter(String name, String path, String analyzer) throws SQLException {
        IndexWriter indexWriter = indexWriters.get(name);
        if (indexWriter != null) {
            return indexWriter;
        }
        synchronized (indexWriters) {
            // re-check under the lock so that a single writer is created per name
            indexWriter = indexWriters.get(name);
            if (indexWriter != null) {
                return indexWriter;
            }
            try {
                Directory dir = path == null ? new RAMDirectory() : FSDirectory.open(new File(path));
                Analyzer an = getAnalyzer(analyzer);
                IndexWriterConfig iwc = new IndexWriterConfig(LUCENE_VERSION, an);
                iwc.setOpenMode(OpenMode.CREATE_OR_APPEND);
                indexWriter = new IndexWriter(dir, iwc);
            } catch (LockObtainFailedException e) {
                throw convertException("Cannot open fulltext index " + path, e);
            } catch (IOException e) {
                throw convertException(e);
            }
            indexWriters.put(name, indexWriter);
            return indexWriter;
        }
    }

    /**
     * Closes and forgets the cached index writer of this connection's database, then deletes the index files.
     */
    private static void removeIndexFiles(Connection conn) throws SQLException {
        // the writer cache is keyed by index name (see getIndexWriter), not by path
        String name = getIndexName(conn);
        String path = getIndexPath(conn);
        try {
            IndexWriter index = indexWriters.remove(name);
            if (index != null) {
                try {
                    index.close();
                } catch (IOException e) {
                    throw convertException(e);
                }
            }
        } finally {
            // in-memory indexes have no path and nothing to delete on disk
            if (path != null) {
                FileUtils.deleteRecursive(path, false);
            }
        }
    }

    private static SQLException convertException(Exception e) {
        return convertException("Error while indexing document", e);
    }

    /** Wraps an exception into an {@link SQLException} with the given message, keeping the original as cause. */
    private static SQLException convertException(String message, Exception e) {
        SQLException e2 = new SQLException(message);
        e2.initCause(e);
        return e2;
    }

    /**
     * Converts a column value to the string to index.
     *
     * @param data the column value
     * @param type the JDBC type of the column
     * @return the string form of the value; the empty string for {@code null} data or a type not listed below
     * @throws SQLException for binary and other unsupported column types
     */
    protected static String asString(Object data, int type) throws SQLException {
        if (data == null) {
            return "";
        }
        switch (type) {
        case Types.BIT:
        case Types.BOOLEAN:
        case Types.INTEGER:
        case Types.BIGINT:
        case Types.DECIMAL:
        case Types.DOUBLE:
        case Types.FLOAT:
        case Types.NUMERIC:
        case Types.REAL:
        case Types.SMALLINT:
        case Types.TINYINT:
        case Types.DATE:
        case Types.TIME:
        case Types.TIMESTAMP:
        case Types.LONGVARCHAR:
        case Types.CHAR:
        case Types.VARCHAR:
            return data.toString();
        case Types.CLOB:
            try {
                if (data instanceof Clob) {
                    data = ((Clob) data).getCharacterStream();
                }
                return IOUtils.readStringAndClose((Reader) data, -1);
            } catch (IOException e) {
                throw DbException.convert(e);
            }
        case Types.VARBINARY:
        case Types.LONGVARBINARY:
        case Types.BINARY:
        case Types.JAVA_OBJECT:
        case Types.OTHER:
        case Types.BLOB:
        case Types.STRUCT:
        case Types.REF:
        case Types.NULL:
        case Types.ARRAY:
        case Types.DATALINK:
        case Types.DISTINCT:
            throw new SQLException("Unsupported column data type: " + type);
        default:
            return "";
        }
    }

    // simple cases only, used for primary key
    private static Object asObject(String string, int type) throws SQLException {
        switch (type) {
        case Types.BIGINT:
            return Long.valueOf(string);
        case Types.INTEGER:
        case Types.SMALLINT:
        case Types.TINYINT:
            return Integer.valueOf(string);
        case Types.LONGVARCHAR:
        case Types.CHAR:
        case Types.VARCHAR:
            return string;
        default:
            throw new SQLException("Unsupport data type for primary key: " + type);
        }
    }

    /**
     * Trigger used to update the lucene index upon row change.
     */
    public static class Trigger implements org.h2.api.Trigger {

        private static final Log log = LogFactory.getLog(Trigger.class);

        private String indexName;

        private String indexPath;

        private IndexWriter indexWriter;

        // DEBUG
        private Exception lastIndexWriterClose;

        // DEBUG
        private String lastIndexWriterCloseThread;

        /** Starting at 0. */
        private int primaryKeyIndex;

        private int primaryKeyType;

        /** For each index name, the JDBC types of its indexed columns. */
        private Map<String, int[]> columnTypes;

        /** For each index name, the 0-based row positions of its indexed columns. */
        private Map<String, int[]> columnIndices;

        /**
         * Trigger interface: initialization.
         * <p>
         * Looks up the table's single-column primary key and the columns configured for each fulltext index on the
         * table, and gets the index writer. If no index is registered for the table the index writer stays unset and
         * {@link #fire} will fail.
         *
         * @throws SQLException if the table has no usable primary key, or an index refers to an unknown column
         */
        @Override
        public void init(Connection conn, String schema, String triggerName, String table, boolean before, int opType)
                throws SQLException {
            DatabaseMetaData meta = conn.getMetaData();

            // find primary key name
            String primaryKeyName = null;
            try (ResultSet rs = meta.getPrimaryKeys(null, schema, table)) {
                while (rs.next()) {
                    if (primaryKeyName != null) {
                        throw new SQLException("Can only index primary keys on one column for: " + schema + '.'
                                + table);
                    }
                    primaryKeyName = rs.getString("COLUMN_NAME");
                }
                if (primaryKeyName == null) {
                    throw new SQLException("No primary key for " + schema + '.' + table);
                }
            }

            // find primary key type
            try (ResultSet rs = meta.getColumns(null, schema, table, primaryKeyName)) {
                if (!rs.next()) {
                    throw new SQLException("No primary key for: " + schema + '.' + table);
                }
                primaryKeyType = rs.getInt("DATA_TYPE");
                primaryKeyIndex = rs.getInt("ORDINAL_POSITION") - 1;
            }

            // find all columns info
            Map<String, Integer> allColumnTypes = new HashMap<String, Integer>();
            Map<String, Integer> allColumnIndices = new HashMap<String, Integer>();
            try (ResultSet rs = meta.getColumns(null, schema, table, null)) {
                while (rs.next()) {
                    String name = rs.getString("COLUMN_NAME");
                    int type = rs.getInt("DATA_TYPE");
                    int index = rs.getInt("ORDINAL_POSITION") - 1;
                    allColumnTypes.put(name, Integer.valueOf(type));
                    allColumnIndices.put(name, Integer.valueOf(index));
                }
            }

            // find columns configured for indexing
            try (PreparedStatement ps = conn.prepareStatement("SELECT NAME, COLUMNS, ANALYZER FROM " + FT_TABLE
                    + " WHERE SCHEMA = ? AND TABLE = ?")) {
                ps.setString(1, schema);
                ps.setString(2, table);
                try (ResultSet rs = ps.executeQuery()) {
                    columnTypes = new HashMap<String, int[]>();
                    columnIndices = new HashMap<String, int[]>();
                    while (rs.next()) {
                        String index = rs.getString(1);
                        String columns = rs.getString(2);
                        String analyzerName = rs.getString(3);
                        List<String> columnNames = Arrays.asList(columns.split(","));

                        // find the columns' indices and types
                        int[] types = new int[columnNames.size()];
                        int[] indices = new int[columnNames.size()];
                        int i = 0;
                        for (String columnName : columnNames) {
                            Integer columnType = allColumnTypes.get(columnName);
                            Integer columnIndex = allColumnIndices.get(columnName);
                            if (columnType == null || columnIndex == null) {
                                // report the misconfiguration instead of failing with a NullPointerException
                                throw new SQLException("Unknown column " + columnName + " in fulltext index " + index
                                        + " for " + schema + '.' + table);
                            }
                            types[i] = columnType.intValue();
                            indices[i] = columnIndex.intValue();
                            i++;
                        }
                        columnTypes.put(index, types);
                        columnIndices.put(index, indices);
                        // only one call actually needed for this:
                        indexName = getIndexName(conn);
                        indexPath = getIndexPath(conn);
                        indexWriter = getIndexWriter(indexName, indexPath, analyzerName);
                    }

                }
            }
        }

        /**
         * Trigger interface.
         * <p>
         * Removes the document of the old row, if any, then adds a document for the new row, if any.
         */
        @Override
        public void fire(Connection conn, Object[] oldRow, Object[] newRow) throws SQLException {
            if (indexWriter == null) {
                throw new SQLException("Fulltext index was not initialized");
            }
            if (oldRow != null) {
                delete(oldRow);
            }
            if (newRow != null) {
                insert(newRow);
            }
        }

        /**
         * Adds one document for the row: the primary key as a stored, untokenized field, plus one tokenized text
         * field per index holding the space-joined values of that index's columns.
         */
        private void insert(Object[] row) throws SQLException {
            Document doc = new Document();
            String key = asString(row[primaryKeyIndex], primaryKeyType);
            // StringField is not tokenized
            StringField keyField = new StringField(FIELD_KEY, key, Field.Store.YES);
            doc.add(keyField);

            // add fulltext for all indexes
            for (String indexName : columnTypes.keySet()) {
                int[] types = columnTypes.get(indexName);
                int[] indices = columnIndices.get(indexName);
                StringBuilder buf = new StringBuilder();
                for (int i = 0; i < types.length; i++) {
                    String data = asString(row[indices[i]], types[i]);
                    if (i > 0) {
                        buf.append(' ');
                    }
                    buf.append(data);
                }
                // TextField is tokenized
                TextField textField = new TextField(fieldForIndex(indexName), buf.toString(), Field.Store.NO);
                doc.add(textField);
            }

            try {
                indexWriter.addDocument(doc);
            } catch (IOException e) {
                throw convertException(e);
            } catch (org.apache.lucene.store.AlreadyClosedException e) {
                // DEBUG
                log.error(
                        "org.apache.lucene.store.AlreadyClosedException in thread " + Thread.currentThread().getName()
                                + ", last close was in thread " + lastIndexWriterCloseThread, lastIndexWriterClose);
                throw e;
            }
        }

        /** Removes the document whose key field equals the row's primary key. */
        private void delete(Object[] row) throws SQLException {
            String primaryKey = asString(row[primaryKeyIndex], primaryKeyType);
            try {
                indexWriter.deleteDocuments(new Term(FIELD_KEY, primaryKey));
            } catch (IOException e) {
                throw convertException(e);
            }
        }

        /**
         * Trigger interface: closes the index writer and removes it from the writer cache.
         */
        @Override
        public void close() throws SQLException {
            if (indexWriter != null) {
                try {
                    // DEBUG
                    lastIndexWriterClose = new RuntimeException("debug stack trace");
                    lastIndexWriterCloseThread = Thread.currentThread().getName();
                    indexWriter.close();
                    indexWriter = null;
                } catch (IOException e) {
                    throw convertException(e);
                } finally {
                    indexWriters.remove(indexName);
                }
            }
        }

        @Override
        public void remove() {
            // ignore
        }
    }

}