/*
 * (C) Copyright 2020 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.ecm.core.bulk.introspection;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.commons.lang3.StringUtils.isBlank;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.logging.log4j.Logger;
import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.kv.KeyValueService;
import org.nuxeo.runtime.kv.KeyValueStore;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

/**
 * A computation that reads processor and metrics streams to build a representation of stream activities in the cluster.
 * The representation is pushed to the KV Store.
 * <p>
 * The representation is a JSON object with three arrays, {@code streams}, {@code processors} and {@code metrics}. It is
 * rebuilt and stored under {@link #INTROSPECTION_KEY} after every processed record.
 *
 * @since 11.5
 */
public class StreamIntrospectionComputation extends AbstractComputation {
    private static final Logger log = org.apache.logging.log4j.LogManager.getLogger(
            StreamIntrospectionComputation.class);

    public static final String NAME = "stream/introspection";

    public static final String INTROSPECTION_KV_STORE = "introspection";

    public static final String INTROSPECTION_KEY = "streamIntrospection";

    protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Stream descriptions indexed by their {@code name} field. */
    protected final Map<String, JsonNode> streams = new HashMap<>();

    /** Processor descriptions indexed by {@link #getProcessorKey(JsonNode)}, i.e. {@code ip:processorName}. */
    protected final Map<String, JsonNode> processors = new HashMap<>();

    /** Latest metrics of each node indexed by the node {@code ip} field. */
    protected final Map<String, JsonNode> metrics = new HashMap<>();

    /** Age in seconds above which a node's metrics, and the processors of that node, are dropped. */
    protected static final long TTL_SECONDS = 300;

    /** The last JSON representation that was loaded or built, {@code null} when there is none. */
    protected String model;

    public StreamIntrospectionComputation() {
        // NOTE(review): presumably 2 input streams and 0 output stream, confirm against AbstractComputation
        super(NAME, 2, 0);
    }

    /**
     * Logs whether this instance is a spare one, then restores the in-memory state from the model stored in the KV
     * store.
     */
    @Override
    public void init(ComputationContext context) {
        if (context.isSpareComputation()) {
            log.info("Spare instance nothing to report");
        } else {
            log.warn("Instance elected to introspect Nuxeo Stream activity");
        }
        loadModel(getKvStore().getString(INTROSPECTION_KEY));
    }

    /**
     * Resets the in-memory maps and fills them from the given JSON model.
     * <p>
     * A blank or unparsable model leaves the maps empty and {@link #model} set to {@code null}. Sections that are
     * missing or that are not JSON arrays are ignored, as well as stream and metric entries without their key field.
     *
     * @param modelJson the JSON representation as stored in the KV store, may be {@code null} or blank
     */
    protected void loadModel(String modelJson) {
        streams.clear();
        processors.clear();
        metrics.clear();
        if (isBlank(modelJson)) {
            model = null;
            return;
        }
        try {
            JsonNode modelNode = OBJECT_MAPPER.readTree(modelJson);
            // path() never returns null: an absent section yields a missing node which is not an array
            indexByField(modelNode.path("streams"), "name", streams);
            JsonNode processorsNode = modelNode.path("processors");
            if (processorsNode.isArray()) {
                for (JsonNode item : processorsNode) {
                    processors.put(getProcessorKey(item), item);
                }
            }
            indexByField(modelNode.path("metrics"), "ip", metrics);
            model = modelJson;
        } catch (JsonProcessingException e) {
            log.error("Unable to parse KV model as JSON {}", modelJson, e);
            model = null;
        }
    }

    /**
     * Puts each item of a JSON array into {@code target}, using the text value of the item's {@code keyField} as key.
     * Does nothing when {@code array} is {@code null} or not a JSON array, items without the field are skipped.
     */
    private static void indexByField(JsonNode array, String keyField, Map<String, JsonNode> target) {
        if (array == null || !array.isArray()) {
            return;
        }
        for (JsonNode item : array) {
            if (item.has(keyField)) {
                target.put(item.get(keyField).asText(), item);
            } else {
                log.debug("Skipping entry without {} field: {}", keyField, item);
            }
        }
    }

    /**
     * Updates the in-memory state with the record, then expires old nodes, rebuilds the model, stores it in the KV
     * store and asks for a checkpoint.
     * <p>
     * Records from the first input update streams and processors, records from the second input that have an
     * {@code ip} field update the metrics. A record that is not valid JSON changes nothing but still triggers the
     * expiration and the model update.
     */
    @Override
    public void processRecord(ComputationContext context, String inputStreamName, Record record) {
        JsonNode json = getJson(record);
        if (json != null) {
            if (INPUT_1.equals(inputStreamName)) {
                updateStreamsAndProcessors(json);
            } else if (INPUT_2.equals(inputStreamName)) {
                if (json.has("ip")) {
                    metrics.put(json.get("ip").asText(), json);
                }
            }
        }
        removeOldNodes();
        buildModel();
        updateModel();
        context.askForCheckpoint();
    }

