001/*
002 * (C) Copyright 2014-2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     Benoit Delbosc
019 */
020package org.nuxeo.elasticsearch.audit;
021
022import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
023
024import java.io.IOException;
025import java.io.Serializable;
026import java.util.ArrayList;
027import java.util.Calendar;
028import java.util.Date;
029import java.util.HashMap;
030import java.util.Iterator;
031import java.util.List;
032import java.util.Map;
033
034import org.apache.commons.collections.MapUtils;
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.codehaus.jackson.JsonFactory;
038import org.codehaus.jackson.JsonGenerator;
039import org.elasticsearch.action.bulk.BulkItemResponse;
040import org.elasticsearch.action.bulk.BulkRequestBuilder;
041import org.elasticsearch.action.bulk.BulkResponse;
042import org.elasticsearch.action.count.CountResponse;
043import org.elasticsearch.action.get.GetResponse;
044import org.elasticsearch.action.search.SearchRequestBuilder;
045import org.elasticsearch.action.search.SearchResponse;
046import org.elasticsearch.action.search.SearchType;
047import org.elasticsearch.client.Client;
048import org.elasticsearch.common.xcontent.XContentBuilder;
049import org.elasticsearch.index.query.BoolFilterBuilder;
050import org.elasticsearch.index.query.FilterBuilder;
051import org.elasticsearch.index.query.FilterBuilders;
052import org.elasticsearch.index.query.QueryBuilder;
053import org.elasticsearch.index.query.QueryBuilders;
054import org.elasticsearch.index.query.TermFilterBuilder;
055import org.elasticsearch.search.SearchHit;
056import org.elasticsearch.search.aggregations.AggregationBuilders;
057import org.elasticsearch.search.aggregations.metrics.max.Max;
058import org.elasticsearch.search.sort.SortOrder;
059import org.joda.time.DateTime;
060import org.joda.time.format.ISODateTimeFormat;
061import org.nuxeo.common.utils.TextTemplate;
062import org.nuxeo.ecm.core.api.DocumentModel;
063import org.nuxeo.ecm.core.api.NuxeoException;
064import org.nuxeo.ecm.core.uidgen.UIDGeneratorService;
065import org.nuxeo.ecm.core.uidgen.UIDSequencer;
066import org.nuxeo.ecm.core.work.api.Work;
067import org.nuxeo.ecm.core.work.api.Work.State;
068import org.nuxeo.ecm.core.work.api.WorkManager;
069import org.nuxeo.ecm.platform.audit.api.AuditReader;
070import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
071import org.nuxeo.ecm.platform.audit.api.FilterMapEntry;
072import org.nuxeo.ecm.platform.audit.api.LogEntry;
073import org.nuxeo.ecm.platform.audit.api.query.AuditQueryException;
074import org.nuxeo.ecm.platform.audit.api.query.DateRangeParser;
075import org.nuxeo.ecm.platform.audit.service.AbstractAuditBackend;
076import org.nuxeo.ecm.platform.audit.service.AuditBackend;
077import org.nuxeo.ecm.platform.audit.service.BaseLogEntryProvider;
078import org.nuxeo.ecm.platform.audit.service.NXAuditEventsService;
079import org.nuxeo.ecm.platform.audit.service.extension.AuditBackendDescriptor;
080import org.nuxeo.ecm.platform.query.api.PredicateDefinition;
081import org.nuxeo.ecm.platform.query.api.PredicateFieldDefinition;
082import org.nuxeo.elasticsearch.ElasticSearchConstants;
083import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
084import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONReader;
085import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONWriter;
086import org.nuxeo.runtime.api.Framework;
087import org.nuxeo.runtime.model.DefaultComponent;
088
089/**
090 * Implementation of the {@link AuditBackend} interface using Elasticsearch persistence
091 *
092 * @author tiry
093 */
094public class ESAuditBackend extends AbstractAuditBackend implements AuditBackend {
095
096    public static final String SEQ_NAME = "audit";
097
098    public static final String MIGRATION_FLAG_PROP = "audit.elasticsearch.migration";
099
100    public static final String MIGRATION_BATCH_SIZE_PROP = "audit.elasticsearch.migration.batchSize";
101
102    public static final String MIGRATION_DONE_EVENT = "sqlToElasticsearchMigrationDone";
103
104    public static final int MIGRATION_DEFAULT_BACTH_SIZE = 1000;
105
106    public ESAuditBackend(NXAuditEventsService component, AuditBackendDescriptor config) {
107        super(component, config);
108    }
109
110    protected Client esClient;
111
112    protected static final Log log = LogFactory.getLog(ESAuditBackend.class);
113
114    protected BaseLogEntryProvider provider = new BaseLogEntryProvider() {
115
116        @Override
117        public int removeEntries(String eventId, String pathPattern) {
118            throw new UnsupportedOperationException("Not implemented yet!");
119        }
120
121        @Override
122        public void addLogEntry(LogEntry logEntry) {
123            List<LogEntry> entries = new ArrayList<>();
124            entries.add(logEntry);
125            addLogEntries(entries);
126        }
127    };
128
129    protected Client getClient() {
130        log.info("Activate Elasticsearch backend for Audit");
131        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
132        Client client = esa.getClient();
133        ensureUIDSequencer(client);
134        return client;
135    }
136
137    protected boolean isMigrationDone() {
138        AuditReader reader = Framework.getService(AuditReader.class);
139        List<LogEntry> entries = reader.queryLogs(new String[] { MIGRATION_DONE_EVENT }, null);
140        return !entries.isEmpty();
141    }
142
143
144    @Override
145    public int getApplicationStartedOrder() {
146        return ((DefaultComponent)Framework.getRuntime().getComponent("org.nuxeo.elasticsearch.ElasticSearchComponent")).getApplicationStartedOrder()+1;
147    }
148
149    @Override
150    public void onApplicationStarted() {
151        esClient = getClient();
152        if (Boolean.parseBoolean(Framework.getProperty(MIGRATION_FLAG_PROP))) {
153            if (!isMigrationDone()) {
154                log.info(String.format(
155                        "Property %s is true and migration is not done yet, processing audit migration from SQL to Elasticsearch index",
156                        MIGRATION_FLAG_PROP));
157                // Drop audit index first in case of a previous bad migration
158                ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
159                esa.dropAndInitIndex(getESIndexName());
160                int batchSize = MIGRATION_DEFAULT_BACTH_SIZE;
161                String batchSizeProp = Framework.getProperty(MIGRATION_BATCH_SIZE_PROP);
162                if (batchSizeProp != null) {
163                    batchSize = Integer.parseInt(batchSizeProp);
164                }
165                migrate(batchSize);
166            } else {
167                log.warn(String.format(
168                        "Property %s is true but migration is already done, please set this property to false",
169                        MIGRATION_FLAG_PROP));
170            }
171        } else {
172            log.debug(String.format("Property %s is false, not processing any migration", MIGRATION_FLAG_PROP));
173        }
174    }
175
176    @Override
177    public void onShutdown() {
178        if (esClient != null) {
179            try {
180                esClient.close();
181            } finally {
182                esClient = null;
183            }
184        }
185    }
186
187    @Override
188    public List<LogEntry> getLogEntriesFor(String uuid, Map<String, FilterMapEntry> filterMap, boolean doDefaultSort) {
189        SearchRequestBuilder builder = getSearchRequestBuilder(esClient);
190        TermFilterBuilder docFilter = FilterBuilders.termFilter("docUUID", uuid);
191        FilterBuilder filter;
192        if (MapUtils.isEmpty(filterMap)) {
193            filter = docFilter;
194        } else {
195            filter = FilterBuilders.boolFilter();
196            ((BoolFilterBuilder) filter).must(docFilter);
197            for (String key : filterMap.keySet()) {
198                FilterMapEntry entry = filterMap.get(key);
199                ((BoolFilterBuilder) filter).must(FilterBuilders.termFilter(entry.getColumnName(), entry.getObject()));
200            }
201        }
202        builder.setQuery(QueryBuilders.constantScoreQuery(filter)).setSize(Integer.MAX_VALUE);
203        if (doDefaultSort) {
204            builder.addSort("eventDate", SortOrder.DESC);
205        }
206        logSearchRequest(builder);
207        SearchResponse searchResponse = builder.get();
208        logSearchResponse(searchResponse);
209        return buildLogEntries(searchResponse);
210    }
211
212    protected List<LogEntry> buildLogEntries(SearchResponse searchResponse) {
213        List<LogEntry> entries = new ArrayList<>(searchResponse.getHits().getHits().length);
214        for (SearchHit hit : searchResponse.getHits()) {
215            try {
216                entries.add(AuditEntryJSONReader.read(hit.getSourceAsString()));
217            } catch (IOException e) {
218                log.error("Error while reading Audit Entry from ES", e);
219            }
220        }
221        return entries;
222    }
223
224    protected SearchRequestBuilder getSearchRequestBuilder(Client esClient) {
225        return esClient.prepareSearch(getESIndexName()).setTypes(ElasticSearchConstants.ENTRY_TYPE).setSearchType(
226                SearchType.DFS_QUERY_THEN_FETCH);
227    }
228
229    @Override
230    public LogEntry getLogEntryByID(long id) {
231        GetResponse ret = esClient.prepareGet(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE,
232                String.valueOf(id)).get();
233        if (!ret.isExists()) {
234            return null;
235        }
236        try {
237            return AuditEntryJSONReader.read(ret.getSourceAsString());
238        } catch (IOException e) {
239            throw new RuntimeException("Unable to read Entry for id " + id, e);
240        }
241    }
242
243    public SearchRequestBuilder buildQuery(String query, Map<String, Object> params) {
244        if (params != null && params.size() > 0) {
245            query = expandQueryVariables(query, params);
246        }
247        SearchRequestBuilder builder = getSearchRequestBuilder(esClient);
248        builder.setQuery(query);
249        return builder;
250    }
251
252    public String expandQueryVariables(String query, Object[] params) {
253        Map<String, Object> qParams = new HashMap<>();
254        for (int i = 0; i < params.length; i++) {
255            query = query.replaceFirst("\\?", "\\${param" + i + "}");
256            qParams.put("param" + i, params[i]);
257        }
258        return expandQueryVariables(query, qParams);
259    }
260
261    public String expandQueryVariables(String query, Map<String, Object> params) {
262        if (params != null && params.size() > 0) {
263            TextTemplate tmpl = new TextTemplate();
264            for (String key : params.keySet()) {
265                Object val = params.get(key);
266                if (val == null) {
267                    continue;
268                } else if (val instanceof Calendar) {
269                    tmpl.setVariable(key, ISODateTimeFormat.dateTime().print(new DateTime(val)));
270                } else if (val instanceof Date) {
271                    tmpl.setVariable(key, ISODateTimeFormat.dateTime().print(new DateTime(val)));
272                } else {
273                    tmpl.setVariable(key, val.toString());
274                }
275            }
276            query = tmpl.processText(query);
277        }
278        return query;
279    }
280
281    @Override
282    public List<?> nativeQuery(String query, Map<String, Object> params, int pageNb, int pageSize) {
283        SearchRequestBuilder builder = buildQuery(query, params);
284        if (pageNb > 0) {
285            builder.setFrom(pageNb * pageSize);
286        }
287        if (pageSize > 0) {
288            builder.setSize(pageSize);
289        }
290        logSearchRequest(builder);
291        SearchResponse searchResponse = builder.get();
292        logSearchResponse(searchResponse);
293        return buildLogEntries(searchResponse);
294    }
295
296    @Override
297    public List<LogEntry> queryLogsByPage(String[] eventIds, Date limit, String[] categories, String path, int pageNb,
298            int pageSize) {
299        SearchRequestBuilder builder = getSearchRequestBuilder(esClient);
300        BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter();
301        if (eventIds != null && eventIds.length > 0) {
302            if (eventIds.length == 1) {
303                filterBuilder.must(FilterBuilders.termFilter("eventId", eventIds[0]));
304            } else {
305                filterBuilder.must(FilterBuilders.termsFilter("eventId", eventIds));
306            }
307        }
308        if (categories != null && categories.length > 0) {
309            if (categories.length == 1) {
310                filterBuilder.must(FilterBuilders.termFilter("category", categories[0]));
311            } else {
312                filterBuilder.must(FilterBuilders.termsFilter("category", categories));
313            }
314        }
315        if (path != null) {
316            filterBuilder.must(FilterBuilders.termFilter("docPath", path));
317        }
318
319        if (limit != null) {
320            filterBuilder.must(FilterBuilders.rangeFilter("eventDate").lt(limit));
321        }
322
323        builder.setQuery(QueryBuilders.constantScoreQuery(filterBuilder));
324
325        if (pageNb > 0) {
326            builder.setFrom(pageNb * pageSize);
327        }
328        if (pageSize > 0) {
329            builder.setSize(pageSize);
330        } else {
331            builder.setSize(Integer.MAX_VALUE);
332        }
333        logSearchRequest(builder);
334        SearchResponse searchResponse = builder.get();
335        logSearchResponse(searchResponse);
336        return buildLogEntries(searchResponse);
337    }
338
339    @Override
340    public List<LogEntry> queryLogsByPage(String[] eventIds, String dateRange, String[] categories, String path,
341            int pageNb, int pageSize) {
342
343        Date limit = null;
344        if (dateRange != null) {
345            try {
346                limit = DateRangeParser.parseDateRangeQuery(new Date(), dateRange);
347            } catch (AuditQueryException aqe) {
348                aqe.addInfo("Wrong date range query. Query was " + dateRange);
349                throw aqe;
350            }
351        }
352        return queryLogsByPage(eventIds, limit, categories, path, pageNb, pageSize);
353    }
354
355    @Override
356    public void addLogEntries(List<LogEntry> entries) {
357
358        if (entries.isEmpty()) {
359            return;
360        }
361
362        BulkRequestBuilder bulkRequest = esClient.prepareBulk();
363        JsonFactory factory = new JsonFactory();
364
365        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
366        UIDSequencer seq = uidGeneratorService.getSequencer();
367
368        try {
369
370            for (LogEntry entry : entries) {
371                entry.setId(seq.getNext(SEQ_NAME));
372                if (log.isDebugEnabled()) {
373                    log.debug(String.format("Indexing log enry Id: %s, with logDate : %s, for docUUID: %s ",
374                            entry.getId(), entry.getLogDate(), entry.getDocUUID()));
375                }
376                XContentBuilder builder = jsonBuilder();
377                JsonGenerator jsonGen = factory.createJsonGenerator(builder.stream());
378                AuditEntryJSONWriter.asJSON(jsonGen, entry);
379                bulkRequest.add(esClient.prepareIndex(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE,
380                        String.valueOf(entry.getId())).setSource(builder));
381            }
382
383            BulkResponse bulkResponse = bulkRequest.execute().actionGet();
384            if (bulkResponse.hasFailures()) {
385                for (BulkItemResponse response : bulkResponse.getItems()) {
386                    if (response.isFailed()) {
387                        log.error("Unable to index audit entry " + response.getItemId() + " :"
388                                + response.getFailureMessage());
389                    }
390                }
391            }
392        } catch (IOException e) {
393            throw new NuxeoException("Error while indexing Audit entries", e);
394        }
395
396    }
397
398    @Override
399    public Long getEventsCount(String eventId) {
400        CountResponse res = esClient.prepareCount(getESIndexName()).setTypes(ElasticSearchConstants.ENTRY_TYPE).setQuery(
401                QueryBuilders.constantScoreQuery(FilterBuilders.termFilter("eventId", eventId))).get();
402        return res.getCount();
403    }
404
405    @Override
406    public long syncLogCreationEntries(final String repoId, final String path, final Boolean recurs) {
407        return syncLogCreationEntries(provider, repoId, path, recurs);
408    }
409
410    protected FilterBuilder buildFilter(PredicateDefinition[] predicates, DocumentModel searchDocumentModel) {
411
412        if (searchDocumentModel == null) {
413            return FilterBuilders.matchAllFilter();
414        }
415
416        BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter();
417
418        int nbFilters = 0;
419
420        for (PredicateDefinition predicate : predicates) {
421
422            // extract data from DocumentModel
423            PredicateFieldDefinition[] fieldDef = predicate.getValues();
424            Object[] val = new Object[fieldDef.length];
425            for (int fidx = 0; fidx < fieldDef.length; fidx++) {
426                if (fieldDef[fidx].getXpath() != null) {
427                    val[fidx] = searchDocumentModel.getPropertyValue(fieldDef[fidx].getXpath());
428                } else {
429                    val[fidx] = searchDocumentModel.getProperty(fieldDef[fidx].getSchema(), fieldDef[fidx].getName());
430                }
431            }
432
433            if (!isNonNullParam(val)) {
434                // skip predicate where all values are null
435                continue;
436            }
437
438            nbFilters++;
439
440            String op = predicate.getOperator();
441            if (op.equalsIgnoreCase("IN")) {
442
443                String[] values = null;
444                if (val[0] instanceof Iterable<?>) {
445                    List<String> l = new ArrayList<>();
446                    Iterable<?> vals = (Iterable<?>) val[0];
447                    Iterator<?> valueIterator = vals.iterator();
448
449                    while (valueIterator.hasNext()) {
450
451                        Object v = valueIterator.next();
452                        if (v != null) {
453                            l.add(v.toString());
454                        }
455                    }
456                    values = l.toArray(new String[l.size()]);
457                } else if (val[0] instanceof Object[]) {
458                    values = (String[]) val[0];
459                }
460                filterBuilder.must(FilterBuilders.termsFilter(predicate.getParameter(), values));
461            } else if (op.equalsIgnoreCase("BETWEEN")) {
462                filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).gt(val[0]));
463                if (val.length > 1) {
464                    filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).lt(val[1]));
465                }
466            } else if (">".equals(op)) {
467                filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).gt(val[0]));
468            } else if (">=".equals(op)) {
469                filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).gte(val[0]));
470            } else if ("<".equals(op)) {
471                filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).lt(val[0]));
472            } else if ("<=".equals(op)) {
473                filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).lte(val[0]));
474            } else {
475                filterBuilder.must(FilterBuilders.termFilter(predicate.getParameter(), val[0]));
476            }
477        }
478
479        if (nbFilters == 0) {
480            return FilterBuilders.matchAllFilter();
481        }
482        return filterBuilder;
483    }
484
485    public SearchRequestBuilder buildSearchQuery(String fixedPart, PredicateDefinition[] predicates,
486            DocumentModel searchDocumentModel) {
487        SearchRequestBuilder builder = getSearchRequestBuilder(esClient);
488        QueryBuilder queryBuilder = QueryBuilders.wrapperQuery(fixedPart);
489        FilterBuilder filterBuilder = buildFilter(predicates, searchDocumentModel);
490        builder.setQuery(QueryBuilders.filteredQuery(queryBuilder, filterBuilder));
491        return builder;
492    }
493
494    protected boolean isNonNullParam(Object[] val) {
495        if (val == null) {
496            return false;
497        }
498        for (Object v : val) {
499            if (v != null) {
500                if (v instanceof String) {
501                    if (!((String) v).isEmpty()) {
502                        return true;
503                    }
504                } else if (v instanceof String[]) {
505                    if (((String[]) v).length > 0) {
506                        return true;
507                    }
508                } else {
509                    return true;
510                }
511            }
512        }
513        return false;
514    }
515
516    public String migrate(final int batchSize) {
517
518        final String MIGRATION_WORK_ID = "AuditMigration";
519
520        WorkManager wm = Framework.getService(WorkManager.class);
521        State migrationState = wm.getWorkState(MIGRATION_WORK_ID);
522        if (migrationState != null) {
523            return "Migration already scheduled : " + migrationState.toString();
524        }
525
526        Work migrationWork = new ESAuditMigrationWork(MIGRATION_WORK_ID, batchSize);
527        wm.schedule(migrationWork);
528        return "Migration work started : " + MIGRATION_WORK_ID;
529    }
530
531    protected void logSearchResponse(SearchResponse response) {
532        if (log.isDebugEnabled()) {
533            log.debug("Response: " + response.toString());
534        }
535    }
536
537    protected void logSearchRequest(SearchRequestBuilder request) {
538        if (log.isDebugEnabled()) {
539            log.debug(String.format("Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'",
540                    getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, request.toString()));
541        }
542    }
543
544
545    /**
546     * Ensures the audit sequence returns an UID greater or equal than the maximum log entry id.
547     */
548    protected void ensureUIDSequencer(Client esClient) {
549        boolean auditIndexExists = esClient.admin().indices().prepareExists(getESIndexName()).execute().actionGet().isExists();
550        if (!auditIndexExists) {
551            return;
552        }
553
554        // Get max log entry id
555        SearchRequestBuilder builder = getSearchRequestBuilder(esClient);
556        builder.setQuery(QueryBuilders.matchAllQuery()).addAggregation(AggregationBuilders.max("maxAgg").field("id"));
557        SearchResponse searchResponse = builder.execute().actionGet();
558        Max agg = searchResponse.getAggregations().get("maxAgg");
559        int maxLogEntryId = (int) agg.getValue();
560
561        // Get next sequence id
562        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
563        UIDSequencer seq = uidGeneratorService.getSequencer();
564        int nextSequenceId = seq.getNext(SEQ_NAME);
565
566        // Increment sequence to max log entry id if needed
567        if (nextSequenceId < maxLogEntryId) {
568            log.info(String.format("Next UID returned by %s sequence is %d, initializing sequence to %d", SEQ_NAME,
569                    nextSequenceId, maxLogEntryId));
570            seq.initSequence(SEQ_NAME, maxLogEntryId);
571        }
572    }
573
574    @Override
575    public ExtendedInfo newExtendedInfo(Serializable value) {
576        return new ESExtendedInfo(value);
577    }
578
579    protected String getESIndexName() {
580        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
581        return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE);
582    }
583
584}