/*
 * (C) Copyright 2014-2015 Nuxeo SA (http://nuxeo.com/) and contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the GNU Lesser General Public License
 * (LGPL) version 2.1 which accompanies this distribution, and is available at
 * http://www.gnu.org/licenses/lgpl-2.1.html
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * Contributors:
 *     Tiry
 *     Benoit Delbosc
 */
package org.nuxeo.elasticsearch.audit;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;

import org.apache.commons.collections.MapUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.BoolFilterBuilder;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermFilterBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.search.sort.SortOrder;
import org.joda.time.DateTime;
import org.joda.time.format.ISODateTimeFormat;
import org.nuxeo.common.utils.TextTemplate;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.security.SecurityConstants;
import org.nuxeo.ecm.core.uidgen.UIDGeneratorService;
import org.nuxeo.ecm.core.uidgen.UIDSequencer;
import org.nuxeo.ecm.core.work.AbstractWork;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.ecm.core.work.api.Work.State;
import org.nuxeo.ecm.core.work.api.WorkManager;
import org.nuxeo.ecm.platform.audit.api.AuditLogger;
import org.nuxeo.ecm.platform.audit.api.AuditReader;
import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
import org.nuxeo.ecm.platform.audit.api.FilterMapEntry;
import org.nuxeo.ecm.platform.audit.api.LogEntry;
import org.nuxeo.ecm.platform.audit.api.query.AuditQueryException;
import org.nuxeo.ecm.platform.audit.api.query.DateRangeParser;
import org.nuxeo.ecm.platform.audit.service.AbstractAuditBackend;
import org.nuxeo.ecm.platform.audit.service.AuditBackend;
import org.nuxeo.ecm.platform.audit.service.BaseLogEntryProvider;
import org.nuxeo.ecm.platform.audit.service.DefaultAuditBackend;
import org.nuxeo.ecm.platform.query.api.PredicateDefinition;
import org.nuxeo.ecm.platform.query.api.PredicateFieldDefinition;
import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONReader;
import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONWriter;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.transaction.TransactionHelper;

/**
 * Implementation of the {@link AuditBackend} interface using Elasticsearch persistence
 *
 * @author tiry
 */
public class ESAuditBackend extends AbstractAuditBackend implements AuditBackend {

    /** Name of the Elasticsearch index holding audit entries. */
    public static final String IDX_NAME = "audit";

    /** Elasticsearch document type used for audit entries. */
    public static final String IDX_TYPE = "entry";

    /** Name of the UID sequence used to generate log entry ids. */
    public static final String SEQ_NAME = "audit";

    /** Framework property that enables the SQL to Elasticsearch migration at application startup. */
    public static final String MIGRATION_FLAG_PROP = "audit.elasticsearch.migration";

    /** Framework property giving the number of entries migrated per batch. */
    public static final String MIGRATION_BATCH_SIZE_PROP = "audit.elasticsearch.migration.batchSize";

    /** Event id of the technical log entry written once the migration has completed. */
    public static final String MIGRATION_DONE_EVENT = "sqlToElasticsearchMigrationDone";

    /** Batch size used when {@link #MIGRATION_BATCH_SIZE_PROP} is not set or cannot be parsed. */
    public static final int MIGRATION_DEFAULT_BACTH_SIZE = 1000;

    protected Client esClient = null;

    @SuppressWarnings("hiding")
    protected static final Log log = LogFactory.getLog(ESAuditBackend.class);

    protected BaseLogEntryProvider provider = null;

    /**
     * Returns the Elasticsearch client, fetching it from the {@link ElasticSearchAdmin} service on first call.
     * <p>
     * The first call also runs {@link #ensureUIDSequencer(Client)} against the freshly obtained client.
     */
    protected Client getClient() {
        if (esClient == null) {
            log.info("Activate Elasticsearch backend for Audit");
            ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
            esClient = esa.getClient();
            ensureUIDSequencer(esClient);
        }
        return esClient;
    }

    /**
     * Tells whether the migration has already been processed, i.e. whether the {@link AuditReader} service returns
     * at least one log entry whose event id is {@link #MIGRATION_DONE_EVENT}.
     */
    protected boolean isMigrationDone() {
        AuditReader reader = Framework.getService(AuditReader.class);
        List<LogEntry> entries = reader.queryLogs(new String[] { MIGRATION_DONE_EVENT }, null);
        return !entries.isEmpty();
    }

