001/* 002 * (C) Copyright 2014-2016 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Tiry 018 * Benoit Delbosc 019 */ 020package org.nuxeo.elasticsearch.audit; 021 022import static org.elasticsearch.common.xcontent.DeprecationHandler.THROW_UNSUPPORTED_OPERATION; 023import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; 024import static org.nuxeo.ecm.platform.audit.api.BuiltinLogEntryData.LOG_ID; 025 026import java.io.IOException; 027import java.io.OutputStream; 028import java.io.Serializable; 029import java.time.ZonedDateTime; 030import java.util.ArrayList; 031import java.util.Calendar; 032import java.util.Collections; 033import java.util.Date; 034import java.util.HashMap; 035import java.util.Iterator; 036import java.util.List; 037import java.util.Map; 038import java.util.function.Function; 039 040import org.apache.commons.logging.Log; 041import org.apache.commons.logging.LogFactory; 042import org.elasticsearch.action.bulk.BulkItemResponse; 043import org.elasticsearch.action.bulk.BulkRequest; 044import org.elasticsearch.action.bulk.BulkResponse; 045import org.elasticsearch.action.get.GetRequest; 046import org.elasticsearch.action.get.GetResponse; 047import org.elasticsearch.action.index.IndexRequest; 048import org.elasticsearch.action.search.ClearScrollRequest; 049import org.elasticsearch.action.search.SearchRequest; 050import org.elasticsearch.action.search.SearchResponse; 051import org.elasticsearch.action.search.SearchScrollRequest; 052import org.elasticsearch.action.search.SearchType; 053import org.elasticsearch.common.ParsingException; 054import org.elasticsearch.common.io.stream.BytesStreamOutput; 055import org.elasticsearch.common.settings.Settings; 056import org.elasticsearch.common.unit.TimeValue; 057import org.elasticsearch.common.xcontent.NamedXContentRegistry; 058import org.elasticsearch.common.xcontent.XContentBuilder; 059import org.elasticsearch.common.xcontent.XContentFactory; 060import org.elasticsearch.common.xcontent.XContentParser; 061import org.elasticsearch.common.xcontent.XContentType; 062import org.elasticsearch.index.query.BoolQueryBuilder; 063import org.elasticsearch.index.query.QueryBuilder; 064import org.elasticsearch.index.query.QueryBuilders; 065import org.elasticsearch.search.SearchHit; 066import org.elasticsearch.search.SearchModule; 067import org.elasticsearch.search.aggregations.Aggregation; 068import org.elasticsearch.search.aggregations.AggregationBuilders; 069import org.elasticsearch.search.builder.SearchSourceBuilder; 070import org.elasticsearch.search.sort.SortOrder; 071import org.json.JSONException; 072import org.json.JSONObject; 073import org.nuxeo.common.utils.TextTemplate; 074import org.nuxeo.ecm.core.api.CursorResult; 075import org.nuxeo.ecm.core.api.CursorService; 076import org.nuxeo.ecm.core.api.DocumentModel; 077import org.nuxeo.ecm.core.api.NuxeoException; 078import org.nuxeo.ecm.core.api.ScrollResult; 079import org.nuxeo.ecm.core.query.sql.model.Literals; 080import org.nuxeo.ecm.core.query.sql.model.MultiExpression; 081import org.nuxeo.ecm.core.query.sql.model.Operand; 082import org.nuxeo.ecm.core.query.sql.model.Operator; 083import org.nuxeo.ecm.core.query.sql.model.OrderByList; 084import org.nuxeo.ecm.core.query.sql.model.Predicate; 085import org.nuxeo.ecm.core.query.sql.model.Reference; 086import org.nuxeo.ecm.core.uidgen.UIDGeneratorService; 087import org.nuxeo.ecm.core.uidgen.UIDSequencer; 088import org.nuxeo.ecm.core.work.api.Work; 089import org.nuxeo.ecm.core.work.api.Work.State; 090import org.nuxeo.ecm.core.work.api.WorkManager; 091import org.nuxeo.ecm.platform.audit.api.AuditReader; 092import org.nuxeo.ecm.platform.audit.api.ExtendedInfo; 093import org.nuxeo.ecm.platform.audit.api.LogEntry; 094import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl; 095import org.nuxeo.ecm.platform.audit.service.AbstractAuditBackend; 096import org.nuxeo.ecm.platform.audit.service.AuditBackend; 097import org.nuxeo.ecm.platform.audit.service.BaseLogEntryProvider; 098import org.nuxeo.ecm.platform.audit.service.NXAuditEventsService; 099import org.nuxeo.ecm.platform.audit.service.extension.AuditBackendDescriptor; 100import org.nuxeo.ecm.platform.query.api.PredicateDefinition; 101import org.nuxeo.ecm.platform.query.api.PredicateFieldDefinition; 102import org.nuxeo.elasticsearch.ElasticSearchConstants; 103import org.nuxeo.elasticsearch.api.ESClient; 104import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 105import org.nuxeo.elasticsearch.query.NxqlQueryConverter; 106import org.nuxeo.runtime.api.Framework; 107import org.nuxeo.runtime.model.DefaultComponent; 108 109import com.fasterxml.jackson.core.JsonFactory; 110import com.fasterxml.jackson.core.JsonGenerator; 111import com.fasterxml.jackson.databind.ObjectMapper; 112 113/** 114 * Implementation of the {@link AuditBackend} interface using Elasticsearch persistence 115 * 116 * @author tiry 117 */ 118public class ESAuditBackend extends AbstractAuditBackend implements AuditBackend { 119 120 public static final String SEQ_NAME = "audit"; 121 122 public static final String MIGRATION_FLAG_PROP = "audit.elasticsearch.migration"; 123 124 public static final String MIGRATION_BATCH_SIZE_PROP = "audit.elasticsearch.migration.batchSize"; 125 126 public static final String MIGRATION_DONE_EVENT = "sqlToElasticsearchMigrationDone"; 127 128 public static final int MIGRATION_DEFAULT_BACTH_SIZE = 1000; 129 130 protected CursorService<Iterator<SearchHit>, SearchHit, String> cursorService; 131 132 public ESAuditBackend(NXAuditEventsService component, AuditBackendDescriptor config) { 133 super(component, config); 134 } 135 136 /** 137 * @since 9.3 138 */ 139 public ESAuditBackend() { 140 super(); 141 } 142 143 protected ESClient esClient; 144 145 protected static final Log log = LogFactory.getLog(ESAuditBackend.class); 146 147 protected BaseLogEntryProvider provider = new BaseLogEntryProvider() { 148 149 @Override 150 public int removeEntries(String eventId, String pathPattern) { 151 throw new UnsupportedOperationException("Not implemented yet!"); 152 } 153 154 @Override 155 public void addLogEntry(LogEntry logEntry) { 156 List<LogEntry> entries = new ArrayList<>(); 157 entries.add(logEntry); 158 addLogEntries(entries); 159 } 160 161 }; 162 163 protected ESClient getClient() { 164 log.info("Activate Elasticsearch backend for Audit"); 165 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 166 ESClient client = esa.getClient(); 167 ensureUIDSequencer(client); 168 return client; 169 } 170 171 protected boolean isMigrationDone() { 172 AuditReader reader = Framework.getService(AuditReader.class); 173 List<LogEntry> entries = reader.queryLogs(new String[] { MIGRATION_DONE_EVENT }, null); 174 return !entries.isEmpty(); 175 } 176 177 @Override 178 public int getApplicationStartedOrder() { 179 int elasticOrder = ((DefaultComponent) Framework.getRuntime().getComponent( 180 "org.nuxeo.elasticsearch.ElasticSearchComponent")).getApplicationStartedOrder(); 181 int uidgenOrder = ((DefaultComponent) Framework.getRuntime().getComponent( 182 "org.nuxeo.ecm.core.uidgen.UIDGeneratorService")).getApplicationStartedOrder(); 183 return Integer.max(elasticOrder, uidgenOrder) + 1; 184 } 185 186 @Override 187 public void onApplicationStarted() { 188 esClient = getClient(); 189 if (Boolean.parseBoolean(Framework.getProperty(MIGRATION_FLAG_PROP))) { 190 if (!isMigrationDone()) { 191 log.info(String.format( 192 "Property %s is true and migration is not done yet, processing audit migration from SQL to Elasticsearch index", 193 MIGRATION_FLAG_PROP)); 194 // Drop audit index first in case of a previous bad migration 195 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 196 esa.dropAndInitIndex(getESIndexName()); 197 int batchSize = MIGRATION_DEFAULT_BACTH_SIZE; 198 String batchSizeProp = Framework.getProperty(MIGRATION_BATCH_SIZE_PROP); 199 if (batchSizeProp != null) { 200 batchSize = Integer.parseInt(batchSizeProp); 201 } 202 migrate(batchSize); 203 } else { 204 log.warn(String.format( 205 "Property %s is true but migration is already done, please set this property to false", 206 MIGRATION_FLAG_PROP)); 207 } 208 } else { 209 log.debug(String.format("Property %s is false, not processing any migration", MIGRATION_FLAG_PROP)); 210 } 211 cursorService = new CursorService<>(SearchHit::getSourceAsString); 212 } 213 214 @Override 215 public void onApplicationStopped() { 216 if (esClient == null) { 217 return; 218 } 219 try { 220 esClient.close(); 221 cursorService.clear(); 222 } catch (Exception e) { 223 log.warn("Fail to close esClient: " + e.getMessage(), e); 224 } finally { 225 esClient = null; 226 cursorService = null; 227 } 228 } 229 230 @Override 231 @SuppressWarnings("unchecked") 232 public List<LogEntry> queryLogs(org.nuxeo.ecm.core.query.sql.model.QueryBuilder builder) { 233 // prepare parameters 234 MultiExpression predicate = builder.predicate(); 235 OrderByList orders = builder.orders(); 236 long offset = builder.offset(); 237 long limit = builder.limit(); 238 239 SearchSourceBuilder source = createSearchRequestSource(predicate, orders); 240 241 // Perform search 242 List<LogEntry> logEntries; 243 SearchRequest request = createSearchRequest(); 244 request.source(source); 245 if (limit == 0) { 246 // return all result -> use the scroll api 247 // offset is not taking into account when querying all results 248 TimeValue keepAlive = TimeValue.timeValueMinutes(1); 249 request.scroll(keepAlive); 250 // the size here is the size of each scrolls 251 source.size(100); 252 253 // run request 254 SearchResponse searchResponse = runRequest(request); 255 256 // Build log entries 257 logEntries = buildLogEntries(searchResponse); 258 // Scroll on next results 259 for (; // 260 searchResponse.getHits().getHits().length > 0 261 && logEntries.size() < searchResponse.getHits().getTotalHits().value; // 262 searchResponse = runNextScroll(searchResponse.getScrollId(), keepAlive)) { 263 // Build log entries 264 logEntries.addAll(buildLogEntries(searchResponse)); 265 } 266 } else { 267 // return a page -> use a regular search 268 source.from((int) offset).size((int) limit); 269 270 // run request 271 SearchResponse searchResponse = runRequest(request); 272 273 // Build log entries 274 logEntries = buildLogEntries(searchResponse); 275 } 276 277 return logEntries; 278 } 279 280 protected SearchSourceBuilder createSearchRequestSource(MultiExpression predicate, OrderByList orders) { 281 // create ES query builder 282 QueryBuilder query = createQueryBuilder(predicate); 283 284 // create ES source 285 SearchSourceBuilder source = new SearchSourceBuilder().query(QueryBuilders.constantScoreQuery(query)).size(100); 286 287 // create sort 288 orders.forEach(order -> source.sort(order.reference.name, order.isDescending ? SortOrder.DESC : SortOrder.ASC)); 289 return source; 290 } 291 292 protected QueryBuilder createQueryBuilder(MultiExpression andPredicate) { 293 // cast parameters 294 // current implementation only support a MultiExpression with AND operator 295 List<Predicate> predicates = andPredicate.predicates; 296 // current implementation only use Predicate/OrderByExpr with a simple Reference for left and right 297 Function<Operand, String> getFieldName = operand -> ((Reference) operand).name; 298 299 QueryBuilder query; 300 if (predicates.isEmpty()) { 301 query = QueryBuilders.matchAllQuery(); 302 } else { 303 BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); 304 for (Predicate predicate : predicates) { 305 String leftName = getFieldName.apply(predicate.lvalue); 306 Operator operator = predicate.operator; 307 Object rightValue = Literals.valueOf(predicate.rvalue); 308 if (rightValue instanceof ZonedDateTime) { 309 // The ZonedDateTime representation is not compatible with Elasticsearch query 310 rightValue = ((ZonedDateTime) rightValue).toEpochSecond(); 311 } 312 if (Operator.EQ.equals(operator)) { 313 boolQuery.must(QueryBuilders.termQuery(leftName, rightValue)); 314 } else if (Operator.NOTEQ.equals(operator)) { 315 boolQuery.mustNot(QueryBuilders.termQuery(leftName, rightValue)); 316 } else if (Operator.LT.equals(operator)) { 317 boolQuery.must(QueryBuilders.rangeQuery(leftName).lt(rightValue)); 318 } else if (Operator.LTEQ.equals(operator)) { 319 boolQuery.must(QueryBuilders.rangeQuery(leftName).lte(rightValue)); 320 } else if (Operator.GTEQ.equals(operator)) { 321 boolQuery.must(QueryBuilders.rangeQuery(leftName).gte(rightValue)); 322 } else if (Operator.GT.equals(operator)) { 323 boolQuery.must(QueryBuilders.rangeQuery(leftName).gt(rightValue)); 324 } else if (Operator.IN.equals(operator)) { 325 boolQuery.must(QueryBuilders.termsQuery(leftName, (List<?>) rightValue)); 326 } else if (Operator.STARTSWITH.equals(operator)) { 327 boolQuery.must(NxqlQueryConverter.makeStartsWithQuery(leftName, rightValue)); 328 } 329 } 330 query = boolQuery; 331 } 332 return query; 333 } 334 335 protected List<LogEntry> buildLogEntries(SearchResponse searchResponse) { 336 List<LogEntry> entries = new ArrayList<>(searchResponse.getHits().getHits().length); 337 ObjectMapper mapper = new ObjectMapper(); 338 for (SearchHit hit : searchResponse.getHits()) { 339 try { 340 entries.add(mapper.readValue(hit.getSourceAsString(), LogEntryImpl.class)); 341 } catch (IOException e) { 342 log.error("Error while reading Audit Entry from ES", e); 343 } 344 } 345 return entries; 346 } 347 348 protected SearchRequest createSearchRequest() { 349 return new SearchRequest(getESIndexName()).searchType(SearchType.DFS_QUERY_THEN_FETCH); 350 } 351 352 @Override 353 public LogEntry getLogEntryByID(long id) { 354 GetResponse ret = esClient.get( 355 new GetRequest(getESIndexName(), String.valueOf(id))); 356 if (!ret.isExists()) { 357 return null; 358 } 359 try { 360 return new ObjectMapper().readValue(ret.getSourceAsString(), LogEntryImpl.class); 361 } catch (IOException e) { 362 throw new NuxeoException("Unable to read Entry for id " + id, e); 363 } 364 } 365 366 public SearchRequest buildQuery(String query, Map<String, Object> params) { 367 if (params != null && params.size() > 0) { 368 query = expandQueryVariables(query, params); 369 } 370 SearchRequest request = createSearchRequest(); 371 SearchSourceBuilder sourceBuilder = createSearchSourceBuilder(query); 372 return request.source(sourceBuilder); 373 } 374 375 protected SearchSourceBuilder createSearchSourceBuilder(String query) { 376 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); 377 SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList()); 378 try { 379 try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser( 380 new NamedXContentRegistry(searchModule.getNamedXContents()), THROW_UNSUPPORTED_OPERATION, query)) { 381 searchSourceBuilder.parseXContent(parser); 382 } 383 } catch (IOException | ParsingException e) { 384 log.error("Invalid query: " + query + ": " + e.getMessage(), e); 385 throw new IllegalArgumentException("Bad query: " + query); 386 } 387 return searchSourceBuilder; 388 } 389 390 public String expandQueryVariables(String query, Object[] params) { 391 Map<String, Object> qParams = new HashMap<>(); 392 for (int i = 0; i < params.length; i++) { 393 query = query.replaceFirst("\\?", "\\${param" + i + "}"); 394 qParams.put("param" + i, params[i]); 395 } 396 return expandQueryVariables(query, qParams); 397 } 398 399 public String expandQueryVariables(String query, Map<String, Object> params) { 400 if (params != null && params.size() > 0) { 401 TextTemplate tmpl = new TextTemplate(); 402 for (String key : params.keySet()) { 403 Object val = params.get(key); 404 if (val == null) { 405 continue; 406 } else if (val instanceof Calendar) { 407 tmpl.setVariable(key, Long.toString(((Calendar) val).getTime().getTime())); 408 } else if (val instanceof Date) { 409 tmpl.setVariable(key, Long.toString(((Date) val).getTime())); 410 } else { 411 tmpl.setVariable(key, val.toString()); 412 } 413 } 414 query = tmpl.processText(query); 415 } 416 return query; 417 } 418 419 @Override 420 public List<?> nativeQuery(String query, Map<String, Object> params, int pageNb, int pageSize) { 421 SearchRequest request = buildQuery(query, params); 422 if (pageNb > 0) { 423 request.source().from(pageNb * pageSize); 424 } 425 if (pageSize > 0) { 426 request.source().size(pageSize); 427 } 428 SearchResponse searchResponse = runRequest(request); 429 return buildLogEntries(searchResponse); 430 } 431 432 @Override 433 public List<LogEntry> queryLogsByPage(String[] eventIds, Date limit, String[] categories, String path, int pageNb, 434 int pageSize) { 435 SearchRequest request = createSearchRequest(); 436 BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery(); 437 if (eventIds != null && eventIds.length > 0) { 438 if (eventIds.length == 1) { 439 filterBuilder.must(QueryBuilders.termQuery("eventId", eventIds[0])); 440 } else { 441 filterBuilder.must(QueryBuilders.termsQuery("eventId", eventIds)); 442 } 443 } 444 if (categories != null && categories.length > 0) { 445 if (categories.length == 1) { 446 filterBuilder.must(QueryBuilders.termQuery("category", categories[0])); 447 } else { 448 filterBuilder.must(QueryBuilders.termsQuery("category", categories)); 449 } 450 } 451 if (path != null) { 452 filterBuilder.must(QueryBuilders.termQuery("docPath", path)); 453 } 454 455 if (limit != null) { 456 filterBuilder.must(QueryBuilders.rangeQuery("eventDate").lt(convertDate(limit))); 457 } 458 request.source(new SearchSourceBuilder().query(QueryBuilders.constantScoreQuery(filterBuilder))); 459 if (pageNb > 0) { 460 request.source().from(pageNb * pageSize); 461 } 462 if (pageSize > 0) { 463 request.source().size(pageSize); 464 } 465 SearchResponse searchResponse = runRequest(request); 466 return buildLogEntries(searchResponse); 467 } 468 469 @Override 470 public void addLogEntries(List<LogEntry> entries) { 471 472 if (entries.isEmpty()) { 473 return; 474 } 475 476 BulkRequest bulkRequest = new BulkRequest(); 477 JsonFactory factory = new JsonFactory(); 478 479 UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class); 480 UIDSequencer seq = uidGeneratorService.getSequencer(); 481 482 try { 483 List<Long> block = seq.getNextBlock(SEQ_NAME, entries.size()); 484 for (int i = 0; i < entries.size(); i++) { 485 LogEntry entry = entries.get(i); 486 entry.setId(block.get(i)); 487 if (log.isDebugEnabled()) { 488 log.debug(String.format("Indexing log entry: %s", entry)); 489 } 490 entry.setLogDate(new Date()); 491 try (OutputStream out = new BytesStreamOutput(); // 492 JsonGenerator jg = factory.createGenerator(out); // 493 XContentBuilder builder = jsonBuilder(out)) { 494 ObjectMapper mapper = new ObjectMapper(); 495 mapper.writeValue(jg, entry); 496 bulkRequest.add( 497 new IndexRequest(getESIndexName()).id(String.valueOf(entry.getId())).source(builder)); 498 } 499 } 500 501 BulkResponse bulkResponse = esClient.bulk(bulkRequest); 502 if (bulkResponse.hasFailures()) { 503 for (BulkItemResponse response : bulkResponse.getItems()) { 504 if (response.isFailed()) { 505 log.error("Unable to index audit entry " + response.getItemId() + " :" 506 + response.getFailureMessage()); 507 } 508 } 509 } 510 } catch (IOException e) { 511 throw new NuxeoException("Error while indexing Audit entries", e); 512 } 513 514 } 515 516 @Override 517 public Long getEventsCount(String eventId) { 518 SearchResponse res = esClient.search( 519 new SearchRequest(getESIndexName()).source( 520 new SearchSourceBuilder().query( 521 QueryBuilders.constantScoreQuery(QueryBuilders.termQuery("eventId", eventId))) 522 .size(0))); 523 return res.getHits().getTotalHits().value; 524 } 525 526 @Override 527 public long syncLogCreationEntries(final String repoId, final String path, final Boolean recurs) { 528 return syncLogCreationEntries(provider, repoId, path, recurs); 529 } 530 531 public SearchResponse search(SearchRequest request) { 532 String[] indices = request.indices(); 533 if (indices == null || indices.length != 1) { 534 throw new IllegalStateException("Search on audit must include index name: " + request); 535 } 536 if (!getESIndexName().equals(indices[0])) { 537 throw new IllegalStateException("Search on audit must be on audit index: " + request); 538 } 539 return runRequest(request); 540 } 541 542 protected QueryBuilder buildFilter(PredicateDefinition[] predicates, DocumentModel searchDocumentModel) { 543 544 if (searchDocumentModel == null) { 545 return QueryBuilders.matchAllQuery(); 546 } 547 548 BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery(); 549 550 int nbFilters = 0; 551 552 for (PredicateDefinition predicate : predicates) { 553 554 // extract data from DocumentModel 555 PredicateFieldDefinition[] fieldDef = predicate.getValues(); 556 Object[] val = new Object[fieldDef.length]; 557 for (int fidx = 0; fidx < fieldDef.length; fidx++) { 558 if (fieldDef[fidx].getXpath() != null) { 559 val[fidx] = searchDocumentModel.getPropertyValue(fieldDef[fidx].getXpath()); 560 } else { 561 val[fidx] = searchDocumentModel.getProperty(fieldDef[fidx].getSchema(), fieldDef[fidx].getName()); 562 } 563 } 564 565 if (!isNonNullParam(val)) { 566 // skip predicate where all values are null 567 continue; 568 } 569 570 nbFilters++; 571 572 String op = predicate.getOperator(); 573 if (op.equalsIgnoreCase("IN")) { 574 575 String[] values = null; 576 if (val[0] instanceof Iterable<?>) { 577 List<String> l = new ArrayList<>(); 578 Iterable<?> vals = (Iterable<?>) val[0]; 579 580 for (Object v : vals) { 581 if (v != null) { 582 l.add(v.toString()); 583 } 584 } 585 values = l.toArray(new String[l.size()]); 586 } else if (val[0] instanceof Object[]) { 587 values = (String[]) val[0]; 588 } 589 filterBuilder.must(QueryBuilders.termsQuery(predicate.getParameter(), values)); 590 } else if (op.equalsIgnoreCase("BETWEEN")) { 591 filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gt(convertDate(val[0]))); 592 if (val.length > 1) { 593 filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lt(convertDate(val[1]))); 594 } 595 } else if (">".equals(op)) { 596 filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gt(convertDate(val[0]))); 597 } else if (">=".equals(op)) { 598 filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).gte(convertDate(val[0]))); 599 } else if ("<".equals(op)) { 600 filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lt(convertDate(val[0]))); 601 } else if ("<=".equals(op)) { 602 filterBuilder.must(QueryBuilders.rangeQuery(predicate.getParameter()).lte(convertDate(val[0]))); 603 } else { 604 filterBuilder.must(QueryBuilders.termQuery(predicate.getParameter(), convertDate(val[0]))); 605 } 606 } 607 608 if (nbFilters == 0) { 609 return QueryBuilders.matchAllQuery(); 610 } 611 return filterBuilder; 612 } 613 614 protected Object convertDate(Object o) { 615 // Date are convert to timestamp ms which is a known format by default for ES 616 if (o instanceof Calendar) { 617 return Long.valueOf(((Calendar) o).getTime().getTime()); 618 } else if (o instanceof Date) { 619 return Long.valueOf(((Date) o).getTime()); 620 } 621 return o; 622 } 623 624 public SearchRequest buildSearchQuery(String fixedPart, PredicateDefinition[] predicates, 625 DocumentModel searchDocumentModel) { 626 SearchRequest request = createSearchRequest(); 627 QueryBuilder queryBuilder = QueryBuilders.wrapperQuery(fixedPart); 628 QueryBuilder filterBuilder = buildFilter(predicates, searchDocumentModel); 629 request.source( 630 new SearchSourceBuilder().query(QueryBuilders.boolQuery().must(queryBuilder).filter(filterBuilder))); 631 return request; 632 } 633 634 protected boolean isNonNullParam(Object[] val) { 635 if (val == null) { 636 return false; 637 } 638 for (Object v : val) { 639 if (v != null) { 640 if (v instanceof String) { 641 if (!((String) v).isEmpty()) { 642 return true; 643 } 644 } else if (v instanceof String[]) { 645 if (((String[]) v).length > 0) { 646 return true; 647 } 648 } else { 649 return true; 650 } 651 } 652 } 653 return false; 654 } 655 656 @SuppressWarnings("deprecation") 657 public String migrate(final int batchSize) { 658 659 final String MIGRATION_WORK_ID = "AuditMigration"; 660 661 WorkManager wm = Framework.getService(WorkManager.class); 662 State migrationState = wm.getWorkState(MIGRATION_WORK_ID); 663 if (migrationState != null) { 664 return "Migration already scheduled : " + migrationState.toString(); 665 } 666 667 Work migrationWork = new ESAuditMigrationWork(MIGRATION_WORK_ID, batchSize); 668 wm.schedule(migrationWork); 669 return "Migration work started : " + MIGRATION_WORK_ID; 670 } 671 672 SearchResponse runRequest(SearchRequest request) { 673 logSearchRequest(request); 674 SearchResponse response = esClient.search(request); 675 logSearchResponse(response); 676 return response; 677 } 678 679 SearchResponse runNextScroll(String scrollId, TimeValue keepAlive) { 680 if (log.isDebugEnabled()) { 681 log.debug(String.format( 682 "Scroll request: -XGET 'localhost:9200/_search/scroll' -d '{\"scroll\": \"%s\", \"scroll_id\": \"%s\" }'", 683 keepAlive, scrollId)); 684 } 685 SearchResponse response = esClient.searchScroll(new SearchScrollRequest(scrollId).scroll(keepAlive)); 686 logSearchResponse(response); 687 return response; 688 } 689 690 protected void logSearchResponse(SearchResponse response) { 691 if (log.isDebugEnabled()) { 692 log.debug("Response: " + response.toString()); 693 } 694 } 695 696 protected void logSearchRequest(SearchRequest request) { 697 if (log.isDebugEnabled()) { 698 log.debug(String.format("Search query: curl -XGET 'http://localhost:9200/%s/_search?pretty' -d '%s'", 699 getESIndexName(), request.toString())); 700 } 701 } 702 703 /** 704 * Ensures the audit sequence returns an UID greater or equal than the maximum log entry id. 705 */ 706 protected void ensureUIDSequencer(ESClient esClient) { 707 boolean auditIndexExists = esClient.indexExists(getESIndexName()); 708 if (!auditIndexExists) { 709 return; 710 } 711 712 // Get max log entry id 713 SearchRequest request = createSearchRequest(); 714 request.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()) 715 .aggregation(AggregationBuilders.max("maxAgg").field("id"))); 716 SearchResponse searchResponse = esClient.search(request); 717 Aggregation agg = searchResponse.getAggregations().get("maxAgg"); 718 long maxLogEntryId = 0; 719 if (agg.getMetadata() != null && agg.getMetadata().containsKey(Aggregation.CommonFields.VALUE)) { 720 maxLogEntryId = (long) agg.getMetadata().get(Aggregation.CommonFields.VALUE); 721 } 722 723 // Get next sequence id 724 UIDGeneratorService uidGeneratorService = Framework.getService(UIDGeneratorService.class); 725 UIDSequencer seq = uidGeneratorService.getSequencer(); 726 seq.init(); 727 long nextSequenceId = seq.getNextLong(SEQ_NAME); 728 729 // Increment sequence to max log entry id if needed 730 if (nextSequenceId < maxLogEntryId) { 731 log.info(String.format("Next UID returned by %s sequence is %d, initializing sequence to %d", SEQ_NAME, 732 nextSequenceId, maxLogEntryId)); 733 seq.initSequence(SEQ_NAME, maxLogEntryId); 734 } 735 } 736 737 @Override 738 public ExtendedInfo newExtendedInfo(Serializable value) { 739 return new ESExtendedInfo(value); 740 } 741 742 protected String getESIndexName() { 743 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 744 return esa.getIndexNameForType(ElasticSearchConstants.ENTRY_TYPE); 745 } 746 747 @Override 748 public void append(List<String> jsonEntries) { 749 BulkRequest bulkRequest = new BulkRequest(); 750 for (String json : jsonEntries) { 751 try { 752 Object entryId = new JSONObject(json).opt(LOG_ID); 753 if (entryId == null) { 754 throw new NuxeoException("A json entry has an empty id. entry=" + json); 755 } 756 IndexRequest request = new IndexRequest(getESIndexName()).id(entryId.toString()); 757 request.source(json, XContentType.JSON); 758 bulkRequest.add(request); 759 } catch (JSONException e) { 760 throw new NuxeoException("Unable to deserialize json entry=" + json, e); 761 } 762 } 763 esClient.bulk(bulkRequest); 764 } 765 766 @SuppressWarnings("resource") // CursorResult is being registered, must not be closed 767 @Override 768 public ScrollResult<String> scroll(org.nuxeo.ecm.core.query.sql.model.QueryBuilder builder, int batchSize, 769 int keepAliveSeconds) { 770 // prepare parameters 771 MultiExpression predicate = builder.predicate(); 772 OrderByList orders = builder.orders(); 773 774 // create source 775 SearchSourceBuilder source = createSearchRequestSource(predicate, orders); 776 source.size(batchSize); 777 // create request 778 SearchRequest request = createSearchRequest(); 779 request.source(source).scroll(TimeValue.timeValueSeconds(keepAliveSeconds)); 780 SearchResponse response = runRequest(request); 781 // register cursor 782 String scrollId = cursorService.registerCursorResult(new ESCursorResult(response, batchSize, keepAliveSeconds)); 783 return scroll(scrollId); 784 } 785 786 @Override 787 public ScrollResult<String> scroll(String scrollId) { 788 return cursorService.scroll(scrollId); 789 } 790 791 public class ESCursorResult extends CursorResult<Iterator<SearchHit>, SearchHit> { 792 793 protected final String scrollId; 794 795 protected boolean end; 796 797 public ESCursorResult(SearchResponse response, int batchSize, int keepAliveSeconds) { 798 super(response.getHits().iterator(), batchSize, keepAliveSeconds); 799 this.scrollId = response.getScrollId(); 800 } 801 802 @Override 803 public boolean hasNext() { 804 if (cursor == null || end) { 805 return false; 806 } else if (cursor.hasNext()) { 807 return true; 808 } else { 809 runNextScroll(); 810 return !end; 811 } 812 } 813 814 @Override 815 public SearchHit next() { 816 if (cursor != null && !cursor.hasNext() && !end) { 817 // try to run a next scroll 818 runNextScroll(); 819 } 820 return super.next(); 821 } 822 823 protected void runNextScroll() { 824 SearchResponse response = ESAuditBackend.this.runNextScroll(scrollId, 825 TimeValue.timeValueSeconds(keepAliveSeconds)); 826 cursor = response.getHits().iterator(); 827 end = !cursor.hasNext(); 828 } 829 830 @Override 831 public void close() { 832 ClearScrollRequest request = new ClearScrollRequest(); 833 request.addScrollId(scrollId); 834 esClient.clearScroll(request); 835 end = true; 836 // Call super close to clear cursor 837 super.close(); 838 } 839 840 } 841 842}