/*
 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.runtime.stream;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.nuxeo.common.xmap.annotation.XNode;
import org.nuxeo.common.xmap.annotation.XNodeList;
import org.nuxeo.common.xmap.annotation.XNodeMap;
import org.nuxeo.common.xmap.annotation.XObject;
import org.nuxeo.lib.stream.StreamRuntimeException;
import org.nuxeo.lib.stream.computation.ComputationPolicy;
import org.nuxeo.lib.stream.computation.ComputationPolicyBuilder;
import org.nuxeo.lib.stream.computation.RecordFilter;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.model.Descriptor;

import net.jodah.failsafe.RetryPolicy;

/**
 * XMap descriptor bound to the {@code streamProcessor} element.
 * <p>
 * It carries the processor attributes ({@code name}, {@code logConfig}, {@code class}, default concurrency,
 * partitions, codec, ...) together with the nested {@code computation}, {@code stream} and {@code policy}
 * elements, and resolves {@link ComputationPolicy} instances from the declared policies.
 */
@XObject("streamProcessor")
public class StreamProcessorDescriptor implements Descriptor {

    /**
     * Name of the framework property read by {@link PolicyDescriptor#getSkipFirstFailures()}.
     *
     * @since 11.1
     */
    public static final String RECOVERY_SKIP_FIRST_FAILURES_OPTION = "nuxeo.stream.recovery.skipFirstFailures";

    // @since 11.1
    @XNode("@enabled")
    protected boolean isEnabled = true;

    // @since 11.1
    @XNode("@start")
    protected boolean start = true;

    /**
     * Descriptor bound to a {@code computation} element: a computation name and its concurrency.
     */
    @XObject(value = "computation")
    public static class ComputationDescriptor implements Descriptor {

        @XNode("@name")
        public String name;

        // Falls back to the processor-wide DEFAULT_CONCURRENCY when the attribute is not set
        @XNode("@concurrency")
        public Integer concurrency = DEFAULT_CONCURRENCY;

        @Override
        public String getId() {
            return name;
        }
    }

    /**
     * Descriptor bound to a {@code filter} element: a {@link RecordFilter} class and its options.
     */
    @XObject(value = "filter")
    public static class FilterDescriptor implements Descriptor {

        @XNode("@name")
        public String name;

        @Override
        public String getId() {
            return name;
        }

        @XNode("@class")
        public Class<? extends RecordFilter> klass;

        @XNodeMap(value = "option", key = "@name", type = HashMap.class, componentType = String.class)
        public Map<String, String> options = new HashMap<>();

        /**
         * Instantiates the configured filter class with its no-argument constructor and initializes it with
         * the declared options.
         *
         * @return a new, initialized filter instance
         * @throws IllegalArgumentException if no class is configured or if it is not a {@link RecordFilter}
         * @throws StreamRuntimeException if the class cannot be instantiated reflectively
         */
        public RecordFilter getFilter() {
            // A missing class attribute is reported like an invalid one rather than as a bare NPE
            if (klass == null || !RecordFilter.class.isAssignableFrom(klass)) {
                throw new IllegalArgumentException(
                        "Cannot create filter: " + getId() + ", class must implement RecordFilter");
            }
            try {
                RecordFilter ret = klass.getDeclaredConstructor().newInstance();
                ret.init(options);
                return ret;
            } catch (ReflectiveOperationException e) {
                throw new StreamRuntimeException("Cannot create filter: " + getId(), e);
            }
        }

    }

    /**
     * Descriptor bound to a {@code stream} element: name, partitions, codec, external flag and filters.
     */
    @XObject(value = "stream")
    public static class StreamDescriptor implements Descriptor {

        @XNode("@name")
        public String name;

        @XNode("@partitions")
        public Integer partitions;

        @XNode("@codec")
        public String codec;

        // Stream is initialized outside of the processor
        // @since 11.1
        @XNode("@external")
        public Boolean external;

        @XNodeList(value = "filter", type = ArrayList.class, componentType = FilterDescriptor.class)
        public List<FilterDescriptor> filters = new ArrayList<>();

        @Override
        public String getId() {
            return name;
        }
    }

