001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl-2.1.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Antoine Taillefer <ataillefer@nuxeo.com>
016 */
017package org.nuxeo.elasticsearch.audit;
018
019import java.util.Calendar;
020import java.util.Collections;
021import java.util.List;
022import java.util.TimeZone;
023
024import org.apache.commons.logging.Log;
025import org.apache.commons.logging.LogFactory;
026import org.nuxeo.ecm.core.api.security.SecurityConstants;
027import org.nuxeo.ecm.core.work.AbstractWork;
028import org.nuxeo.ecm.platform.audit.api.AuditLogger;
029import org.nuxeo.ecm.platform.audit.api.LogEntry;
030import org.nuxeo.ecm.platform.audit.service.AuditBackend;
031import org.nuxeo.ecm.platform.audit.service.DefaultAuditBackend;
032import org.nuxeo.ecm.platform.audit.service.NXAuditEventsService;
033import org.nuxeo.runtime.api.Framework;
034import org.nuxeo.runtime.transaction.TransactionHelper;
035
036/**
037 * Work for the SQL to Elasticsearch audit migration.
038 *
039 * @since 7.10
040 */
041public class ESAuditMigrationWork extends AbstractWork {
042
043    private static final long serialVersionUID = 3764830939638449534L;
044
045    private static final Log log = LogFactory.getLog(ESAuditMigrationWork.class);
046
047    protected int batchSize;
048
049    public ESAuditMigrationWork(String id, int batchSize) {
050        super(id);
051        this.batchSize = batchSize;
052    }
053
054    @Override
055    public String getTitle() {
056        return "Audit migration worker";
057    }
058
059    @Override
060    public void work() {
061
062        NXAuditEventsService auditService = (NXAuditEventsService) Framework.getRuntime().getComponent(
063                NXAuditEventsService.NAME);
064
065        final AuditBackend sourceBackend = new DefaultAuditBackend();
066        sourceBackend.activate(auditService);
067
068        @SuppressWarnings("unchecked")
069        List<Long> res = (List<Long>) sourceBackend.nativeQuery("select count(*) from LogEntry", 1, 20);
070        final long nbEntriesToMigrate = res.get(0);
071
072        final AuditLogger destBackend = auditService.getBackend();
073
074        TransactionHelper.commitOrRollbackTransaction();
075        try {
076            long t0 = System.currentTimeMillis();
077            long nbEntriesMigrated = 0;
078            int pageIdx = 1;
079
080            while (nbEntriesMigrated < nbEntriesToMigrate) {
081                @SuppressWarnings("unchecked")
082                List<LogEntry> entries = (List<LogEntry>) sourceBackend.nativeQuery(
083                        "from LogEntry log order by log.id asc", pageIdx, batchSize);
084
085                if (entries.size() == 0) {
086                    log.warn("Migration ending after " + nbEntriesMigrated + " entries");
087                    break;
088                }
089                setProgress(new Progress(nbEntriesMigrated, nbEntriesToMigrate));
090                destBackend.addLogEntries(entries);
091                pageIdx++;
092                nbEntriesMigrated += entries.size();
093                log.info("Migrated " + nbEntriesMigrated + " log entries on " + nbEntriesToMigrate);
094                double dt = (System.currentTimeMillis() - t0) / 1000.0;
095                if (dt != 0) {
096                    log.info("Migration speed: " + (nbEntriesMigrated / dt) + " entries/s");
097                }
098            }
099            log.info("Audit migration from SQL to Elasticsearch done: " + nbEntriesMigrated + " entries migrated");
100
101            // Log technical event in audit as a flag to know if the migration has been processed at application
102            // startup
103            AuditLogger logger = Framework.getService(AuditLogger.class);
104            LogEntry entry = logger.newLogEntry();
105            entry.setCategory("NuxeoTechnicalEvent");
106            entry.setEventId(ESAuditBackend.MIGRATION_DONE_EVENT);
107            entry.setPrincipalName(SecurityConstants.SYSTEM_USERNAME);
108            entry.setEventDate(Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTime());
109            destBackend.addLogEntries(Collections.singletonList(entry));
110        } finally {
111            TransactionHelper.startTransaction();
112            sourceBackend.deactivate();
113        }
114    }
115
116}