001/*
002 * (C) Copyright 2014-2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Tiry
018 *     Benoit Delbosc
019 */
020package org.nuxeo.elasticsearch.audit;
021
022import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
023
024import java.io.IOException;
025import java.io.Serializable;
026import java.util.ArrayList;
027import java.util.Calendar;
028import java.util.Date;
029import java.util.HashMap;
030import java.util.Iterator;
031import java.util.List;
032import java.util.Map;
033
034import org.apache.commons.collections.MapUtils;
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.codehaus.jackson.JsonFactory;
038import org.codehaus.jackson.JsonGenerator;
039import org.elasticsearch.action.bulk.BulkItemResponse;
040import org.elasticsearch.action.bulk.BulkRequestBuilder;
041import org.elasticsearch.action.bulk.BulkResponse;
042import org.elasticsearch.action.count.CountResponse;
043import org.elasticsearch.action.get.GetResponse;
044import org.elasticsearch.action.search.SearchRequestBuilder;
045import org.elasticsearch.action.search.SearchResponse;
046import org.elasticsearch.action.search.SearchType;
047import org.elasticsearch.client.Client;
048import org.elasticsearch.common.xcontent.XContentBuilder;
049import org.elasticsearch.index.query.BoolFilterBuilder;
050import org.elasticsearch.index.query.FilterBuilder;
051import org.elasticsearch.index.query.FilterBuilders;
052import org.elasticsearch.index.query.QueryBuilder;
053import org.elasticsearch.index.query.QueryBuilders;
054import org.elasticsearch.index.query.TermFilterBuilder;
055import org.elasticsearch.search.SearchHit;
056import org.elasticsearch.search.aggregations.AggregationBuilders;
057import org.elasticsearch.search.aggregations.metrics.max.Max;
058import org.elasticsearch.search.sort.SortOrder;
059import org.joda.time.DateTime;
060import org.joda.time.format.ISODateTimeFormat;
061import org.nuxeo.common.utils.TextTemplate;
062import org.nuxeo.ecm.core.api.DocumentModel;
063import org.nuxeo.ecm.core.api.NuxeoException;
064import org.nuxeo.ecm.core.uidgen.UIDGeneratorService;
065import org.nuxeo.ecm.core.uidgen.UIDSequencer;
066import org.nuxeo.ecm.core.work.api.Work;
067import org.nuxeo.ecm.core.work.api.Work.State;
068import org.nuxeo.ecm.core.work.api.WorkManager;
069import org.nuxeo.ecm.platform.audit.api.AuditReader;
070import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
071import org.nuxeo.ecm.platform.audit.api.FilterMapEntry;
072import org.nuxeo.ecm.platform.audit.api.LogEntry;
073import org.nuxeo.ecm.platform.audit.api.query.AuditQueryException;
074import org.nuxeo.ecm.platform.audit.api.query.DateRangeParser;
075import org.nuxeo.ecm.platform.audit.service.AbstractAuditBackend;
076import org.nuxeo.ecm.platform.audit.service.AuditBackend;
077import org.nuxeo.ecm.platform.audit.service.BaseLogEntryProvider;
078import org.nuxeo.ecm.platform.query.api.PredicateDefinition;
079import org.nuxeo.ecm.platform.query.api.PredicateFieldDefinition;
080import org.nuxeo.elasticsearch.ElasticSearchConstants;
081import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
082import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONReader;
083import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONWriter;
084import org.nuxeo.runtime.api.Framework;
085
086/**
087 * Implementation of the {@link AuditBackend} interface using Elasticsearch persistence
088 *
089 * @author tiry
090 */
091public class ESAuditBackend extends AbstractAuditBackend implements AuditBackend {
092
    /** Name of the UID sequence from which log entry ids are drawn. */
    public static final String SEQ_NAME = "audit";

    /** Framework property that, when it parses to {@code true}, triggers the migration check at application start. */
    public static final String MIGRATION_FLAG_PROP = "audit.elasticsearch.migration";

    /** Optional framework property giving the batch size to use for the migration. */
    public static final String MIGRATION_BATCH_SIZE_PROP = "audit.elasticsearch.migration.batchSize";

    /** Event id searched for in the audit log to decide whether the migration has already been done. */
    public static final String MIGRATION_DONE_EVENT = "sqlToElasticsearchMigrationDone";

    /**
     * Batch size used when {@link #MIGRATION_BATCH_SIZE_PROP} is not set.
     * NOTE(review): the name contains a typo ("BACTH"); it is public, so renaming it would break callers.
     */
    public static final int MIGRATION_DEFAULT_BACTH_SIZE = 1000;

    /** Elasticsearch client, fetched on first use by {@link #getClient()}. */
    protected Client esClient = null;

    protected static final Log log = LogFactory.getLog(ESAuditBackend.class);

    /** Log entry provider, created on first use by {@link #getProvider()}. */
    protected BaseLogEntryProvider provider = null;
108
109    protected Client getClient() {
110        if (esClient == null) {
111            log.info("Activate Elasticsearch backend for Audit");
112            ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
113            esClient = esa.getClient();
114            ensureUIDSequencer(esClient);
115        }
116        return esClient;
117    }
118
119    protected boolean isMigrationDone() {
120        AuditReader reader = Framework.getService(AuditReader.class);
121        List<LogEntry> entries = reader.queryLogs(new String[] { MIGRATION_DONE_EVENT }, null);
122        return !entries.isEmpty();
123    }
124
125    @Override
126    public void deactivate() {
127        if (esClient != null) {
128            esClient.close();
129        }
130    }
131
132    @Override
133    public List<LogEntry> getLogEntriesFor(String uuid, Map<String, FilterMapEntry> filterMap, boolean doDefaultSort) {
134        SearchRequestBuilder builder = getSearchRequestBuilder();
135        TermFilterBuilder docFilter = FilterBuilders.termFilter("docUUID", uuid);
136        FilterBuilder filter;
137        if (MapUtils.isEmpty(filterMap)) {
138            filter = docFilter;
139        } else {
140            filter = FilterBuilders.boolFilter();
141            ((BoolFilterBuilder) filter).must(docFilter);
142            for (String key : filterMap.keySet()) {
143                FilterMapEntry entry = filterMap.get(key);
144                ((BoolFilterBuilder) filter).must(FilterBuilders.termFilter(entry.getColumnName(), entry.getObject()));
145            }
146        }
147        builder.setQuery(QueryBuilders.constantScoreQuery(filter)).setSize(Integer.MAX_VALUE);
148        if (doDefaultSort) {
149            builder.addSort("eventDate", SortOrder.DESC);
150        }
151        logSearchRequest(builder);
152        SearchResponse searchResponse = builder.get();
153        logSearchResponse(searchResponse);
154        return buildLogEntries(searchResponse);
155    }
156
157    protected List<LogEntry> buildLogEntries(SearchResponse searchResponse) {
158        List<LogEntry> entries = new ArrayList<>(searchResponse.getHits().getHits().length);
159        for (SearchHit hit : searchResponse.getHits()) {
160            try {
161                entries.add(AuditEntryJSONReader.read(hit.getSourceAsString()));
162            } catch (IOException e) {
163                log.error("Error while reading Audit Entry from ES", e);
164            }
165        }
166        return entries;
167    }
168
169    protected SearchRequestBuilder getSearchRequestBuilder() {
170        return getClient().prepareSearch(getESIndexName()).setTypes(ElasticSearchConstants.ENTRY_TYPE).setSearchType(
171                SearchType.DFS_QUERY_THEN_FETCH);
172    }
173
174    @Override
175    public LogEntry getLogEntryByID(long id) {
176        GetResponse ret = getClient().prepareGet(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE,
177                String.valueOf(id)).get();
178        if (!ret.isExists()) {
179            return null;
180        }
181        try {
182            return AuditEntryJSONReader.read(ret.getSourceAsString());
183        } catch (IOException e) {
184            throw new RuntimeException("Unable to read Entry for id " + id, e);
185        }
186    }
187
188    public SearchRequestBuilder buildQuery(String query, Map<String, Object> params) {
189        if (params != null && params.size() > 0) {
190            query = expandQueryVariables(query, params);
191        }
192        SearchRequestBuilder builder = getSearchRequestBuilder();
193        builder.setQuery(query);
194        return builder;
195    }
196
197    public String expandQueryVariables(String query, Object[] params) {
198        Map<String, Object> qParams = new HashMap<>();
199        for (int i = 0; i < params.length; i++) {
200            query = query.replaceFirst("\\?", "\\${param" + i + "}");
201            qParams.put("param" + i, params[i]);
202        }
203        return expandQueryVariables(query, qParams);
204    }
205
206    public String expandQueryVariables(String query, Map<String, Object> params) {
207        if (params != null && params.size() > 0) {
208            TextTemplate tmpl = new TextTemplate();
209            for (String key : params.keySet()) {
210                Object val = params.get(key);
211                if (val == null) {
212                    continue;
213                } else if (val instanceof Calendar) {
214                    tmpl.setVariable(key, ISODateTimeFormat.dateTime().print(new DateTime(val)));
215                } else if (val instanceof Date) {
216                    tmpl.setVariable(key, ISODateTimeFormat.dateTime().print(new DateTime(val)));
217                } else {
218                    tmpl.setVariable(key, val.toString());
219                }
220            }
221            query = tmpl.processText(query);
222        }
223        return query;
224    }
225
226    @Override
227    public List<?> nativeQuery(String query, Map<String, Object> params, int pageNb, int pageSize) {
228        SearchRequestBuilder builder = buildQuery(query, params);
229        if (pageNb > 0) {
230            builder.setFrom(pageNb * pageSize);
231        }
232        if (pageSize > 0) {
233            builder.setSize(pageSize);
234        }
235        logSearchRequest(builder);
236        SearchResponse searchResponse = builder.get();
237        logSearchResponse(searchResponse);
238        return buildLogEntries(searchResponse);
239    }
240
241    @Override
242    public List<LogEntry> queryLogsByPage(String[] eventIds, Date limit, String[] categories, String path, int pageNb,
243            int pageSize) {
244        SearchRequestBuilder builder = getSearchRequestBuilder();
245        BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter();
246        if (eventIds != null && eventIds.length > 0) {
247            if (eventIds.length == 1) {
248                filterBuilder.must(FilterBuilders.termFilter("eventId", eventIds[0]));
249            } else {
250                filterBuilder.must(FilterBuilders.termsFilter("eventId", eventIds));
251            }
252        }
253        if (categories != null && categories.length > 0) {
254            if (categories.length == 1) {
255                filterBuilder.must(FilterBuilders.termFilter("category", categories[0]));
256            } else {
257                filterBuilder.must(FilterBuilders.termsFilter("category", categories));
258            }
259        }
260        if (path != null) {
261            filterBuilder.must(FilterBuilders.termFilter("docPath", path));
262        }
263
264        if (limit != null) {
265            filterBuilder.must(FilterBuilders.rangeFilter("eventDate").lt(limit));
266        }
267
268        builder.setQuery(QueryBuilders.constantScoreQuery(filterBuilder));
269
270        if (pageNb > 0) {
271            builder.setFrom(pageNb * pageSize);
272        }
273        if (pageSize > 0) {
274            builder.setSize(pageSize);
275        } else {
276            builder.setSize(Integer.MAX_VALUE);
277        }
278        logSearchRequest(builder);
279        SearchResponse searchResponse = builder.get();
280        logSearchResponse(searchResponse);
281        return buildLogEntries(searchResponse);
282    }
283
284    @Override
285    public List<LogEntry> queryLogsByPage(String[] eventIds, String dateRange, String[] categories, String path,
286            int pageNb, int pageSize) {
287
288        Date limit = null;
289        if (dateRange != null) {
290            try {
291                limit = DateRangeParser.parseDateRangeQuery(new Date(), dateRange);
292            } catch (AuditQueryException aqe) {
293                aqe.addInfo("Wrong date range query. Query was " + dateRange);
294                throw aqe;
295            }
296        }
297        return queryLogsByPage(eventIds, limit, categories, path, pageNb, pageSize);
298    }
299
300    @Override
301    public void addLogEntries(List<LogEntry> entries) {
302
303        BulkRequestBuilder bulkRequest = getClient().prepareBulk();
304        JsonFactory factory = new JsonFactory();
305
306        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
307        UIDSequencer seq = uidGeneratorService.getSequencer();
308
309        try {
310
311            for (LogEntry entry : entries) {
312                entry.setId(seq.getNext(SEQ_NAME));
313                if (log.isDebugEnabled()) {
314                    log.debug(String.format("Indexing log enry Id: %s, with logDate : %s, for docUUID: %s ",
315                            entry.getId(), entry.getLogDate(), entry.getDocUUID()));
316                }
317                XContentBuilder builder = jsonBuilder();
318                JsonGenerator jsonGen = factory.createJsonGenerator(builder.stream());
319                AuditEntryJSONWriter.asJSON(jsonGen, entry);
320                bulkRequest.add(getClient().prepareIndex(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE,
321                        String.valueOf(entry.getId())).setSource(builder));
322            }
323
324            BulkResponse bulkResponse = bulkRequest.execute().actionGet();
325            if (bulkResponse.hasFailures()) {
326                for (BulkItemResponse response : bulkResponse.getItems()) {
327                    if (response.isFailed()) {
328                        log.error("Unable to index audit entry " + response.getItemId() + " :"
329                                + response.getFailureMessage());
330                    }
331                }
332            }
333        } catch (IOException e) {
334            throw new NuxeoException("Error while indexing Audit entries", e);
335        }
336
337    }
338
339    @Override
340    public Long getEventsCount(String eventId) {
341        CountResponse res = getClient().prepareCount(getESIndexName()).setTypes(ElasticSearchConstants.ENTRY_TYPE).setQuery(
342                QueryBuilders.constantScoreQuery(FilterBuilders.termFilter("eventId", eventId))).get();
343        return res.getCount();
344    }
345
346    protected BaseLogEntryProvider getProvider() {
347
348        if (provider == null) {
349            provider = new BaseLogEntryProvider() {
350
351                @Override
352                public int removeEntries(String eventId, String pathPattern) {
353                    throw new UnsupportedOperationException("Not implemented yet!");
354                }
355
356                @Override
357                public void addLogEntry(LogEntry logEntry) {
358                    List<LogEntry> entries = new ArrayList<>();
359                    entries.add(logEntry);
360                    addLogEntries(entries);
361                }
362            };
363        }
364        return provider;
365    }
366
367    @Override
368    public long syncLogCreationEntries(final String repoId, final String path, final Boolean recurs) {
369        return syncLogCreationEntries(getProvider(), repoId, path, recurs);
370    }
371
372    protected FilterBuilder buildFilter(PredicateDefinition[] predicates, DocumentModel searchDocumentModel) {
373
374        if (searchDocumentModel == null) {
375            return FilterBuilders.matchAllFilter();
376        }
377
378        BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter();
379
380        int nbFilters = 0;
381
382        for (PredicateDefinition predicate : predicates) {
383
384            // extract data from DocumentModel
385            PredicateFieldDefinition[] fieldDef = predicate.getValues();
386            Object[] val = new Object[fieldDef.length];
387            for (int fidx = 0; fidx < fieldDef.length; fidx++) {
388                if (fieldDef[fidx].getXpath() != null) {
389                    val[fidx] = searchDocumentModel.getPropertyValue(fieldDef[fidx].getXpath());
390                } else {
391                    val[fidx] = searchDocumentModel.getProperty(fieldDef[fidx].getSchema(), fieldDef[fidx].getName());
392                }
393            }
394
395            if (!isNonNullParam(val)) {
396                // skip predicate where all values are null
397                continue;
398            }
399
400            nbFilters++;
401
402            String op = predicate.getOperator();
403            if (op.equalsIgnoreCase("IN")) {
404
405                String[] values = null;
406                if (val[0] instanceof Iterable<?>) {
407                    List<String> l = new ArrayList<>();
408                    Iterable<?> vals = (Iterable<?>) val[0];
409                    Iterator<?> valueIterator = vals.iterator();
410
411                    while (valueIterator.hasNext()) {
412
413                        Object v = valueIterator.next();
414                        if (v != null) {
415                            l.add(v.toString());
416                        }
417                    }
418                    values = l.toArray(new String[l.size()]);
419                } else if (val[0] instanceof Object[]) {
420                    values = (String[]) val[0];
421                }
422                filterBuilder.must(FilterBuilders.termsFilter(predicate.getParameter(), values));
423            } else if (op.equalsIgnoreCase("BETWEEN")) {
424                filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).gt(val[0]));
425                if (val.length > 1) {
426                    filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).lt(val[1]));
427                }
428            } else if (">".equals(op)) {
429                filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).gt(val[0]));
430            } else if (">=".equals(op)) {
431                filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).gte(val[0]));
432            } else if ("<".equals(op)) {
433                filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).lt(val[0]));
434            } else if ("<=".equals(op)) {
435                filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).lte(val[0]));
436            } else {
437                filterBuilder.must(FilterBuilders.termFilter(predicate.getParameter(), val[0]));
438            }
439        }
440
441        if (nbFilters == 0) {
442            return FilterBuilders.matchAllFilter();
443        }
444        return filterBuilder;
445    }
446
447    public SearchRequestBuilder buildSearchQuery(String fixedPart, PredicateDefinition[] predicates,
448            DocumentModel searchDocumentModel) {
449        SearchRequestBuilder builder = getSearchRequestBuilder();
450        QueryBuilder queryBuilder = QueryBuilders.wrapperQuery(fixedPart);
451        FilterBuilder filterBuilder = buildFilter(predicates, searchDocumentModel);
452        builder.setQuery(QueryBuilders.filteredQuery(queryBuilder, filterBuilder));
453        return builder;
454    }
455
456    protected boolean isNonNullParam(Object[] val) {
457        if (val == null) {
458            return false;
459        }
460        for (Object v : val) {
461            if (v != null) {
462                if (v instanceof String) {
463                    if (!((String) v).isEmpty()) {
464                        return true;
465                    }
466                } else if (v instanceof String[]) {
467                    if (((String[]) v).length > 0) {
468                        return true;
469                    }
470                } else {
471                    return true;
472                }
473            }
474        }
475        return false;
476    }
477
478    public String migrate(final int batchSize) {
479
480        final String MIGRATION_WORK_ID = "AuditMigration";
481
482        WorkManager wm = Framework.getService(WorkManager.class);
483        State migrationState = wm.getWorkState(MIGRATION_WORK_ID);
484        if (migrationState != null) {
485            return "Migration already scheduled : " + migrationState.toString();
486        }
487
488        Work migrationWork = new ESAuditMigrationWork(MIGRATION_WORK_ID, batchSize);
489        wm.schedule(migrationWork);
490        return "Migration work started : " + MIGRATION_WORK_ID;
491    }
492
493    protected void logSearchResponse(SearchResponse response) {
494        if (log.isDebugEnabled()) {
495            log.debug("Response: " + response.toString());
496        }
497    }
498
499    protected void logSearchRequest(SearchRequestBuilder request) {
500        if (log.isDebugEnabled()) {
501            log.debug(String.format("Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'",
502                    getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, request.toString()));
503        }
504    }
505
506    @Override
507    public void onApplicationStarted() {
508        if (Boolean.parseBoolean(Framework.getProperty(MIGRATION_FLAG_PROP))) {
509            if (!isMigrationDone()) {
510                log.info(String.format(
511                        "Property %s is true and migration is not done yet, processing audit migration from SQL to Elasticsearch index",
512                        MIGRATION_FLAG_PROP));
513                // Drop audit index first in case of a previous bad migration
514                ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
515                esa.dropAndInitIndex(getESIndexName());
516                int batchSize = MIGRATION_DEFAULT_BACTH_SIZE;
517                String batchSizeProp = Framework.getProperty(MIGRATION_BATCH_SIZE_PROP);
518                if (batchSizeProp != null) {
519                    batchSize = Integer.parseInt(batchSizeProp);
520                }
521                migrate(batchSize);
522            } else {
523                log.warn(String.format(
524                        "Property %s is true but migration is already done, please set this property to false",
525                        MIGRATION_FLAG_PROP));
526            }
527        } else {
528            log.debug(String.format("Property %s is false, not processing any migration", MIGRATION_FLAG_PROP));
529        }
530    }
531
532    /**
533     * Ensures the audit sequence returns an UID greater or equal than the maximum log entry id.
534     */
535    protected void ensureUIDSequencer(Client esClient) {
536        boolean auditIndexExists = esClient.admin().indices().prepareExists(getESIndexName()).execute().actionGet().isExists();
537        if (!auditIndexExists) {
538            return;
539        }
540
541        // Get max log entry id
542        SearchRequestBuilder builder = getSearchRequestBuilder();
543        builder.setQuery(QueryBuilders.matchAllQuery()).addAggregation(AggregationBuilders.max("maxAgg").field("id"));
544        SearchResponse searchResponse = builder.execute().actionGet();
545        Max agg = searchResponse.getAggregations().get("maxAgg");
546        int maxLogEntryId = (int) agg.getValue();
547
548        // Get next sequence id
549        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
550        UIDSequencer seq = uidGeneratorService.getSequencer();
551        int nextSequenceId = seq.getNext(SEQ_NAME);
552
553        // Increment sequence to max log entry id if needed
554        if (nextSequenceId < maxLogEntryId) {
555            log.info(String.format("Next UID returned by %s sequence is %d, initializing sequence to %d", SEQ_NAME,
556                    nextSequenceId, maxLogEntryId));
557            seq.initSequence(SEQ_NAME, maxLogEntryId);
558        }
559    }
560
561    @Override
562    public ExtendedInfo newExtendedInfo(Serializable value) {
563        return new ESExtendedInfo(value);
564    }
565
566    protected String getESIndexName() {
567        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
568        return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE);
569    }
570}