001/* 002 * (C) Copyright 2014-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Tiry 018 * Benoit Delbosc 019 */ 020package org.nuxeo.elasticsearch.audit; 021 022import static org.elasticsearch.common.xcontent.DeprecationHandler.THROW_UNSUPPORTED_OPERATION; 023import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; 024import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_ID; 025 026import java.io.IOException; 027import java.io.OutputStream; 028import java.io.Serializable; 029import java.time.ZonedDateTime; 030import java.util.ArrayList; 031import java.util.Calendar; 032import java.util.Collections; 033import java.util.Date; 034import java.util.HashMap; 035import java.util.Iterator; 036import java.util.List; 037import java.util.Map; 038import java.util.function.Function; 039 040import org.apache.commons.lang3.StringUtils; 041import org.apache.commons.logging.Log; 042import org.apache.commons.logging.LogFactory; 043import org.elasticsearch.action.bulk.BulkItemResponse; 044import org.elasticsearch.action.bulk.BulkRequest; 045import org.elasticsearch.action.bulk.BulkResponse; 046import org.elasticsearch.action.get.GetRequest; 047import org.elasticsearch.action.get.GetResponse; 048import org.elasticsearch.action.index.IndexRequest; 049import org.elasticsearch.action.search.ClearScrollRequest; 050import org.elasticsearch.action.search.SearchRequest; 051import org.elasticsearch.action.search.SearchResponse; 052import org.elasticsearch.action.search.SearchScrollRequest; 053import org.elasticsearch.action.search.SearchType; 054import org.elasticsearch.common.ParsingException; 055import org.elasticsearch.common.io.stream.BytesStreamOutput; 056import org.elasticsearch.common.settings.Settings; 057import org.elasticsearch.common.unit.TimeValue; 058import org.elasticsearch.common.xcontent.NamedXContentRegistry; 059import org.elasticsearch.common.xcontent.XContentBuilder; 060import org.elasticsearch.common.xcontent.XContentFactory; 061import org.elasticsearch.common.xcontent.XContentParser; 062import org.elasticsearch.common.xcontent.XContentType; 063import org.elasticsearch.index.query.BoolQueryBuilder; 064import org.elasticsearch.index.query.QueryBuilder; 065import org.elasticsearch.index.query.QueryBuilders; 066import org.elasticsearch.search.SearchHit; 067import org.elasticsearch.search.SearchModule; 068import org.elasticsearch.search.aggregations.AggregationBuilders; 069import org.elasticsearch.search.aggregations.metrics.max.Max; 070import org.elasticsearch.search.builder.SearchSourceBuilder; 071import org.elasticsearch.search.sort.SortOrder; 072import org.json.JSONException; 073import org.json.JSONObject; 074import org.nuxeo.common.utils.TextTemplate; 075import org.nuxeo.ecm.core.api.CursorResult; 076import org.nuxeo.ecm.core.api.CursorService; 077import org.nuxeo.ecm.core.api.DocumentModel; 078import org.nuxeo.ecm.core.api.NuxeoException; 079import org.nuxeo.ecm.core.api.ScrollResult; 080import org.nuxeo.ecm.core.query.sql.model.Literals; 081import org.nuxeo.ecm.core.query.sql.model.MultiExpression; 082import org.nuxeo.ecm.core.query.sql.model.Operand; 083import org.nuxeo.ecm.core.query.sql.model.Operator; 084import org.nuxeo.ecm.core.query.sql.model.OrderByList; 085import org.nuxeo.ecm.core.query.sql.model.Predicate; 086import org.nuxeo.ecm.core.query.sql.model.Reference; 087import org.nuxeo.ecm.core.uidgen.UIDGeneratorService; 088import org.nuxeo.ecm.core.uidgen.UIDSequencer; 089import org.nuxeo.ecm.core.work.api.Work; 090import org.nuxeo.ecm.core.work.api.Work.State; 091import org.nuxeo.ecm.core.work.api.WorkManager; 092import org.nuxeo.ecm.platform.audit.api.AuditReader; 093import org.nuxeo.ecm.platform.audit.api.ExtendedInfo; 094import org.nuxeo.ecm.platform.audit.api.LogEntry; 095import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl; 096import org.nuxeo.ecm.platform.audit.service.AbstractAuditBackend; 097import org.nuxeo.ecm.platform.audit.service.AuditBackend; 098import org.nuxeo.ecm.platform.audit.service.BaseLogEntryProvider; 099import org.nuxeo.ecm.platform.audit.service.NXAuditEventsService; 100import org.nuxeo.ecm.platform.audit.service.extension.AuditBackendDescriptor; 101import org.nuxeo.ecm.platform.query.api.PredicateDefinition; 102import org.nuxeo.ecm.platform.query.api.PredicateFieldDefinition; 103import org.nuxeo.elasticsearch.ElasticSearchConstants; 104import org.nuxeo.elasticsearch.api.ESClient; 105import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 106import org.nuxeo.elasticsearch.query.NxqlQueryConverter; 107import org.nuxeo.runtime.api.Framework; 108import org.nuxeo.runtime.model.DefaultComponent; 109 110import com.fasterxml.jackson.core.JsonFactory; 111import com.fasterxml.jackson.core.JsonGenerator; 112import com.fasterxml.jackson.databind.ObjectMapper; 113 114/** 115 * Implementation of the {@link AuditBackend} interface using Elasticsearch persistence 116 * 117 * @author tiry 118 */ 119public class ESAuditBackend extends AbstractAuditBackend implements AuditBackend { 120 121 public static final String SEQ_NAME = "audit"; 122 123 public static final String MIGRATION_FLAG_PROP = "audit.elasticsearch.migration"; 124 125 public static final String MIGRATION_BATCH_SIZE_PROP = "audit.elasticsearch.migration.batchSize"; 126 127 public static final String MIGRATION_DONE_EVENT = "sqlToElasticsearchMigrationDone"; 128 129 public static final int MIGRATION_DEFAULT_BACTH_SIZE = 1000; 130 131 protected CursorService<Iterator<SearchHit>, SearchHit, String> cursorService; 132 133 public ESAuditBackend(NXAuditEventsService component, AuditBackendDescriptor config) { 134 super(component, config); 135 } 136 137 /** 138 * @since 9.3 139 */ 140 public ESAuditBackend() { 141 super(); 142 } 143 144 protected ESClient esClient; 145 146 protected static final Log log = LogFactory.getLog(ESAuditBackend.class); 147 148 protected BaseLogEntryProvider provider = new BaseLogEntryProvider() { 149 150 @Override 151 public int removeEntries(String eventId, String pathPattern) { 152 throw new UnsupportedOperationException("Not implemented yet!"); 153 } 154 155 @Override 156 public void addLogEntry(LogEntry logEntry) { 157 List<LogEntry> entries = new ArrayList<>(); 158 entries.add(logEntry); 159 addLogEntries(entries); 160 } 161 162 }; 163 164 protected ESClient getClient() { 165 log.info("Activate Elasticsearch backend for Audit"); 166 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 167 ESClient client = esa.getClient(); 168 ensureUIDSequencer(client); 169 return client; 170 } 171 172 protected boolean isMigrationDone() { 173 AuditReader reader = Framework.getService(AuditReader.class); 174 List<LogEntry> entries = reader.queryLogs(new String[] { MIGRATION_DONE_EVENT }, null); 175 return !entries.isEmpty(); 176 } 177 178 @Override 179 public int getApplicationStartedOrder() { 180 int elasticOrder = ((DefaultComponent) Framework.getRuntime().getComponent( 181 "org.nuxeo.elasticsearch.ElasticSearchComponent")).getApplicationStartedOrder(); 182 int uidgenOrder = ((DefaultComponent) Framework.getRuntime().getComponent( 183 "org.nuxeo.ecm.core.uidgen.UIDGeneratorService")).getApplicationStartedOrder(); 184 return Integer.max(elasticOrder, uidgenOrder) + 1; 185 } 186 187 @Override 188 public void onApplicationStarted() { 189 esClient = getClient(); 190 if (Boolean.parseBoolean(Framework.getProperty(MIGRATION_FLAG_PROP))) { 191 if (!isMigrationDone()) { 192 log.info(String.format( 193 "Property %s is true and migration is not done yet, processing audit migration from SQL to Elasticsearch index", 194 MIGRATION_FLAG_PROP)); 195 // Drop audit index first in case of a previous bad migration 196 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 197 esa.dropAndInitIndex(getESIndexName()); 198 int batchSize = MIGRATION_DEFAULT_BACTH_SIZE; 199 String batchSizeProp = Framework.getProperty(MIGRATION_BATCH_SIZE_PROP); 200 if (batchSizeProp != null) { 201 batchSize = Integer.parseInt(batchSizeProp); 202 } 203 migrate(batchSize); 204 } else { 205 log.warn(String.format( 206 "Property %s is true but migration is already done, please set this property to false", 207 MIGRATION_FLAG_PROP)); 208 } 209 } else { 210 log.debug(String.format("Property %s is false, not processing any migration", MIGRATION_FLAG_PROP)); 211 } 212 cursorService = new CursorService<>(SearchHit::getSourceAsString); 213 } 214 215 @Override 216 public void onApplicationStopped() { 217 if (esClient == null) { 218 return; 219 } 220 try { 221 esClient.close(); 222 cursorService.clear(); 223 } catch (Exception e) { 224 log.warn("Fail to close esClient: " + e.getMessage(), e); 225 } finally { 226 esClient = null; 227 cursorService = null; 228 } 229 } 230 231 @Override 232 @SuppressWarnings("unchecked") 233 public List<LogEntry> queryLogs(org.nuxeo.ecm.core.query.sql.model.QueryBuilder builder) { 234 // prepare parameters 235 MultiExpression predicate = builder.predicate(); 236 OrderByList orders = builder.orders(); 237 long offset = builder.offset(); 238 long limit = builder.limit(); 239 240 SearchSourceBuilder source = createSearchRequestSource(predicate, orders); 241 242 // Perform search 243 List<LogEntry> logEntries; 244 SearchRequest request = createSearchRequest(); 245 request.source(source); 246 if (limit == 0) { 247 // return all result -> use the scroll api 248 // offset is not taking into account when querying all results 249 TimeValue keepAlive = TimeValue.timeValueMinutes(1); 250 request.scroll(keepAlive); 251 // the size here is the size of each scrolls 252 source.size(100); 253 254 // run request 255 SearchResponse searchResponse = runRequest(request); 256 257 // Build log entries 258 logEntries = buildLogEntries(searchResponse); 259 // Scroll on next results 260 for (; // 261 searchResponse.getHits().getHits().length > 0 262 && logEntries.size() < searchResponse.getHits().getTotalHits(); // 263 searchResponse = runNextScroll(searchResponse.getScrollId(), keepAlive)) { 264 // Build log entries 265 logEntries.addAll(buildLogEntries(searchResponse)); 266 } 267 } else { 268 // return a page -> use a regular search 269 source.from((int) offset).size((int) limit); 270 271 // run request 272 SearchResponse searchResponse = runRequest(request); 273 274 // Build log entries 275 logEntries = buildLogEntries(searchResponse); 276 } 277 278 return logEntries; 279 } 280 281 protected SearchSourceBuilder createSearchRequestSource(MultiExpression predicate, OrderByList orders) { 282 // create ES query builder 283 QueryBuilder query = createQueryBuilder(predicate); 284 285 // create ES source 286 SearchSourceBuilder source = new SearchSourceBuilder().query(QueryBuilders.constantScoreQuery(query)).size(100); 287 288 // create sort 289 orders.forEach(order -> source.sort(order.reference.name, order.isDescending ? SortOrder.DESC : SortOrder.ASC)); 290 return source; 291 } 292 293 protected QueryBuilder createQueryBuilder(MultiExpression andPredicate) { 294 // cast parameters 295 // current implementation only support a MultiExpression with AND operator 296 List<Predicate> predicates = andPredicate.predicates; 297 // current implementation only use Predicate/OrderByExpr with a simple Reference for left and right 298 Function<Operand, String> getFieldName = operand -> ((Reference) operand).name; 299 300 QueryBuilder query; 301 if (predicates.isEmpty()) { 302 query = QueryBuilders.matchAllQuery(); 303 } else { 304 BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); 305 for (Predicate predicate : predicates) { 306 String leftName = getFieldName.apply(predicate.lvalue); 307 Operator operator = predicate.operator; 308 Object rightValue = Literals.valueOf(predicate.rvalue); 309 if (rightValue instanceof ZonedDateTime) { 310 // The ZonedDateTime representation is not compatible with Elasticsearch query 311 rightValue = ((ZonedDateTime) rightValue).toOffsetDateTime(); 312 } 313 if (Operator.EQ.equals(operator)) { 314 boolQuery.must(QueryBuilders.termQuery(leftName, rightValue)); 315 } else if (Operator.NOTEQ.equals(operator)) { 316 boolQuery.mustNot(QueryBuilders.termQuery(leftName, rightValue)); 317 } else if (Operator.LT.equals(operator)) { 318 boolQuery.must(QueryBuilders.rangeQuery(leftName).lt(rightValue)); 319 } else if (Operator.LTEQ.equals(operator)) { 320 boolQuery.must(QueryBuilders.rangeQuery(leftName).lte(rightValue)); 321 } else if (Operator.GTEQ.equals(operator)) { 322 boolQuery.must(QueryBuilders.rangeQuery(leftName).gte(rightValue)); 323 } else if (Operator.GT.equals(operator)) { 324 boolQuery.must(QueryBuilders.rangeQuery(leftName).gt(rightValue)); 325 } else if (Operator.IN.equals(operator)) { 326 boolQuery.must(QueryBuilders.termsQuery(leftName, (List<?>) rightValue)); 327 } else if (Operator.STARTSWITH.equals(operator)) { 328 boolQuery.must(NxqlQueryConverter.makeStartsWithQuery(leftName, rightValue)); 329 } 330 } 331 query = boolQuery; 332 } 333 return query; 334 } 335 336 protected List<LogEntry> buildLogEntries(SearchResponse searchResponse) { 337 List<LogEntry> entries = new ArrayList<>(searchResponse.getHits().getHits().length); 338 ObjectMapper mapper = new ObjectMapper(); 339 for (SearchHit hit : searchResponse.getHits()) { 340 try { 341 entries.add(mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class)); 342 } catch (IOException e) { 343 log.error("Error while reading Audit Entry from ES", e); 344 } 345 } 346 return entries; 347 } 348 349 protected SearchRequest createSearchRequest() { 350 return new SearchRequest(getESIndexName()).searchType(SearchType.DFS_QUERY_THEN_FETCH); 351 } 352 353 @Override 354 public LogEntry getLogEntryByID(long id) { 355 GetResponse ret = esClient.get( 356 new GetRequest(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, String.valueOf(id))); 357 if (!ret.isExists()) { 358 return null; 359 } 360 try { 361 return new ObjectMapper().readValue(ret.getSourceAsString(), LogEntryImpl.class); 362 } catch (IOException e) { 363 throw new NuxeoException("Unable to read Entry for id " + id, e); 364 } 365 } 366 367 public SearchRequest buildQuery(String query, Map<String, Object> params) { 368 if (params != null && params.size() > 0) { 369 query = expandQueryVariables(query, params); 370 } 371 SearchRequest request = createSearchRequest(); 372 SearchSourceBuilder sourceBuilder = createSearchSourceBuilder(query); 373 return request.source(sourceBuilder); 374 } 375 376 protected SearchSourceBuilder createSearchSourceBuilder(String query) { 377 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); 378 SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList()); 379 try { 380 try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser( 381 new NamedXContentRegistry(searchModule.getNamedXContents()), THROW_UNSUPPORTED_OPERATION, query)) { 382 searchSourceBuilder.parseXContent(parser); 383 } 384 } catch (IOException | ParsingException e) { 385 log.error("Invalid query: " + query + ": " + e.getMessage(), e); 386 throw new IllegalArgumentException("Bad query: " + query); 387 } 388 return searchSourceBuilder; 389 } 390 391 public String expandQueryVariables(String query, Object[] params) { 392 Map<String, Object> qParams = new HashMap<>(); 393 for (int i = 0; i < params.length; i++) { 394 query = query.replaceFirst("\\?", "\\${param" + i + "}"); 395 qParams.put("param" + i, params[i]); 396 } 397 return expandQueryVariables(query, qParams); 398 } 399 400 public String expandQueryVariables(String query, Map<String, Object> params) { 401 if (params != null && params.size() > 0) { 402 TextTemplate tmpl = new TextTemplate(); 403 for (String key : params.keySet()) { 404 Object val = params.get(key); 405 if (val == null) { 406 continue; 407 } else if (val instanceof Calendar) { 408 tmpl.setVariable(key, Long.toString(((Calendar) val).getTime().getTime())); 409 } else if (val instanceof Date) { 410 tmpl.setVariable(key, Long.toString(((Date) val).getTime())); 411 } else { 412 tmpl.setVariable(key, val.toString()); 413 } 414 } 415 query = tmpl.processText(query); 416 } 417 return query; 418 } 419 420 @Override 421 public List<?> nativeQuery(String query, Map<String, Object> params, int pageNb, int pageSize) { 422 SearchRequest request = buildQuery(query, params); 423 if (pageNb > 0) { 424 request.source().from(pageNb * pageSize); 425 } 426 if (pageSize > 0) { 427 request.source().size(pageSize); 428 } 429 SearchResponse searchResponse = runRequest(request); 430 return buildLogEntries(searchResponse); 431 } 432 433 @Override 434 public List<LogEntry> queryLogsByPage(String[] eventIds, Date limit, String[] categories, String path, int pageNb, 435 int pageSize) { 436 SearchRequest request = createSearchRequest(); 437 BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery(); 438 if (eventIds != null && eventIds.length > 0) { 439 if (eventIds.length == 1) { 440 filterBuilder.must(QueryBuilders.termQuery("eventId", eventIds[0])); 441 } else { 442 filterBuilder.must(QueryBuilders.termsQuery("eventId", eventIds)); 443 } 444 } 445 if (categories != null && categories.length > 0) { 446 if (categories.length == 1) { 447 filterBuilder.must(QueryBuilders.termQuery("category", categories[0])); 448 } else { 449 filterBuilder.must(QueryBuilders.termsQuery("category", categories)); 450 } 451 } 452 if (path != null) { 453 filterBuilder.must(QueryBuilders.termQuery("docPath", path)); 454 } 455 456 if (limit != null) { 457 filterBuilder.must(QueryBuilders.rangeQuery("eventDate").lt(convertDate(limit))); 458 } 459 request.source(new SearchSourceBuilder().query(QueryBuilders.constantScoreQuery(filterBuilder))); 460 if (pageNb > 0) { 461 request.source().from(pageNb * pageSize); 462 } 463 if (pageSize > 0) { 464 request.source().size(pageSize); 465 } 466 SearchResponse searchResponse = runRequest(request); 467 return buildLogEntries(searchResponse); 468 } 469 470 @Override 471 public void addLogEntries(List<LogEntry> entries) { 472 473 if (entries.isEmpty()) { 474 return; 475 } 476 477 BulkRequest bulkRequest = new BulkRequest(); 478 JsonFactory factory = new JsonFactory(); 479 480 UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class); 481 UIDSequencer seq = uidGeneratorService.getSequencer(); 482 483 try { 484 List<Long> block = seq.getNextBlock(SEQ_NAME, entries.size()); 485 for (int i = 0; i < entries.size(); i++) { 486 LogEntry entry = entries.get(i); 487 entry.setId(block.get(i)); 488 if (log.isDebugEnabled()) { 489 log.debug(String.format("Indexing log entry: %s", entry)); 490 } 491 entry.setLogDate(new Date()); 492 try (OutputStream out = new BytesStreamOutput(); // 493 JsonGenerator jg = factory.createGenerator(out); // 494 XContentBuilder builder = jsonBuilder(out)) { 495 ObjectMapper mapper = new ObjectMapper(); 496 mapper.writeValue(jg, entry); 497 bulkRequest.add(new IndexRequest(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, 498 String.valueOf(entry.getId())).source(builder)); 499 } 500 } 501 502 BulkResponse bulkResponse = esClient.bulk(bulkRequest); 503 if (bulkResponse.hasFailures()) { 504 for (BulkItemResponse response : bulkResponse.getItems()) { 505 if (response.isFailed()) { 506 log.error("Unable to index audit entry " + response.getItemId() + " :" 507 + response.getFailureMessage()); 508 } 509 } 510 } 511 } catch (IOException e) { 512 throw new NuxeoException("Error while indexing Audit entries", e); 513 } 514 515 } 516 517 @Override 518 public Long getEventsCount(String eventId) { 519 SearchResponse res = esClient.search( 520 new SearchRequest(getESIndexName()).source( 521 new SearchSourceBuilder().query( 522 QueryBuilders.constantScoreQuery(QueryBuilders.termQuery("eventId", eventId))) 523 .size(0))); 524 return res.getHits().getTotalHits(); 525 } 526 527 @Override 528 public long syncLogCreationEntries(final String repoId, final String path, final Boolean recurs) { 529 return syncLogCreationEntries(provider, repoId, path, recurs); 530 } 531 532 public SearchResponse search(SearchRequest request) { 533 String[] indices = request.indices(); 534 if (indices == null || indices.length != 1) { 535 throw new IllegalStateException("Search on audit must include index name: " + request); 536 } 537 if (!getESIndexName().equals(indices[0])) { 538 throw new IllegalStateException("Search on audit must be on audit index: " + request); 539 } 540 return runRequest(request); 541 } 542 543 protected QueryBuilder buildFilter(PredicateDefinition[] predicates, DocumentModel searchDocumentModel) { 544 545 if (searchDocumentModel == null) { 546 return QueryBuilders.matchAllQuery(); 547 } 548 549 BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery(); 550 551 int nbFilters = 0; 552 553 for (PredicateDefinition predicate : predicates) { 554 555 // extract data from DocumentModel 556 PredicateFieldDefinition[] fieldDef = predicate.getValues(); 557 Object[] val = new Object[fieldDef.length]; 558 for (int fidx = 0; fidx < fieldDef.length; fidx++) { 559 if (fieldDef[fidx].getXpath() != null) { 560 val[fidx] = searchDocumentModel.getPropertyValue(fieldDef[fidx].getXpath()); 561 } else { 562 val[fidx] = searchDocumentModel.getProperty(fieldDef[fidx].getSchema(), fieldDef[fidx].getName()); 563 } 564 } 565 566 if (!isNonNullParam(val)) { 567 // skip predicate where all values are null 568 continue; 569 } 570 571 nbFilters++; 572 573 String op = predicate.getOperator(); 574 if (op.equalsIgnoreCase("IN")) { 575 576 String[] values = null; 577 if (val[0] instanceof Iterable<?>) { 578 List<String> l = new ArrayList<>(); 579 Iterable<?> vals = (Iterable<?>) val[0]; 580 581 for (Object v : vals) { 582 if (v != null) { 583 l.add(v.toString()); 584 } 585 } 586 values = l.toArray(new String[l.size()]); 587 } else if (val[0] instanceof Object[]) { 588 values = (String[]) val[0]; 589 } 590 filterBuilder.must(QueryBuilders.termsQuery(predicate.getParameter(), values)); 591 } else if (op.equalsIgnoreCase("BETWEEN")) { 592 filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gt(convertDate(val[0]))); 593 if (val.length > 1) { 594 filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lt(convertDate(val[1]))); 595 } 596 } else if (">".equals(op)) { 597 filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gt(convertDate(val[0]))); 598 } else if (">=".equals(op)) { 599 filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gte(convertDate(val[0]))); 600 } else if ("<".equals(op)) { 601 filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lt(convertDate(val[0]))); 602 } else if ("<=".equals(op)) { 603 filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lte(convertDate(val[0]))); 604 } else { 605 filterBuilder.must(QueryBuilders.termQuery(predicate.getParameter(), convertDate(val[0]))); 606 } 607 } 608 609 if (nbFilters == 0) { 610 return QueryBuilders.matchAllQuery(); 611 } 612 return filterBuilder; 613 } 614 615 protected Object convertDate(Object o) { 616 // Date are convert to timestamp ms which is a known format by default for ES 617 if (o instanceof Calendar) { 618 return Long.valueOf(((Calendar) o).getTime().getTime()); 619 } else if (o instanceof Date) { 620 return Long.valueOf(((Date) o).getTime()); 621 } 622 return o; 623 } 624 625 public SearchRequest buildSearchQuery(String fixedPart, PredicateDefinition[] predicates, 626 DocumentModel searchDocumentModel) { 627 SearchRequest request = createSearchRequest(); 628 QueryBuilder queryBuilder = QueryBuilders.wrapperQuery(fixedPart); 629 QueryBuilder filterBuilder = buildFilter(predicates, searchDocumentModel); 630 request.source( 631 new SearchSourceBuilder().query(QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder))); 632 return request; 633 } 634 635 protected boolean isNonNullParam(Object[] val) { 636 if (val == null) { 637 return false; 638 } 639 for (Object v : val) { 640 if (v != null) { 641 if (v instanceof String) { 642 if (!((String) v).isEmpty()) { 643 return true; 644 } 645 } else if (v instanceof String[]) { 646 if (((String[]) v).length > 0) { 647 return true; 648 } 649 } else { 650 return true; 651 } 652 } 653 } 654 return false; 655 } 656 657 @SuppressWarnings("deprecation") 658 public String migrate(final int batchSize) { 659 660 final String MIGRATION_WORK_ID = "AuditMigration"; 661 662 WorkManager wm = Framework.getService(WorkManager.class); 663 State migrationState = wm.getWorkState(MIGRATION_WORK_ID); 664 if (migrationState != null) { 665 return "Migration already scheduled : " + migrationState.toString(); 666 } 667 668 Work migrationWork = new ESAuditMigrationWork(MIGRATION_WORK_ID, batchSize); 669 wm.schedule(migrationWork); 670 return "Migration work started : " + MIGRATION_WORK_ID; 671 } 672 673 SearchResponse runRequest(SearchRequest request) { 674 logSearchRequest(request); 675 SearchResponse response = esClient.search(request); 676 logSearchResponse(response); 677 return response; 678 } 679 680 SearchResponse runNextScroll(String scrollId, TimeValue keepAlive) { 681 if (log.isDebugEnabled()) { 682 log.debug(String.format( 683 "Scroll request: -XGET 'localhost:9200/_search/scroll' -d '{\"scroll\": \"%s\", \"scroll_id\": \"%s\" }'", 684 keepAlive, scrollId)); 685 } 686 SearchResponse response = esClient.searchScroll(new SearchScrollRequest(scrollId).scroll(keepAlive)); 687 logSearchResponse(response); 688 return response; 689 } 690 691 protected void logSearchResponse(SearchResponse response) { 692 if (log.isDebugEnabled()) { 693 log.debug("Response: " + response.toString()); 694 } 695 } 696 697 protected void logSearchRequest(SearchRequest request) { 698 if (log.isDebugEnabled()) { 699 log.debug(String.format("Search query: curl -XGET 'http://localhost:9200/%s/%s/_search?pretty' -d '%s'", 700 getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, request.toString())); 701 } 702 } 703 704 /** 705 * Ensures the audit sequence returns an UID greater or equal than the maximum log entry id. 706 */ 707 protected void ensureUIDSequencer(ESClient esClient) { 708 boolean auditIndexExists = esClient.indexExists(getESIndexName()); 709 if (!auditIndexExists) { 710 return; 711 } 712 713 // Get max log entry id 714 SearchRequest request = createSearchRequest(); 715 request.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()) 716 .aggregation(AggregationBuilders.max("maxAgg").field("id"))); 717 SearchResponse searchResponse = esClient.search(request); 718 Max agg = searchResponse.getAggregations().get("maxAgg"); 719 long maxLogEntryId = (long) agg.getValue(); 720 721 // Get next sequence id 722 UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class); 723 UIDSequencer seq = uidGeneratorService.getSequencer(); 724 seq.init(); 725 long nextSequenceId = seq.getNextLong(SEQ_NAME); 726 727 // Increment sequence to max log entry id if needed 728 if (nextSequenceId < maxLogEntryId) { 729 log.info(String.format("Next UID returned by %s sequence is %d, initializing sequence to %d", SEQ_NAME, 730 nextSequenceId, maxLogEntryId)); 731 seq.initSequence(SEQ_NAME, maxLogEntryId); 732 } 733 } 734 735 @Override 736 public ExtendedInfo newExtendedInfo(Serializable value) { 737 return new ESExtendedInfo(value); 738 } 739 740 protected String getESIndexName() { 741 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 742 return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE); 743 } 744 745 @Override 746 public void append(List<String> jsonEntries) { 747 BulkRequest bulkRequest = new BulkRequest(); 748 for (String json : jsonEntries) { 749 try { 750 String entryId = new JSONObject(json).getString(LOG_ID); 751 if (StringUtils.isBlank(entryId)) { 752 throw new NuxeoException("A json entry has an empty id. entry=" + json); 753 } 754 IndexRequest request = new IndexRequest(getESIndexName(), ElasticSearchConstants.ENTRY_TYPE, entryId); 755 request.source(json, XContentType.JSON); 756 bulkRequest.add(request); 757 } catch (JSONException e) { 758 throw new NuxeoException("Unable to deserialize json entry=" + json, e); 759 } 760 } 761 esClient.bulk(bulkRequest); 762 } 763 764 @Override 765 public ScrollResult<String> scroll(org.nuxeo.ecm.core.query.sql.model.QueryBuilder builder, int batchSize, 766 int keepAliveSeconds) { 767 // prepare parameters 768 MultiExpression predicate = builder.predicate(); 769 OrderByList orders = builder.orders(); 770 771 // create source 772 SearchSourceBuilder source = createSearchRequestSource(predicate, orders); 773 source.size(batchSize); 774 // create request 775 SearchRequest request = createSearchRequest(); 776 request.source(source).scroll(TimeValue.timeValueSeconds(keepAliveSeconds)); 777 SearchResponse response = runRequest(request); 778 // register cursor 779 String scrollId = cursorService.registerCursorResult(new ESCursorResult(response, batchSize, keepAliveSeconds)); 780 return scroll(scrollId); 781 } 782 783 @Override 784 public ScrollResult<String> scroll(String scrollId) { 785 return cursorService.scroll(scrollId); 786 } 787 788 public class ESCursorResult extends CursorResult<Iterator<SearchHit>, SearchHit> { 789 790 protected final String scrollId; 791 792 protected boolean end; 793 794 public ESCursorResult(SearchResponse response, int batchSize, int keepAliveSeconds) { 795 super(response.getHits().iterator(), batchSize, keepAliveSeconds); 796 this.scrollId = response.getScrollId(); 797 } 798 799 @Override 800 public boolean hasNext() { 801 if (cursor == null || end) { 802 return false; 803 } else if (cursor.hasNext()) { 804 return true; 805 } else { 806 runNextScroll(); 807 return !end; 808 } 809 } 810 811 @Override 812 public SearchHit next() { 813 if (cursor != null && !cursor.hasNext() && !end) { 814 // try to run a next scroll 815 runNextScroll(); 816 } 817 return super.next(); 818 } 819 820 protected void runNextScroll() { 821 SearchResponse response = ESAuditBackend.this.runNextScroll(scrollId, 822 TimeValue.timeValueSeconds(keepAliveSeconds)); 823 cursor = response.getHits().iterator(); 824 end = !cursor.hasNext(); 825 } 826 827 @Override 828 public void close() { 829 ClearScrollRequest request = new ClearScrollRequest(); 830 request.addScrollId(scrollId); 831 esClient.clearScroll(request); 832 end = true; 833 // Call super close to clear cursor 834 super.close(); 835 } 836 837 } 838 839}