001/*
002 * (C) Copyright 2014-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     Benoit Delbosc
019 */
020package org.nuxeo.elasticsearch.audit;
021
022import static org.elasticsearch.common.xcontent.DeprecationHandler.THROW_UNSUPPORTED_OPERATION;
023import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
024import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_ID;
025
026import java.io.IOException;
027import java.io.OutputStream;
028import java.io.Serializable;
029import java.time.ZonedDateTime;
030import java.util.ArrayList;
031import java.util.Calendar;
032import java.util.Collections;
033import java.util.Date;
034import java.util.HashMap;
035import java.util.Iterator;
036import java.util.List;
037import java.util.Map;
038import java.util.function.Function;
039
040import org.apache.commons.logging.Log;
041import org.apache.commons.logging.LogFactory;
042import org.elasticsearch.action.bulk.BulkItemResponse;
043import org.elasticsearch.action.bulk.BulkRequest;
044import org.elasticsearch.action.bulk.BulkResponse;
045import org.elasticsearch.action.get.GetRequest;
046import org.elasticsearch.action.get.GetResponse;
047import org.elasticsearch.action.index.IndexRequest;
048import org.elasticsearch.action.search.ClearScrollRequest;
049import org.elasticsearch.action.search.SearchRequest;
050import org.elasticsearch.action.search.SearchResponse;
051import org.elasticsearch.action.search.SearchScrollRequest;
052import org.elasticsearch.action.search.SearchType;
053import org.elasticsearch.common.ParsingException;
054import org.elasticsearch.common.io.stream.BytesStreamOutput;
055import org.elasticsearch.common.settings.Settings;
056import org.elasticsearch.common.unit.TimeValue;
057import org.elasticsearch.common.xcontent.NamedXContentRegistry;
058import org.elasticsearch.common.xcontent.XContentBuilder;
059import org.elasticsearch.common.xcontent.XContentFactory;
060import org.elasticsearch.common.xcontent.XContentParser;
061import org.elasticsearch.common.xcontent.XContentType;
062import org.elasticsearch.index.query.BoolQueryBuilder;
063import org.elasticsearch.index.query.QueryBuilder;
064import org.elasticsearch.index.query.QueryBuilders;
065import org.elasticsearch.search.SearchHit;
066import org.elasticsearch.search.SearchModule;
067import org.elasticsearch.search.aggregations.Aggregation;
068import org.elasticsearch.search.aggregations.AggregationBuilders;
069import org.elasticsearch.search.builder.SearchSourceBuilder;
070import org.elasticsearch.search.sort.SortOrder;
071import org.json.JSONException;
072import org.json.JSONObject;
073import org.nuxeo.common.utils.TextTemplate;
074import org.nuxeo.ecm.core.api.CursorResult;
075import org.nuxeo.ecm.core.api.CursorService;
076import org.nuxeo.ecm.core.api.DocumentModel;
077import org.nuxeo.ecm.core.api.NuxeoException;
078import org.nuxeo.ecm.core.api.ScrollResult;
079import org.nuxeo.ecm.core.query.sql.model.Literals;
080import org.nuxeo.ecm.core.query.sql.model.MultiExpression;
081import org.nuxeo.ecm.core.query.sql.model.Operand;
082import org.nuxeo.ecm.core.query.sql.model.Operator;
083import org.nuxeo.ecm.core.query.sql.model.OrderByList;
084import org.nuxeo.ecm.core.query.sql.model.Predicate;
085import org.nuxeo.ecm.core.query.sql.model.Reference;
086import org.nuxeo.ecm.core.uidgen.UIDGeneratorService;
087import org.nuxeo.ecm.core.uidgen.UIDSequencer;
088import org.nuxeo.ecm.core.work.api.Work;
089import org.nuxeo.ecm.core.work.api.Work.State;
090import org.nuxeo.ecm.core.work.api.WorkManager;
091import org.nuxeo.ecm.platform.audit.api.AuditReader;
092import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
093import org.nuxeo.ecm.platform.audit.api.LogEntry;
094import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl;
095import org.nuxeo.ecm.platform.audit.service.AbstractAuditBackend;
096import org.nuxeo.ecm.platform.audit.service.AuditBackend;
097import org.nuxeo.ecm.platform.audit.service.BaseLogEntryProvider;
098import org.nuxeo.ecm.platform.audit.service.NXAuditEventsService;
099import org.nuxeo.ecm.platform.audit.service.extension.AuditBackendDescriptor;
100import org.nuxeo.ecm.platform.query.api.PredicateDefinition;
101import org.nuxeo.ecm.platform.query.api.PredicateFieldDefinition;
102import org.nuxeo.elasticsearch.ElasticSearchConstants;
103import org.nuxeo.elasticsearch.api.ESClient;
104import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
105import org.nuxeo.elasticsearch.query.NxqlQueryConverter;
106import org.nuxeo.runtime.api.Framework;
107import org.nuxeo.runtime.model.DefaultComponent;
108
109import com.fasterxml.jackson.core.JsonFactory;
110import com.fasterxml.jackson.core.JsonGenerator;
111import com.fasterxml.jackson.databind.ObjectMapper;
112
113/**
114 * Implementation of the {@link AuditBackend} interface using Elasticsearch persistence
115 *
116 * @author tiry
117 */
118public class ESAuditBackend extends AbstractAuditBackend implements AuditBackend {
119
120    public static final String SEQ_NAME = "audit";
121
122    public static final String MIGRATION_FLAG_PROP = "audit.elasticsearch.migration";
123
124    public static final String MIGRATION_BATCH_SIZE_PROP = "audit.elasticsearch.migration.batchSize";
125
126    public static final String MIGRATION_DONE_EVENT = "sqlToElasticsearchMigrationDone";
127
128    public static final int MIGRATION_DEFAULT_BACTH_SIZE = 1000;
129
130    protected CursorService<Iterator<SearchHit>, SearchHit, String> cursorService;
131
132    public ESAuditBackend(NXAuditEventsService component, AuditBackendDescriptor config) {
133        super(component, config);
134    }
135
136    /**
137     * @since 9.3
138     */
139    public ESAuditBackend() {
140        super();
141    }
142
143    protected ESClient esClient;
144
145    protected static final Log log = LogFactory.getLog(ESAuditBackend.class);
146
147    protected BaseLogEntryProvider provider = new BaseLogEntryProvider() {
148
149        @Override
150        public int removeEntries(String eventId, String pathPattern) {
151            throw new UnsupportedOperationException("Not implemented yet!");
152        }
153
154        @Override
155        public void addLogEntry(LogEntry logEntry) {
156            List<LogEntry> entries = new ArrayList<>();
157            entries.add(logEntry);
158            addLogEntries(entries);
159        }
160
161    };
162
163    protected ESClient getClient() {
164        log.info("Activate Elasticsearch backend for Audit");
165        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
166        ESClient client = esa.getClient();
167        ensureUIDSequencer(client);
168        return client;
169    }
170
171    protected boolean isMigrationDone() {
172        AuditReader reader = Framework.getService(AuditReader.class);
173        List<LogEntry> entries = reader.queryLogs(new String[] { MIGRATION_DONE_EVENT }, null);
174        return !entries.isEmpty();
175    }
176
177    @Override
178    public int getApplicationStartedOrder() {
179        int elasticOrder = ((DefaultComponent) Framework.getRuntime().getComponent(
180                "org.nuxeo.elasticsearch.ElasticSearchComponent")).getApplicationStartedOrder();
181        int uidgenOrder = ((DefaultComponent) Framework.getRuntime().getComponent(
182                "org.nuxeo.ecm.core.uidgen.UIDGeneratorService")).getApplicationStartedOrder();
183        return Integer.max(elasticOrder, uidgenOrder) + 1;
184    }
185
186    @Override
187    public void onApplicationStarted() {
188        esClient = getClient();
189        if (Boolean.parseBoolean(Framework.getProperty(MIGRATION_FLAG_PROP))) {
190            if (!isMigrationDone()) {
191                log.info(String.format(
192                        "Property %s is true and migration is not done yet, processing audit migration from SQL to Elasticsearch index",
193                        MIGRATION_FLAG_PROP));
194                // Drop audit index first in case of a previous bad migration
195                ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
196                esa.dropAndInitIndex(getESIndexName());
197                int batchSize = MIGRATION_DEFAULT_BACTH_SIZE;
198                String batchSizeProp = Framework.getProperty(MIGRATION_BATCH_SIZE_PROP);
199                if (batchSizeProp != null) {
200                    batchSize = Integer.parseInt(batchSizeProp);
201                }
202                migrate(batchSize);
203            } else {
204                log.warn(String.format(
205                        "Property %s is true but migration is already done, please set this property to false",
206                        MIGRATION_FLAG_PROP));
207            }
208        } else {
209            log.debug(String.format("Property %s is false, not processing any migration", MIGRATION_FLAG_PROP));
210        }
211        cursorService = new CursorService<>(SearchHit::getSourceAsString);
212    }
213
214    @Override
215    public void onApplicationStopped() {
216        if (esClient == null) {
217            return;
218        }
219        try {
220            esClient.close();
221            cursorService.clear();
222        } catch (Exception e) {
223            log.warn("Fail to close esClient: " + e.getMessage(), e);
224        } finally {
225            esClient = null;
226            cursorService = null;
227        }
228    }
229
230    @Override
231    @SuppressWarnings("unchecked")
232    public List<LogEntry> queryLogs(org.nuxeo.ecm.core.query.sql.model.QueryBuilder builder) {
233        // prepare parameters
234        MultiExpression predicate = builder.predicate();
235        OrderByList orders = builder.orders();
236        long offset = builder.offset();
237        long limit = builder.limit();
238
239        SearchSourceBuilder source = createSearchRequestSource(predicate, orders);
240
241        // Perform search
242        List<LogEntry> logEntries;
243        SearchRequest request = createSearchRequest();
244        request.source(source);
245        if (limit == 0) {
246            // return all result -> use the scroll api
247            // offset is not taking into account when querying all results
248            TimeValue keepAlive = TimeValue.timeValueMinutes(1);
249            request.scroll(keepAlive);
250            // the size here is the size of each scrolls
251            source.size(100);
252
253            // run request
254            SearchResponse searchResponse = runRequest(request);
255
256            // Build log entries
257            logEntries = buildLogEntries(searchResponse);
258            // Scroll on next results
259            for (; //
260            searchResponse.getHits().getHits().length > 0
261                            && logEntries.size() < searchResponse.getHits().getTotalHits().value; //
262                    searchResponse = runNextScroll(searchResponse.getScrollId(), keepAlive)) {
263                // Build log entries
264                logEntries.addAll(buildLogEntries(searchResponse));
265            }
266        } else {
267            // return a page -> use a regular search
268            source.from((int) offset).size((int) limit);
269
270            // run request
271            SearchResponse searchResponse = runRequest(request);
272
273            // Build log entries
274            logEntries = buildLogEntries(searchResponse);
275        }
276
277        return logEntries;
278    }
279
280    protected SearchSourceBuilder createSearchRequestSource(MultiExpression predicate, OrderByList orders) {
281        // create ES query builder
282        QueryBuilder query = createQueryBuilder(predicate);
283
284        // create ES source
285        SearchSourceBuilder source = new SearchSourceBuilder().query(QueryBuilders.constantScoreQuery(query)).size(100);
286
287        // create sort
288        orders.forEach(order -> source.sort(order.reference.name, order.isDescending ? SortOrder.DESC : SortOrder.ASC));
289        return source;
290    }
291
292    protected QueryBuilder createQueryBuilder(MultiExpression andPredicate) {
293        // cast parameters
294        // current implementation only support a MultiExpression with AND operator
295        List<Predicate> predicates = andPredicate.predicates;
296        // current implementation only use Predicate/OrderByExpr with a simple Reference for left and right
297        Function<Operand, String> getFieldName = operand -> ((Reference) operand).name;
298
299        QueryBuilder query;
300        if (predicates.isEmpty()) {
301            query = QueryBuilders.matchAllQuery();
302        } else {
303            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
304            for (Predicate predicate : predicates) {
305                String leftName = getFieldName.apply(predicate.lvalue);
306                Operator operator = predicate.operator;
307                Object rightValue = Literals.valueOf(predicate.rvalue);
308                if (rightValue instanceof ZonedDateTime) {
309                    // The ZonedDateTime representation is not compatible with Elasticsearch query
310                    rightValue = ((ZonedDateTime) rightValue).toEpochSecond();
311                }
312                if (Operator.EQ.equals(operator)) {
313                    boolQuery.must(QueryBuilders.termQuery(leftName, rightValue));
314                } else if (Operator.NOTEQ.equals(operator)) {
315                    boolQuery.mustNot(QueryBuilders.termQuery(leftName, rightValue));
316                } else if (Operator.LT.equals(operator)) {
317                    boolQuery.must(QueryBuilders.rangeQuery(leftName).lt(rightValue));
318                } else if (Operator.LTEQ.equals(operator)) {
319                    boolQuery.must(QueryBuilders.rangeQuery(leftName).lte(rightValue));
320                } else if (Operator.GTEQ.equals(operator)) {
321                    boolQuery.must(QueryBuilders.rangeQuery(leftName).gte(rightValue));
322                } else if (Operator.GT.equals(operator)) {
323                    boolQuery.must(QueryBuilders.rangeQuery(leftName).gt(rightValue));
324                } else if (Operator.IN.equals(operator)) {
325                    boolQuery.must(QueryBuilders.termsQuery(leftName, (List<?>) rightValue));
326                } else if (Operator.STARTSWITH.equals(operator)) {
327                    boolQuery.must(NxqlQueryConverter.makeStartsWithQuery(leftName, rightValue));
328                }
329            }
330            query = boolQuery;
331        }
332        return query;
333    }
334
335    protected List<LogEntry> buildLogEntries(SearchResponse searchResponse) {
336        List<LogEntry> entries = new ArrayList<>(searchResponse.getHits().getHits().length);
337        ObjectMapper mapper = new ObjectMapper();
338        for (SearchHit hit : searchResponse.getHits()) {
339            try {
340                entries.add(mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class));
341            } catch (IOException e) {
342                log.error("Error while reading Audit Entry from ES", e);
343            }
344        }
345        return entries;
346    }
347
348    protected SearchRequest createSearchRequest() {
349        return new SearchRequest(getESIndexName()).searchType(SearchType.DFS_QUERY_THEN_FETCH);
350    }
351
352    @Override
353    public LogEntry getLogEntryByID(long id) {
354        GetResponse ret = esClient.get(
355                new GetRequest(getESIndexName(), String.valueOf(id)));
356        if (!ret.isExists()) {
357            return null;
358        }
359        try {
360            return new ObjectMapper().readValue(ret.getSourceAsString(), LogEntryImpl.class);
361        } catch (IOException e) {
362            throw new NuxeoException("Unable to read Entry for id " + id, e);
363        }
364    }
365
366    public SearchRequest buildQuery(String query, Map<String, Object> params) {
367        if (params != null && params.size() > 0) {
368            query = expandQueryVariables(query, params);
369        }
370        SearchRequest request = createSearchRequest();
371        SearchSourceBuilder sourceBuilder = createSearchSourceBuilder(query);
372        return request.source(sourceBuilder);
373    }
374
375    protected SearchSourceBuilder createSearchSourceBuilder(String query) {
376        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
377        SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
378        try {
379            try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(
380                    new NamedXContentRegistry(searchModule.getNamedXContents()), THROW_UNSUPPORTED_OPERATION, query)) {
381                searchSourceBuilder.parseXContent(parser);
382            }
383        } catch (IOException | ParsingException e) {
384            log.error("Invalid query: " + query + ": " + e.getMessage(), e);
385            throw new IllegalArgumentException("Bad query: " + query);
386        }
387        return searchSourceBuilder;
388    }
389
390    public String expandQueryVariables(String query, Object[] params) {
391        Map<String, Object> qParams = new HashMap<>();
392        for (int i = 0; i < params.length; i++) {
393            query = query.replaceFirst("\\?", "\\${param" + i + "}");
394            qParams.put("param" + i, params[i]);
395        }
396        return expandQueryVariables(query, qParams);
397    }
398
399    public String expandQueryVariables(String query, Map<String, Object> params) {
400        if (params != null && params.size() > 0) {
401            TextTemplate tmpl = new TextTemplate();
402            for (String key : params.keySet()) {
403                Object val = params.get(key);
404                if (val == null) {
405                    continue;
406                } else if (val instanceof Calendar) {
407                    tmpl.setVariable(key, Long.toString(((Calendar) val).getTime().getTime()));
408                } else if (val instanceof Date) {
409                    tmpl.setVariable(key, Long.toString(((Date) val).getTime()));
410                } else {
411                    tmpl.setVariable(key, val.toString());
412                }
413            }
414            query = tmpl.processText(query);
415        }
416        return query;
417    }
418
419    @Override
420    public List<?> nativeQuery(String query, Map<String, Object> params, int pageNb, int pageSize) {
421        SearchRequest request = buildQuery(query, params);
422        if (pageNb > 0) {
423            request.source().from(pageNb * pageSize);
424        }
425        if (pageSize > 0) {
426            request.source().size(pageSize);
427        }
428        SearchResponse searchResponse = runRequest(request);
429        return buildLogEntries(searchResponse);
430    }
431
432    @Override
433    public List<LogEntry> queryLogsByPage(String[] eventIds, Date limit, String[] categories, String path, int pageNb,
434            int pageSize) {
435        SearchRequest request = createSearchRequest();
436        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
437        if (eventIds != null && eventIds.length > 0) {
438            if (eventIds.length == 1) {
439                filterBuilder.must(QueryBuilders.termQuery("eventId", eventIds[0]));
440            } else {
441                filterBuilder.must(QueryBuilders.termsQuery("eventId", eventIds));
442            }
443        }
444        if (categories != null && categories.length > 0) {
445            if (categories.length == 1) {
446                filterBuilder.must(QueryBuilders.termQuery("category", categories[0]));
447            } else {
448                filterBuilder.must(QueryBuilders.termsQuery("category", categories));
449            }
450        }
451        if (path != null) {
452            filterBuilder.must(QueryBuilders.termQuery("docPath", path));
453        }
454
455        if (limit != null) {
456            filterBuilder.must(QueryBuilders.rangeQuery("eventDate").lt(convertDate(limit)));
457        }
458        request.source(new SearchSourceBuilder().query(QueryBuilders.constantScoreQuery(filterBuilder)));
459        if (pageNb > 0) {
460            request.source().from(pageNb * pageSize);
461        }
462        if (pageSize > 0) {
463            request.source().size(pageSize);
464        }
465        SearchResponse searchResponse = runRequest(request);
466        return buildLogEntries(searchResponse);
467    }
468
469    @Override
470    public void addLogEntries(List<LogEntry> entries) {
471
472        if (entries.isEmpty()) {
473            return;
474        }
475
476        BulkRequest bulkRequest = new BulkRequest();
477        JsonFactory factory = new JsonFactory();
478
479        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
480        UIDSequencer seq = uidGeneratorService.getSequencer();
481
482        try {
483            List<Long> block = seq.getNextBlock(SEQ_NAME, entries.size());
484            for (int i = 0; i < entries.size(); i++) {
485                LogEntry entry = entries.get(i);
486                entry.setId(block.get(i));
487                if (log.isDebugEnabled()) {
488                    log.debug(String.format("Indexing log entry: %s", entry));
489                }
490                entry.setLogDate(new Date());
491                try (OutputStream out = new BytesStreamOutput(); //
492                        JsonGenerator jg = factory.createGenerator(out); //
493                        XContentBuilder builder = jsonBuilder(out)) {
494                    ObjectMapper mapper = new ObjectMapper();
495                    mapper.writeValue(jg, entry);
496                    bulkRequest.add(
497                            new IndexRequest(getESIndexName()).id(String.valueOf(entry.getId())).source(builder));
498                }
499            }
500
501            BulkResponse bulkResponse = esClient.bulk(bulkRequest);
502            if (bulkResponse.hasFailures()) {
503                for (BulkItemResponse response : bulkResponse.getItems()) {
504                    if (response.isFailed()) {
505                        log.error("Unable to index audit entry " + response.getItemId() + " :"
506                                + response.getFailureMessage());
507                    }
508                }
509            }
510        } catch (IOException e) {
511            throw new NuxeoException("Error while indexing Audit entries", e);
512        }
513
514    }
515
516    @Override
517    public Long getEventsCount(String eventId) {
518        SearchResponse res = esClient.search(
519                new SearchRequest(getESIndexName()).source(
520                        new SearchSourceBuilder().query(
521                                QueryBuilders.constantScoreQuery(QueryBuilders.termQuery("eventId", eventId)))
522                                                 .size(0)));
523        return res.getHits().getTotalHits().value;
524    }
525
526    @Override
527    public long syncLogCreationEntries(final String repoId, final String path, final Boolean recurs) {
528        return syncLogCreationEntries(provider, repoId, path, recurs);
529    }
530
531    public SearchResponse search(SearchRequest request) {
532        String[] indices = request.indices();
533        if (indices == null || indices.length != 1) {
534            throw new IllegalStateException("Search on audit must include index name: " + request);
535        }
536        if (!getESIndexName().equals(indices[0])) {
537            throw new IllegalStateException("Search on audit must be on audit index: " + request);
538        }
539        return runRequest(request);
540    }
541
542    protected QueryBuilder buildFilter(PredicateDefinition[] predicates, DocumentModel searchDocumentModel) {
543
544        if (searchDocumentModel == null) {
545            return QueryBuilders.matchAllQuery();
546        }
547
548        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
549
550        int nbFilters = 0;
551
552        for (PredicateDefinition predicate : predicates) {
553
554            // extract data from DocumentModel
555            PredicateFieldDefinition[] fieldDef = predicate.getValues();
556            Object[] val = new Object[fieldDef.length];
557            for (int fidx = 0; fidx < fieldDef.length; fidx++) {
558                if (fieldDef[fidx].getXpath() != null) {
559                    val[fidx] = searchDocumentModel.getPropertyValue(fieldDef[fidx].getXpath());
560                } else {
561                    val[fidx] = searchDocumentModel.getProperty(fieldDef[fidx].getSchema(), fieldDef[fidx].getName());
562                }
563            }
564
565            if (!isNonNullParam(val)) {
566                // skip predicate where all values are null
567                continue;
568            }
569
570            nbFilters++;
571
572            String op = predicate.getOperator();
573            if (op.equalsIgnoreCase("IN")) {
574
575                String[] values = null;
576                if (val[0] instanceof Iterable<?>) {
577                    List<String> l = new ArrayList<>();
578                    Iterable<?> vals = (Iterable<?>) val[0];
579
580                    for (Object v : vals) {
581                        if (v != null) {
582                            l.add(v.toString());
583                        }
584                    }
585                    values = l.toArray(new String[l.size()]);
586                } else if (val[0] instanceof Object[]) {
587                    values = (String[]) val[0];
588                }
589                filterBuilder.must(QueryBuilders.termsQuery(predicate.getParameter(), values));
590            } else if (op.equalsIgnoreCase("BETWEEN")) {
591                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gt(convertDate(val[0])));
592                if (val.length > 1) {
593                    filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lt(convertDate(val[1])));
594                }
595            } else if (">".equals(op)) {
596                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gt(convertDate(val[0])));
597            } else if (">=".equals(op)) {
598                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gte(convertDate(val[0])));
599            } else if ("<".equals(op)) {
600                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lt(convertDate(val[0])));
601            } else if ("<=".equals(op)) {
602                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lte(convertDate(val[0])));
603            } else {
604                filterBuilder.must(QueryBuilders.termQuery(predicate.getParameter(), convertDate(val[0])));
605            }
606        }
607
608        if (nbFilters == 0) {
609            return QueryBuilders.matchAllQuery();
610        }
611        return filterBuilder;
612    }
613
614    protected Object convertDate(Object o) {
615        // Date are convert to timestamp ms which is a known format by default for ES
616        if (o instanceof Calendar) {
617            return Long.valueOf(((Calendar) o).getTime().getTime());
618        } else if (o instanceof Date) {
619            return Long.valueOf(((Date) o).getTime());
620        }
621        return o;
622    }
623
624    public SearchRequest buildSearchQuery(String fixedPart, PredicateDefinition[] predicates,
625            DocumentModel searchDocumentModel) {
626        SearchRequest request = createSearchRequest();
627        QueryBuilder queryBuilder = QueryBuilders.wrapperQuery(fixedPart);
628        QueryBuilder filterBuilder = buildFilter(predicates, searchDocumentModel);
629        request.source(
630                new SearchSourceBuilder().query(QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder)));
631        return request;
632    }
633
634    protected boolean isNonNullParam(Object[] val) {
635        if (val == null) {
636            return false;
637        }
638        for (Object v : val) {
639            if (v != null) {
640                if (v instanceof String) {
641                    if (!((String) v).isEmpty()) {
642                        return true;
643                    }
644                } else if (v instanceof String[]) {
645                    if (((String[]) v).length > 0) {
646                        return true;
647                    }
648                } else {
649                    return true;
650                }
651            }
652        }
653        return false;
654    }
655
656    @SuppressWarnings("deprecation")
657    public String migrate(final int batchSize) {
658
659        final String MIGRATION_WORK_ID = "AuditMigration";
660
661        WorkManager wm = Framework.getService(WorkManager.class);
662        State migrationState = wm.getWorkState(MIGRATION_WORK_ID);
663        if (migrationState != null) {
664            return "Migration already scheduled : " + migrationState.toString();
665        }
666
667        Work migrationWork = new ESAuditMigrationWork(MIGRATION_WORK_ID, batchSize);
668        wm.schedule(migrationWork);
669        return "Migration work started : " + MIGRATION_WORK_ID;
670    }
671
672    SearchResponse runRequest(SearchRequest request) {
673        logSearchRequest(request);
674        SearchResponse response = esClient.search(request);
675        logSearchResponse(response);
676        return response;
677    }
678
679    SearchResponse runNextScroll(String scrollId, TimeValue keepAlive) {
680        if (log.isDebugEnabled()) {
681            log.debug(String.format(
682                    "Scroll request: -XGET 'localhost:9200/_search/scroll' -d '{\"scroll\": \"%s\", \"scroll_id\": \"%s\" }'",
683                    keepAlive, scrollId));
684        }
685        SearchResponse response = esClient.searchScroll(new SearchScrollRequest(scrollId).scroll(keepAlive));
686        logSearchResponse(response);
687        return response;
688    }
689
690    protected void logSearchResponse(SearchResponse response) {
691        if (log.isDebugEnabled()) {
692            log.debug("Response: " + response.toString());
693        }
694    }
695
696    protected void logSearchRequest(SearchRequest request) {
697        if (log.isDebugEnabled()) {
698            log.debug(String.format("Search query: curl -XGET 'http://localhost:9200/%s/_search?pretty' -d '%s'",
699                    getESIndexName(), request.toString()));
700        }
701    }
702
703    /**
704     * Ensures the audit sequence returns an UID greater or equal than the maximum log entry id.
705     */
706    protected void ensureUIDSequencer(ESClient esClient) {
707        boolean auditIndexExists = esClient.indexExists(getESIndexName());
708        if (!auditIndexExists) {
709            return;
710        }
711
712        // Get max log entry id
713        SearchRequest request = createSearchRequest();
714        request.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())
715                                                .aggregation(AggregationBuilders.max("maxAgg").field("id")));
716        SearchResponse searchResponse = esClient.search(request);
717        Aggregation agg = searchResponse.getAggregations().get("maxAgg");
718        long maxLogEntryId = 0;
719        if (agg.getMetadata() != null && agg.getMetadata().containsKey(Aggregation.CommonFields.VALUE)) {
720            maxLogEntryId = (long) agg.getMetadata().get(Aggregation.CommonFields.VALUE);
721        }
722
723        // Get next sequence id
724        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
725        UIDSequencer seq = uidGeneratorService.getSequencer();
726        seq.init();
727        long nextSequenceId = seq.getNextLong(SEQ_NAME);
728
729        // Increment sequence to max log entry id if needed
730        if (nextSequenceId < maxLogEntryId) {
731            log.info(String.format("Next UID returned by %s sequence is %d, initializing sequence to %d", SEQ_NAME,
732                    nextSequenceId, maxLogEntryId));
733            seq.initSequence(SEQ_NAME, maxLogEntryId);
734        }
735    }
736
737    @Override
738    public ExtendedInfo newExtendedInfo(Serializable value) {
739        return new ESExtendedInfo(value);
740    }
741
742    protected String getESIndexName() {
743        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
744        return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE);
745    }
746
747    @Override
748    public void append(List<String> jsonEntries) {
749        BulkRequest bulkRequest = new BulkRequest();
750        for (String json : jsonEntries) {
751            try {
752                Object entryId = new JSONObject(json).opt(LOG_ID);
753                if (entryId ==  null) {
754                    throw new NuxeoException("A json entry has an empty id. entry=" + json);
755                }
756                IndexRequest request = new IndexRequest(getESIndexName()).id(entryId.toString());
757                request.source(json, XContentType.JSON);
758                bulkRequest.add(request);
759            } catch (JSONException e) {
760                throw new NuxeoException("Unable to deserialize json entry=" + json, e);
761            }
762        }
763        esClient.bulk(bulkRequest);
764    }
765
766    @SuppressWarnings("resource") // CursorResult is being registered, must not be closed
767    @Override
768    public ScrollResult<String> scroll(org.nuxeo.ecm.core.query.sql.model.QueryBuilder builder, int batchSize,
769            int keepAliveSeconds) {
770        // prepare parameters
771        MultiExpression predicate = builder.predicate();
772        OrderByList orders = builder.orders();
773
774        // create source
775        SearchSourceBuilder source = createSearchRequestSource(predicate, orders);
776        source.size(batchSize);
777        // create request
778        SearchRequest request = createSearchRequest();
779        request.source(source).scroll(TimeValue.timeValueSeconds(keepAliveSeconds));
780        SearchResponse response = runRequest(request);
781        // register cursor
782        String scrollId = cursorService.registerCursorResult(new ESCursorResult(response, batchSize, keepAliveSeconds));
783        return scroll(scrollId);
784    }
785
786    @Override
787    public ScrollResult<String> scroll(String scrollId) {
788        return cursorService.scroll(scrollId);
789    }
790
791    public class ESCursorResult extends CursorResult<Iterator<SearchHit>, SearchHit> {
792
793        protected final String scrollId;
794
795        protected boolean end;
796
797        public ESCursorResult(SearchResponse response, int batchSize, int keepAliveSeconds) {
798            super(response.getHits().iterator(), batchSize, keepAliveSeconds);
799            this.scrollId = response.getScrollId();
800        }
801
802        @Override
803        public boolean hasNext() {
804            if (cursor == null || end) {
805                return false;
806            } else if (cursor.hasNext()) {
807                return true;
808            } else {
809                runNextScroll();
810                return !end;
811            }
812        }
813
814        @Override
815        public SearchHit next() {
816            if (cursor != null && !cursor.hasNext() && !end) {
817                // try to run a next scroll
818                runNextScroll();
819            }
820            return super.next();
821        }
822
823        protected void runNextScroll() {
824            SearchResponse response = ESAuditBackend.this.runNextScroll(scrollId,
825                    TimeValue.timeValueSeconds(keepAliveSeconds));
826            cursor = response.getHits().iterator();
827            end = !cursor.hasNext();
828        }
829
830        @Override
831        public void close() {
832            ClearScrollRequest request = new ClearScrollRequest();
833            request.addScrollId(scrollId);
834            esClient.clearScroll(request);
835            end = true;
836            // Call super close to clear cursor
837            super.close();
838        }
839
840    }
841
842}