001/* 002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.runtime.migration; 020 021import static java.nio.charset.StandardCharsets.UTF_8; 022 023import java.io.IOException; 024import java.io.InputStream; 025import java.io.OutputStream; 026import java.time.Instant; 027import java.util.List; 028import java.util.Map; 029import java.util.Objects; 030import java.util.Random; 031import java.util.concurrent.CopyOnWriteArrayList; 032import java.util.concurrent.SynchronousQueue; 033import java.util.concurrent.ThreadPoolExecutor; 034import java.util.concurrent.TimeUnit; 035import java.util.function.BiConsumer; 036import java.util.function.Consumer; 037 038import org.apache.commons.io.IOUtils; 039import org.apache.commons.lang3.StringUtils; 040import org.apache.commons.logging.Log; 041import org.apache.commons.logging.LogFactory; 042import org.nuxeo.runtime.api.Framework; 043import org.nuxeo.runtime.kv.KeyValueService; 044import org.nuxeo.runtime.kv.KeyValueServiceImpl; 045import org.nuxeo.runtime.kv.KeyValueStore; 046import org.nuxeo.runtime.migration.MigrationDescriptor.MigrationStepDescriptor; 047import org.nuxeo.runtime.model.ComponentContext; 048import org.nuxeo.runtime.model.ComponentInstance; 049import org.nuxeo.runtime.model.ComponentManager; 050import org.nuxeo.runtime.model.DefaultComponent; 051import org.nuxeo.runtime.model.SimpleContributionRegistry; 052import org.nuxeo.runtime.pubsub.AbstractPubSubBroker; 053import org.nuxeo.runtime.pubsub.SerializableMessage; 054 055/** 056 * Implementation for the Migration Service. 057 * <p> 058 * Data about migration status is stored in the "migration" Key/Value Store in the following format: 059 * 060 * <pre> 061 * mymigration:lock write lock, containing debug info about locker; set with a TTL 062 * mymigration the state of the migration, if not running 063 * mymigration:step the step of the migration, if running 064 * mymigration:starttime the migration step start time (milliseconds since epoch) 065 * mymigration:pingtime the migration step last ping time (milliseconds since epoch) 066 * mymigration:message the migration step current message 067 * mymigration:num the migration step current num 068 * mymigration:total the migration step current total 069 * </pre> 070 * 071 * @since 9.3 072 */ 073public class MigrationServiceImpl extends DefaultComponent implements MigrationService { 074 075 // package-private to avoid synthetic accessor for access from nested class 076 static final Log log = LogFactory.getLog(MigrationServiceImpl.class); 077 078 public static final String KEYVALUE_STORE_NAME = "migration"; 079 080 public static final String CONFIG_XP = "configuration"; 081 082 public static final String LOCK = ":lock"; 083 084 public static final String STEP = ":step"; 085 086 public static final String START_TIME = ":starttime"; 087 088 public static final String PING_TIME = ":pingtime"; 089 090 public static final String PROGRESS_MESSAGE = ":message"; 091 092 public static final String PROGRESS_NUM = ":num"; 093 094 public static final String PROGRESS_TOTAL = ":total"; 095 096 public static final long WRITE_LOCK_TTL = 10; // 10 sec for a few k/v writes is plenty enough 097 098 public static final String MIGRATION_INVAL_PUBSUB_TOPIC = "migrationinval"; 099 100 public static final String CLUSTERING_ENABLED_PROP = "repository.clustering.enabled"; 101 102 public static final String NODE_ID_PROP = "repository.clustering.id"; 103 104 protected static final Random RANDOM = new Random(); 105 106 protected final MigrationRegistry registry = new MigrationRegistry(); 107 108 protected MigrationThreadPoolExecutor executor; 109 110 protected MigrationInvalidator invalidator; 111 112 public static class MigrationRegistry extends SimpleContributionRegistry<MigrationDescriptor> { 113 114 @Override 115 public String getContributionId(MigrationDescriptor contrib) { 116 return contrib.id; 117 } 118 119 public MigrationDescriptor getMigrationDescriptor(String id) { 120 return getCurrentContribution(id); 121 } 122 123 public Map<String, MigrationDescriptor> getMigrationDescriptors() { 124 return currentContribs; 125 } 126 127 @Override 128 public boolean isSupportingMerge() { 129 return true; 130 } 131 132 @Override 133 public MigrationDescriptor clone(MigrationDescriptor orig) { 134 return new MigrationDescriptor(orig); 135 } 136 137 @Override 138 public void merge(MigrationDescriptor src, MigrationDescriptor dst) { 139 dst.merge(src); 140 } 141 } 142 143 public static class MigrationInvalidation implements SerializableMessage { 144 145 private static final long serialVersionUID = 1L; 146 147 public final String id; 148 149 public MigrationInvalidation(String id) { 150 this.id = id; 151 } 152 153 @Override 154 public void serialize(OutputStream out) throws IOException { 155 IOUtils.write(id, out, UTF_8); 156 } 157 158 public static MigrationInvalidation deserialize(InputStream in) throws IOException { 159 String id = IOUtils.toString(in, UTF_8); 160 return new MigrationInvalidation(id); 161 } 162 163 @Override 164 public String toString() { 165 return getClass().getSimpleName() + "(" + id + ")"; 166 } 167 } 168 169 public class MigrationInvalidator extends AbstractPubSubBroker<MigrationInvalidation> { 170 171 @Override 172 public MigrationInvalidation deserialize(InputStream in) throws IOException { 173 return MigrationInvalidation.deserialize(in); 174 } 175 176 @Override 177 public void receivedMessage(MigrationInvalidation message) { 178 String id = message.id; 179 StatusChangeNotifier notifier = getStatusChangeNotifier(id); 180 if (notifier == null) { 181 log.error("Unknown migration id received in invalidation: " + id); 182 return; 183 } 184 notifier.notifyStatusChange(); 185 } 186 } 187 188 protected static KeyValueStore getKeyValueStore() { 189 KeyValueService service = Framework.getService(KeyValueService.class); 190 Objects.requireNonNull(service, "Missing KeyValueService"); 191 return service.getKeyValueStore(KEYVALUE_STORE_NAME); 192 } 193 194 @Override 195 public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 196 switch (extensionPoint) { 197 case CONFIG_XP: 198 registerMigrationDescriptor((MigrationDescriptor) contribution); 199 break; 200 default: 201 throw new RuntimeException("Unknown extension point: " + extensionPoint); 202 } 203 } 204 205 @Override 206 public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 207 switch (extensionPoint) { 208 case CONFIG_XP: 209 unregisterMigrationDescriptor((MigrationDescriptor) contribution); 210 break; 211 } 212 } 213 214 public void registerMigrationDescriptor(MigrationDescriptor descriptor) { 215 registry.addContribution(descriptor); 216 log.info("Registered migration: " + descriptor.id); 217 } 218 219 public void unregisterMigrationDescriptor(MigrationDescriptor descriptor) { 220 registry.removeContribution(descriptor); 221 log.info("Unregistered migration: " + descriptor.id); 222 } 223 224 public Map<String, MigrationDescriptor> getMigrationDescriptors() { 225 return registry.getMigrationDescriptors(); 226 } 227 228 @Override 229 public int getApplicationStartedOrder() { 230 return KeyValueServiceImpl.APPLICATION_STARTED_ORDER + 10; 231 } 232 233 /** 234 * Progress reporter that reports progress in the key/value store. 235 * 236 * @since 9.3 237 */ 238 protected static class ProgressReporter { 239 240 protected final String id; 241 242 public ProgressReporter(String id) { 243 this.id = id; 244 } 245 246 /** 247 * Reports progress. If num or total are -2 then null is used. 248 */ 249 public void reportProgress(String message, long num, long total, boolean ping) { 250 KeyValueStore keyValueStore = getKeyValueStore(); 251 keyValueStore.put(id + PROGRESS_MESSAGE, message); 252 keyValueStore.put(id + PROGRESS_NUM, num == -2 ? null : String.valueOf(num)); 253 keyValueStore.put(id + PROGRESS_TOTAL, total == -2 ? null : String.valueOf(total)); 254 keyValueStore.put(id + PING_TIME, ping ? String.valueOf(System.currentTimeMillis()) : null); 255 } 256 } 257 258 /** 259 * Migration context implementation that reports progress in the key/value store and can be shutdown. 260 * 261 * @since 9.3 262 */ 263 protected static class MigrationContextImpl implements MigrationContext { 264 265 protected final ProgressReporter progressReporter; 266 267 protected volatile boolean shutdown; 268 269 public MigrationContextImpl(ProgressReporter progressReporter) { 270 this.progressReporter = progressReporter; 271 } 272 273 @Override 274 public void reportProgress(String message, long num, long total) { 275 progressReporter.reportProgress(message, num, total, true); 276 } 277 278 @Override 279 public void requestShutdown() { 280 shutdown = true; 281 } 282 283 @Override 284 public boolean isShutdownRequested() { 285 return shutdown || Thread.currentThread().isInterrupted(); 286 } 287 } 288 289 /** 290 * Runnable for the migrator, that knows about the migration context. 291 * 292 * @since 9.3 293 */ 294 protected static class MigratorWithContext implements Runnable { 295 296 protected final Consumer<MigrationContext> migration; 297 298 protected final MigrationContext migrationContext; 299 300 protected final BiConsumer<MigrationContext, Throwable> afterMigration; 301 302 public MigratorWithContext(Consumer<MigrationContext> migration, ProgressReporter progressReporter, 303 BiConsumer<MigrationContext, Throwable> afterMigration) { 304 this.migration = migration; 305 this.migrationContext = new MigrationContextImpl(progressReporter); 306 this.afterMigration = afterMigration; 307 } 308 309 @Override 310 public void run() { 311 migration.accept(migrationContext); 312 } 313 314 public void afterMigration(Throwable t) { 315 afterMigration.accept(migrationContext, t); 316 } 317 318 public void requestShutdown() { 319 migrationContext.requestShutdown(); 320 } 321 } 322 323 /** 324 * Thread pool executor that records {@link Runnable}s to be able to request shutdown on them. 325 * 326 * @since 9.3 327 */ 328 protected static class MigrationThreadPoolExecutor extends ThreadPoolExecutor { 329 330 protected final List<MigratorWithContext> runnables = new CopyOnWriteArrayList<>(); 331 332 public MigrationThreadPoolExecutor() { 333 // like Executors.newCachedThreadPool but with keepAliveTime of 0 334 super(0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); 335 } 336 337 @Override 338 protected void beforeExecute(Thread thread, Runnable runnable) { 339 runnables.add((MigratorWithContext) runnable); 340 } 341 342 @Override 343 protected void afterExecute(Runnable runnable, Throwable t) { 344 runnables.remove(runnable); 345 ((MigratorWithContext) runnable).afterMigration(t); 346 } 347 348 public void requestShutdown() { 349 runnables.forEach(migratorWithContext -> migratorWithContext.requestShutdown()); 350 } 351 } 352 353 @Override 354 public void start(ComponentContext context) { 355 if (Framework.isBooleanPropertyTrue(CLUSTERING_ENABLED_PROP)) { 356 // register migration invalidator 357 String nodeId = Framework.getProperty(NODE_ID_PROP); 358 if (StringUtils.isBlank(nodeId)) { 359 nodeId = String.valueOf(RANDOM.nextLong()); 360 log.warn("Missing cluster node id configuration, please define it explicitly " 361 + "(usually through repository.clustering.id). Using random cluster node id instead: " 362 + nodeId); 363 } else { 364 nodeId = nodeId.trim(); 365 } 366 invalidator = new MigrationInvalidator(); 367 invalidator.initialize(MIGRATION_INVAL_PUBSUB_TOPIC, nodeId); 368 log.info("Registered migration invalidator for node: " + nodeId); 369 } else { 370 log.info("Not registering a migration invalidator because clustering is not enabled"); 371 } 372 373 executor = new MigrationThreadPoolExecutor(); 374 Framework.getRuntime().getComponentManager().addListener(new ComponentManager.Listener() { 375 376 @Override 377 public void beforeStop(ComponentManager mgr, boolean isStandby) { 378 // flag all migration threads as shutdown requested, without interrupting them 379 executor.requestShutdown(); 380 } 381 382 @Override 383 public void afterStop(ComponentManager mgr, boolean isStandby) { 384 Framework.getRuntime().getComponentManager().removeListener(this); 385 } 386 }); 387 } 388 389 @Override 390 public void stop(ComponentContext context) throws InterruptedException { 391 // interrupt all migration tasks 392 executor.shutdownNow(); 393 executor.awaitTermination(10, TimeUnit.SECONDS); // wait 10s for termination 394 executor = null; 395 } 396 397 @Override 398 public MigrationStatus getStatus(String id) { 399 MigrationDescriptor descr = registry.getMigrationDescriptor(id); 400 if (descr == null) { 401 return null; // migration unknown 402 } 403 KeyValueStore kv = getKeyValueStore(); 404 String state = kv.getString(id); 405 if (state != null) { 406 return new MigrationStatus(state); 407 } 408 String step = kv.getString(id + STEP); 409 if (step == null) { 410 state = descr.getDefaultState(); 411 return new MigrationStatus(state); 412 } 413 long startTime = Long.parseLong(kv.getString(id + START_TIME)); 414 long pingTime = Long.parseLong(kv.getString(id + PING_TIME)); 415 String progressMessage = kv.getString(id + PROGRESS_MESSAGE); 416 long progressNum = Long.parseLong(kv.getString(id + PROGRESS_NUM)); 417 long progressTotal = Long.parseLong(kv.getString(id + PROGRESS_TOTAL)); 418 if (progressMessage == null) { 419 progressMessage = ""; 420 } 421 return new MigrationStatus(step, startTime, pingTime, progressMessage, progressNum, progressTotal); 422 } 423 424 @Override 425 public void runStep(String id, String step) { 426 MigrationDescriptor descr = registry.getMigrationDescriptor(id); 427 if (descr == null) { 428 throw new IllegalArgumentException("Unknown migration: " + id); 429 } 430 MigrationStepDescriptor stepDescr = descr.getSteps().get(step); 431 if (stepDescr == null) { 432 throw new IllegalArgumentException("Unknown step: " + step + " for migration: " + id); 433 } 434 Class<?> klass = stepDescr.getKlass(); 435 if (!Migrator.class.isAssignableFrom(klass)) { 436 throw new RuntimeException("Invalid class not implementing Migrator: " + klass.getName() + " for step: " 437 + step + " for migration: " + id); 438 } 439 Migrator migrator; 440 try { 441 migrator = (Migrator) klass.getConstructor().newInstance(); 442 } catch (ReflectiveOperationException e) { 443 throw new RuntimeException(e); 444 } 445 StatusChangeNotifier notifier = getStatusChangeNotifier(descr); 446 447 ProgressReporter progressReporter = new ProgressReporter(id); 448 449 // switch to running 450 atomic(id, kv -> { 451 String state = kv.getString(id); 452 String currentStep = kv.getString(id + STEP); 453 if (state == null && currentStep == null) { 454 state = descr.getDefaultState(); 455 if (!descr.getStates().containsKey(state)) { 456 throw new RuntimeException("Invalid default state: " + state + " for migration: " + id); 457 } 458 } else if (state == null) { 459 throw new IllegalArgumentException("Migration: " + id + " already running step: " + currentStep); 460 } 461 if (!descr.getStates().containsKey(state)) { 462 throw new RuntimeException("Invalid current state: " + state + " for migration: " + id); 463 } 464 if (!stepDescr.getFromState().equals(state)) { 465 throw new IllegalArgumentException( 466 "Invalid step: " + step + " for migration: " + id + " in state: " + state); 467 } 468 String time = String.valueOf(System.currentTimeMillis()); 469 kv.put(id + STEP, step); 470 kv.put(id + START_TIME, time); 471 progressReporter.reportProgress("", 0, -1, true); 472 kv.put(id, (String) null); 473 }); 474 475 // allow notification of running step 476 notifier.notifyStatusChange(); 477 478 Consumer<MigrationContext> migration = migrationContext -> { 479 Thread.currentThread().setName("Nuxeo-Migrator-" + id); 480 migrator.run(migrationContext); 481 }; 482 483 BiConsumer<MigrationContext, Throwable> afterMigration = (migrationContext, t) -> { 484 if (t != null) { 485 log.error("Exception during execution of step: " + step + " for migration: " + id, t); 486 } 487 // after the migrator is finished, change state, except if shutdown is requested or exception 488 String state = (t != null || migrationContext.isShutdownRequested()) ? stepDescr.getFromState() 489 : stepDescr.getToState(); 490 atomic(id, kv -> { 491 kv.put(id, state); 492 kv.put(id + STEP, (String) null); 493 kv.put(id + START_TIME, (String) null); 494 progressReporter.reportProgress(null, -2, -2, false); 495 }); 496 // allow notification of new state 497 notifier.notifyStatusChange(); 498 }; 499 500 executor.execute(new MigratorWithContext(migration, progressReporter, afterMigration)); 501 } 502 503 protected StatusChangeNotifier getStatusChangeNotifier(String id) { 504 MigrationDescriptor descr = registry.getMigrationDescriptor(id); 505 return descr == null ? null : getStatusChangeNotifier(descr); 506 } 507 508 protected StatusChangeNotifier getStatusChangeNotifier(MigrationDescriptor descr) { 509 Class<?> klass = descr.getStatusChangeNotifierClass(); 510 if (klass == null) { 511 throw new RuntimeException("Missing statusChangeNotifier for migration: " + descr.getId()); 512 } 513 if (!StatusChangeNotifier.class.isAssignableFrom(klass)) { 514 throw new RuntimeException("Invalid class not implementing StatusChangeNotifier: " + klass.getName() 515 + " for migration: " + descr.getId()); 516 } 517 StatusChangeNotifier notifier; 518 try { 519 notifier = (StatusChangeNotifier) klass.getConstructor().newInstance(); 520 } catch (ReflectiveOperationException e) { 521 throw new RuntimeException(e); 522 } 523 return notifier; 524 } 525 526 /** 527 * Executes something while setting a lock, retrying a few times if the lock is already set. 528 */ 529 protected void atomic(String id, Consumer<KeyValueStore> consumer) { 530 KeyValueStore kv = getKeyValueStore(); 531 String nodeid = Framework.getProperty("repository.clustering.id"); 532 for (int i = 0; i < 5; i++) { 533 // the value of the lock is useful for debugging 534 String value = Instant.now() + " node=" + nodeid; 535 if (kv.compareAndSet(id + LOCK, null, value, WRITE_LOCK_TTL)) { 536 try { 537 consumer.accept(kv); 538 return; 539 } finally { 540 kv.put(id + LOCK, (String) null); 541 } 542 } 543 try { 544 Thread.sleep((long) (Math.random() * 100 * i)); 545 } catch (InterruptedException e) { 546 throw new RuntimeException(e); 547 } 548 } 549 String currentLock = kv.getString(id + LOCK); 550 throw new RuntimeException("Cannot lock for write migration: " + id + ", already locked: " + currentLock); 551 } 552 553}