/*
 * (C) Copyright 2006-2014 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     H2 Group
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.storage.sql.db;

import java.io.File;
import java.io.IOException;
import java.io.Reader;
import java.lang.reflect.Constructor;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.index.Term;
import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.lucene.queryparser.classic.QueryParser;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Version;
import org.h2.message.DbException;
import org.h2.store.fs.FileUtils;
import org.h2.tools.SimpleResultSet;
import org.h2.util.IOUtils;
import org.h2.util.StringUtils;

/**
 * An optimized Lucene-based fulltext indexing trigger and search.
 * <p>
 * Bookkeeping about which tables/columns are indexed is kept in the {@code NXFT.INDEXES} table. One shared
 * {@link IndexWriter} per database (keyed by the index name derived from the connection's catalog) is cached in
 * {@link #indexWriters}; analyzers are cached by class name in {@link #analyzers}.
 */
public class H2Fulltext {

    private static final Version LUCENE_VERSION = Version.LUCENE_4_10_4;

    /** Analyzer instances cached by analyzer class name. */
    private static final Map<String, Analyzer> analyzers = new ConcurrentHashMap<String, Analyzer>();

    /** Index writers cached by index name (see {@link #getIndexName}). */
    private static final Map<String, IndexWriter> indexWriters = new ConcurrentHashMap<String, IndexWriter>();

    private static final String FT_SCHEMA = "NXFT";

    private static final String FT_TABLE = FT_SCHEMA + ".INDEXES";

    private static final String PREFIX = "NXFT_";

    private static final String FIELD_KEY = "KEY";

    private static final String FIELD_TEXT = "TEXT";

    private static final String DEFAULT_INDEX_NAME = "PUBLIC_FULLTEXT_default";

    private static final String COL_KEY = "KEY";

    // Utility class.
    private H2Fulltext() {
    }

    /**
     * Initializes fulltext search functionality for this database. This adds the following Java functions to the
     * database:
     * <ul>
     * <li>NXFT_CREATE_INDEX(nameString, schemaString, tableString, columnListString, analyzerString)</li>
     * <li>NXFT_REINDEX()</li>
     * <li>NXFT_DROP_ALL()</li>
     * <li>NXFT_SEARCH(queryString, limitInt, offsetInt): result set</li>
     * </ul>
     * It also adds a schema NXFT to the database where bookkeeping information is stored. This function may be called
     * from a Java application, or by using the SQL statements:
     *
     * <pre>
     * CREATE ALIAS IF NOT EXISTS NXFT_INIT FOR
     *      "org.nuxeo.ecm.core.storage.sql.db.H2Fulltext.init";
     * CALL NXFT_INIT();
     * </pre>
     *
     * @param conn
     */
    public static void init(Connection conn) throws SQLException {
        try (Statement st = conn.createStatement()) {
            st.execute("CREATE SCHEMA IF NOT EXISTS " + FT_SCHEMA);
            st.execute("CREATE TABLE IF NOT EXISTS " + FT_TABLE
                    + "(NAME VARCHAR, SCHEMA VARCHAR, TABLE VARCHAR, COLUMNS VARCHAR, "
                    + "ANALYZER VARCHAR, PRIMARY KEY(NAME))");
            // BBB migrate old table without the "NAME" column
            try (ResultSet rs = st.executeQuery("SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE " + "TABLE_SCHEMA = '"
                    + FT_SCHEMA + "' AND TABLE_NAME = 'INDEXES' AND COLUMN_NAME = 'NAME'")) {
                if (!rs.next()) {
                    // BBB no NAME column, alter table to create it
                    st.execute("ALTER TABLE " + FT_TABLE + " ADD COLUMN NAME VARCHAR");
                    st.execute("UPDATE " + FT_TABLE + " SET NAME = '" + DEFAULT_INDEX_NAME + "'");
                }
            }

            String className = H2Fulltext.class.getName();
            st.execute("CREATE ALIAS IF NOT EXISTS " + PREFIX + "CREATE_INDEX FOR \"" + className + ".createIndex\"");
            st.execute("CREATE ALIAS IF NOT EXISTS " + PREFIX + "REINDEX FOR \"" + className + ".reindex\"");
            st.execute("CREATE ALIAS IF NOT EXISTS " + PREFIX + "DROP_ALL FOR \"" + className + ".dropAll\"");
            st.execute("CREATE ALIAS IF NOT EXISTS " + PREFIX + "SEARCH FOR \"" + className + ".search\"");
        }
    }

