001/*
002 * (C) Copyright 2014-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     Benoit Delbosc
019 */
020package org.nuxeo.elasticsearch.audit;
021
022import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
023import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_ID;
024
025import java.io.IOException;
026import java.io.OutputStream;
027import java.io.Serializable;
028import java.util.ArrayList;
029import java.util.Calendar;
030import java.util.Collections;
031import java.util.Date;
032import java.util.HashMap;
033import java.util.Iterator;
034import java.util.List;
035import java.util.Map;
036import java.util.function.Function;
037
038import org.apache.commons.lang3.StringUtils;
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041import org.elasticsearch.action.bulk.BulkItemResponse;
042import org.elasticsearch.action.bulk.BulkRequest;
043import org.elasticsearch.action.bulk.BulkResponse;
044import org.elasticsearch.action.get.GetRequest;
045import org.elasticsearch.action.get.GetResponse;
046import org.elasticsearch.action.index.IndexRequest;
047import org.elasticsearch.action.search.ClearScrollRequest;
048import org.elasticsearch.action.search.SearchRequest;
049import org.elasticsearch.action.search.SearchResponse;
050import org.elasticsearch.action.search.SearchScrollRequest;
051import org.elasticsearch.action.search.SearchType;
052import org.elasticsearch.common.ParsingException;
053import org.elasticsearch.common.io.stream.BytesStreamOutput;
054import org.elasticsearch.common.settings.Settings;
055import org.elasticsearch.common.unit.TimeValue;
056import org.elasticsearch.common.xcontent.NamedXContentRegistry;
057import org.elasticsearch.common.xcontent.XContentBuilder;
058import org.elasticsearch.common.xcontent.XContentFactory;
059import org.elasticsearch.common.xcontent.XContentParser;
060import org.elasticsearch.common.xcontent.XContentType;
061import org.elasticsearch.index.query.BoolQueryBuilder;
062import org.elasticsearch.index.query.QueryBuilder;
063import org.elasticsearch.index.query.QueryBuilders;
064import org.elasticsearch.index.query.QueryParseContext;
065import org.elasticsearch.search.SearchHit;
066import org.elasticsearch.search.SearchModule;
067import org.elasticsearch.search.aggregations.AggregationBuilders;
068import org.elasticsearch.search.aggregations.metrics.max.Max;
069import org.elasticsearch.search.builder.SearchSourceBuilder;
070import org.elasticsearch.search.sort.SortOrder;
071import org.json.JSONException;
072import org.json.JSONObject;
073import org.nuxeo.common.utils.TextTemplate;
074import org.nuxeo.ecm.core.api.CursorResult;
075import org.nuxeo.ecm.core.api.CursorService;
076import org.nuxeo.ecm.core.api.DocumentModel;
077import org.nuxeo.ecm.core.api.NuxeoException;
078import org.nuxeo.ecm.core.api.ScrollResult;
079import org.nuxeo.ecm.core.query.sql.model.Literals;
080import org.nuxeo.ecm.core.query.sql.model.MultiExpression;
081import org.nuxeo.ecm.core.query.sql.model.Operand;
082import org.nuxeo.ecm.core.query.sql.model.Operator;
083import org.nuxeo.ecm.core.query.sql.model.OrderByList;
084import org.nuxeo.ecm.core.query.sql.model.Predicate;
085import org.nuxeo.ecm.core.query.sql.model.Reference;
086import org.nuxeo.ecm.core.uidgen.UIDGeneratorService;
087import org.nuxeo.ecm.core.uidgen.UIDSequencer;
088import org.nuxeo.ecm.core.work.api.Work;
089import org.nuxeo.ecm.core.work.api.Work.State;
090import org.nuxeo.ecm.core.work.api.WorkManager;
091import org.nuxeo.ecm.platform.audit.api.AuditQueryBuilder;
092import org.nuxeo.ecm.platform.audit.api.AuditReader;
093import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
094import org.nuxeo.ecm.platform.audit.api.LogEntry;
095import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl;
096import org.nuxeo.ecm.platform.audit.service.AbstractAuditBackend;
097import org.nuxeo.ecm.platform.audit.service.AuditBackend;
098import org.nuxeo.ecm.platform.audit.service.BaseLogEntryProvider;
099import org.nuxeo.ecm.platform.audit.service.NXAuditEventsService;
100import org.nuxeo.ecm.platform.audit.service.extension.AuditBackendDescriptor;
101import org.nuxeo.ecm.platform.query.api.PredicateDefinition;
102import org.nuxeo.ecm.platform.query.api.PredicateFieldDefinition;
103import org.nuxeo.elasticsearch.ElasticSearchConstants;
104import org.nuxeo.elasticsearch.api.ESClient;
105import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
106import org.nuxeo.elasticsearch.query.NxqlQueryConverter;
107import org.nuxeo.runtime.api.Framework;
108import org.nuxeo.runtime.model.DefaultComponent;
109
110import com.fasterxml.jackson.core.JsonFactory;
111import com.fasterxml.jackson.core.JsonGenerator;
112import com.fasterxml.jackson.databind.ObjectMapper;
113
114/**
115 * Implementation of the {@link AuditBackend} interface using Elasticsearch persistence
116 *
117 * @author tiry
118 */
119public class ESAuditBackend extends AbstractAuditBackend implements AuditBackend {
120
121    public static final String SEQ_NAME = "audit";
122
123    public static final String MIGRATION_FLAG_PROP = "audit.elasticsearch.migration";
124
125    public static final String MIGRATION_BATCH_SIZE_PROP = "audit.elasticsearch.migration.batchSize";
126
127    public static final String MIGRATION_DONE_EVENT = "sqlToElasticsearchMigrationDone";
128
129    public static final int MIGRATION_DEFAULT_BACTH_SIZE = 1000;
130
131    protected CursorService<Iterator<SearchHit>, SearchHit, String> cursorService;
132
133    public ESAuditBackend(NXAuditEventsService component, AuditBackendDescriptor config) {
134        super(component, config);
135    }
136
137    /**
138     * @since 9.3
139     */
140    public ESAuditBackend() {
141        super();
142    }
143
144    protected ESClient esClient;
145
146    protected static final Log log = LogFactory.getLog(ESAuditBackend.class);
147
148    protected BaseLogEntryProvider provider = new BaseLogEntryProvider() {
149
150        @Override
151        public int removeEntries(String eventId, String pathPattern) {
152            throw new UnsupportedOperationException("Not implemented yet!");
153        }
154
155        @Override
156        public void addLogEntry(LogEntry logEntry) {
157            List<LogEntry> entries = new ArrayList<>();
158            entries.add(logEntry);
159            addLogEntries(entries);
160        }
161
162    };
163
164    protected ESClient getClient() {
165        log.info("Activate Elasticsearch backend for Audit");
166        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
167        ESClient client = esa.getClient();
168        ensureUIDSequencer(client);
169        return client;
170    }
171
172    protected boolean isMigrationDone() {
173        AuditReader reader = Framework.getService(AuditReader.class);
174        List<LogEntry> entries = reader.queryLogs(new String[] { MIGRATION_DONE_EVENT }, null);
175        return !entries.isEmpty();
176    }
177
178    @Override
179    public int getApplicationStartedOrder() {
180        int elasticOrder = ((DefaultComponent) Framework.getRuntime().getComponent(
181                "org.nuxeo.elasticsearch.ElasticSearchComponent")).getApplicationStartedOrder();
182        int uidgenOrder = ((DefaultComponent) Framework.getRuntime().getComponent(
183                "org.nuxeo.ecm.core.uidgen.UIDGeneratorService")).getApplicationStartedOrder();
184        return Integer.max(elasticOrder, uidgenOrder) + 1;
185    }
186
187    @Override
188    public void onApplicationStarted() {
189        esClient = getClient();
190        if (Boolean.parseBoolean(Framework.getProperty(MIGRATION_FLAG_PROP))) {
191            if (!isMigrationDone()) {
192                log.info(String.format(
193                        "Property %s is true and migration is not done yet, processing audit migration from SQL to Elasticsearch index",
194                        MIGRATION_FLAG_PROP));
195                // Drop audit index first in case of a previous bad migration
196                ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
197                esa.dropAndInitIndex(getESIndexName());
198                int batchSize = MIGRATION_DEFAULT_BACTH_SIZE;
199                String batchSizeProp = Framework.getProperty(MIGRATION_BATCH_SIZE_PROP);
200                if (batchSizeProp != null) {
201                    batchSize = Integer.parseInt(batchSizeProp);
202                }
203                migrate(batchSize);
204            } else {
205                log.warn(String.format(
206                        "Property %s is true but migration is already done, please set this property to false",
207                        MIGRATION_FLAG_PROP));
208            }
209        } else {
210            log.debug(String.format("Property %s is false, not processing any migration", MIGRATION_FLAG_PROP));
211        }
212        cursorService = new CursorService<>(SearchHit::getSourceAsString);
213    }
214
215    @Override
216    public void onApplicationStopped() {
217        if (esClient == null) {
218            return;
219        }
220        try {
221            esClient.close();
222            cursorService.clear();
223        } catch (Exception e) {
224            log.warn("Fail to close esClient: " + e.getMessage(), e);
225        } finally {
226            esClient = null;
227            cursorService = null;
228        }
229    }
230
231    @Override
232    @SuppressWarnings("unchecked")
233    public List<LogEntry> queryLogs(AuditQueryBuilder builder) {
234        // prepare parameters
235        Predicate predicate = builder.predicate();
236        OrderByList orders = builder.orders();
237        long offset = builder.offset();
238        long limit = builder.limit();
239
240        SearchSourceBuilder source = createSearchRequestSource(predicate, orders);
241
242        // Perform search
243        List<LogEntry> logEntries;
244        SearchRequest request = createSearchRequest();
245        request.source(source);
246        if (limit == 0) {
247            // return all result -> use the scroll api
248            // offset is not taking into account when querying all results
249            TimeValue keepAlive = TimeValue.timeValueMinutes(1);
250            request.scroll(keepAlive);
251            // the size here is the size of each scrolls
252            source.size(100);
253
254            // run request
255            SearchResponse searchResponse = runRequest(request);
256
257            // Build log entries
258            logEntries = buildLogEntries(searchResponse);
259            // Scroll on next results
260            for (; //
261            searchResponse.getHits().getHits().length > 0
262                    && logEntries.size() < searchResponse.getHits().getTotalHits(); //
263                    searchResponse = runNextScroll(searchResponse.getScrollId(), keepAlive)) {
264                // Build log entries
265                logEntries.addAll(buildLogEntries(searchResponse));
266            }
267        } else {
268            // return a page -> use a regular search
269            source.from((int) offset).size((int) limit);
270
271            // run request
272            SearchResponse searchResponse = runRequest(request);
273
274            // Build log entries
275            logEntries = buildLogEntries(searchResponse);
276        }
277
278        return logEntries;
279    }
280
281    protected SearchSourceBuilder createSearchRequestSource(Predicate predicate, OrderByList orders) {
282        // create ES query builder
283        QueryBuilder query = createQueryBuilder(predicate);
284
285        // create ES source
286        SearchSourceBuilder source = new SearchSourceBuilder().query(QueryBuilders.constantScoreQuery(query)).size(100);
287
288        // create sort
289        orders.forEach(order -> source.sort(order.reference.name, order.isDescending ? SortOrder.DESC : SortOrder.ASC));
290        return source;
291    }
292
293    protected QueryBuilder createQueryBuilder(Predicate andPredicate) {
294        // cast parameters
295        // current implementation only support a MultiExpression with AND operator
296        @SuppressWarnings("unchecked")
297        List<Predicate> predicates = (List<Predicate>) ((List<?>) ((MultiExpression) andPredicate).values);
298        // current implementation only use Predicate/OrderByExpr with a simple Reference for left and right
299        Function<Operand, String> getFieldName = operand -> ((Reference) operand).name;
300
301        QueryBuilder query;
302        if (predicates.isEmpty()) {
303            query = QueryBuilders.matchAllQuery();
304        } else {
305            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
306            for (Predicate predicate : predicates) {
307                String leftName = getFieldName.apply(predicate.lvalue);
308                Operator operator = predicate.operator;
309                Object rightValue = Literals.valueOf(predicate.rvalue);
310                if (Operator.EQ.equals(operator)) {
311                    boolQuery.must(QueryBuilders.termQuery(leftName, rightValue));
312                } else if (Operator.NOTEQ.equals(operator)) {
313                    boolQuery.mustNot(QueryBuilders.termQuery(leftName, rightValue));
314                } else if (Operator.LT.equals(operator)) {
315                    boolQuery.must(QueryBuilders.rangeQuery(leftName).lt(rightValue));
316                } else if (Operator.LTEQ.equals(operator)) {
317                    boolQuery.must(QueryBuilders.rangeQuery(leftName).lte(rightValue));
318                } else if (Operator.GTEQ.equals(operator)) {
319                    boolQuery.must(QueryBuilders.rangeQuery(leftName).gte(rightValue));
320                } else if (Operator.GT.equals(operator)) {
321                    boolQuery.must(QueryBuilders.rangeQuery(leftName).gt(rightValue));
322                } else if (Operator.IN.equals(operator)) {
323                    boolQuery.must(QueryBuilders.termsQuery(leftName, (List<?>) rightValue));
324                } else if (Operator.STARTSWITH.equals(operator)) {
325                    boolQuery.must(NxqlQueryConverter.makeStartsWithQuery(leftName, rightValue));
326                }
327            }
328            query = boolQuery;
329        }
330        return query;
331    }
332
333    protected List<LogEntry> buildLogEntries(SearchResponse searchResponse) {
334        List<LogEntry> entries = new ArrayList<>(searchResponse.getHits().getHits().length);
335        ObjectMapper mapper = new ObjectMapper();
336        for (SearchHit hit : searchResponse.getHits()) {
337            try {
338                entries.add(mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class));
339            } catch (IOException e) {
340                log.error("Error while reading Audit Entry from ES", e);
341            }
342        }
343        return entries;
344    }
345
346    protected SearchRequest createSearchRequest() {
347        return new SearchRequest(getESIndexName()).types(ElasticSearchConstants.ENTRY_TYPE)
348                                                  .searchType(SearchType.DFS_QUERY_THEN_FETCH);
349    }
350
351    @Override
352    public LogEntry getLogEntryByID(long id) {
353        GetResponse ret = esClient.get(
354                new GetRequest(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, String.valueOf(id)));
355        if (!ret.isExists()) {
356            return null;
357        }
358        try {
359            return new ObjectMapper().readValue(ret.getSourceAsString(), LogEntryImpl.class);
360        } catch (IOException e) {
361            throw new NuxeoException("Unable to read Entry for id " + id, e);
362        }
363    }
364
365    public SearchRequest buildQuery(String query, Map<String, Object> params) {
366        if (params != null && params.size() > 0) {
367            query = expandQueryVariables(query, params);
368        }
369        SearchRequest request = createSearchRequest();
370        SearchSourceBuilder sourceBuilder = createSearchSourceBuilder(query);
371        return request.source(sourceBuilder);
372    }
373
374    protected SearchSourceBuilder createSearchSourceBuilder(String query) {
375        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
376        SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
377        try {
378            try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(
379                    new NamedXContentRegistry(searchModule.getNamedXContents()), query)) {
380                searchSourceBuilder.parseXContent(new QueryParseContext(parser));
381            }
382        } catch (IOException | ParsingException e) {
383            log.error("Invalid query: " + query + ": " + e.getMessage(), e);
384            throw new IllegalArgumentException("Bad query: " + query);
385        }
386        return searchSourceBuilder;
387    }
388
389    public String expandQueryVariables(String query, Object[] params) {
390        Map<String, Object> qParams = new HashMap<>();
391        for (int i = 0; i < params.length; i++) {
392            query = query.replaceFirst("\\?", "\\${param" + i + "}");
393            qParams.put("param" + i, params[i]);
394        }
395        return expandQueryVariables(query, qParams);
396    }
397
398    public String expandQueryVariables(String query, Map<String, Object> params) {
399        if (params != null && params.size() > 0) {
400            TextTemplate tmpl = new TextTemplate();
401            for (String key : params.keySet()) {
402                Object val = params.get(key);
403                if (val == null) {
404                    continue;
405                } else if (val instanceof Calendar) {
406                    tmpl.setVariable(key, Long.toString(((Calendar) val).getTime().getTime()));
407                } else if (val instanceof Date) {
408                    tmpl.setVariable(key, Long.toString(((Date) val).getTime()));
409                } else {
410                    tmpl.setVariable(key, val.toString());
411                }
412            }
413            query = tmpl.processText(query);
414        }
415        return query;
416    }
417
418    @Override
419    public List<?> nativeQuery(String query, Map<String, Object> params, int pageNb, int pageSize) {
420        SearchRequest request = buildQuery(query, params);
421        if (pageNb > 0) {
422            request.source().from(pageNb * pageSize);
423        }
424        if (pageSize > 0) {
425            request.source().size(pageSize);
426        }
427        SearchResponse searchResponse = runRequest(request);
428        return buildLogEntries(searchResponse);
429    }
430
431    @Override
432    public List<LogEntry> queryLogsByPage(String[] eventIds, Date limit, String[] categories, String path, int pageNb,
433            int pageSize) {
434        SearchRequest request = createSearchRequest();
435        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
436        if (eventIds != null && eventIds.length > 0) {
437            if (eventIds.length == 1) {
438                filterBuilder.must(QueryBuilders.termQuery("eventId", eventIds[0]));
439            } else {
440                filterBuilder.must(QueryBuilders.termsQuery("eventId", eventIds));
441            }
442        }
443        if (categories != null && categories.length > 0) {
444            if (categories.length == 1) {
445                filterBuilder.must(QueryBuilders.termQuery("category", categories[0]));
446            } else {
447                filterBuilder.must(QueryBuilders.termsQuery("category", categories));
448            }
449        }
450        if (path != null) {
451            filterBuilder.must(QueryBuilders.termQuery("docPath", path));
452        }
453
454        if (limit != null) {
455            filterBuilder.must(QueryBuilders.rangeQuery("eventDate").lt(convertDate(limit)));
456        }
457        request.source(new SearchSourceBuilder().query(QueryBuilders.constantScoreQuery(filterBuilder)));
458        if (pageNb > 0) {
459            request.source().from(pageNb * pageSize);
460        }
461        if (pageSize > 0) {
462            request.source().size(pageSize);
463        }
464        SearchResponse searchResponse = runRequest(request);
465        return buildLogEntries(searchResponse);
466    }
467
468    @Override
469    public void addLogEntries(List<LogEntry> entries) {
470
471        if (entries.isEmpty()) {
472            return;
473        }
474
475        BulkRequest bulkRequest = new BulkRequest();
476        JsonFactory factory = new JsonFactory();
477
478        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
479        UIDSequencer seq = uidGeneratorService.getSequencer();
480
481        try {
482
483            for (LogEntry entry : entries) {
484                entry.setId(seq.getNextLong(SEQ_NAME));
485                if (log.isDebugEnabled()) {
486                    log.debug(String.format("Indexing log entry: %s", entry));
487                }
488                entry.setLogDate(new Date());
489                try (OutputStream out = new BytesStreamOutput(); //
490                        JsonGenerator jg = factory.createGenerator(out); //
491                        XContentBuilder builder = jsonBuilder(out)) {
492                    ObjectMapper mapper = new ObjectMapper();
493                    mapper.writeValue(jg, entry);
494                    bulkRequest.add(new IndexRequest(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE,
495                            String.valueOf(entry.getId())).source(builder));
496                }
497            }
498
499            BulkResponse bulkResponse = esClient.bulk(bulkRequest);
500            if (bulkResponse.hasFailures()) {
501                for (BulkItemResponse response : bulkResponse.getItems()) {
502                    if (response.isFailed()) {
503                        log.error("Unable to index audit entry " + response.getItemId() + " :"
504                                + response.getFailureMessage());
505                    }
506                }
507            }
508        } catch (IOException e) {
509            throw new NuxeoException("Error while indexing Audit entries", e);
510        }
511
512    }
513
514    @Override
515    public Long getEventsCount(String eventId) {
516        SearchResponse res = esClient.search(
517                new SearchRequest(getESIndexName()).types(ElasticSearchConstants.ENTRY_TYPE).source(
518                        new SearchSourceBuilder().query(
519                                QueryBuilders.constantScoreQuery(QueryBuilders.termQuery("eventId", eventId)))
520                                                 .size(0)));
521        return res.getHits().getTotalHits();
522    }
523
524    @Override
525    public long syncLogCreationEntries(final String repoId, final String path, final Boolean recurs) {
526        return syncLogCreationEntries(provider, repoId, path, recurs);
527    }
528
529    public SearchResponse search(SearchRequest request) {
530        String[] indices = request.indices();
531        if (indices == null || indices.length != 1) {
532            throw new IllegalStateException("Search on audit must include index name: " + request);
533        }
534        if (!getESIndexName().equals(indices[0])) {
535            throw new IllegalStateException("Search on audit must be on audit index: " + request);
536        }
537        return runRequest(request);
538    }
539
540    protected QueryBuilder buildFilter(PredicateDefinition[] predicates, DocumentModel searchDocumentModel) {
541
542        if (searchDocumentModel == null) {
543            return QueryBuilders.matchAllQuery();
544        }
545
546        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
547
548        int nbFilters = 0;
549
550        for (PredicateDefinition predicate : predicates) {
551
552            // extract data from DocumentModel
553            PredicateFieldDefinition[] fieldDef = predicate.getValues();
554            Object[] val = new Object[fieldDef.length];
555            for (int fidx = 0; fidx < fieldDef.length; fidx++) {
556                if (fieldDef[fidx].getXpath() != null) {
557                    val[fidx] = searchDocumentModel.getPropertyValue(fieldDef[fidx].getXpath());
558                } else {
559                    val[fidx] = searchDocumentModel.getProperty(fieldDef[fidx].getSchema(), fieldDef[fidx].getName());
560                }
561            }
562
563            if (!isNonNullParam(val)) {
564                // skip predicate where all values are null
565                continue;
566            }
567
568            nbFilters++;
569
570            String op = predicate.getOperator();
571            if (op.equalsIgnoreCase("IN")) {
572
573                String[] values = null;
574                if (val[0] instanceof Iterable<?>) {
575                    List<String> l = new ArrayList<>();
576                    Iterable<?> vals = (Iterable<?>) val[0];
577
578                    for (Object v : vals) {
579                        if (v != null) {
580                            l.add(v.toString());
581                        }
582                    }
583                    values = l.toArray(new String[l.size()]);
584                } else if (val[0] instanceof Object[]) {
585                    values = (String[]) val[0];
586                }
587                filterBuilder.must(QueryBuilders.termsQuery(predicate.getParameter(), values));
588            } else if (op.equalsIgnoreCase("BETWEEN")) {
589                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gt(convertDate(val[0])));
590                if (val.length > 1) {
591                    filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lt(convertDate(val[1])));
592                }
593            } else if (">".equals(op)) {
594                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gt(convertDate(val[0])));
595            } else if (">=".equals(op)) {
596                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gte(convertDate(val[0])));
597            } else if ("<".equals(op)) {
598                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lt(convertDate(val[0])));
599            } else if ("<=".equals(op)) {
600                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lte(convertDate(val[0])));
601            } else {
602                filterBuilder.must(QueryBuilders.termQuery(predicate.getParameter(), convertDate(val[0])));
603            }
604        }
605
606        if (nbFilters == 0) {
607            return QueryBuilders.matchAllQuery();
608        }
609        return filterBuilder;
610    }
611
612    protected Object convertDate(Object o) {
613        // Date are convert to timestamp ms which is a known format by default for ES
614        if (o instanceof Calendar) {
615            return Long.valueOf(((Calendar) o).getTime().getTime());
616        } else if (o instanceof Date) {
617            return Long.valueOf(((Date) o).getTime());
618        }
619        return o;
620    }
621
622    public SearchRequest buildSearchQuery(String fixedPart, PredicateDefinition[] predicates,
623            DocumentModel searchDocumentModel) {
624        SearchRequest request = createSearchRequest();
625        QueryBuilder queryBuilder = QueryBuilders.wrapperQuery(fixedPart);
626        QueryBuilder filterBuilder = buildFilter(predicates, searchDocumentModel);
627        request.source(
628                new SearchSourceBuilder().query(QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder)));
629        return request;
630    }
631
632    protected boolean isNonNullParam(Object[] val) {
633        if (val == null) {
634            return false;
635        }
636        for (Object v : val) {
637            if (v != null) {
638                if (v instanceof String) {
639                    if (!((String) v).isEmpty()) {
640                        return true;
641                    }
642                } else if (v instanceof String[]) {
643                    if (((String[]) v).length > 0) {
644                        return true;
645                    }
646                } else {
647                    return true;
648                }
649            }
650        }
651        return false;
652    }
653
654    @SuppressWarnings("deprecation")
655    public String migrate(final int batchSize) {
656
657        final String MIGRATION_WORK_ID = "AuditMigration";
658
659        WorkManager wm = Framework.getService(WorkManager.class);
660        State migrationState = wm.getWorkState(MIGRATION_WORK_ID);
661        if (migrationState != null) {
662            return "Migration already scheduled : " + migrationState.toString();
663        }
664
665        Work migrationWork = new ESAuditMigrationWork(MIGRATION_WORK_ID, batchSize);
666        wm.schedule(migrationWork);
667        return "Migration work started : " + MIGRATION_WORK_ID;
668    }
669
670    SearchResponse runRequest(SearchRequest request) {
671        logSearchRequest(request);
672        SearchResponse response = esClient.search(request);
673        logSearchResponse(response);
674        return response;
675    }
676
677    SearchResponse runNextScroll(String scrollId, TimeValue keepAlive) {
678        if (log.isDebugEnabled()) {
679            log.debug(String.format(
680                    "Scroll request: -XGET 'localhost:9200/_search/scroll' -d '{\"scroll\": \"%s\", \"scroll_id\": \"%s\" }'",
681                    keepAlive, scrollId));
682        }
683        SearchResponse response = esClient.searchScroll(new SearchScrollRequest(scrollId).scroll(keepAlive));
684        logSearchResponse(response);
685        return response;
686    }
687
688    protected void logSearchResponse(SearchResponse response) {
689        if (log.isDebugEnabled()) {
690            log.debug("Response: " + response.toString());
691        }
692    }
693
694    protected void logSearchRequest(SearchRequest request) {
695        if (log.isDebugEnabled()) {
696            log.debug(String.format("Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'",
697                    getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, request.toString()));
698        }
699    }
700
701    /**
702     * Ensures the audit sequence returns an UID greater or equal than the maximum log entry id.
703     */
704    protected void ensureUIDSequencer(ESClient esClient) {
705        boolean auditIndexExists = esClient.indexExists(getESIndexName());
706        if (!auditIndexExists) {
707            return;
708        }
709
710        // Get max log entry id
711        SearchRequest request = createSearchRequest();
712        request.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())
713                                                .aggregation(AggregationBuilders.max("maxAgg").field("id")));
714        SearchResponse searchResponse = esClient.search(request);
715        Max agg = searchResponse.getAggregations().get("maxAgg");
716        long maxLogEntryId = (long) agg.getValue();
717
718        // Get next sequence id
719        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
720        UIDSequencer seq = uidGeneratorService.getSequencer();
721        seq.init();
722        long nextSequenceId = seq.getNextLong(SEQ_NAME);
723
724        // Increment sequence to max log entry id if needed
725        if (nextSequenceId < maxLogEntryId) {
726            log.info(String.format("Next UID returned by %s sequence is %d, initializing sequence to %d", SEQ_NAME,
727                    nextSequenceId, maxLogEntryId));
728            seq.initSequence(SEQ_NAME, maxLogEntryId);
729        }
730    }
731
732    @Override
733    public ExtendedInfo newExtendedInfo(Serializable value) {
734        return new ESExtendedInfo(value);
735    }
736
737    protected String getESIndexName() {
738        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
739        return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE);
740    }
741
742    @Override
743    public void append(List<String> jsonEntries) {
744        BulkRequest bulkRequest = new BulkRequest();
745        for (String json : jsonEntries) {
746            try {
747                String entryId = new JSONObject(json).getString(LOG_ID);
748                if (StringUtils.isBlank(entryId)) {
749                    throw new NuxeoException("A json entry has an empty id. entry=" + json);
750                }
751                IndexRequest request = new IndexRequest(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, entryId);
752                request.source(json, XContentType.JSON);
753                bulkRequest.add(request);
754            } catch (JSONException e) {
755                throw new NuxeoException("Unable to deserialize json entry=" + json, e);
756            }
757        }
758        esClient.bulk(bulkRequest);
759    }
760
761    @Override
762    public ScrollResult<String> scroll(AuditQueryBuilder builder, int batchSize, int keepAliveSeconds) {
763        // prepare parameters
764        Predicate predicate = builder.predicate();
765        OrderByList orders = builder.orders();
766
767        // create source
768        SearchSourceBuilder source = createSearchRequestSource(predicate, orders);
769        source.size(batchSize);
770        // create request
771        SearchRequest request = createSearchRequest();
772        request.source(source).scroll(TimeValue.timeValueSeconds(keepAliveSeconds));
773        SearchResponse response = runRequest(request);
774        // register cursor
775        String scrollId = cursorService.registerCursorResult(new ESCursorResult(response, batchSize, keepAliveSeconds));
776        return scroll(scrollId);
777    }
778
779    @Override
780    public ScrollResult<String> scroll(String scrollId) {
781        return cursorService.scroll(scrollId);
782    }
783
784    public class ESCursorResult extends CursorResult<Iterator<SearchHit>, SearchHit> {
785
786        protected final String scrollId;
787
788        protected boolean end;
789
790        public ESCursorResult(SearchResponse response, int batchSize, int keepAliveSeconds) {
791            super(response.getHits().iterator(), batchSize, keepAliveSeconds);
792            this.scrollId = response.getScrollId();
793        }
794
795        @Override
796        public boolean hasNext() {
797            if (cursor == null || end) {
798                return false;
799            } else if (cursor.hasNext()) {
800                return true;
801            } else {
802                runNextScroll();
803                return !end;
804            }
805        }
806
807        @Override
808        public SearchHit next() {
809            if (cursor != null && !cursor.hasNext() && !end) {
810                // try to run a next scroll
811                runNextScroll();
812            }
813            return super.next();
814        }
815
816        protected void runNextScroll() {
817            SearchResponse response = ESAuditBackend.this.runNextScroll(scrollId,
818                    TimeValue.timeValueSeconds(keepAliveSeconds));
819            cursor = response.getHits().iterator();
820            end = !cursor.hasNext();
821        }
822
823        @Override
824        public void close() {
825            ClearScrollRequest request = new ClearScrollRequest();
826            request.addScrollId(scrollId);
827            esClient.clearScroll(request);
828            end = true;
829            // Call super close to clear cursor
830            super.close();
831        }
832
833    }
834
835}