    /**
     * Closes the Elasticsearch client if one has been obtained.
     */
    @Override
    public void deactivate() {
        if (esClient != null) {
            esClient.close();
        }
    }

    /**
     * Returns the log entries whose {@code docUUID} equals the given uuid, further restricted by one term filter per
     * entry of {@code filterMap} (column name / object).
     *
     * @param uuid the document id to match
     * @param filterMap optional additional filters, may be {@code null} or empty
     * @param doDefaultSort when {@code true}, results are sorted by descending {@code eventDate}
     */
    @Override
    public List<LogEntry> getLogEntriesFor(String uuid, Map<String, FilterMapEntry> filterMap, boolean doDefaultSort) {
        SearchRequestBuilder builder = getSearchRequestBuilder();
        TermFilterBuilder docFilter = FilterBuilders.termFilter("docUUID", uuid);
        FilterBuilder filter;
        if (MapUtils.isEmpty(filterMap)) {
            filter = docFilter;
        } else {
            BoolFilterBuilder boolFilter = FilterBuilders.boolFilter();
            boolFilter.must(docFilter);
            for (FilterMapEntry entry : filterMap.values()) {
                boolFilter.must(FilterBuilders.termFilter(entry.getColumnName(), entry.getObject()));
            }
            filter = boolFilter;
        }
        builder.setQuery(QueryBuilders.constantScoreQuery(filter)).setSize(Integer.MAX_VALUE);
        if (doDefaultSort) {
            builder.addSort("eventDate", SortOrder.DESC);
        }
        logSearchRequest(builder);
        SearchResponse searchResponse = builder.get();
        logSearchResponse(searchResponse);
        return buildLogEntries(searchResponse);
    }

    /**
     * Converts the hits of a search response into log entries.
     * <p>
     * A hit that cannot be read is logged as an error and skipped, so the returned list may be shorter than the
     * number of hits.
     */
    protected List<LogEntry> buildLogEntries(SearchResponse searchResponse) {
        List<LogEntry> entries = new ArrayList<>(searchResponse.getHits().getHits().length);
        for (SearchHit hit : searchResponse.getHits()) {
            try {
                entries.add(AuditEntryJSONReader.read(hit.getSourceAsString()));
            } catch (IOException e) {
                log.error("Error while reading Audit Entry from ES", e);
            }
        }
        return entries;
    }

    /**
     * Creates a search request on the audit index and type, using the {@code DFS_QUERY_THEN_FETCH} search type.
     */
    protected SearchRequestBuilder getSearchRequestBuilder() {
        return getClient().prepareSearch(IDX_NAME).setTypes(IDX_TYPE).setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
    }

    /**
     * Fetches a single log entry by id.
     *
     * @return the log entry, or {@code null} if no document exists for this id
     * @throws RuntimeException if the stored document cannot be read
     */
    @Override
    public LogEntry getLogEntryByID(long id) {
        GetResponse ret = getClient().prepareGet(IDX_NAME, IDX_TYPE, String.valueOf(id)).get();
        if (!ret.isExists()) {
            return null;
        }
        try {
            return AuditEntryJSONReader.read(ret.getSourceAsString());
        } catch (IOException e) {
            throw new RuntimeException("Unable to read Entry for id " + id, e);
        }
    }

    /**
     * Builds a search request from a native query string, expanding its variables when parameters are given.
     *
     * @param query the native query, possibly containing <code>${name}</code> variables
     * @param params the variable values, may be {@code null} or empty
     */
    public SearchRequestBuilder buildQuery(String query, Map<String, Object> params) {
        if (params != null && params.size() > 0) {
            query = expandQueryVariables(query, params);
        }
        SearchRequestBuilder builder = getSearchRequestBuilder();
        builder.setQuery(query);
        return builder;
    }

