/*
 * (C) Copyright 2014-2015 Nuxeo SA (http://nuxeo.com/) and contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the GNU Lesser General Public License
 * (LGPL) version 2.1 which accompanies this distribution, and is available at
 * http://www.gnu.org/licenses/lgpl-2.1.html
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * Contributors:
 *     Tiry
 *     Benoit Delbosc
 */
package org.nuxeo.elasticsearch.audit;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.commons.collections.MapUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.BoolFilterBuilder;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermFilterBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.search.sort.SortOrder;
import org.joda.time.DateTime;
import org.joda.time.format.ISODateTimeFormat;
import org.nuxeo.common.utils.TextTemplate;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.uidgen.UIDGeneratorService;
import org.nuxeo.ecm.core.uidgen.UIDSequencer;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.ecm.core.work.api.Work.State;
import org.nuxeo.ecm.core.work.api.WorkManager;
import org.nuxeo.ecm.platform.audit.api.AuditReader;
import org.nuxeo.ecm.platform.audit.api.ExtendedInfo;
import org.nuxeo.ecm.platform.audit.api.FilterMapEntry;
import org.nuxeo.ecm.platform.audit.api.LogEntry;
import org.nuxeo.ecm.platform.audit.api.query.AuditQueryException;
import org.nuxeo.ecm.platform.audit.api.query.DateRangeParser;
import org.nuxeo.ecm.platform.audit.service.AbstractAuditBackend;
import org.nuxeo.ecm.platform.audit.service.AuditBackend;
import org.nuxeo.ecm.platform.audit.service.BaseLogEntryProvider;
import org.nuxeo.ecm.platform.query.api.PredicateDefinition;
import org.nuxeo.ecm.platform.query.api.PredicateFieldDefinition;
import org.nuxeo.elasticsearch.ElasticSearchConstants;
import org.nuxeo.elasticsearch.api.ElasticSearchAdmin;
import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONReader;
import org.nuxeo.elasticsearch.audit.io.AuditEntryJSONWriter;
import org.nuxeo.runtime.api.Framework;

/**
 * Implementation of the {@link AuditBackend} interface using Elasticsearch persistence.
 * <p>
 * Log entries are stored as documents of type {@link ElasticSearchConstants#ENTRY_TYPE} in the index returned by
 * {@link #getESIndexName()}; their ids are taken from the {@value #SEQ_NAME} UID sequence.
 *
 * @author tiry
 */
public class ESAuditBackend extends AbstractAuditBackend implements AuditBackend {

    /** Name of the UID sequence used to allocate log entry ids. */
    public static final String SEQ_NAME = "audit";

    /** Framework property that enables the SQL to Elasticsearch migration at application start. */
    public static final String MIGRATION_FLAG_PROP = "audit.elasticsearch.migration";

    /** Framework property holding the migration batch size. */
    public static final String MIGRATION_BATCH_SIZE_PROP = "audit.elasticsearch.migration.batchSize";

    /** Event id whose presence in the audit log marks the migration as done (see {@link #isMigrationDone()}). */
    public static final String MIGRATION_DONE_EVENT = "sqlToElasticsearchMigrationDone";

    /** Batch size used when {@link #MIGRATION_BATCH_SIZE_PROP} is not set or cannot be parsed. */
    public static final int MIGRATION_DEFAULT_BACTH_SIZE = 1000;

    /** Lazily fetched Elasticsearch client, see {@link #getClient()}. */
    protected Client esClient = null;

    protected static final Log log = LogFactory.getLog(ESAuditBackend.class);

    /** Lazily created provider, see {@link #getProvider()}. */
    protected BaseLogEntryProvider provider = null;

    /**
     * Returns the Elasticsearch client, fetching it from {@link ElasticSearchAdmin} on first use and then aligning the
     * audit UID sequence with the content of the index.
     */
    protected Client getClient() {
        if (esClient == null) {
            log.info("Activate Elasticsearch backend for Audit");
            ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
            // The field must be assigned before ensureUIDSequencer() runs: that method goes through
            // getSearchRequestBuilder(), which calls getClient() again.
            esClient = esa.getClient();
            ensureUIDSequencer(esClient);
        }
        return esClient;
    }

    /**
     * Tells whether the audit log already contains a {@link #MIGRATION_DONE_EVENT} entry.
     */
    protected boolean isMigrationDone() {
        AuditReader reader = Framework.getService(AuditReader.class);
        List<LogEntry> entries = reader.queryLogs(new String[] { MIGRATION_DONE_EVENT }, null);
        return !entries.isEmpty();
    }

