/*
 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.runtime.migration;

import static java.nio.charset.StandardCharsets.UTF_8;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.kv.KeyValueService;
import org.nuxeo.runtime.kv.KeyValueServiceImpl;
import org.nuxeo.runtime.kv.KeyValueStore;
import org.nuxeo.runtime.migration.MigrationDescriptor.MigrationStepDescriptor;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.model.ComponentManager;
import org.nuxeo.runtime.model.DefaultComponent;
import org.nuxeo.runtime.pubsub.AbstractPubSubBroker;
import org.nuxeo.runtime.pubsub.SerializableMessage;

/**
 * Implementation for the Migration Service.
 * <p>
 * Data about migration status is stored in the "migration" Key/Value Store in the following format:
 *
 * <pre>
 * mymigration:lock         write lock, containing debug info about locker; set with a TTL
 * mymigration              the state of the migration, if not running
 * mymigration:step         the step of the migration, if running
 * mymigration:starttime    the migration step start time (milliseconds since epoch)
 * mymigration:pingtime     the migration step last ping time (milliseconds since epoch)
 * mymigration:message      the migration step current message
 * mymigration:num          the migration step current num
 * mymigration:total        the migration step current total
 * </pre>
 *
 * @since 9.3
 */
public class MigrationServiceImpl extends DefaultComponent implements MigrationService {

    private static final Logger log = LogManager.getLogger(MigrationServiceImpl.class);

    /** Name of the Key/Value Store holding the migration data. */
    public static final String KEYVALUE_STORE_NAME = "migration";

    /** Extension point from which the migration descriptors are read. */
    public static final String XP_CONFIG = "configuration";

    // suffixes appended to the migration id to build the Key/Value Store keys (see class Javadoc)

    public static final String LOCK = ":lock";

    public static final String STEP = ":step";

    public static final String START_TIME = ":starttime";

    public static final String PING_TIME = ":pingtime";

    public static final String PROGRESS_MESSAGE = ":message";

    public static final String PROGRESS_NUM = ":num";

    public static final String PROGRESS_TOTAL = ":total";

    public static final long WRITE_LOCK_TTL = 10; // 10 sec for a few k/v writes is plenty enough

    public static final String MIGRATION_INVAL_PUBSUB_TOPIC = "migrationinval";

    public static final String CLUSTERING_ENABLED_PROP = "repository.clustering.enabled";

    public static final String NODE_ID_PROP = "repository.clustering.id";

    protected static final Random RANDOM = new Random(); // NOSONAR (doesn't need cryptographic strength)

    /** Executor running the migration steps; created in {@link #start} and cleared in {@link #stop}. */
    protected MigrationThreadPoolExecutor executor;

    /** Invalidator used to notify other nodes; only set by {@link #start} when clustering is enabled. */
    protected MigrationInvalidator invalidator;

    /**
     * Invalidation message carrying the id of the migration whose status changed.
     */
    public static class MigrationInvalidation implements SerializableMessage {

        private static final long serialVersionUID = 1L;

        public final String id;

        public MigrationInvalidation(String id) {
            this.id = id;
        }

        /**
         * Writes the migration id to the stream, encoded as UTF-8.
         */
        @Override
        public void serialize(OutputStream out) throws IOException {
            IOUtils.write(id, out, UTF_8);
        }

        /**
         * Reads the whole stream as a UTF-8 migration id.
         */
        public static MigrationInvalidation deserialize(InputStream in) throws IOException {
            String id = IOUtils.toString(in, UTF_8);
            return new MigrationInvalidation(id);
        }

        @Override
        public String toString() {
            return getClass().getSimpleName() + "(" + id + ")";
        }
    }

    /**
     * Pub/sub broker exchanging {@link MigrationInvalidation} messages; a received message triggers
     * {@link Migrator#notifyStatusChange()} on the local migrator.
     */
    public class MigrationInvalidator extends AbstractPubSubBroker<MigrationInvalidation> {

        @Override
        public MigrationInvalidation deserialize(InputStream in) throws IOException {
            return MigrationInvalidation.deserialize(in);
        }

