001/* 002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.runtime.migration; 020 021import static java.nio.charset.StandardCharsets.UTF_8; 022 023import java.io.IOException; 024import java.io.InputStream; 025import java.io.OutputStream; 026import java.time.Instant; 027import java.util.Collection; 028import java.util.List; 029import java.util.Objects; 030import java.util.Random; 031import java.util.concurrent.CopyOnWriteArrayList; 032import java.util.concurrent.SynchronousQueue; 033import java.util.concurrent.ThreadPoolExecutor; 034import java.util.concurrent.TimeUnit; 035import java.util.function.BiConsumer; 036import java.util.function.Consumer; 037 038import org.apache.commons.io.IOUtils; 039import org.apache.commons.lang3.StringUtils; 040import org.apache.logging.log4j.LogManager; 041import org.apache.logging.log4j.Logger; 042import org.nuxeo.runtime.api.Framework; 043import org.nuxeo.runtime.kv.KeyValueService; 044import org.nuxeo.runtime.kv.KeyValueServiceImpl; 045import org.nuxeo.runtime.kv.KeyValueStore; 046import org.nuxeo.runtime.migration.MigrationDescriptor.MigrationStepDescriptor; 047import org.nuxeo.runtime.model.ComponentContext; 048import org.nuxeo.runtime.model.ComponentManager; 049import org.nuxeo.runtime.model.DefaultComponent; 050import org.nuxeo.runtime.pubsub.AbstractPubSubBroker; 051import org.nuxeo.runtime.pubsub.SerializableMessage; 052 053/** 054 * Implementation for the Migration Service. 055 * <p> 056 * Data about migration status is stored in the "migration" Key/Value Store in the following format: 057 * 058 * <pre> 059 * mymigration:lock write lock, containing debug info about locker; set with a TTL 060 * mymigration the state of the migration, if not running 061 * mymigration:step the step of the migration, if running 062 * mymigration:starttime the migration step start time (milliseconds since epoch) 063 * mymigration:pingtime the migration step last ping time (milliseconds since epoch) 064 * mymigration:message the migration step current message 065 * mymigration:num the migration step current num 066 * mymigration:total the migration step current total 067 * </pre> 068 * 069 * @since 9.3 070 */ 071public class MigrationServiceImpl extends DefaultComponent implements MigrationService { 072 073 private static final Logger log = LogManager.getLogger(MigrationServiceImpl.class); 074 075 public static final String KEYVALUE_STORE_NAME = "migration"; 076 077 public static final String XP_CONFIG = "configuration"; 078 079 public static final String LOCK = ":lock"; 080 081 public static final String STEP = ":step"; 082 083 public static final String START_TIME = ":starttime"; 084 085 public static final String PING_TIME = ":pingtime"; 086 087 public static final String PROGRESS_MESSAGE = ":message"; 088 089 public static final String PROGRESS_NUM = ":num"; 090 091 public static final String PROGRESS_TOTAL = ":total"; 092 093 public static final long WRITE_LOCK_TTL = 10; // 10 sec for a few k/v writes is plenty enough 094 095 public static final String MIGRATION_INVAL_PUBSUB_TOPIC = "migrationinval"; 096 097 public static final String CLUSTERING_ENABLED_PROP = "repository.clustering.enabled"; 098 099 public static final String NODE_ID_PROP = "repository.clustering.id"; 100 101 protected static final Random RANDOM = new Random(); // NOSONAR (doesn't need cryptographic strength) 102 103 protected MigrationThreadPoolExecutor executor; 104 105 protected MigrationInvalidator invalidator; 106 107 public static class MigrationInvalidation implements SerializableMessage { 108 109 private static final long serialVersionUID = 1L; 110 111 public final String id; 112 113 public MigrationInvalidation(String id) { 114 this.id = id; 115 } 116 117 @Override 118 public void serialize(OutputStream out) throws IOException { 119 IOUtils.write(id, out, UTF_8); 120 } 121 122 public static MigrationInvalidation deserialize(InputStream in) throws IOException { 123 String id = IOUtils.toString(in, UTF_8); 124 return new MigrationInvalidation(id); 125 } 126 127 @Override 128 public String toString() { 129 return getClass().getSimpleName() + "(" + id + ")"; 130 } 131 } 132 133 public class MigrationInvalidator extends AbstractPubSubBroker<MigrationInvalidation> { 134 135 @Override 136 public MigrationInvalidation deserialize(InputStream in) throws IOException { 137 return MigrationInvalidation.deserialize(in); 138 } 139 140 @Override 141 public void receivedMessage(MigrationInvalidation message) { 142 String id = message.id; 143 Migrator migrator = getMigrator(id); 144 if (migrator == null) { 145 log.error("Unknown migration id received in invalidation: {}", id); 146 return; 147 } 148 migrator.notifyStatusChange(); 149 } 150 } 151 152 protected static KeyValueStore getKeyValueStore() { 153 KeyValueService service = Framework.getService(KeyValueService.class); 154 Objects.requireNonNull(service, "Missing KeyValueService"); 155 return service.getKeyValueStore(KEYVALUE_STORE_NAME); 156 } 157 158 public Collection<MigrationDescriptor> getMigrationDescriptors() { 159 return getDescriptors(XP_CONFIG); 160 } 161 162 @Override 163 public int getApplicationStartedOrder() { 164 return KeyValueServiceImpl.APPLICATION_STARTED_ORDER + 10; 165 } 166 167 /** 168 * Progress reporter that reports progress in the key/value store. 169 * 170 * @since 9.3 171 */ 172 protected static class ProgressReporter { 173 174 protected final String id; 175 176 public ProgressReporter(String id) { 177 this.id = id; 178 } 179 180 /** 181 * Reports progress. If num or total are -2 then null is used. 182 */ 183 public void reportProgress(String message, long num, long total, boolean ping) { 184 KeyValueStore keyValueStore = getKeyValueStore(); 185 keyValueStore.put(id + PROGRESS_MESSAGE, message); 186 keyValueStore.put(id + PROGRESS_NUM, num == -2 ? null : String.valueOf(num)); 187 keyValueStore.put(id + PROGRESS_TOTAL, total == -2 ? null : String.valueOf(total)); 188 keyValueStore.put(id + PING_TIME, ping ? String.valueOf(System.currentTimeMillis()) : null); 189 } 190 } 191 192 /** 193 * Migration context implementation that reports progress in the key/value store and can be shutdown. 194 * 195 * @since 9.3 196 */ 197 protected static class MigrationContextImpl implements MigrationContext { 198 199 protected final ProgressReporter progressReporter; 200 201 protected volatile boolean shutdown; 202 203 public MigrationContextImpl(ProgressReporter progressReporter) { 204 this.progressReporter = progressReporter; 205 } 206 207 @Override 208 public void reportProgress(String message, long num, long total) { 209 progressReporter.reportProgress(message, num, total, true); 210 } 211 212 @Override 213 public void requestShutdown() { 214 shutdown = true; 215 } 216 217 @Override 218 public boolean isShutdownRequested() { 219 return shutdown || Thread.currentThread().isInterrupted(); 220 } 221 } 222 223 /** 224 * Runnable for the migrator, that knows about the migration context. 225 * 226 * @since 9.3 227 */ 228 protected static class MigratorWithContext implements Runnable { 229 230 protected final Consumer<MigrationContext> migration; 231 232 protected final MigrationContext migrationContext; 233 234 protected final BiConsumer<MigrationContext, Throwable> afterMigration; 235 236 public MigratorWithContext(Consumer<MigrationContext> migration, ProgressReporter progressReporter, 237 BiConsumer<MigrationContext, Throwable> afterMigration) { 238 this.migration = migration; 239 this.migrationContext = new MigrationContextImpl(progressReporter); 240 this.afterMigration = afterMigration; 241 } 242 243 @Override 244 public void run() { 245 migration.accept(migrationContext); 246 } 247 248 public void afterMigration(Throwable t) { 249 afterMigration.accept(migrationContext, t); 250 } 251 252 public void requestShutdown() { 253 migrationContext.requestShutdown(); 254 } 255 } 256 257 /** 258 * Thread pool executor that records {@link Runnable}s to be able to request shutdown on them. 259 * 260 * @since 9.3 261 */ 262 protected static class MigrationThreadPoolExecutor extends ThreadPoolExecutor { 263 264 protected final List<MigratorWithContext> runnables = new CopyOnWriteArrayList<>(); 265 266 public MigrationThreadPoolExecutor() { 267 // like Executors.newCachedThreadPool but with keepAliveTime of 0 268 super(0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); 269 } 270 271 @Override 272 protected void beforeExecute(Thread thread, Runnable runnable) { 273 runnables.add((MigratorWithContext) runnable); 274 } 275 276 @Override 277 protected void afterExecute(Runnable runnable, Throwable t) { 278 runnables.remove(runnable); 279 ((MigratorWithContext) runnable).afterMigration(t); 280 } 281 282 public void requestShutdown() { 283 runnables.forEach(MigratorWithContext::requestShutdown); 284 } 285 } 286 287 @Override 288 public void start(ComponentContext context) { 289 super.start(context); 290 if (Framework.isBooleanPropertyTrue(CLUSTERING_ENABLED_PROP)) { 291 // register migration invalidator 292 String nodeId = Framework.getProperty(NODE_ID_PROP); 293 if (StringUtils.isBlank(nodeId)) { 294 nodeId = String.valueOf(RANDOM.nextLong()); 295 log.warn("Missing cluster node id configuration, please define it explicitly " 296 + "(usually through repository.clustering.id). Using random cluster node id instead: " 297 + nodeId); 298 } else { 299 nodeId = nodeId.trim(); 300 } 301 invalidator = new MigrationInvalidator(); 302 invalidator.initialize(MIGRATION_INVAL_PUBSUB_TOPIC, nodeId); 303 log.info("Registered migration invalidator for node: {}", nodeId); 304 } else { 305 log.info("Not registering a migration invalidator because clustering is not enabled"); 306 } 307 308 executor = new MigrationThreadPoolExecutor(); 309 Framework.getRuntime().getComponentManager().addListener(new ComponentManager.Listener() { 310 311 @Override 312 public void beforeStop(ComponentManager mgr, boolean isStandby) { 313 // flag all migration threads as shutdown requested, without interrupting them 314 executor.requestShutdown(); 315 } 316 317 @Override 318 public void afterStop(ComponentManager mgr, boolean isStandby) { 319 Framework.getRuntime().getComponentManager().removeListener(this); 320 } 321 }); 322 } 323 324 @Override 325 public void stop(ComponentContext context) throws InterruptedException { 326 // interrupt all migration tasks 327 executor.shutdownNow(); 328 executor.awaitTermination(10, TimeUnit.SECONDS); // wait 10s for termination 329 executor = null; 330 super.stop(context); 331 } 332 333 @Override 334 public MigrationStatus getStatus(String id) { 335 MigrationDescriptor descr = getDescriptor(XP_CONFIG, id); 336 if (descr == null) { 337 return null; // migration unknown 338 } 339 KeyValueStore kv = getKeyValueStore(); 340 String state = kv.getString(id); 341 if (state != null) { 342 return new MigrationStatus(state); 343 } 344 String step = kv.getString(id + STEP); 345 if (step == null) { 346 state = descr.defaultState; 347 return new MigrationStatus(state); 348 } 349 long startTime = Long.parseLong(kv.getString(id + START_TIME)); 350 long pingTime = Long.parseLong(kv.getString(id + PING_TIME)); 351 String progressMessage = kv.getString(id + PROGRESS_MESSAGE); 352 long progressNum = Long.parseLong(kv.getString(id + PROGRESS_NUM)); 353 long progressTotal = Long.parseLong(kv.getString(id + PROGRESS_TOTAL)); 354 if (progressMessage == null) { 355 progressMessage = ""; 356 } 357 return new MigrationStatus(step, startTime, pingTime, progressMessage, progressNum, progressTotal); 358 } 359 360 @Override 361 public String probeAndSetState(String id) { 362 Migrator migrator = getMigrator(id); 363 String state = migrator.probeState(); 364 if (state != null) { 365 ProgressReporter progressReporter = new ProgressReporter(id); 366 setState(id, state, migrator, progressReporter); 367 } 368 return state; 369 } 370 371 protected void setState(String id, String state, Migrator migrator, ProgressReporter progressReporter) { 372 atomic(id, kv -> { 373 String currentState = kv.getString(id); 374 String currentStep = kv.getString(id + STEP); 375 if (currentState == null && currentStep != null) { 376 throw new IllegalArgumentException("Migration: " + id + " already running step: " + currentStep); 377 } 378 setState(id, state, progressReporter, kv); 379 }); 380 migrator.notifyStatusChange(); 381 } 382 383 protected void setState(String id, String state, ProgressReporter progressReporter, KeyValueStore kv) { 384 kv.put(id, state); 385 kv.put(id + STEP, (String) null); 386 kv.put(id + START_TIME, (String) null); 387 progressReporter.reportProgress(null, -2, -2, false); 388 } 389 390 @Override 391 public void runStep(String id, String step) { 392 Migrator migrator = getMigrator(id); 393 MigrationDescriptor descr = getDescriptor(XP_CONFIG, id); 394 MigrationStepDescriptor stepDescr = descr.steps.get(step); 395 if (stepDescr == null) { 396 throw new IllegalArgumentException("Unknown step: " + step + " for migration: " + id); 397 } 398 399 ProgressReporter progressReporter = new ProgressReporter(id); 400 401 // switch to running 402 atomic(id, kv -> { 403 String state = kv.getString(id); 404 String currentStep = kv.getString(id + STEP); 405 if (state == null && currentStep == null) { 406 state = descr.defaultState; 407 if (!descr.states.containsKey(state)) { 408 throw new IllegalArgumentException("Invalid default state: " + state + " for migration: " + id); 409 } 410 } else if (state == null) { 411 throw new IllegalArgumentException("Migration: " + id + " already running step: " + currentStep); 412 } 413 if (!descr.states.containsKey(state)) { 414 throw new IllegalArgumentException("Invalid current state: " + state + " for migration: " + id); 415 } 416 if (!stepDescr.fromState.equals(state)) { 417 throw new IllegalArgumentException( 418 "Invalid step: " + step + " for migration: " + id + " in state: " + state); 419 } 420 String time = String.valueOf(System.currentTimeMillis()); 421 kv.put(id + STEP, step); 422 kv.put(id + START_TIME, time); 423 progressReporter.reportProgress("", 0, -1, true); 424 kv.put(id, (String) null); 425 }); 426 427 // allow notification of running step 428 migrator.notifyStatusChange(); 429 430 Consumer<MigrationContext> migration = migrationContext -> { 431 Thread.currentThread().setName("Nuxeo-Migrator-" + id); 432 migrator.run(step, migrationContext); 433 }; 434 435 BiConsumer<MigrationContext, Throwable> afterMigration = (migrationContext, t) -> { 436 if (t != null) { 437 log.error("Exception during execution of step: {} for migration: {}", step, id, t); 438 } 439 // after the migrator is finished, change state, except if shutdown is requested or exception 440 String state = t != null || migrationContext.isShutdownRequested() 441 ? stepDescr.fromState 442 : stepDescr.toState; 443 atomic(id, kv -> setState(id, state, progressReporter, kv)); 444 // allow notification of new state 445 migrator.notifyStatusChange(); 446 }; 447 448 executor.execute(new MigratorWithContext(migration, progressReporter, afterMigration)); 449 } 450 451 protected Migrator getMigrator(String id) { 452 MigrationDescriptor descr = getDescriptor(XP_CONFIG, id); 453 if (descr == null) { 454 throw new IllegalArgumentException("Unknown migration: " + id); 455 } 456 Class<?> klass = descr.klass; 457 if (!Migrator.class.isAssignableFrom(klass)) { 458 throw new RuntimeException( 459 "Invalid class not implementing Migrator: " + klass.getName() + " for migration: " + id); 460 } 461 try { 462 return (Migrator) klass.getConstructor().newInstance(); 463 } catch (ReflectiveOperationException e) { 464 throw new RuntimeException(e); 465 } 466 } 467 468 /** 469 * Executes something while setting a lock, retrying a few times if the lock is already set. 470 */ 471 protected void atomic(String id, Consumer<KeyValueStore> consumer) { 472 KeyValueStore kv = getKeyValueStore(); 473 String nodeid = Framework.getProperty(NODE_ID_PROP); 474 for (int i = 0; i < 5; i++) { 475 // the value of the lock is useful for debugging 476 String value = Instant.now() + " node=" + nodeid; 477 if (kv.compareAndSet(id + LOCK, null, value, WRITE_LOCK_TTL)) { 478 try { 479 consumer.accept(kv); 480 return; 481 } finally { 482 kv.put(id + LOCK, (String) null); 483 } 484 } 485 try { 486 Thread.sleep((long) (RANDOM.nextInt(100) * i)); 487 } catch (InterruptedException e) { 488 Thread.currentThread().interrupt(); 489 throw new RuntimeException(e); 490 } 491 } 492 String currentLock = kv.getString(id + LOCK); 493 throw new RuntimeException("Cannot lock for write migration: " + id + ", already locked: " + currentLock); 494 } 495 496}