    /**
     * Closes the cached client, if any, and forgets it so that a later {@link #getClient()} call fetches a fresh one
     * instead of handing out the closed instance.
     */
    @Override
    public void deactivate() {
        if (esClient != null) {
            esClient.close();
            esClient = null;
        }
    }

    /**
     * Returns the log entries of the document identified by {@code uuid}.
     *
     * @param uuid value matched against the {@code docUUID} field
     * @param filterMap optional extra term filters (column name / value), all of which must match
     * @param doDefaultSort when {@code true}, results are sorted by descending {@code eventDate}
     */
    @Override
    public List<LogEntry> getLogEntriesFor(String uuid, Map<String, FilterMapEntry> filterMap, boolean doDefaultSort) {
        SearchRequestBuilder builder = getSearchRequestBuilder();
        TermFilterBuilder docFilter = FilterBuilders.termFilter("docUUID", uuid);
        FilterBuilder filter;
        if (MapUtils.isEmpty(filterMap)) {
            filter = docFilter;
        } else {
            BoolFilterBuilder boolFilter = FilterBuilders.boolFilter();
            boolFilter.must(docFilter);
            for (FilterMapEntry entry : filterMap.values()) {
                boolFilter.must(FilterBuilders.termFilter(entry.getColumnName(), entry.getObject()));
            }
            filter = boolFilter;
        }
        builder.setQuery(QueryBuilders.constantScoreQuery(filter)).setSize(Integer.MAX_VALUE);
        if (doDefaultSort) {
            builder.addSort("eventDate", SortOrder.DESC);
        }
        logSearchRequest(builder);
        SearchResponse searchResponse = builder.get();
        logSearchResponse(searchResponse);
        return buildLogEntries(searchResponse);
    }

    /**
     * Converts the hits of a search response into log entries.
     * <p>
     * A hit that cannot be read is logged and skipped, so the result may be shorter than the number of hits.
     */
    protected List<LogEntry> buildLogEntries(SearchResponse searchResponse) {
        List<LogEntry> entries = new ArrayList<>(searchResponse.getHits().getHits().length);
        for (SearchHit hit : searchResponse.getHits()) {
            try {
                entries.add(AuditEntryJSONReader.read(hit.getSourceAsString()));
            } catch (IOException e) {
                // Best effort: one unreadable entry must not hide the others.
                log.error("Error while reading Audit Entry from ES", e);
            }
        }
        return entries;
    }

    /**
     * Prepares a search request on the audit index, restricted to the audit entry type.
     */
    protected SearchRequestBuilder getSearchRequestBuilder() {
        return getClient().prepareSearch(getESIndexName())
                          .setTypes(ElasticSearchConstants.ENTRY_TYPE)
                          .setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
    }

    /**
     * Fetches a single log entry by id.
     *
     * @return the entry, or {@code null} when no document exists for this id
     * @throws RuntimeException if the stored document cannot be read
     */
    @Override
    public LogEntry getLogEntryByID(long id) {
        GetResponse ret = getClient().prepareGet(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE,
                String.valueOf(id)).get();
        if (!ret.isExists()) {
            return null;
        }
        try {
            return AuditEntryJSONReader.read(ret.getSourceAsString());
        } catch (IOException e) {
            throw new RuntimeException("Unable to read Entry for id " + id, e);
        }
    }

    /**
     * Builds a search request from a native query string, after expanding its {@code ${name}} variables.
     *
     * @param query the query source
     * @param params named parameters substituted into {@code query}; may be {@code null} or empty
     */
    public SearchRequestBuilder buildQuery(String query, Map<String, Object> params) {
        if (params != null && params.size() > 0) {
            query = expandQueryVariables(query, params);
        }
        SearchRequestBuilder builder = getSearchRequestBuilder();
        builder.setQuery(query);
        return builder;
    }

    /**
     * Expands positional {@code ?} placeholders: the i-th {@code ?} is rewritten to <code>${param</code>i<code>}</code>
     * and then substituted like a named parameter.
     *
     * @param params positional values; {@code null} leaves the query unchanged
     */
    public String expandQueryVariables(String query, Object[] params) {
        if (params == null) {
            return query;
        }
        Map<String, Object> qParams = new HashMap<>();
        for (int i = 0; i < params.length; i++) {
            // "\\$" keeps the dollar sign literal in the regex replacement string.
            query = query.replaceFirst("\\?", "\\${param" + i + "}");
            qParams.put("param" + i, params[i]);
        }
        return expandQueryVariables(query, qParams);
    }

