/*
 * (C) Copyright 2014-2015 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Tiry
 *     Benoit Delbosc
 */
package org.nuxeo.elasticsearch.audit;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.commons.collections.MapUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.BoolFilterBuilder;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermFilterBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.search.sort.SortOrder;
import org.joda.time.DateTime;
import org.joda.time.format.ISODateTimeFormat;
import org.nuxeo.common.utils.TextTemplate;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.uidgen.UIDGeneratorService;
import org.nuxeo.ecm.core.uidgen.UIDSequencer;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.ecm.core.work.api.Work.State;
import org.nuxeo.ecm.core.work.api.WorkManager;
import org.nuxeo.ecm.platform.audit.api.AuditReader;
import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
import org.nuxeo.ecm.platform.audit.api.FilterMapEntry;
import org.nuxeo.ecm.platform.audit.api.LogEntry;
import org.nuxeo.ecm.platform.audit.api.query.AuditQueryException;
import org.nuxeo.ecm.platform.audit.api.query.DateRangeParser;
import org.nuxeo.ecm.platform.audit.service.AbstractAuditBackend;
import org.nuxeo.ecm.platform.audit.service.AuditBackend;
import org.nuxeo.ecm.platform.audit.service.BaseLogEntryProvider;
import org.nuxeo.ecm.platform.query.api.PredicateDefinition;
import org.nuxeo.ecm.platform.query.api.PredicateFieldDefinition;
import org.nuxeo.elasticsearch.ElasticSearchConstants;
import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONReader;
import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONWriter;
import org.nuxeo.runtime.api.Framework;

/**
 * Implementation of the {@link AuditBackend} interface using Elasticsearch persistence.
 * <p>
 * Log entries are indexed as JSON documents of type {@link ElasticSearchConstants#ENTRY_TYPE}; their identifiers are
 * drawn from the {@link #SEQ_NAME} sequence of the {@link UIDGeneratorService}.
 *
 * @author tiry
 */
public class ESAuditBackend extends AbstractAuditBackend implements AuditBackend {

    /** Name of the UID sequence used to allocate log entry identifiers. */
    public static final String SEQ_NAME = "audit";

    /** Framework property that enables the SQL to Elasticsearch migration at application startup. */
    public static final String MIGRATION_FLAG_PROP = "audit.elasticsearch.migration";

    /** Framework property overriding the number of entries migrated per batch. */
    public static final String MIGRATION_BATCH_SIZE_PROP = "audit.elasticsearch.migration.batchSize";

    /** Event id whose presence in the audit log marks the migration as done (see {@link #isMigrationDone()}). */
    public static final String MIGRATION_DONE_EVENT = "sqlToElasticsearchMigrationDone";

    /**
     * Batch size used when {@link #MIGRATION_BATCH_SIZE_PROP} is not set. The misspelled name is kept because the
     * constant is public.
     */
    public static final int MIGRATION_DEFAULT_BACTH_SIZE = 1000;

    /** Lazily initialized Elasticsearch client, see {@link #getClient()}. */
    protected Client esClient = null;

    protected static final Log log = LogFactory.getLog(ESAuditBackend.class);

    /** Lazily created provider used by {@link #syncLogCreationEntries(String, String, Boolean)}. */
    protected BaseLogEntryProvider provider = null;

    /**
     * Returns the Elasticsearch client, fetching it from {@link ElasticSearchAdmin} on first use.
     * <p>
     * The first call also aligns the UID sequence with the entries already indexed.
     *
     * @return the client used for every request issued by this backend
     */
    protected Client getClient() {
        if (esClient == null) {
            log.info("Activate Elasticsearch backend for Audit");
            ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
            // The field must be assigned before ensureUIDSequencer runs: that method goes through
            // getSearchRequestBuilder(), which calls getClient() again.
            esClient = esa.getClient();
            ensureUIDSequencer(esClient);
        }
        return esClient;
    }

