001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.computation; 020 021import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC; 022 023import java.util.HashMap; 024import java.util.Map; 025import java.util.Objects; 026 027import org.nuxeo.lib.stream.codec.Codec; 028import org.nuxeo.lib.stream.computation.internals.RecordFilterChainImpl; 029import org.nuxeo.lib.stream.log.Name; 030 031/** 032 * Settings defines stream's partitions and computation's concurrency and policy. 033 * 034 * @since 9.3 035 */ 036public class Settings { 037 // streams 038 protected final int defaultPartitions; 039 040 protected final Map<Name, Integer> partitions = new HashMap<>(); 041 042 protected final Codec<Record> defaultCodec; 043 044 protected final Map<Name, Codec<Record>> codecs = new HashMap<>(); 045 046 protected final RecordFilterChain defaultFilter; 047 048 protected final Map<Name, RecordFilterChain> filters = new HashMap<>(); 049 050 protected final Map<Name, Boolean> externals = new HashMap<>(); 051 052 protected final boolean defaultExternal; 053 054 // computations 055 protected final int defaultConcurrency; 056 057 protected final Map<Name, Integer> concurrencies = new HashMap<>(); 058 059 protected final ComputationPolicy defaultPolicy; 060 061 protected final Map<Name, ComputationPolicy> policies = new HashMap<>(); 062 063 /** 064 * Default concurrency and partition to use if not specified explicitly. 065 */ 066 public Settings(int defaultConcurrency, int defaultPartitions) { 067 this(defaultConcurrency, defaultPartitions, null, null, null); 068 } 069 070 /** 071 * Default concurrency and partition to use if not specified explicitly. 072 */ 073 public Settings(int defaultConcurrency, int defaultPartitions, Codec<Record> defaultCodec) { 074 this(defaultConcurrency, defaultPartitions, defaultCodec, null, null); 075 } 076 077 public Settings(int defaultConcurrency, int defaultPartitions, ComputationPolicy defaultPolicy) { 078 this(defaultConcurrency, defaultPartitions, null, defaultPolicy, null); 079 } 080 081 public Settings(int defaultConcurrency, int defaultPartitions, Codec<Record> defaultCodec, 082 ComputationPolicy defaultPolicy) { 083 this(defaultConcurrency, defaultPartitions, defaultCodec, defaultPolicy, null); 084 } 085 086 public Settings(int defaultConcurrency, int defaultPartitions, Codec<Record> defaultCodec, 087 ComputationPolicy defaultPolicy, RecordFilterChain defaultFilter) { 088 this(defaultConcurrency, defaultPartitions, defaultCodec, defaultPolicy, defaultFilter, false); 089 } 090 091 @SuppressWarnings("unchecked") 092 public Settings(int defaultConcurrency, int defaultPartitions, Codec<Record> defaultCodec, 093 ComputationPolicy defaultPolicy, RecordFilterChain defaultFilter, boolean defaultExternal) { 094 this.defaultConcurrency = defaultConcurrency; 095 this.defaultPartitions = defaultPartitions; 096 if (defaultCodec == null) { 097 this.defaultCodec = NO_CODEC; 098 } else { 099 this.defaultCodec = defaultCodec; 100 } 101 if (defaultPolicy == null) { 102 this.defaultPolicy = ComputationPolicy.NONE; 103 } else { 104 this.defaultPolicy = defaultPolicy; 105 } 106 if (defaultFilter == null) { 107 this.defaultFilter = RecordFilterChainImpl.NONE; 108 } else { 109 this.defaultFilter = defaultFilter; 110 } 111 this.defaultExternal = defaultExternal; 112 } 113 114 /** 115 * Sets the computation thread pool size. 116 */ 117 public Settings setConcurrency(String computationName, int concurrency) { 118 return setConcurrency(Name.ofUrn(computationName), concurrency); 119 } 120 121 public Settings setConcurrency(Name computationName, int concurrency) { 122 concurrencies.put(computationName, concurrency); 123 return this; 124 } 125 126 public int getConcurrency(String computationName) { 127 return getConcurrency(Name.ofUrn(computationName)); 128 } 129 130 public int getConcurrency(Name computationName) { 131 return concurrencies.getOrDefault(computationName, defaultConcurrency); 132 } 133 134 /** 135 * Sets the number of partitions for a stream. 136 */ 137 public Settings setPartitions(Name streamName, int partitions) { 138 this.partitions.put(streamName, partitions); 139 return this; 140 } 141 142 public Settings setPartitions(String streamName, int partitions) { 143 return setPartitions(Name.ofUrn(streamName), partitions); 144 } 145 146 public int getPartitions(Name streamName) { 147 return partitions.getOrDefault(streamName, defaultPartitions); 148 } 149 150 public int getPartitions(String streamName) { 151 return getPartitions(Name.ofUrn(streamName)); 152 } 153 154 // @since 11.1 155 public Settings setExternal(Name streamName, boolean external) { 156 this.externals.put(streamName, external); 157 return this; 158 } 159 160 // @since 11.1 161 public boolean isExternal(Name streamName) { 162 return externals.getOrDefault(streamName, defaultExternal); 163 } 164 165 /** 166 * Sets the codec for a stream. 167 * 168 * @since 11.1 169 */ 170 public Settings setCodec(Name streamName, Codec<Record> codec) { 171 Objects.requireNonNull(codec); 172 codecs.put(streamName, codec); 173 return this; 174 } 175 176 /** 177 * Sets the codec for a stream. 178 * 179 * @since 10.2 180 */ 181 public Settings setCodec(String streamName, Codec<Record> codec) { 182 return setCodec(Name.ofUrn(streamName), codec); 183 } 184 185 /** 186 * Gets the codec for a stream. 187 * 188 * @since 10.2 189 */ 190 public Codec<Record> getCodec(String streamName) { 191 return getCodec(Name.ofUrn(streamName)); 192 } 193 194 /** 195 * Gets the codec for a stream. 196 * 197 * @since 11.1 198 */ 199 public Codec<Record> getCodec(Name streamName) { 200 return codecs.getOrDefault(streamName, defaultCodec); 201 } 202 203 /** 204 * Sets the policy for a computation, when using default as computationName this sets the default policy for all 205 * computations in the processor. 206 * 207 * @since 11.1 208 */ 209 public Settings setPolicy(Name computationName, ComputationPolicy policy) { 210 if (policy == null) { 211 policies.remove(computationName); 212 } else { 213 policies.put(computationName, policy); 214 } 215 return this; 216 } 217 218 /** 219 * Sets the policy for a computation, when using default as computationName this sets the default policy for all 220 * computations in the processor. 221 * 222 * @since 10.3 223 */ 224 public Settings setPolicy(String computationName, ComputationPolicy policy) { 225 return setPolicy(Name.ofUrn(computationName), policy); 226 } 227 228 /** 229 * Gets the policy for a computation. 230 * 231 * @since 11.1 232 */ 233 public ComputationPolicy getPolicy(Name computationName) { 234 return policies.getOrDefault(computationName, defaultPolicy); 235 } 236 237 /** 238 * Gets the policy for a computation. 239 * 240 * @since 10.3 241 */ 242 public ComputationPolicy getPolicy(String computationName) { 243 return policies.getOrDefault(Name.ofUrn(computationName), defaultPolicy); 244 } 245 246 /** 247 * Add a filter 248 * 249 * @since 11.1 250 */ 251 public Settings addFilter(Name streamName, RecordFilter filter) { 252 if (filter == null) { 253 filters.remove(streamName); 254 } else { 255 RecordFilterChain chain = filters.computeIfAbsent(streamName, k -> new RecordFilterChainImpl()); 256 chain.addFilter(filter); 257 } 258 return this; 259 } 260 261 /** 262 * Add a filter 263 * 264 * @since 11.1 265 */ 266 public Settings addFilter(String streamName, RecordFilter filter) { 267 return addFilter(Name.ofUrn(streamName), filter); 268 } 269 270 /** 271 * Gets the filter chain for a stream. 272 * 273 * @since 11.1 274 */ 275 public RecordFilterChain getFilterChain(Name streamName) { 276 return filters.getOrDefault(streamName, defaultFilter); 277 } 278 279 /** 280 * Gets the filter chain for a stream. 281 * 282 * @since 11.1 283 */ 284 public RecordFilterChain getFilterChain(String streamName) { 285 return filters.getOrDefault(Name.ofUrn(streamName), defaultFilter); 286 } 287 288}