001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.nuxeo.runtime.api; 017 018import java.time.Duration; 019import java.time.temporal.ChronoUnit; 020import java.time.temporal.TemporalAmount; 021import java.util.Optional; 022import java.util.Timer; 023import java.util.TimerTask; 024import java.util.concurrent.CountDownLatch; 025import java.util.concurrent.TimeUnit; 026import java.util.function.Consumer; 027import java.util.function.Function; 028import java.util.function.Supplier; 029 030import org.apache.commons.logging.Log; 031import org.apache.commons.logging.LogFactory; 032import org.nuxeo.runtime.api.ServicePassivator.Passivator.Accounting; 033import org.nuxeo.runtime.api.ServicePassivator.Passivator.Accounting.InScopeOfContext; 034import org.nuxeo.runtime.api.ServicePassivator.Termination.Failure; 035import org.nuxeo.runtime.api.ServicePassivator.Termination.Success; 036import org.nuxeo.runtime.model.ComponentManager; 037 038/** 039 * Blocks service accesses in order to run an operation which alter the runtime. That gives a way to prevent service 040 * consumers to enter during the shutdown or the reload operation. 041 * <p> 042 * The invoke chain is split in the following steps 043 * <dl> 044 * <dt>passivate</dt> 045 * <dd>intercept service lookup</dd> 046 * <dt>monitor</dt> 047 * <dd>monitor service pass-through accesses</dd> 048 * <dt>await 049 * <dt> 050 * <dd>wait for the runtime being quiet before proceeding</dd> 051 * <dt>proceed</dt> 052 * <dd>proceed with the operation and handle termination hook</dd> 053 * </dl> 054 * 055 * <pre> 056 * ServicePassivator 057 * .passivate() 058 * .withQuietDelay(ChronoUnit.SECONDS.getDuration().multipliedBy(20)) 059 * .monitor() 060 * .withTimeout(ChronoUnit.MINUTES.getDuration().multipliedBy(2)) 061 * .await() 062 * .proceed(() -> System.out.println("do something")) 063 * .onFailure(failure -> System.out.println("failed " + failure)) 064 * .onSuccess(() -> System.out.println("succeed"));* 065 * </pre> 066 * </p> 067 * 068 * @since 8.1 069 */ 070public class ServicePassivator { 071 072 public static Passivator passivate() { 073 return new Passivator(); 074 } 075 076 public static Termination proceed(Duration quiet, Duration timeout, boolean enforce, Runnable runnable) { 077 return passivate() 078 .withQuietDelay(quiet) 079 .monitor() 080 .withTimeout(timeout) 081 .withEnforceMode(enforce) 082 .await() 083 .proceed(runnable); 084 } 085 086 public static <X extends Exception> void proceed(Duration quiet, Duration timeout, 087 boolean enforce, RunnableCheckException<Exception> runnable, Class<X> oftype) throws X { 088 class CheckExceptionHolder extends RuntimeException { 089 090 private static final long serialVersionUID = 1L; 091 092 CheckExceptionHolder(Throwable cause) { 093 super(cause); 094 } 095 096 void rethrow(Class<X> oftype) throws X { 097 if (getCause() instanceof InterruptedException) { 098 Thread.currentThread() 099 .interrupt(); 100 } 101 throw oftype.cast(getCause()); 102 } 103 } 104 try { 105 ServicePassivator.passivate() 106 .withQuietDelay(quiet) 107 .monitor() 108 .withTimeout(timeout) 109 .withEnforceMode(enforce) 110 .await() 111 .proceed(() -> { 112 try { 113 runnable.run(); 114 } catch (Exception cause) { 115 throw new CheckExceptionHolder(cause); 116 } 117 }); 118 } catch (CheckExceptionHolder cause) { 119 cause.rethrow(oftype); 120 } 121 122 } 123 124 public interface RunnableCheckException<X extends Exception> { 125 void run() throws X; 126 } 127 128 /** 129 * Intercepts service lookups for implementing the quiet logic. 130 */ 131 public static class Passivator { 132 133 final Log log = LogFactory.getLog(ServicePassivator.class); 134 135 Passivator() { 136 run(); 137 } 138 139 final CountDownLatch achieved = new CountDownLatch(1); 140 141 final Accounting accounting = new Accounting(); 142 143 Optional<ServiceProvider> installed = Optional.empty(); 144 145 void run() { 146 installed = Optional.ofNullable(DefaultServiceProvider.getProvider()); 147 ServiceProvider passthrough = installed.map( 148 (Function<ServiceProvider, ServiceProvider>) DelegateProvider::new) 149 .orElseGet( 150 (Supplier<ServiceProvider>) RuntimeProvider::new); 151 ServiceProvider waitfor = new WaitForProvider(achieved, passthrough); 152 PassivateProvider passivator = new PassivateProvider(Thread.currentThread(), accounting, waitfor, 153 passthrough); 154 DefaultServiceProvider.setProvider(passivator); 155 log.debug("installed passivator", log.isTraceEnabled() ? new Throwable("stack trace") : null); 156 } 157 158 void commit() { 159 try { 160 DefaultServiceProvider.setProvider(installed.orElse(null)); 161 } finally { 162 achieved.countDown(); 163 log.debug("uninstalled passivator"); 164 } 165 } 166 167 TemporalAmount quietDelay = Duration.ofSeconds(5); 168 169 public Passivator withQuietDelay(TemporalAmount delay) { 170 quietDelay = delay; 171 return this; 172 } 173 174 public Passivator peek(Consumer<Passivator> consumer) { 175 consumer.accept(this); 176 return this; 177 } 178 179 public Monitor monitor() { 180 return new Monitor(this, quietDelay); 181 } 182 183 /** 184 * Snapshots service lookups and states about service scoping. * 185 */ 186 public class Accounting { 187 188 /** 189 * Takes a snapshot of the lookup. 190 * 191 * @param typeof 192 * @return 193 */ 194 Optional<InScopeOfContext> take(Class<?> serviceof) { 195 Class<?>[] callstack = dumper.dump(); 196 Optional<InScopeOfContext> snapshot = inscopeof(callstack) 197 .map(inscopeof -> new InScopeOfContext(inscopeof, Thread.currentThread(), callstack)); 198 snapshot.ifPresent(this::register); 199 return snapshot; 200 } 201 202 void register(InScopeOfContext context) { 203 last = Optional.of(context); 204 } 205 206 volatile Optional<InScopeOfContext> last = Optional.empty(); 207 208 public Optional<InScopeOfContext> get() { 209 return last; 210 } 211 212 Optional<InScopeOfContext> reset() { 213 try { 214 return last; 215 } finally { 216 last = Optional.empty(); 217 } 218 } 219 220 Optional<Class<?>> inscopeof(Class<?>[] callstack) { 221 final ComponentManager cm = Framework.getRuntime() 222 .getComponentManager(); 223 if (cm != null) { 224 225 for (Class<?> typeof : callstack) { 226 if (cm.getComponentProvidingService(typeof) != null) { 227 return Optional.of(typeof); 228 } 229 } 230 } 231 return Optional.empty(); 232 } 233 234 final CallstackDumper dumper = new CallstackDumper(); 235 236 /** 237 * Scoped service call context. 238 */ 239 public class InScopeOfContext { 240 241 InScopeOfContext(Class<?> serviceof, Thread thread, Class<?>[] callstack) { 242 this.serviceof = serviceof; 243 this.thread = thread; 244 this.callstack = callstack; 245 } 246 247 final Class<?> serviceof; 248 249 final Thread thread; 250 251 final Class<?>[] callstack; 252 253 @Override 254 public String toString() { 255 StringBuilder builder = new StringBuilder().append("on ") 256 .append(thread) 257 .append(" in scope of ") 258 .append(serviceof) 259 .append(System.lineSeparator()); 260 for (Class<?> typeof : callstack) { 261 builder = builder.append(" ") 262 .append(typeof) 263 .append(System.lineSeparator()); 264 } 265 return builder.toString(); 266 } 267 } 268 269 /** 270 * Dumps caller stack and states for a service scope 271 */ 272 class CallstackDumper extends SecurityManager { 273 274 Class<?>[] dump() { 275 return super.getClassContext(); 276 } 277 278 } 279 280 } 281 282 } 283 284 /** 285 * Monitors service lookups for stating about quiet status. 286 */ 287 public static class Monitor { 288 289 Monitor(Passivator passivator, TemporalAmount quietDelay) { 290 this.passivator = passivator; 291 this.quietDelay = quietDelay; 292 run(); 293 } 294 295 final Passivator passivator; 296 297 final CountDownLatch passivated = new CountDownLatch(1); 298 299 final TemporalAmount quietDelay; 300 301 final Timer timer = new Timer(ServicePassivator.class.getSimpleName() 302 .toLowerCase()); 303 304 final TimerTask scheduledTask = new TimerTask() { 305 306 @Override 307 public void run() { 308 Optional<InScopeOfContext> observed = passivator.accounting.reset(); 309 if (observed.isPresent()) { 310 return; 311 } 312 cancel(); 313 passivated.countDown(); 314 } 315 }; 316 317 void run() { 318 long delay = TimeUnit.MILLISECONDS.convert(quietDelay.get(ChronoUnit.SECONDS), TimeUnit.SECONDS); 319 if (delay <= 0) { 320 passivated.countDown(); 321 return; 322 } 323 timer.scheduleAtFixedRate( 324 scheduledTask, 325 delay, 326 delay); 327 passivator.log.debug("monitoring accesses"); 328 } 329 330 /** 331 * Cancels service lookups monitoring. 332 */ 333 void cancel() { 334 try { 335 timer.cancel(); 336 } finally { 337 passivator.commit(); 338 } 339 } 340 341 TemporalAmount timeout = Duration.ofSeconds(30); 342 343 boolean enforce = true; 344 345 public Monitor withTimeout(TemporalAmount value) { 346 timeout = value; 347 return this; 348 } 349 350 public Monitor withEnforceMode(boolean value) { 351 enforce = value; 352 return this; 353 } 354 355 public Monitor peek(Consumer<Monitor> consumer) { 356 consumer.accept(this); 357 return this; 358 } 359 360 /** 361 * Installs the timer task which monitor the service lookups. Once there will be no more lookups in the 362 * scheduled period, notifies the runner for proceeding. 363 * 364 * @param next 365 * @return 366 */ 367 public Waiter await() { 368 return new Waiter(this, timeout, enforce); 369 } 370 371 } 372 373 /** 374 * Terminates the chain by running the operation in a passivated context. 375 */ 376 public static class Waiter { 377 378 Waiter(Monitor monitor, TemporalAmount timeout, boolean enforce) { 379 this.monitor = monitor; 380 this.timeout = timeout; 381 this.enforce = enforce; 382 } 383 384 final Monitor monitor; 385 386 final TemporalAmount timeout; 387 388 final boolean enforce; 389 390 public Waiter peek(Consumer<Waiter> consumer) { 391 consumer.accept(this); 392 return this; 393 } 394 395 /** 396 * Terminates the chain by invoking the operation 397 * <ul> 398 * <li>waits for the runtime being passivated,</li> 399 * <li>and then runs the operation,</li> 400 * <li>and then notifies the blocked lookup to proceed.</li> 401 * 402 * @param runnable the runnable to execute 403 * @return the termination interface 404 */ 405 public Termination proceed(Runnable runnable) { 406 try { 407 final long delay = timeout.get(ChronoUnit.SECONDS); 408 monitor.passivator.log.debug("waiting " + delay + "s for passivation"); 409 boolean passivated = monitor.passivated.await(delay, TimeUnit.SECONDS); 410 if (!enforce || passivated) { 411 monitor.passivator.log.debug("proceeding"); 412 ClassLoader tcl = Thread.currentThread() 413 .getContextClassLoader(); 414 try { 415 runnable.run(); 416 } finally { 417 Thread.currentThread() 418 .setContextClassLoader(tcl); 419 } 420 } 421 return monitor.passivator.accounting.last 422 .<Termination> map(Failure::new) 423 .orElseGet(Success::new); 424 } catch (InterruptedException cause) { 425 Thread.currentThread() 426 .interrupt(); 427 throw new AssertionError("Interrupted while waiting for passivation", cause); 428 } finally { 429 monitor.cancel(); 430 } 431 } 432 433 } 434 435 /** 436 * Terminates the pacification by a success or a failure action and release the lock. 437 */ 438 public interface Termination { 439 440 /** 441 * Executes the runnable if the passivation was as success 442 * 443 * @param runnable 444 */ 445 default Termination onSuccess(Runnable finisher) { 446 return this; 447 } 448 449 /** 450 * Recover the failure if the passivation was a failure, ie: some activity has been detected during the 451 * protected operation. 452 * 453 * @param runnable the failure action 454 */ 455 default Termination onFailure(Consumer<InScopeOfContext> recoverer) { 456 return this; 457 } 458 459 default Termination peek(Consumer<Termination> consumer) { 460 consumer.accept(this); 461 return this; 462 } 463 464 class Success implements Termination { 465 466 @Override 467 public Termination onSuccess(Runnable finisher) { 468 finisher.run(); 469 return this; 470 } 471 472 } 473 474 class Failure implements Termination { 475 476 Failure(InScopeOfContext snapshot) { 477 this.snapshot = snapshot; 478 } 479 480 final InScopeOfContext snapshot; 481 482 @Override 483 public Termination onFailure(Consumer<InScopeOfContext> recoverer) { 484 recoverer.accept(snapshot); 485 return this; 486 } 487 } 488 } 489 490 /** 491 * Intercepts service lookups for blocking other threads. 492 */ 493 static class PassivateProvider implements ServiceProvider { 494 495 PassivateProvider(Thread ownerThread, Accounting accounting, ServiceProvider waitfor, 496 ServiceProvider passthrough) { 497 this.ownerThread = ownerThread; 498 this.accounting = accounting; 499 this.waitfor = waitfor; 500 this.passthrough = passthrough; 501 } 502 503 final Thread ownerThread; 504 505 final Accounting accounting; 506 507 final ServiceProvider passthrough; 508 509 final ServiceProvider waitfor; 510 511 @Override 512 public <T> T getService(Class<T> typeof) { 513 if (Thread.currentThread() == ownerThread) { 514 return passthrough.getService(typeof); 515 } 516 return accounting 517 .take(typeof) 518 .map(snapshot -> passthrough) 519 .orElse(waitfor) 520 .getService(typeof); 521 } 522 } 523 524 /** 525 * Delegates the lookup to the previously installed service provider. 526 */ 527 static class DelegateProvider implements ServiceProvider { 528 529 DelegateProvider(ServiceProvider provider) { 530 next = provider; 531 } 532 533 final ServiceProvider next; 534 535 @Override 536 public <T> T getService(Class<T> serviceClass) { 537 return next.getService(serviceClass); 538 } 539 540 } 541 542 /** 543 * Let runtime resolve the service. 544 */ 545 static class RuntimeProvider implements ServiceProvider { 546 547 @Override 548 public <T> T getService(Class<T> serviceClass) { 549 return Framework.getRuntime() 550 .getService(serviceClass); 551 } 552 553 } 554 555 /** 556 * Waits for the condition before invoking the effective lookup. 557 */ 558 static class WaitForProvider implements ServiceProvider { 559 560 WaitForProvider(CountDownLatch condition, ServiceProvider passthrough) { 561 this.condition = condition; 562 this.passthrough = passthrough; 563 } 564 565 final CountDownLatch condition; 566 567 final ServiceProvider passthrough; 568 569 @Override 570 public <T> T getService(Class<T> serviceClass) { 571 try { 572 condition.await(); 573 } catch (InterruptedException error) { 574 Thread.currentThread() 575 .interrupt(); 576 throw new AssertionError("Interrupted while waiting for " + serviceClass); 577 } 578 return passthrough.getService(serviceClass); 579 } 580 581 } 582 583}