        /**
         * Notifies the local migrator of a status change, without re-sending an invalidation. A message for an
         * unknown migration id is logged and ignored.
         */
        @Override
        public void receivedMessage(MigrationInvalidation message) {
            String id = message.id;
            // getMigrator throws for an unknown id instead of returning null, so check the descriptor first
            MigrationDescriptor descr = MigrationServiceImpl.this.getDescriptor(XP_CONFIG, id);
            if (descr == null) {
                log.error("Unknown migration id received in invalidation: {}", id);
                return;
            }
            Migrator migrator = getMigrator(id);
            if (migrator instanceof InvalidatorMigrator) {
                // don't send again invalidation, call sub migrator directly
                migrator = ((InvalidatorMigrator) migrator).migrator;
            }
            // when the migrator is not wrapped (invalidator field already cleared), it never sends invalidations
            migrator.notifyStatusChange();
        }
    }

    /**
     * A {@link Migrator migrator} wrapper to send invalidations to other nodes when calling
     * {@link #notifyStatusChange()}.
     *
     * @since 10.10
     */
    public static class InvalidatorMigrator implements Migrator {

        protected final String id;

        protected final Migrator migrator;

        protected final MigrationInvalidator invalidator;

        public InvalidatorMigrator(String id, Migrator migrator, MigrationInvalidator invalidator) {
            this.id = id;
            this.migrator = migrator;
            this.invalidator = invalidator;
        }

        @Override
        public String probeState() {
            return migrator.probeState();
        }

        @Override
        public void run(String step, MigrationContext migrationContext) {
            migrator.run(step, migrationContext);
        }

        /**
         * Notifies the wrapped migrator, then sends an invalidation for this migration id.
         */
        @Override
        public void notifyStatusChange() {
            migrator.notifyStatusChange();
            invalidator.sendMessage(new MigrationInvalidation(id));
        }
    }

    /**
     * Gets the Key/Value Store named {@link #KEYVALUE_STORE_NAME}.
     *
     * @throws NullPointerException if the {@link KeyValueService} is not available
     */
    protected static KeyValueStore getKeyValueStore() {
        KeyValueService service = Framework.getService(KeyValueService.class);
        Objects.requireNonNull(service, "Missing KeyValueService");
        return service.getKeyValueStore(KEYVALUE_STORE_NAME);
    }

    /**
     * Gets the descriptors registered on the {@link #XP_CONFIG} extension point.
     */
    public Collection<MigrationDescriptor> getMigrationDescriptors() {
        return getDescriptors(XP_CONFIG);
    }

    @Override
    public int getApplicationStartedOrder() {
        // 10 above the order of the key/value service this service relies on
        return KeyValueServiceImpl.APPLICATION_STARTED_ORDER + 10;
    }

    /**
     * Progress reporter that reports progress in the key/value store.
     *
     * @since 9.3
     */
    protected static class ProgressReporter {

        protected final String id;

        public ProgressReporter(String id) {
            this.id = id;
        }

        /**
         * Reports progress. If num or total are -2 then null is used.
         *
         * @param message the progress message to store
         * @param num the current num, or -2 to store null
         * @param total the current total, or -2 to store null
         * @param ping if {@code true} the current time is stored as ping time, otherwise null is stored
         */
        public void reportProgress(String message, long num, long total, boolean ping) {
            KeyValueStore keyValueStore = getKeyValueStore();
            keyValueStore.put(id + PROGRESS_MESSAGE, message);
            keyValueStore.put(id + PROGRESS_NUM, num == -2 ? null : String.valueOf(num));
            keyValueStore.put(id + PROGRESS_TOTAL, total == -2 ? null : String.valueOf(total));
            keyValueStore.put(id + PING_TIME, ping ? String.valueOf(System.currentTimeMillis()) : null);
        }
    }

    /**
     * Migration context implementation that reports progress in the key/value store and can be shutdown.
     *
     * @since 9.3
     */
    protected static class MigrationContextImpl implements MigrationContext {

        protected final ProgressReporter progressReporter;

        protected volatile boolean shutdown;

        public MigrationContextImpl(ProgressReporter progressReporter) {
            this.progressReporter = progressReporter;
        }

        @Override
        public void reportProgress(String message, long num, long total) {
            // progress reported by a migrator always refreshes the ping time
            progressReporter.reportProgress(message, num, total, true);
        }

        @Override
        public void requestShutdown() {
            shutdown = true;
        }

        @Override
        public boolean isShutdownRequested() {
            // an interrupted thread counts as a shutdown request too
            return shutdown || Thread.currentThread().isInterrupted();
        }
    }

