001/* 002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * 017 * Contributors: 018 * bdelbosc 019 */ 020package org.nuxeo.runtime.stream; 021 022import java.time.Duration; 023import java.util.ArrayList; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027import java.util.concurrent.TimeUnit; 028 029import org.nuxeo.common.xmap.annotation.XNode; 030import org.nuxeo.common.xmap.annotation.XNodeList; 031import org.nuxeo.common.xmap.annotation.XNodeMap; 032import org.nuxeo.common.xmap.annotation.XObject; 033import org.nuxeo.lib.stream.StreamRuntimeException; 034import org.nuxeo.lib.stream.computation.ComputationPolicy; 035import org.nuxeo.lib.stream.computation.ComputationPolicyBuilder; 036import org.nuxeo.runtime.model.Descriptor; 037 038import net.jodah.failsafe.RetryPolicy; 039 040@XObject("streamProcessor") 041public class StreamProcessorDescriptor implements Descriptor { 042 043 @XObject(value = "computation") 044 public static class ComputationDescriptor implements Descriptor { 045 046 @XNode("@name") 047 public String name; 048 049 @XNode("@concurrency") 050 public Integer concurrency = DEFAULT_CONCURRENCY; 051 052 @Override 053 public String getId() { 054 return name; 055 } 056 } 057 058 @XObject(value = "stream") 059 public static class StreamDescriptor implements Descriptor { 060 061 @XNode("@name") 062 public String name; 063 064 @XNode("@partitions") 065 public Integer partitions = DEFAULT_CONCURRENCY; 066 067 @XNode("@codec") 068 public String codec; 069 070 @Override 071 public String getId() { 072 return name; 073 } 074 075 } 076 077 @XObject(value = "policy") 078 public static class PolicyDescriptor implements Descriptor { 079 public static final int DEFAULT_MAX_RETRIES = 0; 080 081 public static final Duration DEFAULT_DELAY = Duration.ofSeconds(1); 082 083 public static final Duration DEFAULT_MAX_DELAY = Duration.ofSeconds(10); 084 085 public static final Integer DEFAULT_BATCH_CAPACITY = 1; 086 087 public static final Duration DEFAULT_BATCH_THRESHOLD = Duration.ofSeconds(1); 088 089 @XNode("@name") 090 public String name; 091 092 @XNode("@maxRetries") 093 public Integer maxRetries = DEFAULT_MAX_RETRIES; 094 095 @XNode("@delay") 096 public Duration delay = DEFAULT_DELAY; 097 098 @XNode("@maxDelay") 099 public Duration maxDelay = DEFAULT_MAX_DELAY; 100 101 @XNode("@continueOnFailure") 102 public Boolean continueOnFailure = Boolean.FALSE; 103 104 @Override 105 public String getId() { 106 return name; 107 } 108 109 // To provide a custom retry policy 110 @XNode("@class") 111 public Class<? extends StreamComputationPolicy> klass; 112 113 // Batch policy only used for computation that extends AbstractBatchComputation 114 @XNode("@batchCapacity") 115 public Integer batchCapacity = DEFAULT_BATCH_CAPACITY; 116 117 @XNode("@batchThreshold") 118 public Duration batchThreshold = DEFAULT_BATCH_THRESHOLD; 119 120 } 121 122 public static final Integer DEFAULT_CONCURRENCY = 4; 123 124 @XNode("@name") 125 public String name; 126 127 @XNode("@logConfig") 128 public String config; 129 130 @XNode("@class") 131 public Class<? extends StreamProcessorTopology> klass; 132 133 @XNode("@defaultConcurrency") 134 public Integer defaultConcurrency = DEFAULT_CONCURRENCY; 135 136 @XNode("@defaultPartitions") 137 public Integer defaultPartitions = DEFAULT_CONCURRENCY; 138 139 @XNode("@defaultCodec") 140 public String defaultCodec; 141 142 @XNodeMap(value = "option", key = "@name", type = HashMap.class, componentType = String.class) 143 public Map<String, String> options = new HashMap<>(); 144 145 @XNodeList(value = "computation", type = ArrayList.class, componentType = ComputationDescriptor.class) 146 public List<ComputationDescriptor> computations = new ArrayList<>(); 147 148 @XNodeList(value = "stream", type = ArrayList.class, componentType = StreamDescriptor.class) 149 public List<StreamDescriptor> streams = new ArrayList<>(); 150 151 @XNodeList(value = "policy", type = ArrayList.class, componentType = PolicyDescriptor.class) 152 public List<PolicyDescriptor> policies = new ArrayList<>(); 153 154 protected ComputationPolicy defaultPolicy; 155 156 public ComputationPolicy getPolicy(String computationName) { 157 PolicyDescriptor policyDescriptor = policies.stream() 158 .filter(policy -> computationName.equals(policy.getId())) 159 .findFirst() 160 .orElse(null); 161 if (policyDescriptor != null) { 162 return getComputationPolicy(policyDescriptor); 163 } 164 return null; 165 } 166 167 protected ComputationPolicy getComputationPolicy(PolicyDescriptor policyDescriptor) { 168 if (policyDescriptor.klass != null) { 169 if (!StreamComputationPolicy.class.isAssignableFrom(policyDescriptor.klass)) { 170 throw new IllegalArgumentException("Cannot create policy: " + policyDescriptor.getId() 171 + " for processor: " + this.getId() + ", class must implement StreamComputationPolicy"); 172 } 173 try { 174 return policyDescriptor.klass.getDeclaredConstructor().newInstance().getPolicy(policyDescriptor); 175 } catch (ReflectiveOperationException e) { 176 throw new StreamRuntimeException( 177 "Cannot create policy: " + policyDescriptor.getId() + " for processor: " + this.getId(), e); 178 } 179 } 180 RetryPolicy retryPolicy = new RetryPolicy().withMaxRetries(policyDescriptor.maxRetries) 181 .withBackoff(policyDescriptor.delay.toMillis(), 182 policyDescriptor.maxDelay.toMillis(), TimeUnit.MILLISECONDS); 183 return new ComputationPolicyBuilder().retryPolicy(retryPolicy) 184 .batchPolicy(policyDescriptor.batchCapacity, 185 policyDescriptor.batchThreshold) 186 .continueOnFailure(policyDescriptor.continueOnFailure) 187 .build(); 188 } 189 190 public ComputationPolicy getDefaultPolicy() { 191 if (defaultPolicy == null) { 192 PolicyDescriptor policyDescriptor = policies.stream() 193 .filter(policy -> "default".equals(policy.getId())) 194 .findFirst() 195 .orElse(null); 196 if (policyDescriptor == null) { 197 defaultPolicy = ComputationPolicy.NONE; 198 } else { 199 defaultPolicy = getComputationPolicy(policyDescriptor); 200 } 201 } 202 return defaultPolicy; 203 } 204 205 @Override 206 public String getId() { 207 return name; 208 } 209 210}