001/* 002 * (C) Copyright 2019 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * pierre 018 */ 019package org.nuxeo.runtime.stream; 020 021import static org.nuxeo.lib.stream.computation.log.ComputationRunner.GLOBAL_FAILURE_COUNT_REGISTRY_NAME; 022import static org.nuxeo.lib.stream.computation.log.ComputationRunner.NUXEO_METRICS_REGISTRY_NAME; 023 024import java.time.Duration; 025import java.time.Instant; 026import java.time.ZoneOffset; 027import java.time.format.DateTimeFormatter; 028 029import org.nuxeo.runtime.api.Framework; 030import org.nuxeo.runtime.management.api.Probe; 031import org.nuxeo.runtime.management.api.ProbeStatus; 032import org.nuxeo.runtime.services.config.ConfigurationService; 033 034import io.dropwizard.metrics5.Counter; 035import io.dropwizard.metrics5.MetricRegistry; 036import io.dropwizard.metrics5.SharedMetricRegistries; 037 038/** 039 * A probe to detect when computation has been terminated due to failure. A delay is applied before returning the 040 * failure code. 041 * 042 * @since 11.1 043 */ 044public class StreamProbe implements Probe { 045 046 public static final String STREAM_PROBE_DELAY_PROPERTY = "nuxeo.stream.health.check.delay"; 047 048 public static final Duration STREAM_PROBE_DELAY_DEFAULT = Duration.ofHours(36); 049 050 protected static final String FAILURE_MESSAGE = "%d computations have been terminated after failure. " 051 + "First failure detected: %s, probe failure delayed by %s. " 052 + "This Nuxeo instance must be restarted within the stream retention period."; 053 054 protected Counter globalFailureCount; 055 056 protected Long detected; 057 058 protected Duration timeout; 059 060 protected static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ssz") 061 .withZone(ZoneOffset.UTC); 062 063 @Override 064 public ProbeStatus run() { 065 long failures = getFailures(); 066 if (failures == 0) { 067 return ProbeStatus.newSuccess("No failure"); 068 } 069 String dateFailure = FORMATTER.format(Instant.ofEpochMilli(detected)); 070 String message = String.format(FAILURE_MESSAGE, failures, dateFailure, getTimeout()); 071 if (System.currentTimeMillis() - detected < getTimeout().toMillis()) { 072 // Failure is delayed 073 return ProbeStatus.newSuccess(message); 074 } 075 return ProbeStatus.newFailure(message); 076 } 077 078 protected Duration getTimeout() { 079 if (timeout == null) { 080 ConfigurationService confService = Framework.getService(ConfigurationService.class); 081 timeout = confService.getDuration(STREAM_PROBE_DELAY_PROPERTY, STREAM_PROBE_DELAY_DEFAULT); 082 } 083 return timeout; 084 } 085 086 protected long getFailures() { 087 long failures = getCounter().getCount(); 088 if (failures > 0 && detected == null) { 089 detected = System.currentTimeMillis(); 090 } 091 return failures; 092 } 093 094 protected Counter getCounter() { 095 if (globalFailureCount == null) { 096 MetricRegistry registry = SharedMetricRegistries.getOrCreate(NUXEO_METRICS_REGISTRY_NAME); 097 globalFailureCount = registry.counter(GLOBAL_FAILURE_COUNT_REGISTRY_NAME); 098 } 099 return globalFailureCount; 100 } 101 102 /** 103 * Reset failure counter for testing purpose. 104 * 105 * @since 11.1 106 */ 107 public void reset() { 108 long count = getCounter().getCount(); 109 if (count > 0) { 110 getCounter().dec(count); 111 } 112 detected = null; 113 } 114}