001/* 002 * (C) Copyright 2014-2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Tiry 018 * Benoit Delbosc 019 */ 020package org.nuxeo.elasticsearch.audit; 021 022import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; 023 024import java.io.IOException; 025import java.io.Serializable; 026import java.util.ArrayList; 027import java.util.Calendar; 028import java.util.Date; 029import java.util.HashMap; 030import java.util.Iterator; 031import java.util.List; 032import java.util.Map; 033 034import org.apache.commons.collections.MapUtils; 035import org.apache.commons.logging.Log; 036import org.apache.commons.logging.LogFactory; 037import org.codehaus.jackson.JsonFactory; 038import org.codehaus.jackson.JsonGenerator; 039import org.elasticsearch.action.bulk.BulkItemResponse; 040import org.elasticsearch.action.bulk.BulkRequestBuilder; 041import org.elasticsearch.action.bulk.BulkResponse; 042import org.elasticsearch.action.count.CountResponse; 043import org.elasticsearch.action.get.GetResponse; 044import org.elasticsearch.action.search.SearchRequestBuilder; 045import org.elasticsearch.action.search.SearchResponse; 046import org.elasticsearch.action.search.SearchType; 047import org.elasticsearch.client.Client; 048import org.elasticsearch.common.xcontent.XContentBuilder; 049import org.elasticsearch.index.query.BoolFilterBuilder; 050import org.elasticsearch.index.query.FilterBuilder; 051import org.elasticsearch.index.query.FilterBuilders; 052import org.elasticsearch.index.query.QueryBuilder; 053import org.elasticsearch.index.query.QueryBuilders; 054import org.elasticsearch.index.query.TermFilterBuilder; 055import org.elasticsearch.search.SearchHit; 056import org.elasticsearch.search.aggregations.AggregationBuilders; 057import org.elasticsearch.search.aggregations.metrics.max.Max; 058import org.elasticsearch.search.sort.SortOrder; 059import org.joda.time.DateTime; 060import org.joda.time.format.ISODateTimeFormat; 061import org.nuxeo.common.utils.TextTemplate; 062import org.nuxeo.ecm.core.api.DocumentModel; 063import org.nuxeo.ecm.core.api.NuxeoException; 064import org.nuxeo.ecm.core.uidgen.UIDGeneratorService; 065import org.nuxeo.ecm.core.uidgen.UIDSequencer; 066import org.nuxeo.ecm.core.work.api.Work; 067import org.nuxeo.ecm.core.work.api.Work.State; 068import org.nuxeo.ecm.core.work.api.WorkManager; 069import org.nuxeo.ecm.platform.audit.api.AuditReader; 070import org.nuxeo.ecm.platform.audit.api.ExtendedInfo; 071import org.nuxeo.ecm.platform.audit.api.FilterMapEntry; 072import org.nuxeo.ecm.platform.audit.api.LogEntry; 073import org.nuxeo.ecm.platform.audit.api.query.AuditQueryException; 074import org.nuxeo.ecm.platform.audit.api.query.DateRangeParser; 075import org.nuxeo.ecm.platform.audit.service.AbstractAuditBackend; 076import org.nuxeo.ecm.platform.audit.service.AuditBackend; 077import org.nuxeo.ecm.platform.audit.service.BaseLogEntryProvider; 078import org.nuxeo.ecm.platform.audit.service.NXAuditEventsService; 079import org.nuxeo.ecm.platform.audit.service.extension.AuditBackendDescriptor; 080import org.nuxeo.ecm.platform.query.api.PredicateDefinition; 081import org.nuxeo.ecm.platform.query.api.PredicateFieldDefinition; 082import org.nuxeo.elasticsearch.ElasticSearchConstants; 083import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 084import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONReader; 085import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONWriter; 086import org.nuxeo.runtime.api.Framework; 087import org.nuxeo.runtime.model.DefaultComponent; 088 089/** 090 * Implementation of the {@link AuditBackend} interface using Elasticsearch persistence 091 * 092 * @author tiry 093 */ 094public class ESAuditBackend extends AbstractAuditBackend implements AuditBackend { 095 096 public static final String SEQ_NAME = "audit"; 097 098 public static final String MIGRATION_FLAG_PROP = "audit.elasticsearch.migration"; 099 100 public static final String MIGRATION_BATCH_SIZE_PROP = "audit.elasticsearch.migration.batchSize"; 101 102 public static final String MIGRATION_DONE_EVENT = "sqlToElasticsearchMigrationDone"; 103 104 public static final int MIGRATION_DEFAULT_BACTH_SIZE = 1000; 105 106 public ESAuditBackend(NXAuditEventsService component, AuditBackendDescriptor config) { 107 super(component, config); 108 } 109 110 protected Client esClient; 111 112 protected static final Log log = LogFactory.getLog(ESAuditBackend.class); 113 114 protected BaseLogEntryProvider provider = new BaseLogEntryProvider() { 115 116 @Override 117 public int removeEntries(String eventId, String pathPattern) { 118 throw new UnsupportedOperationException("Not implemented yet!"); 119 } 120 121 @Override 122 public void addLogEntry(LogEntry logEntry) { 123 List<LogEntry> entries = new ArrayList<>(); 124 entries.add(logEntry); 125 addLogEntries(entries); 126 } 127 }; 128 129 protected Client getClient() { 130 log.info("Activate Elasticsearch backend for Audit"); 131 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 132 Client client = esa.getClient(); 133 ensureUIDSequencer(client); 134 return client; 135 } 136 137 protected boolean isMigrationDone() { 138 AuditReader reader = Framework.getService(AuditReader.class); 139 List<LogEntry> entries = reader.queryLogs(new String[] { MIGRATION_DONE_EVENT }, null); 140 return !entries.isEmpty(); 141 } 142 143 144 @Override 145 public int getApplicationStartedOrder() { 146 return ((DefaultComponent)Framework.getRuntime().getComponent("org.nuxeo.elasticsearch.ElasticSearchComponent")).getApplicationStartedOrder()+1; 147 } 148 149 @Override 150 public void onApplicationStarted() { 151 esClient = getClient(); 152 if (Boolean.parseBoolean(Framework.getProperty(MIGRATION_FLAG_PROP))) { 153 if (!isMigrationDone()) { 154 log.info(String.format( 155 "Property %s is true and migration is not done yet, processing audit migration from SQL to Elasticsearch index", 156 MIGRATION_FLAG_PROP)); 157 // Drop audit index first in case of a previous bad migration 158 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 159 esa.dropAndInitIndex(getESIndexName()); 160 int batchSize = MIGRATION_DEFAULT_BACTH_SIZE; 161 String batchSizeProp = Framework.getProperty(MIGRATION_BATCH_SIZE_PROP); 162 if (batchSizeProp != null) { 163 batchSize = Integer.parseInt(batchSizeProp); 164 } 165 migrate(batchSize); 166 } else { 167 log.warn(String.format( 168 "Property %s is true but migration is already done, please set this property to false", 169 MIGRATION_FLAG_PROP)); 170 } 171 } else { 172 log.debug(String.format("Property %s is false, not processing any migration", MIGRATION_FLAG_PROP)); 173 } 174 } 175 176 @Override 177 public void onShutdown() { 178 if (esClient != null) { 179 try { 180 esClient.close(); 181 } finally { 182 esClient = null; 183 } 184 } 185 } 186 187 @Override 188 public List<LogEntry> getLogEntriesFor(String uuid, Map<String, FilterMapEntry> filterMap, boolean doDefaultSort) { 189 SearchRequestBuilder builder = getSearchRequestBuilder(esClient); 190 TermFilterBuilder docFilter = FilterBuilders.termFilter("docUUID", uuid); 191 FilterBuilder filter; 192 if (MapUtils.isEmpty(filterMap)) { 193 filter = docFilter; 194 } else { 195 filter = FilterBuilders.boolFilter(); 196 ((BoolFilterBuilder) filter).must(docFilter); 197 for (String key : filterMap.keySet()) { 198 FilterMapEntry entry = filterMap.get(key); 199 ((BoolFilterBuilder) filter).must(FilterBuilders.termFilter(entry.getColumnName(), entry.getObject())); 200 } 201 } 202 builder.setQuery(QueryBuilders.constantScoreQuery(filter)).setSize(Integer.MAX_VALUE); 203 if (doDefaultSort) { 204 builder.addSort("eventDate", SortOrder.DESC); 205 } 206 logSearchRequest(builder); 207 SearchResponse searchResponse = builder.get(); 208 logSearchResponse(searchResponse); 209 return buildLogEntries(searchResponse); 210 } 211 212 protected List<LogEntry> buildLogEntries(SearchResponse searchResponse) { 213 List<LogEntry> entries = new ArrayList<>(searchResponse.getHits().getHits().length); 214 for (SearchHit hit : searchResponse.getHits()) { 215 try { 216 entries.add(AuditEntryJSONReader.read(hit.getSourceAsString())); 217 } catch (IOException e) { 218 log.error("Error while reading Audit Entry from ES", e); 219 } 220 } 221 return entries; 222 } 223 224 protected SearchRequestBuilder getSearchRequestBuilder(Client esClient) { 225 return esClient.prepareSearch(getESIndexName()).setTypes(ElasticSearchConstants.ENTRY_TYPE).setSearchType( 226 SearchType.DFS_QUERY_THEN_FETCH); 227 } 228 229 @Override 230 public LogEntry getLogEntryByID(long id) { 231 GetResponse ret = esClient.prepareGet(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, 232 String.valueOf(id)).get(); 233 if (!ret.isExists()) { 234 return null; 235 } 236 try { 237 return AuditEntryJSONReader.read(ret.getSourceAsString()); 238 } catch (IOException e) { 239 throw new RuntimeException("Unable to read Entry for id " + id, e); 240 } 241 } 242 243 public SearchRequestBuilder buildQuery(String query, Map<String, Object> params) { 244 if (params != null && params.size() > 0) { 245 query = expandQueryVariables(query, params); 246 } 247 SearchRequestBuilder builder = getSearchRequestBuilder(esClient); 248 builder.setQuery(query); 249 return builder; 250 } 251 252 public String expandQueryVariables(String query, Object[] params) { 253 Map<String, Object> qParams = new HashMap<>(); 254 for (int i = 0; i < params.length; i++) { 255 query = query.replaceFirst("\\?", "\\${param" + i + "}"); 256 qParams.put("param" + i, params[i]); 257 } 258 return expandQueryVariables(query, qParams); 259 } 260 261 public String expandQueryVariables(String query, Map<String, Object> params) { 262 if (params != null && params.size() > 0) { 263 TextTemplate tmpl = new TextTemplate(); 264 for (String key : params.keySet()) { 265 Object val = params.get(key); 266 if (val == null) { 267 continue; 268 } else if (val instanceof Calendar) { 269 tmpl.setVariable(key, ISODateTimeFormat.dateTime().print(new DateTime(val))); 270 } else if (val instanceof Date) { 271 tmpl.setVariable(key, ISODateTimeFormat.dateTime().print(new DateTime(val))); 272 } else { 273 tmpl.setVariable(key, val.toString()); 274 } 275 } 276 query = tmpl.processText(query); 277 } 278 return query; 279 } 280 281 @Override 282 public List<?> nativeQuery(String query, Map<String, Object> params, int pageNb, int pageSize) { 283 SearchRequestBuilder builder = buildQuery(query, params); 284 if (pageNb > 0) { 285 builder.setFrom(pageNb * pageSize); 286 } 287 if (pageSize > 0) { 288 builder.setSize(pageSize); 289 } 290 logSearchRequest(builder); 291 SearchResponse searchResponse = builder.get(); 292 logSearchResponse(searchResponse); 293 return buildLogEntries(searchResponse); 294 } 295 296 @Override 297 public List<LogEntry> queryLogsByPage(String[] eventIds, Date limit, String[] categories, String path, int pageNb, 298 int pageSize) { 299 SearchRequestBuilder builder = getSearchRequestBuilder(esClient); 300 BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter(); 301 if (eventIds != null && eventIds.length > 0) { 302 if (eventIds.length == 1) { 303 filterBuilder.must(FilterBuilders.termFilter("eventId", eventIds[0])); 304 } else { 305 filterBuilder.must(FilterBuilders.termsFilter("eventId", eventIds)); 306 } 307 } 308 if (categories != null && categories.length > 0) { 309 if (categories.length == 1) { 310 filterBuilder.must(FilterBuilders.termFilter("category", categories[0])); 311 } else { 312 filterBuilder.must(FilterBuilders.termsFilter("category", categories)); 313 } 314 } 315 if (path != null) { 316 filterBuilder.must(FilterBuilders.termFilter("docPath", path)); 317 } 318 319 if (limit != null) { 320 filterBuilder.must(FilterBuilders.rangeFilter("eventDate").lt(limit)); 321 } 322 323 builder.setQuery(QueryBuilders.constantScoreQuery(filterBuilder)); 324 325 if (pageNb > 0) { 326 builder.setFrom(pageNb * pageSize); 327 } 328 if (pageSize > 0) { 329 builder.setSize(pageSize); 330 } else { 331 builder.setSize(Integer.MAX_VALUE); 332 } 333 logSearchRequest(builder); 334 SearchResponse searchResponse = builder.get(); 335 logSearchResponse(searchResponse); 336 return buildLogEntries(searchResponse); 337 } 338 339 @Override 340 public List<LogEntry> queryLogsByPage(String[] eventIds, String dateRange, String[] categories, String path, 341 int pageNb, int pageSize) { 342 343 Date limit = null; 344 if (dateRange != null) { 345 try { 346 limit = DateRangeParser.parseDateRangeQuery(new Date(), dateRange); 347 } catch (AuditQueryException aqe) { 348 aqe.addInfo("Wrong date range query. Query was " + dateRange); 349 throw aqe; 350 } 351 } 352 return queryLogsByPage(eventIds, limit, categories, path, pageNb, pageSize); 353 } 354 355 @Override 356 public void addLogEntries(List<LogEntry> entries) { 357 358 if (entries.isEmpty()) { 359 return; 360 } 361 362 BulkRequestBuilder bulkRequest = esClient.prepareBulk(); 363 JsonFactory factory = new JsonFactory(); 364 365 UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class); 366 UIDSequencer seq = uidGeneratorService.getSequencer(); 367 368 try { 369 370 for (LogEntry entry : entries) { 371 entry.setId(seq.getNext(SEQ_NAME)); 372 if (log.isDebugEnabled()) { 373 log.debug(String.format("Indexing log enry Id: %s, with logDate : %s, for docUUID: %s ", 374 entry.getId(), entry.getLogDate(), entry.getDocUUID())); 375 } 376 XContentBuilder builder = jsonBuilder(); 377 JsonGenerator jsonGen = factory.createJsonGenerator(builder.stream()); 378 AuditEntryJSONWriter.asJSON(jsonGen, entry); 379 bulkRequest.add(esClient.prepareIndex(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, 380 String.valueOf(entry.getId())).setSource(builder)); 381 } 382 383 BulkResponse bulkResponse = bulkRequest.execute().actionGet(); 384 if (bulkResponse.hasFailures()) { 385 for (BulkItemResponse response : bulkResponse.getItems()) { 386 if (response.isFailed()) { 387 log.error("Unable to index audit entry " + response.getItemId() + " :" 388 + response.getFailureMessage()); 389 } 390 } 391 } 392 } catch (IOException e) { 393 throw new NuxeoException("Error while indexing Audit entries", e); 394 } 395 396 } 397 398 @Override 399 public Long getEventsCount(String eventId) { 400 CountResponse res = esClient.prepareCount(getESIndexName()).setTypes(ElasticSearchConstants.ENTRY_TYPE).setQuery( 401 QueryBuilders.constantScoreQuery(FilterBuilders.termFilter("eventId", eventId))).get(); 402 return res.getCount(); 403 } 404 405 @Override 406 public long syncLogCreationEntries(final String repoId, final String path, final Boolean recurs) { 407 return syncLogCreationEntries(provider, repoId, path, recurs); 408 } 409 410 protected FilterBuilder buildFilter(PredicateDefinition[] predicates, DocumentModel searchDocumentModel) { 411 412 if (searchDocumentModel == null) { 413 return FilterBuilders.matchAllFilter(); 414 } 415 416 BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter(); 417 418 int nbFilters = 0; 419 420 for (PredicateDefinition predicate : predicates) { 421 422 // extract data from DocumentModel 423 PredicateFieldDefinition[] fieldDef = predicate.getValues(); 424 Object[] val = new Object[fieldDef.length]; 425 for (int fidx = 0; fidx < fieldDef.length; fidx++) { 426 if (fieldDef[fidx].getXpath() != null) { 427 val[fidx] = searchDocumentModel.getPropertyValue(fieldDef[fidx].getXpath()); 428 } else { 429 val[fidx] = searchDocumentModel.getProperty(fieldDef[fidx].getSchema(), fieldDef[fidx].getName()); 430 } 431 } 432 433 if (!isNonNullParam(val)) { 434 // skip predicate where all values are null 435 continue; 436 } 437 438 nbFilters++; 439 440 String op = predicate.getOperator(); 441 if (op.equalsIgnoreCase("IN")) { 442 443 String[] values = null; 444 if (val[0] instanceof Iterable<?>) { 445 List<String> l = new ArrayList<>(); 446 Iterable<?> vals = (Iterable<?>) val[0]; 447 Iterator<?> valueIterator = vals.iterator(); 448 449 while (valueIterator.hasNext()) { 450 451 Object v = valueIterator.next(); 452 if (v != null) { 453 l.add(v.toString()); 454 } 455 } 456 values = l.toArray(new String[l.size()]); 457 } else if (val[0] instanceof Object[]) { 458 values = (String[]) val[0]; 459 } 460 filterBuilder.must(FilterBuilders.termsFilter(predicate.getParameter(), values)); 461 } else if (op.equalsIgnoreCase("BETWEEN")) { 462 filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).gt(val[0])); 463 if (val.length > 1) { 464 filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).lt(val[1])); 465 } 466 } else if (">".equals(op)) { 467 filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).gt(val[0])); 468 } else if (">=".equals(op)) { 469 filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).gte(val[0])); 470 } else if ("<".equals(op)) { 471 filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).lt(val[0])); 472 } else if ("<=".equals(op)) { 473 filterBuilder.must(FilterBuilders.rangeFilter(predicate.getParameter()).lte(val[0])); 474 } else { 475 filterBuilder.must(FilterBuilders.termFilter(predicate.getParameter(), val[0])); 476 } 477 } 478 479 if (nbFilters == 0) { 480 return FilterBuilders.matchAllFilter(); 481 } 482 return filterBuilder; 483 } 484 485 public SearchRequestBuilder buildSearchQuery(String fixedPart, PredicateDefinition[] predicates, 486 DocumentModel searchDocumentModel) { 487 SearchRequestBuilder builder = getSearchRequestBuilder(esClient); 488 QueryBuilder queryBuilder = QueryBuilders.wrapperQuery(fixedPart); 489 FilterBuilder filterBuilder = buildFilter(predicates, searchDocumentModel); 490 builder.setQuery(QueryBuilders.filteredQuery(queryBuilder, filterBuilder)); 491 return builder; 492 } 493 494 protected boolean isNonNullParam(Object[] val) { 495 if (val == null) { 496 return false; 497 } 498 for (Object v : val) { 499 if (v != null) { 500 if (v instanceof String) { 501 if (!((String) v).isEmpty()) { 502 return true; 503 } 504 } else if (v instanceof String[]) { 505 if (((String[]) v).length > 0) { 506 return true; 507 } 508 } else { 509 return true; 510 } 511 } 512 } 513 return false; 514 } 515 516 public String migrate(final int batchSize) { 517 518 final String MIGRATION_WORK_ID = "AuditMigration"; 519 520 WorkManager wm = Framework.getService(WorkManager.class); 521 State migrationState = wm.getWorkState(MIGRATION_WORK_ID); 522 if (migrationState != null) { 523 return "Migration already scheduled : " + migrationState.toString(); 524 } 525 526 Work migrationWork = new ESAuditMigrationWork(MIGRATION_WORK_ID, batchSize); 527 wm.schedule(migrationWork); 528 return "Migration work started : " + MIGRATION_WORK_ID; 529 } 530 531 protected void logSearchResponse(SearchResponse response) { 532 if (log.isDebugEnabled()) { 533 log.debug("Response: " + response.toString()); 534 } 535 } 536 537 protected void logSearchRequest(SearchRequestBuilder request) { 538 if (log.isDebugEnabled()) { 539 log.debug(String.format("Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'", 540 getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, request.toString())); 541 } 542 } 543 544 545 /** 546 * Ensures the audit sequence returns an UID greater or equal than the maximum log entry id. 547 */ 548 protected void ensureUIDSequencer(Client esClient) { 549 boolean auditIndexExists = esClient.admin().indices().prepareExists(getESIndexName()).execute().actionGet().isExists(); 550 if (!auditIndexExists) { 551 return; 552 } 553 554 // Get max log entry id 555 SearchRequestBuilder builder = getSearchRequestBuilder(esClient); 556 builder.setQuery(QueryBuilders.matchAllQuery()).addAggregation(AggregationBuilders.max("maxAgg").field("id")); 557 SearchResponse searchResponse = builder.execute().actionGet(); 558 Max agg = searchResponse.getAggregations().get("maxAgg"); 559 int maxLogEntryId = (int) agg.getValue(); 560 561 // Get next sequence id 562 UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class); 563 UIDSequencer seq = uidGeneratorService.getSequencer(); 564 int nextSequenceId = seq.getNext(SEQ_NAME); 565 566 // Increment sequence to max log entry id if needed 567 if (nextSequenceId < maxLogEntryId) { 568 log.info(String.format("Next UID returned by %s sequence is %d, initializing sequence to %d", SEQ_NAME, 569 nextSequenceId, maxLogEntryId)); 570 seq.initSequence(SEQ_NAME, maxLogEntryId); 571 } 572 } 573 574 @Override 575 public ExtendedInfo newExtendedInfo(Serializable value) { 576 return new ESExtendedInfo(value); 577 } 578 579 protected String getESIndexName() { 580 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 581 return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE); 582 } 583 584}