    /**
     * Descriptor bound to a {@code policy} element: retry, failure handling and batch settings.
     * <p>
     * The policy {@code name} is what {@link StreamProcessorDescriptor#getPolicy(String)} matches against a
     * computation name; the name {@code "default"} is the one looked up by
     * {@link StreamProcessorDescriptor#getDefaultPolicy()}.
     */
    @XObject(value = "policy")
    public static class PolicyDescriptor implements Descriptor {
        public static final int DEFAULT_MAX_RETRIES = 0;

        public static final Duration DEFAULT_DELAY = Duration.ofSeconds(1);

        public static final Duration DEFAULT_MAX_DELAY = Duration.ofSeconds(10);

        public static final Integer DEFAULT_BATCH_CAPACITY = 1;

        public static final Duration DEFAULT_BATCH_THRESHOLD = Duration.ofSeconds(1);

        @XNode("@name")
        public String name;

        @XNode("@maxRetries")
        public Integer maxRetries = DEFAULT_MAX_RETRIES;

        @XNode("@delay")
        public Duration delay = DEFAULT_DELAY;

        @XNode("@maxDelay")
        public Duration maxDelay = DEFAULT_MAX_DELAY;

        @XNode("@continueOnFailure")
        public Boolean continueOnFailure = Boolean.FALSE;

        // @since 11.1 can be used for recovery in order to skip the n first failures
        @XNode("@skipFirstFailures")
        public Integer skipFirstFailures = 0;

        @Override
        public String getId() {
            return name;
        }

        // To provide a custom retry policy
        @XNode("@class")
        public Class<? extends StreamComputationPolicy> klass;

        // Batch policy only used for computation that extends AbstractBatchComputation
        @XNode("@batchCapacity")
        public Integer batchCapacity = DEFAULT_BATCH_CAPACITY;

        @XNode("@batchThreshold")
        public Duration batchThreshold = DEFAULT_BATCH_THRESHOLD;

        /**
         * Reads the framework property {@link #RECOVERY_SKIP_FIRST_FAILURES_OPTION}, passing this
         * descriptor's {@code skipFirstFailures} as the default value, and parses the result as an int.
         *
         * @return the number of first failures to skip
         * @throws NumberFormatException if the resolved value is not a valid integer
         */
        protected int getSkipFirstFailures() {
            String fallback = Integer.toString(skipFirstFailures);
            return Integer.parseInt(Framework.getProperty(RECOVERY_SKIP_FIRST_FAILURES_OPTION, fallback));
        }

        /**
         * Creates a policy builder configured from this descriptor: a retry policy built from
         * {@code maxRetries}, {@code delay} and {@code maxDelay} (backoff expressed in milliseconds), the
         * batch capacity and threshold, the {@code continueOnFailure} flag and the value returned by
         * {@link #getSkipFirstFailures()}.
         *
         * @return a new builder; the caller is responsible for building the policy
         */
        public ComputationPolicyBuilder createPolicyBuilder() {
            RetryPolicy retryPolicy = new RetryPolicy().withMaxRetries(maxRetries)
                                                       .withBackoff(delay.toMillis(), maxDelay.toMillis(),
                                                               TimeUnit.MILLISECONDS);
            return new ComputationPolicyBuilder().retryPolicy(retryPolicy)
                                                 .batchPolicy(batchCapacity, batchThreshold)
                                                 .continueOnFailure(continueOnFailure)
                                                 .skipFirstFailures(getSkipFirstFailures());
        }
    }

    public static final Integer DEFAULT_CONCURRENCY = 4;

    @XNode("@name")
    public String name;

    @XNode("@logConfig")
    public String config;

    @XNode("@class")
    public Class<? extends StreamProcessorTopology> klass;

    @XNode("@defaultConcurrency")
    public Integer defaultConcurrency = DEFAULT_CONCURRENCY;

    // NOTE(review): the partitions default reuses DEFAULT_CONCURRENCY; kept as is
    @XNode("@defaultPartitions")
    public Integer defaultPartitions = DEFAULT_CONCURRENCY;

    @XNode("@defaultCodec")
    public String defaultCodec;

    // Default for streams's external property
    // @since 11.1
    @XNode("@defaultExternal")
    public boolean defaultExternal;