    /**
     * Tells whether a {@link #MIGRATION_DONE_EVENT} entry is present in the audit log.
     *
     * @return {@code true} if at least one such entry is found
     */
    protected boolean isMigrationDone() {
        AuditReader reader = Framework.getService(AuditReader.class);
        List<LogEntry> entries = reader.queryLogs(new String[] { MIGRATION_DONE_EVENT }, null);
        return !entries.isEmpty();
    }

    /**
     * Closes the Elasticsearch client if one was acquired and forgets it, so that a later {@link #getClient()} call
     * does not hand out the closed instance.
     */
    @Override
    public void deactivate() {
        if (esClient != null) {
            try {
                esClient.close();
            } finally {
                esClient = null;
            }
        }
    }

    /**
     * Returns every log entry recorded for a document.
     *
     * @param uuid the document id, matched against the {@code docUUID} field
     * @param filterMap optional extra term filters, each applied on its column name; may be null or empty
     * @param doDefaultSort whether to sort the result by descending {@code eventDate}
     * @return the matching entries
     */
    @Override
    public List<LogEntry> getLogEntriesFor(String uuid, Map<String, FilterMapEntry> filterMap, boolean doDefaultSort) {
        SearchRequestBuilder builder = getSearchRequestBuilder();
        TermFilterBuilder docFilter = FilterBuilders.termFilter("docUUID", uuid);
        FilterBuilder filter;
        if (MapUtils.isEmpty(filterMap)) {
            filter = docFilter;
        } else {
            BoolFilterBuilder boolFilter = FilterBuilders.boolFilter();
            boolFilter.must(docFilter);
            for (FilterMapEntry entry : filterMap.values()) {
                boolFilter.must(FilterBuilders.termFilter(entry.getColumnName(), entry.getObject()));
            }
            filter = boolFilter;
        }
        builder.setQuery(QueryBuilders.constantScoreQuery(filter)).setSize(Integer.MAX_VALUE);
        if (doDefaultSort) {
            builder.addSort("eventDate", SortOrder.DESC);
        }
        logSearchRequest(builder);
        SearchResponse searchResponse = builder.get();
        logSearchResponse(searchResponse);
        return buildLogEntries(searchResponse);
    }

    /**
     * Converts the hits of a search response into log entries.
     * <p>
     * A hit that cannot be parsed is logged as an error and left out of the result.
     *
     * @param searchResponse the response to read
     * @return the entries that could be read
     */
    protected List<LogEntry> buildLogEntries(SearchResponse searchResponse) {
        List<LogEntry> entries = new ArrayList<>(searchResponse.getHits().getHits().length);
        for (SearchHit hit : searchResponse.getHits()) {
            try {
                entries.add(AuditEntryJSONReader.read(hit.getSourceAsString()));
            } catch (IOException e) {
                log.error("Error while reading Audit Entry from ES", e);
            }
        }
        return entries;
    }

    /**
     * Creates a search request targeting the audit index and the audit entry type.
     *
     * @return a new request builder using the {@code DFS_QUERY_THEN_FETCH} search type
     */
    protected SearchRequestBuilder getSearchRequestBuilder() {
        return getClient().prepareSearch(getESIndexName())
                          .setTypes(ElasticSearchConstants.ENTRY_TYPE)
                          .setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
    }

    /**
     * Fetches a single log entry by id.
     *
     * @param id the entry id
     * @return the entry, or {@code null} if no document exists for this id
     * @throws NuxeoException if the stored document cannot be parsed
     */
    @Override
    public LogEntry getLogEntryByID(long id) {
        GetResponse ret = getClient().prepareGet(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE,
                String.valueOf(id)).get();
        if (!ret.isExists()) {
            return null;
        }
        try {
            return AuditEntryJSONReader.read(ret.getSourceAsString());
        } catch (IOException e) {
            // Same exception type as addLogEntries; NuxeoException is thrown unchecked throughout this class.
            throw new NuxeoException("Unable to read Entry for id " + id, e);
        }
    }

