001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.computation.log; 020 021import static java.nio.charset.StandardCharsets.UTF_8; 022 023import java.net.InetAddress; 024import java.net.UnknownHostException; 025import java.util.Collection; 026import java.util.HashMap; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Map; 030import java.util.Set; 031import java.util.stream.Collectors; 032 033import org.apache.commons.logging.Log; 034import org.apache.commons.logging.LogFactory; 035import org.nuxeo.lib.stream.codec.AvroMessageCodec; 036import org.nuxeo.lib.stream.codec.Codec; 037import org.nuxeo.lib.stream.computation.Record; 038import org.nuxeo.lib.stream.computation.RecordFilter; 039import org.nuxeo.lib.stream.computation.RecordFilterChain; 040import org.nuxeo.lib.stream.computation.Settings; 041import org.nuxeo.lib.stream.computation.StreamManager; 042import org.nuxeo.lib.stream.computation.StreamProcessor; 043import org.nuxeo.lib.stream.computation.Topology; 044import org.nuxeo.lib.stream.computation.internals.RecordFilterChainImpl; 045import org.nuxeo.lib.stream.log.LogManager; 046import org.nuxeo.lib.stream.log.LogOffset; 047import org.nuxeo.lib.stream.log.LogPartition; 048import org.nuxeo.lib.stream.log.LogTailer; 049import org.nuxeo.lib.stream.log.Name; 050import org.nuxeo.lib.stream.log.RebalanceListener; 051import org.nuxeo.lib.stream.log.internals.LogOffsetImpl; 052 053/** 054 * StreamManager based on a LogManager 055 * 056 * @since 11.1 057 */ 058public class LogStreamManager implements StreamManager { 059 private static final Log log = LogFactory.getLog(LogStreamManager.class); 060 061 // Internal stream to describe started processors 062 // @since 11.5 063 public static final String PROCESSORS_STREAM = "internal/processors"; 064 065 public static final String METRICS_STREAM = "internal/metrics"; 066 067 public static final Codec<Record> INTERNAL_CODEC = new AvroMessageCodec<>(Record.class); 068 069 protected final LogManager logManager; 070 071 public LogStreamManager(LogManager logManager) { 072 this.logManager = logManager; 073 initInternalStreams(); 074 } 075 076 protected void initInternalStreams() { 077 initInternalStream(Name.ofUrn(PROCESSORS_STREAM)); 078 initInternalStream(Name.ofUrn(METRICS_STREAM)); 079 } 080 081 protected void initInternalStream(Name stream) { 082 logManager.createIfNotExists(stream, 1); 083 logManager.getAppender(stream, INTERNAL_CODEC); 084 filters.put(stream, RecordFilterChainImpl.NONE); 085 } 086 087 protected final Map<String, Topology> topologies = new HashMap<>(); 088 089 protected final Map<String, Settings> settings = new HashMap<>(); 090 091 protected final Map<Name, RecordFilterChain> filters = new HashMap<>(); 092 093 protected final Set<Name> streams = new HashSet<>(); 094 095 @Override 096 public void register(String processorName, Topology topology, Settings settings) { 097 log.debug("Register processor: " + processorName); 098 topologies.put(processorName, topology); 099 this.settings.put(processorName, settings); 100 initStreams(topology, settings); 101 initAppenders(topology.streamsSet(), settings); 102 registerFilters(topology.streamsSet(), settings); 103 } 104 105 @Override 106 public void register(List<String> streams, Settings settings) { 107 streams.forEach(stream -> initStream(stream, settings)); 108 initAppenders(streams, settings); 109 registerFilters(streams, settings); 110 } 111 112 @Override 113 public StreamProcessor createStreamProcessor(String processorName) { 114 if (!topologies.containsKey(processorName)) { 115 throw new IllegalArgumentException("Unregistered processor name: " + processorName); 116 } 117 LogStreamProcessor processor = new LogStreamProcessor(this); 118 processor.init(topologies.get(processorName), settings.get(processorName)); 119 Map<String, String> meta = new HashMap<>(); 120 meta.put("processorName", processorName); 121 meta.putAll(getSystemMetadata()); 122 append(PROCESSORS_STREAM, Record.of(meta.get("ip"), processor.toJson(meta).getBytes(UTF_8))); 123 return processor; 124 } 125 126 protected Map<String, String> getSystemMetadata() { 127 Map<String, String> systemMetadata = new HashMap<>(); 128 try { 129 InetAddress host = InetAddress.getLocalHost(); 130 systemMetadata.put("ip", host.getHostAddress()); 131 systemMetadata.put("hostname", host.getHostName()); 132 } catch (UnknownHostException e) { 133 systemMetadata.put("ip", "unknown"); 134 systemMetadata.put("hostname", "unknown"); 135 } 136 systemMetadata.put("cpuCores", String.valueOf(Runtime.getRuntime().availableProcessors())); 137 systemMetadata.put("jvmHeapSize", String.valueOf(Runtime.getRuntime().maxMemory())); 138 return systemMetadata; 139 } 140 141 public LogManager getLogManager() { 142 return logManager; 143 } 144 145 @Override 146 public LogOffset append(String streamUrn, Record record) { 147 Name stream = Name.ofUrn(streamUrn); 148 RecordFilterChain filter = filters.get(stream); 149 if (filter == null) { 150 throw new IllegalArgumentException("Unknown stream: " + stream); 151 } 152 record = filter.beforeAppend(record); 153 if (record == null) { 154 return new LogOffsetImpl(stream, 0, 0); 155 } 156 LogOffset offset = logManager.getAppender(stream).append(record.getKey(), record); 157 filter.afterAppend(record, offset); 158 return offset; 159 } 160 161 public boolean supportSubscribe() { 162 return logManager.supportSubscribe(); 163 } 164 165 public LogTailer<Record> subscribe(Name computationName, Collection<Name> streams, RebalanceListener listener) { 166 Codec<Record> codec = getCodec(streams); 167 return logManager.subscribe(computationName, streams, listener, codec); 168 } 169 170 public LogTailer<Record> createTailer(Name computationName, Collection<LogPartition> streamPartitions) { 171 if (streamPartitions.isEmpty()) { 172 return logManager.createTailer(computationName, streamPartitions); 173 } 174 Codec<Record> codec = getCodec(streamPartitions.stream().map(LogPartition::name).collect(Collectors.toList())); 175 return logManager.createTailer(computationName, streamPartitions, codec); 176 } 177 178 public RecordFilter getFilter(Name stream) { 179 return filters.get(stream); 180 } 181 182 protected Codec<Record> getCodec(Collection<Name> streams) { 183 Codec<Record> codec = null; 184 for (Name stream : streams) { 185 Codec<Record> sCodec = logManager.<Record> getAppender(stream).getCodec(); 186 if (codec == null) { 187 codec = sCodec; 188 } else if (!codec.getName().equals(sCodec.getName())) { 189 throw new IllegalArgumentException("Different codec on input streams are not supported " + streams); 190 } 191 } 192 return codec; 193 } 194 195 protected void initStreams(Topology topology, Settings settings) { 196 log.debug("Initializing streams"); 197 topology.streamsSet().forEach(streamName -> initStream(streamName, settings)); 198 } 199 200 protected void initStream(String streamName, Settings settings) { 201 Name stream = Name.ofUrn(streamName); 202 if (settings.isExternal(stream)) { 203 return; 204 } 205 if (!logManager.exists(stream)) { 206 logManager.createIfNotExists(stream, settings.getPartitions(streamName)); 207 } else { 208 int size = logManager.size(stream); 209 if (settings.getPartitions(streamName) != size) { 210 log.debug(String.format( 211 "Update settings for stream: %s defined with %d partitions but exists with %d partitions", 212 streamName, settings.getPartitions(streamName), size)); 213 settings.setPartitions(streamName, size); 214 } 215 } 216 streams.add(stream); 217 } 218 219 protected void initAppenders(Collection<String> streams, Settings settings) { 220 log.debug("Initializing source appenders so we ensure they use codec defined in the processor:"); 221 streams.forEach(stream -> log.debug(stream)); 222 streams.stream() 223 .filter(stream -> !settings.isExternal(Name.ofUrn(stream))) 224 .forEach(stream -> logManager.getAppender(Name.ofUrn(stream), settings.getCodec(stream))); 225 } 226 227 protected void registerFilters(Collection<String> streams, Settings settings) { 228 streams.stream() 229 .filter(stream -> !settings.isExternal(Name.ofUrn(stream))) 230 .forEach(stream -> filters.put(Name.ofUrn(stream), settings.getFilterChain(stream))); 231 } 232 233}