001/*
002 * (C) Copyright 2014-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     Benoit Delbosc
019 */
020package org.nuxeo.elasticsearch.audit;
021
022import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
023import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_ID;
024
025import java.io.IOException;
026import java.io.OutputStream;
027import java.io.Serializable;
028import java.util.ArrayList;
029import java.util.Calendar;
030import java.util.Collections;
031import java.util.Date;
032import java.util.HashMap;
033import java.util.Iterator;
034import java.util.List;
035import java.util.Map;
036import java.util.function.Function;
037
038import org.apache.commons.lang3.StringUtils;
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041import org.elasticsearch.action.bulk.BulkItemResponse;
042import org.elasticsearch.action.bulk.BulkRequest;
043import org.elasticsearch.action.bulk.BulkResponse;
044import org.elasticsearch.action.get.GetRequest;
045import org.elasticsearch.action.get.GetResponse;
046import org.elasticsearch.action.index.IndexRequest;
047import org.elasticsearch.action.search.ClearScrollRequest;
048import org.elasticsearch.action.search.SearchRequest;
049import org.elasticsearch.action.search.SearchResponse;
050import org.elasticsearch.action.search.SearchScrollRequest;
051import org.elasticsearch.action.search.SearchType;
052import org.elasticsearch.common.ParsingException;
053import org.elasticsearch.common.io.stream.BytesStreamOutput;
054import org.elasticsearch.common.settings.Settings;
055import org.elasticsearch.common.unit.TimeValue;
056import org.elasticsearch.common.xcontent.NamedXContentRegistry;
057import org.elasticsearch.common.xcontent.XContentBuilder;
058import org.elasticsearch.common.xcontent.XContentFactory;
059import org.elasticsearch.common.xcontent.XContentParser;
060import org.elasticsearch.common.xcontent.XContentType;
061import org.elasticsearch.index.query.BoolQueryBuilder;
062import org.elasticsearch.index.query.QueryBuilder;
063import org.elasticsearch.index.query.QueryBuilders;
064import org.elasticsearch.index.query.QueryParseContext;
065import org.elasticsearch.search.SearchHit;
066import org.elasticsearch.search.SearchModule;
067import org.elasticsearch.search.aggregations.AggregationBuilders;
068import org.elasticsearch.search.aggregations.metrics.max.Max;
069import org.elasticsearch.search.builder.SearchSourceBuilder;
070import org.elasticsearch.search.sort.SortOrder;
071import org.json.JSONException;
072import org.json.JSONObject;
073import org.nuxeo.common.utils.TextTemplate;
074import org.nuxeo.ecm.core.api.CursorResult;
075import org.nuxeo.ecm.core.api.CursorService;
076import org.nuxeo.ecm.core.api.DocumentModel;
077import org.nuxeo.ecm.core.api.NuxeoException;
078import org.nuxeo.ecm.core.api.ScrollResult;
079import org.nuxeo.ecm.core.query.sql.model.Literals;
080import org.nuxeo.ecm.core.query.sql.model.MultiExpression;
081import org.nuxeo.ecm.core.query.sql.model.Operand;
082import org.nuxeo.ecm.core.query.sql.model.Operator;
083import org.nuxeo.ecm.core.query.sql.model.OrderByList;
084import org.nuxeo.ecm.core.query.sql.model.Predicate;
085import org.nuxeo.ecm.core.query.sql.model.Reference;
086import org.nuxeo.ecm.core.uidgen.UIDGeneratorService;
087import org.nuxeo.ecm.core.uidgen.UIDSequencer;
088import org.nuxeo.ecm.core.work.api.Work;
089import org.nuxeo.ecm.core.work.api.Work.State;
090import org.nuxeo.ecm.core.work.api.WorkManager;
091import org.nuxeo.ecm.platform.audit.api.AuditQueryBuilder;
092import org.nuxeo.ecm.platform.audit.api.AuditReader;
093import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
094import org.nuxeo.ecm.platform.audit.api.LogEntry;
095import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl;
096import org.nuxeo.ecm.platform.audit.service.AbstractAuditBackend;
097import org.nuxeo.ecm.platform.audit.service.AuditBackend;
098import org.nuxeo.ecm.platform.audit.service.BaseLogEntryProvider;
099import org.nuxeo.ecm.platform.audit.service.NXAuditEventsService;
100import org.nuxeo.ecm.platform.audit.service.extension.AuditBackendDescriptor;
101import org.nuxeo.ecm.platform.query.api.PredicateDefinition;
102import org.nuxeo.ecm.platform.query.api.PredicateFieldDefinition;
103import org.nuxeo.elasticsearch.ElasticSearchConstants;
104import org.nuxeo.elasticsearch.api.ESClient;
105import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
106import org.nuxeo.runtime.api.Framework;
107import org.nuxeo.runtime.model.DefaultComponent;
108
109import com.fasterxml.jackson.core.JsonFactory;
110import com.fasterxml.jackson.core.JsonGenerator;
111import com.fasterxml.jackson.databind.ObjectMapper;
112
113/**
114 * Implementation of the {@link AuditBackend} interface using Elasticsearch persistence
115 *
116 * @author tiry
117 */
118public class ESAuditBackend extends AbstractAuditBackend implements AuditBackend {
119
    /** Name of the UID sequence used to generate log entry ids. */
120    public static final String SEQ_NAME = "audit";
121
    /** Framework property read at application start; when it parses to true the SQL to Elasticsearch migration is considered. */
122    public static final String MIGRATION_FLAG_PROP = "audit.elasticsearch.migration";
123
    /** Framework property holding the batch size passed to the migration. */
124    public static final String MIGRATION_BATCH_SIZE_PROP = "audit.elasticsearch.migration.batchSize";
125
    /** Id of the audit event whose presence in the audit log means the migration has already been done. */
126    public static final String MIGRATION_DONE_EVENT = "sqlToElasticsearchMigrationDone";
127
    /**
     * Batch size used for the migration when {@link #MIGRATION_BATCH_SIZE_PROP} is not set.
     * NOTE(review): the name contains a typo ("BACTH"); it is public, so renaming it would break callers.
     */
128    public static final int MIGRATION_DEFAULT_BACTH_SIZE = 1000;
129
    /** Cursor service over search hits, created when the application starts and cleared when it stops. */
130    protected CursorService<Iterator<SearchHit>, SearchHit, String> cursorService;
131
    /**
     * Creates the backend for the given audit component and backend descriptor; both are handed unchanged to the
     * parent constructor.
     *
     * @param component the audit events service component
     * @param config the audit backend descriptor
     */
132    public ESAuditBackend(NXAuditEventsService component, AuditBackendDescriptor config) {
133        super(component, config);
134    }
135
136    /**
137     * @since 9.3
138     */
139    public ESAuditBackend() {
140        super();
141    }
142
    /** Elasticsearch client; assigned when the application starts and reset to null when it stops. */
143    protected ESClient esClient;
144
    /** Logger of this backend. */
145    protected static final Log log = LogFactory.getLog(ESAuditBackend.class);
146
147    protected BaseLogEntryProvider provider = new BaseLogEntryProvider() {
148
149        @Override
150        public int removeEntries(String eventId, String pathPattern) {
151            throw new UnsupportedOperationException("Not implemented yet!");
152        }
153
154        @Override
155        public void addLogEntry(LogEntry logEntry) {
156            List<LogEntry> entries = new ArrayList<>();
157            entries.add(logEntry);
158            addLogEntries(entries);
159        }
160
161    };
162
163    protected ESClient getClient() {
164        log.info("Activate Elasticsearch backend for Audit");
165        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
166        ESClient client = esa.getClient();
167        ensureUIDSequencer(client);
168        return client;
169    }
170
171    protected boolean isMigrationDone() {
172        AuditReader reader = Framework.getService(AuditReader.class);
173        List<LogEntry> entries = reader.queryLogs(new String[] { MIGRATION_DONE_EVENT }, null);
174        return !entries.isEmpty();
175    }
176
177    @Override
178    public int getApplicationStartedOrder() {
179        int elasticOrder = ((DefaultComponent) Framework.getRuntime().getComponent(
180                "org.nuxeo.elasticsearch.ElasticSearchComponent")).getApplicationStartedOrder();
181        int uidgenOrder = ((DefaultComponent) Framework.getRuntime().getComponent(
182                "org.nuxeo.ecm.core.uidgen.UIDGeneratorService")).getApplicationStartedOrder();
183        return Integer.max(elasticOrder, uidgenOrder) + 1;
184    }
185
186    @Override
187    public void onApplicationStarted() {
188        esClient = getClient();
189        if (Boolean.parseBoolean(Framework.getProperty(MIGRATION_FLAG_PROP))) {
190            if (!isMigrationDone()) {
191                log.info(String.format(
192                        "Property %s is true and migration is not done yet, processing audit migration from SQL to Elasticsearch index",
193                        MIGRATION_FLAG_PROP));
194                // Drop audit index first in case of a previous bad migration
195                ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
196                esa.dropAndInitIndex(getESIndexName());
197                int batchSize = MIGRATION_DEFAULT_BACTH_SIZE;
198                String batchSizeProp = Framework.getProperty(MIGRATION_BATCH_SIZE_PROP);
199                if (batchSizeProp != null) {
200                    batchSize = Integer.parseInt(batchSizeProp);
201                }
202                migrate(batchSize);
203            } else {
204                log.warn(String.format(
205                        "Property %s is true but migration is already done, please set this property to false",
206                        MIGRATION_FLAG_PROP));
207            }
208        } else {
209            log.debug(String.format("Property %s is false, not processing any migration", MIGRATION_FLAG_PROP));
210        }
211        cursorService = new CursorService<>(SearchHit::getSourceAsString);
212    }
213
214    @Override
215    public void onApplicationStopped() {
216        if (esClient == null) {
217            return;
218        }
219        try {
220            esClient.close();
221            cursorService.clear();
222        } catch (Exception e) {
223            log.warn("Fail to close esClient: " + e.getMessage(), e);
224        } finally {
225            esClient = null;
226            cursorService = null;
227        }
228    }
229
230    @Override
231    @SuppressWarnings("unchecked")
232    public List<LogEntry> queryLogs(AuditQueryBuilder builder) {
233        // prepare parameters
234        Predicate predicate = builder.predicate();
235        OrderByList orders = builder.orders();
236        long offset = builder.offset();
237        long limit = builder.limit();
238
239        SearchSourceBuilder source = createSearchRequestSource(predicate, orders);
240
241        // Perform search
242        List<LogEntry> logEntries;
243        SearchRequest request = createSearchRequest();
244        request.source(source);
245        if (limit == 0) {
246            // return all result -> use the scroll api
247            // offset is not taking into account when querying all results
248            TimeValue keepAlive = TimeValue.timeValueMinutes(1);
249            request.scroll(keepAlive);
250            // the size here is the size of each scrolls
251            source.size(100);
252
253            // run request
254            SearchResponse searchResponse = runRequest(request);
255
256            // Build log entries
257            logEntries = buildLogEntries(searchResponse);
258            // Scroll on next results
259            for (; //
260            searchResponse.getHits().getHits().length > 0
261                    && logEntries.size() < searchResponse.getHits().getTotalHits(); //
262                    searchResponse = runNextScroll(searchResponse.getScrollId(), keepAlive)) {
263                // Build log entries
264                logEntries.addAll(buildLogEntries(searchResponse));
265            }
266        } else {
267            // return a page -> use a regular search
268            source.from((int) offset).size((int) limit);
269
270            // run request
271            SearchResponse searchResponse = runRequest(request);
272
273            // Build log entries
274            logEntries = buildLogEntries(searchResponse);
275        }
276
277        return logEntries;
278    }
279
280    protected SearchSourceBuilder createSearchRequestSource(Predicate predicate, OrderByList orders) {
281        // create ES query builder
282        QueryBuilder query = createQueryBuilder(predicate);
283
284        // create ES source
285        SearchSourceBuilder source = new SearchSourceBuilder().query(QueryBuilders.constantScoreQuery(query)).size(100);
286
287        // create sort
288        orders.forEach(order -> source.sort(order.reference.name, order.isDescending ? SortOrder.DESC : SortOrder.ASC));
289        return source;
290    }
291
292    protected QueryBuilder createQueryBuilder(Predicate andPredicate) {
293        // cast parameters
294        // current implementation only support a MultiExpression with AND operator
295        @SuppressWarnings("unchecked")
296        List<Predicate> predicates = (List<Predicate>) ((List<?>) ((MultiExpression) andPredicate).values);
297        // current implementation only use Predicate/OrderByExpr with a simple Reference for left and right
298        Function<Operand, String> getFieldName = operand -> ((Reference) operand).name;
299
300        QueryBuilder query;
301        if (predicates.isEmpty()) {
302            query = QueryBuilders.matchAllQuery();
303        } else {
304            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
305            for (Predicate predicate : predicates) {
306                String leftName = getFieldName.apply(predicate.lvalue);
307                Operator operator = predicate.operator;
308                Object rightValue = Literals.valueOf(predicate.rvalue);
309                if (Operator.EQ.equals(operator)) {
310                    boolQuery.must(QueryBuilders.termQuery(leftName, rightValue));
311                } else if (Operator.NOTEQ.equals(operator)) {
312                    boolQuery.mustNot(QueryBuilders.termQuery(leftName, rightValue));
313                } else if (Operator.LT.equals(operator)) {
314                    boolQuery.must(QueryBuilders.rangeQuery(leftName).lt(rightValue));
315                } else if (Operator.LTEQ.equals(operator)) {
316                    boolQuery.must(QueryBuilders.rangeQuery(leftName).lte(rightValue));
317                } else if (Operator.GTEQ.equals(operator)) {
318                    boolQuery.must(QueryBuilders.rangeQuery(leftName).gte(rightValue));
319                } else if (Operator.GT.equals(operator)) {
320                    boolQuery.must(QueryBuilders.rangeQuery(leftName).gt(rightValue));
321                } else if (Operator.IN.equals(operator)) {
322                    boolQuery.must(QueryBuilders.termsQuery(leftName, (List<?>) rightValue));
323                }
324            }
325            query = boolQuery;
326        }
327        return query;
328    }
329
330    protected List<LogEntry> buildLogEntries(SearchResponse searchResponse) {
331        List<LogEntry> entries = new ArrayList<>(searchResponse.getHits().getHits().length);
332        ObjectMapper mapper = new ObjectMapper();
333        for (SearchHit hit : searchResponse.getHits()) {
334            try {
335                entries.add(mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class));
336            } catch (IOException e) {
337                log.error("Error while reading Audit Entry from ES", e);
338            }
339        }
340        return entries;
341    }
342
343    protected SearchRequest createSearchRequest() {
344        return new SearchRequest(getESIndexName()).types(ElasticSearchConstants.ENTRY_TYPE)
345                                                  .searchType(SearchType.DFS_QUERY_THEN_FETCH);
346    }
347
348    @Override
349    public LogEntry getLogEntryByID(long id) {
350        GetResponse ret = esClient.get(
351                new GetRequest(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, String.valueOf(id)));
352        if (!ret.isExists()) {
353            return null;
354        }
355        try {
356            return new ObjectMapper().readValue(ret.getSourceAsString(), LogEntryImpl.class);
357        } catch (IOException e) {
358            throw new NuxeoException("Unable to read Entry for id " + id, e);
359        }
360    }
361
362    public SearchRequest buildQuery(String query, Map<String, Object> params) {
363        if (params != null && params.size() > 0) {
364            query = expandQueryVariables(query, params);
365        }
366        SearchRequest request = createSearchRequest();
367        SearchSourceBuilder sourceBuilder = createSearchSourceBuilder(query);
368        return request.source(sourceBuilder);
369    }
370
371    protected SearchSourceBuilder createSearchSourceBuilder(String query) {
372        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
373        SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
374        try {
375            try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(
376                    new NamedXContentRegistry(searchModule.getNamedXContents()), query)) {
377                searchSourceBuilder.parseXContent(new QueryParseContext(parser));
378            }
379        } catch (IOException | ParsingException e) {
380            log.error("Invalid query: " + query + ": " + e.getMessage(), e);
381            throw new IllegalArgumentException("Bad query: " + query);
382        }
383        return searchSourceBuilder;
384    }
385
386    public String expandQueryVariables(String query, Object[] params) {
387        Map<String, Object> qParams = new HashMap<>();
388        for (int i = 0; i < params.length; i++) {
389            query = query.replaceFirst("\\?", "\\${param" + i + "}");
390            qParams.put("param" + i, params[i]);
391        }
392        return expandQueryVariables(query, qParams);
393    }
394
395    public String expandQueryVariables(String query, Map<String, Object> params) {
396        if (params != null && params.size() > 0) {
397            TextTemplate tmpl = new TextTemplate();
398            for (String key : params.keySet()) {
399                Object val = params.get(key);
400                if (val == null) {
401                    continue;
402                } else if (val instanceof Calendar) {
403                    tmpl.setVariable(key, Long.toString(((Calendar) val).getTime().getTime()));
404                } else if (val instanceof Date) {
405                    tmpl.setVariable(key, Long.toString(((Date) val).getTime()));
406                } else {
407                    tmpl.setVariable(key, val.toString());
408                }
409            }
410            query = tmpl.processText(query);
411        }
412        return query;
413    }
414
415    @Override
416    public List<?> nativeQuery(String query, Map<String, Object> params, int pageNb, int pageSize) {
417        SearchRequest request = buildQuery(query, params);
418        if (pageNb > 0) {
419            request.source().from(pageNb * pageSize);
420        }
421        if (pageSize > 0) {
422            request.source().size(pageSize);
423        }
424        SearchResponse searchResponse = runRequest(request);
425        return buildLogEntries(searchResponse);
426    }
427
428    @Override
429    public List<LogEntry> queryLogsByPage(String[] eventIds, Date limit, String[] categories, String path, int pageNb,
430            int pageSize) {
431        SearchRequest request = createSearchRequest();
432        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
433        if (eventIds != null && eventIds.length > 0) {
434            if (eventIds.length == 1) {
435                filterBuilder.must(QueryBuilders.termQuery("eventId", eventIds[0]));
436            } else {
437                filterBuilder.must(QueryBuilders.termsQuery("eventId", eventIds));
438            }
439        }
440        if (categories != null && categories.length > 0) {
441            if (categories.length == 1) {
442                filterBuilder.must(QueryBuilders.termQuery("category", categories[0]));
443            } else {
444                filterBuilder.must(QueryBuilders.termsQuery("category", categories));
445            }
446        }
447        if (path != null) {
448            filterBuilder.must(QueryBuilders.termQuery("docPath", path));
449        }
450
451        if (limit != null) {
452            filterBuilder.must(QueryBuilders.rangeQuery("eventDate").lt(convertDate(limit)));
453        }
454        request.source(new SearchSourceBuilder().query(QueryBuilders.constantScoreQuery(filterBuilder)));
455        if (pageNb > 0) {
456            request.source().from(pageNb * pageSize);
457        }
458        if (pageSize > 0) {
459            request.source().size(pageSize);
460        }
461        SearchResponse searchResponse = runRequest(request);
462        return buildLogEntries(searchResponse);
463    }
464
465    @Override
466    public void addLogEntries(List<LogEntry> entries) {
467
468        if (entries.isEmpty()) {
469            return;
470        }
471
472        BulkRequest bulkRequest = new BulkRequest();
473        JsonFactory factory = new JsonFactory();
474
475        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
476        UIDSequencer seq = uidGeneratorService.getSequencer();
477
478        try {
479
480            for (LogEntry entry : entries) {
481                entry.setId(seq.getNextLong(SEQ_NAME));
482                if (log.isDebugEnabled()) {
483                    log.debug(String.format("Indexing log entry: %s", entry));
484                }
485                entry.setLogDate(new Date());
486                OutputStream out = new BytesStreamOutput();
487                JsonGenerator jsonGen = factory.createGenerator(out);
488                XContentBuilder builder = jsonBuilder(out);
489                ObjectMapper mapper = new ObjectMapper();
490                mapper.writeValue(jsonGen, entry);
491                bulkRequest.add(new IndexRequest(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE,
492                        String.valueOf(entry.getId())).source(builder));
493            }
494
495            BulkResponse bulkResponse = esClient.bulk(bulkRequest);
496            if (bulkResponse.hasFailures()) {
497                for (BulkItemResponse response : bulkResponse.getItems()) {
498                    if (response.isFailed()) {
499                        log.error("Unable to index audit entry " + response.getItemId() + " :"
500                                + response.getFailureMessage());
501                    }
502                }
503            }
504        } catch (IOException e) {
505            throw new NuxeoException("Error while indexing Audit entries", e);
506        }
507
508    }
509
510    @Override
511    public Long getEventsCount(String eventId) {
512        SearchResponse res = esClient.search(
513                new SearchRequest(getESIndexName()).types(ElasticSearchConstants.ENTRY_TYPE).source(
514                        new SearchSourceBuilder().query(
515                                QueryBuilders.constantScoreQuery(QueryBuilders.termQuery("eventId", eventId)))
516                                                 .size(0)));
517        return res.getHits().getTotalHits();
518    }
519
520    @Override
521    public long syncLogCreationEntries(final String repoId, final String path, final Boolean recurs) {
522        return syncLogCreationEntries(provider, repoId, path, recurs);
523    }
524
525    public SearchResponse search(SearchRequest request) {
526        String[] indices = request.indices();
527        if (indices == null || indices.length != 1) {
528            throw new IllegalStateException("Search on audit must include index name: " + request);
529        }
530        if (!getESIndexName().equals(indices[0])) {
531            throw new IllegalStateException("Search on audit must be on audit index: " + request);
532        }
533        return runRequest(request);
534    }
535
536    protected QueryBuilder buildFilter(PredicateDefinition[] predicates, DocumentModel searchDocumentModel) {
537
538        if (searchDocumentModel == null) {
539            return QueryBuilders.matchAllQuery();
540        }
541
542        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
543
544        int nbFilters = 0;
545
546        for (PredicateDefinition predicate : predicates) {
547
548            // extract data from DocumentModel
549            PredicateFieldDefinition[] fieldDef = predicate.getValues();
550            Object[] val = new Object[fieldDef.length];
551            for (int fidx = 0; fidx < fieldDef.length; fidx++) {
552                if (fieldDef[fidx].getXpath() != null) {
553                    val[fidx] = searchDocumentModel.getPropertyValue(fieldDef[fidx].getXpath());
554                } else {
555                    val[fidx] = searchDocumentModel.getProperty(fieldDef[fidx].getSchema(), fieldDef[fidx].getName());
556                }
557            }
558
559            if (!isNonNullParam(val)) {
560                // skip predicate where all values are null
561                continue;
562            }
563
564            nbFilters++;
565
566            String op = predicate.getOperator();
567            if (op.equalsIgnoreCase("IN")) {
568
569                String[] values = null;
570                if (val[0] instanceof Iterable<?>) {
571                    List<String> l = new ArrayList<>();
572                    Iterable<?> vals = (Iterable<?>) val[0];
573
574                    for (Object v : vals) {
575                        if (v != null) {
576                            l.add(v.toString());
577                        }
578                    }
579                    values = l.toArray(new String[l.size()]);
580                } else if (val[0] instanceof Object[]) {
581                    values = (String[]) val[0];
582                }
583                filterBuilder.must(QueryBuilders.termsQuery(predicate.getParameter(), values));
584            } else if (op.equalsIgnoreCase("BETWEEN")) {
585                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gt(convertDate(val[0])));
586                if (val.length > 1) {
587                    filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lt(convertDate(val[1])));
588                }
589            } else if (">".equals(op)) {
590                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gt(convertDate(val[0])));
591            } else if (">=".equals(op)) {
592                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gte(convertDate(val[0])));
593            } else if ("<".equals(op)) {
594                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lt(convertDate(val[0])));
595            } else if ("<=".equals(op)) {
596                filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lte(convertDate(val[0])));
597            } else {
598                filterBuilder.must(QueryBuilders.termQuery(predicate.getParameter(), convertDate(val[0])));
599            }
600        }
601
602        if (nbFilters == 0) {
603            return QueryBuilders.matchAllQuery();
604        }
605        return filterBuilder;
606    }
607
608    protected Object convertDate(Object o) {
609        // Date are convert to timestamp ms which is a known format by default for ES
610        if (o instanceof Calendar) {
611            return Long.valueOf(((Calendar) o).getTime().getTime());
612        } else if (o instanceof Date) {
613            return Long.valueOf(((Date) o).getTime());
614        }
615        return o;
616    }
617
618    public SearchRequest buildSearchQuery(String fixedPart, PredicateDefinition[] predicates,
619            DocumentModel searchDocumentModel) {
620        SearchRequest request = createSearchRequest();
621        QueryBuilder queryBuilder = QueryBuilders.wrapperQuery(fixedPart);
622        QueryBuilder filterBuilder = buildFilter(predicates, searchDocumentModel);
623        request.source(
624                new SearchSourceBuilder().query(QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder)));
625        return request;
626    }
627
628    protected boolean isNonNullParam(Object[] val) {
629        if (val == null) {
630            return false;
631        }
632        for (Object v : val) {
633            if (v != null) {
634                if (v instanceof String) {
635                    if (!((String) v).isEmpty()) {
636                        return true;
637                    }
638                } else if (v instanceof String[]) {
639                    if (((String[]) v).length > 0) {
640                        return true;
641                    }
642                } else {
643                    return true;
644                }
645            }
646        }
647        return false;
648    }
649
650    @SuppressWarnings("deprecation")
651    public String migrate(final int batchSize) {
652
653        final String MIGRATION_WORK_ID = "AuditMigration";
654
655        WorkManager wm = Framework.getService(WorkManager.class);
656        State migrationState = wm.getWorkState(MIGRATION_WORK_ID);
657        if (migrationState != null) {
658            return "Migration already scheduled : " + migrationState.toString();
659        }
660
661        Work migrationWork = new ESAuditMigrationWork(MIGRATION_WORK_ID, batchSize);
662        wm.schedule(migrationWork);
663        return "Migration work started : " + MIGRATION_WORK_ID;
664    }
665
666    SearchResponse runRequest(SearchRequest request) {
667        logSearchRequest(request);
668        SearchResponse response = esClient.search(request);
669        logSearchResponse(response);
670        return response;
671    }
672
673    SearchResponse runNextScroll(String scrollId, TimeValue keepAlive) {
674        if (log.isDebugEnabled()) {
675            log.debug(String.format(
676                    "Scroll request: -XGET 'localhost:9200/_search/scroll' -d '{\"scroll\": \"%s\", \"scroll_id\": \"%s\" }'",
677                    keepAlive, scrollId));
678        }
679        SearchResponse response = esClient.searchScroll(new SearchScrollRequest(scrollId).scroll(keepAlive));
680        logSearchResponse(response);
681        return response;
682    }
683
684    protected void logSearchResponse(SearchResponse response) {
685        if (log.isDebugEnabled()) {
686            log.debug("Response: " + response.toString());
687        }
688    }
689
690    protected void logSearchRequest(SearchRequest request) {
691        if (log.isDebugEnabled()) {
692            log.debug(String.format("Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'",
693                    getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, request.toString()));
694        }
695    }
696
697    /**
698     * Ensures the audit sequence returns an UID greater or equal than the maximum log entry id.
699     */
700    protected void ensureUIDSequencer(ESClient esClient) {
701        boolean auditIndexExists = esClient.indexExists(getESIndexName());
702        if (!auditIndexExists) {
703            return;
704        }
705
706        // Get max log entry id
707        SearchRequest request = createSearchRequest();
708        request.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())
709                                                .aggregation(AggregationBuilders.max("maxAgg").field("id")));
710        SearchResponse searchResponse = esClient.search(request);
711        Max agg = searchResponse.getAggregations().get("maxAgg");
712        long maxLogEntryId = (long) agg.getValue();
713
714        // Get next sequence id
715        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
716        UIDSequencer seq = uidGeneratorService.getSequencer();
717        seq.init();
718        long nextSequenceId = seq.getNextLong(SEQ_NAME);
719
720        // Increment sequence to max log entry id if needed
721        if (nextSequenceId < maxLogEntryId) {
722            log.info(String.format("Next UID returned by %s sequence is %d, initializing sequence to %d", SEQ_NAME,
723                    nextSequenceId, maxLogEntryId));
724            seq.initSequence(SEQ_NAME, maxLogEntryId);
725        }
726    }
727
728    @Override
729    public ExtendedInfo newExtendedInfo(Serializable value) {
730        return new ESExtendedInfo(value);
731    }
732
733    protected String getESIndexName() {
734        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
735        return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE);
736    }
737
738    @Override
739    public void append(List<String> jsonEntries) {
740        BulkRequest bulkRequest = new BulkRequest();
741        for (String json : jsonEntries) {
742            try {
743                String entryId = new JSONObject(json).getString(LOG_ID);
744                if (StringUtils.isBlank(entryId)) {
745                    throw new NuxeoException("A json entry has an empty id. entry=" + json);
746                }
747                IndexRequest request = new IndexRequest(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, entryId);
748                request.source(json, XContentType.JSON);
749                bulkRequest.add(request);
750            } catch (JSONException e) {
751                throw new NuxeoException("Unable to deserialize json entry=" + json, e);
752            }
753        }
754        esClient.bulk(bulkRequest);
755    }
756
757    @Override
758    public ScrollResult<String> scroll(AuditQueryBuilder builder, int batchSize, int keepAliveSeconds) {
759        // prepare parameters
760        Predicate predicate = builder.predicate();
761        OrderByList orders = builder.orders();
762
763        // create source
764        SearchSourceBuilder source = createSearchRequestSource(predicate, orders);
765        source.size(batchSize);
766        // create request
767        SearchRequest request = createSearchRequest();
768        request.source(source).scroll(TimeValue.timeValueSeconds(keepAliveSeconds));
769        SearchResponse response = runRequest(request);
770        // register cursor
771        String scrollId = cursorService.registerCursorResult(new ESCursorResult(response, batchSize, keepAliveSeconds));
772        return scroll(scrollId);
773    }
774
775    @Override
776    public ScrollResult<String> scroll(String scrollId) {
777        return cursorService.scroll(scrollId);
778    }
779
780    public class ESCursorResult extends CursorResult<Iterator<SearchHit>, SearchHit> {
781
782        protected final String scrollId;
783
784        protected boolean end;
785
786        public ESCursorResult(SearchResponse response, int batchSize, int keepAliveSeconds) {
787            super(response.getHits().iterator(), batchSize, keepAliveSeconds);
788            this.scrollId = response.getScrollId();
789        }
790
791        @Override
792        public boolean hasNext() {
793            if (cursor == null || end) {
794                return false;
795            } else if (cursor.hasNext()) {
796                return true;
797            } else {
798                runNextScroll();
799                return !end;
800            }
801        }
802
803        @Override
804        public SearchHit next() {
805            if (cursor != null && !cursor.hasNext() && !end) {
806                // try to run a next scroll
807                runNextScroll();
808            }
809            return super.next();
810        }
811
812        protected void runNextScroll() {
813            SearchResponse response = ESAuditBackend.this.runNextScroll(scrollId,
814                    TimeValue.timeValueSeconds(keepAliveSeconds));
815            cursor = response.getHits().iterator();
816            end = !cursor.hasNext();
817        }
818
819        @Override
820        public void close() {
821            ClearScrollRequest request = new ClearScrollRequest();
822            request.addScrollId(scrollId);
823            esClient.clearScroll(request);
824            end = true;
825            // Call super close to clear cursor
826            super.close();
827        }
828
829    }
830
831}