001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.nuxeo.runtime.api;
017
018import java.time.Duration;
019import java.time.temporal.ChronoUnit;
020import java.time.temporal.TemporalAmount;
021import java.util.Optional;
022import java.util.Timer;
023import java.util.TimerTask;
024import java.util.concurrent.CountDownLatch;
025import java.util.concurrent.TimeUnit;
026import java.util.function.Consumer;
027import java.util.function.Function;
028import java.util.function.Supplier;
029
030import org.nuxeo.runtime.api.ServicePassivator.Passivator.Accounting;
031import org.nuxeo.runtime.api.ServicePassivator.Passivator.Accounting.InScopeOfContext;
032import org.nuxeo.runtime.api.ServicePassivator.Termination.Failure;
033import org.nuxeo.runtime.api.ServicePassivator.Termination.Success;
034import org.nuxeo.runtime.model.ComponentManager;
035
036/**
037 * Blocks service accesses in order to run an operation which alter the runtime. That gives a way to prevent service
038 * consumers to enter during the shutdown or the reload operation.
039 * <p>
040 * The invoke chain is split in the following steps
041 * <dl>
042 * <dt>passivate</dt>
043 * <dd>intercept service lookup</dd>
044 * <dt>monitor</dt>
045 * <dd>monitor service pass-through accesses</dd>
046 * <dt>await
047 * <dt>
048 * <dd>wait for the runtime being quiet before proceeding</dd>
049 * <dt>proceed</dt>
050 * <dd>proceed with the operation and handle termination hook</dd>
051 * </dl>
052 *
053 * <pre>
054 * ServicePassivator
055 *         .passivate()
056 *         .withQuietDelay(ChronoUnit.SECONDS.getDuration().multipliedBy(20))
057 *         .monitor()
058 *         .withTimeout(ChronoUnit.MINUTES.getDuration().multipliedBy(2))
059 *         .await()
060 *         .proceed(() -> System.out.println("do something"))
061 *         .onFailure(failure -> System.out.println("failed " + failure))
062 *         .onSuccess(() -> System.out.println("succeed"));*
063 * </pre>
064 * </p>
065 *
066 * @since 8.1
067 */
068public class ServicePassivator {
069
070    public static Passivator passivate() {
071        return new Passivator();
072    }
073
074    public static Termination proceed(Runnable runnable) {
075        return passivate()
076                .monitor()
077                .await()
078                .proceed(runnable);
079    }
080
081    /**
082     * Intercepts service lookups for implementing the quiet logic.
083     */
084    public static class Passivator {
085
086        Passivator() {
087            run();
088        }
089
090        final CountDownLatch achieved = new CountDownLatch(1);
091
092        final Accounting accounting = new Accounting();
093
094        Optional<ServiceProvider> installed = Optional.empty();
095
096        void run() {
097            installed = Optional.ofNullable(DefaultServiceProvider.getProvider());
098            ServiceProvider passthrough = installed.map(
099                    (Function<ServiceProvider, ServiceProvider>) DelegateProvider::new).orElseGet(
100                            (Supplier<ServiceProvider>) RuntimeProvider::new);
101            ServiceProvider waitfor = new WaitForProvider(achieved, passthrough);
102            PassivateProvider passivator = new PassivateProvider(Thread.currentThread(), accounting, waitfor,
103                    passthrough);
104            DefaultServiceProvider.setProvider(passivator);
105        }
106
107        void resetProvider() {
108            PassivateProvider passivator = (PassivateProvider) DefaultServiceProvider.getProvider();
109            ServiceProvider previous = passivator.passthrough instanceof DelegateProvider
110                    ? ((DelegateProvider) passivator.passthrough).next
111                    : null;
112            DefaultServiceProvider.setProvider(previous);
113
114        }
115
116        void commit() {
117            try {
118                DefaultServiceProvider.setProvider(installed.orElse(null));
119            } finally {
120                achieved.countDown();
121            }
122        }
123
124        TemporalAmount quietDelay = Duration.ofSeconds(5);
125
126        public Passivator withQuietDelay(TemporalAmount delay) {
127            quietDelay = delay;
128            return this;
129        }
130
131        public Passivator peek(Consumer<Passivator> consumer) {
132            consumer.accept(this);
133            return this;
134        }
135
136        public Monitor monitor() {
137            return new Monitor(this, quietDelay);
138        }
139
140        /**
141         * Snapshots service lookups and states about service scoping. *
142         */
143        public class Accounting {
144
145            /**
146             * Takes a snapshot of the lookup.
147             *
148             * @param typeof
149             * @return
150             */
151            Optional<InScopeOfContext> take(Class<?> serviceof) {
152                Class<?>[] callstack = dumper.dump();
153                Optional<InScopeOfContext> snapshot = inscopeof(callstack)
154                        .map(inscopeof -> new InScopeOfContext(inscopeof, Thread.currentThread(), callstack));
155                snapshot.ifPresent(this::register);
156                return snapshot;
157            }
158
159            void register(InScopeOfContext context) {
160                last = Optional.of(context);
161            }
162
163            volatile Optional<InScopeOfContext> last = Optional.empty();
164
165            public Optional<InScopeOfContext> get() {
166                return last;
167            }
168
169            Optional<InScopeOfContext> reset() {
170                try {
171                    return last;
172                } finally {
173                    last = Optional.empty();
174                }
175            }
176
177            Optional<Class<?>> inscopeof(Class<?>[] callstack) {
178                for (Class<?> typeof : callstack) {
179                    if (manager.getComponentProvidingService(typeof) != null) {
180                        return Optional.of(typeof);
181                    }
182                }
183                return Optional.empty();
184            }
185
186            final ComponentManager manager = Framework.getRuntime().getComponentManager();
187
188            final CallstackDumper dumper = new CallstackDumper();
189
190            /**
191             * Scoped service call context.
192             */
193            public class InScopeOfContext {
194
195                InScopeOfContext(Class<?> serviceof, Thread thread, Class<?>[] callstack) {
196                    this.serviceof = serviceof;
197                    this.thread = thread;
198                    this.callstack = callstack;
199                }
200
201                final Class<?> serviceof;
202
203                final Thread thread;
204
205                final Class<?>[] callstack;
206
207                @Override
208                public String toString() {
209                    StringBuilder builder = new StringBuilder().append("on ")
210                            .append(thread)
211                            .append(" in scope of ")
212                            .append(serviceof)
213                            .append(System.lineSeparator());
214                    for (Class<?> typeof : callstack) {
215                        builder = builder.append("  ").append(typeof).append(System.lineSeparator());
216                    }
217                    return builder.toString();
218                }
219            }
220
221            /**
222             * Dumps caller stack and states for a service scope
223             */
224            class CallstackDumper extends SecurityManager {
225
226                Class<?>[] dump() {
227                    return super.getClassContext();
228                }
229
230            }
231
232        }
233
234    }
235
236    /**
237     * Monitors service lookups for stating about quiet status.
238     */
239    public static class Monitor {
240
241        Monitor(Passivator passivator, TemporalAmount quietDelay) {
242            this.passivator = passivator;
243            this.quietDelay = quietDelay;
244            run();
245        }
246
247        final Passivator passivator;
248
249        final CountDownLatch passivated = new CountDownLatch(1);
250
251        final TemporalAmount quietDelay;
252
253        final Timer timer = new Timer(ServicePassivator.class.getSimpleName().toLowerCase());
254
255        final TimerTask scheduledTask = new TimerTask() {
256
257            @Override
258            public void run() {
259                Optional<InScopeOfContext> observed = passivator.accounting.reset();
260                if (observed.isPresent()) {
261                    return;
262                }
263                cancel();
264                passivated.countDown();
265            }
266        };
267
268        void run() {
269            long delay = TimeUnit.MILLISECONDS.convert(quietDelay.get(ChronoUnit.SECONDS), TimeUnit.SECONDS);
270            timer.scheduleAtFixedRate(
271                    scheduledTask,
272                    delay,
273                    delay);
274        }
275
276        /**
277         * Cancels service lookups monitoring.
278         */
279        void cancel() {
280            try {
281                timer.cancel();
282            } finally {
283                passivator.commit();
284            }
285        }
286
287        TemporalAmount timeout = Duration.ofSeconds(30);
288
289        public Monitor withTimeout(TemporalAmount timeout) {
290            this.timeout = timeout;
291            return this;
292        }
293
294        public Monitor peek(Consumer<Monitor> consumer) {
295            consumer.accept(this);
296            return this;
297        }
298
299        /**
300         * Installs the timer task which monitor the service lookups. Once there will be no more lookups in the
301         * scheduled period, notifies the runner for proceeding.
302         *
303         * @param next
304         * @return
305         */
306        public Waiter await() {
307            return new Waiter(this, timeout);
308        }
309
310    }
311
312    /**
313     * Terminates the chain by running the operation in a passivated context.
314     */
315    public static class Waiter {
316
317        Waiter(Monitor monitor, TemporalAmount timeout) {
318            this.monitor = monitor;
319            this.timeout = timeout;
320        }
321
322        final Monitor monitor;
323
324        final TemporalAmount timeout;
325
326        public Waiter peek(Consumer<Waiter> consumer) {
327            consumer.accept(this);
328            return this;
329        }
330
331        /**
332         * Terminates the chain by invoking the operation
333         * <ul>
334         * <li>waits for the runtime being passivated,</li>
335         * <li>and then runs the operation,</li>
336         * <li>and then notifies the blocked lookup to proceed.</li>
337         *
338         * @param runnable the runnable to execute
339         * @return the termination interface
340         */
341        public Termination proceed(Runnable runnable) {
342            try {
343                if (monitor.passivated.await(timeout.get(ChronoUnit.SECONDS), TimeUnit.SECONDS)) {
344                    runnable.run();
345                }
346                return monitor.passivator.accounting.last
347                        .<Termination> map(Failure::new)
348                        .orElseGet(Success::new);
349            } catch (InterruptedException cause) {
350                Thread.currentThread().interrupt();
351                throw new AssertionError("Interrupted while waiting for passivation", cause);
352            } finally {
353                monitor.cancel();
354            }
355        }
356
357    }
358
359    /**
360     * Terminates the pacification by a success or a failure action and release the lock.
361     */
362    public interface Termination {
363
364        /**
365         * Executes the runnable if the passivation was as success
366         *
367         * @param runnable
368         */
369        default Termination onSuccess(Runnable finisher) {
370            return this;
371        }
372
373        /**
374         * Recover the failure if the passivation was a failure, ie: some activity has been detected during the
375         * protected operation.
376         *
377         * @param runnable the failure action
378         */
379        default Termination onFailure(Consumer<InScopeOfContext> recoverer) {
380            return this;
381        }
382
383        default Termination peek(Consumer<Termination> consumer) {
384            consumer.accept(this);
385            return this;
386        }
387
388        class Success implements Termination {
389
390            @Override
391            public Termination onSuccess(Runnable finisher) {
392                finisher.run();
393                return this;
394            }
395
396        }
397
398        class Failure implements Termination {
399
400            Failure(InScopeOfContext snapshot) {
401                this.snapshot = snapshot;
402            }
403
404            final InScopeOfContext snapshot;
405
406            @Override
407            public Termination onFailure(Consumer<InScopeOfContext> recoverer) {
408                recoverer.accept(snapshot);
409                return this;
410            }
411        }
412    }
413
414    /**
415     * Intercepts service lookups for blocking other threads.
416     */
417    static class PassivateProvider implements ServiceProvider {
418
419        PassivateProvider(Thread ownerThread, Accounting accounting, ServiceProvider waitfor,
420                ServiceProvider passthrough) {
421            this.ownerThread = ownerThread;
422            this.accounting = accounting;
423            this.waitfor = waitfor;
424            this.passthrough = passthrough;
425        }
426
427        final Thread ownerThread;
428
429        final Accounting accounting;
430
431        final ServiceProvider passthrough;
432
433        final ServiceProvider waitfor;
434
435        @Override
436        public <T> T getService(Class<T> typeof) {
437            if (Thread.currentThread() == ownerThread) {
438                return passthrough.getService(typeof);
439            }
440            return accounting
441                    .take(typeof)
442                    .map(snapshot -> passthrough)
443                    .orElse(waitfor)
444                    .getService(typeof);
445        }
446    }
447
448    /**
449     * Delegates the lookup to the previously installed service provider.
450     */
451    static class DelegateProvider implements ServiceProvider {
452
453        DelegateProvider(ServiceProvider provider) {
454            next = provider;
455        }
456
457        final ServiceProvider next;
458
459        @Override
460        public <T> T getService(Class<T> serviceClass) {
461            return next.getService(serviceClass);
462        }
463
464    }
465
466    /**
467     * Let runtime resolve the service.
468     */
469    static class RuntimeProvider implements ServiceProvider {
470
471        @Override
472        public <T> T getService(Class<T> serviceClass) {
473            return Framework.getRuntime().getService(serviceClass);
474        }
475
476    }
477
478    /**
479     * Waits for the condition before invoking the effective lookup.
480     */
481    static class WaitForProvider implements ServiceProvider {
482
483        WaitForProvider(CountDownLatch condition, ServiceProvider passthrough) {
484            this.condition = condition;
485            this.passthrough = passthrough;
486        }
487
488        final CountDownLatch condition;
489
490        final ServiceProvider passthrough;
491
492        @Override
493        public <T> T getService(Class<T> serviceClass) {
494            try {
495                condition.await();
496            } catch (InterruptedException error) {
497                Thread.currentThread().interrupt();
498                throw new AssertionError("Interrupted while waiting for " + serviceClass);
499            }
500            return passthrough.getService(serviceClass);
501        }
502
503    }
504
505}