001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Antoine Taillefer <ataillefer@nuxeo.com>
018 */
019package org.nuxeo.elasticsearch.audit;
020
021import java.util.Calendar;
022import java.util.Collections;
023import java.util.List;
024import java.util.TimeZone;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.nuxeo.ecm.core.api.security.SecurityConstants;
029import org.nuxeo.ecm.core.work.AbstractWork;
030import org.nuxeo.ecm.platform.audit.api.AuditLogger;
031import org.nuxeo.ecm.platform.audit.api.LogEntry;
032import org.nuxeo.ecm.platform.audit.service.AuditBackend;
033import org.nuxeo.ecm.platform.audit.service.NXAuditEventsService;
034import org.nuxeo.ecm.platform.audit.service.extension.AuditBackendDescriptor;
035import org.nuxeo.runtime.api.Framework;
036import org.nuxeo.runtime.transaction.TransactionHelper;
037
038/**
039 * Work for the SQL to Elasticsearch audit migration.
040 *
041 * @since 7.10
042 */
043public class ESAuditMigrationWork extends AbstractWork {
044
045    private static final long serialVersionUID = 3764830939638449534L;
046
047    private static final Log log = LogFactory.getLog(ESAuditMigrationWork.class);
048
049    protected int batchSize;
050
051    public ESAuditMigrationWork(String id, int batchSize) {
052        super(id);
053        this.batchSize = batchSize;
054    }
055
056    @Override
057    public String getTitle() {
058        return "Audit migration worker";
059    }
060
061    @Override
062    public void work() {
063
064        NXAuditEventsService auditService = (NXAuditEventsService) Framework.getRuntime().getComponent(
065                NXAuditEventsService.NAME);
066        AuditBackendDescriptor config = new AuditBackendDescriptor();
067        AuditBackend sourceBackend = config.newInstance(auditService);
068        sourceBackend.onApplicationStarted();
069
070        try {
071        @SuppressWarnings("unchecked")
072        List<Long> res = (List<Long>) sourceBackend.nativeQuery("select count(*) from LogEntry", 1, 20);
073        long nbEntriesToMigrate = res.get(0);
074
075        AuditLogger destBackend = auditService.getBackend();
076
077        TransactionHelper.commitOrRollbackTransaction();
078            long t0 = System.currentTimeMillis();
079            long nbEntriesMigrated = 0;
080            int pageIdx = 1;
081
082            while (nbEntriesMigrated < nbEntriesToMigrate) {
083                int pageIdxF = pageIdx;
084                @SuppressWarnings("unchecked")
085                List<LogEntry> entries = TransactionHelper.runInTransaction(() -> (List<LogEntry>) sourceBackend.nativeQuery(
086                        "from LogEntry log order by log.id asc", pageIdxF, batchSize));
087
088                if (entries.size() == 0) {
089                    log.warn("Migration ending after " + nbEntriesMigrated + " entries");
090                    break;
091                }
092                setProgress(new Progress(nbEntriesMigrated, nbEntriesToMigrate));
093                destBackend.addLogEntries(entries);
094                pageIdx++;
095                nbEntriesMigrated += entries.size();
096                log.info("Migrated " + nbEntriesMigrated + " log entries on " + nbEntriesToMigrate);
097                double dt = (System.currentTimeMillis() - t0) / 1000.0;
098                if (dt != 0) {
099                    log.info("Migration speed: " + (nbEntriesMigrated / dt) + " entries/s");
100                }
101            }
102            log.info("Audit migration from SQL to Elasticsearch done: " + nbEntriesMigrated + " entries migrated");
103
104            // Log technical event in audit as a flag to know if the migration has been processed at application
105            // startup
106            AuditLogger logger = Framework.getService(AuditLogger.class);
107            LogEntry entry = logger.newLogEntry();
108            entry.setCategory("NuxeoTechnicalEvent");
109            entry.setEventId(ESAuditBackend.MIGRATION_DONE_EVENT);
110            entry.setPrincipalName(SecurityConstants.SYSTEM_USERNAME);
111            entry.setEventDate(Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTime());
112            destBackend.addLogEntries(Collections.singletonList(entry));
113        } finally {
114            sourceBackend.onApplicationStopped();
115        }
116    }
117
118}