    /**
     * Registers the streams listed in the {@code streams} field of the node, then registers the node itself, stripped
     * of its {@code streams} field, as a processor.
     * <p>
     * A node without a {@code streams} field is logged and ignored. Stream entries without a {@code name} field are
     * skipped.
     *
     * @param node the JSON of a processor record, it is modified in place
     */
    protected void updateStreamsAndProcessors(JsonNode node) {
        JsonNode streamsNode = node.get("streams");
        if (streamsNode == null) {
            log.warn("Invalid metric without streams field: {}", node);
            return;
        }
        indexByField(streamsNode, "name", streams);
        // get(String) only returns a non null value on an object node, so the cast cannot fail here
        ((ObjectNode) node).remove("streams");
        processors.put(getProcessorKey(node), node);
    }

    /**
     * Returns the key of a processor: its {@code /metadata/ip} and {@code /metadata/processorName} values joined by a
     * colon.
     */
    protected String getProcessorKey(JsonNode json) {
        return json.at("/metadata/ip").asText() + ":" + json.at("/metadata/processorName").asText();
    }

    /** Stores the current {@link #model} in the KV store under {@link #INTROSPECTION_KEY}. */
    protected void updateModel() {
        KeyValueStore kv = getKvStore();
        kv.put(INTROSPECTION_KEY, model);
    }

    /** Returns the KV store named {@link #INTROSPECTION_KV_STORE}. */
    protected KeyValueStore getKvStore() {
        return Framework.getService(KeyValueService.class).getKeyValueStore(INTROSPECTION_KV_STORE);
    }

    /**
     * Serializes the in-memory maps into {@link #model} as a JSON object with the {@code streams}, {@code processors}
     * and {@code metrics} arrays. The model falls back to an empty JSON object when the serialization fails.
     */
    protected void buildModel() {
        ObjectNode node = OBJECT_MAPPER.createObjectNode();
        ArrayNode streamsNode = OBJECT_MAPPER.createArrayNode();
        streamsNode.addAll(streams.values());
        node.set("streams", streamsNode);
        ArrayNode processorsNode = OBJECT_MAPPER.createArrayNode();
        processorsNode.addAll(processors.values());
        node.set("processors", processorsNode);
        ArrayNode metricsNode = OBJECT_MAPPER.createArrayNode();
        metricsNode.addAll(metrics.values());
        node.set("metrics", metricsNode);
        try {
            model = OBJECT_MAPPER.writer().writeValueAsString(node);
        } catch (JsonProcessingException e) {
            log.error("Cannot build JSON model", e);
            model = "{}";
        }
    }

    /**
     * Removes the metrics older than {@link #TTL_SECONDS} and the processors that belong to the same node.
     * <p>
     * The {@code timestamp} field of a metric is compared with the current time in seconds. A metric without a
     * {@code timestamp} is read as 0 and is therefore treated as expired.
     */
    protected void removeOldNodes() {
        // Remove all nodes with metrics older than TTL
        // NOTE(review): assumes the metric timestamp is expressed in epoch seconds, confirm against the producer
        long now = System.currentTimeMillis() / 1000;
        // the metrics map is keyed by ip, so the key is used instead of reading the field again
        List<String> toRemove = metrics.entrySet()
                                       .stream()
                                       .filter(entry -> (now - entry.getValue().path("timestamp").asLong()) > TTL_SECONDS)
                                       .map(Map.Entry::getKey)
                                       .collect(Collectors.toList());
        log.debug("Removing nodes: {}", toRemove);
        toRemove.forEach(metrics::remove);
        toRemove.forEach(ip -> {
            // processor keys are "ip:processorName": match on the separator too, otherwise expiring 10.0.0.1 would
            // also drop the processors of 10.0.0.10
            String prefix = ip + ":";
            processors.keySet().removeIf(key -> key.startsWith(prefix));
        });
    }

    /**
     * Decodes the record data as UTF-8 and parses it as JSON.
     *
     * @return the parsed JSON, or {@code null} when the data cannot be parsed
     */
    protected JsonNode getJson(Record record) {
        String json = new String(record.getData(), UTF_8);
        try {
            return OBJECT_MAPPER.readTree(json);
        } catch (JsonProcessingException e) {
            log.error("Invalid JSON from record {}: {}", record, json, e);
            return null;
        }
    }
}