001/*
002 * (C) Copyright 2014-2015 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Tiry
016 *     Benoit Delbosc
017 */
018package org.nuxeo.elasticsearch.audit;
019
020import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
021
022import java.io.IOException;
023import java.io.Serializable;
024import java.util.ArrayList;
025import java.util.Calendar;
026import java.util.Date;
027import java.util.HashMap;
028import java.util.Iterator;
029import java.util.List;
030import java.util.Map;
031
032import org.apache.commons.collections.MapUtils;
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.codehaus.jackson.JsonFactory;
036import org.codehaus.jackson.JsonGenerator;
037import org.elasticsearch.action.bulk.BulkItemResponse;
038import org.elasticsearch.action.bulk.BulkRequestBuilder;
039import org.elasticsearch.action.bulk.BulkResponse;
040import org.elasticsearch.action.count.CountResponse;
041import org.elasticsearch.action.get.GetResponse;
042import org.elasticsearch.action.search.SearchRequestBuilder;
043import org.elasticsearch.action.search.SearchResponse;
044import org.elasticsearch.action.search.SearchType;
045import org.elasticsearch.client.Client;
046import org.elasticsearch.common.xcontent.XContentBuilder;
047import org.elasticsearch.index.query.BoolFilterBuilder;
048import org.elasticsearch.index.query.FilterBuilder;
049import org.elasticsearch.index.query.FilterBuilders;
050import org.elasticsearch.index.query.QueryBuilder;
051import org.elasticsearch.index.query.QueryBuilders;
052import org.elasticsearch.index.query.TermFilterBuilder;
053import org.elasticsearch.search.SearchHit;
054import org.elasticsearch.search.aggregations.AggregationBuilders;
055import org.elasticsearch.search.aggregations.metrics.max.Max;
056import org.elasticsearch.search.sort.SortOrder;
057import org.joda.time.DateTime;
058import org.joda.time.format.ISODateTimeFormat;
059import org.nuxeo.common.utils.TextTemplate;
060import org.nuxeo.ecm.core.api.DocumentModel;
061import org.nuxeo.ecm.core.api.NuxeoException;
062import org.nuxeo.ecm.core.uidgen.UIDGeneratorService;
063import org.nuxeo.ecm.core.uidgen.UIDSequencer;
064import org.nuxeo.ecm.core.work.api.Work;
065import org.nuxeo.ecm.core.work.api.Work.State;
066import org.nuxeo.ecm.core.work.api.WorkManager;
067import org.nuxeo.ecm.platform.audit.api.AuditReader;
068import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
069import org.nuxeo.ecm.platform.audit.api.FilterMapEntry;
070import org.nuxeo.ecm.platform.audit.api.LogEntry;
071import org.nuxeo.ecm.platform.audit.api.query.AuditQueryException;
072import org.nuxeo.ecm.platform.audit.api.query.DateRangeParser;
073import org.nuxeo.ecm.platform.audit.service.AbstractAuditBackend;
074import org.nuxeo.ecm.platform.audit.service.AuditBackend;
075import org.nuxeo.ecm.platform.audit.service.BaseLogEntryProvider;
076import org.nuxeo.ecm.platform.query.api.PredicateDefinition;
077import org.nuxeo.ecm.platform.query.api.PredicateFieldDefinition;
078import org.nuxeo.elasticsearch.ElasticSearchConstants;
079import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
080import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONReader;
081import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONWriter;
082import org.nuxeo.runtime.api.Framework;
083
084/**
085 * Implementation of the {@link AuditBackend} interface using Elasticsearch persistence
086 *
087 * @author tiry
088 */
089public class ESAuditBackend extends AbstractAuditBackend implements AuditBackend {
090
091    public static final String SEQ_NAME = "audit";
092
093    public static final String MIGRATION_FLAG_PROP = "audit.elasticsearch.migration";
094
095    public static final String MIGRATION_BATCH_SIZE_PROP = "audit.elasticsearch.migration.batchSize";
096
097    public static final String MIGRATION_DONE_EVENT = "sqlToElasticsearchMigrationDone";
098
099    public static final int MIGRATION_DEFAULT_BACTH_SIZE = 1000;
100
101    protected Client esClient = null;
102
103    protected static final Log log = LogFactory.getLog(ESAuditBackend.class);
104
105    protected BaseLogEntryProvider provider = null;
106
107    protected Client getClient() {
108        if (esClient == null) {
109            log.info("Activate Elasticsearch backend for Audit");
110            ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
111            esClient = esa.getClient();
112            ensureUIDSequencer(esClient);
113        }
114        return esClient;
115    }
116
117    protected boolean isMigrationDone() {
118        AuditReader reader = Framework.getService(AuditReader.class);
119        List<LogEntry> entries = reader.queryLogs(new String[] { MIGRATION_DONE_EVENT }, null);
120        return !entries.isEmpty();
121    }
122
123    @Override
124    public void deactivate() {
125        if (esClient != null) {
126            esClient.close();
127        }
128    }
129
130    @Override
131    public List<LogEntry> getLogEntriesFor(String uuid, Map<String, FilterMapEntry> filterMap, boolean doDefaultSort) {
132        SearchRequestBuilder builder = getSearchRequestBuilder();
133        TermFilterBuilder docFilter = FilterBuilders.termFilter("docUUID", uuid);
134        FilterBuilder filter;
135        if (MapUtils.isEmpty(filterMap)) {
136            filter = docFilter;
137        } else {
138            filter = FilterBuilders.boolFilter();
139            ((BoolFilterBuilder) filter).must(docFilter);
140            for (String key : filterMap.keySet()) {
141                FilterMapEntry entry = filterMap.get(key);
142                ((BoolFilterBuilder) filter).must(FilterBuilders.termFilter(entry.getColumnName(), entry.getObject()));
143            }
144        }
145        builder.setQuery(QueryBuilders.constantScoreQuery(filter)).setSize(Integer.MAX_VALUE);
146        if (doDefaultSort) {
147            builder.addSort("eventDate", SortOrder.DESC);
148        }
149        logSearchRequest(builder);
150        SearchResponse searchResponse = builder.get();
151        logSearchResponse(searchResponse);
152        return buildLogEntries(searchResponse);
153    }
154
155    protected List<LogEntry> buildLogEntries(SearchResponse searchResponse) {
156        List<LogEntry> entries = new ArrayList<>(searchResponse.getHits().getHits().length);
157        for (SearchHit hit : searchResponse.getHits()) {
158            try {
159                entries.add(AuditEntryJSONReader.read(hit.getSourceAsString()));
160            } catch (IOException e) {
161                log.error("Error while reading Audit Entry from ES", e);
162            }
163        }
164        return entries;
165    }
166
167    protected SearchRequestBuilder getSearchRequestBuilder() {
168        return getClient().prepareSearch(getESIndexName()).setTypes(ElasticSearchConstants.ENTRY_TYPE).setSearchType(
169                SearchType.DFS_QUERY_THEN_FETCH);
170    }
171
172    @Override
173    public LogEntry getLogEntryByID(long id) {
174        GetResponse ret = getClient().prepareGet(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE,
175                String.valueOf(id)).get();
176        if (!ret.isExists()) {
177            return null;
178        }
179        try {
180            return AuditEntryJSONReader.read(ret.getSourceAsString());
181        } catch (IOException e) {
182            throw new RuntimeException("Unable to read Entry for id " + id, e);
183        }
184    }
185
186    public SearchRequestBuilder buildQuery(String query, Map<String, Object> params) {
187        if (params != null && params.size() > 0) {
188            query = expandQueryVariables(query, params);
189        }
190        SearchRequestBuilder builder = getSearchRequestBuilder();
191        builder.setQuery(query);
192        return builder;
193    }
194
195    public String expandQueryVariables(String query, Object[] params) {
196        Map<String, Object> qParams = new HashMap<>();
197        for (int i = 0; i < params.length; i++) {
198            query = query.replaceFirst("\\?", "\\${param" + i + "}");
199            qParams.put("param" + i, params[i]);
200        }
201        return expandQueryVariables(query, qParams);
202    }
203
204    public String expandQueryVariables(String query, Map<String, Object> params) {
205        if (params != null && params.size() > 0) {
206            TextTemplate tmpl = new TextTemplate();
207            for (String key : params.keySet()) {
208                Object val = params.get(key);
209                if (val == null) {
210                    continue;
211                } else if (val instanceof Calendar) {
212                    tmpl.setVariable(key, ISODateTimeFormat.dateTime().print(new DateTime(val)));
213                } else if (val instanceof Date) {
214                    tmpl.setVariable(key, ISODateTimeFormat.dateTime().print(new DateTime(val)));
215                } else {
216                    tmpl.setVariable(key, val.toString());
217                }
218            }
219            query = tmpl.processText(query);
220        }
221        return query;
222    }
223
224    @Override
225    public List<?> nativeQuery(String query, Map<String, Object> params, int pageNb, int pageSize) {
226        SearchRequestBuilder builder = buildQuery(query, params);
227        if (pageNb > 0) {
228            builder.setFrom(pageNb * pageSize);
229        }
230        if (pageSize > 0) {
231            builder.setSize(pageSize);
232        }
233        logSearchRequest(builder);
234        SearchResponse searchResponse = builder.get();
235        logSearchResponse(searchResponse);
236        return buildLogEntries(searchResponse);
237    }
238
239    @Override
240    public List<LogEntry> queryLogsByPage(String[] eventIds, Date limit, String[] categories, String path, int pageNb,
241            int pageSize) {
242        SearchRequestBuilder builder = getSearchRequestBuilder();
243        BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter();
244        if (eventIds != null && eventIds.length > 0) {
245            if (eventIds.length == 1) {
246                filterBuilder.must(FilterBuilders.termFilter("eventId", eventIds[0]));
247            } else {
248                filterBuilder.must(FilterBuilders.termsFilter("eventId", eventIds));
249            }
250        }
251        if (categories != null && categories.length > 0) {
252            if (categories.length == 1) {
253                filterBuilder.must(FilterBuilders.termFilter("category", categories[0]));
254            } else {
255                filterBuilder.must(FilterBuilders.termsFilter("category", categories));
256            }
257        }
258        if (path != null) {
259            filterBuilder.must(FilterBuilders.termFilter("docPath", path));
260        }
261
262        if (limit != null) {
263            filterBuilder.must(FilterBuilders.rangeFilter("eventDate").lt(limit));
264        }
265
266        builder.setQuery(QueryBuilders.constantScoreQuery(filterBuilder));
267
268        if (pageNb > 0) {
269            builder.setFrom(pageNb * pageSize);
270        }
271        if (pageSize > 0) {
272            builder.setSize(pageSize);
273        } else {
274            builder.setSize(Integer.MAX_VALUE);
275        }
276        logSearchRequest(builder);
277        SearchResponse searchResponse = builder.get();
278        logSearchResponse(searchResponse);
279        return buildLogEntries(searchResponse);
280    }
281
282    @Override
283    public List<LogEntry> queryLogsByPage(String[] eventIds, String dateRange, String[] categories, String path,
284            int pageNb, int pageSize) {
285
286        Date limit = null;
287        if (dateRange != null) {
288            try {
289                limit = DateRangeParser.parseDateRangeQuery(new Date(), dateRange);
290            } catch (AuditQueryException aqe) {
291                aqe.addInfo("Wrong date range query. Query was " + dateRange);
292                throw aqe;
293            }
294        }
295        return queryLogsByPage(eventIds, limit, categories, path, pageNb, pageSize);
296    }
297
298    @Override
299    public void addLogEntries(List<LogEntry> entries) {
300
301        BulkRequestBuilder bulkRequest = getClient().prepareBulk();
302        JsonFactory factory = new JsonFactory();
303
304        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
305        UIDSequencer seq = uidGeneratorService.getSequencer();
306
307        try {
308
309            for (LogEntry entry : entries) {
310                entry.setId(seq.getNext(SEQ_NAME));
311                if (log.isDebugEnabled()) {
312                    log.debug(String.format("Indexing log enry Id: %s, with logDate : %s, for docUUID: %s ",
313                            entry.getId(), entry.getLogDate(), entry.getDocUUID()));
314                }
315                XContentBuilder builder = jsonBuilder();
316                JsonGenerator jsonGen = factory.createJsonGenerator(builder.stream());
317                AuditEntryJSONWriter.asJSON(jsonGen, entry);
318                bulkRequest.add(getClient().prepareIndex(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE,
319                        String.valueOf(entry.getId())).setSource(builder));
320            }
321
322            BulkResponse bulkResponse = bulkRequest.execute().actionGet();
323            if (bulkResponse.hasFailures()) {
324                for (BulkItemResponse response : bulkResponse.getItems()) {
325                    if (response.isFailed()) {
326                        log.error("Unable to index audit entry " + response.getItemId() + " :"
327                                + response.getFailureMessage());
328                    }
329                }
330            }
331        } catch (IOException e) {
332            throw new NuxeoException("Error while indexing Audit entries", e);
333        }
334
335    }
336
337    @Override
338    public Long getEventsCount(String eventId) {
339        CountResponse res = getClient().prepareCount(getESIndexName()).setTypes(ElasticSearchConstants.ENTRY_TYPE).setQuery(
340                QueryBuilders.constantScoreQuery(FilterBuilders.termFilter("eventId", eventId))).get();
341        return res.getCount();
342    }
343
344    protected BaseLogEntryProvider getProvider() {
345
346        if (provider == null) {
347            provider = new BaseLogEntryProvider() {
348
349                @Override
350                public int removeEntries(String eventId, String pathPattern) {
351                    throw new UnsupportedOperationException("Not implemented yet!");
352                }
353
354                @Override
355                public void addLogEntry(LogEntry logEntry) {
356                    List<LogEntry> entries = new ArrayList<>();
357                    entries.add(logEntry);
358                    addLogEntries(entries);
359                }
360            };
361        }
362        return provider;
363    }
364
365    @Override
366    public long syncLogCreationEntries(final String repoId, final String path, final Boolean recurs) {
367        return syncLogCreationEntries(getProvider(), repoId, path, recurs);
368    }
369
370    protected FilterBuilder buildFilter(PredicateDefinition[] predicates, DocumentModel searchDocumentModel) {
371
372        if (searchDocumentModel == null) {
373            return FilterBuilders.matchAllFilter();
374        }
375
376        BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter();
377
378        int nbFilters = 0;
379
380        for (PredicateDefinition predicate : predicates) {
381
382            // extract data from DocumentModel
383            PredicateFieldDefinition[] fieldDef = predicate.getValues();
384            Object[] val = new Object[fieldDef.length];
385            for (int fidx = 0; fidx < fieldDef.length; fidx++) {
386                if (fieldDef[fidx].getXpath() != null) {
387                    val[fidx] = searchDocumentModel.getPropertyValue(fieldDef[fidx].getXpath());
388                } else {
389                    val[fidx] = searchDocumentModel.getProperty(fieldDef[fidx].getSchema(), fieldDef[fidx].getName());
390                }
391            }
392
393            if (!isNonNullParam(val)) {
394                // skip predicate where all values are null
395                continue;
396            }
397
398            nbFilters++;
399
400            String op = predicate.getOperator();
401            if (op.equalsIgnoreCase("IN")) {
402
403                String[] values = null;
404                if (val[0] instanceof Iterable<?>) {
405                    List<String> l = new ArrayList<>();
406                    Iterable<?> vals = (Iterable<?>) val[0];
407                    Iterator<?> valueIterator = vals.iterator();
408
409                    while (valueIterator.hasNext()) {
410
411                        Object v = valueIterator.next();
412                        if (v != null) {
413                            l.add(v.toString());
414                        }
415                    }
416                    values = l.toArray(new String[l.size()]);
417                } else if (val[0] instanceof Object[]) {
418                    values = (String[]) val[0];
419                }
420                filterBuilder.must(FilterBuilders.termsFilter(predicate.getParameter(), values));
421            } else if (op.equalsIgnoreCase("BETWEEN")) {
422                filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).gt(val[0]));
423                if (val.length > 1) {
424                    filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).lt(val[1]));
425                }
426            } else if (">".equals(op)) {
427                filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).gt(val[0]));
428            } else if (">=".equals(op)) {
429                filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).gte(val[0]));
430            } else if ("<".equals(op)) {
431                filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).lt(val[0]));
432            } else if ("<=".equals(op)) {
433                filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).lte(val[0]));
434            } else {
435                filterBuilder.must(FilterBuilders.termFilter(predicate.getParameter(), val[0]));
436            }
437        }
438
439        if (nbFilters == 0) {
440            return FilterBuilders.matchAllFilter();
441        }
442        return filterBuilder;
443    }
444
445    public SearchRequestBuilder buildSearchQuery(String fixedPart, PredicateDefinition[] predicates,
446            DocumentModel searchDocumentModel) {
447        SearchRequestBuilder builder = getSearchRequestBuilder();
448        QueryBuilder queryBuilder = QueryBuilders.wrapperQuery(fixedPart);
449        FilterBuilder filterBuilder = buildFilter(predicates, searchDocumentModel);
450        builder.setQuery(QueryBuilders.filteredQuery(queryBuilder, filterBuilder));
451        return builder;
452    }
453
454    protected boolean isNonNullParam(Object[] val) {
455        if (val == null) {
456            return false;
457        }
458        for (Object v : val) {
459            if (v != null) {
460                if (v instanceof String) {
461                    if (!((String) v).isEmpty()) {
462                        return true;
463                    }
464                } else if (v instanceof String[]) {
465                    if (((String[]) v).length > 0) {
466                        return true;
467                    }
468                } else {
469                    return true;
470                }
471            }
472        }
473        return false;
474    }
475
476    public String migrate(final int batchSize) {
477
478        final String MIGRATION_WORK_ID = "AuditMigration";
479
480        WorkManager wm = Framework.getService(WorkManager.class);
481        State migrationState = wm.getWorkState(MIGRATION_WORK_ID);
482        if (migrationState != null) {
483            return "Migration already scheduled : " + migrationState.toString();
484        }
485
486        Work migrationWork = new ESAuditMigrationWork(MIGRATION_WORK_ID, batchSize);
487        wm.schedule(migrationWork);
488        return "Migration work started : " + MIGRATION_WORK_ID;
489    }
490
491    protected void logSearchResponse(SearchResponse response) {
492        if (log.isDebugEnabled()) {
493            log.debug("Response: " + response.toString());
494        }
495    }
496
497    protected void logSearchRequest(SearchRequestBuilder request) {
498        if (log.isDebugEnabled()) {
499            log.debug(String.format("Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'",
500                    getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, request.toString()));
501        }
502    }
503
504    @Override
505    public void onApplicationStarted() {
506        if (Boolean.parseBoolean(Framework.getProperty(MIGRATION_FLAG_PROP))) {
507            if (!isMigrationDone()) {
508                log.info(String.format(
509                        "Property %s is true and migration is not done yet, processing audit migration from SQL to Elasticsearch index",
510                        MIGRATION_FLAG_PROP));
511                // Drop audit index first in case of a previous bad migration
512                ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
513                esa.dropAndInitIndex(getESIndexName());
514                int batchSize = MIGRATION_DEFAULT_BACTH_SIZE;
515                String batchSizeProp = Framework.getProperty(MIGRATION_BATCH_SIZE_PROP);
516                if (batchSizeProp != null) {
517                    batchSize = Integer.parseInt(batchSizeProp);
518                }
519                migrate(batchSize);
520            } else {
521                log.warn(String.format(
522                        "Property %s is true but migration is already done, please set this property to false",
523                        MIGRATION_FLAG_PROP));
524            }
525        } else {
526            log.debug(String.format("Property %s is false, not processing any migration", MIGRATION_FLAG_PROP));
527        }
528    }
529
530    /**
531     * Ensures the audit sequence returns an UID greater or equal than the maximum log entry id.
532     */
533    protected void ensureUIDSequencer(Client esClient) {
534        boolean auditIndexExists = esClient.admin().indices().prepareExists(getESIndexName()).execute().actionGet().isExists();
535        if (!auditIndexExists) {
536            return;
537        }
538
539        // Get max log entry id
540        SearchRequestBuilder builder = getSearchRequestBuilder();
541        builder.setQuery(QueryBuilders.matchAllQuery()).addAggregation(AggregationBuilders.max("maxAgg").field("id"));
542        SearchResponse searchResponse = builder.execute().actionGet();
543        Max agg = searchResponse.getAggregations().get("maxAgg");
544        int maxLogEntryId = (int) agg.getValue();
545
546        // Get next sequence id
547        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
548        UIDSequencer seq = uidGeneratorService.getSequencer();
549        int nextSequenceId = seq.getNext(SEQ_NAME);
550
551        // Increment sequence to max log entry id if needed
552        if (nextSequenceId < maxLogEntryId) {
553            log.info(String.format("Next UID returned by %s sequence is %d, initializing sequence to %d", SEQ_NAME,
554                    nextSequenceId, maxLogEntryId));
555            seq.initSequence(SEQ_NAME, maxLogEntryId);
556        }
557    }
558
559    @Override
560    public ExtendedInfo newExtendedInfo(Serializable value) {
561        return new ESExtendedInfo(value);
562    }
563
564    protected String getESIndexName() {
565        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
566        return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE);
567    }
568}