    // ----- static methods called directly to initialize fulltext -----

    /**
     * Creates a fulltext index for a table and column list.
     * <p>
     * A table may have any number of indexes at a time, but the index name must be unique. If the index already exists,
     * nothing is done, otherwise the index is created and populated from existing data.
     * <p>
     * Usually called through:
     *
     * <pre>
     *   CALL NXFT_CREATE_INDEX('indexname', 'myschema', 'mytable', ('col1', 'col2'), 'lucene.analyzer');
     * </pre>
     *
     * @param conn the connection
     * @param indexName the index name
     * @param schema the schema name of the table
     * @param table the table name
     * @param columns the column list
     * @param analyzer the Lucene fulltext analyzer class
     */
    public static void createIndex(Connection conn, String indexName, String schema, String table, String columns,
            String analyzer) throws SQLException {
        if (indexName == null) {
            indexName = DEFAULT_INDEX_NAME;
        }
        // normalize the H2 column-list syntax "(col1, col2)" into "col1,col2"
        columns = columns.replace("(", "").replace(")", "").replace(" ", "");
        try (PreparedStatement ps = conn.prepareStatement("DELETE FROM " + FT_TABLE + " WHERE NAME = ?")) {
            ps.setString(1, indexName);
            ps.execute();
        }
        try (PreparedStatement ps = conn.prepareStatement("INSERT INTO " + FT_TABLE
                + "(NAME, SCHEMA, TABLE, COLUMNS, ANALYZER) VALUES(?, ?, ?, ?, ?)")) {
            ps.setString(1, indexName);
            ps.setString(2, schema);
            ps.setString(3, table);
            ps.setString(4, columns);
            ps.setString(5, analyzer);
            ps.execute();
        }
        createTrigger(conn, schema, table);
    }

    /**
     * Re-creates the fulltext index for this database.
     */
    public static void reindex(Connection conn) throws SQLException {
        removeAllTriggers(conn);
        removeIndexFiles(conn);
        try (Statement st = conn.createStatement()) {
            try (ResultSet rs = st.executeQuery("SELECT * FROM " + FT_TABLE)) {
                // several indexes may target the same table; index each table only once
                Set<String> done = new HashSet<String>();
                while (rs.next()) {
                    String schema = rs.getString("SCHEMA");
                    String table = rs.getString("TABLE");
                    String key = schema + '.' + table;
                    if (!done.add(key)) {
                        continue;
                    }
                    createTrigger(conn, schema, table);
                    indexExistingRows(conn, schema, table);
                }
            }
        }
    }

    /**
     * Feeds every existing row of the table through a freshly-initialized {@link Trigger}, as if each row had just
     * been inserted.
     */
    private static void indexExistingRows(Connection conn, String schema, String table) throws SQLException {
        Trigger trigger = new Trigger();
        trigger.init(conn, schema, null, table, false, org.h2.api.Trigger.INSERT);
        try (Statement st = conn.createStatement()) {
            try (ResultSet rs = st.executeQuery("SELECT * FROM " + StringUtils.quoteIdentifier(schema) + '.'
                    + StringUtils.quoteIdentifier(table))) {
                int n = rs.getMetaData().getColumnCount();
                while (rs.next()) {
                    Object[] row = new Object[n];
                    for (int i = 0; i < n; i++) {
                        row[i] = rs.getObject(i + 1);
                    }
                    trigger.fire(conn, null, row);
                }
            }
        }
    }

