001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation;
020
021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;
022
023import java.util.HashMap;
024import java.util.Map;
025import java.util.Objects;
026
027import org.nuxeo.lib.stream.codec.Codec;
028import org.nuxeo.lib.stream.computation.internals.RecordFilterChainImpl;
029import org.nuxeo.lib.stream.log.Name;
030
031/**
032 * Settings defines stream's partitions and computation's concurrency and policy.
033 *
034 * @since 9.3
035 */
036public class Settings {
037    // streams
038    protected final int defaultPartitions;
039
040    protected final Map<Name, Integer> partitions = new HashMap<>();
041
042    protected final Codec<Record> defaultCodec;
043
044    protected final Map<Name, Codec<Record>> codecs = new HashMap<>();
045
046    protected final RecordFilterChain defaultFilter;
047
048    protected final Map<Name, RecordFilterChain> filters = new HashMap<>();
049
050    protected final Map<Name, Boolean> externals = new HashMap<>();
051
052    protected final boolean defaultExternal;
053
054    // computations
055    protected final int defaultConcurrency;
056
057    protected final Map<Name, Integer> concurrencies = new HashMap<>();
058
059    protected final ComputationPolicy defaultPolicy;
060
061    protected final Map<Name, ComputationPolicy> policies = new HashMap<>();
062
063    /**
064     * Default concurrency and partition to use if not specified explicitly.
065     */
066    public Settings(int defaultConcurrency, int defaultPartitions) {
067        this(defaultConcurrency, defaultPartitions, null, null, null);
068    }
069
070    /**
071     * Default concurrency and partition to use if not specified explicitly.
072     */
073    public Settings(int defaultConcurrency, int defaultPartitions, Codec<Record> defaultCodec) {
074        this(defaultConcurrency, defaultPartitions, defaultCodec, null, null);
075    }
076
077    public Settings(int defaultConcurrency, int defaultPartitions, ComputationPolicy defaultPolicy) {
078        this(defaultConcurrency, defaultPartitions, null, defaultPolicy, null);
079    }
080
081    public Settings(int defaultConcurrency, int defaultPartitions, Codec<Record> defaultCodec,
082            ComputationPolicy defaultPolicy) {
083        this(defaultConcurrency, defaultPartitions, defaultCodec, defaultPolicy, null);
084    }
085
086    public Settings(int defaultConcurrency, int defaultPartitions, Codec<Record> defaultCodec,
087            ComputationPolicy defaultPolicy, RecordFilterChain defaultFilter) {
088        this(defaultConcurrency, defaultPartitions, defaultCodec, defaultPolicy, defaultFilter, false);
089    }
090
091    @SuppressWarnings("unchecked")
092    public Settings(int defaultConcurrency, int defaultPartitions, Codec<Record> defaultCodec,
093            ComputationPolicy defaultPolicy, RecordFilterChain defaultFilter, boolean defaultExternal) {
094        this.defaultConcurrency = defaultConcurrency;
095        this.defaultPartitions = defaultPartitions;
096        if (defaultCodec == null) {
097            this.defaultCodec = NO_CODEC;
098        } else {
099            this.defaultCodec = defaultCodec;
100        }
101        if (defaultPolicy == null) {
102            this.defaultPolicy = ComputationPolicy.NONE;
103        } else {
104            this.defaultPolicy = defaultPolicy;
105        }
106        if (defaultFilter == null) {
107            this.defaultFilter = RecordFilterChainImpl.NONE;
108        } else {
109            this.defaultFilter = defaultFilter;
110        }
111        this.defaultExternal = defaultExternal;
112    }
113
114    /**
115     * Sets the computation thread pool size.
116     */
117    public Settings setConcurrency(String computationName, int concurrency) {
118        return setConcurrency(Name.ofUrn(computationName), concurrency);
119    }
120
121    public Settings setConcurrency(Name computationName, int concurrency) {
122        concurrencies.put(computationName, concurrency);
123        return this;
124    }
125
126    public int getConcurrency(String computationName) {
127        return getConcurrency(Name.ofUrn(computationName));
128    }
129
130    public int getConcurrency(Name computationName) {
131        return concurrencies.getOrDefault(computationName, defaultConcurrency);
132    }
133
134    /**
135     * Sets the number of partitions for a stream.
136     */
137    public Settings setPartitions(Name streamName, int partitions) {
138        this.partitions.put(streamName, partitions);
139        return this;
140    }
141
142    public Settings setPartitions(String streamName, int partitions) {
143        return setPartitions(Name.ofUrn(streamName), partitions);
144    }
145
146    public int getPartitions(Name streamName) {
147        return partitions.getOrDefault(streamName, defaultPartitions);
148    }
149
150    public int getPartitions(String streamName) {
151        return getPartitions(Name.ofUrn(streamName));
152    }
153
154    // @since 11.1
155    public Settings setExternal(Name streamName, boolean external) {
156        this.externals.put(streamName, external);
157        return this;
158    }
159
160    // @since 11.1
161    public boolean isExternal(Name streamName) {
162        return externals.getOrDefault(streamName, defaultExternal);
163    }
164
165    /**
166     * Sets the codec for a stream.
167     *
168     * @since 11.1
169     */
170    public Settings setCodec(Name streamName, Codec<Record> codec) {
171        Objects.requireNonNull(codec);
172        codecs.put(streamName, codec);
173        return this;
174    }
175
176    /**
177     * Sets the codec for a stream.
178     *
179     * @since 10.2
180     */
181    public Settings setCodec(String streamName, Codec<Record> codec) {
182        return setCodec(Name.ofUrn(streamName), codec);
183    }
184
185    /**
186     * Gets the codec for a stream.
187     *
188     * @since 10.2
189     */
190    public Codec<Record> getCodec(String streamName) {
191        return getCodec(Name.ofUrn(streamName));
192    }
193
194    /**
195     * Gets the codec for a stream.
196     *
197     * @since 11.1
198     */
199    public Codec<Record> getCodec(Name streamName) {
200        return codecs.getOrDefault(streamName, defaultCodec);
201    }
202
203    /**
204     * Sets the policy for a computation, when using default as computationName this sets the default policy for all
205     * computations in the processor.
206     *
207     * @since 11.1
208     */
209    public Settings setPolicy(Name computationName, ComputationPolicy policy) {
210        if (policy == null) {
211            policies.remove(computationName);
212        } else {
213            policies.put(computationName, policy);
214        }
215        return this;
216    }
217
218    /**
219     * Sets the policy for a computation, when using default as computationName this sets the default policy for all
220     * computations in the processor.
221     *
222     * @since 10.3
223     */
224    public Settings setPolicy(String computationName, ComputationPolicy policy) {
225        return setPolicy(Name.ofUrn(computationName), policy);
226    }
227
228    /**
229     * Gets the policy for a computation.
230     *
231     * @since 11.1
232     */
233    public ComputationPolicy getPolicy(Name computationName) {
234        return policies.getOrDefault(computationName, defaultPolicy);
235    }
236
237    /**
238     * Gets the policy for a computation.
239     *
240     * @since 10.3
241     */
242    public ComputationPolicy getPolicy(String computationName) {
243        return policies.getOrDefault(Name.ofUrn(computationName), defaultPolicy);
244    }
245
246    /**
247     * Add a filter
248     *
249     * @since 11.1
250     */
251    public Settings addFilter(Name streamName, RecordFilter filter) {
252        if (filter == null) {
253            filters.remove(streamName);
254        } else {
255            RecordFilterChain chain = filters.computeIfAbsent(streamName, k -> new RecordFilterChainImpl());
256            chain.addFilter(filter);
257        }
258        return this;
259    }
260
261    /**
262     * Add a filter
263     *
264     * @since 11.1
265     */
266    public Settings addFilter(String streamName, RecordFilter filter) {
267        return addFilter(Name.ofUrn(streamName), filter);
268    }
269
270    /**
271     * Gets the filter chain for a stream.
272     *
273     * @since 11.1
274     */
275    public RecordFilterChain getFilterChain(Name streamName) {
276        return filters.getOrDefault(streamName, defaultFilter);
277    }
278
279    /**
280     * Gets the filter chain for a stream.
281     *
282     * @since 11.1
283     */
284    public RecordFilterChain getFilterChain(String streamName) {
285        return filters.getOrDefault(Name.ofUrn(streamName), defaultFilter);
286    }
287
288}