001/* 002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Funsho David 018 */ 019 020package org.nuxeo.ecm.core.migrator; 021 022import java.util.Collection; 023import java.util.function.Consumer; 024 025import org.apache.commons.logging.Log; 026import org.apache.commons.logging.LogFactory; 027import org.nuxeo.ecm.core.api.CoreInstance; 028import org.nuxeo.ecm.core.api.CoreSession; 029import org.nuxeo.runtime.migration.MigrationService.MigrationContext; 030import org.nuxeo.runtime.migration.MigrationService.Migrator; 031import org.nuxeo.runtime.transaction.TransactionHelper; 032 033/** 034 * @since 10.3 035 */ 036public abstract class AbstractRepositoryMigrator implements Migrator { 037 038 private static final Log log = LogFactory.getLog(AbstractRepositoryMigrator.class); 039 040 protected MigrationContext migrationContext; 041 042 protected String probeRepository(String repositoryName) { 043 return TransactionHelper.runInTransaction(() -> CoreInstance.doPrivileged(repositoryName, this::probeSession)); 044 } 045 046 protected void checkShutdownRequested() { 047 if (migrationContext.isShutdownRequested()) { 048 throw new MigrationShutdownException(); 049 } 050 } 051 052 protected void reportProgress(String message, long num, long total) { 053 log.debug(message + ": " + num + "/" + total); 054 migrationContext.reportProgress(message, num, total); 055 } 056 057 protected void reportProgress(String repositoryName, String message, long num, long total) { 058 reportProgress(String.format("[%s] %s", repositoryName, message), num, total); 059 } 060 061 protected void migrateRepository(String repositoryName) { 062 TransactionHelper.runInTransaction(() -> CoreInstance.doPrivileged(repositoryName, this::migrateSession)); 063 } 064 065 /** 066 * Runs a consumer on the collection, committing every BATCH_SIZE elements, reporting progress and checking for 067 * shutdown request. 068 */ 069 protected <T> void processBatched(int batchSize, Collection<T> collection, Consumer<T> consumer, 070 String progressMessage) { 071 int size = collection.size(); 072 int i = -1; 073 for (T element : collection) { 074 consumer.accept(element); 075 checkShutdownRequested(); 076 i++; 077 if (i % batchSize == 0 || i == size - 1) { 078 reportProgress(progressMessage, i + 1, size); 079 TransactionHelper.commitOrRollbackTransaction(); 080 TransactionHelper.startTransaction(); 081 } 082 } 083 } 084 085 protected abstract String probeSession(CoreSession session); 086 087 protected abstract void migrateSession(CoreSession session); 088 089 // exception used for simpler flow control 090 protected static class MigrationShutdownException extends RuntimeException { 091 092 private static final long serialVersionUID = 1L; 093 094 public MigrationShutdownException() { 095 super(); 096 } 097 } 098 099 100}