001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.nuxeo.runtime.api; 017 018import java.time.Duration; 019import java.time.temporal.ChronoUnit; 020import java.time.temporal.TemporalAmount; 021import java.util.Optional; 022import java.util.Timer; 023import java.util.TimerTask; 024import java.util.concurrent.CountDownLatch; 025import java.util.concurrent.TimeUnit; 026import java.util.function.Consumer; 027import java.util.function.Function; 028import java.util.function.Supplier; 029 030import org.nuxeo.runtime.api.ServicePassivator.Passivator.Accounting; 031import org.nuxeo.runtime.api.ServicePassivator.Passivator.Accounting.InScopeOfContext; 032import org.nuxeo.runtime.api.ServicePassivator.Termination.Failure; 033import org.nuxeo.runtime.api.ServicePassivator.Termination.Success; 034import org.nuxeo.runtime.model.ComponentManager; 035 036/** 037 * Blocks service accesses in order to run an operation which alter the runtime. That gives a way to prevent service 038 * consumers to enter during the shutdown or the reload operation. 039 * <p> 040 * The invoke chain is split in the following steps 041 * <dl> 042 * <dt>passivate</dt> 043 * <dd>intercept service lookup</dd> 044 * <dt>monitor</dt> 045 * <dd>monitor service pass-through accesses</dd> 046 * <dt>await 047 * <dt> 048 * <dd>wait for the runtime being quiet before proceeding</dd> 049 * <dt>proceed</dt> 050 * <dd>proceed with the operation and handle termination hook</dd> 051 * </dl> 052 * 053 * <pre> 054 * ServicePassivator 055 * .passivate() 056 * .withQuietDelay(ChronoUnit.SECONDS.getDuration().multipliedBy(20)) 057 * .monitor() 058 * .withTimeout(ChronoUnit.MINUTES.getDuration().multipliedBy(2)) 059 * .await() 060 * .proceed(() -> System.out.println("do something")) 061 * .onFailure(failure -> System.out.println("failed " + failure)) 062 * .onSuccess(() -> System.out.println("succeed"));* 063 * </pre> 064 * </p> 065 * 066 * @since 8.1 067 */ 068public class ServicePassivator { 069 070 public static Passivator passivate() { 071 return new Passivator(); 072 } 073 074 public static Termination proceed(Runnable runnable) { 075 return passivate() 076 .monitor() 077 .await() 078 .proceed(runnable); 079 } 080 081 /** 082 * Intercepts service lookups for implementing the quiet logic. 083 */ 084 public static class Passivator { 085 086 Passivator() { 087 run(); 088 } 089 090 final CountDownLatch achieved = new CountDownLatch(1); 091 092 final Accounting accounting = new Accounting(); 093 094 Optional<ServiceProvider> installed = Optional.empty(); 095 096 void run() { 097 installed = Optional.ofNullable(DefaultServiceProvider.getProvider()); 098 ServiceProvider passthrough = installed.map( 099 (Function<ServiceProvider, ServiceProvider>) DelegateProvider::new).orElseGet( 100 (Supplier<ServiceProvider>) RuntimeProvider::new); 101 ServiceProvider waitfor = new WaitForProvider(achieved, passthrough); 102 PassivateProvider passivator = new PassivateProvider(Thread.currentThread(), accounting, waitfor, 103 passthrough); 104 DefaultServiceProvider.setProvider(passivator); 105 } 106 107 void resetProvider() { 108 PassivateProvider passivator = (PassivateProvider) DefaultServiceProvider.getProvider(); 109 ServiceProvider previous = passivator.passthrough instanceof DelegateProvider 110 ? ((DelegateProvider) passivator.passthrough).next 111 : null; 112 DefaultServiceProvider.setProvider(previous); 113 114 } 115 116 void commit() { 117 try { 118 DefaultServiceProvider.setProvider(installed.orElse(null)); 119 } finally { 120 achieved.countDown(); 121 } 122 } 123 124 TemporalAmount quietDelay = Duration.ofSeconds(5); 125 126 public Passivator withQuietDelay(TemporalAmount delay) { 127 quietDelay = delay; 128 return this; 129 } 130 131 public Passivator peek(Consumer<Passivator> consumer) { 132 consumer.accept(this); 133 return this; 134 } 135 136 public Monitor monitor() { 137 return new Monitor(this, quietDelay); 138 } 139 140 /** 141 * Snapshots service lookups and states about service scoping. * 142 */ 143 public class Accounting { 144 145 /** 146 * Takes a snapshot of the lookup. 147 * 148 * @param typeof 149 * @return 150 */ 151 Optional<InScopeOfContext> take(Class<?> serviceof) { 152 Class<?>[] callstack = dumper.dump(); 153 Optional<InScopeOfContext> snapshot = inscopeof(callstack) 154 .map(inscopeof -> new InScopeOfContext(inscopeof, Thread.currentThread(), callstack)); 155 snapshot.ifPresent(this::register); 156 return snapshot; 157 } 158 159 void register(InScopeOfContext context) { 160 last = Optional.of(context); 161 } 162 163 volatile Optional<InScopeOfContext> last = Optional.empty(); 164 165 public Optional<InScopeOfContext> get() { 166 return last; 167 } 168 169 Optional<InScopeOfContext> reset() { 170 try { 171 return last; 172 } finally { 173 last = Optional.empty(); 174 } 175 } 176 177 Optional<Class<?>> inscopeof(Class<?>[] callstack) { 178 for (Class<?> typeof : callstack) { 179 if (manager.getComponentProvidingService(typeof) != null) { 180 return Optional.of(typeof); 181 } 182 } 183 return Optional.empty(); 184 } 185 186 final ComponentManager manager = Framework.getRuntime().getComponentManager(); 187 188 final CallstackDumper dumper = new CallstackDumper(); 189 190 /** 191 * Scoped service call context. 192 */ 193 public class InScopeOfContext { 194 195 InScopeOfContext(Class<?> serviceof, Thread thread, Class<?>[] callstack) { 196 this.serviceof = serviceof; 197 this.thread = thread; 198 this.callstack = callstack; 199 } 200 201 final Class<?> serviceof; 202 203 final Thread thread; 204 205 final Class<?>[] callstack; 206 207 @Override 208 public String toString() { 209 StringBuilder builder = new StringBuilder().append("on ") 210 .append(thread) 211 .append(" in scope of ") 212 .append(serviceof) 213 .append(System.lineSeparator()); 214 for (Class<?> typeof : callstack) { 215 builder = builder.append(" ").append(typeof).append(System.lineSeparator()); 216 } 217 return builder.toString(); 218 } 219 } 220 221 /** 222 * Dumps caller stack and states for a service scope 223 */ 224 class CallstackDumper extends SecurityManager { 225 226 Class<?>[] dump() { 227 return super.getClassContext(); 228 } 229 230 } 231 232 } 233 234 } 235 236 /** 237 * Monitors service lookups for stating about quiet status. 238 */ 239 public static class Monitor { 240 241 Monitor(Passivator passivator, TemporalAmount quietDelay) { 242 this.passivator = passivator; 243 this.quietDelay = quietDelay; 244 run(); 245 } 246 247 final Passivator passivator; 248 249 final CountDownLatch passivated = new CountDownLatch(1); 250 251 final TemporalAmount quietDelay; 252 253 final Timer timer = new Timer(ServicePassivator.class.getSimpleName().toLowerCase()); 254 255 final TimerTask scheduledTask = new TimerTask() { 256 257 @Override 258 public void run() { 259 Optional<InScopeOfContext> observed = passivator.accounting.reset(); 260 if (observed.isPresent()) { 261 return; 262 } 263 cancel(); 264 passivated.countDown(); 265 } 266 }; 267 268 void run() { 269 long delay = TimeUnit.MILLISECONDS.convert(quietDelay.get(ChronoUnit.SECONDS), TimeUnit.SECONDS); 270 timer.scheduleAtFixedRate( 271 scheduledTask, 272 delay, 273 delay); 274 } 275 276 /** 277 * Cancels service lookups monitoring. 278 */ 279 void cancel() { 280 try { 281 timer.cancel(); 282 } finally { 283 passivator.commit(); 284 } 285 } 286 287 TemporalAmount timeout = Duration.ofSeconds(30); 288 289 public Monitor withTimeout(TemporalAmount timeout) { 290 this.timeout = timeout; 291 return this; 292 } 293 294 public Monitor peek(Consumer<Monitor> consumer) { 295 consumer.accept(this); 296 return this; 297 } 298 299 /** 300 * Installs the timer task which monitor the service lookups. Once there will be no more lookups in the 301 * scheduled period, notifies the runner for proceeding. 302 * 303 * @param next 304 * @return 305 */ 306 public Waiter await() { 307 return new Waiter(this, timeout); 308 } 309 310 } 311 312 /** 313 * Terminates the chain by running the operation in a passivated context. 314 */ 315 public static class Waiter { 316 317 Waiter(Monitor monitor, TemporalAmount timeout) { 318 this.monitor = monitor; 319 this.timeout = timeout; 320 } 321 322 final Monitor monitor; 323 324 final TemporalAmount timeout; 325 326 public Waiter peek(Consumer<Waiter> consumer) { 327 consumer.accept(this); 328 return this; 329 } 330 331 /** 332 * Terminates the chain by invoking the operation 333 * <ul> 334 * <li>waits for the runtime being passivated,</li> 335 * <li>and then runs the operation,</li> 336 * <li>and then notifies the blocked lookup to proceed.</li> 337 * 338 * @param runnable the runnable to execute 339 * @return the termination interface 340 */ 341 public Termination proceed(Runnable runnable) { 342 try { 343 if (monitor.passivated.await(timeout.get(ChronoUnit.SECONDS), TimeUnit.SECONDS)) { 344 runnable.run(); 345 } 346 return monitor.passivator.accounting.last 347 .<Termination> map(Failure::new) 348 .orElseGet(Success::new); 349 } catch (InterruptedException cause) { 350 Thread.currentThread().interrupt(); 351 throw new AssertionError("Interrupted while waiting for passivation", cause); 352 } finally { 353 monitor.cancel(); 354 } 355 } 356 357 } 358 359 /** 360 * Terminates the pacification by a success or a failure action and release the lock. 361 */ 362 public interface Termination { 363 364 /** 365 * Executes the runnable if the passivation was as success 366 * 367 * @param runnable 368 */ 369 default Termination onSuccess(Runnable finisher) { 370 return this; 371 } 372 373 /** 374 * Recover the failure if the passivation was a failure, ie: some activity has been detected during the 375 * protected operation. 376 * 377 * @param runnable the failure action 378 */ 379 default Termination onFailure(Consumer<InScopeOfContext> recoverer) { 380 return this; 381 } 382 383 default Termination peek(Consumer<Termination> consumer) { 384 consumer.accept(this); 385 return this; 386 } 387 388 class Success implements Termination { 389 390 @Override 391 public Termination onSuccess(Runnable finisher) { 392 finisher.run(); 393 return this; 394 } 395 396 } 397 398 class Failure implements Termination { 399 400 Failure(InScopeOfContext snapshot) { 401 this.snapshot = snapshot; 402 } 403 404 final InScopeOfContext snapshot; 405 406 @Override 407 public Termination onFailure(Consumer<InScopeOfContext> recoverer) { 408 recoverer.accept(snapshot); 409 return this; 410 } 411 } 412 } 413 414 /** 415 * Intercepts service lookups for blocking other threads. 416 */ 417 static class PassivateProvider implements ServiceProvider { 418 419 PassivateProvider(Thread ownerThread, Accounting accounting, ServiceProvider waitfor, 420 ServiceProvider passthrough) { 421 this.ownerThread = ownerThread; 422 this.accounting = accounting; 423 this.waitfor = waitfor; 424 this.passthrough = passthrough; 425 } 426 427 final Thread ownerThread; 428 429 final Accounting accounting; 430 431 final ServiceProvider passthrough; 432 433 final ServiceProvider waitfor; 434 435 @Override 436 public <T> T getService(Class<T> typeof) { 437 if (Thread.currentThread() == ownerThread) { 438 return passthrough.getService(typeof); 439 } 440 return accounting 441 .take(typeof) 442 .map(snapshot -> passthrough) 443 .orElse(waitfor) 444 .getService(typeof); 445 } 446 } 447 448 /** 449 * Delegates the lookup to the previously installed service provider. 450 */ 451 static class DelegateProvider implements ServiceProvider { 452 453 DelegateProvider(ServiceProvider provider) { 454 next = provider; 455 } 456 457 final ServiceProvider next; 458 459 @Override 460 public <T> T getService(Class<T> serviceClass) { 461 return next.getService(serviceClass); 462 } 463 464 } 465 466 /** 467 * Let runtime resolve the service. 468 */ 469 static class RuntimeProvider implements ServiceProvider { 470 471 @Override 472 public <T> T getService(Class<T> serviceClass) { 473 return Framework.getRuntime().getService(serviceClass); 474 } 475 476 } 477 478 /** 479 * Waits for the condition before invoking the effective lookup. 480 */ 481 static class WaitForProvider implements ServiceProvider { 482 483 WaitForProvider(CountDownLatch condition, ServiceProvider passthrough) { 484 this.condition = condition; 485 this.passthrough = passthrough; 486 } 487 488 final CountDownLatch condition; 489 490 final ServiceProvider passthrough; 491 492 @Override 493 public <T> T getService(Class<T> serviceClass) { 494 try { 495 condition.await(); 496 } catch (InterruptedException error) { 497 Thread.currentThread().interrupt(); 498 throw new AssertionError("Interrupted while waiting for " + serviceClass); 499 } 500 return passthrough.getService(serviceClass); 501 } 502 503 } 504 505}