001/*
002 * (C) Copyright 2014-2015 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Tiry
016 *     Benoit Delbosc
017 */
018package org.nuxeo.elasticsearch.audit;
019
020import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
021
022import java.io.IOException;
023import java.io.Serializable;
024import java.util.ArrayList;
025import java.util.Calendar;
026import java.util.Collections;
027import java.util.Date;
028import java.util.HashMap;
029import java.util.Iterator;
030import java.util.List;
031import java.util.Map;
032import java.util.TimeZone;
033
034import org.apache.commons.collections.MapUtils;
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.codehaus.jackson.JsonFactory;
038import org.codehaus.jackson.JsonGenerator;
039import org.elasticsearch.action.bulk.BulkItemResponse;
040import org.elasticsearch.action.bulk.BulkRequestBuilder;
041import org.elasticsearch.action.bulk.BulkResponse;
042import org.elasticsearch.action.count.CountResponse;
043import org.elasticsearch.action.get.GetResponse;
044import org.elasticsearch.action.search.SearchRequestBuilder;
045import org.elasticsearch.action.search.SearchResponse;
046import org.elasticsearch.action.search.SearchType;
047import org.elasticsearch.client.Client;
048import org.elasticsearch.common.xcontent.XContentBuilder;
049import org.elasticsearch.index.query.BoolFilterBuilder;
050import org.elasticsearch.index.query.FilterBuilder;
051import org.elasticsearch.index.query.FilterBuilders;
052import org.elasticsearch.index.query.QueryBuilder;
053import org.elasticsearch.index.query.QueryBuilders;
054import org.elasticsearch.index.query.TermFilterBuilder;
055import org.elasticsearch.search.SearchHit;
056import org.elasticsearch.search.aggregations.AggregationBuilders;
057import org.elasticsearch.search.aggregations.metrics.max.Max;
058import org.elasticsearch.search.sort.SortOrder;
059import org.joda.time.DateTime;
060import org.joda.time.format.ISODateTimeFormat;
061import org.nuxeo.common.utils.TextTemplate;
062import org.nuxeo.ecm.core.api.DocumentModel;
063import org.nuxeo.ecm.core.api.NuxeoException;
064import org.nuxeo.ecm.core.api.security.SecurityConstants;
065import org.nuxeo.ecm.core.uidgen.UIDGeneratorService;
066import org.nuxeo.ecm.core.uidgen.UIDSequencer;
067import org.nuxeo.ecm.core.work.AbstractWork;
068import org.nuxeo.ecm.core.work.api.Work;
069import org.nuxeo.ecm.core.work.api.Work.State;
070import org.nuxeo.ecm.core.work.api.WorkManager;
071import org.nuxeo.ecm.platform.audit.api.AuditLogger;
072import org.nuxeo.ecm.platform.audit.api.AuditReader;
073import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
074import org.nuxeo.ecm.platform.audit.api.FilterMapEntry;
075import org.nuxeo.ecm.platform.audit.api.LogEntry;
076import org.nuxeo.ecm.platform.audit.api.query.AuditQueryException;
077import org.nuxeo.ecm.platform.audit.api.query.DateRangeParser;
078import org.nuxeo.ecm.platform.audit.service.AbstractAuditBackend;
079import org.nuxeo.ecm.platform.audit.service.AuditBackend;
080import org.nuxeo.ecm.platform.audit.service.BaseLogEntryProvider;
081import org.nuxeo.ecm.platform.audit.service.DefaultAuditBackend;
082import org.nuxeo.ecm.platform.query.api.PredicateDefinition;
083import org.nuxeo.ecm.platform.query.api.PredicateFieldDefinition;
084import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
085import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONReader;
086import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONWriter;
087import org.nuxeo.runtime.api.Framework;
088import org.nuxeo.runtime.transaction.TransactionHelper;
089
090/**
091 * Implementation of the {@link AuditBackend} interface using Elasticsearch persistence
092 *
093 * @author tiry
094 */
095public class ESAuditBackend extends AbstractAuditBackend implements AuditBackend {
096
097    public static final String IDX_NAME = "audit";
098
099    public static final String IDX_TYPE = "entry";
100
101    public static final String SEQ_NAME = "audit";
102
103    public static final String MIGRATION_FLAG_PROP = "audit.elasticsearch.migration";
104
105    public static final String MIGRATION_BATCH_SIZE_PROP = "audit.elasticsearch.migration.batchSize";
106
107    public static final String MIGRATION_DONE_EVENT = "sqlToElasticsearchMigrationDone";
108
109    public static final int MIGRATION_DEFAULT_BACTH_SIZE = 1000;
110
111    protected Client esClient = null;
112
113    @SuppressWarnings("hiding")
114    protected static final Log log = LogFactory.getLog(ESAuditBackend.class);
115
116    protected BaseLogEntryProvider provider = null;
117
118    protected Client getClient() {
119        if (esClient == null) {
120            log.info("Activate Elasticsearch backend for Audit");
121            ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
122            esClient = esa.getClient();
123            ensureUIDSequencer(esClient);
124        }
125        return esClient;
126    }
127
128    protected boolean isMigrationDone() {
129        AuditReader reader = Framework.getService(AuditReader.class);
130        List<LogEntry> entries = reader.queryLogs(new String[] { MIGRATION_DONE_EVENT }, null);
131        return !entries.isEmpty();
132    }
133
134    @Override
135    public void deactivate() {
136        if (esClient != null) {
137            esClient.close();
138        }
139    }
140
141    @Override
142    public List<LogEntry> getLogEntriesFor(String uuid, Map<String, FilterMapEntry> filterMap, boolean doDefaultSort) {
143        SearchRequestBuilder builder = getSearchRequestBuilder();
144        TermFilterBuilder docFilter = FilterBuilders.termFilter("docUUID", uuid);
145        FilterBuilder filter;
146        if (MapUtils.isEmpty(filterMap)) {
147            filter = docFilter;
148        } else {
149            filter = FilterBuilders.boolFilter();
150            ((BoolFilterBuilder) filter).must(docFilter);
151            for (String key : filterMap.keySet()) {
152                FilterMapEntry entry = filterMap.get(key);
153                ((BoolFilterBuilder) filter).must(FilterBuilders.termFilter(entry.getColumnName(), entry.getObject()));
154            }
155        }
156        builder.setQuery(QueryBuilders.constantScoreQuery(filter)).setSize(Integer.MAX_VALUE);
157        if (doDefaultSort) {
158            builder.addSort("eventDate", SortOrder.DESC);
159        }
160        logSearchRequest(builder);
161        SearchResponse searchResponse = builder.get();
162        logSearchResponse(searchResponse);
163        return buildLogEntries(searchResponse);
164    }
165
166    protected List<LogEntry> buildLogEntries(SearchResponse searchResponse) {
167        List<LogEntry> entries = new ArrayList<>(searchResponse.getHits().getHits().length);
168        for (SearchHit hit : searchResponse.getHits()) {
169            try {
170                entries.add(AuditEntryJSONReader.read(hit.getSourceAsString()));
171            } catch (IOException e) {
172                log.error("Error while reading Audit Entry from ES", e);
173            }
174        }
175        return entries;
176    }
177
178    protected SearchRequestBuilder getSearchRequestBuilder() {
179        return getClient().prepareSearch(IDX_NAME).setTypes(IDX_TYPE).setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
180    }
181
182    @Override
183    public LogEntry getLogEntryByID(long id) {
184        GetResponse ret = getClient().prepareGet(IDX_NAME, IDX_TYPE, String.valueOf(id)).get();
185        if (!ret.isExists()) {
186            return null;
187        }
188        try {
189            return AuditEntryJSONReader.read(ret.getSourceAsString());
190        } catch (IOException e) {
191            throw new RuntimeException("Unable to read Entry for id " + id, e);
192        }
193    }
194
195    public SearchRequestBuilder buildQuery(String query, Map<String, Object> params) {
196        if (params != null && params.size() > 0) {
197            query = expandQueryVariables(query, params);
198        }
199        SearchRequestBuilder builder = getSearchRequestBuilder();
200        builder.setQuery(query);
201        return builder;
202    }
203
204    public String expandQueryVariables(String query, Object[] params) {
205        Map<String, Object> qParams = new HashMap<>();
206        for (int i = 0; i < params.length; i++) {
207            query = query.replaceFirst("\\?", "\\${param" + i + "}");
208            qParams.put("param" + i, params[i]);
209        }
210        return expandQueryVariables(query, qParams);
211    }
212
213    public String expandQueryVariables(String query, Map<String, Object> params) {
214        if (params != null && params.size() > 0) {
215            TextTemplate tmpl = new TextTemplate();
216            for (String key : params.keySet()) {
217                Object val = params.get(key);
218                if (val == null) {
219                    continue;
220                } else if (val instanceof Calendar) {
221                    tmpl.setVariable(key, ISODateTimeFormat.dateTime().print(new DateTime(val)));
222                } else if (val instanceof Date) {
223                    tmpl.setVariable(key, ISODateTimeFormat.dateTime().print(new DateTime(val)));
224                } else {
225                    tmpl.setVariable(key, val.toString());
226                }
227            }
228            query = tmpl.processText(query);
229        }
230        return query;
231    }
232
233    @Override
234    public List<?> nativeQuery(String query, Map<String, Object> params, int pageNb, int pageSize) {
235        SearchRequestBuilder builder = buildQuery(query, params);
236        if (pageNb > 0) {
237            builder.setFrom(pageNb * pageSize);
238        }
239        if (pageSize > 0) {
240            builder.setSize(pageSize);
241        }
242        logSearchRequest(builder);
243        SearchResponse searchResponse = builder.get();
244        logSearchResponse(searchResponse);
245        return buildLogEntries(searchResponse);
246    }
247
248    @Override
249    public List<LogEntry> queryLogsByPage(String[] eventIds, Date limit, String[] categories, String path, int pageNb,
250            int pageSize) {
251        SearchRequestBuilder builder = getSearchRequestBuilder();
252        BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter();
253        if (eventIds != null && eventIds.length > 0) {
254            if (eventIds.length == 1) {
255                filterBuilder.must(FilterBuilders.termFilter("eventId", eventIds[0]));
256            } else {
257                filterBuilder.must(FilterBuilders.termsFilter("eventId", eventIds));
258            }
259        }
260        if (categories != null && categories.length > 0) {
261            if (categories.length == 1) {
262                filterBuilder.must(FilterBuilders.termFilter("category", categories[0]));
263            } else {
264                filterBuilder.must(FilterBuilders.termsFilter("category", categories));
265            }
266        }
267        if (path != null) {
268            filterBuilder.must(FilterBuilders.termFilter("docPath", path));
269        }
270
271        if (limit != null) {
272            filterBuilder.must(FilterBuilders.rangeFilter("eventDate").lt(limit));
273        }
274
275        builder.setQuery(QueryBuilders.constantScoreQuery(filterBuilder));
276
277        if (pageNb > 0) {
278            builder.setFrom(pageNb * pageSize);
279        }
280        if (pageSize > 0) {
281            builder.setSize(pageSize);
282        } else {
283            builder.setSize(Integer.MAX_VALUE);
284        }
285        logSearchRequest(builder);
286        SearchResponse searchResponse = builder.get();
287        logSearchResponse(searchResponse);
288        return buildLogEntries(searchResponse);
289    }
290
291    @Override
292    public List<LogEntry> queryLogsByPage(String[] eventIds, String dateRange, String[] categories, String path,
293            int pageNb, int pageSize) {
294
295        Date limit = null;
296        if (dateRange != null) {
297            try {
298                limit = DateRangeParser.parseDateRangeQuery(new Date(), dateRange);
299            } catch (AuditQueryException aqe) {
300                aqe.addInfo("Wrong date range query. Query was " + dateRange);
301                throw aqe;
302            }
303        }
304        return queryLogsByPage(eventIds, limit, categories, path, pageNb, pageSize);
305    }
306
307    @Override
308    public void addLogEntries(List<LogEntry> entries) {
309
310        BulkRequestBuilder bulkRequest = getClient().prepareBulk();
311        JsonFactory factory = new JsonFactory();
312
313        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
314        UIDSequencer seq = uidGeneratorService.getSequencer();
315
316        try {
317
318            for (LogEntry entry : entries) {
319                entry.setId(seq.getNext(SEQ_NAME));
320                XContentBuilder builder = jsonBuilder();
321                JsonGenerator jsonGen = factory.createJsonGenerator(builder.stream());
322                AuditEntryJSONWriter.asJSON(jsonGen, entry);
323                bulkRequest.add(getClient().prepareIndex(IDX_NAME, IDX_TYPE, String.valueOf(entry.getId())).setSource(
324                        builder));
325            }
326
327            BulkResponse bulkResponse = bulkRequest.execute().actionGet();
328            if (bulkResponse.hasFailures()) {
329                for (BulkItemResponse response : bulkResponse.getItems()) {
330                    if (response.isFailed()) {
331                        log.error("Unable to index audit entry " + response.getItemId() + " :"
332                                + response.getFailureMessage());
333                    }
334                }
335            }
336        } catch (IOException e) {
337            throw new NuxeoException("Error while indexing Audit entries", e);
338        }
339
340    }
341
342    @Override
343    public Long getEventsCount(String eventId) {
344        CountResponse res = getClient().prepareCount(IDX_NAME).setTypes(IDX_TYPE).setQuery(
345                QueryBuilders.constantScoreQuery(FilterBuilders.termFilter("eventId", eventId))).get();
346        return res.getCount();
347    }
348
349    protected BaseLogEntryProvider getProvider() {
350
351        if (provider == null) {
352            provider = new BaseLogEntryProvider() {
353
354                @Override
355                public int removeEntries(String eventId, String pathPattern) {
356                    throw new UnsupportedOperationException("Not implemented yet!");
357                }
358
359                @Override
360                public void addLogEntry(LogEntry logEntry) {
361                    List<LogEntry> entries = new ArrayList<>();
362                    entries.add(logEntry);
363                    addLogEntries(entries);
364                }
365            };
366        }
367        return provider;
368    }
369
370    @Override
371    public long syncLogCreationEntries(final String repoId, final String path, final Boolean recurs) {
372        return syncLogCreationEntries(getProvider(), repoId, path, recurs);
373    }
374
375    protected FilterBuilder buildFilter(PredicateDefinition[] predicates, DocumentModel searchDocumentModel) {
376
377        if (searchDocumentModel == null) {
378            return FilterBuilders.matchAllFilter();
379        }
380
381        BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter();
382
383        int nbFilters = 0;
384
385        for (PredicateDefinition predicate : predicates) {
386
387            // extract data from DocumentModel
388            PredicateFieldDefinition[] fieldDef = predicate.getValues();
389            Object[] val = new Object[fieldDef.length];
390            for (int fidx = 0; fidx < fieldDef.length; fidx++) {
391                if (fieldDef[fidx].getXpath() != null) {
392                    val[fidx] = searchDocumentModel.getPropertyValue(fieldDef[fidx].getXpath());
393                } else {
394                    val[fidx] = searchDocumentModel.getProperty(fieldDef[fidx].getSchema(), fieldDef[fidx].getName());
395                }
396            }
397
398            if (!isNonNullParam(val)) {
399                // skip predicate where all values are null
400                continue;
401            }
402
403            nbFilters++;
404
405            String op = predicate.getOperator();
406            if (op.equalsIgnoreCase("IN")) {
407
408                String[] values = null;
409                if (val[0] instanceof Iterable<?>) {
410                    List<String> l = new ArrayList<>();
411                    Iterable<?> vals = (Iterable<?>) val[0];
412                    Iterator<?> valueIterator = vals.iterator();
413
414                    while (valueIterator.hasNext()) {
415
416                        Object v = valueIterator.next();
417                        if (v != null) {
418                            l.add(v.toString());
419                        }
420                    }
421                    values = l.toArray(new String[l.size()]);
422                } else if (val[0] instanceof Object[]) {
423                    values = (String[]) val[0];
424                }
425                filterBuilder.must(FilterBuilders.termsFilter(predicate.getParameter(), values));
426            } else if (op.equalsIgnoreCase("BETWEEN")) {
427                filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).gt(val[0]));
428                if (val.length > 1) {
429                    filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).lt(val[1]));
430                }
431            } else if (">".equals(op)) {
432                filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).gt(val[0]));
433            } else if (">=".equals(op)) {
434                filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).gte(val[0]));
435            } else if ("<".equals(op)) {
436                filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).lt(val[0]));
437            } else if ("<=".equals(op)) {
438                filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).lte(val[0]));
439            } else {
440                filterBuilder.must(FilterBuilders.termFilter(predicate.getParameter(), val[0]));
441            }
442        }
443
444        if (nbFilters == 0) {
445            return FilterBuilders.matchAllFilter();
446        }
447        return filterBuilder;
448    }
449
450    public SearchRequestBuilder buildSearchQuery(String fixedPart, PredicateDefinition[] predicates,
451            DocumentModel searchDocumentModel) {
452        SearchRequestBuilder builder = getSearchRequestBuilder();
453        QueryBuilder queryBuilder = QueryBuilders.wrapperQuery(fixedPart);
454        FilterBuilder filterBuilder = buildFilter(predicates, searchDocumentModel);
455        builder.setQuery(QueryBuilders.filteredQuery(queryBuilder, filterBuilder));
456        return builder;
457    }
458
459    protected boolean isNonNullParam(Object[] val) {
460        if (val == null) {
461            return false;
462        }
463        for (Object v : val) {
464            if (v != null) {
465                if (v instanceof String) {
466                    if (!((String) v).isEmpty()) {
467                        return true;
468                    }
469                } else if (v instanceof String[]) {
470                    if (((String[]) v).length > 0) {
471                        return true;
472                    }
473                } else {
474                    return true;
475                }
476            }
477        }
478        return false;
479    }
480
481    public String migrate(final int batchSize) {
482
483        final AuditBackend sourceBackend = new DefaultAuditBackend();
484        sourceBackend.activate(component);
485
486        final String MIGRATION_WORK_ID = "AuditMigration";
487
488        WorkManager wm = Framework.getService(WorkManager.class);
489
490        State migrationState = wm.getWorkState(MIGRATION_WORK_ID);
491        if (migrationState != null) {
492            return "Migration already scheduled : " + migrationState.toString();
493        }
494        @SuppressWarnings("unchecked")
495        List<Long> res = (List<Long>) sourceBackend.nativeQuery("select count(*) from LogEntry", 1, 20);
496
497        final long nbEntriesToMigrate = res.get(0);
498
499        Work migrationWork = new AbstractWork(MIGRATION_WORK_ID) {
500
501            private static final long serialVersionUID = 1L;
502
503            @Override
504            public String getTitle() {
505                return "Audit migration worker";
506            }
507
508            @Override
509            public void work() {
510                TransactionHelper.commitOrRollbackTransaction();
511                try {
512
513                    long t0 = System.currentTimeMillis();
514                    long nbEntriesMigrated = 0;
515                    int pageIdx = 0;
516
517                    while (nbEntriesMigrated < nbEntriesToMigrate) {
518                        @SuppressWarnings("unchecked")
519                        List<LogEntry> entries = (List<LogEntry>) sourceBackend.nativeQuery(
520                                "from LogEntry log order by log.id asc", pageIdx, batchSize);
521
522                        if (entries.size() == 0) {
523                            log.warn("Migration ending after " + nbEntriesMigrated + " entries");
524                            break;
525                        }
526                        setProgress(new Progress(nbEntriesMigrated, nbEntriesToMigrate));
527                        addLogEntries(entries);
528                        pageIdx++;
529                        nbEntriesMigrated += entries.size();
530                        log.info("Migrated " + nbEntriesMigrated + " log entries on " + nbEntriesToMigrate);
531                        double dt = (System.currentTimeMillis() - t0) / 1000.0;
532                        if (dt != 0) {
533                            log.info("Migration speed: " + (nbEntriesMigrated / dt) + " entries/s");
534                        }
535                    }
536                    log.info("Audit migration from SQL to Elasticsearch done: " + nbEntriesMigrated
537                            + " entries migrated");
538
539                    // Log technical event in audit as a flag to know if the migration has been processed at application
540                    // startup
541                    AuditLogger logger = Framework.getService(AuditLogger.class);
542                    LogEntry entry = logger.newLogEntry();
543                    entry.setCategory("NuxeoTechnicalEvent");
544                    entry.setEventId(MIGRATION_DONE_EVENT);
545                    entry.setPrincipalName(SecurityConstants.SYSTEM_USERNAME);
546                    entry.setEventDate(Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTime());
547                    addLogEntries(Collections.singletonList(entry));
548                } finally {
549                    TransactionHelper.startTransaction();
550                    sourceBackend.deactivate();
551                }
552            }
553        };
554
555        wm.schedule(migrationWork);
556        return "Migration work started : " + MIGRATION_WORK_ID;
557    }
558
559    protected void logSearchResponse(SearchResponse response) {
560        if (log.isDebugEnabled()) {
561            log.debug("Response: " + response.toString());
562        }
563    }
564
565    protected void logSearchRequest(SearchRequestBuilder request) {
566        if (log.isDebugEnabled()) {
567            log.debug(String.format("Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'",
568                    IDX_NAME, IDX_TYPE, request.toString()));
569        }
570    }
571
572    @Override
573    public void onApplicationStarted() {
574        if (Boolean.parseBoolean(Framework.getProperty(MIGRATION_FLAG_PROP))) {
575            if (!isMigrationDone()) {
576                log.info(String.format(
577                        "Property %s is true and migration is not done yet, processing audit migration from SQL to Elasticsearch index",
578                        MIGRATION_FLAG_PROP));
579                // Drop audit index first in case of a previous bad migration
580                ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
581                esa.dropAndInitIndex(IDX_NAME);
582                int batchSize = MIGRATION_DEFAULT_BACTH_SIZE;
583                String batchSizeProp = Framework.getProperty(MIGRATION_BATCH_SIZE_PROP);
584                if (batchSizeProp != null) {
585                    batchSize = Integer.parseInt(batchSizeProp);
586                }
587                migrate(batchSize);
588            } else {
589                log.warn(String.format(
590                        "Property %s is true but migration is already done, please set this property to false",
591                        MIGRATION_FLAG_PROP));
592            }
593        } else {
594            log.debug(String.format("Property %s is false, not processing any migration", MIGRATION_FLAG_PROP));
595        }
596    }
597
598    /**
599     * Ensures the audit sequence returns an UID greater or equal than the maximum log entry id.
600     */
601    protected void ensureUIDSequencer(Client esClient) {
602        boolean auditIndexExists = esClient.admin().indices().prepareExists(IDX_NAME).execute().actionGet().isExists();
603        if (!auditIndexExists) {
604            return;
605        }
606
607        // Get max log entry id
608        SearchRequestBuilder builder = getSearchRequestBuilder();
609        builder.setQuery(QueryBuilders.matchAllQuery()).addAggregation(AggregationBuilders.max("maxAgg").field("id"));
610        SearchResponse searchResponse = builder.execute().actionGet();
611        Max agg = searchResponse.getAggregations().get("maxAgg");
612        int maxLogEntryId = (int) agg.getValue();
613
614        // Get next sequence id
615        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
616        UIDSequencer seq = uidGeneratorService.getSequencer();
617        int nextSequenceId = seq.getNext(SEQ_NAME);
618
619        // Increment sequence to max log entry id if needed
620        if (nextSequenceId < maxLogEntryId) {
621            log.info(String.format("Next UID returned by %s sequence is %d, initializing sequence to %d", SEQ_NAME,
622                    nextSequenceId, maxLogEntryId));
623            seq.initSequence(SEQ_NAME, maxLogEntryId);
624        }
625    }
626
627    @Override
628    public ExtendedInfo newExtendedInfo(Serializable value) {
629        return new ExtendedInfo() {
630
631            /**
632             *
633             */
634            private static final long serialVersionUID = 1L;
635
636            @Override
637            public Long getId() {
638                throw new UnsupportedOperationException();
639            }
640
641            @Override
642            public void setId(Long id) {
643                throw new UnsupportedOperationException();
644            }
645
646            @Override
647            public Serializable getSerializableValue() {
648                return value;
649            }
650
651            @Override
652            public <T> T getValue(Class<T> clazz) {
653                return clazz.cast(getSerializableValue());
654            }
655
656        };
657    }
658}