001/*
002 * (C) Copyright 2020 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.core.bulk.introspection;
020
021import static java.nio.charset.StandardCharsets.UTF_8;
022import static org.apache.commons.lang3.StringUtils.isBlank;
023
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027import java.util.stream.Collectors;
028
029import org.apache.logging.log4j.Logger;
030import org.nuxeo.lib.stream.computation.AbstractComputation;
031import org.nuxeo.lib.stream.computation.ComputationContext;
032import org.nuxeo.lib.stream.computation.Record;
033import org.nuxeo.runtime.api.Framework;
034import org.nuxeo.runtime.kv.KeyValueService;
035import org.nuxeo.runtime.kv.KeyValueStore;
036
037import com.fasterxml.jackson.core.JsonProcessingException;
038import com.fasterxml.jackson.databind.JsonNode;
039import com.fasterxml.jackson.databind.ObjectMapper;
040import com.fasterxml.jackson.databind.node.ArrayNode;
041import com.fasterxml.jackson.databind.node.ObjectNode;
042
043/**
044 * A computation that reads processor and metrics streams to build a representation of stream activities in the cluster.
045 * The representation is pushed to the KV Store.
046 *
047 * @since 11.5
048 */
049public class StreamIntrospectionComputation extends AbstractComputation {
050    private static final Logger log = org.apache.logging.log4j.LogManager.getLogger(
051            StreamIntrospectionComputation.class);
052
053    public static final String NAME = "stream/introspection";
054
055    public static final String INTROSPECTION_KV_STORE = "introspection";
056
057    public static final String INTROSPECTION_KEY = "streamIntrospection";
058
059    protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
060
061    protected final Map<String, JsonNode> streams = new HashMap<>();
062
063    protected final Map<String, JsonNode> processors = new HashMap<>();
064
065    protected final Map<String, JsonNode> metrics = new HashMap<>();
066
067    protected static final long TTL_SECONDS = 300;
068
069    protected String model;
070
071    public StreamIntrospectionComputation() {
072        super(NAME, 2, 0);
073    }
074
075    @Override
076    public void init(ComputationContext context) {
077        if (context.isSpareComputation()) {
078            log.info("Spare instance nothing to report");
079        } else {
080            log.warn("Instance elected to introspect Nuxeo Stream activity");
081        }
082        loadModel(getKvStore().getString(INTROSPECTION_KEY));
083    }
084
085    protected void loadModel(String modelJson) {
086        streams.clear();
087        processors.clear();
088        metrics.clear();
089        if (isBlank(modelJson)) {
090            model = null;
091            return;
092        }
093        try {
094            JsonNode modelNode = OBJECT_MAPPER.readTree(modelJson);
095            JsonNode node = modelNode.get("streams");
096            if (node.isArray()) {
097                for (JsonNode item : node) {
098                    streams.put(item.get("name").asText(), item);
099                }
100            }
101            node = modelNode.get("processors");
102            if (node.isArray()) {
103                for (JsonNode item : node) {
104                    processors.put(getProcessorKey(item), item);
105                }
106            }
107            node = modelNode.get("metrics");
108            if (node.isArray()) {
109                for (JsonNode item : node) {
110                    metrics.put(item.get("ip").asText(), item);
111                }
112            }
113            model = modelJson;
114        } catch (JsonProcessingException e) {
115            log.error("Unable to parse KV model as JSON {}", modelJson, e);
116            model = null;
117        }
118    }
119
120    @Override
121    public void processRecord(ComputationContext context, String inputStreamName, Record record) {
122        JsonNode json = getJson(record);
123        if (json != null) {
124            if (INPUT_1.equals(inputStreamName)) {
125                updateStreamsAndProcessors(json);
126            } else if (INPUT_2.equals(inputStreamName)) {
127                if (json.has("ip")) {
128                    metrics.put(json.get("ip").asText(), json);
129                }
130            }
131        }
132        removeOldNodes();
133        buildModel();
134        updateModel();
135        context.askForCheckpoint();
136    }
137
138    protected void updateStreamsAndProcessors(JsonNode node) {
139        JsonNode streamsNode = node.get("streams");
140        if (streamsNode == null) {
141            log.warn("Invalid metric without streams field: {}", node);
142            return;
143        }
144        if (streamsNode.isArray()) {
145            for (JsonNode item : streamsNode) {
146                streams.put(item.get("name").asText(), item);
147            }
148        }
149        ((ObjectNode) node).remove("streams");
150        processors.put(getProcessorKey(node), node);
151    }
152
153    protected String getProcessorKey(JsonNode json) {
154        return json.at("/metadata/ip").asText() + ":" + json.at("/metadata/processorName").asText();
155    }
156
157    protected void updateModel() {
158        KeyValueStore kv = getKvStore();
159        kv.put(INTROSPECTION_KEY, model);
160    }
161
162    protected KeyValueStore getKvStore() {
163        return Framework.getService(KeyValueService.class).getKeyValueStore(INTROSPECTION_KV_STORE);
164    }
165
166    protected void buildModel() {
167        ObjectNode node = OBJECT_MAPPER.createObjectNode();
168        ArrayNode streamsNode = OBJECT_MAPPER.createArrayNode();
169        streamsNode.addAll(streams.values());
170        node.set("streams", streamsNode);
171        ArrayNode processorsNode = OBJECT_MAPPER.createArrayNode();
172        processorsNode.addAll(processors.values());
173        node.set("processors", processorsNode);
174        ArrayNode metricsNode = OBJECT_MAPPER.createArrayNode();
175        metricsNode.addAll(metrics.values());
176        node.set("metrics", metricsNode);
177        try {
178            model = OBJECT_MAPPER.writer().writeValueAsString(node);
179        } catch (JsonProcessingException e) {
180            log.error("Cannot build JSON model", e);
181            model = "{}";
182        }
183    }
184
185    protected void removeOldNodes() {
186        // Remove all nodes with metrics older than TTL
187        long now = System.currentTimeMillis() / 1000;
188        List<String> toRemove = metrics.values()
189                                       .stream()
190                                       .filter(json -> (now - json.get("timestamp").asLong()) > TTL_SECONDS)
191                                       .map(json -> json.get("ip").asText())
192                                       .collect(Collectors.toList());
193        log.debug("Removing nodes: {}", toRemove);
194        toRemove.forEach(metrics::remove);
195        toRemove.forEach(ip -> {
196            List<String> toRemoveProcessors = processors.keySet()
197                                                        .stream()
198                                                        .filter(key -> key.startsWith(ip))
199                                                        .collect(Collectors.toList());
200            toRemoveProcessors.forEach(processors::remove);
201        });
202    }
203
204    protected JsonNode getJson(Record record) {
205        String json = new String(record.getData(), UTF_8);
206        try {
207            return OBJECT_MAPPER.readTree(json);
208        } catch (JsonProcessingException e) {
209            log.error("Invalid JSON from record {}: {}", record, json, e);
210            return null;
211        }
212    }
213}