001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * 017 * Contributors: 018 * bdelbosc 019 */ 020package org.nuxeo.runtime.stream; 021 022import java.lang.reflect.InvocationTargetException; 023import java.util.ArrayList; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027import java.util.Objects; 028 029import org.nuxeo.common.xmap.annotation.XNode; 030import org.nuxeo.common.xmap.annotation.XNodeList; 031import org.nuxeo.common.xmap.annotation.XNodeMap; 032import org.nuxeo.common.xmap.annotation.XObject; 033import org.nuxeo.lib.stream.StreamRuntimeException; 034import org.nuxeo.lib.stream.codec.Codec; 035import org.nuxeo.lib.stream.computation.Record; 036import org.nuxeo.lib.stream.computation.Settings; 037import org.nuxeo.lib.stream.computation.Topology; 038import org.nuxeo.runtime.codec.CodecService; 039 040@SuppressWarnings("CanBeFinal") 041@XObject("streamProcessor") 042public class StreamProcessorDescriptor { 043 public static final Integer DEFAULT_CONCURRENCY = 4; 044 045 @XNode("@name") 046 public String name; 047 048 @XNode("@logConfig") 049 public String config; 050 051 @XNode("@class") 052 public Class<? extends StreamProcessorTopology> klass; 053 054 @XNode("@defaultConcurrency") 055 public Integer defaultConcurrency = DEFAULT_CONCURRENCY; 056 057 @XNode("@defaultPartitions") 058 public Integer defaultPartitions = DEFAULT_CONCURRENCY; 059 060 @XNode("@defaultCodec") 061 public String defaultCodec; 062 063 @XNodeMap(value = "option", key = "@name", type = HashMap.class, componentType = String.class) 064 public Map<String, String> options = new HashMap<>(); 065 066 @XNodeList(value = "computation", type = ArrayList.class, componentType = ComputationDescriptor.class) 067 public List<ComputationDescriptor> computations = new ArrayList<>(0); 068 069 @XNodeList(value = "stream", type = ArrayList.class, componentType = StreamDescriptor.class) 070 public List<StreamDescriptor> streams = new ArrayList<>(0); 071 072 public String getName() { 073 return name; 074 } 075 076 public Settings getSettings(CodecService codecService) { 077 Settings settings = new Settings(defaultConcurrency, defaultPartitions, getDefaultCodec(codecService)); 078 computations.forEach(comp -> settings.setConcurrency(comp.name, comp.concurrency)); 079 streams.forEach(stream -> settings.setPartitions(stream.name, stream.partitions)); 080 streams.stream().filter(stream -> Objects.nonNull(stream.codec)).forEach( 081 stream -> settings.setCodec(stream.name, codecService.getCodec(stream.codec, Record.class))); 082 return settings; 083 } 084 085 public Codec<Record> getDefaultCodec(CodecService codecService) { 086 if (defaultCodec == null) { 087 return null; 088 } 089 return codecService.getCodec(defaultCodec, Record.class); 090 } 091 092 public Topology getTopology() { 093 try { 094 return klass.getDeclaredConstructor().newInstance().getTopology(options); 095 } catch (InstantiationException | IllegalAccessException | InvocationTargetException 096 | NoSuchMethodException e) { 097 throw new StreamRuntimeException("Can not create topology for processor: " + name, e); 098 } 099 100 } 101 102 @SuppressWarnings("CanBeFinal") 103 @XObject(value = "computation") 104 public static class ComputationDescriptor { 105 @XNode("@name") 106 public String name; 107 108 @XNode("@concurrency") 109 public Integer concurrency = DEFAULT_CONCURRENCY; 110 111 public ComputationDescriptor() { 112 // empty constructor 113 } 114 } 115 116 @SuppressWarnings("CanBeFinal") 117 @XObject(value = "stream") 118 public static class StreamDescriptor { 119 120 @XNode("@name") 121 public String name; 122 123 @XNode("@partitions") 124 public Integer partitions = DEFAULT_CONCURRENCY; 125 126 @XNode("@codec") 127 public String codec; 128 129 public StreamDescriptor() { 130 // empty constructor 131 } 132 133 } 134}