001/* 002 * (C) Copyright 2014-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Tiry 018 * Benoit Delbosc 019 */ 020package org.nuxeo.elasticsearch.audit; 021 022import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; 023 024import java.io.IOException; 025import java.io.OutputStream; 026import java.io.Serializable; 027import java.util.ArrayList; 028import java.util.Calendar; 029import java.util.Date; 030import java.util.HashMap; 031import java.util.List; 032import java.util.Map; 033 034import org.apache.commons.collections.MapUtils; 035import org.apache.commons.logging.Log; 036import org.apache.commons.logging.LogFactory; 037import org.codehaus.jackson.JsonFactory; 038import org.codehaus.jackson.JsonGenerator; 039import org.elasticsearch.action.bulk.BulkItemResponse; 040import org.elasticsearch.action.bulk.BulkRequestBuilder; 041import org.elasticsearch.action.bulk.BulkResponse; 042import org.elasticsearch.action.get.GetResponse; 043import org.elasticsearch.action.search.SearchRequestBuilder; 044import org.elasticsearch.action.search.SearchResponse; 045import org.elasticsearch.action.search.SearchType; 046import org.elasticsearch.client.Client; 047import org.elasticsearch.common.io.stream.BytesStreamOutput; 048import org.elasticsearch.common.unit.TimeValue; 049import org.elasticsearch.common.xcontent.XContentBuilder; 050import org.elasticsearch.index.query.BoolQueryBuilder; 051import org.elasticsearch.index.query.QueryBuilder; 052import org.elasticsearch.index.query.QueryBuilders; 053import org.elasticsearch.index.query.TermQueryBuilder; 054import org.elasticsearch.search.SearchHit; 055import org.elasticsearch.search.aggregations.AggregationBuilders; 056import org.elasticsearch.search.aggregations.metrics.max.Max; 057import org.elasticsearch.search.sort.SortOrder; 058import org.joda.time.DateTime; 059import org.joda.time.format.ISODateTimeFormat; 060import org.nuxeo.common.utils.TextTemplate; 061import org.nuxeo.ecm.core.api.DocumentModel; 062import org.nuxeo.ecm.core.api.NuxeoException; 063import org.nuxeo.ecm.core.uidgen.UIDGeneratorService; 064import org.nuxeo.ecm.core.uidgen.UIDSequencer; 065import org.nuxeo.ecm.core.work.api.Work; 066import org.nuxeo.ecm.core.work.api.Work.State; 067import org.nuxeo.ecm.core.work.api.WorkManager; 068import org.nuxeo.ecm.platform.audit.api.AuditReader; 069import org.nuxeo.ecm.platform.audit.api.ExtendedInfo; 070import org.nuxeo.ecm.platform.audit.api.FilterMapEntry; 071import org.nuxeo.ecm.platform.audit.api.LogEntry; 072import org.nuxeo.ecm.platform.audit.api.query.AuditQueryException; 073import org.nuxeo.ecm.platform.audit.api.query.DateRangeParser; 074import org.nuxeo.ecm.platform.audit.service.AbstractAuditBackend; 075import org.nuxeo.ecm.platform.audit.service.AuditBackend; 076import org.nuxeo.ecm.platform.audit.service.BaseLogEntryProvider; 077import org.nuxeo.ecm.platform.audit.service.NXAuditEventsService; 078import org.nuxeo.ecm.platform.audit.service.extension.AuditBackendDescriptor; 079import org.nuxeo.ecm.platform.query.api.PredicateDefinition; 080import org.nuxeo.ecm.platform.query.api.PredicateFieldDefinition; 081import org.nuxeo.elasticsearch.ElasticSearchConstants; 082import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 083import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONReader; 084import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONWriter; 085import org.nuxeo.runtime.api.Framework; 086import org.nuxeo.runtime.model.DefaultComponent; 087 088/** 089 * Implementation of the {@link AuditBackend} interface using Elasticsearch persistence 090 * 091 * @author tiry 092 */ 093public class ESAuditBackend extends AbstractAuditBackend implements AuditBackend { 094 095 public static final String SEQ_NAME = "audit"; 096 097 public static final String MIGRATION_FLAG_PROP = "audit.elasticsearch.migration"; 098 099 public static final String MIGRATION_BATCH_SIZE_PROP = "audit.elasticsearch.migration.batchSize"; 100 101 public static final String MIGRATION_DONE_EVENT = "sqlToElasticsearchMigrationDone"; 102 103 public static final int MIGRATION_DEFAULT_BACTH_SIZE = 1000; 104 105 public ESAuditBackend(NXAuditEventsService component, AuditBackendDescriptor config) { 106 super(component, config); 107 } 108 109 protected Client esClient; 110 111 protected static final Log log = LogFactory.getLog(ESAuditBackend.class); 112 113 protected BaseLogEntryProvider provider = new BaseLogEntryProvider() { 114 115 @Override 116 public int removeEntries(String eventId, String pathPattern) { 117 throw new UnsupportedOperationException("Not implemented yet!"); 118 } 119 120 @Override 121 public void addLogEntry(LogEntry logEntry) { 122 List<LogEntry> entries = new ArrayList<>(); 123 entries.add(logEntry); 124 addLogEntries(entries); 125 } 126 127 @Override 128 public List<LogEntry> getLogEntriesFor(String uuid, String repositoryId) { 129 throw new UnsupportedOperationException("Not implemented yet!"); 130 } 131 132 @Override 133 public List<LogEntry> getLogEntriesFor(String uuid) { 134 throw new UnsupportedOperationException("Not implemented yet!"); 135 } 136 137 @Override 138 public List<LogEntry> getLogEntriesFor(String uuid, Map<String, FilterMapEntry> filterMap, 139 boolean doDefaultSort) { 140 throw new UnsupportedOperationException("Not implemented yet!"); 141 } 142 }; 143 144 protected Client getClient() { 145 log.info("Activate Elasticsearch backend for Audit"); 146 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 147 Client client = esa.getClient(); 148 ensureUIDSequencer(client); 149 return client; 150 } 151 152 protected boolean isMigrationDone() { 153 AuditReader reader = Framework.getService(AuditReader.class); 154 List<LogEntry> entries = reader.queryLogs(new String[] { MIGRATION_DONE_EVENT }, null); 155 return !entries.isEmpty(); 156 } 157 158 @Override 159 public int getApplicationStartedOrder() { 160 int elasticOrder = ((DefaultComponent) Framework.getRuntime() 161 .getComponent("org.nuxeo.elasticsearch.ElasticSearchComponent")) 162 .getApplicationStartedOrder(); 163 int uidgenOrder = ((DefaultComponent) Framework.getRuntime() 164 .getComponent("org.nuxeo.ecm.core.uidgen.UIDGeneratorService")) 165 .getApplicationStartedOrder(); 166 return Integer.max(elasticOrder, uidgenOrder) + 1; 167 } 168 169 @Override 170 public void onApplicationStarted() { 171 esClient = getClient(); 172 if (Boolean.parseBoolean(Framework.getProperty(MIGRATION_FLAG_PROP))) { 173 if (!isMigrationDone()) { 174 log.info(String.format( 175 "Property %s is true and migration is not done yet, processing audit migration from SQL to Elasticsearch index", 176 MIGRATION_FLAG_PROP)); 177 // Drop audit index first in case of a previous bad migration 178 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 179 esa.dropAndInitIndex(getESIndexName()); 180 int batchSize = MIGRATION_DEFAULT_BACTH_SIZE; 181 String batchSizeProp = Framework.getProperty(MIGRATION_BATCH_SIZE_PROP); 182 if (batchSizeProp != null) { 183 batchSize = Integer.parseInt(batchSizeProp); 184 } 185 migrate(batchSize); 186 } else { 187 log.warn(String.format( 188 "Property %s is true but migration is already done, please set this property to false", 189 MIGRATION_FLAG_PROP)); 190 } 191 } else { 192 log.debug(String.format("Property %s is false, not processing any migration", MIGRATION_FLAG_PROP)); 193 } 194 } 195 196 @Override 197 public void onApplicationStopped() { 198 if (esClient == null) { 199 return; 200 } 201 try { 202 esClient.close(); 203 } finally { 204 esClient = null; 205 } 206 } 207 208 @Override 209 public List<LogEntry> getLogEntriesFor(String uuid, String repositoryId) { 210 TermQueryBuilder docFilter = QueryBuilders.termQuery("docUUID", uuid); 211 TermQueryBuilder repoFilter = QueryBuilders.termQuery("repositoryId", repositoryId); 212 QueryBuilder filter; 213 filter = QueryBuilders.boolQuery().must(docFilter); 214 filter = QueryBuilders.boolQuery().must(repoFilter); 215 return getLogEntries(filter, false); 216 } 217 218 @Override 219 public List<LogEntry> getLogEntriesFor(String uuid, Map<String, FilterMapEntry> filterMap, boolean doDefaultSort) { 220 TermQueryBuilder docFilter = QueryBuilders.termQuery("docUUID", uuid); 221 QueryBuilder filter; 222 if (MapUtils.isEmpty(filterMap)) { 223 filter = docFilter; 224 } else { 225 filter = QueryBuilders.boolQuery().must(docFilter); 226 for (String key : filterMap.keySet()) { 227 FilterMapEntry entry = filterMap.get(key); 228 ((BoolQueryBuilder) filter).must(QueryBuilders.termQuery(entry.getColumnName(), entry.getObject())); 229 } 230 } 231 return getLogEntries(filter, doDefaultSort); 232 } 233 234 protected List<LogEntry> getLogEntries(QueryBuilder filter, boolean doDefaultSort) { 235 SearchRequestBuilder builder = getSearchRequestBuilder(esClient); 236 if (doDefaultSort) { 237 builder.addSort("eventDate", SortOrder.DESC); 238 } 239 TimeValue keepAlive = TimeValue.timeValueMinutes(1); 240 builder.setQuery(QueryBuilders.constantScoreQuery(filter)).setScroll(keepAlive).setSize(100); 241 242 logSearchRequest(builder); 243 SearchResponse searchResponse = builder.get(); 244 logSearchResponse(searchResponse); 245 246 // Build log entries 247 List<LogEntry> logEntries = buildLogEntries(searchResponse); 248 // Scroll on next results 249 for (; // 250 searchResponse.getHits().getHits().length > 0 251 && logEntries.size() < searchResponse.getHits().getTotalHits(); // 252 searchResponse = runNextScroll(searchResponse.getScrollId(), keepAlive)) { 253 // Build log entries 254 logEntries.addAll(buildLogEntries(searchResponse)); 255 } 256 return logEntries; 257 } 258 259 SearchResponse runNextScroll(String scrollId, TimeValue keepAlive) { 260 if (log.isDebugEnabled()) { 261 log.debug(String.format( 262 "Scroll request: -XGET 'localhost:9200/_search/scroll' -d '{\"scroll\": \"%s\", \"scroll_id\": \"%s\" }'", 263 keepAlive, scrollId)); 264 } 265 SearchResponse response = esClient.prepareSearchScroll(scrollId).setScroll(keepAlive).execute().actionGet(); 266 logSearchResponse(response); 267 return response; 268 } 269 270 protected List<LogEntry> buildLogEntries(SearchResponse searchResponse) { 271 List<LogEntry> entries = new ArrayList<>(searchResponse.getHits().getHits().length); 272 for (SearchHit hit : searchResponse.getHits()) { 273 try { 274 entries.add(AuditEntryJSONReader.read(hit.getSourceAsString())); 275 } catch (IOException e) { 276 log.error("Error while reading Audit Entry from ES", e); 277 } 278 } 279 return entries; 280 } 281 282 protected SearchRequestBuilder getSearchRequestBuilder(Client esClient) { 283 return esClient.prepareSearch(getESIndexName()) 284 .setTypes(ElasticSearchConstants.ENTRY_TYPE) 285 .setSearchType(SearchType.DFS_QUERY_THEN_FETCH); 286 } 287 288 @Override 289 public LogEntry getLogEntryByID(long id) { 290 GetResponse ret = esClient.prepareGet(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, String.valueOf(id)) 291 .get(); 292 if (!ret.isExists()) { 293 return null; 294 } 295 try { 296 return AuditEntryJSONReader.read(ret.getSourceAsString()); 297 } catch (IOException e) { 298 throw new RuntimeException("Unable to read Entry for id " + id, e); 299 } 300 } 301 302 public SearchRequestBuilder buildQuery(String query, Map<String, Object> params) { 303 if (params != null && params.size() > 0) { 304 query = expandQueryVariables(query, params); 305 } 306 SearchRequestBuilder builder = getSearchRequestBuilder(esClient); 307 builder.setQuery(query); 308 return builder; 309 } 310 311 public String expandQueryVariables(String query, Object[] params) { 312 Map<String, Object> qParams = new HashMap<>(); 313 for (int i = 0; i < params.length; i++) { 314 query = query.replaceFirst("\\?", "\\${param" + i + "}"); 315 qParams.put("param" + i, params[i]); 316 } 317 return expandQueryVariables(query, qParams); 318 } 319 320 public String expandQueryVariables(String query, Map<String, Object> params) { 321 if (params != null && params.size() > 0) { 322 TextTemplate tmpl = new TextTemplate(); 323 for (String key : params.keySet()) { 324 Object val = params.get(key); 325 if (val == null) { 326 continue; 327 } else if (val instanceof Calendar) { 328 tmpl.setVariable(key, ISODateTimeFormat.dateTime().print(new DateTime(val))); 329 } else if (val instanceof Date) { 330 tmpl.setVariable(key, ISODateTimeFormat.dateTime().print(new DateTime(val))); 331 } else { 332 tmpl.setVariable(key, val.toString()); 333 } 334 } 335 query = tmpl.processText(query); 336 } 337 return query; 338 } 339 340 @Override 341 public List<?> nativeQuery(String query, Map<String, Object> params, int pageNb, int pageSize) { 342 SearchRequestBuilder builder = buildQuery(query, params); 343 if (pageNb > 0) { 344 builder.setFrom(pageNb * pageSize); 345 } 346 if (pageSize > 0) { 347 builder.setSize(pageSize); 348 } 349 logSearchRequest(builder); 350 SearchResponse searchResponse = builder.get(); 351 logSearchResponse(searchResponse); 352 return buildLogEntries(searchResponse); 353 } 354 355 @Override 356 public List<LogEntry> queryLogsByPage(String[] eventIds, Date limit, String[] categories, String path, int pageNb, 357 int pageSize) { 358 SearchRequestBuilder builder = getSearchRequestBuilder(esClient); 359 BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery(); 360 if (eventIds != null && eventIds.length > 0) { 361 if (eventIds.length == 1) { 362 filterBuilder.must(QueryBuilders.termQuery("eventId", eventIds[0])); 363 } else { 364 filterBuilder.must(QueryBuilders.termsQuery("eventId", eventIds)); 365 } 366 } 367 if (categories != null && categories.length > 0) { 368 if (categories.length == 1) { 369 filterBuilder.must(QueryBuilders.termQuery("category", categories[0])); 370 } else { 371 filterBuilder.must(QueryBuilders.termsQuery("category", categories)); 372 } 373 } 374 if (path != null) { 375 filterBuilder.must(QueryBuilders.termQuery("docPath", path)); 376 } 377 378 if (limit != null) { 379 filterBuilder.must(QueryBuilders.rangeQuery("eventDate").lt(limit)); 380 } 381 382 builder.setQuery(QueryBuilders.constantScoreQuery(filterBuilder)); 383 384 if (pageNb > 0) { 385 builder.setFrom(pageNb * pageSize); 386 } 387 if (pageSize > 0) { 388 builder.setSize(pageSize); 389 } 390 logSearchRequest(builder); 391 SearchResponse searchResponse = builder.get(); 392 logSearchResponse(searchResponse); 393 return buildLogEntries(searchResponse); 394 } 395 396 @Override 397 public List<LogEntry> queryLogsByPage(String[] eventIds, String dateRange, String[] categories, String path, 398 int pageNb, int pageSize) { 399 400 Date limit = null; 401 if (dateRange != null) { 402 try { 403 limit = DateRangeParser.parseDateRangeQuery(new Date(), dateRange); 404 } catch (AuditQueryException aqe) { 405 aqe.addInfo("Wrong date range query. Query was " + dateRange); 406 throw aqe; 407 } 408 } 409 return queryLogsByPage(eventIds, limit, categories, path, pageNb, pageSize); 410 } 411 412 @Override 413 public void addLogEntries(List<LogEntry> entries) { 414 415 if (entries.isEmpty()) { 416 return; 417 } 418 419 BulkRequestBuilder bulkRequest = esClient.prepareBulk(); 420 JsonFactory factory = new JsonFactory(); 421 422 UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class); 423 UIDSequencer seq = uidGeneratorService.getSequencer(); 424 425 try { 426 427 for (LogEntry entry : entries) { 428 entry.setId(seq.getNext(SEQ_NAME)); 429 if (log.isDebugEnabled()) { 430 log.debug(String.format("Indexing log entry: %s", entry)); 431 } 432 OutputStream out = new BytesStreamOutput(); 433 JsonGenerator jsonGen = factory.createJsonGenerator(out); 434 XContentBuilder builder = jsonBuilder(out); 435 AuditEntryJSONWriter.asJSON(jsonGen, entry); 436 bulkRequest.add(esClient.prepareIndex(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, 437 String.valueOf(entry.getId())).setSource(builder)); 438 } 439 440 BulkResponse bulkResponse = bulkRequest.execute().actionGet(); 441 if (bulkResponse.hasFailures()) { 442 for (BulkItemResponse response : bulkResponse.getItems()) { 443 if (response.isFailed()) { 444 log.error("Unable to index audit entry " + response.getItemId() + " :" 445 + response.getFailureMessage()); 446 } 447 } 448 } 449 } catch (IOException e) { 450 throw new NuxeoException("Error while indexing Audit entries", e); 451 } 452 453 } 454 455 @Override 456 public Long getEventsCount(String eventId) { 457 SearchResponse res = esClient.prepareSearch(getESIndexName()) 458 .setTypes(ElasticSearchConstants.ENTRY_TYPE) 459 .setQuery(QueryBuilders.constantScoreQuery( 460 QueryBuilders.termQuery("eventId", eventId))) 461 .setSize(0) 462 .get(); 463 return res.getHits().getTotalHits(); 464 } 465 466 @Override 467 public long syncLogCreationEntries(final String repoId, final String path, final Boolean recurs) { 468 return syncLogCreationEntries(provider, repoId, path, recurs); 469 } 470 471 protected QueryBuilder buildFilter(PredicateDefinition[] predicates, DocumentModel searchDocumentModel) { 472 473 if (searchDocumentModel == null) { 474 return QueryBuilders.matchAllQuery(); 475 } 476 477 BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery(); 478 479 int nbFilters = 0; 480 481 for (PredicateDefinition predicate : predicates) { 482 483 // extract data from DocumentModel 484 PredicateFieldDefinition[] fieldDef = predicate.getValues(); 485 Object[] val = new Object[fieldDef.length]; 486 for (int fidx = 0; fidx < fieldDef.length; fidx++) { 487 if (fieldDef[fidx].getXpath() != null) { 488 val[fidx] = searchDocumentModel.getPropertyValue(fieldDef[fidx].getXpath()); 489 } else { 490 val[fidx] = searchDocumentModel.getProperty(fieldDef[fidx].getSchema(), fieldDef[fidx].getName()); 491 } 492 } 493 494 if (!isNonNullParam(val)) { 495 // skip predicate where all values are null 496 continue; 497 } 498 499 nbFilters++; 500 501 String op = predicate.getOperator(); 502 if (op.equalsIgnoreCase("IN")) { 503 504 String[] values = null; 505 if (val[0] instanceof Iterable<?>) { 506 List<String> l = new ArrayList<>(); 507 Iterable<?> vals = (Iterable<?>) val[0]; 508 509 for (Object v : vals) { 510 if (v != null) { 511 l.add(v.toString()); 512 } 513 } 514 values = l.toArray(new String[l.size()]); 515 } else if (val[0] instanceof Object[]) { 516 values = (String[]) val[0]; 517 } 518 filterBuilder.must(QueryBuilders.termsQuery(predicate.getParameter(), values)); 519 } else if (op.equalsIgnoreCase("BETWEEN")) { 520 filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gt(val[0])); 521 if (val.length > 1) { 522 filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lt(val[1])); 523 } 524 } else if (">".equals(op)) { 525 filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gt(val[0])); 526 } else if (">=".equals(op)) { 527 filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gte(val[0])); 528 } else if ("<".equals(op)) { 529 filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lt(val[0])); 530 } else if ("<=".equals(op)) { 531 filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lte(val[0])); 532 } else { 533 filterBuilder.must(QueryBuilders.termQuery(predicate.getParameter(), val[0])); 534 } 535 } 536 537 if (nbFilters == 0) { 538 return QueryBuilders.matchAllQuery(); 539 } 540 return filterBuilder; 541 } 542 543 public SearchRequestBuilder buildSearchQuery(String fixedPart, PredicateDefinition[] predicates, 544 DocumentModel searchDocumentModel) { 545 SearchRequestBuilder builder = getSearchRequestBuilder(esClient); 546 QueryBuilder queryBuilder = QueryBuilders.wrapperQuery(fixedPart); 547 QueryBuilder filterBuilder = buildFilter(predicates, searchDocumentModel); 548 builder.setQuery(QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder)); 549 return builder; 550 } 551 552 protected boolean isNonNullParam(Object[] val) { 553 if (val == null) { 554 return false; 555 } 556 for (Object v : val) { 557 if (v != null) { 558 if (v instanceof String) { 559 if (!((String) v).isEmpty()) { 560 return true; 561 } 562 } else if (v instanceof String[]) { 563 if (((String[]) v).length > 0) { 564 return true; 565 } 566 } else { 567 return true; 568 } 569 } 570 } 571 return false; 572 } 573 574 @SuppressWarnings("deprecation") 575 public String migrate(final int batchSize) { 576 577 final String MIGRATION_WORK_ID = "AuditMigration"; 578 579 WorkManager wm = Framework.getService(WorkManager.class); 580 State migrationState = wm.getWorkState(MIGRATION_WORK_ID); 581 if (migrationState != null) { 582 return "Migration already scheduled : " + migrationState.toString(); 583 } 584 585 Work migrationWork = new ESAuditMigrationWork(MIGRATION_WORK_ID, batchSize); 586 wm.schedule(migrationWork); 587 return "Migration work started : " + MIGRATION_WORK_ID; 588 } 589 590 protected void logSearchResponse(SearchResponse response) { 591 if (log.isDebugEnabled()) { 592 log.debug("Response: " + response.toString()); 593 } 594 } 595 596 protected void logSearchRequest(SearchRequestBuilder request) { 597 if (log.isDebugEnabled()) { 598 log.debug(String.format("Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'", 599 getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, request.toString())); 600 } 601 } 602 603 /** 604 * Ensures the audit sequence returns an UID greater or equal than the maximum log entry id. 605 */ 606 protected void ensureUIDSequencer(Client esClient) { 607 boolean auditIndexExists = esClient.admin() 608 .indices() 609 .prepareExists(getESIndexName()) 610 .execute() 611 .actionGet() 612 .isExists(); 613 if (!auditIndexExists) { 614 return; 615 } 616 617 // Get max log entry id 618 SearchRequestBuilder builder = getSearchRequestBuilder(esClient); 619 builder.setQuery(QueryBuilders.matchAllQuery()).addAggregation(AggregationBuilders.max("maxAgg").field("id")); 620 SearchResponse searchResponse = builder.execute().actionGet(); 621 Max agg = searchResponse.getAggregations().get("maxAgg"); 622 int maxLogEntryId = (int) agg.getValue(); 623 624 // Get next sequence id 625 UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class); 626 UIDSequencer seq = uidGeneratorService.getSequencer(); 627 seq.init(); 628 int nextSequenceId = seq.getNext(SEQ_NAME); 629 630 // Increment sequence to max log entry id if needed 631 if (nextSequenceId < maxLogEntryId) { 632 log.info(String.format("Next UID returned by %s sequence is %d, initializing sequence to %d", SEQ_NAME, 633 nextSequenceId, maxLogEntryId)); 634 seq.initSequence(SEQ_NAME, maxLogEntryId); 635 } 636 } 637 638 @Override 639 public ExtendedInfo newExtendedInfo(Serializable value) { 640 return new ESExtendedInfo(value); 641 } 642 643 protected String getESIndexName() { 644 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 645 return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE); 646 } 647 648}