    /**
     * Substitutes named variables in the query text.
     * <p>
     * {@link Calendar} and {@link Date} values are printed with the ISO date-time format, other values with
     * {@code toString()}. Parameters whose value is {@code null} are not registered as variables.
     */
    public String expandQueryVariables(String query, Map<String, Object> params) {
        if (params == null || params.isEmpty()) {
            return query;
        }
        TextTemplate tmpl = new TextTemplate();
        for (Map.Entry<String, Object> param : params.entrySet()) {
            Object val = param.getValue();
            if (val == null) {
                continue;
            }
            if (val instanceof Calendar || val instanceof Date) {
                tmpl.setVariable(param.getKey(), ISODateTimeFormat.dateTime().print(new DateTime(val)));
            } else {
                tmpl.setVariable(param.getKey(), val.toString());
            }
        }
        return tmpl.processText(query);
    }

    /**
     * Runs a native query and returns the matching log entries.
     *
     * @param pageNb when positive, the search starts at offset {@code pageNb * pageSize}
     * @param pageSize when positive, the maximum number of hits requested
     */
    @Override
    public List<?> nativeQuery(String query, Map<String, Object> params, int pageNb, int pageSize) {
        SearchRequestBuilder builder = buildQuery(query, params);
        if (pageNb > 0) {
            builder.setFrom(pageNb * pageSize);
        }
        if (pageSize > 0) {
            builder.setSize(pageSize);
        }
        logSearchRequest(builder);
        SearchResponse searchResponse = builder.get();
        logSearchResponse(searchResponse);
        return buildLogEntries(searchResponse);
    }

    /**
     * Queries log entries by event ids, categories, document path and date limit; every non-null criterion must match.
     *
     * @param eventIds accepted {@code eventId} values, ignored when {@code null} or empty
     * @param limit bound passed to the {@code lt} range filter on {@code eventDate}, ignored when {@code null}
     * @param categories accepted {@code category} values, ignored when {@code null} or empty
     * @param path exact {@code docPath} value, ignored when {@code null}
     * @param pageNb when positive, the search starts at offset {@code pageNb * pageSize}
     * @param pageSize maximum number of hits; when not positive, {@link Integer#MAX_VALUE} is requested
     */
    @Override
    public List<LogEntry> queryLogsByPage(String[] eventIds, Date limit, String[] categories, String path, int pageNb,
            int pageSize) {
        SearchRequestBuilder builder = getSearchRequestBuilder();
        BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter();
        if (eventIds != null && eventIds.length > 0) {
            if (eventIds.length == 1) {
                filterBuilder.must(FilterBuilders.termFilter("eventId", eventIds[0]));
            } else {
                filterBuilder.must(FilterBuilders.termsFilter("eventId", eventIds));
            }
        }
        if (categories != null && categories.length > 0) {
            if (categories.length == 1) {
                filterBuilder.must(FilterBuilders.termFilter("category", categories[0]));
            } else {
                filterBuilder.must(FilterBuilders.termsFilter("category", categories));
            }
        }
        if (path != null) {
            filterBuilder.must(FilterBuilders.termFilter("docPath", path));
        }
        if (limit != null) {
            filterBuilder.must(FilterBuilders.rangeFilter("eventDate").lt(limit));
        }

        builder.setQuery(QueryBuilders.constantScoreQuery(filterBuilder));

        if (pageNb > 0) {
            builder.setFrom(pageNb * pageSize);
        }
        if (pageSize > 0) {
            builder.setSize(pageSize);
        } else {
            builder.setSize(Integer.MAX_VALUE);
        }
        logSearchRequest(builder);
        SearchResponse searchResponse = builder.get();
        logSearchResponse(searchResponse);
        return buildLogEntries(searchResponse);
    }

    /**
     * Same as {@link #queryLogsByPage(String[], Date, String[], String, int, int)}, with the date limit computed by
     * {@link DateRangeParser} from {@code dateRange} relative to the current date.
     *
     * @throws AuditQueryException if {@code dateRange} cannot be parsed
     */
    @Override
    public List<LogEntry> queryLogsByPage(String[] eventIds, String dateRange, String[] categories, String path,
            int pageNb, int pageSize) {
        Date limit = null;
        if (dateRange != null) {
            try {
                limit = DateRangeParser.parseDateRangeQuery(new Date(), dateRange);
            } catch (AuditQueryException aqe) {
                aqe.addInfo("Wrong date range query. Query was " + dateRange);
                throw aqe;
            }
        }
        return queryLogsByPage(eventIds, limit, categories, path, pageNb, pageSize);
    }