    /**
     * Expands positional {@code ?} placeholders: the i-th placeholder is rewritten into <code>${param<i>i</i>}</code>
     * and then expanded with the i-th parameter through {@link #expandQueryVariables(String, Map)}.
     *
     * @param query the query containing {@code ?} placeholders
     * @param params the positional values; when {@code null} the query is returned unchanged
     */
    public String expandQueryVariables(String query, Object[] params) {
        if (params == null) {
            return query;
        }
        Map<String, Object> qParams = new HashMap<>();
        for (int i = 0; i < params.length; i++) {
            query = query.replaceFirst("\\?", "\\${param" + i + "}");
            qParams.put("param" + i, params[i]);
        }
        return expandQueryVariables(query, qParams);
    }

    /**
     * Expands the named variables of a query using a {@link TextTemplate}.
     * <p>
     * {@link Calendar} and {@link Date} values are printed with the ISO date-time format, other values with
     * {@code toString()}. Parameters whose value is {@code null} are not set on the template.
     *
     * @param query the query containing variables
     * @param params the variable values; when {@code null} or empty the query is returned unchanged
     */
    public String expandQueryVariables(String query, Map<String, Object> params) {
        if (params != null && params.size() > 0) {
            TextTemplate tmpl = new TextTemplate();
            for (Map.Entry<String, Object> param : params.entrySet()) {
                Object val = param.getValue();
                if (val == null) {
                    continue;
                }
                if (val instanceof Calendar || val instanceof Date) {
                    tmpl.setVariable(param.getKey(), ISODateTimeFormat.dateTime().print(new DateTime(val)));
                } else {
                    tmpl.setVariable(param.getKey(), val.toString());
                }
            }
            query = tmpl.processText(query);
        }
        return query;
    }

    /**
     * Runs a native query and returns the matching log entries.
     * <p>
     * The offset is {@code pageNb * pageSize} and is only set when {@code pageNb > 0}; the size is only set when
     * {@code pageSize > 0}.
     */
    @Override
    public List<?> nativeQuery(String query, Map<String, Object> params, int pageNb, int pageSize) {
        SearchRequestBuilder builder = buildQuery(query, params);
        if (pageNb > 0) {
            builder.setFrom(pageNb * pageSize);
        }
        if (pageSize > 0) {
            builder.setSize(pageSize);
        }
        logSearchRequest(builder);
        SearchResponse searchResponse = builder.get();
        logSearchResponse(searchResponse);
        return buildLogEntries(searchResponse);
    }

    /**
     * Queries log entries by event ids, categories, document path and date limit; every criterion that is
     * {@code null} (or an empty array) is ignored.
     *
     * @param eventIds event ids to match, may be {@code null}
     * @param limit when not {@code null}, a range filter {@code eventDate < limit} is added
     * @param categories categories to match, may be {@code null}
     * @param path exact {@code docPath} to match, may be {@code null}
     * @param pageNb page index; the offset {@code pageNb * pageSize} is applied when {@code pageNb > 0}
     * @param pageSize page size; when not positive the size is set to {@link Integer#MAX_VALUE}
     */
    @Override
    public List<LogEntry> queryLogsByPage(String[] eventIds, Date limit, String[] categories, String path, int pageNb,
            int pageSize) {
        SearchRequestBuilder builder = getSearchRequestBuilder();
        BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter();
        if (eventIds != null && eventIds.length > 0) {
            if (eventIds.length == 1) {
                filterBuilder.must(FilterBuilders.termFilter("eventId", eventIds[0]));
            } else {
                filterBuilder.must(FilterBuilders.termsFilter("eventId", eventIds));
            }
        }
        if (categories != null && categories.length > 0) {
            if (categories.length == 1) {
                filterBuilder.must(FilterBuilders.termFilter("category", categories[0]));
            } else {
                filterBuilder.must(FilterBuilders.termsFilter("category", categories));
            }
        }
        if (path != null) {
            filterBuilder.must(FilterBuilders.termFilter("docPath", path));
        }

        if (limit != null) {
            filterBuilder.must(FilterBuilders.rangeFilter("eventDate").lt(limit));
        }

        builder.setQuery(QueryBuilders.constantScoreQuery(filterBuilder));

        if (pageNb > 0) {
            builder.setFrom(pageNb * pageSize);
        }
        if (pageSize > 0) {
            builder.setSize(pageSize);
        } else {
            builder.setSize(Integer.MAX_VALUE);
        }
        logSearchRequest(builder);
        SearchResponse searchResponse = builder.get();
        logSearchResponse(searchResponse);
        return buildLogEntries(searchResponse);
    }

