001/*
002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 *
017 * Contributors:
018 *     bdelbosc
019 */
020package org.nuxeo.runtime.stream;
021
022import java.time.Duration;
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027import java.util.concurrent.TimeUnit;
028
029import org.nuxeo.common.xmap.annotation.XNode;
030import org.nuxeo.common.xmap.annotation.XNodeList;
031import org.nuxeo.common.xmap.annotation.XNodeMap;
032import org.nuxeo.common.xmap.annotation.XObject;
033import org.nuxeo.lib.stream.StreamRuntimeException;
034import org.nuxeo.lib.stream.computation.ComputationPolicy;
035import org.nuxeo.lib.stream.computation.ComputationPolicyBuilder;
036import org.nuxeo.lib.stream.computation.RecordFilter;
037import org.nuxeo.runtime.api.Framework;
038import org.nuxeo.runtime.model.Descriptor;
039
040import net.jodah.failsafe.RetryPolicy;
041
042@XObject("streamProcessor")
043public class StreamProcessorDescriptor implements Descriptor {
044
045    // @since 11.1
046    public static String RECOVERY_SKIP_FIRST_FAILURES_OPTION = "nuxeo.stream.recovery.skipFirstFailures";
047
048    // @since 11.1
049    @XNode("@enabled")
050    protected boolean isEnabled = true;
051
052    // @since 11.1
053    @XNode("@start")
054    protected boolean start = true;
055
056    @XObject(value = "computation")
057    public static class ComputationDescriptor implements Descriptor {
058
059        @XNode("@name")
060        public String name;
061
062        @XNode("@concurrency")
063        public Integer concurrency = DEFAULT_CONCURRENCY;
064
065        @Override
066        public String getId() {
067            return name;
068        }
069    }
070
071    @XObject(value = "filter")
072    public static class FilterDescriptor implements Descriptor {
073
074        @XNode("@name")
075        public String name;
076
077        @Override
078        public String getId() {
079            return name;
080        }
081
082        @XNode("@class")
083        public Class<? extends RecordFilter> klass;
084
085        @XNodeMap(value = "option", key = "@name", type = HashMap.class, componentType = String.class)
086        public Map<String, String> options = new HashMap<>();
087
088        public RecordFilter getFilter() {
089            if (!RecordFilter.class.isAssignableFrom(klass)) {
090                throw new IllegalArgumentException("Cannot create filter: " + getId() + " for stream: " + this.getId()
091                        + ", class must implement Filter");
092            }
093            try {
094                RecordFilter ret = klass.getDeclaredConstructor().newInstance();
095                ret.init(options);
096                return ret;
097            } catch (ReflectiveOperationException e) {
098                throw new StreamRuntimeException("Cannot create filter: " + getId(), e);
099            }
100        }
101
102    }
103
104    @XObject(value = "stream")
105    public static class StreamDescriptor implements Descriptor {
106
107        @XNode("@name")
108        public String name;
109
110        @XNode("@partitions")
111        public Integer partitions;
112
113        @XNode("@codec")
114        public String codec;
115
116        // Stream is initialized outside of the processor
117        // @since 11.1
118        @XNode("@external")
119        public Boolean external;
120
121        @XNodeList(value = "filter", type = ArrayList.class, componentType = FilterDescriptor.class)
122        public List<FilterDescriptor> filters = new ArrayList<>();
123
124        @Override
125        public String getId() {
126            return name;
127        }
128    }
129
130    @XObject(value = "policy")
131    public static class PolicyDescriptor implements Descriptor {
132        public static final int DEFAULT_MAX_RETRIES = 0;
133
134        public static final Duration DEFAULT_DELAY = Duration.ofSeconds(1);
135
136        public static final Duration DEFAULT_MAX_DELAY = Duration.ofSeconds(10);
137
138        public static final Integer DEFAULT_BATCH_CAPACITY = 1;
139
140        public static final Duration DEFAULT_BATCH_THRESHOLD = Duration.ofSeconds(1);
141
142        @XNode("@name")
143        public String name;
144
145        @XNode("@maxRetries")
146        public Integer maxRetries = DEFAULT_MAX_RETRIES;
147
148        @XNode("@delay")
149        public Duration delay = DEFAULT_DELAY;
150
151        @XNode("@maxDelay")
152        public Duration maxDelay = DEFAULT_MAX_DELAY;
153
154        @XNode("@continueOnFailure")
155        public Boolean continueOnFailure = Boolean.FALSE;
156
157        // @since 11.1 can be used for recovery in order to skip the n first failures
158        @XNode("@skipFirstFailures")
159        public Integer skipFirstFailures = 0;
160
161        @Override
162        public String getId() {
163            return name;
164        }
165
166        // To provide a custom retry policy
167        @XNode("@class")
168        public Class<? extends StreamComputationPolicy> klass;
169
170        // Batch policy only used for computation that extends AbstractBatchComputation
171        @XNode("@batchCapacity")
172        public Integer batchCapacity = DEFAULT_BATCH_CAPACITY;
173
174        @XNode("@batchThreshold")
175        public Duration batchThreshold = DEFAULT_BATCH_THRESHOLD;
176
177        protected int getSkipFirstFailures() {
178            return Integer.parseInt(
179                    Framework.getProperty(RECOVERY_SKIP_FIRST_FAILURES_OPTION, Integer.toString(skipFirstFailures)));
180        }
181
182        public ComputationPolicyBuilder createPolicyBuilder() {
183            RetryPolicy retryPolicy = new RetryPolicy().withMaxRetries(maxRetries)
184                                                       .withBackoff(delay.toMillis(), maxDelay.toMillis(),
185                                                               TimeUnit.MILLISECONDS);
186            return new ComputationPolicyBuilder().retryPolicy(retryPolicy)
187                                                 .batchPolicy(batchCapacity, batchThreshold)
188                                                 .continueOnFailure(continueOnFailure)
189                                                 .skipFirstFailures(getSkipFirstFailures());
190        }
191    }
192
193    public static final Integer DEFAULT_CONCURRENCY = 4;
194
195    @XNode("@name")
196    public String name;
197
198    @XNode("@logConfig")
199    public String config;
200
201    @XNode("@class")
202    public Class<? extends StreamProcessorTopology> klass;
203
204    @XNode("@defaultConcurrency")
205    public Integer defaultConcurrency = DEFAULT_CONCURRENCY;
206
207    @XNode("@defaultPartitions")
208    public Integer defaultPartitions = DEFAULT_CONCURRENCY;
209
210    @XNode("@defaultCodec")
211    public String defaultCodec;
212
213    // Default for streams's external property
214    // @since 11.1
215    @XNode("@defaultExternal")
216    public boolean defaultExternal;
217
218    @XNodeMap(value = "option", key = "@name", type = HashMap.class, componentType = String.class)
219    public Map<String, String> options = new HashMap<>();
220
221    @XNodeList(value = "computation", type = ArrayList.class, componentType = ComputationDescriptor.class)
222    public List<ComputationDescriptor> computations = new ArrayList<>();
223
224    @XNodeList(value = "stream", type = ArrayList.class, componentType = StreamDescriptor.class)
225    public List<StreamDescriptor> streams = new ArrayList<>();
226
227    @XNodeList(value = "policy", type = ArrayList.class, componentType = PolicyDescriptor.class)
228    public List<PolicyDescriptor> policies = new ArrayList<>();
229
230    protected ComputationPolicy defaultPolicy;
231
232    public ComputationPolicy getPolicy(String computationName) {
233        PolicyDescriptor policyDescriptor = policies.stream()
234                                                    .filter(policy -> computationName.equals(policy.getId()))
235                                                    .findFirst()
236                                                    .orElse(null);
237        if (policyDescriptor != null) {
238            return getComputationPolicy(policyDescriptor);
239        }
240        return null;
241    }
242
243    protected ComputationPolicy getComputationPolicy(PolicyDescriptor policyDescriptor) {
244        if (policyDescriptor.klass != null) {
245            if (!StreamComputationPolicy.class.isAssignableFrom(policyDescriptor.klass)) {
246                throw new IllegalArgumentException("Cannot create policy: " + policyDescriptor.getId()
247                        + " for processor: " + this.getId() + ", class must implement StreamComputationPolicy");
248            }
249            try {
250                return policyDescriptor.klass.getDeclaredConstructor().newInstance().getPolicy(policyDescriptor);
251            } catch (ReflectiveOperationException e) {
252                throw new StreamRuntimeException(
253                        "Cannot create policy: " + policyDescriptor.getId() + " for processor: " + this.getId(), e);
254            }
255        }
256        return new DefaultNuxeoComputationPolicy().getPolicy(policyDescriptor);
257    }
258
259    public ComputationPolicy getDefaultPolicy() {
260        if (defaultPolicy == null) {
261            PolicyDescriptor policyDescriptor = policies.stream()
262                                                        .filter(policy -> "default".equals(policy.getId()))
263                                                        .findFirst()
264                                                        .orElse(null);
265            if (policyDescriptor == null) {
266                defaultPolicy = ComputationPolicy.NONE;
267            } else {
268                defaultPolicy = getComputationPolicy(policyDescriptor);
269            }
270        }
271        return defaultPolicy;
272    }
273
274    // @since 11.1
275    public boolean isEnabled() {
276        return isEnabled;
277    }
278
279    // @since 11.1
280    public void setEnabled(boolean isEnabled) {
281        this.isEnabled = isEnabled;
282    }
283
284    @Override
285    public String getId() {
286        return name;
287    }
288
289    // @since 11.1
290    public boolean isStart() {
291        return start;
292    }
293
294    // @since 11.1
295    public void setStart(boolean start) {
296        this.start = start;
297    }
298}