    /**
     * Creates a trigger for the indexes on a table.
     * <p>
     * Usually called through:
     *
     * <pre>
     *   CALL NXFT_CREATE_TRIGGERS('myschema', 'mytable');
     * </pre>
     *
     * @param conn the connection
     * @param schema the schema name of the table
     * @param table the table name
     */
    private static void createTrigger(Connection conn, String schema, String table) throws SQLException {
        try (Statement st = conn.createStatement()) {
            schema = StringUtils.quoteIdentifier(schema);
            String trigger = schema + '.' + StringUtils.quoteIdentifier(PREFIX + table);
            st.execute("DROP TRIGGER IF EXISTS " + trigger);
            st.execute(String.format("CREATE TRIGGER %s " + "AFTER INSERT, UPDATE, DELETE ON %s.%s "
                    + "FOR EACH ROW CALL \"%s\"", trigger, schema, StringUtils.quoteIdentifier(table),
                    H2Fulltext.Trigger.class.getName()));
        }
    }

    /** Drops every trigger whose name starts with {@link #PREFIX}, in any schema. */
    private static void removeAllTriggers(Connection conn) throws SQLException {
        try (Statement st = conn.createStatement()) {
            try (ResultSet rs = st.executeQuery("SELECT * FROM INFORMATION_SCHEMA.TRIGGERS")) {
                // a second statement is needed: st's result set is still being iterated
                try (Statement st2 = conn.createStatement()) {
                    while (rs.next()) {
                        String trigger = rs.getString("TRIGGER_NAME");
                        if (trigger.startsWith(PREFIX)) {
                            st2.execute("DROP TRIGGER " + StringUtils.quoteIdentifier(rs.getString("TRIGGER_SCHEMA"))
                                    + "." + trigger);
                        }
                    }
                }
            }
        }
    }

    /**
     * Drops all fulltext indexes from the database.
     */
    public static void dropAll(Connection conn) throws SQLException {
        try (Statement st = conn.createStatement()) {
            st.execute("DROP SCHEMA IF EXISTS " + FT_SCHEMA);
        }
        removeAllTriggers(conn);
        removeIndexFiles(conn);
    }

    /**
     * Returns the Lucene document field holding the fulltext for the given index; the default index uses the plain
     * {@code TEXT} field for backward compatibility.
     */
    private static String fieldForIndex(String indexName) {
        if (DEFAULT_INDEX_NAME.equals(indexName)) {
            return FIELD_TEXT;
        } else {
            return FIELD_TEXT + '_' + indexName;
        }
    }

    /**
     * Searches from the given full text index. The returned result set has a single ID column which holds the keys for
     * the matching rows.
     * <p>
     * Usually called through:
     *
     * <pre>
     *   SELECT * FROM NXFT_SEARCH(name, 'text');
     * </pre>
     *
     * @param conn the connection
     * @param indexName the index name
     * @param text the search query
     * @return the result set
     */
    public static ResultSet search(Connection conn, String indexName, String text) throws SQLException {
        DatabaseMetaData meta = conn.getMetaData();
        if (indexName == null) {
            indexName = DEFAULT_INDEX_NAME;
        }

        String schema;
        String table;
        String analyzerName;

        // find schema, table and analyzer
        try (PreparedStatement ps = conn.prepareStatement("SELECT SCHEMA, TABLE, ANALYZER FROM " + FT_TABLE
                + " WHERE NAME = ?")) {
            ps.setString(1, indexName);
            try (ResultSet res = ps.executeQuery()) {
                if (!res.next()) {
                    throw new SQLException("No such index: " + indexName);
                }
                schema = res.getString(1);
                table = res.getString(2);
                analyzerName = res.getString(3);
            }
        }

        int type = getPrimaryKeyType(meta, schema, table);
        SimpleResultSet rs = new SimpleResultSet();
        rs.addColumn(COL_KEY, type, 0, 0);

        if (meta.getURL().startsWith("jdbc:columnlist:")) {
            // this is just to query the result set columns
            return rs;
        }

        // flush changes so the reader opened below sees all indexed documents
        final IndexWriter writer = getIndexWriter(getIndexName(conn), getIndexPath(conn), analyzerName);
        if (writer.hasUncommittedChanges()) {
            try {
                writer.commit();
            } catch (IOException cause) {
                throw convertException(cause);
            }
        }

        // search index
        try {
            BooleanQuery query = new BooleanQuery();
            String defaultField = fieldForIndex(indexName);
            Analyzer analyzer = getAnalyzer(analyzerName);
            QueryParser parser = new QueryParser(LUCENE_VERSION, defaultField, analyzer);
            query.add(parser.parse(text), BooleanClause.Occur.MUST);

            try (IndexReader reader = DirectoryReader.open(writer.getDirectory())) {
                IndexSearcher searcher = new IndexSearcher(reader);
                Collector collector = new ResultSetCollector(rs, reader, type);
                searcher.search(query, collector);
            }
        } catch (SQLException | ParseException | IOException e) {
            throw convertException(e);
        }
        return rs;
    }

