001/*
002 * (C) Copyright 2014-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     Benoit Delbosc
019 */
020package org.nuxeo.elasticsearch.audit;
021
022import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
023
024import java.io.IOException;
025import java.io.OutputStream;
026import java.io.Serializable;
027import java.util.ArrayList;
028import java.util.Calendar;
029import java.util.Date;
030import java.util.HashMap;
031import java.util.List;
032import java.util.Map;
033
034import org.apache.commons.collections.MapUtils;
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.codehaus.jackson.JsonFactory;
038import org.codehaus.jackson.JsonGenerator;
039import org.elasticsearch.action.bulk.BulkItemResponse;
040import org.elasticsearch.action.bulk.BulkRequestBuilder;
041import org.elasticsearch.action.bulk.BulkResponse;
042import org.elasticsearch.action.get.GetResponse;
043import org.elasticsearch.action.search.SearchRequestBuilder;
044import org.elasticsearch.action.search.SearchResponse;
045import org.elasticsearch.action.search.SearchType;
046import org.elasticsearch.client.Client;
047import org.elasticsearch.common.io.stream.BytesStreamOutput;
048import org.elasticsearch.common.unit.TimeValue;
049import org.elasticsearch.common.xcontent.XContentBuilder;
050import org.elasticsearch.index.query.BoolQueryBuilder;
051import org.elasticsearch.index.query.QueryBuilder;
052import org.elasticsearch.index.query.QueryBuilders;
053import org.elasticsearch.index.query.TermQueryBuilder;
054import org.elasticsearch.search.SearchHit;
055import org.elasticsearch.search.aggregations.AggregationBuilders;
056import org.elasticsearch.search.aggregations.metrics.max.Max;
057import org.elasticsearch.search.sort.SortOrder;
058import org.joda.time.DateTime;
059import org.joda.time.format.ISODateTimeFormat;
060import org.nuxeo.common.utils.TextTemplate;
061import org.nuxeo.ecm.core.api.DocumentModel;
062import org.nuxeo.ecm.core.api.NuxeoException;
063import org.nuxeo.ecm.core.uidgen.UIDGeneratorService;
064import org.nuxeo.ecm.core.uidgen.UIDSequencer;
065import org.nuxeo.ecm.core.work.api.Work;
066import org.nuxeo.ecm.core.work.api.Work.State;
067import org.nuxeo.ecm.core.work.api.WorkManager;
068import org.nuxeo.ecm.platform.audit.api.AuditReader;
069import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
070import org.nuxeo.ecm.platform.audit.api.FilterMapEntry;
071import org.nuxeo.ecm.platform.audit.api.LogEntry;
072import org.nuxeo.ecm.platform.audit.api.query.AuditQueryException;
073import org.nuxeo.ecm.platform.audit.api.query.DateRangeParser;
074import org.nuxeo.ecm.platform.audit.service.AbstractAuditBackend;
075import org.nuxeo.ecm.platform.audit.service.AuditBackend;
076import org.nuxeo.ecm.platform.audit.service.BaseLogEntryProvider;
077import org.nuxeo.ecm.platform.audit.service.NXAuditEventsService;
078import org.nuxeo.ecm.platform.audit.service.extension.AuditBackendDescriptor;
079import org.nuxeo.ecm.platform.query.api.PredicateDefinition;
080import org.nuxeo.ecm.platform.query.api.PredicateFieldDefinition;
081import org.nuxeo.elasticsearch.ElasticSearchConstants;
082import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
083import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONReader;
084import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONWriter;
085import org.nuxeo.runtime.api.Framework;
086import org.nuxeo.runtime.model.DefaultComponent;
087
088/**
089 * Implementation of the {@link AuditBackend} interface using Elasticsearch persistence
090 *
091 * @author tiry
092 */
093public class ESAuditBackend extends AbstractAuditBackend implements AuditBackend {
094
095    public static final String SEQ_NAME = "audit";
096
097    public static final String MIGRATION_FLAG_PROP = "audit.elasticsearch.migration";
098
099    public static final String MIGRATION_BATCH_SIZE_PROP = "audit.elasticsearch.migration.batchSize";
100
101    public static final String MIGRATION_DONE_EVENT = "sqlToElasticsearchMigrationDone";
102
103    public static final int MIGRATION_DEFAULT_BACTH_SIZE = 1000;
104
105    public ESAuditBackend(NXAuditEventsService component, AuditBackendDescriptor config) {
106        super(component, config);
107    }
108
109    protected Client esClient;
110
111    protected static final Log log = LogFactory.getLog(ESAuditBackend.class);
112
113    protected BaseLogEntryProvider provider = new BaseLogEntryProvider() {
114
115        @Override
116        public int removeEntries(String eventId, String pathPattern) {
117            throw new UnsupportedOperationException("Not implemented yet!");
118        }
119
120        @Override
121        public void addLogEntry(LogEntry logEntry) {
122            List<LogEntry> entries = new ArrayList<>();
123            entries.add(logEntry);
124            addLogEntries(entries);
125        }
126
127        @Override
128        public List<LogEntry> getLogEntriesFor(String uuid, String repositoryId) {
129            throw new UnsupportedOperationException("Not implemented yet!");
130        }
131
132        @Override
133        public List<LogEntry> getLogEntriesFor(String uuid) {
134            throw new UnsupportedOperationException("Not implemented yet!");
135        }
136
137        @Override
138        public List<LogEntry> getLogEntriesFor(String uuid, Map<String, FilterMapEntry> filterMap,
139                boolean doDefaultSort) {
140            throw new UnsupportedOperationException("Not implemented yet!");
141        }
142    };
143
144    protected Client getClient() {
145        log.info("Activate Elasticsearch backend for Audit");
146        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
147        Client client = esa.getClient();
148        ensureUIDSequencer(client);
149        return client;
150    }
151
152    protected boolean isMigrationDone() {
153        AuditReader reader = Framework.getService(AuditReader.class);
154        List<LogEntry> entries = reader.queryLogs(new String[] { MIGRATION_DONE_EVENT }, null);
155        return !entries.isEmpty();
156    }
157
158    @Override
159    public int getApplicationStartedOrder() {
160        return ((DefaultComponent) Framework.getRuntime()
161                                            .getComponent("org.nuxeo.elasticsearch.ElasticSearchComponent"))
162                                                                                                            .getApplicationStartedOrder()
163                + 1;
164    }
165
166    @Override
167    public void onApplicationStarted() {
168        esClient = getClient();
169        if (Boolean.parseBoolean(Framework.getProperty(MIGRATION_FLAG_PROP))) {
170            if (!isMigrationDone()) {
171                log.info(String.format(
172                        "Property %s is true and migration is not done yet, processing audit migration from SQL to Elasticsearch index",
173                        MIGRATION_FLAG_PROP));
174                // Drop audit index first in case of a previous bad migration
175                ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
176                esa.dropAndInitIndex(getESIndexName());
177                int batchSize = MIGRATION_DEFAULT_BACTH_SIZE;
178                String batchSizeProp = Framework.getProperty(MIGRATION_BATCH_SIZE_PROP);
179                if (batchSizeProp != null) {
180                    batchSize = Integer.parseInt(batchSizeProp);
181                }
182                migrate(batchSize);
183            } else {
184                log.warn(String.format(
185                        "Property %s is true but migration is already done, please set this property to false",
186                        MIGRATION_FLAG_PROP));
187            }
188        } else {
189            log.debug(String.format("Property %s is false, not processing any migration", MIGRATION_FLAG_PROP));
190        }
191    }
192
193    @Override
194    public void onShutdown() {
195        if (esClient != null) {
196            try {
197                esClient.close();
198            } finally {
199                esClient = null;
200            }
201        }
202    }
203
204    @Override
205    public List<LogEntry> getLogEntriesFor(String uuid, String repositoryId) {
206        TermQueryBuilder docFilter = QueryBuilders.termQuery("docUUID", uuid);
207        TermQueryBuilder repoFilter = QueryBuilders.termQuery("repositoryId", repositoryId);
208        QueryBuilder filter;
209        filter = QueryBuilders.boolQuery().must(docFilter);
210        filter = QueryBuilders.boolQuery().must(repoFilter);
211        return getLogEntries(filter, false);
212    }
213
214    @Override
215    public List<LogEntry> getLogEntriesFor(String uuid, Map<String, FilterMapEntry> filterMap, boolean doDefaultSort) {
216        TermQueryBuilder docFilter = QueryBuilders.termQuery("docUUID", uuid);
217        QueryBuilder filter;
218        if (MapUtils.isEmpty(filterMap)) {
219            filter = docFilter;
220        } else {
221            filter = QueryBuilders.boolQuery().must(docFilter);
222            for (String key : filterMap.keySet()) {
223                FilterMapEntry entry = filterMap.get(key);
224                ((BoolQueryBuilder) filter).must(QueryBuilders.termQuery(entry.getColumnName(), entry.getObject()));
225            }
226        }
227        return getLogEntries(filter, doDefaultSort);
228    }
229
230    protected List<LogEntry> getLogEntries(QueryBuilder filter, boolean doDefaultSort) {
231        SearchRequestBuilder builder = getSearchRequestBuilder(esClient);
232        if (doDefaultSort) {
233            builder.addSort("eventDate", SortOrder.DESC);
234        }
235        TimeValue keepAlive = TimeValue.timeValueMinutes(1);
236        builder.setQuery(QueryBuilders.constantScoreQuery(filter)).setScroll(keepAlive).setSize(100);
237
238        logSearchRequest(builder);
239        SearchResponse searchResponse = builder.get();
240        logSearchResponse(searchResponse);
241
242        // Build log entries
243        List<LogEntry> logEntries = buildLogEntries(searchResponse);
244        // Scroll on next results
245        for (; //
246                searchResponse.getHits().getHits().length > 0
247                        && logEntries.size() < searchResponse.getHits().getTotalHits(); //
248                searchResponse = runNextScroll(searchResponse.getScrollId(), keepAlive)) {
249            // Build log entries
250            logEntries.addAll(buildLogEntries(searchResponse));
251        }
252        return logEntries;
253    }
254
255    SearchResponse runNextScroll(String scrollId, TimeValue keepAlive) {
256        if (log.isDebugEnabled()) {
257            log.debug(String.format(
258                    "Scroll request: -XGET 'localhost:9200/_search/scroll' -d '{\"scroll\": \"%s\", \"scroll_id\": \"%s\" }'",
259                    keepAlive, scrollId));
260        }
261        SearchResponse response = esClient.prepareSearchScroll(scrollId).setScroll(keepAlive).execute().actionGet();
262        logSearchResponse(response);
263        return response;
264    }
265
266    protected List<LogEntry> buildLogEntries(SearchResponse searchResponse) {
267        List<LogEntry> entries = new ArrayList<>(searchResponse.getHits().getHits().length);
268        for (SearchHit hit : searchResponse.getHits()) {
269            try {
270                entries.add(AuditEntryJSONReader.read(hit.getSourceAsString()));
271            } catch (IOException e) {
272                log.error("Error while reading Audit Entry from ES", e);
273            }
274        }
275        return entries;
276    }
277
278    protected SearchRequestBuilder getSearchRequestBuilder(Client esClient) {
279        return esClient.prepareSearch(getESIndexName())
280                       .setTypes(ElasticSearchConstants.ENTRY_TYPE)
281                       .setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
282    }
283
284    @Override
285    public LogEntry getLogEntryByID(long id) {
286        GetResponse ret = esClient.prepareGet(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, String.valueOf(id))
287                                  .get();
288        if (!ret.isExists()) {
289            return null;
290        }
291        try {
292            return AuditEntryJSONReader.read(ret.getSourceAsString());
293        } catch (IOException e) {
294            throw new RuntimeException("Unable to read Entry for id " + id, e);
295        }
296    }
297
298    public SearchRequestBuilder buildQuery(String query, Map<String, Object> params) {
299        if (params != null && params.size() > 0) {
300            query = expandQueryVariables(query, params);
301        }
302        SearchRequestBuilder builder = getSearchRequestBuilder(esClient);
303        builder.setQuery(query);
304        return builder;
305    }
306
307    public String expandQueryVariables(String query, Object[] params) {
308        Map<String, Object> qParams = new HashMap<>();
309        for (int i = 0; i < params.length; i++) {
310            query = query.replaceFirst("\\?", "\\${param" + i + "}");
311            qParams.put("param" + i, params[i]);
312        }
313        return expandQueryVariables(query, qParams);
314    }
315
316    public String expandQueryVariables(String query, Map<String, Object> params) {
317        if (params != null && params.size() > 0) {
318            TextTemplate tmpl = new TextTemplate();
319            for (String key : params.keySet()) {
320                Object val = params.get(key);
321                if (val == null) {
322                    continue;
323                } else if (val instanceof Calendar) {
324                    tmpl.setVariable(key, ISODateTimeFormat.dateTime().print(new DateTime(val)));
325                } else if (val instanceof Date) {
326                    tmpl.setVariable(key, ISODateTimeFormat.dateTime().print(new DateTime(val)));
327                } else {
328                    tmpl.setVariable(key, val.toString());
329                }
330            }
331            query = tmpl.processText(query);
332        }
333        return query;
334    }
335
336    @Override
337    public List<?> nativeQuery(String query, Map<String, Object> params, int pageNb, int pageSize) {
338        SearchRequestBuilder builder = buildQuery(query, params);
339        if (pageNb > 0) {
340            builder.setFrom(pageNb * pageSize);
341        }
342        if (pageSize > 0) {
343            builder.setSize(pageSize);
344        }
345        logSearchRequest(builder);
346        SearchResponse searchResponse = builder.get();
347        logSearchResponse(searchResponse);
348        return buildLogEntries(searchResponse);
349    }
350
351    @Override
352    public List<LogEntry> queryLogsByPage(String[] eventIds, Date limit, String[] categories, String path, int pageNb,
353            int pageSize) {
354        SearchRequestBuilder builder = getSearchRequestBuilder(esClient);
355        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
356        if (eventIds != null && eventIds.length > 0) {
357            if (eventIds.length == 1) {
358                filterBuilder.must(QueryBuilders.termQuery("eventId", eventIds[0]));
359            } else {
360                filterBuilder.must(QueryBuilders.termsQuery("eventId", eventIds));
361            }
362        }
363        if (categories != null && categories.length > 0) {
364            if (categories.length == 1) {
365                filterBuilder.must(QueryBuilders.termQuery("category", categories[0]));
366            } else {
367                filterBuilder.must(QueryBuilders.termsQuery("category", categories));
368            }
369        }
370        if (path != null) {
371            filterBuilder.must(QueryBuilders.termQuery("docPath", path));
372        }
373
374        if (limit != null) {
375            filterBuilder.must(QueryBuilders.rangeQuery("eventDate").lt(limit));
376        }
377
378        builder.setQuery(QueryBuilders.constantScoreQuery(filterBuilder));
379
380        if (pageNb > 0) {
381            builder.setFrom(pageNb * pageSize);
382        }
383        if (pageSize > 0) {
384            builder.setSize(pageSize);
385        }
386        logSearchRequest(builder);
387        SearchResponse searchResponse = builder.get();
388        logSearchResponse(searchResponse);
389        return buildLogEntries(searchResponse);
390    }
391
392    @Override
393    public List<LogEntry> queryLogsByPage(String[] eventIds, String dateRange, String[] categories, String path,
394            int pageNb, int pageSize) {
395
396        Date limit = null;
397        if (dateRange != null) {
398            try {
399                limit = DateRangeParser.parseDateRangeQuery(new Date(), dateRange);
400            } catch (AuditQueryException aqe) {
401                aqe.addInfo("Wrong date range query. Query was " + dateRange);
402                throw aqe;
403            }
404        }
405        return queryLogsByPage(eventIds, limit, categories, path, pageNb, pageSize);
406    }
407
408    @Override
409    public void addLogEntries(List<LogEntry> entries) {
410
411        if (entries.isEmpty()) {
412            return;
413        }
414
415        BulkRequestBuilder bulkRequest = esClient.prepareBulk();
416        JsonFactory factory = new JsonFactory();
417
418        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
419        UIDSequencer seq = uidGeneratorService.getSequencer();
420
421        try {
422
423            for (LogEntry entry : entries) {
424                entry.setId(seq.getNext(SEQ_NAME));
425                if (log.isDebugEnabled()) {
426                    log.debug(String.format("Indexing log enry Id: %s, with logDate : %s, for docUUID: %s ",
427                            entry.getId(), entry.getLogDate(), entry.getDocUUID()));
428                }
429                OutputStream out = new BytesStreamOutput();
430                JsonGenerator jsonGen = factory.createJsonGenerator(out);
431                XContentBuilder builder = jsonBuilder(out);
432                AuditEntryJSONWriter.asJSON(jsonGen, entry);
433                bulkRequest.add(esClient.prepareIndex(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE,
434                        String.valueOf(entry.getId())).setSource(builder));
435            }
436
437            BulkResponse bulkResponse = bulkRequest.execute().actionGet();
438            if (bulkResponse.hasFailures()) {
439                for (BulkItemResponse response : bulkResponse.getItems()) {
440                    if (response.isFailed()) {
441                        log.error("Unable to index audit entry " + response.getItemId() + " :"
442                                + response.getFailureMessage());
443                    }
444                }
445            }
446        } catch (IOException e) {
447            throw new NuxeoException("Error while indexing Audit entries", e);
448        }
449
450    }
451
452    @Override
453    public Long getEventsCount(String eventId) {
454        SearchResponse res = esClient.prepareSearch(getESIndexName())
455                                     .setTypes(ElasticSearchConstants.ENTRY_TYPE)
456                                     .setQuery(QueryBuilders.constantScoreQuery(
457                                             QueryBuilders.termQuery("eventId", eventId)))
458                                     .setSize(0)
459                                     .get();
460        return res.getHits().getTotalHits();
461    }
462
463    @Override
464    public long syncLogCreationEntries(final String repoId, final String path, final Boolean recurs) {
465        return syncLogCreationEntries(provider, repoId, path, recurs);
466    }
467
468    protected QueryBuilder buildFilter(PredicateDefinition[] predicates, DocumentModel searchDocumentModel) {
469
470        if (searchDocumentModel == null) {
471            return QueryBuilders.matchAllQuery();
472        }
473
474        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
475
476        int nbFilters = 0;
477
478        for (PredicateDefinition predicate : predicates) {
479
480            // extract data from DocumentModel
481            PredicateFieldDefinition[] fieldDef = predicate.getValues();
482            Object[] val = new Object[fieldDef.length];
483            for (int fidx = 0; fidx < fieldDef.length; fidx++) {
484                if (fieldDef[fidx].getXpath() != null) {
485                    val[fidx] = searchDocumentModel.getPropertyValue(fieldDef[fidx].getXpath());
486                } else {
487                    val[fidx] = searchDocumentModel.getProperty(fieldDef[fidx].getSchema(), fieldDef[fidx].getName());
488                }
489            }
490
491            if (!isNonNullParam(val)) {
492                // skip predicate where all values are null
493                continue;
494            }
495
496            nbFilters++;
497
498            String op = predicate.getOperator();
499            if (op.equalsIgnoreCase("IN")) {
500
501                String[] values = null;
502                if (val[0] instanceof Iterable<?>) {
503                    List<String> l = new ArrayList<>();
504                    Iterable<?> vals = (Iterable<?>) val[0];
505
506                    for (Object v : vals) {
507                        if (v != null) {
508                            l.add(v.toString());
509                        }
510                    }
511                    values = l.toArray(new String[l.size()]);
512                } else if (val[0] instanceof Object[]) {
513                    values = (String[]) val[0];
514                }
515                filterBuilder.must(QueryBuilders.termsQuery(predicate.getParameter(), values));
516            } else if (op.equalsIgnoreCase("BETWEEN")) {
517                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gt(val[0]));
518                if (val.length > 1) {
519                    filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lt(val[1]));
520                }
521            } else if (">".equals(op)) {
522                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gt(val[0]));
523            } else if (">=".equals(op)) {
524                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gte(val[0]));
525            } else if ("<".equals(op)) {
526                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lt(val[0]));
527            } else if ("<=".equals(op)) {
528                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lte(val[0]));
529            } else {
530                filterBuilder.must(QueryBuilders.termQuery(predicate.getParameter(), val[0]));
531            }
532        }
533
534        if (nbFilters == 0) {
535            return QueryBuilders.matchAllQuery();
536        }
537        return filterBuilder;
538    }
539
540    public SearchRequestBuilder buildSearchQuery(String fixedPart, PredicateDefinition[] predicates,
541            DocumentModel searchDocumentModel) {
542        SearchRequestBuilder builder = getSearchRequestBuilder(esClient);
543        QueryBuilder queryBuilder = QueryBuilders.wrapperQuery(fixedPart);
544        QueryBuilder filterBuilder = buildFilter(predicates, searchDocumentModel);
545        builder.setQuery(QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder));
546        return builder;
547    }
548
549    protected boolean isNonNullParam(Object[] val) {
550        if (val == null) {
551            return false;
552        }
553        for (Object v : val) {
554            if (v != null) {
555                if (v instanceof String) {
556                    if (!((String) v).isEmpty()) {
557                        return true;
558                    }
559                } else if (v instanceof String[]) {
560                    if (((String[]) v).length > 0) {
561                        return true;
562                    }
563                } else {
564                    return true;
565                }
566            }
567        }
568        return false;
569    }
570
571    @SuppressWarnings("deprecation")
572    public String migrate(final int batchSize) {
573
574        final String MIGRATION_WORK_ID = "AuditMigration";
575
576        WorkManager wm = Framework.getService(WorkManager.class);
577        State migrationState = wm.getWorkState(MIGRATION_WORK_ID);
578        if (migrationState != null) {
579            return "Migration already scheduled : " + migrationState.toString();
580        }
581
582        Work migrationWork = new ESAuditMigrationWork(MIGRATION_WORK_ID, batchSize);
583        wm.schedule(migrationWork);
584        return "Migration work started : " + MIGRATION_WORK_ID;
585    }
586
587    protected void logSearchResponse(SearchResponse response) {
588        if (log.isDebugEnabled()) {
589            log.debug("Response: " + response.toString());
590        }
591    }
592
593    protected void logSearchRequest(SearchRequestBuilder request) {
594        if (log.isDebugEnabled()) {
595            log.debug(String.format("Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'",
596                    getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, request.toString()));
597        }
598    }
599
600    /**
601     * Ensures the audit sequence returns an UID greater or equal than the maximum log entry id.
602     */
603    protected void ensureUIDSequencer(Client esClient) {
604        boolean auditIndexExists = esClient.admin()
605                                           .indices()
606                                           .prepareExists(getESIndexName())
607                                           .execute()
608                                           .actionGet()
609                                           .isExists();
610        if (!auditIndexExists) {
611            return;
612        }
613
614        // Get max log entry id
615        SearchRequestBuilder builder = getSearchRequestBuilder(esClient);
616        builder.setQuery(QueryBuilders.matchAllQuery()).addAggregation(AggregationBuilders.max("maxAgg").field("id"));
617        SearchResponse searchResponse = builder.execute().actionGet();
618        Max agg = searchResponse.getAggregations().get("maxAgg");
619        int maxLogEntryId = (int) agg.getValue();
620
621        // Get next sequence id
622        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
623        UIDSequencer seq = uidGeneratorService.getSequencer();
624        int nextSequenceId = seq.getNext(SEQ_NAME);
625
626        // Increment sequence to max log entry id if needed
627        if (nextSequenceId < maxLogEntryId) {
628            log.info(String.format("Next UID returned by %s sequence is %d, initializing sequence to %d", SEQ_NAME,
629                    nextSequenceId, maxLogEntryId));
630            seq.initSequence(SEQ_NAME, maxLogEntryId);
631        }
632    }
633
634    @Override
635    public ExtendedInfo newExtendedInfo(Serializable value) {
636        return new ESExtendedInfo(value);
637    }
638
639    protected String getESIndexName() {
640        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
641        return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE);
642    }
643
644}