001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation;
020
021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;
022
023import java.util.HashMap;
024import java.util.Map;
025
026import org.nuxeo.lib.stream.codec.Codec;
027
028/**
029 * Settings defines stream's partitions and computation's concurrency.
030 *
031 * @since 9.3
032 */
033public class Settings {
034    protected final int defaultConcurrency;
035
036    protected final int defaultPartitions;
037
038    protected final Codec<Record> defaultCodec;
039
040    protected final Map<String, Integer> concurrencies = new HashMap<>();
041
042    protected final Map<String, Integer> partitions = new HashMap<>();
043
044    protected final Map<String, Codec<Record>> codecs = new HashMap<>();
045
046    /**
047     * Default concurrency and partition to use if not specified explicitly
048     */
049    @SuppressWarnings("unchecked")
050    public Settings(int defaultConcurrency, int defaultPartitions) {
051        this(defaultConcurrency, defaultPartitions, NO_CODEC);
052    }
053
054    /**
055     * Default concurrency and partition to use if not specified explicitly
056     */
057    @SuppressWarnings("unchecked")
058    public Settings(int defaultConcurrency, int defaultPartitions, Codec<Record> defaultCodec) {
059        this.defaultConcurrency = defaultConcurrency;
060        this.defaultPartitions = defaultPartitions;
061        if (defaultCodec == null) {
062            this.defaultCodec = NO_CODEC;
063        } else {
064            this.defaultCodec = defaultCodec;
065        }
066    }
067
068    /**
069     * Set the computation thread pool size.
070     */
071    public Settings setConcurrency(String computationName, int concurrency) {
072        concurrencies.put(computationName, concurrency);
073        return this;
074    }
075
076    public int getConcurrency(String computationName) {
077        return concurrencies.getOrDefault(computationName, defaultConcurrency);
078    }
079
080    /**
081     * Set the number of partitions for a stream.
082     */
083    public Settings setPartitions(String streamName, int partitions) {
084        this.partitions.put(streamName, partitions);
085        return this;
086    }
087
088    public int getPartitions(String streamName) {
089        return partitions.getOrDefault(streamName, defaultPartitions);
090    }
091
092    /**
093     * Set the codec for a stream
094     *
095     * @since 10.2
096     */
097    public Settings setCodec(String streamName, Codec<Record> codec) {
098        codecs.put(streamName, codec);
099        return this;
100    }
101
102    /**
103     * Get a codec for a stream
104     *
105     * @since 10.2
106     */
107    public Codec<Record> getCodec(String streamName) {
108        return codecs.getOrDefault(streamName, defaultCodec);
109    }
110}