    /**
     * Lucene collector that turns each hit's stored {@code KEY} field back into a primary key value and appends it as
     * a row of the {@link SimpleResultSet}.
     */
    protected static class ResultSetCollector extends Collector {
        protected final SimpleResultSet rs;

        protected IndexReader reader;

        /** JDBC type of the primary key column (see {@link java.sql.Types}). */
        protected int type;

        /** Doc-id base of the current atomic reader segment. */
        protected int docBase;

        public ResultSetCollector(SimpleResultSet rs, IndexReader reader, int type) {
            this.rs = rs;
            this.reader = reader;
            this.type = type;
        }

        @Override
        public void setNextReader(AtomicReaderContext context) {
            docBase = context.docBase;
        }

        @Override
        public void setScorer(Scorer scorer) {
        }

        @Override
        public boolean acceptsDocsOutOfOrder() {
            return true;
        }

        @Override
        public void collect(int docID) throws IOException {
            docID += docBase;
            // only the KEY field is needed, skip loading other stored fields
            Document doc = reader.document(docID, Collections.singleton(FIELD_KEY));
            Object key;
            try {
                key = asObject(doc.get(FIELD_KEY), type);
                rs.addRow(new Object[] { key });
            } catch (SQLException e) {
                throw new IOException(e);
            }
        }
    }

    /**
     * Returns the JDBC type of the table's single-column primary key.
     *
     * @throws SQLException if the table has no primary key or a multi-column one
     */
    private static int getPrimaryKeyType(DatabaseMetaData meta, String schema, String table) throws SQLException {
        // find primary key name
        String primaryKeyName = null;
        try (ResultSet rs = meta.getPrimaryKeys(null, schema, table)) {
            while (rs.next()) {
                if (primaryKeyName != null) {
                    throw new SQLException("Can only index primary keys on one column for " + schema + '.' + table);
                }
                primaryKeyName = rs.getString("COLUMN_NAME");
            }
            if (primaryKeyName == null) {
                throw new SQLException("No primary key for " + schema + '.' + table);
            }
        }
        // find primary key type
        try (ResultSet rs = meta.getColumns(null, schema, table, primaryKeyName)) {
            if (!rs.next()) {
                throw new SQLException("Could not find primary key");
            }
            return rs.getInt("DATA_TYPE");
        }
    }

