001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation;
020
021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;
022
023import java.util.HashMap;
024import java.util.Map;
025import java.util.Objects;
026
027import org.nuxeo.lib.stream.codec.Codec;
028
029/**
030 * Settings defines stream's partitions and computation's concurrency and policy.
031 *
032 * @since 9.3
033 */
034public class Settings {
035    protected final int defaultConcurrency;
036
037    protected final int defaultPartitions;
038
039    protected final Codec<Record> defaultCodec;
040
041    protected final ComputationPolicy defaultPolicy;
042
043    protected final Map<String, Integer> concurrencies = new HashMap<>();
044
045    protected final Map<String, Integer> partitions = new HashMap<>();
046
047    protected final Map<String, Codec<Record>> codecs = new HashMap<>();
048
049    protected final Map<String, ComputationPolicy> policies = new HashMap<>();
050
051    /**
052     * Default concurrency and partition to use if not specified explicitly.
053     */
054    public Settings(int defaultConcurrency, int defaultPartitions) {
055        this(defaultConcurrency, defaultPartitions, null, null);
056    }
057
058    /**
059     * Default concurrency and partition to use if not specified explicitly.
060     */
061    public Settings(int defaultConcurrency, int defaultPartitions, Codec<Record> defaultCodec) {
062        this(defaultConcurrency, defaultPartitions, defaultCodec, null);
063    }
064
065    public Settings(int defaultConcurrency, int defaultPartitions, ComputationPolicy defaultPolicy) {
066        this(defaultConcurrency, defaultPartitions, null, defaultPolicy);
067    }
068
069    @SuppressWarnings("unchecked")
070    public Settings(int defaultConcurrency, int defaultPartitions, Codec<Record> defaultCodec,
071            ComputationPolicy defaultPolicy) {
072        this.defaultConcurrency = defaultConcurrency;
073        this.defaultPartitions = defaultPartitions;
074        if (defaultCodec == null) {
075            this.defaultCodec = NO_CODEC;
076        } else {
077            this.defaultCodec = defaultCodec;
078        }
079        if (defaultPolicy == null) {
080            this.defaultPolicy = ComputationPolicy.NONE;
081        } else {
082            this.defaultPolicy = defaultPolicy;
083        }
084    }
085
086    /**
087     * Sets the computation thread pool size.
088     */
089    public Settings setConcurrency(String computationName, int concurrency) {
090        concurrencies.put(computationName, concurrency);
091        return this;
092    }
093
094    public int getConcurrency(String computationName) {
095        return concurrencies.getOrDefault(computationName, defaultConcurrency);
096    }
097
098    /**
099     * Sets the number of partitions for a stream.
100     */
101    public Settings setPartitions(String streamName, int partitions) {
102        this.partitions.put(streamName, partitions);
103        return this;
104    }
105
106    public int getPartitions(String streamName) {
107        return partitions.getOrDefault(streamName, defaultPartitions);
108    }
109
110    /**
111     * Sets the codec for a stream.
112     *
113     * @since 10.2
114     */
115    public Settings setCodec(String streamName, Codec<Record> codec) {
116        Objects.requireNonNull(codec);
117        codecs.put(streamName, codec);
118        return this;
119    }
120
121    /**
122     * Gets the codec for a stream.
123     *
124     * @since 10.2
125     */
126    public Codec<Record> getCodec(String streamName) {
127        return codecs.getOrDefault(streamName, defaultCodec);
128    }
129
130    /**
131     * Sets the policy for a computation, when using default as computationName this sets the default policy for all
132     * computations in the processor.
133     *
134     * @since 10.3
135     */
136    public Settings setPolicy(String computationName, ComputationPolicy policy) {
137        if (policy == null) {
138            policies.remove(computationName);
139        } else {
140            policies.put(computationName, policy);
141        }
142        return this;
143    }
144
145    /**
146     * Gets the policy for a computation.
147     *
148     * @since 10.3
149     */
150    public ComputationPolicy getPolicy(String computationName) {
151        return policies.getOrDefault(computationName, defaultPolicy);
152    }
153
154}