001/*
002 * (C) Copyright 2014-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     Benoit Delbosc
019 */
020package org.nuxeo.elasticsearch.audit;
021
022import static org.elasticsearch.common.xcontent.DeprecationHandler.THROW_UNSUPPORTED_OPERATION;
023import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
024import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_ID;
025
026import java.io.IOException;
027import java.io.OutputStream;
028import java.io.Serializable;
029import java.util.ArrayList;
030import java.util.Calendar;
031import java.util.Collections;
032import java.util.Date;
033import java.util.HashMap;
034import java.util.Iterator;
035import java.util.List;
036import java.util.Map;
037import java.util.function.Function;
038
039import org.apache.commons.lang3.StringUtils;
040import org.apache.commons.logging.Log;
041import org.apache.commons.logging.LogFactory;
042import org.elasticsearch.action.bulk.BulkItemResponse;
043import org.elasticsearch.action.bulk.BulkRequest;
044import org.elasticsearch.action.bulk.BulkResponse;
045import org.elasticsearch.action.get.GetRequest;
046import org.elasticsearch.action.get.GetResponse;
047import org.elasticsearch.action.index.IndexRequest;
048import org.elasticsearch.action.search.ClearScrollRequest;
049import org.elasticsearch.action.search.SearchRequest;
050import org.elasticsearch.action.search.SearchResponse;
051import org.elasticsearch.action.search.SearchScrollRequest;
052import org.elasticsearch.action.search.SearchType;
053import org.elasticsearch.common.ParsingException;
054import org.elasticsearch.common.io.stream.BytesStreamOutput;
055import org.elasticsearch.common.settings.Settings;
056import org.elasticsearch.common.unit.TimeValue;
057import org.elasticsearch.common.xcontent.NamedXContentRegistry;
058import org.elasticsearch.common.xcontent.XContentBuilder;
059import org.elasticsearch.common.xcontent.XContentFactory;
060import org.elasticsearch.common.xcontent.XContentParser;
061import org.elasticsearch.common.xcontent.XContentType;
062import org.elasticsearch.index.query.BoolQueryBuilder;
063import org.elasticsearch.index.query.QueryBuilder;
064import org.elasticsearch.index.query.QueryBuilders;
065import org.elasticsearch.search.SearchHit;
066import org.elasticsearch.search.SearchModule;
067import org.elasticsearch.search.aggregations.AggregationBuilders;
068import org.elasticsearch.search.aggregations.metrics.max.Max;
069import org.elasticsearch.search.builder.SearchSourceBuilder;
070import org.elasticsearch.search.sort.SortOrder;
071import org.json.JSONException;
072import org.json.JSONObject;
073import org.nuxeo.common.utils.TextTemplate;
074import org.nuxeo.ecm.core.api.CursorResult;
075import org.nuxeo.ecm.core.api.CursorService;
076import org.nuxeo.ecm.core.api.DocumentModel;
077import org.nuxeo.ecm.core.api.NuxeoException;
078import org.nuxeo.ecm.core.api.ScrollResult;
079import org.nuxeo.ecm.core.query.sql.model.Literals;
080import org.nuxeo.ecm.core.query.sql.model.MultiExpression;
081import org.nuxeo.ecm.core.query.sql.model.Operand;
082import org.nuxeo.ecm.core.query.sql.model.Operator;
083import org.nuxeo.ecm.core.query.sql.model.OrderByList;
084import org.nuxeo.ecm.core.query.sql.model.Predicate;
085import org.nuxeo.ecm.core.query.sql.model.Reference;
086import org.nuxeo.ecm.core.uidgen.UIDGeneratorService;
087import org.nuxeo.ecm.core.uidgen.UIDSequencer;
088import org.nuxeo.ecm.core.work.api.Work;
089import org.nuxeo.ecm.core.work.api.Work.State;
090import org.nuxeo.ecm.core.work.api.WorkManager;
091import org.nuxeo.ecm.platform.audit.api.AuditQueryBuilder;
092import org.nuxeo.ecm.platform.audit.api.AuditReader;
093import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
094import org.nuxeo.ecm.platform.audit.api.LogEntry;
095import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl;
096import org.nuxeo.ecm.platform.audit.service.AbstractAuditBackend;
097import org.nuxeo.ecm.platform.audit.service.AuditBackend;
098import org.nuxeo.ecm.platform.audit.service.BaseLogEntryProvider;
099import org.nuxeo.ecm.platform.audit.service.NXAuditEventsService;
100import org.nuxeo.ecm.platform.audit.service.extension.AuditBackendDescriptor;
101import org.nuxeo.ecm.platform.query.api.PredicateDefinition;
102import org.nuxeo.ecm.platform.query.api.PredicateFieldDefinition;
103import org.nuxeo.elasticsearch.ElasticSearchConstants;
104import org.nuxeo.elasticsearch.api.ESClient;
105import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
106import org.nuxeo.elasticsearch.query.NxqlQueryConverter;
107import org.nuxeo.runtime.api.Framework;
108import org.nuxeo.runtime.model.DefaultComponent;
109
110import com.fasterxml.jackson.core.JsonFactory;
111import com.fasterxml.jackson.core.JsonGenerator;
112import com.fasterxml.jackson.databind.ObjectMapper;
113
114/**
115 * Implementation of the {@link AuditBackend} interface using Elasticsearch persistence
116 *
117 * @author tiry
118 */
119public class ESAuditBackend extends AbstractAuditBackend implements AuditBackend {
120
121    public static final String SEQ_NAME = "audit";
122
123    public static final String MIGRATION_FLAG_PROP = "audit.elasticsearch.migration";
124
125    public static final String MIGRATION_BATCH_SIZE_PROP = "audit.elasticsearch.migration.batchSize";
126
127    public static final String MIGRATION_DONE_EVENT = "sqlToElasticsearchMigrationDone";
128
129    public static final int MIGRATION_DEFAULT_BACTH_SIZE = 1000;
130
131    protected CursorService<Iterator<SearchHit>, SearchHit, String> cursorService;
132
133    public ESAuditBackend(NXAuditEventsService component, AuditBackendDescriptor config) {
134        super(component, config);
135    }
136
137    /**
138     * @since 9.3
139     */
140    public ESAuditBackend() {
141        super();
142    }
143
144    protected ESClient esClient;
145
146    protected static final Log log = LogFactory.getLog(ESAuditBackend.class);
147
148    protected BaseLogEntryProvider provider = new BaseLogEntryProvider() {
149
150        @Override
151        public int removeEntries(String eventId, String pathPattern) {
152            throw new UnsupportedOperationException("Not implemented yet!");
153        }
154
155        @Override
156        public void addLogEntry(LogEntry logEntry) {
157            List<LogEntry> entries = new ArrayList<>();
158            entries.add(logEntry);
159            addLogEntries(entries);
160        }
161
162    };
163
164    protected ESClient getClient() {
165        log.info("Activate Elasticsearch backend for Audit");
166        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
167        ESClient client = esa.getClient();
168        ensureUIDSequencer(client);
169        return client;
170    }
171
172    protected boolean isMigrationDone() {
173        AuditReader reader = Framework.getService(AuditReader.class);
174        List<LogEntry> entries = reader.queryLogs(new String[] { MIGRATION_DONE_EVENT }, null);
175        return !entries.isEmpty();
176    }
177
178    @Override
179    public int getApplicationStartedOrder() {
180        int elasticOrder = ((DefaultComponent) Framework.getRuntime().getComponent(
181                "org.nuxeo.elasticsearch.ElasticSearchComponent")).getApplicationStartedOrder();
182        int uidgenOrder = ((DefaultComponent) Framework.getRuntime().getComponent(
183                "org.nuxeo.ecm.core.uidgen.UIDGeneratorService")).getApplicationStartedOrder();
184        return Integer.max(elasticOrder, uidgenOrder) + 1;
185    }
186
187    @Override
188    public void onApplicationStarted() {
189        esClient = getClient();
190        if (Boolean.parseBoolean(Framework.getProperty(MIGRATION_FLAG_PROP))) {
191            if (!isMigrationDone()) {
192                log.info(String.format(
193                        "Property %s is true and migration is not done yet, processing audit migration from SQL to Elasticsearch index",
194                        MIGRATION_FLAG_PROP));
195                // Drop audit index first in case of a previous bad migration
196                ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
197                esa.dropAndInitIndex(getESIndexName());
198                int batchSize = MIGRATION_DEFAULT_BACTH_SIZE;
199                String batchSizeProp = Framework.getProperty(MIGRATION_BATCH_SIZE_PROP);
200                if (batchSizeProp != null) {
201                    batchSize = Integer.parseInt(batchSizeProp);
202                }
203                migrate(batchSize);
204            } else {
205                log.warn(String.format(
206                        "Property %s is true but migration is already done, please set this property to false",
207                        MIGRATION_FLAG_PROP));
208            }
209        } else {
210            log.debug(String.format("Property %s is false, not processing any migration", MIGRATION_FLAG_PROP));
211        }
212        cursorService = new CursorService<>(SearchHit::getSourceAsString);
213    }
214
215    @Override
216    public void onApplicationStopped() {
217        if (esClient == null) {
218            return;
219        }
220        try {
221            esClient.close();
222            cursorService.clear();
223        } catch (Exception e) {
224            log.warn("Fail to close esClient: " + e.getMessage(), e);
225        } finally {
226            esClient = null;
227            cursorService = null;
228        }
229    }
230
231    @Override
232    @SuppressWarnings("unchecked")
233    public List<LogEntry> queryLogs(AuditQueryBuilder builder) {
234        // prepare parameters
235        Predicate predicate = builder.predicate();
236        OrderByList orders = builder.orders();
237        long offset = builder.offset();
238        long limit = builder.limit();
239
240        SearchSourceBuilder source = createSearchRequestSource(predicate, orders);
241
242        // Perform search
243        List<LogEntry> logEntries;
244        SearchRequest request = createSearchRequest();
245        request.source(source);
246        if (limit == 0) {
247            // return all result -> use the scroll api
248            // offset is not taking into account when querying all results
249            TimeValue keepAlive = TimeValue.timeValueMinutes(1);
250            request.scroll(keepAlive);
251            // the size here is the size of each scrolls
252            source.size(100);
253
254            // run request
255            SearchResponse searchResponse = runRequest(request);
256
257            // Build log entries
258            logEntries = buildLogEntries(searchResponse);
259            // Scroll on next results
260            for (; //
261            searchResponse.getHits().getHits().length > 0
262                    && logEntries.size() < searchResponse.getHits().getTotalHits(); //
263                    searchResponse = runNextScroll(searchResponse.getScrollId(), keepAlive)) {
264                // Build log entries
265                logEntries.addAll(buildLogEntries(searchResponse));
266            }
267        } else {
268            // return a page -> use a regular search
269            source.from((int) offset).size((int) limit);
270
271            // run request
272            SearchResponse searchResponse = runRequest(request);
273
274            // Build log entries
275            logEntries = buildLogEntries(searchResponse);
276        }
277
278        return logEntries;
279    }
280
281    protected SearchSourceBuilder createSearchRequestSource(Predicate predicate, OrderByList orders) {
282        // create ES query builder
283        QueryBuilder query = createQueryBuilder(predicate);
284
285        // create ES source
286        SearchSourceBuilder source = new SearchSourceBuilder().query(QueryBuilders.constantScoreQuery(query)).size(100);
287
288        // create sort
289        orders.forEach(order -> source.sort(order.reference.name, order.isDescending ? SortOrder.DESC : SortOrder.ASC));
290        return source;
291    }
292
293    protected QueryBuilder createQueryBuilder(Predicate andPredicate) {
294        // cast parameters
295        // current implementation only support a MultiExpression with AND operator
296        @SuppressWarnings("unchecked")
297        List<Predicate> predicates = (List<Predicate>) ((List<?>) ((MultiExpression) andPredicate).values);
298        // current implementation only use Predicate/OrderByExpr with a simple Reference for left and right
299        Function<Operand, String> getFieldName = operand -> ((Reference) operand).name;
300
301        QueryBuilder query;
302        if (predicates.isEmpty()) {
303            query = QueryBuilders.matchAllQuery();
304        } else {
305            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
306            for (Predicate predicate : predicates) {
307                String leftName = getFieldName.apply(predicate.lvalue);
308                Operator operator = predicate.operator;
309                Object rightValue = Literals.valueOf(predicate.rvalue);
310                if (Operator.EQ.equals(operator)) {
311                    boolQuery.must(QueryBuilders.termQuery(leftName, rightValue));
312                } else if (Operator.NOTEQ.equals(operator)) {
313                    boolQuery.mustNot(QueryBuilders.termQuery(leftName, rightValue));
314                } else if (Operator.LT.equals(operator)) {
315                    boolQuery.must(QueryBuilders.rangeQuery(leftName).lt(rightValue));
316                } else if (Operator.LTEQ.equals(operator)) {
317                    boolQuery.must(QueryBuilders.rangeQuery(leftName).lte(rightValue));
318                } else if (Operator.GTEQ.equals(operator)) {
319                    boolQuery.must(QueryBuilders.rangeQuery(leftName).gte(rightValue));
320                } else if (Operator.GT.equals(operator)) {
321                    boolQuery.must(QueryBuilders.rangeQuery(leftName).gt(rightValue));
322                } else if (Operator.IN.equals(operator)) {
323                    boolQuery.must(QueryBuilders.termsQuery(leftName, (List<?>) rightValue));
324                } else if (Operator.STARTSWITH.equals(operator)) {
325                    boolQuery.must(NxqlQueryConverter.makeStartsWithQuery(leftName, rightValue));
326                }
327            }
328            query = boolQuery;
329        }
330        return query;
331    }
332
333    protected List<LogEntry> buildLogEntries(SearchResponse searchResponse) {
334        List<LogEntry> entries = new ArrayList<>(searchResponse.getHits().getHits().length);
335        ObjectMapper mapper = new ObjectMapper();
336        for (SearchHit hit : searchResponse.getHits()) {
337            try {
338                entries.add(mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class));
339            } catch (IOException e) {
340                log.error("Error while reading Audit Entry from ES", e);
341            }
342        }
343        return entries;
344    }
345
346    protected SearchRequest createSearchRequest() {
347        return new SearchRequest(getESIndexName()).searchType(SearchType.DFS_QUERY_THEN_FETCH);
348    }
349
350    @Override
351    public LogEntry getLogEntryByID(long id) {
352        GetResponse ret = esClient.get(
353                new GetRequest(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, String.valueOf(id)));
354        if (!ret.isExists()) {
355            return null;
356        }
357        try {
358            return new ObjectMapper().readValue(ret.getSourceAsString(), LogEntryImpl.class);
359        } catch (IOException e) {
360            throw new NuxeoException("Unable to read Entry for id " + id, e);
361        }
362    }
363
364    public SearchRequest buildQuery(String query, Map<String, Object> params) {
365        if (params != null && params.size() > 0) {
366            query = expandQueryVariables(query, params);
367        }
368        SearchRequest request = createSearchRequest();
369        SearchSourceBuilder sourceBuilder = createSearchSourceBuilder(query);
370        return request.source(sourceBuilder);
371    }
372
373    protected SearchSourceBuilder createSearchSourceBuilder(String query) {
374        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
375        SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
376        try {
377            try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(
378                    new NamedXContentRegistry(searchModule.getNamedXContents()), THROW_UNSUPPORTED_OPERATION, query)) {
379                searchSourceBuilder.parseXContent(parser);
380            }
381        } catch (IOException | ParsingException e) {
382            log.error("Invalid query: " + query + ": " + e.getMessage(), e);
383            throw new IllegalArgumentException("Bad query: " + query);
384        }
385        return searchSourceBuilder;
386    }
387
388    public String expandQueryVariables(String query, Object[] params) {
389        Map<String, Object> qParams = new HashMap<>();
390        for (int i = 0; i < params.length; i++) {
391            query = query.replaceFirst("\\?", "\\${param" + i + "}");
392            qParams.put("param" + i, params[i]);
393        }
394        return expandQueryVariables(query, qParams);
395    }
396
397    public String expandQueryVariables(String query, Map<String, Object> params) {
398        if (params != null && params.size() > 0) {
399            TextTemplate tmpl = new TextTemplate();
400            for (String key : params.keySet()) {
401                Object val = params.get(key);
402                if (val == null) {
403                    continue;
404                } else if (val instanceof Calendar) {
405                    tmpl.setVariable(key, Long.toString(((Calendar) val).getTime().getTime()));
406                } else if (val instanceof Date) {
407                    tmpl.setVariable(key, Long.toString(((Date) val).getTime()));
408                } else {
409                    tmpl.setVariable(key, val.toString());
410                }
411            }
412            query = tmpl.processText(query);
413        }
414        return query;
415    }
416
417    @Override
418    public List<?> nativeQuery(String query, Map<String, Object> params, int pageNb, int pageSize) {
419        SearchRequest request = buildQuery(query, params);
420        if (pageNb > 0) {
421            request.source().from(pageNb * pageSize);
422        }
423        if (pageSize > 0) {
424            request.source().size(pageSize);
425        }
426        SearchResponse searchResponse = runRequest(request);
427        return buildLogEntries(searchResponse);
428    }
429
430    @Override
431    public List<LogEntry> queryLogsByPage(String[] eventIds, Date limit, String[] categories, String path, int pageNb,
432            int pageSize) {
433        SearchRequest request = createSearchRequest();
434        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
435        if (eventIds != null && eventIds.length > 0) {
436            if (eventIds.length == 1) {
437                filterBuilder.must(QueryBuilders.termQuery("eventId", eventIds[0]));
438            } else {
439                filterBuilder.must(QueryBuilders.termsQuery("eventId", eventIds));
440            }
441        }
442        if (categories != null && categories.length > 0) {
443            if (categories.length == 1) {
444                filterBuilder.must(QueryBuilders.termQuery("category", categories[0]));
445            } else {
446                filterBuilder.must(QueryBuilders.termsQuery("category", categories));
447            }
448        }
449        if (path != null) {
450            filterBuilder.must(QueryBuilders.termQuery("docPath", path));
451        }
452
453        if (limit != null) {
454            filterBuilder.must(QueryBuilders.rangeQuery("eventDate").lt(convertDate(limit)));
455        }
456        request.source(new SearchSourceBuilder().query(QueryBuilders.constantScoreQuery(filterBuilder)));
457        if (pageNb > 0) {
458            request.source().from(pageNb * pageSize);
459        }
460        if (pageSize > 0) {
461            request.source().size(pageSize);
462        }
463        SearchResponse searchResponse = runRequest(request);
464        return buildLogEntries(searchResponse);
465    }
466
467    @Override
468    public void addLogEntries(List<LogEntry> entries) {
469
470        if (entries.isEmpty()) {
471            return;
472        }
473
474        BulkRequest bulkRequest = new BulkRequest();
475        JsonFactory factory = new JsonFactory();
476
477        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
478        UIDSequencer seq = uidGeneratorService.getSequencer();
479
480        try {
481
482            for (LogEntry entry : entries) {
483                entry.setId(seq.getNextLong(SEQ_NAME));
484                if (log.isDebugEnabled()) {
485                    log.debug(String.format("Indexing log entry: %s", entry));
486                }
487                entry.setLogDate(new Date());
488                try (OutputStream out = new BytesStreamOutput(); //
489                        JsonGenerator jg = factory.createGenerator(out); //
490                        XContentBuilder builder = jsonBuilder(out)) {
491                    ObjectMapper mapper = new ObjectMapper();
492                    mapper.writeValue(jg, entry);
493                    bulkRequest.add(new IndexRequest(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE,
494                            String.valueOf(entry.getId())).source(builder));
495                }
496            }
497
498            BulkResponse bulkResponse = esClient.bulk(bulkRequest);
499            if (bulkResponse.hasFailures()) {
500                for (BulkItemResponse response : bulkResponse.getItems()) {
501                    if (response.isFailed()) {
502                        log.error("Unable to index audit entry " + response.getItemId() + " :"
503                                + response.getFailureMessage());
504                    }
505                }
506            }
507        } catch (IOException e) {
508            throw new NuxeoException("Error while indexing Audit entries", e);
509        }
510
511    }
512
513    @Override
514    public Long getEventsCount(String eventId) {
515        SearchResponse res = esClient.search(
516                new SearchRequest(getESIndexName()).source(
517                        new SearchSourceBuilder().query(
518                                QueryBuilders.constantScoreQuery(QueryBuilders.termQuery("eventId", eventId)))
519                                                 .size(0)));
520        return res.getHits().getTotalHits();
521    }
522
523    @Override
524    public long syncLogCreationEntries(final String repoId, final String path, final Boolean recurs) {
525        return syncLogCreationEntries(provider, repoId, path, recurs);
526    }
527
528    public SearchResponse search(SearchRequest request) {
529        String[] indices = request.indices();
530        if (indices == null || indices.length != 1) {
531            throw new IllegalStateException("Search on audit must include index name: " + request);
532        }
533        if (!getESIndexName().equals(indices[0])) {
534            throw new IllegalStateException("Search on audit must be on audit index: " + request);
535        }
536        return runRequest(request);
537    }
538
539    protected QueryBuilder buildFilter(PredicateDefinition[] predicates, DocumentModel searchDocumentModel) {
540
541        if (searchDocumentModel == null) {
542            return QueryBuilders.matchAllQuery();
543        }
544
545        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
546
547        int nbFilters = 0;
548
549        for (PredicateDefinition predicate : predicates) {
550
551            // extract data from DocumentModel
552            PredicateFieldDefinition[] fieldDef = predicate.getValues();
553            Object[] val = new Object[fieldDef.length];
554            for (int fidx = 0; fidx < fieldDef.length; fidx++) {
555                if (fieldDef[fidx].getXpath() != null) {
556                    val[fidx] = searchDocumentModel.getPropertyValue(fieldDef[fidx].getXpath());
557                } else {
558                    val[fidx] = searchDocumentModel.getProperty(fieldDef[fidx].getSchema(), fieldDef[fidx].getName());
559                }
560            }
561
562            if (!isNonNullParam(val)) {
563                // skip predicate where all values are null
564                continue;
565            }
566
567            nbFilters++;
568
569            String op = predicate.getOperator();
570            if (op.equalsIgnoreCase("IN")) {
571
572                String[] values = null;
573                if (val[0] instanceof Iterable<?>) {
574                    List<String> l = new ArrayList<>();
575                    Iterable<?> vals = (Iterable<?>) val[0];
576
577                    for (Object v : vals) {
578                        if (v != null) {
579                            l.add(v.toString());
580                        }
581                    }
582                    values = l.toArray(new String[l.size()]);
583                } else if (val[0] instanceof Object[]) {
584                    values = (String[]) val[0];
585                }
586                filterBuilder.must(QueryBuilders.termsQuery(predicate.getParameter(), values));
587            } else if (op.equalsIgnoreCase("BETWEEN")) {
588                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gt(convertDate(val[0])));
589                if (val.length > 1) {
590                    filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lt(convertDate(val[1])));
591                }
592            } else if (">".equals(op)) {
593                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gt(convertDate(val[0])));
594            } else if (">=".equals(op)) {
595                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gte(convertDate(val[0])));
596            } else if ("<".equals(op)) {
597                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lt(convertDate(val[0])));
598            } else if ("<=".equals(op)) {
599                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lte(convertDate(val[0])));
600            } else {
601                filterBuilder.must(QueryBuilders.termQuery(predicate.getParameter(), convertDate(val[0])));
602            }
603        }
604
605        if (nbFilters == 0) {
606            return QueryBuilders.matchAllQuery();
607        }
608        return filterBuilder;
609    }
610
611    protected Object convertDate(Object o) {
612        // Date are convert to timestamp ms which is a known format by default for ES
613        if (o instanceof Calendar) {
614            return Long.valueOf(((Calendar) o).getTime().getTime());
615        } else if (o instanceof Date) {
616            return Long.valueOf(((Date) o).getTime());
617        }
618        return o;
619    }
620
621    public SearchRequest buildSearchQuery(String fixedPart, PredicateDefinition[] predicates,
622            DocumentModel searchDocumentModel) {
623        SearchRequest request = createSearchRequest();
624        QueryBuilder queryBuilder = QueryBuilders.wrapperQuery(fixedPart);
625        QueryBuilder filterBuilder = buildFilter(predicates, searchDocumentModel);
626        request.source(
627                new SearchSourceBuilder().query(QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder)));
628        return request;
629    }
630
631    protected boolean isNonNullParam(Object[] val) {
632        if (val == null) {
633            return false;
634        }
635        for (Object v : val) {
636            if (v != null) {
637                if (v instanceof String) {
638                    if (!((String) v).isEmpty()) {
639                        return true;
640                    }
641                } else if (v instanceof String[]) {
642                    if (((String[]) v).length > 0) {
643                        return true;
644                    }
645                } else {
646                    return true;
647                }
648            }
649        }
650        return false;
651    }
652
653    @SuppressWarnings("deprecation")
654    public String migrate(final int batchSize) {
655
656        final String MIGRATION_WORK_ID = "AuditMigration";
657
658        WorkManager wm = Framework.getService(WorkManager.class);
659        State migrationState = wm.getWorkState(MIGRATION_WORK_ID);
660        if (migrationState != null) {
661            return "Migration already scheduled : " + migrationState.toString();
662        }
663
664        Work migrationWork = new ESAuditMigrationWork(MIGRATION_WORK_ID, batchSize);
665        wm.schedule(migrationWork);
666        return "Migration work started : " + MIGRATION_WORK_ID;
667    }
668
669    SearchResponse runRequest(SearchRequest request) {
670        logSearchRequest(request);
671        SearchResponse response = esClient.search(request);
672        logSearchResponse(response);
673        return response;
674    }
675
676    SearchResponse runNextScroll(String scrollId, TimeValue keepAlive) {
677        if (log.isDebugEnabled()) {
678            log.debug(String.format(
679                    "Scroll request: -XGET 'localhost:9200/_search/scroll' -d '{\"scroll\": \"%s\", \"scroll_id\": \"%s\" }'",
680                    keepAlive, scrollId));
681        }
682        SearchResponse response = esClient.searchScroll(new SearchScrollRequest(scrollId).scroll(keepAlive));
683        logSearchResponse(response);
684        return response;
685    }
686
687    protected void logSearchResponse(SearchResponse response) {
688        if (log.isDebugEnabled()) {
689            log.debug("Response: " + response.toString());
690        }
691    }
692
693    protected void logSearchRequest(SearchRequest request) {
694        if (log.isDebugEnabled()) {
695            log.debug(String.format("Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'",
696                    getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, request.toString()));
697        }
698    }
699
700    /**
701     * Ensures the audit sequence returns an UID greater or equal than the maximum log entry id.
702     */
703    protected void ensureUIDSequencer(ESClient esClient) {
704        boolean auditIndexExists = esClient.indexExists(getESIndexName());
705        if (!auditIndexExists) {
706            return;
707        }
708
709        // Get max log entry id
710        SearchRequest request = createSearchRequest();
711        request.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())
712                                                .aggregation(AggregationBuilders.max("maxAgg").field("id")));
713        SearchResponse searchResponse = esClient.search(request);
714        Max agg = searchResponse.getAggregations().get("maxAgg");
715        long maxLogEntryId = (long) agg.getValue();
716
717        // Get next sequence id
718        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
719        UIDSequencer seq = uidGeneratorService.getSequencer();
720        seq.init();
721        long nextSequenceId = seq.getNextLong(SEQ_NAME);
722
723        // Increment sequence to max log entry id if needed
724        if (nextSequenceId < maxLogEntryId) {
725            log.info(String.format("Next UID returned by %s sequence is %d, initializing sequence to %d", SEQ_NAME,
726                    nextSequenceId, maxLogEntryId));
727            seq.initSequence(SEQ_NAME, maxLogEntryId);
728        }
729    }
730
731    @Override
732    public ExtendedInfo newExtendedInfo(Serializable value) {
733        return new ESExtendedInfo(value);
734    }
735
736    protected String getESIndexName() {
737        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
738        return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE);
739    }
740
741    @Override
742    public void append(List<String> jsonEntries) {
743        BulkRequest bulkRequest = new BulkRequest();
744        for (String json : jsonEntries) {
745            try {
746                String entryId = new JSONObject(json).getString(LOG_ID);
747                if (StringUtils.isBlank(entryId)) {
748                    throw new NuxeoException("A json entry has an empty id. entry=" + json);
749                }
750                IndexRequest request = new IndexRequest(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, entryId);
751                request.source(json, XContentType.JSON);
752                bulkRequest.add(request);
753            } catch (JSONException e) {
754                throw new NuxeoException("Unable to deserialize json entry=" + json, e);
755            }
756        }
757        esClient.bulk(bulkRequest);
758    }
759
760    @Override
761    public ScrollResult<String> scroll(AuditQueryBuilder builder, int batchSize, int keepAliveSeconds) {
762        // prepare parameters
763        Predicate predicate = builder.predicate();
764        OrderByList orders = builder.orders();
765
766        // create source
767        SearchSourceBuilder source = createSearchRequestSource(predicate, orders);
768        source.size(batchSize);
769        // create request
770        SearchRequest request = createSearchRequest();
771        request.source(source).scroll(TimeValue.timeValueSeconds(keepAliveSeconds));
772        SearchResponse response = runRequest(request);
773        // register cursor
774        String scrollId = cursorService.registerCursorResult(new ESCursorResult(response, batchSize, keepAliveSeconds));
775        return scroll(scrollId);
776    }
777
778    @Override
779    public ScrollResult<String> scroll(String scrollId) {
780        return cursorService.scroll(scrollId);
781    }
782
783    public class ESCursorResult extends CursorResult<Iterator<SearchHit>, SearchHit> {
784
785        protected final String scrollId;
786
787        protected boolean end;
788
789        public ESCursorResult(SearchResponse response, int batchSize, int keepAliveSeconds) {
790            super(response.getHits().iterator(), batchSize, keepAliveSeconds);
791            this.scrollId = response.getScrollId();
792        }
793
794        @Override
795        public boolean hasNext() {
796            if (cursor == null || end) {
797                return false;
798            } else if (cursor.hasNext()) {
799                return true;
800            } else {
801                runNextScroll();
802                return !end;
803            }
804        }
805
806        @Override
807        public SearchHit next() {
808            if (cursor != null && !cursor.hasNext() && !end) {
809                // try to run a next scroll
810                runNextScroll();
811            }
812            return super.next();
813        }
814
815        protected void runNextScroll() {
816            SearchResponse response = ESAuditBackend.this.runNextScroll(scrollId,
817                    TimeValue.timeValueSeconds(keepAliveSeconds));
818            cursor = response.getHits().iterator();
819            end = !cursor.hasNext();
820        }
821
822        @Override
823        public void close() {
824            ClearScrollRequest request = new ClearScrollRequest();
825            request.addScrollId(scrollId);
826            esClient.clearScroll(request);
827            end = true;
828            // Call super close to clear cursor
829            super.close();
830        }
831
832    }
833
834}