    /**
     * Returns the cached analyzer for the given class name, instantiating it reflectively (with a
     * {@code (Version)} constructor) on first use.
     */
    private static Analyzer getAnalyzer(String analyzerName) throws SQLException {
        Analyzer analyzer = analyzers.get(analyzerName);
        if (analyzer == null) {
            try {
                Class<?> klass = Class.forName(analyzerName);
                Constructor<?> constructor = klass.getConstructor(Version.class);
                analyzer = (Analyzer) constructor.newInstance(LUCENE_VERSION);
            } catch (ReflectiveOperationException e) {
                // chain the cause instead of flattening it to a string
                throw new SQLException("Cannot instantiate analyzer: " + analyzerName, e);
            }
            analyzers.put(analyzerName, analyzer);
        }
        return analyzer;
    }

    /** Returns the cache key for this database's index writer (the catalog name, or "default"). */
    protected static String getIndexName(Connection conn) throws SQLException {
        String catalog = conn.getCatalog();
        if (catalog == null) {
            catalog = "default";
        }
        return catalog;
    }

    /**
     * Returns the filesystem path of the Lucene index, or {@code null} for an in-memory database (in which case a
     * {@link RAMDirectory} is used).
     */
    protected static String getIndexPath(Connection conn) throws SQLException {
        try (Statement st = conn.createStatement()) {
            try (ResultSet rs = st.executeQuery("CALL DATABASE_PATH()")) {
                rs.next();
                String path = rs.getString(1);
                if (path == null) {
                    return null; // in-memory database
                }
                return path + ".lucene";
            }
        }

    }

    /**
     * Returns the shared index writer for the given index name, opening it (double-checked under the map's lock) on
     * first use.
     */
    private static IndexWriter getIndexWriter(String name, String path, String analyzer) throws SQLException {
        IndexWriter indexWriter = indexWriters.get(name);
        if (indexWriter != null) {
            return indexWriter;
        }
        synchronized (indexWriters) {
            indexWriter = indexWriters.get(name);
            if (indexWriter != null) {
                return indexWriter;
            }
            try {
                Directory dir = path == null ? new RAMDirectory() : FSDirectory.open(new File(path));
                Analyzer an = getAnalyzer(analyzer);
                IndexWriterConfig iwc = new IndexWriterConfig(LUCENE_VERSION, an);
                iwc.setOpenMode(OpenMode.CREATE_OR_APPEND);
                indexWriter = new IndexWriter(dir, iwc);
            } catch (LockObtainFailedException e) {
                throw convertException("Cannot open fulltext index " + path, e);
            } catch (IOException e) {
                throw convertException(e);
            }
            indexWriters.put(name, indexWriter);
            return indexWriter;
        }
    }

    /**
     * Closes this database's index writer (if any) and deletes the index files on disk.
     */
    private static void removeIndexFiles(Connection conn) throws SQLException {
        String path = getIndexPath(conn);
        try {
            // writers are cached by index name (see getIndexWriter), not by path
            IndexWriter index = indexWriters.remove(getIndexName(conn));
            if (index != null) {
                try {
                    index.close();
                } catch (IOException e) {
                    throw convertException(e);
                }
            }
        } finally {
            if (path != null) {
                // null path means an in-memory database: nothing on disk to delete
                FileUtils.deleteRecursive(path, false);
            }
        }
    }

    private static SQLException convertException(Exception e) {
        return convertException("Error while indexing document", e);
    }

    private static SQLException convertException(String message, Exception e) {
        SQLException e2 = new SQLException(message);
        e2.initCause(e);
        return e2;
    }

