001/*
002 * (C) Copyright 2017-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 *
017 * Contributors:
018 *     bdelbosc
019 */
020package org.nuxeo.runtime.stream;
021
022import java.time.Duration;
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027import java.util.concurrent.TimeUnit;
028
029import org.nuxeo.common.xmap.annotation.XNode;
030import org.nuxeo.common.xmap.annotation.XNodeList;
031import org.nuxeo.common.xmap.annotation.XNodeMap;
032import org.nuxeo.common.xmap.annotation.XObject;
033import org.nuxeo.lib.stream.StreamRuntimeException;
034import org.nuxeo.lib.stream.computation.ComputationPolicy;
035import org.nuxeo.lib.stream.computation.ComputationPolicyBuilder;
036import org.nuxeo.runtime.model.Descriptor;
037
038import net.jodah.failsafe.RetryPolicy;
039
040@XObject("streamProcessor")
041public class StreamProcessorDescriptor implements Descriptor {
042
043    @XObject(value = "computation")
044    public static class ComputationDescriptor implements Descriptor {
045
046        @XNode("@name")
047        public String name;
048
049        @XNode("@concurrency")
050        public Integer concurrency = DEFAULT_CONCURRENCY;
051
052        @Override
053        public String getId() {
054            return name;
055        }
056    }
057
058    @XObject(value = "stream")
059    public static class StreamDescriptor implements Descriptor {
060
061        @XNode("@name")
062        public String name;
063
064        @XNode("@partitions")
065        public Integer partitions = DEFAULT_CONCURRENCY;
066
067        @XNode("@codec")
068        public String codec;
069
070        @Override
071        public String getId() {
072            return name;
073        }
074
075    }
076
077    @XObject(value = "policy")
078    public static class PolicyDescriptor implements Descriptor {
079        public static final int DEFAULT_MAX_RETRIES = 0;
080
081        public static final Duration DEFAULT_DELAY = Duration.ofSeconds(1);
082
083        public static final Duration DEFAULT_MAX_DELAY = Duration.ofSeconds(10);
084
085        public static final Integer DEFAULT_BATCH_CAPACITY = 1;
086
087        public static final Duration DEFAULT_BATCH_THRESHOLD = Duration.ofSeconds(1);
088
089        @XNode("@name")
090        public String name;
091
092        @XNode("@maxRetries")
093        public Integer maxRetries = DEFAULT_MAX_RETRIES;
094
095        @XNode("@delay")
096        public Duration delay = DEFAULT_DELAY;
097
098        @XNode("@maxDelay")
099        public Duration maxDelay = DEFAULT_MAX_DELAY;
100
101        @XNode("@continueOnFailure")
102        public Boolean continueOnFailure = Boolean.FALSE;
103
104        @Override
105        public String getId() {
106            return name;
107        }
108
109        // To provide a custom retry policy
110        @XNode("@class")
111        public Class<? extends StreamComputationPolicy> klass;
112
113        // Batch policy only used for computation that extends AbstractBatchComputation
114        @XNode("@batchCapacity")
115        public Integer batchCapacity = DEFAULT_BATCH_CAPACITY;
116
117        @XNode("@batchThreshold")
118        public Duration batchThreshold = DEFAULT_BATCH_THRESHOLD;
119
120    }
121
122    public static final Integer DEFAULT_CONCURRENCY = 4;
123
124    @XNode("@name")
125    public String name;
126
127    @XNode("@logConfig")
128    public String config;
129
130    @XNode("@class")
131    public Class<? extends StreamProcessorTopology> klass;
132
133    @XNode("@defaultConcurrency")
134    public Integer defaultConcurrency = DEFAULT_CONCURRENCY;
135
136    @XNode("@defaultPartitions")
137    public Integer defaultPartitions = DEFAULT_CONCURRENCY;
138
139    @XNode("@defaultCodec")
140    public String defaultCodec;
141
142    @XNodeMap(value = "option", key = "@name", type = HashMap.class, componentType = String.class)
143    public Map<String, String> options = new HashMap<>();
144
145    @XNodeList(value = "computation", type = ArrayList.class, componentType = ComputationDescriptor.class)
146    public List<ComputationDescriptor> computations = new ArrayList<>();
147
148    @XNodeList(value = "stream", type = ArrayList.class, componentType = StreamDescriptor.class)
149    public List<StreamDescriptor> streams = new ArrayList<>();
150
151    @XNodeList(value = "policy", type = ArrayList.class, componentType = PolicyDescriptor.class)
152    public List<PolicyDescriptor> policies = new ArrayList<>();
153
154    protected ComputationPolicy defaultPolicy;
155
156    public ComputationPolicy getPolicy(String computationName) {
157        PolicyDescriptor policyDescriptor = policies.stream()
158                                                    .filter(policy -> computationName.equals(policy.getId()))
159                                                    .findFirst()
160                                                    .orElse(null);
161        if (policyDescriptor != null) {
162            return getComputationPolicy(policyDescriptor);
163        }
164        return null;
165    }
166
167    protected ComputationPolicy getComputationPolicy(PolicyDescriptor policyDescriptor) {
168        if (policyDescriptor.klass != null) {
169            if (!StreamComputationPolicy.class.isAssignableFrom(policyDescriptor.klass)) {
170                throw new IllegalArgumentException("Cannot create policy: " + policyDescriptor.getId()
171                        + " for processor: " + this.getId() + ", class must implement StreamComputationPolicy");
172            }
173            try {
174                return policyDescriptor.klass.getDeclaredConstructor().newInstance().getPolicy(policyDescriptor);
175            } catch (ReflectiveOperationException e) {
176                throw new StreamRuntimeException(
177                        "Cannot create policy: " + policyDescriptor.getId() + " for processor: " + this.getId(), e);
178            }
179        }
180        RetryPolicy retryPolicy = new RetryPolicy().withMaxRetries(policyDescriptor.maxRetries)
181                                                   .withBackoff(policyDescriptor.delay.toMillis(),
182                                                           policyDescriptor.maxDelay.toMillis(), TimeUnit.MILLISECONDS);
183        return new ComputationPolicyBuilder().retryPolicy(retryPolicy)
184                                             .batchPolicy(policyDescriptor.batchCapacity,
185                                                     policyDescriptor.batchThreshold)
186                                             .continueOnFailure(policyDescriptor.continueOnFailure)
187                                             .build();
188    }
189
190    public ComputationPolicy getDefaultPolicy() {
191        if (defaultPolicy == null) {
192            PolicyDescriptor policyDescriptor = policies.stream()
193                                                        .filter(policy -> "default".equals(policy.getId()))
194                                                        .findFirst()
195                                                        .orElse(null);
196            if (policyDescriptor == null) {
197                defaultPolicy = ComputationPolicy.NONE;
198            } else {
199                defaultPolicy = getComputationPolicy(policyDescriptor);
200            }
201        }
202        return defaultPolicy;
203    }
204
205    @Override
206    public String getId() {
207        return name;
208    }
209
210}