    /**
     * Runnable for the migrator, that knows about the migration context.
     *
     * @since 9.3
     */
    protected static class MigratorWithContext implements Runnable {

        protected final Consumer<MigrationContext> migration;

        protected final MigrationContext migrationContext;

        protected final BiConsumer<MigrationContext, Throwable> afterMigration;

        public MigratorWithContext(Consumer<MigrationContext> migration, ProgressReporter progressReporter,
                BiConsumer<MigrationContext, Throwable> afterMigration) {
            this.migration = migration;
            this.migrationContext = new MigrationContextImpl(progressReporter);
            this.afterMigration = afterMigration;
        }

        @Override
        public void run() {
            migration.accept(migrationContext);
        }

        /**
         * Runs the after-migration callback.
         *
         * @param t the throwable passed by the executor to {@code afterExecute}, may be {@code null}
         */
        public void afterMigration(Throwable t) {
            afterMigration.accept(migrationContext, t);
        }

        public void requestShutdown() {
            migrationContext.requestShutdown();
        }
    }

    /**
     * Thread pool executor that records {@link Runnable}s to be able to request shutdown on them.
     * <p>
     * Only {@link MigratorWithContext} runnables may be executed, others fail with a {@link ClassCastException}.
     *
     * @since 9.3
     */
    protected static class MigrationThreadPoolExecutor extends ThreadPoolExecutor {

        protected final List<MigratorWithContext> runnables = new CopyOnWriteArrayList<>();

        public MigrationThreadPoolExecutor() {
            // like Executors.newCachedThreadPool but with keepAliveTime of 0
            super(0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        }

        @Override
        protected void beforeExecute(Thread thread, Runnable runnable) {
            runnables.add((MigratorWithContext) runnable);
            // chain to super at the end, as recommended for nested overridings
            super.beforeExecute(thread, runnable);
        }

        @Override
        protected void afterExecute(Runnable runnable, Throwable t) {
            // chain to super at the beginning, as recommended for nested overridings
            super.afterExecute(runnable, t);
            runnables.remove(runnable);
            ((MigratorWithContext) runnable).afterMigration(t);
        }

        /**
         * Flags all the currently recorded runnables as shutdown requested.
         */
        public void requestShutdown() {
            runnables.forEach(MigratorWithContext::requestShutdown);
        }
    }

    /**
     * Starts the component: registers a migration invalidator when clustering is enabled, creates the executor and
     * registers a listener requesting shutdown of the running migrations before components are stopped.
     */
    @Override
    public void start(ComponentContext context) {
        super.start(context);
        if (Framework.isBooleanPropertyTrue(CLUSTERING_ENABLED_PROP)) {
            // register migration invalidator
            String nodeId = Framework.getProperty(NODE_ID_PROP);
            if (StringUtils.isBlank(nodeId)) {
                nodeId = String.valueOf(RANDOM.nextLong());
                log.warn("Missing cluster node id configuration, please define it explicitly "
                        + "(usually through repository.clustering.id). Using random cluster node id instead: {}",
                        nodeId);
            } else {
                nodeId = nodeId.trim();
            }
            invalidator = new MigrationInvalidator();
            invalidator.initialize(MIGRATION_INVAL_PUBSUB_TOPIC, nodeId);
            log.info("Registered migration invalidator for node: {}", nodeId);
        } else {
            log.info("Not registering a migration invalidator because clustering is not enabled");
        }

        executor = new MigrationThreadPoolExecutor();
        Framework.getRuntime().getComponentManager().addListener(new ComponentManager.Listener() {

            @Override
            public void beforeStop(ComponentManager mgr, boolean isStandby) {
                // flag all migration threads as shutdown requested, without interrupting them
                executor.requestShutdown();
            }

            @Override
            public void afterStop(ComponentManager mgr, boolean isStandby) {
                Framework.getRuntime().getComponentManager().removeListener(this);
            }
        });
    }

    /**
     * Stops the component: closes the invalidator if any, interrupts the migration tasks and waits up to 10 seconds
     * for their termination.
     */
    @Override
    public void stop(ComponentContext context) throws InterruptedException {
        if (invalidator != null) {
            invalidator.close();
            invalidator = null;
        }
        try {
            // the executor is missing if start() failed before creating it
            if (executor != null) {
                // interrupt all migration tasks
                executor.shutdownNow();
                executor.awaitTermination(10, TimeUnit.SECONDS); // wait 10s for termination
            }
        } finally {
            // also done when the wait is interrupted, so that the component is left stopped
            executor = null;
            super.stop(context);
        }
    }