    /**
     * Converts a column value to the string that gets indexed; CLOBs are read fully, unsupported binary types raise
     * an exception, and {@code null} becomes the empty string.
     */
    protected static String asString(Object data, int type) throws SQLException {
        if (data == null) {
            return "";
        }
        switch (type) {
        case Types.BIT:
        case Types.BOOLEAN:
        case Types.INTEGER:
        case Types.BIGINT:
        case Types.DECIMAL:
        case Types.DOUBLE:
        case Types.FLOAT:
        case Types.NUMERIC:
        case Types.REAL:
        case Types.SMALLINT:
        case Types.TINYINT:
        case Types.DATE:
        case Types.TIME:
        case Types.TIMESTAMP:
        case Types.LONGVARCHAR:
        case Types.CHAR:
        case Types.VARCHAR:
            return data.toString();
        case Types.CLOB:
            try {
                if (data instanceof Clob) {
                    data = ((Clob) data).getCharacterStream();
                }
                return IOUtils.readStringAndClose((Reader) data, -1);
            } catch (IOException e) {
                throw DbException.convert(e);
            }
        case Types.VARBINARY:
        case Types.LONGVARBINARY:
        case Types.BINARY:
        case Types.JAVA_OBJECT:
        case Types.OTHER:
        case Types.BLOB:
        case Types.STRUCT:
        case Types.REF:
        case Types.NULL:
        case Types.ARRAY:
        case Types.DATALINK:
        case Types.DISTINCT:
            throw new SQLException("Unsupported column data type: " + type);
        default:
            return "";
        }
    }

    // simple cases only, used for primary key
    private static Object asObject(String string, int type) throws SQLException {
        switch (type) {
        case Types.BIGINT:
            return Long.valueOf(string);
        case Types.INTEGER:
        case Types.SMALLINT:
        case Types.TINYINT:
            return Integer.valueOf(string);
        case Types.LONGVARCHAR:
        case Types.CHAR:
        case Types.VARCHAR:
            return string;
        default:
            throw new SQLException("Unsupported data type for primary key: " + type);
        }
    }

    /**
     * Trigger used to update the lucene index upon row change.
     */
    public static class Trigger implements org.h2.api.Trigger {

        private static final Log log = LogFactory.getLog(Trigger.class);

        private String indexName;

        private String indexPath;

        private IndexWriter indexWriter;

        // DEBUG
        private Exception lastIndexWriterClose;

        // DEBUG
        private String lastIndexWriterCloseThread;

        /** Starting at 0. */
        private int primaryKeyIndex;

        private int primaryKeyType;

        /** Per-index-name array of JDBC types of the indexed columns. */
        private Map<String, int[]> columnTypes;

        /** Per-index-name array of 0-based row positions of the indexed columns. */
        private Map<String, int[]> columnIndices;

        /**
         * Trigger interface: initialization.
         */
        @Override
        public void init(Connection conn, String schema, String triggerName, String table, boolean before, int opType)
                throws SQLException {
            DatabaseMetaData meta = conn.getMetaData();

            // find primary key name
            String primaryKeyName = null;
            try (ResultSet rs = meta.getPrimaryKeys(null, schema, table)) {
                while (rs.next()) {
                    if (primaryKeyName != null) {
                        throw new SQLException("Can only index primary keys on one column for: " + schema + '.' + table);
                    }
                    primaryKeyName = rs.getString("COLUMN_NAME");
                }
                if (primaryKeyName == null) {
                    throw new SQLException("No primary key for " + schema + '.' + table);
                }
            }

            // find primary key type
            try (ResultSet rs = meta.getColumns(null, schema, table, primaryKeyName)) {
                if (!rs.next()) {
                    throw new SQLException("No primary key for: " + schema + '.' + table);
                }
                primaryKeyType = rs.getInt("DATA_TYPE");
                primaryKeyIndex = rs.getInt("ORDINAL_POSITION") - 1;
            }

            // find all columns info
            Map<String, Integer> allColumnTypes = new HashMap<String, Integer>();
            Map<String, Integer> allColumnIndices = new HashMap<String, Integer>();
            try (ResultSet rs = meta.getColumns(null, schema, table, null)) {
                while (rs.next()) {
                    String name = rs.getString("COLUMN_NAME");
                    int type = rs.getInt("DATA_TYPE");
                    int index = rs.getInt("ORDINAL_POSITION") - 1;
                    allColumnTypes.put(name, Integer.valueOf(type));
                    allColumnIndices.put(name, Integer.valueOf(index));
                }
            }

            // find columns configured for indexing
            try (PreparedStatement ps = conn.prepareStatement("SELECT NAME, COLUMNS, ANALYZER FROM " + FT_TABLE
                    + " WHERE SCHEMA = ? AND TABLE = ?")) {
                ps.setString(1, schema);
                ps.setString(2, table);
                try (ResultSet rs = ps.executeQuery()) {
                    columnTypes = new HashMap<String, int[]>();
                    columnIndices = new HashMap<String, int[]>();
                    while (rs.next()) {
                        String index = rs.getString(1);
                        String columns = rs.getString(2);
                        String analyzerName = rs.getString(3);
                        List<String> columnNames = Arrays.asList(columns.split(","));

                        // find the columns' indices and types
                        int[] types = new int[columnNames.size()];
                        int[] indices = new int[columnNames.size()];
                        int i = 0;
                        for (String columnName : columnNames) {
                            types[i] = allColumnTypes.get(columnName).intValue();
                            indices[i] = allColumnIndices.get(columnName).intValue();
                            i++;
                        }
                        columnTypes.put(index, types);
                        columnIndices.put(index, indices);
                        // only one call actually needed for this:
                        indexName = getIndexName(conn);
                        indexPath = getIndexPath(conn);
                        indexWriter = getIndexWriter(indexName, indexPath, analyzerName);
                    }

                }
            }
        }

