001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation.log;
020
021import static java.nio.charset.StandardCharsets.UTF_8;
022
023import java.net.InetAddress;
024import java.net.UnknownHostException;
025import java.util.Collection;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import java.util.stream.Collectors;
032
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.nuxeo.lib.stream.codec.AvroMessageCodec;
036import org.nuxeo.lib.stream.codec.Codec;
037import org.nuxeo.lib.stream.computation.Record;
038import org.nuxeo.lib.stream.computation.RecordFilter;
039import org.nuxeo.lib.stream.computation.RecordFilterChain;
040import org.nuxeo.lib.stream.computation.Settings;
041import org.nuxeo.lib.stream.computation.StreamManager;
042import org.nuxeo.lib.stream.computation.StreamProcessor;
043import org.nuxeo.lib.stream.computation.Topology;
044import org.nuxeo.lib.stream.computation.internals.RecordFilterChainImpl;
045import org.nuxeo.lib.stream.log.LogManager;
046import org.nuxeo.lib.stream.log.LogOffset;
047import org.nuxeo.lib.stream.log.LogPartition;
048import org.nuxeo.lib.stream.log.LogTailer;
049import org.nuxeo.lib.stream.log.Name;
050import org.nuxeo.lib.stream.log.RebalanceListener;
051import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;
052
053/**
054 * StreamManager based on a LogManager
055 *
056 * @since 11.1
057 */
058public class LogStreamManager implements StreamManager {
059    private static final Log log = LogFactory.getLog(LogStreamManager.class);
060
061    // Internal stream to describe started processors
062    // @since 11.5
063    public static final String PROCESSORS_STREAM = "internal/processors";
064
065    public static final String METRICS_STREAM = "internal/metrics";
066
067    public static final Codec<Record> INTERNAL_CODEC = new AvroMessageCodec<>(Record.class);
068
069    protected final LogManager logManager;
070
071    public LogStreamManager(LogManager logManager) {
072        this.logManager = logManager;
073        initInternalStreams();
074    }
075
076    protected void initInternalStreams() {
077        initInternalStream(Name.ofUrn(PROCESSORS_STREAM));
078        initInternalStream(Name.ofUrn(METRICS_STREAM));
079    }
080
081    protected void initInternalStream(Name stream) {
082        logManager.createIfNotExists(stream, 1);
083        logManager.getAppender(stream, INTERNAL_CODEC);
084        filters.put(stream, RecordFilterChainImpl.NONE);
085    }
086
087    protected final Map<String, Topology> topologies = new HashMap<>();
088
089    protected final Map<String, Settings> settings = new HashMap<>();
090
091    protected final Map<Name, RecordFilterChain> filters = new HashMap<>();
092
093    protected final Set<Name> streams = new HashSet<>();
094
095    @Override
096    public void register(String processorName, Topology topology, Settings settings) {
097        log.debug("Register processor: " + processorName);
098        topologies.put(processorName, topology);
099        this.settings.put(processorName, settings);
100        initStreams(topology, settings);
101        initAppenders(topology.streamsSet(), settings);
102        registerFilters(topology.streamsSet(), settings);
103    }
104
105    @Override
106    public void register(List<String> streams, Settings settings) {
107        streams.forEach(stream -> initStream(stream, settings));
108        initAppenders(streams, settings);
109        registerFilters(streams, settings);
110    }
111
112    @Override
113    public StreamProcessor createStreamProcessor(String processorName) {
114        if (!topologies.containsKey(processorName)) {
115            throw new IllegalArgumentException("Unregistered processor name: " + processorName);
116        }
117        LogStreamProcessor processor = new LogStreamProcessor(this);
118        processor.init(topologies.get(processorName), settings.get(processorName));
119        Map<String, String> meta = new HashMap<>();
120        meta.put("processorName",  processorName);
121        meta.putAll(getSystemMetadata());
122        append(PROCESSORS_STREAM, Record.of(meta.get("ip"), processor.toJson(meta).getBytes(UTF_8)));
123        return processor;
124    }
125
126    protected Map<String, String> getSystemMetadata() {
127        Map<String, String> systemMetadata = new HashMap<>();
128        try {
129            InetAddress host = InetAddress.getLocalHost();
130            systemMetadata.put("ip", host.getHostAddress());
131            systemMetadata.put("hostname", host.getHostName());
132        } catch (UnknownHostException e) {
133            systemMetadata.put("ip", "unknown");
134            systemMetadata.put("hostname", "unknown");
135        }
136        systemMetadata.put("cpuCores", String.valueOf(Runtime.getRuntime().availableProcessors()));
137        systemMetadata.put("jvmHeapSize", String.valueOf(Runtime.getRuntime().maxMemory()));
138        return systemMetadata;
139    }
140
141    public LogManager getLogManager() {
142        return logManager;
143    }
144
145    @Override
146    public LogOffset append(String streamUrn, Record record) {
147        Name stream = Name.ofUrn(streamUrn);
148        RecordFilterChain filter = filters.get(stream);
149        if (filter == null) {
150            throw new IllegalArgumentException("Unknown stream: " + stream);
151        }
152        record = filter.beforeAppend(record);
153        if (record == null) {
154            return new LogOffsetImpl(stream, 0, 0);
155        }
156        LogOffset offset = logManager.getAppender(stream).append(record.getKey(), record);
157        filter.afterAppend(record, offset);
158        return offset;
159    }
160
161    public boolean supportSubscribe() {
162        return logManager.supportSubscribe();
163    }
164
165    public LogTailer<Record> subscribe(Name computationName, Collection<Name> streams, RebalanceListener listener) {
166        Codec<Record> codec = getCodec(streams);
167        return logManager.subscribe(computationName, streams, listener, codec);
168    }
169
170    public LogTailer<Record> createTailer(Name computationName, Collection<LogPartition> streamPartitions) {
171        if (streamPartitions.isEmpty()) {
172            return logManager.createTailer(computationName, streamPartitions);
173        }
174        Codec<Record> codec = getCodec(streamPartitions.stream().map(LogPartition::name).collect(Collectors.toList()));
175        return logManager.createTailer(computationName, streamPartitions, codec);
176    }
177
178    public RecordFilter getFilter(Name stream) {
179        return filters.get(stream);
180    }
181
182    protected Codec<Record> getCodec(Collection<Name> streams) {
183        Codec<Record> codec = null;
184        for (Name stream : streams) {
185            Codec<Record> sCodec = logManager.<Record> getAppender(stream).getCodec();
186            if (codec == null) {
187                codec = sCodec;
188            } else if (!codec.getName().equals(sCodec.getName())) {
189                throw new IllegalArgumentException("Different codec on input streams are not supported " + streams);
190            }
191        }
192        return codec;
193    }
194
195    protected void initStreams(Topology topology, Settings settings) {
196        log.debug("Initializing streams");
197        topology.streamsSet().forEach(streamName -> initStream(streamName, settings));
198    }
199
200    protected void initStream(String streamName, Settings settings) {
201        Name stream = Name.ofUrn(streamName);
202        if (settings.isExternal(stream)) {
203            return;
204        }
205        if (!logManager.exists(stream)) {
206            logManager.createIfNotExists(stream, settings.getPartitions(streamName));
207        } else {
208            int size = logManager.size(stream);
209            if (settings.getPartitions(streamName) != size) {
210                log.debug(String.format(
211                        "Update settings for stream: %s defined with %d partitions but exists with %d partitions",
212                        streamName, settings.getPartitions(streamName), size));
213                settings.setPartitions(streamName, size);
214            }
215        }
216        streams.add(stream);
217    }
218
219    protected void initAppenders(Collection<String> streams, Settings settings) {
220        log.debug("Initializing source appenders so we ensure they use codec defined in the processor:");
221        streams.forEach(stream -> log.debug(stream));
222        streams.stream()
223               .filter(stream -> !settings.isExternal(Name.ofUrn(stream)))
224               .forEach(stream -> logManager.getAppender(Name.ofUrn(stream), settings.getCodec(stream)));
225    }
226
227    protected void registerFilters(Collection<String> streams, Settings settings) {
228        streams.stream()
229               .filter(stream -> !settings.isExternal(Name.ofUrn(stream)))
230               .forEach(stream -> filters.put(Name.ofUrn(stream), settings.getFilterChain(stream)));
231    }
232
233}