001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.computation; 020 021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC; 022 023import java.util.HashMap; 024import java.util.Map; 025 026import org.nuxeo.lib.stream.codec.Codec; 027 028/** 029 * Settings defines stream's partitions and computation's concurrency. 030 * 031 * @since 9.3 032 */ 033public class Settings { 034 protected final int defaultConcurrency; 035 036 protected final int defaultPartitions; 037 038 protected final Codec<Record> defaultCodec; 039 040 protected final Map<String, Integer> concurrencies = new HashMap<>(); 041 042 protected final Map<String, Integer> partitions = new HashMap<>(); 043 044 protected final Map<String, Codec<Record>> codecs = new HashMap<>(); 045 046 /** 047 * Default concurrency and partition to use if not specified explicitly 048 */ 049 @SuppressWarnings("unchecked") 050 public Settings(int defaultConcurrency, int defaultPartitions) { 051 this(defaultConcurrency, defaultPartitions, NO_CODEC); 052 } 053 054 /** 055 * Default concurrency and partition to use if not specified explicitly 056 */ 057 @SuppressWarnings("unchecked") 058 public Settings(int defaultConcurrency, int defaultPartitions, Codec<Record> defaultCodec) { 059 this.defaultConcurrency = defaultConcurrency; 060 this.defaultPartitions = defaultPartitions; 061 if (defaultCodec == null) { 062 this.defaultCodec = NO_CODEC; 063 } else { 064 this.defaultCodec = defaultCodec; 065 } 066 } 067 068 /** 069 * Set the computation thread pool size. 070 */ 071 public Settings setConcurrency(String computationName, int concurrency) { 072 concurrencies.put(computationName, concurrency); 073 return this; 074 } 075 076 public int getConcurrency(String computationName) { 077 return concurrencies.getOrDefault(computationName, defaultConcurrency); 078 } 079 080 /** 081 * Set the number of partitions for a stream. 082 */ 083 public Settings setPartitions(String streamName, int partitions) { 084 this.partitions.put(streamName, partitions); 085 return this; 086 } 087 088 public int getPartitions(String streamName) { 089 return partitions.getOrDefault(streamName, defaultPartitions); 090 } 091 092 /** 093 * Set the codec for a stream 094 * 095 * @since 10.2 096 */ 097 public Settings setCodec(String streamName, Codec<Record> codec) { 098 codecs.put(streamName, codec); 099 return this; 100 } 101 102 /** 103 * Get a codec for a stream 104 * 105 * @since 10.2 106 */ 107 public Codec<Record> getCodec(String streamName) { 108 return codecs.getOrDefault(streamName, defaultCodec); 109 } 110}