    /**
     * Builds a search request from a native query string.
     *
     * @param query the query, possibly containing <code>${name}</code> variables
     * @param params values for the variables; may be null or empty, in which case the query is used as is
     * @return the request builder, with the expanded query set
     */
    public SearchRequestBuilder buildQuery(String query, Map<String, Object> params) {
        if (params != null && params.size() > 0) {
            query = expandQueryVariables(query, params);
        }
        SearchRequestBuilder builder = getSearchRequestBuilder();
        builder.setQuery(query);
        return builder;
    }

    /**
     * Expands positional {@code ?} placeholders: the i-th placeholder is bound to {@code params[i]}.
     *
     * @param query the query containing {@code ?} placeholders
     * @param params the positional values; when null the query is returned unchanged
     * @return the expanded query
     */
    public String expandQueryVariables(String query, Object[] params) {
        if (params == null) {
            return query;
        }
        Map<String, Object> qParams = new HashMap<>();
        for (int i = 0; i < params.length; i++) {
            // Turn each '?' into a named variable so that the map-based expansion can be reused.
            query = query.replaceFirst("\\?", "\\${param" + i + "}");
            qParams.put("param" + i, params[i]);
        }
        return expandQueryVariables(query, qParams);
    }

    /**
     * Expands named variables of a query through a {@link TextTemplate}.
     * <p>
     * {@link Calendar} and {@link Date} values are printed with the ISO date-time format, other values with
     * {@code toString()}. Variables whose value is null are not defined in the template.
     *
     * @param query the query to expand
     * @param params the variable values; may be null or empty, in which case the query is returned unchanged
     * @return the expanded query
     */
    public String expandQueryVariables(String query, Map<String, Object> params) {
        if (params == null || params.isEmpty()) {
            return query;
        }
        TextTemplate tmpl = new TextTemplate();
        for (Map.Entry<String, Object> param : params.entrySet()) {
            Object val = param.getValue();
            if (val == null) {
                continue;
            }
            if (val instanceof Calendar || val instanceof Date) {
                tmpl.setVariable(param.getKey(), ISODateTimeFormat.dateTime().print(new DateTime(val)));
            } else {
                tmpl.setVariable(param.getKey(), val.toString());
            }
        }
        return tmpl.processText(query);
    }

    /**
     * Runs a native query and returns the matching entries.
     *
     * @param query the native query, see {@link #buildQuery(String, Map)}
     * @param params the query variables; may be null
     * @param pageNb page number; when positive the result offset is {@code pageNb * pageSize}
     * @param pageSize page size; when not positive no explicit size is set on the request
     * @return the matching log entries
     */
    @Override
    public List<?> nativeQuery(String query, Map<String, Object> params, int pageNb, int pageSize) {
        SearchRequestBuilder builder = buildQuery(query, params);
        if (pageNb > 0) {
            builder.setFrom(pageNb * pageSize);
        }
        if (pageSize > 0) {
            builder.setSize(pageSize);
        }
        logSearchRequest(builder);
        SearchResponse searchResponse = builder.get();
        logSearchResponse(searchResponse);
        return buildLogEntries(searchResponse);
    }

    /**
     * Queries log entries by event ids, categories, document path and date.
     *
     * @param eventIds event ids to match; ignored when null or empty
     * @param limit when not null, passed as the {@code lt} bound of a range filter on {@code eventDate}
     * @param categories categories to match; ignored when null or empty
     * @param path document path to match; ignored when null
     * @param pageNb page number; when positive the result offset is {@code pageNb * pageSize}
     * @param pageSize page size; when not positive the size is set to {@code Integer.MAX_VALUE}
     * @return the matching entries
     */
    @Override
    public List<LogEntry> queryLogsByPage(String[] eventIds, Date limit, String[] categories, String path, int pageNb,
            int pageSize) {
        SearchRequestBuilder builder = getSearchRequestBuilder();
        BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter();
        addTermsFilter(filterBuilder, "eventId", eventIds);
        addTermsFilter(filterBuilder, "category", categories);
        if (path != null) {
            filterBuilder.must(FilterBuilders.termFilter("docPath", path));
        }
        if (limit != null) {
            filterBuilder.must(FilterBuilders.rangeFilter("eventDate").lt(limit));
        }

        builder.setQuery(QueryBuilders.constantScoreQuery(filterBuilder));

        if (pageNb > 0) {
            builder.setFrom(pageNb * pageSize);
        }
        if (pageSize > 0) {
            builder.setSize(pageSize);
        } else {
            builder.setSize(Integer.MAX_VALUE);
        }
        logSearchRequest(builder);
        SearchResponse searchResponse = builder.get();
        logSearchResponse(searchResponse);
        return buildLogEntries(searchResponse);
    }

