001/*
002 * (C) Copyright 2014-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     Benoit Delbosc
019 */
020package org.nuxeo.elasticsearch.audit;
021
022import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
023
024import java.io.IOException;
025import java.io.OutputStream;
026import java.io.Serializable;
027import java.util.ArrayList;
028import java.util.Calendar;
029import java.util.Date;
030import java.util.HashMap;
031import java.util.List;
032import java.util.Map;
033
034import org.apache.commons.collections.MapUtils;
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.codehaus.jackson.JsonFactory;
038import org.codehaus.jackson.JsonGenerator;
039import org.elasticsearch.action.bulk.BulkItemResponse;
040import org.elasticsearch.action.bulk.BulkRequestBuilder;
041import org.elasticsearch.action.bulk.BulkResponse;
042import org.elasticsearch.action.get.GetResponse;
043import org.elasticsearch.action.search.SearchRequestBuilder;
044import org.elasticsearch.action.search.SearchResponse;
045import org.elasticsearch.action.search.SearchType;
046import org.elasticsearch.client.Client;
047import org.elasticsearch.common.io.stream.BytesStreamOutput;
048import org.elasticsearch.common.unit.TimeValue;
049import org.elasticsearch.common.xcontent.XContentBuilder;
050import org.elasticsearch.index.query.BoolQueryBuilder;
051import org.elasticsearch.index.query.QueryBuilder;
052import org.elasticsearch.index.query.QueryBuilders;
053import org.elasticsearch.index.query.TermQueryBuilder;
054import org.elasticsearch.search.SearchHit;
055import org.elasticsearch.search.aggregations.AggregationBuilders;
056import org.elasticsearch.search.aggregations.metrics.max.Max;
057import org.elasticsearch.search.sort.SortOrder;
058import org.joda.time.DateTime;
059import org.joda.time.format.ISODateTimeFormat;
060import org.nuxeo.common.utils.TextTemplate;
061import org.nuxeo.ecm.core.api.DocumentModel;
062import org.nuxeo.ecm.core.api.NuxeoException;
063import org.nuxeo.ecm.core.uidgen.UIDGeneratorService;
064import org.nuxeo.ecm.core.uidgen.UIDSequencer;
065import org.nuxeo.ecm.core.work.api.Work;
066import org.nuxeo.ecm.core.work.api.Work.State;
067import org.nuxeo.ecm.core.work.api.WorkManager;
068import org.nuxeo.ecm.platform.audit.api.AuditReader;
069import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
070import org.nuxeo.ecm.platform.audit.api.FilterMapEntry;
071import org.nuxeo.ecm.platform.audit.api.LogEntry;
072import org.nuxeo.ecm.platform.audit.api.query.AuditQueryException;
073import org.nuxeo.ecm.platform.audit.api.query.DateRangeParser;
074import org.nuxeo.ecm.platform.audit.service.AbstractAuditBackend;
075import org.nuxeo.ecm.platform.audit.service.AuditBackend;
076import org.nuxeo.ecm.platform.audit.service.BaseLogEntryProvider;
077import org.nuxeo.ecm.platform.audit.service.NXAuditEventsService;
078import org.nuxeo.ecm.platform.audit.service.extension.AuditBackendDescriptor;
079import org.nuxeo.ecm.platform.query.api.PredicateDefinition;
080import org.nuxeo.ecm.platform.query.api.PredicateFieldDefinition;
081import org.nuxeo.elasticsearch.ElasticSearchConstants;
082import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
083import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONReader;
084import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONWriter;
085import org.nuxeo.runtime.api.Framework;
086import org.nuxeo.runtime.model.DefaultComponent;
087
088/**
089 * Implementation of the {@link AuditBackend} interface using Elasticsearch persistence
090 *
091 * @author tiry
092 */
093public class ESAuditBackend extends AbstractAuditBackend implements AuditBackend {
094
095    public static final String SEQ_NAME = "audit";
096
097    public static final String MIGRATION_FLAG_PROP = "audit.elasticsearch.migration";
098
099    public static final String MIGRATION_BATCH_SIZE_PROP = "audit.elasticsearch.migration.batchSize";
100
101    public static final String MIGRATION_DONE_EVENT = "sqlToElasticsearchMigrationDone";
102
103    public static final int MIGRATION_DEFAULT_BACTH_SIZE = 1000;
104
105    public ESAuditBackend(NXAuditEventsService component, AuditBackendDescriptor config) {
106        super(component, config);
107    }
108
109    protected Client esClient;
110
111    protected static final Log log = LogFactory.getLog(ESAuditBackend.class);
112
113    protected BaseLogEntryProvider provider = new BaseLogEntryProvider() {
114
115        @Override
116        public int removeEntries(String eventId, String pathPattern) {
117            throw new UnsupportedOperationException("Not implemented yet!");
118        }
119
120        @Override
121        public void addLogEntry(LogEntry logEntry) {
122            List<LogEntry> entries = new ArrayList<>();
123            entries.add(logEntry);
124            addLogEntries(entries);
125        }
126
127        @Override
128        public List<LogEntry> getLogEntriesFor(String uuid, String repositoryId) {
129            throw new UnsupportedOperationException("Not implemented yet!");
130        }
131
132        @Override
133        public List<LogEntry> getLogEntriesFor(String uuid) {
134            throw new UnsupportedOperationException("Not implemented yet!");
135        }
136
137        @Override
138        public List<LogEntry> getLogEntriesFor(String uuid, Map<String, FilterMapEntry> filterMap,
139                boolean doDefaultSort) {
140            throw new UnsupportedOperationException("Not implemented yet!");
141        }
142    };
143
144    protected Client getClient() {
145        log.info("Activate Elasticsearch backend for Audit");
146        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
147        Client client = esa.getClient();
148        ensureUIDSequencer(client);
149        return client;
150    }
151
152    protected boolean isMigrationDone() {
153        AuditReader reader = Framework.getService(AuditReader.class);
154        List<LogEntry> entries = reader.queryLogs(new String[] { MIGRATION_DONE_EVENT }, null);
155        return !entries.isEmpty();
156    }
157
158    @Override
159    public int getApplicationStartedOrder() {
160        int elasticOrder = ((DefaultComponent) Framework.getRuntime()
161                                            .getComponent("org.nuxeo.elasticsearch.ElasticSearchComponent"))
162                                                                                                            .getApplicationStartedOrder();
163        int uidgenOrder = ((DefaultComponent) Framework.getRuntime()
164                .getComponent("org.nuxeo.ecm.core.uidgen.UIDGeneratorService"))
165                .getApplicationStartedOrder();
166        return Integer.max(elasticOrder, uidgenOrder) + 1;
167    }
168
169    @Override
170    public void onApplicationStarted() {
171        esClient = getClient();
172        if (Boolean.parseBoolean(Framework.getProperty(MIGRATION_FLAG_PROP))) {
173            if (!isMigrationDone()) {
174                log.info(String.format(
175                        "Property %s is true and migration is not done yet, processing audit migration from SQL to Elasticsearch index",
176                        MIGRATION_FLAG_PROP));
177                // Drop audit index first in case of a previous bad migration
178                ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
179                esa.dropAndInitIndex(getESIndexName());
180                int batchSize = MIGRATION_DEFAULT_BACTH_SIZE;
181                String batchSizeProp = Framework.getProperty(MIGRATION_BATCH_SIZE_PROP);
182                if (batchSizeProp != null) {
183                    batchSize = Integer.parseInt(batchSizeProp);
184                }
185                migrate(batchSize);
186            } else {
187                log.warn(String.format(
188                        "Property %s is true but migration is already done, please set this property to false",
189                        MIGRATION_FLAG_PROP));
190            }
191        } else {
192            log.debug(String.format("Property %s is false, not processing any migration", MIGRATION_FLAG_PROP));
193        }
194    }
195
196    @Override
197    public void onApplicationStopped() {
198        if (esClient == null) {
199            return;
200        }
201        try {
202            esClient.close();
203        } finally {
204            esClient = null;
205        }
206    }
207
208    @Override
209    public List<LogEntry> getLogEntriesFor(String uuid, String repositoryId) {
210        TermQueryBuilder docFilter = QueryBuilders.termQuery("docUUID", uuid);
211        TermQueryBuilder repoFilter = QueryBuilders.termQuery("repositoryId", repositoryId);
212        QueryBuilder filter;
213        filter = QueryBuilders.boolQuery().must(docFilter);
214        filter = QueryBuilders.boolQuery().must(repoFilter);
215        return getLogEntries(filter, false);
216    }
217
218    @Override
219    public List<LogEntry> getLogEntriesFor(String uuid, Map<String, FilterMapEntry> filterMap, boolean doDefaultSort) {
220        TermQueryBuilder docFilter = QueryBuilders.termQuery("docUUID", uuid);
221        QueryBuilder filter;
222        if (MapUtils.isEmpty(filterMap)) {
223            filter = docFilter;
224        } else {
225            filter = QueryBuilders.boolQuery().must(docFilter);
226            for (String key : filterMap.keySet()) {
227                FilterMapEntry entry = filterMap.get(key);
228                ((BoolQueryBuilder) filter).must(QueryBuilders.termQuery(entry.getColumnName(), entry.getObject()));
229            }
230        }
231        return getLogEntries(filter, doDefaultSort);
232    }
233
234    protected List<LogEntry> getLogEntries(QueryBuilder filter, boolean doDefaultSort) {
235        SearchRequestBuilder builder = getSearchRequestBuilder(esClient);
236        if (doDefaultSort) {
237            builder.addSort("eventDate", SortOrder.DESC);
238        }
239        TimeValue keepAlive = TimeValue.timeValueMinutes(1);
240        builder.setQuery(QueryBuilders.constantScoreQuery(filter)).setScroll(keepAlive).setSize(100);
241
242        logSearchRequest(builder);
243        SearchResponse searchResponse = builder.get();
244        logSearchResponse(searchResponse);
245
246        // Build log entries
247        List<LogEntry> logEntries = buildLogEntries(searchResponse);
248        // Scroll on next results
249        for (; //
250                searchResponse.getHits().getHits().length > 0
251                        && logEntries.size() < searchResponse.getHits().getTotalHits(); //
252                searchResponse = runNextScroll(searchResponse.getScrollId(), keepAlive)) {
253            // Build log entries
254            logEntries.addAll(buildLogEntries(searchResponse));
255        }
256        return logEntries;
257    }
258
259    SearchResponse runNextScroll(String scrollId, TimeValue keepAlive) {
260        if (log.isDebugEnabled()) {
261            log.debug(String.format(
262                    "Scroll request: -XGET 'localhost:9200/_search/scroll' -d '{\"scroll\": \"%s\", \"scroll_id\": \"%s\" }'",
263                    keepAlive, scrollId));
264        }
265        SearchResponse response = esClient.prepareSearchScroll(scrollId).setScroll(keepAlive).execute().actionGet();
266        logSearchResponse(response);
267        return response;
268    }
269
270    protected List<LogEntry> buildLogEntries(SearchResponse searchResponse) {
271        List<LogEntry> entries = new ArrayList<>(searchResponse.getHits().getHits().length);
272        for (SearchHit hit : searchResponse.getHits()) {
273            try {
274                entries.add(AuditEntryJSONReader.read(hit.getSourceAsString()));
275            } catch (IOException e) {
276                log.error("Error while reading Audit Entry from ES", e);
277            }
278        }
279        return entries;
280    }
281
282    protected SearchRequestBuilder getSearchRequestBuilder(Client esClient) {
283        return esClient.prepareSearch(getESIndexName())
284                       .setTypes(ElasticSearchConstants.ENTRY_TYPE)
285                       .setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
286    }
287
288    @Override
289    public LogEntry getLogEntryByID(long id) {
290        GetResponse ret = esClient.prepareGet(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, String.valueOf(id))
291                                  .get();
292        if (!ret.isExists()) {
293            return null;
294        }
295        try {
296            return AuditEntryJSONReader.read(ret.getSourceAsString());
297        } catch (IOException e) {
298            throw new RuntimeException("Unable to read Entry for id " + id, e);
299        }
300    }
301
302    public SearchRequestBuilder buildQuery(String query, Map<String, Object> params) {
303        if (params != null && params.size() > 0) {
304            query = expandQueryVariables(query, params);
305        }
306        SearchRequestBuilder builder = getSearchRequestBuilder(esClient);
307        builder.setQuery(query);
308        return builder;
309    }
310
311    public String expandQueryVariables(String query, Object[] params) {
312        Map<String, Object> qParams = new HashMap<>();
313        for (int i = 0; i < params.length; i++) {
314            query = query.replaceFirst("\\?", "\\${param" + i + "}");
315            qParams.put("param" + i, params[i]);
316        }
317        return expandQueryVariables(query, qParams);
318    }
319
320    public String expandQueryVariables(String query, Map<String, Object> params) {
321        if (params != null && params.size() > 0) {
322            TextTemplate tmpl = new TextTemplate();
323            for (String key : params.keySet()) {
324                Object val = params.get(key);
325                if (val == null) {
326                    continue;
327                } else if (val instanceof Calendar) {
328                    tmpl.setVariable(key, ISODateTimeFormat.dateTime().print(new DateTime(val)));
329                } else if (val instanceof Date) {
330                    tmpl.setVariable(key, ISODateTimeFormat.dateTime().print(new DateTime(val)));
331                } else {
332                    tmpl.setVariable(key, val.toString());
333                }
334            }
335            query = tmpl.processText(query);
336        }
337        return query;
338    }
339
340    @Override
341    public List<?> nativeQuery(String query, Map<String, Object> params, int pageNb, int pageSize) {
342        SearchRequestBuilder builder = buildQuery(query, params);
343        if (pageNb > 0) {
344            builder.setFrom(pageNb * pageSize);
345        }
346        if (pageSize > 0) {
347            builder.setSize(pageSize);
348        }
349        logSearchRequest(builder);
350        SearchResponse searchResponse = builder.get();
351        logSearchResponse(searchResponse);
352        return buildLogEntries(searchResponse);
353    }
354
355    @Override
356    public List<LogEntry> queryLogsByPage(String[] eventIds, Date limit, String[] categories, String path, int pageNb,
357            int pageSize) {
358        SearchRequestBuilder builder = getSearchRequestBuilder(esClient);
359        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
360        if (eventIds != null && eventIds.length > 0) {
361            if (eventIds.length == 1) {
362                filterBuilder.must(QueryBuilders.termQuery("eventId", eventIds[0]));
363            } else {
364                filterBuilder.must(QueryBuilders.termsQuery("eventId", eventIds));
365            }
366        }
367        if (categories != null && categories.length > 0) {
368            if (categories.length == 1) {
369                filterBuilder.must(QueryBuilders.termQuery("category", categories[0]));
370            } else {
371                filterBuilder.must(QueryBuilders.termsQuery("category", categories));
372            }
373        }
374        if (path != null) {
375            filterBuilder.must(QueryBuilders.termQuery("docPath", path));
376        }
377
378        if (limit != null) {
379            filterBuilder.must(QueryBuilders.rangeQuery("eventDate").lt(limit));
380        }
381
382        builder.setQuery(QueryBuilders.constantScoreQuery(filterBuilder));
383
384        if (pageNb > 0) {
385            builder.setFrom(pageNb * pageSize);
386        }
387        if (pageSize > 0) {
388            builder.setSize(pageSize);
389        }
390        logSearchRequest(builder);
391        SearchResponse searchResponse = builder.get();
392        logSearchResponse(searchResponse);
393        return buildLogEntries(searchResponse);
394    }
395
396    @Override
397    public List<LogEntry> queryLogsByPage(String[] eventIds, String dateRange, String[] categories, String path,
398            int pageNb, int pageSize) {
399
400        Date limit = null;
401        if (dateRange != null) {
402            try {
403                limit = DateRangeParser.parseDateRangeQuery(new Date(), dateRange);
404            } catch (AuditQueryException aqe) {
405                aqe.addInfo("Wrong date range query. Query was " + dateRange);
406                throw aqe;
407            }
408        }
409        return queryLogsByPage(eventIds, limit, categories, path, pageNb, pageSize);
410    }
411
412    @Override
413    public void addLogEntries(List<LogEntry> entries) {
414
415        if (entries.isEmpty()) {
416            return;
417        }
418
419        BulkRequestBuilder bulkRequest = esClient.prepareBulk();
420        JsonFactory factory = new JsonFactory();
421
422        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
423        UIDSequencer seq = uidGeneratorService.getSequencer();
424
425        try {
426
427            for (LogEntry entry : entries) {
428                entry.setId(seq.getNext(SEQ_NAME));
429                if (log.isDebugEnabled()) {
430                    log.debug(String.format("Indexing log entry: %s", entry));
431                }
432                OutputStream out = new BytesStreamOutput();
433                JsonGenerator jsonGen = factory.createJsonGenerator(out);
434                XContentBuilder builder = jsonBuilder(out);
435                AuditEntryJSONWriter.asJSON(jsonGen, entry);
436                bulkRequest.add(esClient.prepareIndex(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE,
437                        String.valueOf(entry.getId())).setSource(builder));
438            }
439
440            BulkResponse bulkResponse = bulkRequest.execute().actionGet();
441            if (bulkResponse.hasFailures()) {
442                for (BulkItemResponse response : bulkResponse.getItems()) {
443                    if (response.isFailed()) {
444                        log.error("Unable to index audit entry " + response.getItemId() + " :"
445                                + response.getFailureMessage());
446                    }
447                }
448            }
449        } catch (IOException e) {
450            throw new NuxeoException("Error while indexing Audit entries", e);
451        }
452
453    }
454
455    @Override
456    public Long getEventsCount(String eventId) {
457        SearchResponse res = esClient.prepareSearch(getESIndexName())
458                                     .setTypes(ElasticSearchConstants.ENTRY_TYPE)
459                                     .setQuery(QueryBuilders.constantScoreQuery(
460                                             QueryBuilders.termQuery("eventId", eventId)))
461                                     .setSize(0)
462                                     .get();
463        return res.getHits().getTotalHits();
464    }
465
466    @Override
467    public long syncLogCreationEntries(final String repoId, final String path, final Boolean recurs) {
468        return syncLogCreationEntries(provider, repoId, path, recurs);
469    }
470
471    protected QueryBuilder buildFilter(PredicateDefinition[] predicates, DocumentModel searchDocumentModel) {
472
473        if (searchDocumentModel == null) {
474            return QueryBuilders.matchAllQuery();
475        }
476
477        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
478
479        int nbFilters = 0;
480
481        for (PredicateDefinition predicate : predicates) {
482
483            // extract data from DocumentModel
484            PredicateFieldDefinition[] fieldDef = predicate.getValues();
485            Object[] val = new Object[fieldDef.length];
486            for (int fidx = 0; fidx < fieldDef.length; fidx++) {
487                if (fieldDef[fidx].getXpath() != null) {
488                    val[fidx] = searchDocumentModel.getPropertyValue(fieldDef[fidx].getXpath());
489                } else {
490                    val[fidx] = searchDocumentModel.getProperty(fieldDef[fidx].getSchema(), fieldDef[fidx].getName());
491                }
492            }
493
494            if (!isNonNullParam(val)) {
495                // skip predicate where all values are null
496                continue;
497            }
498
499            nbFilters++;
500
501            String op = predicate.getOperator();
502            if (op.equalsIgnoreCase("IN")) {
503
504                String[] values = null;
505                if (val[0] instanceof Iterable<?>) {
506                    List<String> l = new ArrayList<>();
507                    Iterable<?> vals = (Iterable<?>) val[0];
508
509                    for (Object v : vals) {
510                        if (v != null) {
511                            l.add(v.toString());
512                        }
513                    }
514                    values = l.toArray(new String[l.size()]);
515                } else if (val[0] instanceof Object[]) {
516                    values = (String[]) val[0];
517                }
518                filterBuilder.must(QueryBuilders.termsQuery(predicate.getParameter(), values));
519            } else if (op.equalsIgnoreCase("BETWEEN")) {
520                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gt(val[0]));
521                if (val.length > 1) {
522                    filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lt(val[1]));
523                }
524            } else if (">".equals(op)) {
525                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gt(val[0]));
526            } else if (">=".equals(op)) {
527                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gte(val[0]));
528            } else if ("<".equals(op)) {
529                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lt(val[0]));
530            } else if ("<=".equals(op)) {
531                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lte(val[0]));
532            } else {
533                filterBuilder.must(QueryBuilders.termQuery(predicate.getParameter(), val[0]));
534            }
535        }
536
537        if (nbFilters == 0) {
538            return QueryBuilders.matchAllQuery();
539        }
540        return filterBuilder;
541    }
542
543    public SearchRequestBuilder buildSearchQuery(String fixedPart, PredicateDefinition[] predicates,
544            DocumentModel searchDocumentModel) {
545        SearchRequestBuilder builder = getSearchRequestBuilder(esClient);
546        QueryBuilder queryBuilder = QueryBuilders.wrapperQuery(fixedPart);
547        QueryBuilder filterBuilder = buildFilter(predicates, searchDocumentModel);
548        builder.setQuery(QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder));
549        return builder;
550    }
551
552    protected boolean isNonNullParam(Object[] val) {
553        if (val == null) {
554            return false;
555        }
556        for (Object v : val) {
557            if (v != null) {
558                if (v instanceof String) {
559                    if (!((String) v).isEmpty()) {
560                        return true;
561                    }
562                } else if (v instanceof String[]) {
563                    if (((String[]) v).length > 0) {
564                        return true;
565                    }
566                } else {
567                    return true;
568                }
569            }
570        }
571        return false;
572    }
573
574    @SuppressWarnings("deprecation")
575    public String migrate(final int batchSize) {
576
577        final String MIGRATION_WORK_ID = "AuditMigration";
578
579        WorkManager wm = Framework.getService(WorkManager.class);
580        State migrationState = wm.getWorkState(MIGRATION_WORK_ID);
581        if (migrationState != null) {
582            return "Migration already scheduled : " + migrationState.toString();
583        }
584
585        Work migrationWork = new ESAuditMigrationWork(MIGRATION_WORK_ID, batchSize);
586        wm.schedule(migrationWork);
587        return "Migration work started : " + MIGRATION_WORK_ID;
588    }
589
590    protected void logSearchResponse(SearchResponse response) {
591        if (log.isDebugEnabled()) {
592            log.debug("Response: " + response.toString());
593        }
594    }
595
596    protected void logSearchRequest(SearchRequestBuilder request) {
597        if (log.isDebugEnabled()) {
598            log.debug(String.format("Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'",
599                    getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, request.toString()));
600        }
601    }
602
603    /**
604     * Ensures the audit sequence returns an UID greater or equal than the maximum log entry id.
605     */
606    protected void ensureUIDSequencer(Client esClient) {
607        boolean auditIndexExists = esClient.admin()
608                                           .indices()
609                                           .prepareExists(getESIndexName())
610                                           .execute()
611                                           .actionGet()
612                                           .isExists();
613        if (!auditIndexExists) {
614            return;
615        }
616
617        // Get max log entry id
618        SearchRequestBuilder builder = getSearchRequestBuilder(esClient);
619        builder.setQuery(QueryBuilders.matchAllQuery()).addAggregation(AggregationBuilders.max("maxAgg").field("id"));
620        SearchResponse searchResponse = builder.execute().actionGet();
621        Max agg = searchResponse.getAggregations().get("maxAgg");
622        int maxLogEntryId = (int) agg.getValue();
623
624        // Get next sequence id
625        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
626        UIDSequencer seq = uidGeneratorService.getSequencer();
627        seq.init();
628        int nextSequenceId = seq.getNext(SEQ_NAME);
629
630        // Increment sequence to max log entry id if needed
631        if (nextSequenceId < maxLogEntryId) {
632            log.info(String.format("Next UID returned by %s sequence is %d, initializing sequence to %d", SEQ_NAME,
633                    nextSequenceId, maxLogEntryId));
634            seq.initSequence(SEQ_NAME, maxLogEntryId);
635        }
636    }
637
638    @Override
639    public ExtendedInfo newExtendedInfo(Serializable value) {
640        return new ESExtendedInfo(value);
641    }
642
643    protected String getESIndexName() {
644        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
645        return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE);
646    }
647
648}