        /**
         * Trigger interface.
         */
        @Override
        public void fire(Connection conn, Object[] oldRow, Object[] newRow) throws SQLException {
            if (indexWriter == null) {
                throw new SQLException("Fulltext index was not initialized");
            }
            if (oldRow != null) {
                delete(oldRow);
            }
            if (newRow != null) {
                insert(newRow);
            }
        }

        /** Adds one Lucene document for the row, with one fulltext field per configured index. */
        private void insert(Object[] row) throws SQLException {
            Document doc = new Document();
            String key = asString(row[primaryKeyIndex], primaryKeyType);
            // StringField is not tokenized
            StringField keyField = new StringField(FIELD_KEY, key, Field.Store.YES);
            doc.add(keyField);

            // add fulltext for all indexes
            for (String indexName : columnTypes.keySet()) {
                int[] types = columnTypes.get(indexName);
                int[] indices = columnIndices.get(indexName);
                StringBuilder buf = new StringBuilder();
                for (int i = 0; i < types.length; i++) {
                    String data = asString(row[indices[i]], types[i]);
                    if (i > 0) {
                        buf.append(' ');
                    }
                    buf.append(data);
                }
                // TextField is tokenized
                TextField textField = new TextField(fieldForIndex(indexName), buf.toString(), Field.Store.NO);
                doc.add(textField);
            }

            try {
                indexWriter.addDocument(doc);
            } catch (IOException e) {
                throw convertException(e);
            } catch (org.apache.lucene.store.AlreadyClosedException e) {
                // DEBUG
                log.error(
                        "org.apache.lucene.store.AlreadyClosedException in thread " + Thread.currentThread().getName()
                                + ", last close was in thread " + lastIndexWriterCloseThread, lastIndexWriterClose);
                throw e;
            }
        }

        /** Deletes the Lucene document(s) whose key field matches the row's primary key. */
        private void delete(Object[] row) throws SQLException {
            String primaryKey = asString(row[primaryKeyIndex], primaryKeyType);
            try {
                indexWriter.deleteDocuments(new Term(FIELD_KEY, primaryKey));
            } catch (IOException e) {
                throw convertException(e);
            }
        }

        @Override
        public void close() throws SQLException {
            if (indexWriter != null) {
                try {
                    // DEBUG
                    lastIndexWriterClose = new RuntimeException("debug stack trace");
                    lastIndexWriterCloseThread = Thread.currentThread().getName();
                    indexWriter.close();
                    indexWriter = null;
                } catch (IOException e) {
                    throw convertException(e);
                } finally {
                    indexWriters.remove(indexName);
                }
            }
        }

        @Override
        public void remove() {
            // ignore
        }
    }

}