    /**
     * Adds a mandatory filter on {@code field}: a term filter for a single value, a terms filter for several, and
     * nothing when {@code values} is null or empty.
     */
    private static void addTermsFilter(BoolFilterBuilder filterBuilder, String field, String[] values) {
        if (values == null || values.length == 0) {
            return;
        }
        if (values.length == 1) {
            filterBuilder.must(FilterBuilders.termFilter(field, values[0]));
        } else {
            filterBuilder.must(FilterBuilders.termsFilter(field, values));
        }
    }

    /**
     * Same as {@link #queryLogsByPage(String[], Date, String[], String, int, int)} with the date limit computed from
     * a date range expression evaluated against the current date.
     *
     * @param dateRange the date range expression; may be null for no date restriction
     * @throws AuditQueryException if the date range cannot be parsed
     */
    @Override
    public List<LogEntry> queryLogsByPage(String[] eventIds, String dateRange, String[] categories, String path,
            int pageNb, int pageSize) {

        Date limit = null;
        if (dateRange != null) {
            try {
                limit = DateRangeParser.parseDateRangeQuery(new Date(), dateRange);
            } catch (AuditQueryException aqe) {
                aqe.addInfo("Wrong date range query. Query was " + dateRange);
                throw aqe;
            }
        }
        return queryLogsByPage(eventIds, limit, categories, path, pageNb, pageSize);
    }

