001/*
002 * (C) Copyright 2019 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     pierre
018 */
019package org.nuxeo.runtime.stream;
020
021import static org.nuxeo.lib.stream.computation.log.ComputationRunner.GLOBAL_FAILURE_COUNT_REGISTRY_NAME;
022import static org.nuxeo.lib.stream.computation.log.ComputationRunner.NUXEO_METRICS_REGISTRY_NAME;
023
024import java.time.Duration;
025import java.time.Instant;
026import java.time.ZoneOffset;
027import java.time.format.DateTimeFormatter;
028
029import org.nuxeo.runtime.api.Framework;
030import org.nuxeo.runtime.management.api.Probe;
031import org.nuxeo.runtime.management.api.ProbeStatus;
032import org.nuxeo.runtime.services.config.ConfigurationService;
033
034import io.dropwizard.metrics5.Counter;
035import io.dropwizard.metrics5.MetricRegistry;
036import io.dropwizard.metrics5.SharedMetricRegistries;
037
038/**
039 * A probe to detect when computation has been terminated due to failure. A delay is applied before returning the
040 * failure code.
041 *
042 * @since 11.1
043 */
044public class StreamProbe implements Probe {
045
046    public static final String STREAM_PROBE_DELAY_PROPERTY = "nuxeo.stream.health.check.delay";
047
048    public static final Duration STREAM_PROBE_DELAY_DEFAULT = Duration.ofHours(36);
049
050    protected static final String FAILURE_MESSAGE = "%d computations have been terminated after failure. "
051            + "First failure detected: %s, probe failure delayed by %s. "
052            + "This Nuxeo instance must be restarted within the stream retention period.";
053
054    protected Counter globalFailureCount;
055
056    protected Long detected;
057
058    protected Duration timeout;
059
060    protected static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ssz")
061                                                                          .withZone(ZoneOffset.UTC);
062
063    @Override
064    public ProbeStatus run() {
065        long failures = getFailures();
066        if (failures == 0) {
067            return ProbeStatus.newSuccess("No failure");
068        }
069        String dateFailure = FORMATTER.format(Instant.ofEpochMilli(detected));
070        String message = String.format(FAILURE_MESSAGE, failures, dateFailure, getTimeout());
071        if (System.currentTimeMillis() - detected < getTimeout().toMillis()) {
072            // Failure is delayed
073            return ProbeStatus.newSuccess(message);
074        }
075        return ProbeStatus.newFailure(message);
076    }
077
078    protected Duration getTimeout() {
079        if (timeout == null) {
080            ConfigurationService confService = Framework.getService(ConfigurationService.class);
081            timeout = confService.getDuration(STREAM_PROBE_DELAY_PROPERTY, STREAM_PROBE_DELAY_DEFAULT);
082        }
083        return timeout;
084    }
085
086    protected long getFailures() {
087        long failures = getCounter().getCount();
088        if (failures > 0 && detected == null) {
089            detected = System.currentTimeMillis();
090        }
091        return failures;
092    }
093
094    protected Counter getCounter() {
095        if (globalFailureCount == null) {
096            MetricRegistry registry = SharedMetricRegistries.getOrCreate(NUXEO_METRICS_REGISTRY_NAME);
097            globalFailureCount = registry.counter(GLOBAL_FAILURE_COUNT_REGISTRY_NAME);
098        }
099        return globalFailureCount;
100    }
101
102    /**
103     * Reset failure counter for testing purpose.
104     *
105     * @since 11.1
106     */
107    public void reset() {
108        long count = getCounter().getCount();
109        if (count > 0) {
110            getCounter().dec(count);
111        }
112        detected = null;
113    }
114}