    /**
     * Gets the status of a migration from the key/value store.
     * <p>
     * A numeric value missing while a step is recorded (progress reported as -2, or keys being cleared concurrently
     * by a state change) is returned as 0 for the times and -1 for the progress num and total.
     *
     * @return the status, or {@code null} if the migration is unknown
     */
    @Override
    public MigrationStatus getStatus(String id) {
        MigrationDescriptor descr = getDescriptor(XP_CONFIG, id);
        if (descr == null) {
            return null; // migration unknown
        }
        KeyValueStore kv = getKeyValueStore();
        String state = kv.getString(id);
        if (state != null) {
            return new MigrationStatus(state);
        }
        String step = kv.getString(id + STEP);
        if (step == null) {
            // neither a state nor a step is stored: the migration never ran
            state = descr.defaultState;
            return new MigrationStatus(state);
        }
        long startTime = parseLongOrDefault(kv.getString(id + START_TIME), 0);
        long pingTime = parseLongOrDefault(kv.getString(id + PING_TIME), 0);
        String progressMessage = kv.getString(id + PROGRESS_MESSAGE);
        long progressNum = parseLongOrDefault(kv.getString(id + PROGRESS_NUM), -1);
        long progressTotal = parseLongOrDefault(kv.getString(id + PROGRESS_TOTAL), -1);
        if (progressMessage == null) {
            progressMessage = "";
        }
        return new MigrationStatus(step, startTime, pingTime, progressMessage, progressNum, progressTotal);
    }

    /**
     * Parses a stored numeric value, using the default when the key holds no value.
     *
     * @throws NumberFormatException if the value is present but not a number
     */
    private static long parseLongOrDefault(String value, long defaultValue) {
        return value == null ? defaultValue : Long.parseLong(value);
    }

    /**
     * Probes the state using the migrator and, if one is returned, stores it as the new state.
     *
     * @return the probed state, may be {@code null}
     * @throws IllegalArgumentException if the migration is unknown or is currently running a step
     */
    @Override
    public String probeAndSetState(String id) {
        Migrator migrator = getMigrator(id);
        String state = migrator.probeState();
        if (state != null) {
            ProgressReporter progressReporter = new ProgressReporter(id);
            setState(id, state, migrator, progressReporter);
        }
        return state;
    }

    /**
     * Sets the state under the write lock, refusing to do so while a step is running, then notifies the migrator.
     *
     * @throws IllegalArgumentException if the migration is currently running a step
     */
    protected void setState(String id, String state, Migrator migrator, ProgressReporter progressReporter) {
        atomic(id, kv -> {
            String currentState = kv.getString(id);
            String currentStep = kv.getString(id + STEP);
            if (currentState == null && currentStep != null) {
                throw new IllegalArgumentException("Migration: " + id + " already running step: " + currentStep);
            }
            setState(id, state, progressReporter, kv);
        });
        migrator.notifyStatusChange();
    }

    /**
     * Stores the state and clears the step, start time and progress information. Does no locking by itself.
     */
    protected void setState(String id, String state, ProgressReporter progressReporter, KeyValueStore kv) {
        kv.put(id, state);
        kv.put(id + STEP, (String) null);
        kv.put(id + START_TIME, (String) null);
        // -2 clears num and total, no ping clears the ping time
        progressReporter.reportProgress(null, -2, -2, false);
    }