    /**
     * Indexes the given entries with a single bulk request.
     * <p>
     * Each entry is assigned the next id of the {@link #SEQ_NAME} sequence before being serialized. Items rejected by
     * Elasticsearch are logged as errors. A null or empty list is a no-op: executing a bulk request that contains no
     * action is rejected by Elasticsearch.
     *
     * @param entries the entries to index
     * @throws NuxeoException if an entry cannot be serialized
     */
    @Override
    public void addLogEntries(List<LogEntry> entries) {

        if (entries == null || entries.isEmpty()) {
            return;
        }

        Client client = getClient();
        BulkRequestBuilder bulkRequest = client.prepareBulk();
        JsonFactory factory = new JsonFactory();

        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
        UIDSequencer seq = uidGeneratorService.getSequencer();
        String indexName = getESIndexName();

        try {

            for (LogEntry entry : entries) {
                entry.setId(seq.getNext(SEQ_NAME));
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Indexing log enry Id: %s, with logDate : %s, for docUUID: %s ",
                            entry.getId(), entry.getLogDate(), entry.getDocUUID()));
                }
                XContentBuilder builder = jsonBuilder();
                JsonGenerator jsonGen = factory.createJsonGenerator(builder.stream());
                AuditEntryJSONWriter.asJSON(jsonGen, entry);
                bulkRequest.add(client.prepareIndex(indexName, ElasticSearchConstants.ENTRY_TYPE,
                        String.valueOf(entry.getId())).setSource(builder));
            }

            BulkResponse bulkResponse = bulkRequest.execute().actionGet();
            if (bulkResponse.hasFailures()) {
                for (BulkItemResponse response : bulkResponse.getItems()) {
                    if (response.isFailed()) {
                        log.error("Unable to index audit entry " + response.getItemId() + " :"
                                + response.getFailureMessage());
                    }
                }
            }
        } catch (IOException e) {
            throw new NuxeoException("Error while indexing Audit entries", e);
        }

    }

    /**
     * Counts the entries recorded for an event id.
     *
     * @param eventId the event id, matched against the {@code eventId} field
     * @return the number of matching entries
     */
    @Override
    public Long getEventsCount(String eventId) {
        CountResponse res = getClient().prepareCount(getESIndexName())
                                       .setTypes(ElasticSearchConstants.ENTRY_TYPE)
                                       .setQuery(QueryBuilders.constantScoreQuery(
                                               FilterBuilders.termFilter("eventId", eventId)))
                                       .get();
        return res.getCount();
    }

    /**
     * Returns the lazily created provider that routes single entries to {@link #addLogEntries(List)}. Removing
     * entries is not supported by this provider.
     */
    protected BaseLogEntryProvider getProvider() {

        if (provider == null) {
            provider = new BaseLogEntryProvider() {

                @Override
                public int removeEntries(String eventId, String pathPattern) {
                    throw new UnsupportedOperationException("Not implemented yet!");
                }

                @Override
                public void addLogEntry(LogEntry logEntry) {
                    List<LogEntry> entries = new ArrayList<>();
                    entries.add(logEntry);
                    addLogEntries(entries);
                }
            };
        }
        return provider;
    }

    /**
     * Delegates to the inherited synchronization routine, using {@link #getProvider()} to store the entries.
     */
    @Override
    public long syncLogCreationEntries(final String repoId, final String path, final Boolean recurs) {
        return syncLogCreationEntries(getProvider(), repoId, path, recurs);
    }

    /**
     * Builds an Elasticsearch filter from predicate definitions whose values are read from a search document.
     * <p>
     * Predicates for which every value is null or empty are skipped. Supported operators are {@code IN},
     * {@code BETWEEN}, {@code >}, {@code >=}, {@code <} and {@code <=}; any other operator yields a term filter on
     * the first value.
     *
     * @param predicates the predicate definitions; may be null
     * @param searchDocumentModel the document holding the predicate values; may be null
     * @return a match-all filter when there is nothing to filter on, otherwise a bool filter combining one mandatory
     *         clause per retained predicate
     */
    protected FilterBuilder buildFilter(PredicateDefinition[] predicates, DocumentModel searchDocumentModel) {

        if (searchDocumentModel == null || predicates == null) {
            return FilterBuilders.matchAllFilter();
        }

        BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter();

        int nbFilters = 0;

        for (PredicateDefinition predicate : predicates) {

            // extract data from DocumentModel
            PredicateFieldDefinition[] fieldDef = predicate.getValues();
            Object[] val = new Object[fieldDef.length];
            for (int fidx = 0; fidx < fieldDef.length; fidx++) {
                if (fieldDef[fidx].getXpath() != null) {
                    val[fidx] = searchDocumentModel.getPropertyValue(fieldDef[fidx].getXpath());
                } else {
                    val[fidx] = searchDocumentModel.getProperty(fieldDef[fidx].getSchema(), fieldDef[fidx].getName());
                }
            }

            if (!isNonNullParam(val)) {
                // skip predicate where all values are null
                continue;
            }

            nbFilters++;

            // Past this point val holds at least one element: isNonNullParam is false for an empty array.
            String field = predicate.getParameter();
            String op = predicate.getOperator();
            if ("IN".equalsIgnoreCase(op)) {
                filterBuilder.must(FilterBuilders.termsFilter(field, toStringValues(val[0])));
            } else if ("BETWEEN".equalsIgnoreCase(op)) {
                filterBuilder.must(FilterBuilders.rangeFilter(field).gt(val[0]));
                if (val.length > 1) {
                    filterBuilder.must(FilterBuilders.rangeFilter(field).lt(val[1]));
                }
            } else if (">".equals(op)) {
                filterBuilder.must(FilterBuilders.rangeFilter(field).gt(val[0]));
            } else if (">=".equals(op)) {
                filterBuilder.must(FilterBuilders.rangeFilter(field).gte(val[0]));
            } else if ("<".equals(op)) {
                filterBuilder.must(FilterBuilders.rangeFilter(field).lt(val[0]));
            } else if ("<=".equals(op)) {
                filterBuilder.must(FilterBuilders.rangeFilter(field).lte(val[0]));
            } else {
                filterBuilder.must(FilterBuilders.termFilter(field, val[0]));
            }
        }

        if (nbFilters == 0) {
            return FilterBuilders.matchAllFilter();
        }
        return filterBuilder;
    }

    /**
     * Normalizes the value of an {@code IN} predicate to an array of terms.
     * <p>
     * Accepts an {@link Iterable}, an array of any element type, or a single value. Null elements are skipped and the
     * remaining ones are converted with {@code toString()}.
     */
    private static String[] toStringValues(Object value) {
        List<String> terms = new ArrayList<>();
        if (value instanceof Iterable<?>) {
            for (Object v : (Iterable<?>) value) {
                if (v != null) {
                    terms.add(v.toString());
                }
            }
        } else if (value instanceof Object[]) {
            for (Object v : (Object[]) value) {
                if (v != null) {
                    terms.add(v.toString());
                }
            }
        } else if (value != null) {
            terms.add(value.toString());
        }
        return terms.toArray(new String[terms.size()]);
    }

    /**
     * Builds a search request combining a fixed query with the filter derived from predicates.
     *
     * @param fixedPart the fixed part of the query, passed to a wrapper query
     * @param predicates the predicate definitions, see {@link #buildFilter(PredicateDefinition[], DocumentModel)}
     * @param searchDocumentModel the document holding the predicate values
     * @return the request builder with a filtered query set
     */
    public SearchRequestBuilder buildSearchQuery(String fixedPart, PredicateDefinition[] predicates,
            DocumentModel searchDocumentModel) {
        SearchRequestBuilder builder = getSearchRequestBuilder();
        QueryBuilder queryBuilder = QueryBuilders.wrapperQuery(fixedPart);
        FilterBuilder filterBuilder = buildFilter(predicates, searchDocumentModel);
        builder.setQuery(QueryBuilders.filteredQuery(queryBuilder, filterBuilder));
        return builder;
    }

    /**
     * Tells whether at least one value is usable: non-null, and non-empty when it is a {@code String} or a
     * {@code String[]}.
     *
     * @param val the values to inspect; may be null
     * @return {@code true} if a usable value is found
     */
    protected boolean isNonNullParam(Object[] val) {
        if (val == null) {
            return false;
        }
        for (Object v : val) {
            if (v == null) {
                continue;
            }
            if (v instanceof String) {
                if (!((String) v).isEmpty()) {
                    return true;
                }
            } else if (v instanceof String[]) {
                if (((String[]) v).length > 0) {
                    return true;
                }
            } else {
                return true;
            }
        }
        return false;
    }

    /**
     * Schedules the audit migration work unless the work manager already knows a work with the same id.
     *
     * @param batchSize the number of entries handled per batch, passed to the migration work
     * @return a message describing what was done
     */
    public String migrate(final int batchSize) {

        final String MIGRATION_WORK_ID = "AuditMigration";

        WorkManager wm = Framework.getService(WorkManager.class);
        State migrationState = wm.getWorkState(MIGRATION_WORK_ID);
        if (migrationState != null) {
            return "Migration already scheduled : " + migrationState.toString();
        }

        Work migrationWork = new ESAuditMigrationWork(MIGRATION_WORK_ID, batchSize);
        wm.schedule(migrationWork);
        return "Migration work started : " + MIGRATION_WORK_ID;
    }

    /** Logs a search response at debug level. */
    protected void logSearchResponse(SearchResponse response) {
        if (log.isDebugEnabled()) {
            log.debug("Response: " + response.toString());
        }
    }

    /** Logs a search request at debug level, formatted as a curl command line. */
    protected void logSearchRequest(SearchRequestBuilder request) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'",
                    getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, request.toString()));
        }
    }

    /**
     * Starts the SQL to Elasticsearch migration when {@link #MIGRATION_FLAG_PROP} is true and the migration is not
     * done yet. The audit index is dropped and re-created before the migration work is scheduled.
     */
    @Override
    public void onApplicationStarted() {
        if (Boolean.parseBoolean(Framework.getProperty(MIGRATION_FLAG_PROP))) {
            if (!isMigrationDone()) {
                log.info(String.format(
                        "Property %s is true and migration is not done yet, processing audit migration from SQL to Elasticsearch index",
                        MIGRATION_FLAG_PROP));
                // Drop audit index first in case of a previous bad migration
                ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
                esa.dropAndInitIndex(getESIndexName());
                migrate(getMigrationBatchSize());
            } else {
                log.warn(String.format(
                        "Property %s is true but migration is already done, please set this property to false",
                        MIGRATION_FLAG_PROP));
            }
        } else {
            log.debug(String.format("Property %s is false, not processing any migration", MIGRATION_FLAG_PROP));
        }
    }

    /**
     * Reads the migration batch size from {@link #MIGRATION_BATCH_SIZE_PROP}.
     * <p>
     * Falls back to {@link #MIGRATION_DEFAULT_BACTH_SIZE} when the property is unset or is not an integer, so that a
     * malformed value cannot abort startup once the audit index has been dropped.
     */
    private int getMigrationBatchSize() {
        String batchSizeProp = Framework.getProperty(MIGRATION_BATCH_SIZE_PROP);
        if (batchSizeProp == null) {
            return MIGRATION_DEFAULT_BACTH_SIZE;
        }
        try {
            return Integer.parseInt(batchSizeProp);
        } catch (NumberFormatException e) {
            log.warn(String.format("Invalid value '%s' for property %s, using default batch size %d", batchSizeProp,
                    MIGRATION_BATCH_SIZE_PROP, MIGRATION_DEFAULT_BACTH_SIZE));
            return MIGRATION_DEFAULT_BACTH_SIZE;
        }
    }

    /**
     * Ensures the audit sequence returns an UID greater or equal than the maximum log entry id.
     * <p>
     * Does nothing when the audit index does not exist. Note that checking the sequence consumes one value from it.
     *
     * @param esClient the client used to check that the audit index exists
     */
    protected void ensureUIDSequencer(Client esClient) {
        boolean auditIndexExists = esClient.admin()
                                           .indices()
                                           .prepareExists(getESIndexName())
                                           .execute()
                                           .actionGet()
                                           .isExists();
        if (!auditIndexExists) {
            return;
        }

        // Get max log entry id
        SearchRequestBuilder builder = getSearchRequestBuilder();
        builder.setQuery(QueryBuilders.matchAllQuery()).addAggregation(AggregationBuilders.max("maxAgg").field("id"));
        SearchResponse searchResponse = builder.execute().actionGet();
        Max agg = searchResponse.getAggregations().get("maxAgg");
        int maxLogEntryId = (int) agg.getValue();

        // Get next sequence id
        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
        UIDSequencer seq = uidGeneratorService.getSequencer();
        int nextSequenceId = seq.getNext(SEQ_NAME);

        // Increment sequence to max log entry id if needed
        if (nextSequenceId < maxLogEntryId) {
            log.info(String.format("Next UID returned by %s sequence is %d, initializing sequence to %d", SEQ_NAME,
                    nextSequenceId, maxLogEntryId));
            seq.initSequence(SEQ_NAME, maxLogEntryId);
        }
    }

    /**
     * Wraps a value into the extended info implementation used by this backend.
     */
    @Override
    public ExtendedInfo newExtendedInfo(Serializable value) {
        return new ESExtendedInfo(value);
    }

    /**
     * Returns the name of the index configured for audit entries, as resolved by {@link ElasticSearchAdmin}.
     */
    protected String getESIndexName() {
        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
        return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE);
    }
}