001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.computation; 020 021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC; 022 023import java.util.HashMap; 024import java.util.Map; 025import java.util.Objects; 026 027import org.nuxeo.lib.stream.codec.Codec; 028 029/** 030 * Settings defines stream's partitions and computation's concurrency and policy. 031 * 032 * @since 9.3 033 */ 034public class Settings { 035 protected final int defaultConcurrency; 036 037 protected final int defaultPartitions; 038 039 protected final Codec<Record> defaultCodec; 040 041 protected final ComputationPolicy defaultPolicy; 042 043 protected final Map<String, Integer> concurrencies = new HashMap<>(); 044 045 protected final Map<String, Integer> partitions = new HashMap<>(); 046 047 protected final Map<String, Codec<Record>> codecs = new HashMap<>(); 048 049 protected final Map<String, ComputationPolicy> policies = new HashMap<>(); 050 051 /** 052 * Default concurrency and partition to use if not specified explicitly. 053 */ 054 public Settings(int defaultConcurrency, int defaultPartitions) { 055 this(defaultConcurrency, defaultPartitions, null, null); 056 } 057 058 /** 059 * Default concurrency and partition to use if not specified explicitly. 060 */ 061 public Settings(int defaultConcurrency, int defaultPartitions, Codec<Record> defaultCodec) { 062 this(defaultConcurrency, defaultPartitions, defaultCodec, null); 063 } 064 065 public Settings(int defaultConcurrency, int defaultPartitions, ComputationPolicy defaultPolicy) { 066 this(defaultConcurrency, defaultPartitions, null, defaultPolicy); 067 } 068 069 @SuppressWarnings("unchecked") 070 public Settings(int defaultConcurrency, int defaultPartitions, Codec<Record> defaultCodec, 071 ComputationPolicy defaultPolicy) { 072 this.defaultConcurrency = defaultConcurrency; 073 this.defaultPartitions = defaultPartitions; 074 if (defaultCodec == null) { 075 this.defaultCodec = NO_CODEC; 076 } else { 077 this.defaultCodec = defaultCodec; 078 } 079 if (defaultPolicy == null) { 080 this.defaultPolicy = ComputationPolicy.NONE; 081 } else { 082 this.defaultPolicy = defaultPolicy; 083 } 084 } 085 086 /** 087 * Sets the computation thread pool size. 088 */ 089 public Settings setConcurrency(String computationName, int concurrency) { 090 concurrencies.put(computationName, concurrency); 091 return this; 092 } 093 094 public int getConcurrency(String computationName) { 095 return concurrencies.getOrDefault(computationName, defaultConcurrency); 096 } 097 098 /** 099 * Sets the number of partitions for a stream. 100 */ 101 public Settings setPartitions(String streamName, int partitions) { 102 this.partitions.put(streamName, partitions); 103 return this; 104 } 105 106 public int getPartitions(String streamName) { 107 return partitions.getOrDefault(streamName, defaultPartitions); 108 } 109 110 /** 111 * Sets the codec for a stream. 112 * 113 * @since 10.2 114 */ 115 public Settings setCodec(String streamName, Codec<Record> codec) { 116 Objects.requireNonNull(codec); 117 codecs.put(streamName, codec); 118 return this; 119 } 120 121 /** 122 * Gets the codec for a stream. 123 * 124 * @since 10.2 125 */ 126 public Codec<Record> getCodec(String streamName) { 127 return codecs.getOrDefault(streamName, defaultCodec); 128 } 129 130 /** 131 * Sets the policy for a computation, when using default as computationName this sets the default policy for all 132 * computations in the processor. 133 * 134 * @since 10.3 135 */ 136 public Settings setPolicy(String computationName, ComputationPolicy policy) { 137 if (policy == null) { 138 policies.remove(computationName); 139 } else { 140 policies.put(computationName, policy); 141 } 142 return this; 143 } 144 145 /** 146 * Gets the policy for a computation. 147 * 148 * @since 10.3 149 */ 150 public ComputationPolicy getPolicy(String computationName) { 151 return policies.getOrDefault(computationName, defaultPolicy); 152 } 153 154}