    /**
     * Same as {@link #queryLogsByPage(String[], Date, String[], String, int, int)}, with the date limit computed
     * from a date range expression by {@link DateRangeParser#parseDateRangeQuery}, relative to the current date.
     *
     * @throws AuditQueryException if the date range expression cannot be parsed
     */
    @Override
    public List<LogEntry> queryLogsByPage(String[] eventIds, String dateRange, String[] categories, String path,
            int pageNb, int pageSize) {

        Date limit = null;
        if (dateRange != null) {
            try {
                limit = DateRangeParser.parseDateRangeQuery(new Date(), dateRange);
            } catch (AuditQueryException aqe) {
                aqe.addInfo("Wrong date range query. Query was " + dateRange);
                throw aqe;
            }
        }
        return queryLogsByPage(eventIds, limit, categories, path, pageNb, pageSize);
    }

    /**
     * Indexes the given entries with a single bulk request.
     * <p>
     * Each entry is assigned the next value of the {@link #SEQ_NAME} sequence as id before being serialized. Item
     * failures reported by the bulk response are logged as errors, they do not raise an exception.
     *
     * @param entries the entries to index; nothing is done when {@code null} or empty
     * @throws NuxeoException if an entry cannot be serialized
     */
    @Override
    public void addLogEntries(List<LogEntry> entries) {

        // A bulk request without any action is rejected by Elasticsearch at validation time
        if (entries == null || entries.isEmpty()) {
            return;
        }

        BulkRequestBuilder bulkRequest = getClient().prepareBulk();
        JsonFactory factory = new JsonFactory();

        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
        UIDSequencer seq = uidGeneratorService.getSequencer();

        try {

            for (LogEntry entry : entries) {
                entry.setId(seq.getNext(SEQ_NAME));
                XContentBuilder builder = jsonBuilder();
                JsonGenerator jsonGen = factory.createJsonGenerator(builder.stream());
                AuditEntryJSONWriter.asJSON(jsonGen, entry);
                bulkRequest.add(getClient().prepareIndex(IDX_NAME, IDX_TYPE, String.valueOf(entry.getId())).setSource(
                        builder));
            }

            BulkResponse bulkResponse = bulkRequest.execute().actionGet();
            if (bulkResponse.hasFailures()) {
                for (BulkItemResponse response : bulkResponse.getItems()) {
                    if (response.isFailed()) {
                        log.error("Unable to index audit entry " + response.getItemId() + " :"
                                + response.getFailureMessage());
                    }
                }
            }
        } catch (IOException e) {
            throw new NuxeoException("Error while indexing Audit entries", e);
        }

    }

    /**
     * Counts the log entries having the given event id.
     */
    @Override
    public Long getEventsCount(String eventId) {
        CountResponse res = getClient().prepareCount(IDX_NAME).setTypes(IDX_TYPE).setQuery(
                QueryBuilders.constantScoreQuery(FilterBuilders.termFilter("eventId", eventId))).get();
        return res.getCount();
    }

    /**
     * Returns the lazily created log entry provider, which adds entries through {@link #addLogEntries(List)} and
     * does not support removal.
     */
    protected BaseLogEntryProvider getProvider() {

        if (provider == null) {
            provider = new BaseLogEntryProvider() {

                @Override
                public int removeEntries(String eventId, String pathPattern) {
                    throw new UnsupportedOperationException("Not implemented yet!");
                }

                @Override
                public void addLogEntry(LogEntry logEntry) {
                    List<LogEntry> entries = new ArrayList<>();
                    entries.add(logEntry);
                    addLogEntries(entries);
                }
            };
        }
        return provider;
    }

    /**
     * Delegates to the inherited {@code syncLogCreationEntries} variant, passing the provider returned by
     * {@link #getProvider()}.
     */
    @Override
    public long syncLogCreationEntries(final String repoId, final String path, final Boolean recurs) {
        return syncLogCreationEntries(getProvider(), repoId, path, recurs);
    }

