001/*
002 * (C) Copyright 2014-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     Benoit Delbosc
019 */
020package org.nuxeo.elasticsearch.audit;
021
022import static org.elasticsearch.common.xcontent.DeprecationHandler.THROW_UNSUPPORTED_OPERATION;
023import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
024import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_ID;
025
026import java.io.IOException;
027import java.io.OutputStream;
028import java.io.Serializable;
029import java.util.ArrayList;
030import java.util.Calendar;
031import java.util.Collections;
032import java.util.Date;
033import java.util.HashMap;
034import java.util.Iterator;
035import java.util.List;
036import java.util.Map;
037import java.util.function.Function;
038
039import org.apache.commons.lang3.StringUtils;
040import org.apache.commons.logging.Log;
041import org.apache.commons.logging.LogFactory;
042import org.elasticsearch.action.bulk.BulkItemResponse;
043import org.elasticsearch.action.bulk.BulkRequest;
044import org.elasticsearch.action.bulk.BulkResponse;
045import org.elasticsearch.action.get.GetRequest;
046import org.elasticsearch.action.get.GetResponse;
047import org.elasticsearch.action.index.IndexRequest;
048import org.elasticsearch.action.search.ClearScrollRequest;
049import org.elasticsearch.action.search.SearchRequest;
050import org.elasticsearch.action.search.SearchResponse;
051import org.elasticsearch.action.search.SearchScrollRequest;
052import org.elasticsearch.action.search.SearchType;
053import org.elasticsearch.common.ParsingException;
054import org.elasticsearch.common.io.stream.BytesStreamOutput;
055import org.elasticsearch.common.settings.Settings;
056import org.elasticsearch.common.unit.TimeValue;
057import org.elasticsearch.common.xcontent.NamedXContentRegistry;
058import org.elasticsearch.common.xcontent.XContentBuilder;
059import org.elasticsearch.common.xcontent.XContentFactory;
060import org.elasticsearch.common.xcontent.XContentParser;
061import org.elasticsearch.common.xcontent.XContentType;
062import org.elasticsearch.index.query.BoolQueryBuilder;
063import org.elasticsearch.index.query.QueryBuilder;
064import org.elasticsearch.index.query.QueryBuilders;
065import org.elasticsearch.search.SearchHit;
066import org.elasticsearch.search.SearchModule;
067import org.elasticsearch.search.aggregations.AggregationBuilders;
068import org.elasticsearch.search.aggregations.metrics.max.Max;
069import org.elasticsearch.search.builder.SearchSourceBuilder;
070import org.elasticsearch.search.sort.SortOrder;
071import org.json.JSONException;
072import org.json.JSONObject;
073import org.nuxeo.common.utils.TextTemplate;
074import org.nuxeo.ecm.core.api.CursorResult;
075import org.nuxeo.ecm.core.api.CursorService;
076import org.nuxeo.ecm.core.api.DocumentModel;
077import org.nuxeo.ecm.core.api.NuxeoException;
078import org.nuxeo.ecm.core.api.ScrollResult;
079import org.nuxeo.ecm.core.query.sql.model.Literals;
080import org.nuxeo.ecm.core.query.sql.model.MultiExpression;
081import org.nuxeo.ecm.core.query.sql.model.Operand;
082import org.nuxeo.ecm.core.query.sql.model.Operator;
083import org.nuxeo.ecm.core.query.sql.model.OrderByList;
084import org.nuxeo.ecm.core.query.sql.model.Predicate;
085import org.nuxeo.ecm.core.query.sql.model.Reference;
086import org.nuxeo.ecm.core.uidgen.UIDGeneratorService;
087import org.nuxeo.ecm.core.uidgen.UIDSequencer;
088import org.nuxeo.ecm.core.work.api.Work;
089import org.nuxeo.ecm.core.work.api.Work.State;
090import org.nuxeo.ecm.core.work.api.WorkManager;
091import org.nuxeo.ecm.platform.audit.api.AuditReader;
092import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
093import org.nuxeo.ecm.platform.audit.api.LogEntry;
094import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl;
095import org.nuxeo.ecm.platform.audit.service.AbstractAuditBackend;
096import org.nuxeo.ecm.platform.audit.service.AuditBackend;
097import org.nuxeo.ecm.platform.audit.service.BaseLogEntryProvider;
098import org.nuxeo.ecm.platform.audit.service.NXAuditEventsService;
099import org.nuxeo.ecm.platform.audit.service.extension.AuditBackendDescriptor;
100import org.nuxeo.ecm.platform.query.api.PredicateDefinition;
101import org.nuxeo.ecm.platform.query.api.PredicateFieldDefinition;
102import org.nuxeo.elasticsearch.ElasticSearchConstants;
103import org.nuxeo.elasticsearch.api.ESClient;
104import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
105import org.nuxeo.elasticsearch.query.NxqlQueryConverter;
106import org.nuxeo.runtime.api.Framework;
107import org.nuxeo.runtime.model.DefaultComponent;
108
109import com.fasterxml.jackson.core.JsonFactory;
110import com.fasterxml.jackson.core.JsonGenerator;
111import com.fasterxml.jackson.databind.ObjectMapper;
112
113/**
114 * Implementation of the {@link AuditBackend} interface using Elasticsearch persistence
115 *
116 * @author tiry
117 */
118public class ESAuditBackend extends AbstractAuditBackend implements AuditBackend {
119
120    public static final String SEQ_NAME = "audit";
121
122    public static final String MIGRATION_FLAG_PROP = "audit.elasticsearch.migration";
123
124    public static final String MIGRATION_BATCH_SIZE_PROP = "audit.elasticsearch.migration.batchSize";
125
126    public static final String MIGRATION_DONE_EVENT = "sqlToElasticsearchMigrationDone";
127
128    public static final int MIGRATION_DEFAULT_BACTH_SIZE = 1000;
129
/** Cursor registry; created in {@code onApplicationStarted} and cleared in {@code onApplicationStopped}. */
protected CursorService<Iterator<SearchHit>, SearchHit, String> cursorService;

/** Elasticsearch client; assigned in {@code onApplicationStarted} and closed in {@code onApplicationStopped}. */
protected ESClient esClient;

protected static final Log log = LogFactory.getLog(ESAuditBackend.class);

/**
 * Creates the backend for the given audit component and backend descriptor.
 */
public ESAuditBackend(NXAuditEventsService component, AuditBackendDescriptor config) {
    super(component, config);
}

/**
 * @since 9.3
 */
public ESAuditBackend() {
    // the no-arg super constructor is invoked implicitly
}
146
147    protected BaseLogEntryProvider provider = new BaseLogEntryProvider() {
148
149        @Override
150        public int removeEntries(String eventId, String pathPattern) {
151            throw new UnsupportedOperationException("Not implemented yet!");
152        }
153
154        @Override
155        public void addLogEntry(LogEntry logEntry) {
156            List<LogEntry> entries = new ArrayList<>();
157            entries.add(logEntry);
158            addLogEntries(entries);
159        }
160
161    };
162
163    protected ESClient getClient() {
164        log.info("Activate Elasticsearch backend for Audit");
165        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
166        ESClient client = esa.getClient();
167        ensureUIDSequencer(client);
168        return client;
169    }
170
171    protected boolean isMigrationDone() {
172        AuditReader reader = Framework.getService(AuditReader.class);
173        List<LogEntry> entries = reader.queryLogs(new String[] { MIGRATION_DONE_EVENT }, null);
174        return !entries.isEmpty();
175    }
176
177    @Override
178    public int getApplicationStartedOrder() {
179        int elasticOrder = ((DefaultComponent) Framework.getRuntime().getComponent(
180                "org.nuxeo.elasticsearch.ElasticSearchComponent")).getApplicationStartedOrder();
181        int uidgenOrder = ((DefaultComponent) Framework.getRuntime().getComponent(
182                "org.nuxeo.ecm.core.uidgen.UIDGeneratorService")).getApplicationStartedOrder();
183        return Integer.max(elasticOrder, uidgenOrder) + 1;
184    }
185
186    @Override
187    public void onApplicationStarted() {
188        esClient = getClient();
189        if (Boolean.parseBoolean(Framework.getProperty(MIGRATION_FLAG_PROP))) {
190            if (!isMigrationDone()) {
191                log.info(String.format(
192                        "Property %s is true and migration is not done yet, processing audit migration from SQL to Elasticsearch index",
193                        MIGRATION_FLAG_PROP));
194                // Drop audit index first in case of a previous bad migration
195                ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
196                esa.dropAndInitIndex(getESIndexName());
197                int batchSize = MIGRATION_DEFAULT_BACTH_SIZE;
198                String batchSizeProp = Framework.getProperty(MIGRATION_BATCH_SIZE_PROP);
199                if (batchSizeProp != null) {
200                    batchSize = Integer.parseInt(batchSizeProp);
201                }
202                migrate(batchSize);
203            } else {
204                log.warn(String.format(
205                        "Property %s is true but migration is already done, please set this property to false",
206                        MIGRATION_FLAG_PROP));
207            }
208        } else {
209            log.debug(String.format("Property %s is false, not processing any migration", MIGRATION_FLAG_PROP));
210        }
211        cursorService = new CursorService<>(SearchHit::getSourceAsString);
212    }
213
214    @Override
215    public void onApplicationStopped() {
216        if (esClient == null) {
217            return;
218        }
219        try {
220            esClient.close();
221            cursorService.clear();
222        } catch (Exception e) {
223            log.warn("Fail to close esClient: " + e.getMessage(), e);
224        } finally {
225            esClient = null;
226            cursorService = null;
227        }
228    }
229
230    @Override
231    @SuppressWarnings("unchecked")
232    public List<LogEntry> queryLogs(org.nuxeo.ecm.core.query.sql.model.QueryBuilder builder) {
233        // prepare parameters
234        MultiExpression predicate = builder.predicate();
235        OrderByList orders = builder.orders();
236        long offset = builder.offset();
237        long limit = builder.limit();
238
239        SearchSourceBuilder source = createSearchRequestSource(predicate, orders);
240
241        // Perform search
242        List<LogEntry> logEntries;
243        SearchRequest request = createSearchRequest();
244        request.source(source);
245        if (limit == 0) {
246            // return all result -> use the scroll api
247            // offset is not taking into account when querying all results
248            TimeValue keepAlive = TimeValue.timeValueMinutes(1);
249            request.scroll(keepAlive);
250            // the size here is the size of each scrolls
251            source.size(100);
252
253            // run request
254            SearchResponse searchResponse = runRequest(request);
255
256            // Build log entries
257            logEntries = buildLogEntries(searchResponse);
258            // Scroll on next results
259            for (; //
260            searchResponse.getHits().getHits().length > 0
261                    && logEntries.size() < searchResponse.getHits().getTotalHits(); //
262                    searchResponse = runNextScroll(searchResponse.getScrollId(), keepAlive)) {
263                // Build log entries
264                logEntries.addAll(buildLogEntries(searchResponse));
265            }
266        } else {
267            // return a page -> use a regular search
268            source.from((int) offset).size((int) limit);
269
270            // run request
271            SearchResponse searchResponse = runRequest(request);
272
273            // Build log entries
274            logEntries = buildLogEntries(searchResponse);
275        }
276
277        return logEntries;
278    }
279
280    protected SearchSourceBuilder createSearchRequestSource(MultiExpression predicate, OrderByList orders) {
281        // create ES query builder
282        QueryBuilder query = createQueryBuilder(predicate);
283
284        // create ES source
285        SearchSourceBuilder source = new SearchSourceBuilder().query(QueryBuilders.constantScoreQuery(query)).size(100);
286
287        // create sort
288        orders.forEach(order -> source.sort(order.reference.name, order.isDescending ? SortOrder.DESC : SortOrder.ASC));
289        return source;
290    }
291
292    protected QueryBuilder createQueryBuilder(MultiExpression andPredicate) {
293        // cast parameters
294        // current implementation only support a MultiExpression with AND operator
295        List<Predicate> predicates = andPredicate.predicates;
296        // current implementation only use Predicate/OrderByExpr with a simple Reference for left and right
297        Function<Operand, String> getFieldName = operand -> ((Reference) operand).name;
298
299        QueryBuilder query;
300        if (predicates.isEmpty()) {
301            query = QueryBuilders.matchAllQuery();
302        } else {
303            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
304            for (Predicate predicate : predicates) {
305                String leftName = getFieldName.apply(predicate.lvalue);
306                Operator operator = predicate.operator;
307                Object rightValue = Literals.valueOf(predicate.rvalue);
308                if (Operator.EQ.equals(operator)) {
309                    boolQuery.must(QueryBuilders.termQuery(leftName, rightValue));
310                } else if (Operator.NOTEQ.equals(operator)) {
311                    boolQuery.mustNot(QueryBuilders.termQuery(leftName, rightValue));
312                } else if (Operator.LT.equals(operator)) {
313                    boolQuery.must(QueryBuilders.rangeQuery(leftName).lt(rightValue));
314                } else if (Operator.LTEQ.equals(operator)) {
315                    boolQuery.must(QueryBuilders.rangeQuery(leftName).lte(rightValue));
316                } else if (Operator.GTEQ.equals(operator)) {
317                    boolQuery.must(QueryBuilders.rangeQuery(leftName).gte(rightValue));
318                } else if (Operator.GT.equals(operator)) {
319                    boolQuery.must(QueryBuilders.rangeQuery(leftName).gt(rightValue));
320                } else if (Operator.IN.equals(operator)) {
321                    boolQuery.must(QueryBuilders.termsQuery(leftName, (List<?>) rightValue));
322                } else if (Operator.STARTSWITH.equals(operator)) {
323                    boolQuery.must(NxqlQueryConverter.makeStartsWithQuery(leftName, rightValue));
324                }
325            }
326            query = boolQuery;
327        }
328        return query;
329    }
330
331    protected List<LogEntry> buildLogEntries(SearchResponse searchResponse) {
332        List<LogEntry> entries = new ArrayList<>(searchResponse.getHits().getHits().length);
333        ObjectMapper mapper = new ObjectMapper();
334        for (SearchHit hit : searchResponse.getHits()) {
335            try {
336                entries.add(mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class));
337            } catch (IOException e) {
338                log.error("Error while reading Audit Entry from ES", e);
339            }
340        }
341        return entries;
342    }
343
344    protected SearchRequest createSearchRequest() {
345        return new SearchRequest(getESIndexName()).searchType(SearchType.DFS_QUERY_THEN_FETCH);
346    }
347
348    @Override
349    public LogEntry getLogEntryByID(long id) {
350        GetResponse ret = esClient.get(
351                new GetRequest(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, String.valueOf(id)));
352        if (!ret.isExists()) {
353            return null;
354        }
355        try {
356            return new ObjectMapper().readValue(ret.getSourceAsString(), LogEntryImpl.class);
357        } catch (IOException e) {
358            throw new NuxeoException("Unable to read Entry for id " + id, e);
359        }
360    }
361
362    public SearchRequest buildQuery(String query, Map<String, Object> params) {
363        if (params != null && params.size() > 0) {
364            query = expandQueryVariables(query, params);
365        }
366        SearchRequest request = createSearchRequest();
367        SearchSourceBuilder sourceBuilder = createSearchSourceBuilder(query);
368        return request.source(sourceBuilder);
369    }
370
371    protected SearchSourceBuilder createSearchSourceBuilder(String query) {
372        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
373        SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
374        try {
375            try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(
376                    new NamedXContentRegistry(searchModule.getNamedXContents()), THROW_UNSUPPORTED_OPERATION, query)) {
377                searchSourceBuilder.parseXContent(parser);
378            }
379        } catch (IOException | ParsingException e) {
380            log.error("Invalid query: " + query + ": " + e.getMessage(), e);
381            throw new IllegalArgumentException("Bad query: " + query);
382        }
383        return searchSourceBuilder;
384    }
385
386    public String expandQueryVariables(String query, Object[] params) {
387        Map<String, Object> qParams = new HashMap<>();
388        for (int i = 0; i < params.length; i++) {
389            query = query.replaceFirst("\\?", "\\${param" + i + "}");
390            qParams.put("param" + i, params[i]);
391        }
392        return expandQueryVariables(query, qParams);
393    }
394
395    public String expandQueryVariables(String query, Map<String, Object> params) {
396        if (params != null && params.size() > 0) {
397            TextTemplate tmpl = new TextTemplate();
398            for (String key : params.keySet()) {
399                Object val = params.get(key);
400                if (val == null) {
401                    continue;
402                } else if (val instanceof Calendar) {
403                    tmpl.setVariable(key, Long.toString(((Calendar) val).getTime().getTime()));
404                } else if (val instanceof Date) {
405                    tmpl.setVariable(key, Long.toString(((Date) val).getTime()));
406                } else {
407                    tmpl.setVariable(key, val.toString());
408                }
409            }
410            query = tmpl.processText(query);
411        }
412        return query;
413    }
414
415    @Override
416    public List<?> nativeQuery(String query, Map<String, Object> params, int pageNb, int pageSize) {
417        SearchRequest request = buildQuery(query, params);
418        if (pageNb > 0) {
419            request.source().from(pageNb * pageSize);
420        }
421        if (pageSize > 0) {
422            request.source().size(pageSize);
423        }
424        SearchResponse searchResponse = runRequest(request);
425        return buildLogEntries(searchResponse);
426    }
427
428    @Override
429    public List<LogEntry> queryLogsByPage(String[] eventIds, Date limit, String[] categories, String path, int pageNb,
430            int pageSize) {
431        SearchRequest request = createSearchRequest();
432        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
433        if (eventIds != null && eventIds.length > 0) {
434            if (eventIds.length == 1) {
435                filterBuilder.must(QueryBuilders.termQuery("eventId", eventIds[0]));
436            } else {
437                filterBuilder.must(QueryBuilders.termsQuery("eventId", eventIds));
438            }
439        }
440        if (categories != null && categories.length > 0) {
441            if (categories.length == 1) {
442                filterBuilder.must(QueryBuilders.termQuery("category", categories[0]));
443            } else {
444                filterBuilder.must(QueryBuilders.termsQuery("category", categories));
445            }
446        }
447        if (path != null) {
448            filterBuilder.must(QueryBuilders.termQuery("docPath", path));
449        }
450
451        if (limit != null) {
452            filterBuilder.must(QueryBuilders.rangeQuery("eventDate").lt(convertDate(limit)));
453        }
454        request.source(new SearchSourceBuilder().query(QueryBuilders.constantScoreQuery(filterBuilder)));
455        if (pageNb > 0) {
456            request.source().from(pageNb * pageSize);
457        }
458        if (pageSize > 0) {
459            request.source().size(pageSize);
460        }
461        SearchResponse searchResponse = runRequest(request);
462        return buildLogEntries(searchResponse);
463    }
464
465    @Override
466    public void addLogEntries(List<LogEntry> entries) {
467
468        if (entries.isEmpty()) {
469            return;
470        }
471
472        BulkRequest bulkRequest = new BulkRequest();
473        JsonFactory factory = new JsonFactory();
474
475        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
476        UIDSequencer seq = uidGeneratorService.getSequencer();
477
478        try {
479            List<Long> block = seq.getNextBlock(SEQ_NAME, entries.size());
480            for (int i = 0; i < entries.size(); i++) {
481                LogEntry entry = entries.get(i);
482                entry.setId(block.get(i));
483                if (log.isDebugEnabled()) {
484                    log.debug(String.format("Indexing log entry: %s", entry));
485                }
486                entry.setLogDate(new Date());
487                try (OutputStream out = new BytesStreamOutput(); //
488                        JsonGenerator jg = factory.createGenerator(out); //
489                        XContentBuilder builder = jsonBuilder(out)) {
490                    ObjectMapper mapper = new ObjectMapper();
491                    mapper.writeValue(jg, entry);
492                    bulkRequest.add(new IndexRequest(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE,
493                            String.valueOf(entry.getId())).source(builder));
494                }
495            }
496
497            BulkResponse bulkResponse = esClient.bulk(bulkRequest);
498            if (bulkResponse.hasFailures()) {
499                for (BulkItemResponse response : bulkResponse.getItems()) {
500                    if (response.isFailed()) {
501                        log.error("Unable to index audit entry " + response.getItemId() + " :"
502                                + response.getFailureMessage());
503                    }
504                }
505            }
506        } catch (IOException e) {
507            throw new NuxeoException("Error while indexing Audit entries", e);
508        }
509
510    }
511
512    @Override
513    public Long getEventsCount(String eventId) {
514        SearchResponse res = esClient.search(
515                new SearchRequest(getESIndexName()).source(
516                        new SearchSourceBuilder().query(
517                                QueryBuilders.constantScoreQuery(QueryBuilders.termQuery("eventId", eventId)))
518                                                 .size(0)));
519        return res.getHits().getTotalHits();
520    }
521
522    @Override
523    public long syncLogCreationEntries(final String repoId, final String path, final Boolean recurs) {
524        return syncLogCreationEntries(provider, repoId, path, recurs);
525    }
526
527    public SearchResponse search(SearchRequest request) {
528        String[] indices = request.indices();
529        if (indices == null || indices.length != 1) {
530            throw new IllegalStateException("Search on audit must include index name: " + request);
531        }
532        if (!getESIndexName().equals(indices[0])) {
533            throw new IllegalStateException("Search on audit must be on audit index: " + request);
534        }
535        return runRequest(request);
536    }
537
538    protected QueryBuilder buildFilter(PredicateDefinition[] predicates, DocumentModel searchDocumentModel) {
539
540        if (searchDocumentModel == null) {
541            return QueryBuilders.matchAllQuery();
542        }
543
544        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
545
546        int nbFilters = 0;
547
548        for (PredicateDefinition predicate : predicates) {
549
550            // extract data from DocumentModel
551            PredicateFieldDefinition[] fieldDef = predicate.getValues();
552            Object[] val = new Object[fieldDef.length];
553            for (int fidx = 0; fidx < fieldDef.length; fidx++) {
554                if (fieldDef[fidx].getXpath() != null) {
555                    val[fidx] = searchDocumentModel.getPropertyValue(fieldDef[fidx].getXpath());
556                } else {
557                    val[fidx] = searchDocumentModel.getProperty(fieldDef[fidx].getSchema(), fieldDef[fidx].getName());
558                }
559            }
560
561            if (!isNonNullParam(val)) {
562                // skip predicate where all values are null
563                continue;
564            }
565
566            nbFilters++;
567
568            String op = predicate.getOperator();
569            if (op.equalsIgnoreCase("IN")) {
570
571                String[] values = null;
572                if (val[0] instanceof Iterable<?>) {
573                    List<String> l = new ArrayList<>();
574                    Iterable<?> vals = (Iterable<?>) val[0];
575
576                    for (Object v : vals) {
577                        if (v != null) {
578                            l.add(v.toString());
579                        }
580                    }
581                    values = l.toArray(new String[l.size()]);
582                } else if (val[0] instanceof Object[]) {
583                    values = (String[]) val[0];
584                }
585                filterBuilder.must(QueryBuilders.termsQuery(predicate.getParameter(), values));
586            } else if (op.equalsIgnoreCase("BETWEEN")) {
587                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gt(convertDate(val[0])));
588                if (val.length > 1) {
589                    filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lt(convertDate(val[1])));
590                }
591            } else if (">".equals(op)) {
592                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gt(convertDate(val[0])));
593            } else if (">=".equals(op)) {
594                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gte(convertDate(val[0])));
595            } else if ("<".equals(op)) {
596                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lt(convertDate(val[0])));
597            } else if ("<=".equals(op)) {
598                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lte(convertDate(val[0])));
599            } else {
600                filterBuilder.must(QueryBuilders.termQuery(predicate.getParameter(), convertDate(val[0])));
601            }
602        }
603
604        if (nbFilters == 0) {
605            return QueryBuilders.matchAllQuery();
606        }
607        return filterBuilder;
608    }
609
610    protected Object convertDate(Object o) {
611        // Date are convert to timestamp ms which is a known format by default for ES
612        if (o instanceof Calendar) {
613            return Long.valueOf(((Calendar) o).getTime().getTime());
614        } else if (o instanceof Date) {
615            return Long.valueOf(((Date) o).getTime());
616        }
617        return o;
618    }
619
620    public SearchRequest buildSearchQuery(String fixedPart, PredicateDefinition[] predicates,
621            DocumentModel searchDocumentModel) {
622        SearchRequest request = createSearchRequest();
623        QueryBuilder queryBuilder = QueryBuilders.wrapperQuery(fixedPart);
624        QueryBuilder filterBuilder = buildFilter(predicates, searchDocumentModel);
625        request.source(
626                new SearchSourceBuilder().query(QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder)));
627        return request;
628    }
629
630    protected boolean isNonNullParam(Object[] val) {
631        if (val == null) {
632            return false;
633        }
634        for (Object v : val) {
635            if (v != null) {
636                if (v instanceof String) {
637                    if (!((String) v).isEmpty()) {
638                        return true;
639                    }
640                } else if (v instanceof String[]) {
641                    if (((String[]) v).length > 0) {
642                        return true;
643                    }
644                } else {
645                    return true;
646                }
647            }
648        }
649        return false;
650    }
651
652    @SuppressWarnings("deprecation")
653    public String migrate(final int batchSize) {
654
655        final String MIGRATION_WORK_ID = "AuditMigration";
656
657        WorkManager wm = Framework.getService(WorkManager.class);
658        State migrationState = wm.getWorkState(MIGRATION_WORK_ID);
659        if (migrationState != null) {
660            return "Migration already scheduled : " + migrationState.toString();
661        }
662
663        Work migrationWork = new ESAuditMigrationWork(MIGRATION_WORK_ID, batchSize);
664        wm.schedule(migrationWork);
665        return "Migration work started : " + MIGRATION_WORK_ID;
666    }
667
668    SearchResponse runRequest(SearchRequest request) {
669        logSearchRequest(request);
670        SearchResponse response = esClient.search(request);
671        logSearchResponse(response);
672        return response;
673    }
674
675    SearchResponse runNextScroll(String scrollId, TimeValue keepAlive) {
676        if (log.isDebugEnabled()) {
677            log.debug(String.format(
678                    "Scroll request: -XGET 'localhost:9200/_search/scroll' -d '{\"scroll\": \"%s\", \"scroll_id\": \"%s\" }'",
679                    keepAlive, scrollId));
680        }
681        SearchResponse response = esClient.searchScroll(new SearchScrollRequest(scrollId).scroll(keepAlive));
682        logSearchResponse(response);
683        return response;
684    }
685
686    protected void logSearchResponse(SearchResponse response) {
687        if (log.isDebugEnabled()) {
688            log.debug("Response: " + response.toString());
689        }
690    }
691
692    protected void logSearchRequest(SearchRequest request) {
693        if (log.isDebugEnabled()) {
694            log.debug(String.format("Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'",
695                    getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, request.toString()));
696        }
697    }
698
699    /**
700     * Ensures the audit sequence returns an UID greater or equal than the maximum log entry id.
701     */
702    protected void ensureUIDSequencer(ESClient esClient) {
703        boolean auditIndexExists = esClient.indexExists(getESIndexName());
704        if (!auditIndexExists) {
705            return;
706        }
707
708        // Get max log entry id
709        SearchRequest request = createSearchRequest();
710        request.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())
711                                                .aggregation(AggregationBuilders.max("maxAgg").field("id")));
712        SearchResponse searchResponse = esClient.search(request);
713        Max agg = searchResponse.getAggregations().get("maxAgg");
714        long maxLogEntryId = (long) agg.getValue();
715
716        // Get next sequence id
717        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
718        UIDSequencer seq = uidGeneratorService.getSequencer();
719        seq.init();
720        long nextSequenceId = seq.getNextLong(SEQ_NAME);
721
722        // Increment sequence to max log entry id if needed
723        if (nextSequenceId < maxLogEntryId) {
724            log.info(String.format("Next UID returned by %s sequence is %d, initializing sequence to %d", SEQ_NAME,
725                    nextSequenceId, maxLogEntryId));
726            seq.initSequence(SEQ_NAME, maxLogEntryId);
727        }
728    }
729
730    @Override
731    public ExtendedInfo newExtendedInfo(Serializable value) {
732        return new ESExtendedInfo(value);
733    }
734
735    protected String getESIndexName() {
736        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
737        return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE);
738    }
739
740    @Override
741    public void append(List<String> jsonEntries) {
742        BulkRequest bulkRequest = new BulkRequest();
743        for (String json : jsonEntries) {
744            try {
745                String entryId = new JSONObject(json).getString(LOG_ID);
746                if (StringUtils.isBlank(entryId)) {
747                    throw new NuxeoException("A json entry has an empty id. entry=" + json);
748                }
749                IndexRequest request = new IndexRequest(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, entryId);
750                request.source(json, XContentType.JSON);
751                bulkRequest.add(request);
752            } catch (JSONException e) {
753                throw new NuxeoException("Unable to deserialize json entry=" + json, e);
754            }
755        }
756        esClient.bulk(bulkRequest);
757    }
758
759    @Override
760    public ScrollResult<String> scroll(org.nuxeo.ecm.core.query.sql.model.QueryBuilder builder, int batchSize,
761            int keepAliveSeconds) {
762        // prepare parameters
763        MultiExpression predicate = builder.predicate();
764        OrderByList orders = builder.orders();
765
766        // create source
767        SearchSourceBuilder source = createSearchRequestSource(predicate, orders);
768        source.size(batchSize);
769        // create request
770        SearchRequest request = createSearchRequest();
771        request.source(source).scroll(TimeValue.timeValueSeconds(keepAliveSeconds));
772        SearchResponse response = runRequest(request);
773        // register cursor
774        String scrollId = cursorService.registerCursorResult(new ESCursorResult(response, batchSize, keepAliveSeconds));
775        return scroll(scrollId);
776    }
777
778    @Override
779    public ScrollResult<String> scroll(String scrollId) {
780        return cursorService.scroll(scrollId);
781    }
782
783    public class ESCursorResult extends CursorResult<Iterator<SearchHit>, SearchHit> {
784
785        protected final String scrollId;
786
787        protected boolean end;
788
789        public ESCursorResult(SearchResponse response, int batchSize, int keepAliveSeconds) {
790            super(response.getHits().iterator(), batchSize, keepAliveSeconds);
791            this.scrollId = response.getScrollId();
792        }
793
794        @Override
795        public boolean hasNext() {
796            if (cursor == null || end) {
797                return false;
798            } else if (cursor.hasNext()) {
799                return true;
800            } else {
801                runNextScroll();
802                return !end;
803            }
804        }
805
806        @Override
807        public SearchHit next() {
808            if (cursor != null && !cursor.hasNext() && !end) {
809                // try to run a next scroll
810                runNextScroll();
811            }
812            return super.next();
813        }
814
815        protected void runNextScroll() {
816            SearchResponse response = ESAuditBackend.this.runNextScroll(scrollId,
817                    TimeValue.timeValueSeconds(keepAliveSeconds));
818            cursor = response.getHits().iterator();
819            end = !cursor.hasNext();
820        }
821
822        @Override
823        public void close() {
824            ClearScrollRequest request = new ClearScrollRequest();
825            request.addScrollId(scrollId);
826            esClient.clearScroll(request);
827            end = true;
828            // Call super close to clear cursor
829            super.close();
830        }
831
832    }
833
834}