001/*
002 * (C) Copyright 2014-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     Benoit Delbosc
019 */
020package org.nuxeo.elasticsearch.audit;
021
022import static org.elasticsearch.common.xcontent.DeprecationHandler.THROW_UNSUPPORTED_OPERATION;
023import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
024import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_ID;
025
026import java.io.IOException;
027import java.io.OutputStream;
028import java.io.Serializable;
029import java.time.ZonedDateTime;
030import java.util.ArrayList;
031import java.util.Calendar;
032import java.util.Collections;
033import java.util.Date;
034import java.util.HashMap;
035import java.util.Iterator;
036import java.util.List;
037import java.util.Map;
038import java.util.function.Function;
039
040import org.apache.commons.lang3.StringUtils;
041import org.apache.commons.logging.Log;
042import org.apache.commons.logging.LogFactory;
043import org.elasticsearch.action.bulk.BulkItemResponse;
044import org.elasticsearch.action.bulk.BulkRequest;
045import org.elasticsearch.action.bulk.BulkResponse;
046import org.elasticsearch.action.get.GetRequest;
047import org.elasticsearch.action.get.GetResponse;
048import org.elasticsearch.action.index.IndexRequest;
049import org.elasticsearch.action.search.ClearScrollRequest;
050import org.elasticsearch.action.search.SearchRequest;
051import org.elasticsearch.action.search.SearchResponse;
052import org.elasticsearch.action.search.SearchScrollRequest;
053import org.elasticsearch.action.search.SearchType;
054import org.elasticsearch.common.ParsingException;
055import org.elasticsearch.common.io.stream.BytesStreamOutput;
056import org.elasticsearch.common.settings.Settings;
057import org.elasticsearch.common.unit.TimeValue;
058import org.elasticsearch.common.xcontent.NamedXContentRegistry;
059import org.elasticsearch.common.xcontent.XContentBuilder;
060import org.elasticsearch.common.xcontent.XContentFactory;
061import org.elasticsearch.common.xcontent.XContentParser;
062import org.elasticsearch.common.xcontent.XContentType;
063import org.elasticsearch.index.query.BoolQueryBuilder;
064import org.elasticsearch.index.query.QueryBuilder;
065import org.elasticsearch.index.query.QueryBuilders;
066import org.elasticsearch.search.SearchHit;
067import org.elasticsearch.search.SearchModule;
068import org.elasticsearch.search.aggregations.AggregationBuilders;
069import org.elasticsearch.search.aggregations.metrics.max.Max;
070import org.elasticsearch.search.builder.SearchSourceBuilder;
071import org.elasticsearch.search.sort.SortOrder;
072import org.json.JSONException;
073import org.json.JSONObject;
074import org.nuxeo.common.utils.TextTemplate;
075import org.nuxeo.ecm.core.api.CursorResult;
076import org.nuxeo.ecm.core.api.CursorService;
077import org.nuxeo.ecm.core.api.DocumentModel;
078import org.nuxeo.ecm.core.api.NuxeoException;
079import org.nuxeo.ecm.core.api.ScrollResult;
080import org.nuxeo.ecm.core.query.sql.model.Literals;
081import org.nuxeo.ecm.core.query.sql.model.MultiExpression;
082import org.nuxeo.ecm.core.query.sql.model.Operand;
083import org.nuxeo.ecm.core.query.sql.model.Operator;
084import org.nuxeo.ecm.core.query.sql.model.OrderByList;
085import org.nuxeo.ecm.core.query.sql.model.Predicate;
086import org.nuxeo.ecm.core.query.sql.model.Reference;
087import org.nuxeo.ecm.core.uidgen.UIDGeneratorService;
088import org.nuxeo.ecm.core.uidgen.UIDSequencer;
089import org.nuxeo.ecm.core.work.api.Work;
090import org.nuxeo.ecm.core.work.api.Work.State;
091import org.nuxeo.ecm.core.work.api.WorkManager;
092import org.nuxeo.ecm.platform.audit.api.AuditReader;
093import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
094import org.nuxeo.ecm.platform.audit.api.LogEntry;
095import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl;
096import org.nuxeo.ecm.platform.audit.service.AbstractAuditBackend;
097import org.nuxeo.ecm.platform.audit.service.AuditBackend;
098import org.nuxeo.ecm.platform.audit.service.BaseLogEntryProvider;
099import org.nuxeo.ecm.platform.audit.service.NXAuditEventsService;
100import org.nuxeo.ecm.platform.audit.service.extension.AuditBackendDescriptor;
101import org.nuxeo.ecm.platform.query.api.PredicateDefinition;
102import org.nuxeo.ecm.platform.query.api.PredicateFieldDefinition;
103import org.nuxeo.elasticsearch.ElasticSearchConstants;
104import org.nuxeo.elasticsearch.api.ESClient;
105import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
106import org.nuxeo.elasticsearch.query.NxqlQueryConverter;
107import org.nuxeo.runtime.api.Framework;
108import org.nuxeo.runtime.model.DefaultComponent;
109
110import com.fasterxml.jackson.core.JsonFactory;
111import com.fasterxml.jackson.core.JsonGenerator;
112import com.fasterxml.jackson.databind.ObjectMapper;
113
114/**
115 * Implementation of the {@link AuditBackend} interface using Elasticsearch persistence
116 *
117 * @author tiry
118 */
119public class ESAuditBackend extends AbstractAuditBackend implements AuditBackend {
120
121    public static final String SEQ_NAME = "audit";
122
123    public static final String MIGRATION_FLAG_PROP = "audit.elasticsearch.migration";
124
125    public static final String MIGRATION_BATCH_SIZE_PROP = "audit.elasticsearch.migration.batchSize";
126
127    public static final String MIGRATION_DONE_EVENT = "sqlToElasticsearchMigrationDone";
128
129    public static final int MIGRATION_DEFAULT_BACTH_SIZE = 1000;
130
131    protected CursorService<Iterator<SearchHit>, SearchHit, String> cursorService;
132
133    public ESAuditBackend(NXAuditEventsService component, AuditBackendDescriptor config) {
134        super(component, config);
135    }
136
137    /**
138     * @since 9.3
139     */
140    public ESAuditBackend() {
141        super();
142    }
143
144    protected ESClient esClient;
145
146    protected static final Log log = LogFactory.getLog(ESAuditBackend.class);
147
148    protected BaseLogEntryProvider provider = new BaseLogEntryProvider() {
149
150        @Override
151        public int removeEntries(String eventId, String pathPattern) {
152            throw new UnsupportedOperationException("Not implemented yet!");
153        }
154
155        @Override
156        public void addLogEntry(LogEntry logEntry) {
157            List<LogEntry> entries = new ArrayList<>();
158            entries.add(logEntry);
159            addLogEntries(entries);
160        }
161
162    };
163
164    protected ESClient getClient() {
165        log.info("Activate Elasticsearch backend for Audit");
166        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
167        ESClient client = esa.getClient();
168        ensureUIDSequencer(client);
169        return client;
170    }
171
172    protected boolean isMigrationDone() {
173        AuditReader reader = Framework.getService(AuditReader.class);
174        List<LogEntry> entries = reader.queryLogs(new String[] { MIGRATION_DONE_EVENT }, null);
175        return !entries.isEmpty();
176    }
177
178    @Override
179    public int getApplicationStartedOrder() {
180        int elasticOrder = ((DefaultComponent) Framework.getRuntime().getComponent(
181                "org.nuxeo.elasticsearch.ElasticSearchComponent")).getApplicationStartedOrder();
182        int uidgenOrder = ((DefaultComponent) Framework.getRuntime().getComponent(
183                "org.nuxeo.ecm.core.uidgen.UIDGeneratorService")).getApplicationStartedOrder();
184        return Integer.max(elasticOrder, uidgenOrder) + 1;
185    }
186
187    @Override
188    public void onApplicationStarted() {
189        esClient = getClient();
190        if (Boolean.parseBoolean(Framework.getProperty(MIGRATION_FLAG_PROP))) {
191            if (!isMigrationDone()) {
192                log.info(String.format(
193                        "Property %s is true and migration is not done yet, processing audit migration from SQL to Elasticsearch index",
194                        MIGRATION_FLAG_PROP));
195                // Drop audit index first in case of a previous bad migration
196                ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
197                esa.dropAndInitIndex(getESIndexName());
198                int batchSize = MIGRATION_DEFAULT_BACTH_SIZE;
199                String batchSizeProp = Framework.getProperty(MIGRATION_BATCH_SIZE_PROP);
200                if (batchSizeProp != null) {
201                    batchSize = Integer.parseInt(batchSizeProp);
202                }
203                migrate(batchSize);
204            } else {
205                log.warn(String.format(
206                        "Property %s is true but migration is already done, please set this property to false",
207                        MIGRATION_FLAG_PROP));
208            }
209        } else {
210            log.debug(String.format("Property %s is false, not processing any migration", MIGRATION_FLAG_PROP));
211        }
212        cursorService = new CursorService<>(SearchHit::getSourceAsString);
213    }
214
215    @Override
216    public void onApplicationStopped() {
217        if (esClient == null) {
218            return;
219        }
220        try {
221            esClient.close();
222            cursorService.clear();
223        } catch (Exception e) {
224            log.warn("Fail to close esClient: " + e.getMessage(), e);
225        } finally {
226            esClient = null;
227            cursorService = null;
228        }
229    }
230
231    @Override
232    @SuppressWarnings("unchecked")
233    public List<LogEntry> queryLogs(org.nuxeo.ecm.core.query.sql.model.QueryBuilder builder) {
234        // prepare parameters
235        MultiExpression predicate = builder.predicate();
236        OrderByList orders = builder.orders();
237        long offset = builder.offset();
238        long limit = builder.limit();
239
240        SearchSourceBuilder source = createSearchRequestSource(predicate, orders);
241
242        // Perform search
243        List<LogEntry> logEntries;
244        SearchRequest request = createSearchRequest();
245        request.source(source);
246        if (limit == 0) {
247            // return all result -> use the scroll api
248            // offset is not taking into account when querying all results
249            TimeValue keepAlive = TimeValue.timeValueMinutes(1);
250            request.scroll(keepAlive);
251            // the size here is the size of each scrolls
252            source.size(100);
253
254            // run request
255            SearchResponse searchResponse = runRequest(request);
256
257            // Build log entries
258            logEntries = buildLogEntries(searchResponse);
259            // Scroll on next results
260            for (; //
261            searchResponse.getHits().getHits().length > 0
262                    && logEntries.size() < searchResponse.getHits().getTotalHits(); //
263                    searchResponse = runNextScroll(searchResponse.getScrollId(), keepAlive)) {
264                // Build log entries
265                logEntries.addAll(buildLogEntries(searchResponse));
266            }
267        } else {
268            // return a page -> use a regular search
269            source.from((int) offset).size((int) limit);
270
271            // run request
272            SearchResponse searchResponse = runRequest(request);
273
274            // Build log entries
275            logEntries = buildLogEntries(searchResponse);
276        }
277
278        return logEntries;
279    }
280
281    protected SearchSourceBuilder createSearchRequestSource(MultiExpression predicate, OrderByList orders) {
282        // create ES query builder
283        QueryBuilder query = createQueryBuilder(predicate);
284
285        // create ES source
286        SearchSourceBuilder source = new SearchSourceBuilder().query(QueryBuilders.constantScoreQuery(query)).size(100);
287
288        // create sort
289        orders.forEach(order -> source.sort(order.reference.name, order.isDescending ? SortOrder.DESC : SortOrder.ASC));
290        return source;
291    }
292
293    protected QueryBuilder createQueryBuilder(MultiExpression andPredicate) {
294        // cast parameters
295        // current implementation only support a MultiExpression with AND operator
296        List<Predicate> predicates = andPredicate.predicates;
297        // current implementation only use Predicate/OrderByExpr with a simple Reference for left and right
298        Function<Operand, String> getFieldName = operand -> ((Reference) operand).name;
299
300        QueryBuilder query;
301        if (predicates.isEmpty()) {
302            query = QueryBuilders.matchAllQuery();
303        } else {
304            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
305            for (Predicate predicate : predicates) {
306                String leftName = getFieldName.apply(predicate.lvalue);
307                Operator operator = predicate.operator;
308                Object rightValue = Literals.valueOf(predicate.rvalue);
309                if (rightValue instanceof ZonedDateTime) {
310                    // The ZonedDateTime representation is not compatible with Elasticsearch query
311                    rightValue = ((ZonedDateTime) rightValue).toOffsetDateTime();
312                }
313                if (Operator.EQ.equals(operator)) {
314                    boolQuery.must(QueryBuilders.termQuery(leftName, rightValue));
315                } else if (Operator.NOTEQ.equals(operator)) {
316                    boolQuery.mustNot(QueryBuilders.termQuery(leftName, rightValue));
317                } else if (Operator.LT.equals(operator)) {
318                    boolQuery.must(QueryBuilders.rangeQuery(leftName).lt(rightValue));
319                } else if (Operator.LTEQ.equals(operator)) {
320                    boolQuery.must(QueryBuilders.rangeQuery(leftName).lte(rightValue));
321                } else if (Operator.GTEQ.equals(operator)) {
322                    boolQuery.must(QueryBuilders.rangeQuery(leftName).gte(rightValue));
323                } else if (Operator.GT.equals(operator)) {
324                    boolQuery.must(QueryBuilders.rangeQuery(leftName).gt(rightValue));
325                } else if (Operator.IN.equals(operator)) {
326                    boolQuery.must(QueryBuilders.termsQuery(leftName, (List<?>) rightValue));
327                } else if (Operator.STARTSWITH.equals(operator)) {
328                    boolQuery.must(NxqlQueryConverter.makeStartsWithQuery(leftName, rightValue));
329                }
330            }
331            query = boolQuery;
332        }
333        return query;
334    }
335
336    protected List<LogEntry> buildLogEntries(SearchResponse searchResponse) {
337        List<LogEntry> entries = new ArrayList<>(searchResponse.getHits().getHits().length);
338        ObjectMapper mapper = new ObjectMapper();
339        for (SearchHit hit : searchResponse.getHits()) {
340            try {
341                entries.add(mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class));
342            } catch (IOException e) {
343                log.error("Error while reading Audit Entry from ES", e);
344            }
345        }
346        return entries;
347    }
348
349    protected SearchRequest createSearchRequest() {
350        return new SearchRequest(getESIndexName()).searchType(SearchType.DFS_QUERY_THEN_FETCH);
351    }
352
353    @Override
354    public LogEntry getLogEntryByID(long id) {
355        GetResponse ret = esClient.get(
356                new GetRequest(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, String.valueOf(id)));
357        if (!ret.isExists()) {
358            return null;
359        }
360        try {
361            return new ObjectMapper().readValue(ret.getSourceAsString(), LogEntryImpl.class);
362        } catch (IOException e) {
363            throw new NuxeoException("Unable to read Entry for id " + id, e);
364        }
365    }
366
367    public SearchRequest buildQuery(String query, Map<String, Object> params) {
368        if (params != null && params.size() > 0) {
369            query = expandQueryVariables(query, params);
370        }
371        SearchRequest request = createSearchRequest();
372        SearchSourceBuilder sourceBuilder = createSearchSourceBuilder(query);
373        return request.source(sourceBuilder);
374    }
375
376    protected SearchSourceBuilder createSearchSourceBuilder(String query) {
377        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
378        SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
379        try {
380            try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(
381                    new NamedXContentRegistry(searchModule.getNamedXContents()), THROW_UNSUPPORTED_OPERATION, query)) {
382                searchSourceBuilder.parseXContent(parser);
383            }
384        } catch (IOException | ParsingException e) {
385            log.error("Invalid query: " + query + ": " + e.getMessage(), e);
386            throw new IllegalArgumentException("Bad query: " + query);
387        }
388        return searchSourceBuilder;
389    }
390
391    public String expandQueryVariables(String query, Object[] params) {
392        Map<String, Object> qParams = new HashMap<>();
393        for (int i = 0; i < params.length; i++) {
394            query = query.replaceFirst("\\?", "\\${param" + i + "}");
395            qParams.put("param" + i, params[i]);
396        }
397        return expandQueryVariables(query, qParams);
398    }
399
400    public String expandQueryVariables(String query, Map<String, Object> params) {
401        if (params != null && params.size() > 0) {
402            TextTemplate tmpl = new TextTemplate();
403            for (String key : params.keySet()) {
404                Object val = params.get(key);
405                if (val == null) {
406                    continue;
407                } else if (val instanceof Calendar) {
408                    tmpl.setVariable(key, Long.toString(((Calendar) val).getTime().getTime()));
409                } else if (val instanceof Date) {
410                    tmpl.setVariable(key, Long.toString(((Date) val).getTime()));
411                } else {
412                    tmpl.setVariable(key, val.toString());
413                }
414            }
415            query = tmpl.processText(query);
416        }
417        return query;
418    }
419
420    @Override
421    public List<?> nativeQuery(String query, Map<String, Object> params, int pageNb, int pageSize) {
422        SearchRequest request = buildQuery(query, params);
423        if (pageNb > 0) {
424            request.source().from(pageNb * pageSize);
425        }
426        if (pageSize > 0) {
427            request.source().size(pageSize);
428        }
429        SearchResponse searchResponse = runRequest(request);
430        return buildLogEntries(searchResponse);
431    }
432
433    @Override
434    public List<LogEntry> queryLogsByPage(String[] eventIds, Date limit, String[] categories, String path, int pageNb,
435            int pageSize) {
436        SearchRequest request = createSearchRequest();
437        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
438        if (eventIds != null && eventIds.length > 0) {
439            if (eventIds.length == 1) {
440                filterBuilder.must(QueryBuilders.termQuery("eventId", eventIds[0]));
441            } else {
442                filterBuilder.must(QueryBuilders.termsQuery("eventId", eventIds));
443            }
444        }
445        if (categories != null && categories.length > 0) {
446            if (categories.length == 1) {
447                filterBuilder.must(QueryBuilders.termQuery("category", categories[0]));
448            } else {
449                filterBuilder.must(QueryBuilders.termsQuery("category", categories));
450            }
451        }
452        if (path != null) {
453            filterBuilder.must(QueryBuilders.termQuery("docPath", path));
454        }
455
456        if (limit != null) {
457            filterBuilder.must(QueryBuilders.rangeQuery("eventDate").lt(convertDate(limit)));
458        }
459        request.source(new SearchSourceBuilder().query(QueryBuilders.constantScoreQuery(filterBuilder)));
460        if (pageNb > 0) {
461            request.source().from(pageNb * pageSize);
462        }
463        if (pageSize > 0) {
464            request.source().size(pageSize);
465        }
466        SearchResponse searchResponse = runRequest(request);
467        return buildLogEntries(searchResponse);
468    }
469
470    @Override
471    public void addLogEntries(List<LogEntry> entries) {
472
473        if (entries.isEmpty()) {
474            return;
475        }
476
477        BulkRequest bulkRequest = new BulkRequest();
478        JsonFactory factory = new JsonFactory();
479
480        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
481        UIDSequencer seq = uidGeneratorService.getSequencer();
482
483        try {
484            List<Long> block = seq.getNextBlock(SEQ_NAME, entries.size());
485            for (int i = 0; i < entries.size(); i++) {
486                LogEntry entry = entries.get(i);
487                entry.setId(block.get(i));
488                if (log.isDebugEnabled()) {
489                    log.debug(String.format("Indexing log entry: %s", entry));
490                }
491                entry.setLogDate(new Date());
492                try (OutputStream out = new BytesStreamOutput(); //
493                        JsonGenerator jg = factory.createGenerator(out); //
494                        XContentBuilder builder = jsonBuilder(out)) {
495                    ObjectMapper mapper = new ObjectMapper();
496                    mapper.writeValue(jg, entry);
497                    bulkRequest.add(new IndexRequest(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE,
498                            String.valueOf(entry.getId())).source(builder));
499                }
500            }
501
502            BulkResponse bulkResponse = esClient.bulk(bulkRequest);
503            if (bulkResponse.hasFailures()) {
504                for (BulkItemResponse response : bulkResponse.getItems()) {
505                    if (response.isFailed()) {
506                        log.error("Unable to index audit entry " + response.getItemId() + " :"
507                                + response.getFailureMessage());
508                    }
509                }
510            }
511        } catch (IOException e) {
512            throw new NuxeoException("Error while indexing Audit entries", e);
513        }
514
515    }
516
517    @Override
518    public Long getEventsCount(String eventId) {
519        SearchResponse res = esClient.search(
520                new SearchRequest(getESIndexName()).source(
521                        new SearchSourceBuilder().query(
522                                QueryBuilders.constantScoreQuery(QueryBuilders.termQuery("eventId", eventId)))
523                                                 .size(0)));
524        return res.getHits().getTotalHits();
525    }
526
527    @Override
528    public long syncLogCreationEntries(final String repoId, final String path, final Boolean recurs) {
529        return syncLogCreationEntries(provider, repoId, path, recurs);
530    }
531
532    public SearchResponse search(SearchRequest request) {
533        String[] indices = request.indices();
534        if (indices == null || indices.length != 1) {
535            throw new IllegalStateException("Search on audit must include index name: " + request);
536        }
537        if (!getESIndexName().equals(indices[0])) {
538            throw new IllegalStateException("Search on audit must be on audit index: " + request);
539        }
540        return runRequest(request);
541    }
542
543    protected QueryBuilder buildFilter(PredicateDefinition[] predicates, DocumentModel searchDocumentModel) {
544
545        if (searchDocumentModel == null) {
546            return QueryBuilders.matchAllQuery();
547        }
548
549        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
550
551        int nbFilters = 0;
552
553        for (PredicateDefinition predicate : predicates) {
554
555            // extract data from DocumentModel
556            PredicateFieldDefinition[] fieldDef = predicate.getValues();
557            Object[] val = new Object[fieldDef.length];
558            for (int fidx = 0; fidx < fieldDef.length; fidx++) {
559                if (fieldDef[fidx].getXpath() != null) {
560                    val[fidx] = searchDocumentModel.getPropertyValue(fieldDef[fidx].getXpath());
561                } else {
562                    val[fidx] = searchDocumentModel.getProperty(fieldDef[fidx].getSchema(), fieldDef[fidx].getName());
563                }
564            }
565
566            if (!isNonNullParam(val)) {
567                // skip predicate where all values are null
568                continue;
569            }
570
571            nbFilters++;
572
573            String op = predicate.getOperator();
574            if (op.equalsIgnoreCase("IN")) {
575
576                String[] values = null;
577                if (val[0] instanceof Iterable<?>) {
578                    List<String> l = new ArrayList<>();
579                    Iterable<?> vals = (Iterable<?>) val[0];
580
581                    for (Object v : vals) {
582                        if (v != null) {
583                            l.add(v.toString());
584                        }
585                    }
586                    values = l.toArray(new String[l.size()]);
587                } else if (val[0] instanceof Object[]) {
588                    values = (String[]) val[0];
589                }
590                filterBuilder.must(QueryBuilders.termsQuery(predicate.getParameter(), values));
591            } else if (op.equalsIgnoreCase("BETWEEN")) {
592                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gt(convertDate(val[0])));
593                if (val.length > 1) {
594                    filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lt(convertDate(val[1])));
595                }
596            } else if (">".equals(op)) {
597                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gt(convertDate(val[0])));
598            } else if (">=".equals(op)) {
599                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gte(convertDate(val[0])));
600            } else if ("<".equals(op)) {
601                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lt(convertDate(val[0])));
602            } else if ("<=".equals(op)) {
603                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lte(convertDate(val[0])));
604            } else {
605                filterBuilder.must(QueryBuilders.termQuery(predicate.getParameter(), convertDate(val[0])));
606            }
607        }
608
609        if (nbFilters == 0) {
610            return QueryBuilders.matchAllQuery();
611        }
612        return filterBuilder;
613    }
614
615    protected Object convertDate(Object o) {
616        // Date are convert to timestamp ms which is a known format by default for ES
617        if (o instanceof Calendar) {
618            return Long.valueOf(((Calendar) o).getTime().getTime());
619        } else if (o instanceof Date) {
620            return Long.valueOf(((Date) o).getTime());
621        }
622        return o;
623    }
624
625    public SearchRequest buildSearchQuery(String fixedPart, PredicateDefinition[] predicates,
626            DocumentModel searchDocumentModel) {
627        SearchRequest request = createSearchRequest();
628        QueryBuilder queryBuilder = QueryBuilders.wrapperQuery(fixedPart);
629        QueryBuilder filterBuilder = buildFilter(predicates, searchDocumentModel);
630        request.source(
631                new SearchSourceBuilder().query(QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder)));
632        return request;
633    }
634
635    protected boolean isNonNullParam(Object[] val) {
636        if (val == null) {
637            return false;
638        }
639        for (Object v : val) {
640            if (v != null) {
641                if (v instanceof String) {
642                    if (!((String) v).isEmpty()) {
643                        return true;
644                    }
645                } else if (v instanceof String[]) {
646                    if (((String[]) v).length > 0) {
647                        return true;
648                    }
649                } else {
650                    return true;
651                }
652            }
653        }
654        return false;
655    }
656
657    @SuppressWarnings("deprecation")
658    public String migrate(final int batchSize) {
659
660        final String MIGRATION_WORK_ID = "AuditMigration";
661
662        WorkManager wm = Framework.getService(WorkManager.class);
663        State migrationState = wm.getWorkState(MIGRATION_WORK_ID);
664        if (migrationState != null) {
665            return "Migration already scheduled : " + migrationState.toString();
666        }
667
668        Work migrationWork = new ESAuditMigrationWork(MIGRATION_WORK_ID, batchSize);
669        wm.schedule(migrationWork);
670        return "Migration work started : " + MIGRATION_WORK_ID;
671    }
672
673    SearchResponse runRequest(SearchRequest request) {
674        logSearchRequest(request);
675        SearchResponse response = esClient.search(request);
676        logSearchResponse(response);
677        return response;
678    }
679
680    SearchResponse runNextScroll(String scrollId, TimeValue keepAlive) {
681        if (log.isDebugEnabled()) {
682            log.debug(String.format(
683                    "Scroll request: -XGET 'localhost:9200/_search/scroll' -d '{\"scroll\": \"%s\", \"scroll_id\": \"%s\" }'",
684                    keepAlive, scrollId));
685        }
686        SearchResponse response = esClient.searchScroll(new SearchScrollRequest(scrollId).scroll(keepAlive));
687        logSearchResponse(response);
688        return response;
689    }
690
691    protected void logSearchResponse(SearchResponse response) {
692        if (log.isDebugEnabled()) {
693            log.debug("Response: " + response.toString());
694        }
695    }
696
697    protected void logSearchRequest(SearchRequest request) {
698        if (log.isDebugEnabled()) {
699            log.debug(String.format("Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'",
700                    getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, request.toString()));
701        }
702    }
703
704    /**
705     * Ensures the audit sequence returns an UID greater or equal than the maximum log entry id.
706     */
707    protected void ensureUIDSequencer(ESClient esClient) {
708        boolean auditIndexExists = esClient.indexExists(getESIndexName());
709        if (!auditIndexExists) {
710            return;
711        }
712
713        // Get max log entry id
714        SearchRequest request = createSearchRequest();
715        request.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())
716                                                .aggregation(AggregationBuilders.max("maxAgg").field("id")));
717        SearchResponse searchResponse = esClient.search(request);
718        Max agg = searchResponse.getAggregations().get("maxAgg");
719        long maxLogEntryId = (long) agg.getValue();
720
721        // Get next sequence id
722        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
723        UIDSequencer seq = uidGeneratorService.getSequencer();
724        seq.init();
725        long nextSequenceId = seq.getNextLong(SEQ_NAME);
726
727        // Increment sequence to max log entry id if needed
728        if (nextSequenceId < maxLogEntryId) {
729            log.info(String.format("Next UID returned by %s sequence is %d, initializing sequence to %d", SEQ_NAME,
730                    nextSequenceId, maxLogEntryId));
731            seq.initSequence(SEQ_NAME, maxLogEntryId);
732        }
733    }
734
735    @Override
736    public ExtendedInfo newExtendedInfo(Serializable value) {
737        return new ESExtendedInfo(value);
738    }
739
740    protected String getESIndexName() {
741        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
742        return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE);
743    }
744
745    @Override
746    public void append(List<String> jsonEntries) {
747        BulkRequest bulkRequest = new BulkRequest();
748        for (String json : jsonEntries) {
749            try {
750                String entryId = new JSONObject(json).getString(LOG_ID);
751                if (StringUtils.isBlank(entryId)) {
752                    throw new NuxeoException("A json entry has an empty id. entry=" + json);
753                }
754                IndexRequest request = new IndexRequest(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, entryId);
755                request.source(json, XContentType.JSON);
756                bulkRequest.add(request);
757            } catch (JSONException e) {
758                throw new NuxeoException("Unable to deserialize json entry=" + json, e);
759            }
760        }
761        esClient.bulk(bulkRequest);
762    }
763
764    @Override
765    public ScrollResult<String> scroll(org.nuxeo.ecm.core.query.sql.model.QueryBuilder builder, int batchSize,
766            int keepAliveSeconds) {
767        // prepare parameters
768        MultiExpression predicate = builder.predicate();
769        OrderByList orders = builder.orders();
770
771        // create source
772        SearchSourceBuilder source = createSearchRequestSource(predicate, orders);
773        source.size(batchSize);
774        // create request
775        SearchRequest request = createSearchRequest();
776        request.source(source).scroll(TimeValue.timeValueSeconds(keepAliveSeconds));
777        SearchResponse response = runRequest(request);
778        // register cursor
779        String scrollId = cursorService.registerCursorResult(new ESCursorResult(response, batchSize, keepAliveSeconds));
780        return scroll(scrollId);
781    }
782
783    @Override
784    public ScrollResult<String> scroll(String scrollId) {
785        return cursorService.scroll(scrollId);
786    }
787
788    public class ESCursorResult extends CursorResult<Iterator<SearchHit>, SearchHit> {
789
790        protected final String scrollId;
791
792        protected boolean end;
793
794        public ESCursorResult(SearchResponse response, int batchSize, int keepAliveSeconds) {
795            super(response.getHits().iterator(), batchSize, keepAliveSeconds);
796            this.scrollId = response.getScrollId();
797        }
798
799        @Override
800        public boolean hasNext() {
801            if (cursor == null || end) {
802                return false;
803            } else if (cursor.hasNext()) {
804                return true;
805            } else {
806                runNextScroll();
807                return !end;
808            }
809        }
810
811        @Override
812        public SearchHit next() {
813            if (cursor != null && !cursor.hasNext() && !end) {
814                // try to run a next scroll
815                runNextScroll();
816            }
817            return super.next();
818        }
819
820        protected void runNextScroll() {
821            SearchResponse response = ESAuditBackend.this.runNextScroll(scrollId,
822                    TimeValue.timeValueSeconds(keepAliveSeconds));
823            cursor = response.getHits().iterator();
824            end = !cursor.hasNext();
825        }
826
827        @Override
828        public void close() {
829            ClearScrollRequest request = new ClearScrollRequest();
830            request.addScrollId(scrollId);
831            esClient.clearScroll(request);
832            end = true;
833            // Call super close to clear cursor
834            super.close();
835        }
836
837    }
838
839}