001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Antoine Taillefer <ataillefer@nuxeo.com> 018 */ 019package org.nuxeo.elasticsearch.audit; 020 021import java.util.Calendar; 022import java.util.Collections; 023import java.util.List; 024import java.util.TimeZone; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.nuxeo.ecm.core.api.security.SecurityConstants; 029import org.nuxeo.ecm.core.work.AbstractWork; 030import org.nuxeo.ecm.platform.audit.api.AuditLogger; 031import org.nuxeo.ecm.platform.audit.api.LogEntry; 032import org.nuxeo.ecm.platform.audit.service.AuditBackend; 033import org.nuxeo.ecm.platform.audit.service.NXAuditEventsService; 034import org.nuxeo.ecm.platform.audit.service.extension.AuditBackendDescriptor; 035import org.nuxeo.runtime.api.Framework; 036import org.nuxeo.runtime.transaction.TransactionHelper; 037 038/** 039 * Work for the SQL to Elasticsearch audit migration. 040 * 041 * @since 7.10 042 */ 043public class ESAuditMigrationWork extends AbstractWork { 044 045 private static final long serialVersionUID = 3764830939638449534L; 046 047 private static final Log log = LogFactory.getLog(ESAuditMigrationWork.class); 048 049 protected int batchSize; 050 051 public ESAuditMigrationWork(String id, int batchSize) { 052 super(id); 053 this.batchSize = batchSize; 054 } 055 056 @Override 057 public String getTitle() { 058 return "Audit migration worker"; 059 } 060 061 @Override 062 public void work() { 063 064 NXAuditEventsService auditService = (NXAuditEventsService) Framework.getRuntime().getComponent( 065 NXAuditEventsService.NAME); 066 AuditBackendDescriptor config = new AuditBackendDescriptor(); 067 AuditBackend sourceBackend = config.newInstance(auditService); 068 sourceBackend.onApplicationStarted(); 069 070 try { 071 @SuppressWarnings("unchecked") 072 List<Long> res = (List<Long>) sourceBackend.nativeQuery("select count(*) from LogEntry", 1, 20); 073 long nbEntriesToMigrate = res.get(0); 074 075 AuditLogger destBackend = auditService.getBackend(); 076 077 TransactionHelper.commitOrRollbackTransaction(); 078 long t0 = System.currentTimeMillis(); 079 long nbEntriesMigrated = 0; 080 int pageIdx = 1; 081 082 while (nbEntriesMigrated < nbEntriesToMigrate) { 083 @SuppressWarnings("unchecked") 084 List<LogEntry> entries = (List<LogEntry>) sourceBackend.nativeQuery( 085 "from LogEntry log order by log.id asc", pageIdx, batchSize); 086 087 if (entries.size() == 0) { 088 log.warn("Migration ending after " + nbEntriesMigrated + " entries"); 089 break; 090 } 091 setProgress(new Progress(nbEntriesMigrated, nbEntriesToMigrate)); 092 destBackend.addLogEntries(entries); 093 pageIdx++; 094 nbEntriesMigrated += entries.size(); 095 log.info("Migrated " + nbEntriesMigrated + " log entries on " + nbEntriesToMigrate); 096 double dt = (System.currentTimeMillis() - t0) / 1000.0; 097 if (dt != 0) { 098 log.info("Migration speed: " + (nbEntriesMigrated / dt) + " entries/s"); 099 } 100 } 101 log.info("Audit migration from SQL to Elasticsearch done: " + nbEntriesMigrated + " entries migrated"); 102 103 // Log technical event in audit as a flag to know if the migration has been processed at application 104 // startup 105 AuditLogger logger = Framework.getService(AuditLogger.class); 106 LogEntry entry = logger.newLogEntry(); 107 entry.setCategory("NuxeoTechnicalEvent"); 108 entry.setEventId(ESAuditBackend.MIGRATION_DONE_EVENT); 109 entry.setPrincipalName(SecurityConstants.SYSTEM_USERNAME); 110 entry.setEventDate(Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTime()); 111 destBackend.addLogEntries(Collections.singletonList(entry)); 112 } finally { 113 sourceBackend.onApplicationStopped(); 114 } 115 } 116 117}