    @XNodeMap(value = "option", key = "@name", type = HashMap.class, componentType = String.class)
    public Map<String, String> options = new HashMap<>();

    @XNodeList(value = "computation", type = ArrayList.class, componentType = ComputationDescriptor.class)
    public List<ComputationDescriptor> computations = new ArrayList<>();

    @XNodeList(value = "stream", type = ArrayList.class, componentType = StreamDescriptor.class)
    public List<StreamDescriptor> streams = new ArrayList<>();

    @XNodeList(value = "policy", type = ArrayList.class, componentType = PolicyDescriptor.class)
    public List<PolicyDescriptor> policies = new ArrayList<>();

    // Lazily resolved by getDefaultPolicy()
    protected ComputationPolicy defaultPolicy;

    /**
     * Returns the policy declared under the given computation name.
     *
     * @param computationName the computation name, matched against the policy names
     * @return the policy built from the first policy descriptor whose id equals {@code computationName}, or
     *         {@code null} if there is none or if {@code computationName} is {@code null}
     */
    public ComputationPolicy getPolicy(String computationName) {
        if (computationName == null) {
            // Nothing can match; avoids an NPE that only occurred when at least one policy was declared
            return null;
        }
        PolicyDescriptor policyDescriptor = policies.stream()
                                                    .filter(policy -> computationName.equals(policy.getId()))
                                                    .findFirst()
                                                    .orElse(null);
        if (policyDescriptor != null) {
            return getComputationPolicy(policyDescriptor);
        }
        return null;
    }

    /**
     * Builds the policy described by the given descriptor.
     * <p>
     * When the descriptor declares a class, it is instantiated with its no-argument constructor and asked for
     * the policy; otherwise a {@code DefaultNuxeoComputationPolicy} is used.
     *
     * @param policyDescriptor the policy descriptor
     * @return the computation policy
     * @throws IllegalArgumentException if the declared class is not a {@code StreamComputationPolicy}
     * @throws StreamRuntimeException if the declared class cannot be instantiated reflectively
     */
    protected ComputationPolicy getComputationPolicy(PolicyDescriptor policyDescriptor) {
        if (policyDescriptor.klass != null) {
            if (!StreamComputationPolicy.class.isAssignableFrom(policyDescriptor.klass)) {
                throw new IllegalArgumentException("Cannot create policy: " + policyDescriptor.getId()
                        + " for processor: " + this.getId() + ", class must implement StreamComputationPolicy");
            }
            try {
                return policyDescriptor.klass.getDeclaredConstructor().newInstance().getPolicy(policyDescriptor);
            } catch (ReflectiveOperationException e) {
                throw new StreamRuntimeException(
                        "Cannot create policy: " + policyDescriptor.getId() + " for processor: " + this.getId(), e);
            }
        }
        return new DefaultNuxeoComputationPolicy().getPolicy(policyDescriptor);
    }

    /**
     * Returns the default policy, resolving it on first call and caching it in {@code defaultPolicy}.
     * <p>
     * The default policy is built from the first policy descriptor named {@code "default"}; when there is
     * none, {@link ComputationPolicy#NONE} is used. No synchronization is performed here.
     *
     * @return the default computation policy, never {@code null} once resolved from {@code NONE}
     */
    public ComputationPolicy getDefaultPolicy() {
        if (defaultPolicy == null) {
            PolicyDescriptor policyDescriptor = policies.stream()
                                                        .filter(policy -> "default".equals(policy.getId()))
                                                        .findFirst()
                                                        .orElse(null);
            if (policyDescriptor == null) {
                defaultPolicy = ComputationPolicy.NONE;
            } else {
                defaultPolicy = getComputationPolicy(policyDescriptor);
            }
        }
        return defaultPolicy;
    }

    // @since 11.1
    public boolean isEnabled() {
        return isEnabled;
    }

    // @since 11.1
    public void setEnabled(boolean isEnabled) {
        this.isEnabled = isEnabled;
    }

    @Override
    public String getId() {
        return name;
    }

    // @since 11.1
    public boolean isStart() {
        return start;
    }

    // @since 11.1
    public void setStart(boolean start) {
        this.start = start;
    }
}