    /**
     * Switches the migration to running the given step, then submits the step to the executor. When the step is
     * finished the state becomes the step's target state, or goes back to the step's source state if an exception
     * occurred or shutdown was requested.
     *
     * @throws IllegalArgumentException if the migration or step is unknown, if a step is already running, or if the
     *             current state is invalid or is not the one the step starts from
     */
    @Override
    public void runStep(String id, String step) {
        Migrator migrator = getMigrator(id);
        MigrationDescriptor descr = getDescriptor(XP_CONFIG, id);
        MigrationStepDescriptor stepDescr = descr.steps.get(step);
        if (stepDescr == null) {
            throw new IllegalArgumentException("Unknown step: " + step + " for migration: " + id);
        }

        ProgressReporter progressReporter = new ProgressReporter(id);

        // switch to running
        atomic(id, kv -> {
            String state = kv.getString(id);
            String currentStep = kv.getString(id + STEP);
            if (state == null && currentStep == null) {
                state = descr.defaultState;
                if (!descr.states.containsKey(state)) {
                    throw new IllegalArgumentException("Invalid default state: " + state + " for migration: " + id);
                }
            } else if (state == null) {
                throw new IllegalArgumentException("Migration: " + id + " already running step: " + currentStep);
            }
            if (!descr.states.containsKey(state)) {
                throw new IllegalArgumentException("Invalid current state: " + state + " for migration: " + id);
            }
            if (!stepDescr.fromState.equals(state)) {
                throw new IllegalArgumentException(
                        "Invalid step: " + step + " for migration: " + id + " in state: " + state);
            }
            String time = String.valueOf(System.currentTimeMillis());
            kv.put(id + STEP, step);
            kv.put(id + START_TIME, time);
            progressReporter.reportProgress("", 0, -1, true);
            // the state is cleared last, getStatus reads it first
            kv.put(id, (String) null);
        });

        // allow notification of running step
        migrator.notifyStatusChange();

        Consumer<MigrationContext> migration = migrationContext -> {
            Thread.currentThread().setName("Nuxeo-Migrator-" + id);
            migrator.run(step, migrationContext);
        };

        BiConsumer<MigrationContext, Throwable> afterMigration = (migrationContext, t) -> {
            if (t != null) {
                log.error("Exception during execution of step: {} for migration: {}", step, id, t);
            }
            // after the migrator is finished, change state, except if shutdown is requested or exception
            String state = t != null || migrationContext.isShutdownRequested() ? stepDescr.fromState
                    : stepDescr.toState;
            atomic(id, kv -> setState(id, state, progressReporter, kv));
            // allow notification of new state
            migrator.notifyStatusChange();
        };

        executor.execute(new MigratorWithContext(migration, progressReporter, afterMigration));
    }

    /**
     * Instantiates the migrator of a migration through its no-argument constructor, wrapped in an
     * {@link InvalidatorMigrator} when an invalidator is registered.
     *
     * @throws IllegalArgumentException if the migration is unknown
     * @throws RuntimeException if the configured class is not a {@link Migrator} or cannot be instantiated
     */
    protected Migrator getMigrator(String id) {
        MigrationDescriptor descr = getDescriptor(XP_CONFIG, id);
        if (descr == null) {
            throw new IllegalArgumentException("Unknown migration: " + id);
        }
        Class<?> klass = descr.klass;
        if (!Migrator.class.isAssignableFrom(klass)) {
            throw new RuntimeException(
                    "Invalid class not implementing Migrator: " + klass.getName() + " for migration: " + id);
        }
        try {
            Migrator migrator = (Migrator) klass.getConstructor().newInstance();
            if (invalidator != null) {
                migrator = new InvalidatorMigrator(id, migrator, invalidator);
            }
            return migrator;
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Executes something while setting a lock, retrying a few times if the lock is already set.
     * <p>
     * The lock is attempted 5 times, sleeping a random and increasing delay between two attempts.
     *
     * @throws RuntimeException if the lock could not be acquired, or if the thread is interrupted while waiting (in
     *             which case the interrupt flag is restored)
     */
    protected void atomic(String id, Consumer<KeyValueStore> consumer) {
        final int maxAttempts = 5;
        KeyValueStore kv = getKeyValueStore();
        String nodeid = Framework.getProperty(NODE_ID_PROP);
        String lockKey = id + LOCK;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // the value of the lock is useful for debugging
            String value = Instant.now() + " node=" + nodeid;
            if (kv.compareAndSet(lockKey, null, value, WRITE_LOCK_TTL)) {
                try {
                    consumer.accept(kv);
                    return;
                } finally {
                    // NOTE(review): unconditional release; if the consumer outlives WRITE_LOCK_TTL this may drop a
                    // lock since acquired by someone else - consider a compare-and-set on our own value
                    kv.put(lockKey, (String) null);
                }
            }
            if (attempt < maxAttempts) {
                // back off before the next attempt only; the delay bound grows with each failed attempt
                try {
                    Thread.sleep((long) RANDOM.nextInt(100) * attempt);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }
        String currentLock = kv.getString(lockKey);
        throw new RuntimeException("Cannot lock for write migration: " + id + ", already locked: " + currentLock);
    }

}