    /**
     * Builds an Elasticsearch filter from predicate definitions whose values are read from the search document.
     * <p>
     * Predicates for which no usable value is found (see {@link #isNonNullParam(Object[])}) are skipped. Supported
     * operators are {@code IN}, {@code BETWEEN}, {@code >}, {@code >=}, {@code <} and {@code <=}; any other operator
     * produces a term filter on the first value.
     *
     * @param predicates the predicate definitions
     * @param searchDocumentModel the document holding the values, may be {@code null}
     * @return a match-all filter when the document is {@code null} or when no predicate applies, otherwise a bool
     *         filter where every predicate is a {@code must} clause
     */
    protected FilterBuilder buildFilter(PredicateDefinition[] predicates, DocumentModel searchDocumentModel) {

        if (searchDocumentModel == null) {
            return FilterBuilders.matchAllFilter();
        }

        BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter();

        int nbFilters = 0;

        for (PredicateDefinition predicate : predicates) {

            // extract data from DocumentModel
            PredicateFieldDefinition[] fieldDef = predicate.getValues();
            Object[] val = new Object[fieldDef.length];
            for (int fidx = 0; fidx < fieldDef.length; fidx++) {
                if (fieldDef[fidx].getXpath() != null) {
                    val[fidx] = searchDocumentModel.getPropertyValue(fieldDef[fidx].getXpath());
                } else {
                    val[fidx] = searchDocumentModel.getProperty(fieldDef[fidx].getSchema(), fieldDef[fidx].getName());
                }
            }

            if (!isNonNullParam(val)) {
                // skip predicate where all values are null
                continue;
            }

            nbFilters++;

            String op = predicate.getOperator();
            if (op.equalsIgnoreCase("IN")) {
                filterBuilder.must(FilterBuilders.termsFilter(predicate.getParameter(), toStringValues(val[0])));
            } else if (op.equalsIgnoreCase("BETWEEN")) {
                filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).gt(val[0]));
                if (val.length > 1) {
                    filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).lt(val[1]));
                }
            } else if (">".equals(op)) {
                filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).gt(val[0]));
            } else if (">=".equals(op)) {
                filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).gte(val[0]));
            } else if ("<".equals(op)) {
                filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).lt(val[0]));
            } else if ("<=".equals(op)) {
                filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).lte(val[0]));
            } else {
                filterBuilder.must(FilterBuilders.termFilter(predicate.getParameter(), val[0]));
            }
        }

        if (nbFilters == 0) {
            return FilterBuilders.matchAllFilter();
        }
        return filterBuilder;
    }

    /**
     * Converts the value of an {@code IN} predicate into the terms to match.
     * <p>
     * A {@code String[]} is returned as is. The non-null elements of any other array or of an {@link Iterable} are
     * converted with {@code toString()}, a scalar becomes a single term and {@code null} yields an empty array.
     */
    private static String[] toStringValues(Object value) {
        if (value instanceof String[]) {
            return (String[]) value;
        }
        List<String> terms = new ArrayList<>();
        if (value instanceof Iterable<?>) {
            for (Object v : (Iterable<?>) value) {
                if (v != null) {
                    terms.add(v.toString());
                }
            }
        } else if (value instanceof Object[]) {
            for (Object v : (Object[]) value) {
                if (v != null) {
                    terms.add(v.toString());
                }
            }
        } else if (value != null) {
            terms.add(value.toString());
        }
        return terms.toArray(new String[terms.size()]);
    }

    /**
     * Builds a search request made of a fixed query, passed as a wrapper query, filtered by the filter built from
     * the predicates, see {@link #buildFilter(PredicateDefinition[], DocumentModel)}.
     */
    public SearchRequestBuilder buildSearchQuery(String fixedPart, PredicateDefinition[] predicates,
            DocumentModel searchDocumentModel) {
        SearchRequestBuilder builder = getSearchRequestBuilder();
        QueryBuilder queryBuilder = QueryBuilders.wrapperQuery(fixedPart);
        FilterBuilder filterBuilder = buildFilter(predicates, searchDocumentModel);
        builder.setQuery(QueryBuilders.filteredQuery(queryBuilder, filterBuilder));
        return builder;
    }

    /**
     * Tells whether at least one of the given values is usable: a non-empty {@code String}, a non-empty
     * {@code String[]}, or any other non-null object.
     *
     * @return {@code false} when {@code val} is {@code null} or holds no usable value
     */
    protected boolean isNonNullParam(Object[] val) {
        if (val == null) {
            return false;
        }
        for (Object v : val) {
            if (v != null) {
                if (v instanceof String) {
                    if (!((String) v).isEmpty()) {
                        return true;
                    }
                } else if (v instanceof String[]) {
                    if (((String[]) v).length > 0) {
                        return true;
                    }
                } else {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Schedules a work that copies the log entries of a {@link DefaultAuditBackend} into Elasticsearch.
     * <p>
     * Nothing is scheduled when the work manager already knows a state for the migration work. Once all the entries
     * have been copied, a technical log entry with the {@link #MIGRATION_DONE_EVENT} event id is indexed as a flag.
     *
     * @param batchSize the number of entries read from the source backend per page
     * @return a human readable status message
     */
    public String migrate(final int batchSize) {

        final String MIGRATION_WORK_ID = "AuditMigration";

        WorkManager wm = Framework.getService(WorkManager.class);

        // Check the work state before activating the source backend, so that returning early does not leave an
        // activated backend that nobody deactivates
        State migrationState = wm.getWorkState(MIGRATION_WORK_ID);
        if (migrationState != null) {
            return "Migration already scheduled : " + migrationState.toString();
        }

        final AuditBackend sourceBackend = new DefaultAuditBackend();
        sourceBackend.activate(component);

        @SuppressWarnings("unchecked")
        List<Long> res = (List<Long>) sourceBackend.nativeQuery("select count(*) from LogEntry", 1, 20);

        final long nbEntriesToMigrate = res.get(0);

        Work migrationWork = new AbstractWork(MIGRATION_WORK_ID) {

            private static final long serialVersionUID = 1L;

            @Override
            public String getTitle() {
                return "Audit migration worker";
            }

            @Override
            public void work() {
                TransactionHelper.commitOrRollbackTransaction();
                try {

                    long t0 = System.currentTimeMillis();
                    long nbEntriesMigrated = 0;
                    // NOTE(review): paging starts at 0 here while the count query above passes 1 as page number;
                    // confirm which page index base the source backend expects
                    int pageIdx = 0;

                    while (nbEntriesMigrated < nbEntriesToMigrate) {
                        @SuppressWarnings("unchecked")
                        List<LogEntry> entries = (List<LogEntry>) sourceBackend.nativeQuery(
                                "from LogEntry log order by log.id asc", pageIdx, batchSize);

                        if (entries.size() == 0) {
                            log.warn("Migration ending after " + nbEntriesMigrated + " entries");
                            break;
                        }
                        setProgress(new Progress(nbEntriesMigrated, nbEntriesToMigrate));
                        addLogEntries(entries);
                        pageIdx++;
                        nbEntriesMigrated += entries.size();
                        log.info("Migrated " + nbEntriesMigrated + " log entries on " + nbEntriesToMigrate);
                        double dt = (System.currentTimeMillis() - t0) / 1000.0;
                        if (dt != 0) {
                            log.info("Migration speed: " + (nbEntriesMigrated / dt) + " entries/s");
                        }
                    }
                    log.info("Audit migration from SQL to Elasticsearch done: " + nbEntriesMigrated
                            + " entries migrated");

                    // Log technical event in audit as a flag to know if the migration has been processed at application
                    // startup
                    AuditLogger logger = Framework.getService(AuditLogger.class);
                    LogEntry entry = logger.newLogEntry();
                    entry.setCategory("NuxeoTechnicalEvent");
                    entry.setEventId(MIGRATION_DONE_EVENT);
                    entry.setPrincipalName(SecurityConstants.SYSTEM_USERNAME);
                    entry.setEventDate(Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTime());
                    addLogEntries(Collections.singletonList(entry));
                } finally {
                    TransactionHelper.startTransaction();
                    sourceBackend.deactivate();
                }
            }
        };

        wm.schedule(migrationWork);
        return "Migration work started : " + MIGRATION_WORK_ID;
    }

    /**
     * Logs the search response at debug level.
     */
    protected void logSearchResponse(SearchResponse response) {
        if (log.isDebugEnabled()) {
            log.debug("Response: " + response.toString());
        }
    }

    /**
     * Logs the search request at debug level, formatted as a curl command.
     */
    protected void logSearchRequest(SearchRequestBuilder request) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'",
                    IDX_NAME, IDX_TYPE, request.toString()));
        }
    }

    /**
     * Processes the audit migration when {@link #MIGRATION_FLAG_PROP} is true and the migration is not done yet.
     * <p>
     * The audit index is dropped and initialized again before the migration is scheduled. The batch size is read
     * from {@link #MIGRATION_BATCH_SIZE_PROP}; {@link #MIGRATION_DEFAULT_BACTH_SIZE} is used when the property is
     * not set or is not a valid integer.
     */
    @Override
    public void onApplicationStarted() {
        if (Boolean.parseBoolean(Framework.getProperty(MIGRATION_FLAG_PROP))) {
            if (!isMigrationDone()) {
                log.info(String.format(
                        "Property %s is true and migration is not done yet, processing audit migration from SQL to Elasticsearch index",
                        MIGRATION_FLAG_PROP));
                // Drop audit index first in case of a previous bad migration
                ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
                esa.dropAndInitIndex(IDX_NAME);
                int batchSize = MIGRATION_DEFAULT_BACTH_SIZE;
                String batchSizeProp = Framework.getProperty(MIGRATION_BATCH_SIZE_PROP);
                if (batchSizeProp != null) {
                    try {
                        batchSize = Integer.parseInt(batchSizeProp.trim());
                    } catch (NumberFormatException e) {
                        // The index has just been dropped: better migrate with the default than abort here
                        log.warn(String.format("Invalid value '%s' for property %s, using default batch size %d",
                                batchSizeProp, MIGRATION_BATCH_SIZE_PROP, MIGRATION_DEFAULT_BACTH_SIZE));
                    }
                }
                migrate(batchSize);
            } else {
                log.warn(String.format(
                        "Property %s is true but migration is already done, please set this property to false",
                        MIGRATION_FLAG_PROP));
            }
        } else {
            log.debug(String.format("Property %s is false, not processing any migration", MIGRATION_FLAG_PROP));
        }
    }

    /**
     * Ensures the audit sequence returns an UID greater or equal than the maximum log entry id.
     * <p>
     * Does nothing when the audit index does not exist. Otherwise the maximum {@code id} is computed with a max
     * aggregation and the sequence is initialized to it when the next sequence value is lower.
     */
    protected void ensureUIDSequencer(Client esClient) {
        boolean auditIndexExists = esClient.admin().indices().prepareExists(IDX_NAME).execute().actionGet().isExists();
        if (!auditIndexExists) {
            return;
        }

        // Get max log entry id
        SearchRequestBuilder builder = getSearchRequestBuilder();
        builder.setQuery(QueryBuilders.matchAllQuery()).addAggregation(AggregationBuilders.max("maxAgg").field("id"));
        SearchResponse searchResponse = builder.execute().actionGet();
        Max agg = searchResponse.getAggregations().get("maxAgg");
        int maxLogEntryId = (int) agg.getValue();

        // Get next sequence id
        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
        UIDSequencer seq = uidGeneratorService.getSequencer();
        int nextSequenceId = seq.getNext(SEQ_NAME);

        // Increment sequence to max log entry id if needed
        if (nextSequenceId < maxLogEntryId) {
            log.info(String.format("Next UID returned by %s sequence is %d, initializing sequence to %d", SEQ_NAME,
                    nextSequenceId, maxLogEntryId));
            seq.initSequence(SEQ_NAME, maxLogEntryId);
        }
    }

    /**
     * Creates an {@link ExtendedInfo} wrapping the given value; the returned object does not support ids.
     */
    @Override
    public ExtendedInfo newExtendedInfo(Serializable value) {
        return new ExtendedInfo() {

            private static final long serialVersionUID = 1L;

            @Override
            public Long getId() {
                throw new UnsupportedOperationException();
            }

            @Override
            public void setId(Long id) {
                throw new UnsupportedOperationException();
            }

            @Override
            public Serializable getSerializableValue() {
                return value;
            }

            @Override
            public <T> T getValue(Class<T> clazz) {
                return clazz.cast(getSerializableValue());
            }

        };
    }
}