001/* 002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Funsho David 018 */ 019 020package org.nuxeo.ecm.core.migrator; 021 022import java.util.Collection; 023import java.util.function.Consumer; 024 025import org.apache.logging.log4j.LogManager; 026import org.apache.logging.log4j.Logger; 027import org.nuxeo.ecm.core.api.CoreInstance; 028import org.nuxeo.ecm.core.api.CoreSession; 029import org.nuxeo.runtime.migration.MigrationService.MigrationContext; 030import org.nuxeo.runtime.migration.MigrationService.Migrator; 031import org.nuxeo.runtime.transaction.TransactionHelper; 032 033/** 034 * @since 10.3 035 */ 036public abstract class AbstractRepositoryMigrator implements Migrator { 037 038 private static final Logger log = LogManager.getLogger(AbstractRepositoryMigrator.class); 039 040 /** 041 * @deprecated since 11.1, please use parameters when using or implementing: 042 * <ul> 043 * <li>{@link #migrateRepository(String, MigrationContext, String)}</li> 044 * <li>{@link #migrateSession(String, MigrationContext, CoreSession)}</li> 045 * <li>{@link #checkShutdownRequested(MigrationContext)} 046 * <li>{@link #processBatched(MigrationContext, int, Collection, Consumer, String)} 047 * </ul> 048 */ 049 @Deprecated(since = "11.1", forRemoval = true) 050 protected MigrationContext migrationContext; 051 052 protected String probeRepository(String repositoryName) { 053 return TransactionHelper.runInTransaction(() -> CoreInstance.doPrivileged(repositoryName, this::probeSession)); 054 } 055 056 /** 057 * @deprecated since 11.1. Use {@link #checkShutdownRequested(MigrationContext)} instead 058 */ 059 @Deprecated(since = "11.1", forRemoval = true) 060 protected void checkShutdownRequested() { 061 checkShutdownRequested(this.migrationContext); 062 } 063 064 protected void checkShutdownRequested(MigrationContext migrationContext) { 065 if (migrationContext.isShutdownRequested()) { 066 throw new MigrationShutdownException(); 067 } 068 } 069 070 protected void reportProgress(String message, long num, long total) { 071 log.debug("{}: {}/{}", message, num, total); 072 migrationContext.reportProgress(message, num, total); 073 } 074 075 protected void reportProgress(String repositoryName, String message, long num, long total) { 076 reportProgress(String.format("[%s] %s", repositoryName, message), num, total); 077 } 078 079 /** 080 * @deprecated since 11.1. Use {@link #migrateRepository(String, MigrationContext, String)} instead 081 */ 082 @Deprecated(since = "11.1", forRemoval = true) 083 protected void migrateRepository(String repositoryName) { 084 TransactionHelper.runInTransaction(() -> CoreInstance.doPrivileged(repositoryName, 085 (CoreSession session) -> migrateSession(session))); 086 } 087 088 /** 089 * @since 11.1 090 */ 091 protected void migrateRepository(String step, MigrationContext migrationContext, String repositoryName) { 092 TransactionHelper.runInTransaction(() -> CoreInstance.doPrivileged(repositoryName, 093 (CoreSession session) -> migrateSession(step, migrationContext, session))); 094 } 095 096 /** 097 * @since 11.1 098 */ 099 protected abstract void migrateSession(String step, MigrationContext context, CoreSession session); 100 101 /** 102 * @deprecated since 11.1 Use {@link #processBatched(MigrationContext, int, Collection, Consumer, String)} instead 103 */ 104 @Deprecated(since = "11.1", forRemoval = true) 105 protected <T> void processBatched(int batchSize, Collection<T> collection, Consumer<T> consumer, 106 String progressMessage) { 107 processBatched(migrationContext, batchSize, collection, consumer, progressMessage); 108 } 109 110 /** 111 * Runs a consumer on the collection, committing every BATCH_SIZE elements, reporting progress and checking for 112 * shutdown request. 113 */ 114 protected <T> void processBatched(MigrationContext migrationContext, int batchSize, Collection<T> collection, 115 Consumer<T> consumer, String progressMessage) { 116 int size = collection.size(); 117 int i = -1; 118 for (T element : collection) { 119 consumer.accept(element); 120 checkShutdownRequested(migrationContext); 121 i++; 122 if (i % batchSize == 0 || i == size - 1) { 123 reportProgress(progressMessage, i + 1, size); 124 TransactionHelper.commitOrRollbackTransaction(); 125 TransactionHelper.startTransaction(); 126 } 127 } 128 } 129 130 protected abstract String probeSession(CoreSession session); 131 132 /** 133 * @deprecated since 11.1 Use {@link #migrateSession(String, MigrationContext, CoreSession)} instead 134 */ 135 @Deprecated(since = "11.1", forRemoval = true) 136 protected void migrateSession(CoreSession session) { 137 migrateSession(null, this.migrationContext, session); 138 } 139 140 // exception used for simpler flow control 141 protected static class MigrationShutdownException extends RuntimeException { 142 143 private static final long serialVersionUID = 1L; 144 145 public MigrationShutdownException() { 146 super(); 147 } 148 } 149 150}