001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.nuxeo.runtime.api;
017
018import java.time.Duration;
019import java.time.temporal.ChronoUnit;
020import java.time.temporal.TemporalAmount;
021import java.util.Optional;
022import java.util.Timer;
023import java.util.TimerTask;
024import java.util.concurrent.CountDownLatch;
025import java.util.concurrent.TimeUnit;
026import java.util.function.Consumer;
027import java.util.function.Function;
028import java.util.function.Supplier;
029
030import org.apache.commons.logging.Log;
031import org.apache.commons.logging.LogFactory;
032import org.nuxeo.runtime.api.ServicePassivator.Passivator.Accounting;
033import org.nuxeo.runtime.api.ServicePassivator.Passivator.Accounting.InScopeOfContext;
034import org.nuxeo.runtime.api.ServicePassivator.Termination.Failure;
035import org.nuxeo.runtime.api.ServicePassivator.Termination.Success;
036import org.nuxeo.runtime.model.ComponentManager;
037
038/**
039 * Blocks service accesses in order to run an operation which alter the runtime. That gives a way to prevent service
040 * consumers to enter during the shutdown or the reload operation.
041 * <p>
042 * The invoke chain is split in the following steps
043 * <dl>
044 * <dt>passivate</dt>
045 * <dd>intercept service lookup</dd>
046 * <dt>monitor</dt>
047 * <dd>monitor service pass-through accesses</dd>
048 * <dt>await
049 * <dt>
050 * <dd>wait for the runtime being quiet before proceeding</dd>
051 * <dt>proceed</dt>
052 * <dd>proceed with the operation and handle termination hook</dd>
053 * </dl>
054 *
055 * <pre>
056 * ServicePassivator
057 *         .passivate()
058 *         .withQuietDelay(ChronoUnit.SECONDS.getDuration().multipliedBy(20))
059 *         .monitor()
060 *         .withTimeout(ChronoUnit.MINUTES.getDuration().multipliedBy(2))
061 *         .await()
062 *         .proceed(() -> System.out.println("do something"))
063 *         .onFailure(failure -> System.out.println("failed " + failure))
064 *         .onSuccess(() -> System.out.println("succeed"));*
065 * </pre>
066 * </p>
067 *
068 * @since 8.1
069 */
070public class ServicePassivator {
071
072    public static Passivator passivate() {
073        return new Passivator();
074    }
075
076    public static Termination proceed(Duration quiet, Duration timeout, boolean enforce, Runnable runnable) {
077        return passivate()
078                .withQuietDelay(quiet)
079                .monitor()
080                .withTimeout(timeout)
081                .withEnforceMode(enforce)
082                .await()
083                .proceed(runnable);
084    }
085
086    public static <X extends Exception> void proceed(Duration quiet, Duration timeout,
087            boolean enforce, RunnableCheckException<Exception> runnable, Class<X> oftype) throws X {
088        class CheckExceptionHolder extends RuntimeException {
089
090            private static final long serialVersionUID = 1L;
091
092            CheckExceptionHolder(Throwable cause) {
093                super(cause);
094            }
095
096            void rethrow(Class<X> oftype) throws X {
097                if (getCause() instanceof InterruptedException) {
098                    Thread.currentThread()
099                            .interrupt();
100                }
101                throw oftype.cast(getCause());
102            }
103        }
104        try {
105            ServicePassivator.passivate()
106                    .withQuietDelay(quiet)
107                    .monitor()
108                    .withTimeout(timeout)
109                    .withEnforceMode(enforce)
110                    .await()
111                    .proceed(() -> {
112                        try {
113                            runnable.run();
114                        } catch (Exception cause) {
115                            throw new CheckExceptionHolder(cause);
116                        }
117                    });
118        } catch (CheckExceptionHolder cause) {
119            cause.rethrow(oftype);
120        }
121
122    }
123
124    public interface RunnableCheckException<X extends Exception> {
125        void run() throws X;
126    }
127
128    /**
129     * Intercepts service lookups for implementing the quiet logic.
130     */
131    public static class Passivator {
132
133        final Log log = LogFactory.getLog(ServicePassivator.class);
134
135        Passivator() {
136            run();
137        }
138
139        final CountDownLatch achieved = new CountDownLatch(1);
140
141        final Accounting accounting = new Accounting();
142
143        Optional<ServiceProvider> installed = Optional.empty();
144
145        void run() {
146            installed = Optional.ofNullable(DefaultServiceProvider.getProvider());
147            ServiceProvider passthrough = installed.map(
148                    (Function<ServiceProvider, ServiceProvider>) DelegateProvider::new)
149                    .orElseGet(
150                            (Supplier<ServiceProvider>) RuntimeProvider::new);
151            ServiceProvider waitfor = new WaitForProvider(achieved, passthrough);
152            PassivateProvider passivator = new PassivateProvider(Thread.currentThread(), accounting, waitfor,
153                    passthrough);
154            DefaultServiceProvider.setProvider(passivator);
155            log.debug("installed passivator", log.isTraceEnabled() ? new Throwable("stack trace") : null);
156        }
157
158        void commit() {
159            try {
160                DefaultServiceProvider.setProvider(installed.orElse(null));
161            } finally {
162                achieved.countDown();
163                log.debug("uninstalled passivator");
164            }
165        }
166
167        TemporalAmount quietDelay = Duration.ofSeconds(5);
168
169        public Passivator withQuietDelay(TemporalAmount delay) {
170            quietDelay = delay;
171            return this;
172        }
173
174        public Passivator peek(Consumer<Passivator> consumer) {
175            consumer.accept(this);
176            return this;
177        }
178
179        public Monitor monitor() {
180            return new Monitor(this, quietDelay);
181        }
182
183        /**
184         * Snapshots service lookups and states about service scoping. *
185         */
186        public class Accounting {
187
188            /**
189             * Takes a snapshot of the lookup.
190             *
191             * @param typeof
192             * @return
193             */
194            Optional<InScopeOfContext> take(Class<?> serviceof) {
195                Class<?>[] callstack = dumper.dump();
196                Optional<InScopeOfContext> snapshot = inscopeof(callstack)
197                        .map(inscopeof -> new InScopeOfContext(inscopeof, Thread.currentThread(), callstack));
198                snapshot.ifPresent(this::register);
199                return snapshot;
200            }
201
202            void register(InScopeOfContext context) {
203                last = Optional.of(context);
204            }
205
206            volatile Optional<InScopeOfContext> last = Optional.empty();
207
208            public Optional<InScopeOfContext> get() {
209                return last;
210            }
211
212            Optional<InScopeOfContext> reset() {
213                try {
214                    return last;
215                } finally {
216                    last = Optional.empty();
217                }
218            }
219
220            Optional<Class<?>> inscopeof(Class<?>[] callstack) {
221                final ComponentManager cm = Framework.getRuntime()
222                        .getComponentManager();
223                if (cm != null) {
224
225                    for (Class<?> typeof : callstack) {
226                        if (cm.getComponentProvidingService(typeof) != null) {
227                            return Optional.of(typeof);
228                        }
229                    }
230                }
231                return Optional.empty();
232            }
233
234            final CallstackDumper dumper = new CallstackDumper();
235
236            /**
237             * Scoped service call context.
238             */
239            public class InScopeOfContext {
240
241                InScopeOfContext(Class<?> serviceof, Thread thread, Class<?>[] callstack) {
242                    this.serviceof = serviceof;
243                    this.thread = thread;
244                    this.callstack = callstack;
245                }
246
247                final Class<?> serviceof;
248
249                final Thread thread;
250
251                final Class<?>[] callstack;
252
253                @Override
254                public String toString() {
255                    StringBuilder builder = new StringBuilder().append("on ")
256                            .append(thread)
257                            .append(" in scope of ")
258                            .append(serviceof)
259                            .append(System.lineSeparator());
260                    for (Class<?> typeof : callstack) {
261                        builder = builder.append("  ")
262                                .append(typeof)
263                                .append(System.lineSeparator());
264                    }
265                    return builder.toString();
266                }
267            }
268
269            /**
270             * Dumps caller stack and states for a service scope
271             */
272            class CallstackDumper extends SecurityManager {
273
274                Class<?>[] dump() {
275                    return super.getClassContext();
276                }
277
278            }
279
280        }
281
282    }
283
284    /**
285     * Monitors service lookups for stating about quiet status.
286     */
287    public static class Monitor {
288
289        Monitor(Passivator passivator, TemporalAmount quietDelay) {
290            this.passivator = passivator;
291            this.quietDelay = quietDelay;
292            run();
293        }
294
295        final Passivator passivator;
296
297        final CountDownLatch passivated = new CountDownLatch(1);
298
299        final TemporalAmount quietDelay;
300
301        final Timer timer = new Timer(ServicePassivator.class.getSimpleName()
302                .toLowerCase());
303
304        final TimerTask scheduledTask = new TimerTask() {
305
306            @Override
307            public void run() {
308                Optional<InScopeOfContext> observed = passivator.accounting.reset();
309                if (observed.isPresent()) {
310                    return;
311                }
312                cancel();
313                passivated.countDown();
314            }
315        };
316
317        void run() {
318            long delay = TimeUnit.MILLISECONDS.convert(quietDelay.get(ChronoUnit.SECONDS), TimeUnit.SECONDS);
319            if (delay <= 0) {
320                passivated.countDown();
321                return;
322            }
323            timer.scheduleAtFixedRate(
324                    scheduledTask,
325                    delay,
326                    delay);
327            passivator.log.debug("monitoring accesses");
328        }
329
330        /**
331         * Cancels service lookups monitoring.
332         */
333        void cancel() {
334            try {
335                timer.cancel();
336            } finally {
337                passivator.commit();
338            }
339        }
340
341        TemporalAmount timeout = Duration.ofSeconds(30);
342
343        boolean enforce = true;
344
345        public Monitor withTimeout(TemporalAmount value) {
346            timeout = value;
347            return this;
348        }
349
350        public Monitor withEnforceMode(boolean value) {
351            enforce = value;
352            return this;
353        }
354
355        public Monitor peek(Consumer<Monitor> consumer) {
356            consumer.accept(this);
357            return this;
358        }
359
360        /**
361         * Installs the timer task which monitor the service lookups. Once there will be no more lookups in the
362         * scheduled period, notifies the runner for proceeding.
363         *
364         * @param next
365         * @return
366         */
367        public Waiter await() {
368            return new Waiter(this, timeout, enforce);
369        }
370
371    }
372
373    /**
374     * Terminates the chain by running the operation in a passivated context.
375     */
376    public static class Waiter {
377
378        Waiter(Monitor monitor, TemporalAmount timeout, boolean enforce) {
379            this.monitor = monitor;
380            this.timeout = timeout;
381            this.enforce = enforce;
382        }
383
384        final Monitor monitor;
385
386        final TemporalAmount timeout;
387
388        final boolean enforce;
389
390        public Waiter peek(Consumer<Waiter> consumer) {
391            consumer.accept(this);
392            return this;
393        }
394
395        /**
396         * Terminates the chain by invoking the operation
397         * <ul>
398         * <li>waits for the runtime being passivated,</li>
399         * <li>and then runs the operation,</li>
400         * <li>and then notifies the blocked lookup to proceed.</li>
401         *
402         * @param runnable the runnable to execute
403         * @return the termination interface
404         */
405        public Termination proceed(Runnable runnable) {
406            try {
407                final long delay = timeout.get(ChronoUnit.SECONDS);
408                monitor.passivator.log.debug("waiting " + delay + "s for passivation");
409                boolean passivated = monitor.passivated.await(delay, TimeUnit.SECONDS);
410                if (!enforce || passivated) {
411                    monitor.passivator.log.debug("proceeding");
412                    ClassLoader tcl = Thread.currentThread()
413                            .getContextClassLoader();
414                    try {
415                        runnable.run();
416                    } finally {
417                        Thread.currentThread()
418                                .setContextClassLoader(tcl);
419                    }
420                }
421                return monitor.passivator.accounting.last
422                        .<Termination> map(Failure::new)
423                        .orElseGet(Success::new);
424            } catch (InterruptedException cause) {
425                Thread.currentThread()
426                        .interrupt();
427                throw new AssertionError("Interrupted while waiting for passivation", cause);
428            } finally {
429                monitor.cancel();
430            }
431        }
432
433    }
434
435    /**
436     * Terminates the pacification by a success or a failure action and release the lock.
437     */
438    public interface Termination {
439
440        /**
441         * Executes the runnable if the passivation was as success
442         *
443         * @param runnable
444         */
445        default Termination onSuccess(Runnable finisher) {
446            return this;
447        }
448
449        /**
450         * Recover the failure if the passivation was a failure, ie: some activity has been detected during the
451         * protected operation.
452         *
453         * @param runnable the failure action
454         */
455        default Termination onFailure(Consumer<InScopeOfContext> recoverer) {
456            return this;
457        }
458
459        default Termination peek(Consumer<Termination> consumer) {
460            consumer.accept(this);
461            return this;
462        }
463
464        class Success implements Termination {
465
466            @Override
467            public Termination onSuccess(Runnable finisher) {
468                finisher.run();
469                return this;
470            }
471
472        }
473
474        class Failure implements Termination {
475
476            Failure(InScopeOfContext snapshot) {
477                this.snapshot = snapshot;
478            }
479
480            final InScopeOfContext snapshot;
481
482            @Override
483            public Termination onFailure(Consumer<InScopeOfContext> recoverer) {
484                recoverer.accept(snapshot);
485                return this;
486            }
487        }
488    }
489
490    /**
491     * Intercepts service lookups for blocking other threads.
492     */
493    static class PassivateProvider implements ServiceProvider {
494
495        PassivateProvider(Thread ownerThread, Accounting accounting, ServiceProvider waitfor,
496                ServiceProvider passthrough) {
497            this.ownerThread = ownerThread;
498            this.accounting = accounting;
499            this.waitfor = waitfor;
500            this.passthrough = passthrough;
501        }
502
503        final Thread ownerThread;
504
505        final Accounting accounting;
506
507        final ServiceProvider passthrough;
508
509        final ServiceProvider waitfor;
510
511        @Override
512        public <T> T getService(Class<T> typeof) {
513            if (Thread.currentThread() == ownerThread) {
514                return passthrough.getService(typeof);
515            }
516            return accounting
517                    .take(typeof)
518                    .map(snapshot -> passthrough)
519                    .orElse(waitfor)
520                    .getService(typeof);
521        }
522    }
523
524    /**
525     * Delegates the lookup to the previously installed service provider.
526     */
527    static class DelegateProvider implements ServiceProvider {
528
529        DelegateProvider(ServiceProvider provider) {
530            next = provider;
531        }
532
533        final ServiceProvider next;
534
535        @Override
536        public <T> T getService(Class<T> serviceClass) {
537            return next.getService(serviceClass);
538        }
539
540    }
541
542    /**
543     * Let runtime resolve the service.
544     */
545    static class RuntimeProvider implements ServiceProvider {
546
547        @Override
548        public <T> T getService(Class<T> serviceClass) {
549            return Framework.getRuntime()
550                    .getService(serviceClass);
551        }
552
553    }
554
555    /**
556     * Waits for the condition before invoking the effective lookup.
557     */
558    static class WaitForProvider implements ServiceProvider {
559
560        WaitForProvider(CountDownLatch condition, ServiceProvider passthrough) {
561            this.condition = condition;
562            this.passthrough = passthrough;
563        }
564
565        final CountDownLatch condition;
566
567        final ServiceProvider passthrough;
568
569        @Override
570        public <T> T getService(Class<T> serviceClass) {
571            try {
572                condition.await();
573            } catch (InterruptedException error) {
574                Thread.currentThread()
575                        .interrupt();
576                throw new AssertionError("Interrupted while waiting for " + serviceClass);
577            }
578            return passthrough.getService(serviceClass);
579        }
580
581    }
582
583}