001/* 002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.runtime.migration; 020 021import static java.nio.charset.StandardCharsets.UTF_8; 022import static java.util.stream.Collectors.toList; 023 024import java.io.IOException; 025import java.io.InputStream; 026import java.io.OutputStream; 027import java.time.Instant; 028import java.util.Collection; 029import java.util.List; 030import java.util.Objects; 031import java.util.Random; 032import java.util.concurrent.CopyOnWriteArrayList; 033import java.util.concurrent.SynchronousQueue; 034import java.util.concurrent.ThreadPoolExecutor; 035import java.util.concurrent.TimeUnit; 036import java.util.function.BiConsumer; 037import java.util.function.Consumer; 038 039import org.apache.commons.io.IOUtils; 040import org.apache.logging.log4j.LogManager; 041import org.apache.logging.log4j.Logger; 042import org.nuxeo.runtime.api.Framework; 043import org.nuxeo.runtime.cluster.ClusterService; 044import org.nuxeo.runtime.kv.KeyValueService; 045import org.nuxeo.runtime.kv.KeyValueServiceImpl; 046import org.nuxeo.runtime.kv.KeyValueStore; 047import org.nuxeo.runtime.migration.MigrationDescriptor.MigrationStepDescriptor; 048import org.nuxeo.runtime.model.ComponentContext; 049import org.nuxeo.runtime.model.ComponentManager; 050import org.nuxeo.runtime.model.DefaultComponent; 051import org.nuxeo.runtime.model.Descriptor; 052import org.nuxeo.runtime.pubsub.AbstractPubSubBroker; 053import org.nuxeo.runtime.pubsub.SerializableMessage; 054 055/** 056 * Implementation for the Migration Service. 057 * <p> 058 * Data about migration status is stored in the "migration" Key/Value Store in the following format: 059 * 060 * <pre> 061 * mymigration:lock write lock, containing debug info about locker; set with a TTL 062 * mymigration the state of the migration, if not running 063 * mymigration:step the step of the migration, if running 064 * mymigration:starttime the migration step start time (milliseconds since epoch) 065 * mymigration:pingtime the migration step last ping time (milliseconds since epoch) 066 * mymigration:message the migration step current message 067 * mymigration:num the migration step current num 068 * mymigration:total the migration step current total 069 * </pre> 070 * 071 * @since 9.3 072 */ 073public class MigrationServiceImpl extends DefaultComponent implements MigrationService { 074 075 private static final Logger log = LogManager.getLogger(MigrationServiceImpl.class); 076 077 public static final String KEYVALUE_STORE_NAME = "migration"; 078 079 public static final String XP_CONFIG = "configuration"; 080 081 public static final String LOCK = ":lock"; 082 083 public static final String STEP = ":step"; 084 085 public static final String START_TIME = ":starttime"; 086 087 public static final String PING_TIME = ":pingtime"; 088 089 public static final String PROGRESS_MESSAGE = ":message"; 090 091 public static final String PROGRESS_NUM = ":num"; 092 093 public static final String PROGRESS_TOTAL = ":total"; 094 095 public static final long WRITE_LOCK_TTL = 10; // 10 sec for a few k/v writes is plenty enough 096 097 public static final String MIGRATION_INVAL_PUBSUB_TOPIC = "migrationinval"; 098 099 protected static final Random RANDOM = new Random(); // NOSONAR (doesn't need cryptographic strength) 100 101 protected MigrationThreadPoolExecutor executor; 102 103 protected String nodeId; 104 105 protected MigrationInvalidator invalidator; 106 107 public static class MigrationInvalidation implements SerializableMessage { 108 109 private static final long serialVersionUID = 1L; 110 111 public final String id; 112 113 public MigrationInvalidation(String id) { 114 this.id = id; 115 } 116 117 @Override 118 public void serialize(OutputStream out) throws IOException { 119 IOUtils.write(id, out, UTF_8); 120 } 121 122 public static MigrationInvalidation deserialize(InputStream in) throws IOException { 123 String id = IOUtils.toString(in, UTF_8); 124 return new MigrationInvalidation(id); 125 } 126 127 @Override 128 public String toString() { 129 return getClass().getSimpleName() + "(" + id + ")"; 130 } 131 } 132 133 public class MigrationInvalidator extends AbstractPubSubBroker<MigrationInvalidation> { 134 135 @Override 136 public MigrationInvalidation deserialize(InputStream in) throws IOException { 137 return MigrationInvalidation.deserialize(in); 138 } 139 140 @Override 141 public void receivedMessage(MigrationInvalidation message) { 142 String id = message.id; 143 Migrator migrator = getMigrator(id); 144 if (migrator == null) { 145 log.error("Unknown migration id received in invalidation: {}", id); 146 return; 147 } 148 // don't send again invalidation, call sub migrator directly 149 ((InvalidatorMigrator) migrator).migrator.notifyStatusChange(); 150 } 151 } 152 153 /** 154 * A {@link Migrator migrator} wrapper to send invalidations to other nodes when calling 155 * {@link #notifyStatusChange()}. 156 * 157 * @since 10.10 158 */ 159 public static class InvalidatorMigrator implements Migrator { 160 161 protected final String id; 162 163 protected final Migrator migrator; 164 165 protected final MigrationInvalidator invalidator; 166 167 public InvalidatorMigrator(String id, Migrator migrator, MigrationInvalidator invalidator) { 168 this.id = id; 169 this.migrator = migrator; 170 this.invalidator = invalidator; 171 } 172 173 @Override 174 public String probeState() { 175 return migrator.probeState(); 176 } 177 178 @Override 179 public void run(String step, MigrationContext migrationContext) { 180 migrator.run(step, migrationContext); 181 } 182 183 @Override 184 public void notifyStatusChange() { 185 migrator.notifyStatusChange(); 186 invalidator.sendMessage(new MigrationInvalidation(id)); 187 } 188 } 189 190 protected static KeyValueStore getKeyValueStore() { 191 KeyValueService service = Framework.getService(KeyValueService.class); 192 Objects.requireNonNull(service, "Missing KeyValueService"); 193 return service.getKeyValueStore(KEYVALUE_STORE_NAME); 194 } 195 196 public Collection<MigrationDescriptor> getMigrationDescriptors() { 197 return getDescriptors(XP_CONFIG); 198 } 199 200 @Override 201 public int getApplicationStartedOrder() { 202 return KeyValueServiceImpl.APPLICATION_STARTED_ORDER + 10; 203 } 204 205 /** 206 * Progress reporter that reports progress in the key/value store. 207 * 208 * @since 9.3 209 */ 210 protected static class ProgressReporter { 211 212 protected final String id; 213 214 public ProgressReporter(String id) { 215 this.id = id; 216 } 217 218 /** 219 * Reports progress. If num or total are -2 then null is used. 220 */ 221 public void reportProgress(String message, long num, long total, boolean ping) { 222 KeyValueStore keyValueStore = getKeyValueStore(); 223 keyValueStore.put(id + PROGRESS_MESSAGE, message); 224 keyValueStore.put(id + PROGRESS_NUM, num == -2 ? null : String.valueOf(num)); 225 keyValueStore.put(id + PROGRESS_TOTAL, total == -2 ? null : String.valueOf(total)); 226 keyValueStore.put(id + PING_TIME, ping ? String.valueOf(System.currentTimeMillis()) : null); 227 } 228 } 229 230 /** 231 * Migration context implementation that reports progress in the key/value store and can be shutdown. 232 * 233 * @since 9.3 234 */ 235 protected static class MigrationContextImpl implements MigrationContext { 236 237 protected final ProgressReporter progressReporter; 238 239 protected volatile boolean shutdown; 240 241 public MigrationContextImpl(ProgressReporter progressReporter) { 242 this.progressReporter = progressReporter; 243 } 244 245 @Override 246 public void reportProgress(String message, long num, long total) { 247 progressReporter.reportProgress(message, num, total, true); 248 } 249 250 @Override 251 public void requestShutdown() { 252 shutdown = true; 253 } 254 255 @Override 256 public boolean isShutdownRequested() { 257 return shutdown || Thread.currentThread().isInterrupted(); 258 } 259 } 260 261 /** 262 * Runnable for the migrator, that knows about the migration context. 263 * 264 * @since 9.3 265 */ 266 protected static class MigratorWithContext implements Runnable { 267 268 protected final Consumer<MigrationContext> migration; 269 270 protected final MigrationContext migrationContext; 271 272 protected final BiConsumer<MigrationContext, Throwable> afterMigration; 273 274 public MigratorWithContext(Consumer<MigrationContext> migration, ProgressReporter progressReporter, 275 BiConsumer<MigrationContext, Throwable> afterMigration) { 276 this.migration = migration; 277 this.migrationContext = new MigrationContextImpl(progressReporter); 278 this.afterMigration = afterMigration; 279 } 280 281 @Override 282 public void run() { 283 migration.accept(migrationContext); 284 } 285 286 public void afterMigration(Throwable t) { 287 afterMigration.accept(migrationContext, t); 288 } 289 290 public void requestShutdown() { 291 migrationContext.requestShutdown(); 292 } 293 } 294 295 /** 296 * Thread pool executor that records {@link Runnable}s to be able to request shutdown on them. 297 * 298 * @since 9.3 299 */ 300 protected static class MigrationThreadPoolExecutor extends ThreadPoolExecutor { 301 302 protected final List<MigratorWithContext> runnables = new CopyOnWriteArrayList<>(); 303 304 public MigrationThreadPoolExecutor() { 305 // like Executors.newCachedThreadPool but with keepAliveTime of 0 306 super(0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); 307 } 308 309 @Override 310 protected void beforeExecute(Thread thread, Runnable runnable) { 311 runnables.add((MigratorWithContext) runnable); 312 } 313 314 @Override 315 protected void afterExecute(Runnable runnable, Throwable t) { 316 runnables.remove(runnable); 317 ((MigratorWithContext) runnable).afterMigration(t); 318 } 319 320 public void requestShutdown() { 321 runnables.forEach(MigratorWithContext::requestShutdown); 322 } 323 } 324 325 @Override 326 public void start(ComponentContext context) { 327 super.start(context); 328 ClusterService clusterService = Framework.getService(ClusterService.class); 329 if (clusterService.isEnabled()) { 330 // register invalidator 331 nodeId = clusterService.getNodeId(); 332 invalidator = new MigrationInvalidator(); 333 invalidator.initialize(MIGRATION_INVAL_PUBSUB_TOPIC, nodeId); 334 log.info("Registered migration invalidator for node: {}", nodeId); 335 } else { 336 log.info("Not registering a migration invalidator because clustering is not enabled"); 337 } 338 339 executor = new MigrationThreadPoolExecutor(); 340 Framework.getRuntime().getComponentManager().addListener(new ComponentManager.Listener() { 341 342 @Override 343 public void beforeStop(ComponentManager mgr, boolean isStandby) { 344 // flag all migration threads as shutdown requested, without interrupting them 345 executor.requestShutdown(); 346 } 347 348 @Override 349 public void afterStop(ComponentManager mgr, boolean isStandby) { 350 Framework.getRuntime().getComponentManager().removeListener(this); 351 } 352 }); 353 } 354 355 @Override 356 public void stop(ComponentContext context) throws InterruptedException { 357 if (invalidator != null) { 358 invalidator.close(); 359 invalidator = null; 360 } 361 // interrupt all migration tasks 362 executor.shutdownNow(); 363 executor.awaitTermination(10, TimeUnit.SECONDS); // wait 10s for termination 364 executor = null; 365 super.stop(context); 366 } 367 368 @Override 369 public MigrationStatus getStatus(String id) { 370 MigrationDescriptor descr = getDescriptor(XP_CONFIG, id); 371 if (descr == null) { 372 return null; // migration unknown 373 } 374 KeyValueStore kv = getKeyValueStore(); 375 String state = kv.getString(id); 376 if (state != null) { 377 return new MigrationStatus(state); 378 } 379 String step = kv.getString(id + STEP); 380 if (step == null) { 381 state = descr.defaultState; 382 return new MigrationStatus(state); 383 } 384 long startTime = Long.parseLong(kv.getString(id + START_TIME)); 385 long pingTime = Long.parseLong(kv.getString(id + PING_TIME)); 386 String progressMessage = kv.getString(id + PROGRESS_MESSAGE); 387 long progressNum = Long.parseLong(kv.getString(id + PROGRESS_NUM)); 388 long progressTotal = Long.parseLong(kv.getString(id + PROGRESS_TOTAL)); 389 if (progressMessage == null) { 390 progressMessage = ""; 391 } 392 return new MigrationStatus(step, startTime, pingTime, progressMessage, progressNum, progressTotal); 393 } 394 395 @Override 396 public String probeAndSetState(String id) { 397 Migrator migrator = getMigrator(id); 398 String state = migrator.probeState(); 399 if (state != null) { 400 ProgressReporter progressReporter = new ProgressReporter(id); 401 setState(id, state, migrator, progressReporter); 402 } 403 return state; 404 } 405 406 protected void setState(String id, String state, Migrator migrator, ProgressReporter progressReporter) { 407 atomic(id, kv -> { 408 String currentState = kv.getString(id); 409 String currentStep = kv.getString(id + STEP); 410 if (currentState == null && currentStep != null) { 411 throw new IllegalArgumentException("Migration: " + id + " already running step: " + currentStep); 412 } 413 setState(id, state, progressReporter, kv); 414 }); 415 migrator.notifyStatusChange(); 416 } 417 418 protected void setState(String id, String state, ProgressReporter progressReporter, KeyValueStore kv) { 419 kv.put(id, state); 420 kv.put(id + STEP, (String) null); 421 kv.put(id + START_TIME, (String) null); 422 progressReporter.reportProgress(null, -2, -2, false); 423 } 424 425 @Override 426 public void runStep(String id, String step) { 427 Migrator migrator = getMigrator(id); 428 MigrationDescriptor descr = getDescriptor(XP_CONFIG, id); 429 MigrationStepDescriptor stepDescr = descr.steps.get(step); 430 if (stepDescr == null) { 431 throw new IllegalArgumentException("Unknown step: " + step + " for migration: " + id); 432 } 433 434 ProgressReporter progressReporter = new ProgressReporter(id); 435 436 // switch to running 437 atomic(id, kv -> { 438 String state = kv.getString(id); 439 String currentStep = kv.getString(id + STEP); 440 if (state == null && currentStep == null) { 441 state = descr.defaultState; 442 if (!descr.states.containsKey(state)) { 443 throw new IllegalArgumentException("Invalid default state: " + state + " for migration: " + id); 444 } 445 } else if (state == null) { 446 throw new IllegalArgumentException("Migration: " + id + " already running step: " + currentStep); 447 } 448 if (!descr.states.containsKey(state)) { 449 throw new IllegalArgumentException("Invalid current state: " + state + " for migration: " + id); 450 } 451 if (!stepDescr.fromState.equals(state)) { 452 throw new IllegalArgumentException( 453 "Invalid step: " + step + " for migration: " + id + " in state: " + state); 454 } 455 String time = String.valueOf(System.currentTimeMillis()); 456 kv.put(id + STEP, step); 457 kv.put(id + START_TIME, time); 458 progressReporter.reportProgress("", 0, -1, true); 459 kv.put(id, (String) null); 460 }); 461 462 // allow notification of running step 463 migrator.notifyStatusChange(); 464 465 Consumer<MigrationContext> migration = migrationContext -> { 466 Thread.currentThread().setName("Nuxeo-Migrator-" + id); 467 migrator.run(step, migrationContext); 468 }; 469 470 BiConsumer<MigrationContext, Throwable> afterMigration = (migrationContext, t) -> { 471 if (t != null) { 472 log.error("Exception during execution of step: {} for migration: {}", step, id, t); 473 } 474 // after the migrator is finished, change state, except if shutdown is requested or exception 475 String state = t != null || migrationContext.isShutdownRequested() ? stepDescr.fromState 476 : stepDescr.toState; 477 atomic(id, kv -> setState(id, state, progressReporter, kv)); 478 // allow notification of new state 479 migrator.notifyStatusChange(); 480 }; 481 482 executor.execute(new MigratorWithContext(migration, progressReporter, afterMigration)); 483 } 484 485 protected Migrator getMigrator(String id) { 486 MigrationDescriptor descr = getDescriptor(XP_CONFIG, id); 487 if (descr == null) { 488 throw new IllegalArgumentException("Unknown migration: " + id); 489 } 490 Class<?> klass = descr.klass; 491 if (!Migrator.class.isAssignableFrom(klass)) { 492 throw new RuntimeException( 493 "Invalid class not implementing Migrator: " + klass.getName() + " for migration: " + id); 494 } 495 try { 496 Migrator migrator = (Migrator) klass.getConstructor().newInstance(); 497 if (invalidator != null) { 498 migrator = new InvalidatorMigrator(id, migrator, invalidator); 499 } 500 return migrator; 501 } catch (ReflectiveOperationException e) { 502 throw new RuntimeException(e); 503 } 504 } 505 506 /** 507 * Executes something while setting a lock, retrying a few times if the lock is already set. 508 */ 509 protected void atomic(String id, Consumer<KeyValueStore> consumer) { 510 KeyValueStore kv = getKeyValueStore(); 511 for (int i = 0; i < 5; i++) { 512 // the value of the lock is useful for debugging 513 String value = Instant.now() + " node=" + nodeId; 514 if (kv.compareAndSet(id + LOCK, null, value, WRITE_LOCK_TTL)) { 515 try { 516 consumer.accept(kv); 517 return; 518 } finally { 519 kv.put(id + LOCK, (String) null); 520 } 521 } 522 try { 523 Thread.sleep(RANDOM.nextInt(100) * i); 524 } catch (InterruptedException e) { 525 Thread.currentThread().interrupt(); 526 throw new RuntimeException(e); 527 } 528 } 529 String currentLock = kv.getString(id + LOCK); 530 throw new RuntimeException("Cannot lock for write migration: " + id + ", already locked: " + currentLock); 531 } 532 533 @Override 534 public Migration getMigration(String id) { 535 MigrationDescriptor descriptor = getDescriptor(XP_CONFIG, id); 536 MigrationStatus status = getStatus(id); 537 if (descriptor == null || status == null) { 538 return null; 539 } 540 return Migration.from(descriptor, status); 541 } 542 543 @Override 544 public List<Migration> getMigrations() { 545 return getDescriptors(XP_CONFIG).stream().map(Descriptor::getId).map(this::getMigration).collect(toList()); 546 } 547 548 /** 549 * @implNote Runs a migration if it has exactly one available step to run. 550 */ 551 @Override 552 public void probeAndRun(String id) { 553 probeAndSetState(id); 554 MigrationStatus status = getStatus(id); 555 var steps = Migration.from(getDescriptor(XP_CONFIG, id), status).getSteps(); 556 if (steps.size() != 1) { 557 throw new IllegalArgumentException(String.format( 558 "Migration: %s must have only one runnable step from state: %s", id, status.getState())); // NOSONAR 559 } 560 runStep(id, steps.get(0).getId()); 561 } 562 563}