    /**
     * Assigns an id from the {@value #SEQ_NAME} sequence to each entry and indexes all of them in one bulk request.
     * <p>
     * Items rejected by Elasticsearch are logged, not rethrown. A {@code null} or empty list is a no-op, because a
     * bulk request without any action is rejected when executed.
     *
     * @throws NuxeoException if an entry cannot be serialized
     */
    @Override
    public void addLogEntries(List<LogEntry> entries) {
        if (entries == null || entries.isEmpty()) {
            return;
        }

        BulkRequestBuilder bulkRequest = getClient().prepareBulk();
        JsonFactory factory = new JsonFactory();

        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
        UIDSequencer seq = uidGeneratorService.getSequencer();

        try {
            for (LogEntry entry : entries) {
                entry.setId(seq.getNext(SEQ_NAME));
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Indexing log enry Id: %s, with logDate : %s, for docUUID: %s ",
                            entry.getId(), entry.getLogDate(), entry.getDocUUID()));
                }
                XContentBuilder builder = jsonBuilder();
                JsonGenerator jsonGen = factory.createJsonGenerator(builder.stream());
                AuditEntryJSONWriter.asJSON(jsonGen, entry);
                bulkRequest.add(getClient().prepareIndex(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE,
                        String.valueOf(entry.getId())).setSource(builder));
            }

            BulkResponse bulkResponse = bulkRequest.execute().actionGet();
            if (bulkResponse.hasFailures()) {
                for (BulkItemResponse response : bulkResponse.getItems()) {
                    if (response.isFailed()) {
                        log.error("Unable to index audit entry " + response.getItemId() + " :"
                                + response.getFailureMessage());
                    }
                }
            }
        } catch (IOException e) {
            throw new NuxeoException("Error while indexing Audit entries", e);
        }
    }

    /**
     * Counts the log entries whose {@code eventId} equals the given value.
     */
    @Override
    public Long getEventsCount(String eventId) {
        CountResponse res = getClient().prepareCount(getESIndexName())
                                       .setTypes(ElasticSearchConstants.ENTRY_TYPE)
                                       .setQuery(QueryBuilders.constantScoreQuery(
                                               FilterBuilders.termFilter("eventId", eventId)))
                                       .get();
        return res.getCount();
    }

    /**
     * Returns the lazily created provider that routes single entries to {@link #addLogEntries(List)}; entry removal
     * is not supported.
     */
    protected BaseLogEntryProvider getProvider() {
        if (provider == null) {
            provider = new BaseLogEntryProvider() {

                @Override
                public int removeEntries(String eventId, String pathPattern) {
                    throw new UnsupportedOperationException("Not implemented yet!");
                }

                @Override
                public void addLogEntry(LogEntry logEntry) {
                    List<LogEntry> entries = new ArrayList<>();
                    entries.add(logEntry);
                    addLogEntries(entries);
                }
            };
        }
        return provider;
    }

    /**
     * Delegates to the inherited {@code syncLogCreationEntries} using {@link #getProvider()}.
     */
    @Override
    public long syncLogCreationEntries(final String repoId, final String path, final Boolean recurs) {
        return syncLogCreationEntries(getProvider(), repoId, path, recurs);
    }

    /**
     * Builds an Elasticsearch filter from predicate definitions whose values are read from a search document.
     * <p>
     * Predicates for which every value is null or empty are skipped. Supported operators are {@code IN},
     * {@code BETWEEN}, {@code >}, {@code >=}, {@code <} and {@code <=}; any other operator is treated as a term
     * match on the first value.
     *
     * @return a match-all filter when {@code searchDocumentModel} is {@code null} or no predicate applies, otherwise
     *         a bool filter in which every retained predicate must match
     */
    protected FilterBuilder buildFilter(PredicateDefinition[] predicates, DocumentModel searchDocumentModel) {
        if (searchDocumentModel == null) {
            return FilterBuilders.matchAllFilter();
        }

        BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter();
        int nbFilters = 0;

        for (PredicateDefinition predicate : predicates) {

            // extract data from DocumentModel
            PredicateFieldDefinition[] fieldDef = predicate.getValues();
            Object[] val = new Object[fieldDef.length];
            for (int fidx = 0; fidx < fieldDef.length; fidx++) {
                if (fieldDef[fidx].getXpath() != null) {
                    val[fidx] = searchDocumentModel.getPropertyValue(fieldDef[fidx].getXpath());
                } else {
                    val[fidx] = searchDocumentModel.getProperty(fieldDef[fidx].getSchema(), fieldDef[fidx].getName());
                }
            }

            if (!isNonNullParam(val)) {
                // skip predicate where all values are null
                continue;
            }

            nbFilters++;

            String field = predicate.getParameter();
            String op = predicate.getOperator();
            if (op.equalsIgnoreCase("IN")) {
                filterBuilder.must(FilterBuilders.termsFilter(field, toTermValues(val[0])));
            } else if (op.equalsIgnoreCase("BETWEEN")) {
                filterBuilder.must(FilterBuilders.rangeFilter(field).gt(val[0]));
                if (val.length > 1) {
                    filterBuilder.must(FilterBuilders.rangeFilter(field).lt(val[1]));
                }
            } else if (">".equals(op)) {
                filterBuilder.must(FilterBuilders.rangeFilter(field).gt(val[0]));
            } else if (">=".equals(op)) {
                filterBuilder.must(FilterBuilders.rangeFilter(field).gte(val[0]));
            } else if ("<".equals(op)) {
                filterBuilder.must(FilterBuilders.rangeFilter(field).lt(val[0]));
            } else if ("<=".equals(op)) {
                filterBuilder.must(FilterBuilders.rangeFilter(field).lte(val[0]));
            } else {
                filterBuilder.must(FilterBuilders.termFilter(field, val[0]));
            }
        }

        if (nbFilters == 0) {
            return FilterBuilders.matchAllFilter();
        }
        return filterBuilder;
    }

    /**
     * Normalizes the value of an {@code IN} predicate to an array of terms.
     * <p>
     * Accepts an {@link Iterable}, an array of any element type, or a single value; {@code null} elements are
     * dropped and the remaining ones are converted with {@code toString()}.
     */
    private static String[] toTermValues(Object value) {
        List<String> terms = new ArrayList<>();
        if (value instanceof Iterable<?>) {
            for (Object item : (Iterable<?>) value) {
                if (item != null) {
                    terms.add(item.toString());
                }
            }
        } else if (value instanceof Object[]) {
            for (Object item : (Object[]) value) {
                if (item != null) {
                    terms.add(item.toString());
                }
            }
        } else if (value != null) {
            terms.add(value.toString());
        }
        return terms.toArray(new String[terms.size()]);
    }

    /**
     * Builds a search request combining a fixed query with the filter derived from the predicates.
     *
     * @param fixedPart query source handed to {@code QueryBuilders.wrapperQuery}
     * @see #buildFilter(PredicateDefinition[], DocumentModel)
     */
    public SearchRequestBuilder buildSearchQuery(String fixedPart, PredicateDefinition[] predicates,
            DocumentModel searchDocumentModel) {
        SearchRequestBuilder builder = getSearchRequestBuilder();
        QueryBuilder queryBuilder = QueryBuilders.wrapperQuery(fixedPart);
        FilterBuilder filterBuilder = buildFilter(predicates, searchDocumentModel);
        builder.setQuery(QueryBuilders.filteredQuery(queryBuilder, filterBuilder));
        return builder;
    }

    /**
     * Tells whether at least one value is usable: non-null and, for a {@code String} or {@code String[]}, non-empty.
     */
    protected boolean isNonNullParam(Object[] val) {
        if (val == null) {
            return false;
        }
        for (Object v : val) {
            if (v == null) {
                continue;
            }
            if (v instanceof String) {
                if (!((String) v).isEmpty()) {
                    return true;
                }
            } else if (v instanceof String[]) {
                if (((String[]) v).length > 0) {
                    return true;
                }
            } else {
                return true;
            }
        }
        return false;
    }

    /**
     * Schedules the audit migration work unless the work manager already reports a state for it.
     *
     * @param batchSize batch size handed to the migration work
     * @return a human readable status message
     */
    public String migrate(final int batchSize) {
        final String MIGRATION_WORK_ID = "AuditMigration";

        WorkManager wm = Framework.getService(WorkManager.class);
        State migrationState = wm.getWorkState(MIGRATION_WORK_ID);
        if (migrationState != null) {
            return "Migration already scheduled : " + migrationState.toString();
        }

        Work migrationWork = new ESAuditMigrationWork(MIGRATION_WORK_ID, batchSize);
        wm.schedule(migrationWork);
        return "Migration work started : " + MIGRATION_WORK_ID;
    }

    /** Logs the search response at debug level. */
    protected void logSearchResponse(SearchResponse response) {
        if (log.isDebugEnabled()) {
            log.debug("Response: " + response.toString());
        }
    }

    /** Logs the search request at debug level, formatted as a curl command line. */
    protected void logSearchRequest(SearchRequestBuilder request) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'",
                    getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, request.toString()));
        }
    }

    /**
     * Triggers the SQL to Elasticsearch audit migration when {@link #MIGRATION_FLAG_PROP} is true and the migration
     * has not been done yet.
     */
    @Override
    public void onApplicationStarted() {
        if (!Boolean.parseBoolean(Framework.getProperty(MIGRATION_FLAG_PROP))) {
            log.debug(String.format("Property %s is false, not processing any migration", MIGRATION_FLAG_PROP));
            return;
        }
        if (isMigrationDone()) {
            log.warn(String.format(
                    "Property %s is true but migration is already done, please set this property to false",
                    MIGRATION_FLAG_PROP));
            return;
        }
        log.info(String.format(
                "Property %s is true and migration is not done yet, processing audit migration from SQL to Elasticsearch index",
                MIGRATION_FLAG_PROP));
        // Resolve the configuration before the destructive step below, so a bad property value can never leave
        // the index dropped without a migration being scheduled.
        int batchSize = getMigrationBatchSize();
        // Drop audit index first in case of a previous bad migration
        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
        esa.dropAndInitIndex(getESIndexName());
        migrate(batchSize);
    }

    /**
     * Reads {@link #MIGRATION_BATCH_SIZE_PROP}, falling back to {@link #MIGRATION_DEFAULT_BACTH_SIZE} when the
     * property is unset or is not a valid integer.
     */
    private int getMigrationBatchSize() {
        String batchSizeProp = Framework.getProperty(MIGRATION_BATCH_SIZE_PROP);
        if (batchSizeProp == null) {
            return MIGRATION_DEFAULT_BACTH_SIZE;
        }
        try {
            return Integer.parseInt(batchSizeProp.trim());
        } catch (NumberFormatException e) {
            log.warn(String.format("Invalid value '%s' for property %s, using default batch size %d", batchSizeProp,
                    MIGRATION_BATCH_SIZE_PROP, MIGRATION_DEFAULT_BACTH_SIZE));
            return MIGRATION_DEFAULT_BACTH_SIZE;
        }
    }

    /**
     * Ensures the audit sequence returns an UID greater or equal than the maximum log entry id.
     * <p>
     * Does nothing when the audit index does not exist. Note that reading the sequence consumes one value from it.
     */
    protected void ensureUIDSequencer(Client esClient) {
        boolean auditIndexExists = esClient.admin()
                                           .indices()
                                           .prepareExists(getESIndexName())
                                           .execute()
                                           .actionGet()
                                           .isExists();
        if (!auditIndexExists) {
            return;
        }

        // Get max log entry id
        SearchRequestBuilder builder = getSearchRequestBuilder();
        builder.setQuery(QueryBuilders.matchAllQuery()).addAggregation(AggregationBuilders.max("maxAgg").field("id"));
        SearchResponse searchResponse = builder.execute().actionGet();
        Max agg = searchResponse.getAggregations().get("maxAgg");
        // NOTE(review): on an empty index the max aggregation presumably yields -Infinity, which this cast turns
        // into Integer.MIN_VALUE so the comparison below is simply false - confirm against the ES version in use.
        int maxLogEntryId = (int) agg.getValue();

        // Get next sequence id
        UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class);
        UIDSequencer seq = uidGeneratorService.getSequencer();
        int nextSequenceId = seq.getNext(SEQ_NAME);

        // Increment sequence to max log entry id if needed
        if (nextSequenceId < maxLogEntryId) {
            log.info(String.format("Next UID returned by %s sequence is %d, initializing sequence to %d", SEQ_NAME,
                    nextSequenceId, maxLogEntryId));
            seq.initSequence(SEQ_NAME, maxLogEntryId);
        }
    }

    /** Creates the Elasticsearch flavour of {@link ExtendedInfo} for the given value. */
    @Override
    public ExtendedInfo newExtendedInfo(Serializable value) {
        return new ESExtendedInfo(value);
    }

    /** Returns the name of the index registered for the audit entry type. */
    protected String getESIndexName() {
        ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class);
        return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE);
    }
}