001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.computation; 020 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Map; 026import java.util.Set; 027import java.util.function.Supplier; 028import java.util.stream.Collectors; 029 030import org.jgrapht.experimental.dag.DirectedAcyclicGraph; 031import org.jgrapht.graph.DefaultEdge; 032 033/** 034 * Represent a Directed Acyclic Graph (DAG) of computations. 035 * 036 * @since 9.3 037 */ 038public class Topology { 039 040 protected final List<ComputationMetadataMapping> metadataList; // use a list because computation are ordered using 041 042 protected final Map<String, ComputationMetadataMapping> metadataMap = new HashMap<>(); 043 044 // dag 045 protected final Map<String, Supplier<Computation>> supplierMap = new HashMap<>(); 046 047 protected final DirectedAcyclicGraph<Vertex, DefaultEdge> dag = new DirectedAcyclicGraph<>(DefaultEdge.class); 048 049 protected Topology(Builder builder) { 050 this.supplierMap.putAll(builder.suppliersMap); 051 builder.metadataSet.forEach(meta -> metadataMap.put(meta.name, meta)); 052 this.metadataList = new ArrayList<>(builder.metadataSet.size()); 053 try { 054 generateDag(builder.metadataSet); 055 } catch (DirectedAcyclicGraph.CycleFoundException e) { 056 throw new IllegalStateException("Cycle found in topology: " + e.getMessage(), e); 057 } 058 } 059 060 public static Builder builder() { 061 return new Builder(); 062 } 063 064 /** 065 * A plantuml representation of the topology. 066 */ 067 public String toPlantuml() { 068 return toPlantuml(new Settings(0, 0)); 069 } 070 071 @SuppressWarnings("SpellCheckingInspection") 072 public String toPlantuml(Settings settings) { 073 StringBuilder ret = new StringBuilder(); 074 ret.append("@startuml\n"); 075 for (Vertex vertex : dag) { 076 if (VertexType.COMPUTATION.equals(vertex.getType())) { 077 String name = vertex.getName(); 078 int concurrency = settings.getConcurrency(vertex.getName()); 079 ret.append(String.format("node %s [%s\n----\n%d threads]\n", name, name, concurrency)); 080 } else if (VertexType.STREAM.equals(vertex.getType())) { 081 String name = vertex.getName(); 082 int partitions = settings.getPartitions(vertex.getName()); 083 ret.append(String.format("queue %s [%s\n----\n%d partitions]\n", name, name, partitions)); 084 } 085 } 086 for (DefaultEdge edge : dag.edgeSet()) { 087 ret.append( 088 String.format("%s==>%s\n", dag.getEdgeSource(edge).getName(), dag.getEdgeTarget(edge).getName())); 089 } 090 ret.append("@enduml\n"); 091 return ret.toString().replace("\n1 partitions", "\n1 partition").replace("\n1 threads", "\n1 thread"); 092 } 093 094 protected void generateDag(Set<ComputationMetadataMapping> metadataSet) 095 throws DirectedAcyclicGraph.CycleFoundException { 096 for (ComputationMetadata metadata : metadataSet) { 097 Vertex computationVertex = new Vertex(VertexType.COMPUTATION, metadata.name); 098 dag.addVertex(computationVertex); 099 if (metadata.outputStreams != null) { 100 for (String stream : metadata.outputStreams) { 101 Vertex streamVertex = new Vertex(VertexType.STREAM, stream); 102 dag.addVertex(streamVertex); 103 dag.addDagEdge(computationVertex, streamVertex); 104 } 105 } 106 if (metadata.inputStreams() != null) { 107 for (String streamName : metadata.inputStreams()) { 108 Vertex streamVertex = new Vertex(VertexType.STREAM, streamName); 109 dag.addVertex(streamVertex); 110 dag.addDagEdge(streamVertex, computationVertex); 111 } 112 } 113 } 114 for (Vertex vertex : dag) { 115 if (VertexType.COMPUTATION.equals(vertex.getType())) { 116 for (ComputationMetadataMapping metadata : metadataSet) { 117 if (vertex.getName().equals(metadata.name)) { 118 metadataList.add(metadata); 119 break; 120 } 121 } 122 } 123 } 124 } 125 126 public ComputationMetadataMapping getMetadata(String name) { 127 return metadataMap.get(name); 128 } 129 130 public Supplier<Computation> getSupplier(String name) { 131 return supplierMap.get(name); 132 } 133 134 public boolean isSource(String name) { 135 return getParents(name).isEmpty(); 136 } 137 138 public boolean isSink(String name) { 139 return getChildren(name).isEmpty(); 140 } 141 142 public Set<String> streamsSet() { 143 Set<String> ret = new HashSet<>(); 144 for (ComputationMetadata metadata : this.metadataList) { 145 ret.addAll(metadata.inputStreams()); 146 ret.addAll(metadata.outputStreams()); 147 } 148 return ret; 149 } 150 151 public Set<String> streamsSet(String root) { 152 Set<String> ret = new HashSet<>(); 153 for (String name : getDescendantComputationNames(root)) { 154 ComputationMetadataMapping meta = getMetadata(name); 155 ret.addAll(meta.inputStreams()); 156 ret.addAll(meta.outputStreams()); 157 } 158 return ret; 159 } 160 161 public List<ComputationMetadataMapping> metadataList() { 162 return metadataList; 163 } 164 165 protected Vertex getVertex(String name) { 166 Vertex ret; 167 if (metadataMap.containsKey(name)) { 168 ret = new Vertex(VertexType.COMPUTATION, name); 169 } else if (streamsSet().contains(name)) { 170 ret = new Vertex(VertexType.STREAM, name); 171 } else { 172 throw new IllegalArgumentException("Unknown vertex name: " + name + " for dag: " + dag); 173 } 174 return ret; 175 } 176 177 public Set<String> getDescendants(String name) { 178 Vertex start = getVertex(name); 179 return dag.getDescendants(dag, start).stream().map(Vertex::getName).collect(Collectors.toSet()); 180 } 181 182 public Set<String> getDescendantComputationNames(String name) { 183 Vertex start = getVertex(name); 184 return dag.getDescendants(dag, start) 185 .stream() 186 .filter(vertex -> vertex.type == VertexType.COMPUTATION) 187 .map(vertex -> vertex.name) 188 .collect(Collectors.toSet()); 189 } 190 191 public Set<String> getChildren(String name) { 192 Vertex start = getVertex(name); 193 return dag.outgoingEdgesOf(start).stream().map(edge -> dag.getEdgeTarget(edge).getName()).collect( 194 Collectors.toSet()); 195 } 196 197 public Set<String> getChildrenComputationNames(String name) { 198 Vertex start = getVertex(name); 199 Set<String> children = getChildren(name); 200 if (start.type == VertexType.STREAM) { 201 return children; 202 } 203 Set<String> ret = new HashSet<>(); 204 children.forEach(child -> ret.addAll(getChildren(child))); 205 return ret; 206 } 207 208 public Set<String> getParents(String name) { 209 Vertex start = getVertex(name); 210 return dag.incomingEdgesOf(start).stream().map(edge -> dag.getEdgeSource(edge).getName()).collect( 211 Collectors.toSet()); 212 } 213 214 public Set<String> getParentComputationsNames(String name) { 215 Vertex start = getVertex(name); 216 Set<String> parents = getParents(name); 217 if (start.type == VertexType.STREAM) { 218 return parents; 219 } 220 Set<String> ret = new HashSet<>(); 221 parents.forEach(parent -> ret.addAll(getParents(parent))); 222 return ret; 223 } 224 225 public Set<String> getAncestorComputationNames(String name) { 226 Set<Vertex> ancestors = dag.getAncestors(dag, new Vertex(VertexType.COMPUTATION, name)); 227 return ancestors.stream() 228 .filter(vertex -> vertex.type == VertexType.COMPUTATION) 229 .map(vertex -> vertex.name) 230 .collect(Collectors.toSet()); 231 } 232 233 public Set<String> getAncestors(String name) { 234 Vertex start = getVertex(name); 235 return dag.getAncestors(dag, start).stream().map(Vertex::getName).collect(Collectors.toSet()); 236 } 237 238 public Set<String> getRoots() { 239 Set<String> ret = new HashSet<>(); 240 for (Vertex vertex : dag) { 241 if (dag.getAncestors(dag, vertex).isEmpty()) { 242 ret.add(vertex.getName()); 243 } 244 } 245 return ret; 246 } 247 248 public DirectedAcyclicGraph<Vertex, DefaultEdge> getDag() { 249 return dag; 250 } 251 252 protected enum VertexType { 253 COMPUTATION, STREAM 254 } 255 256 public static class Builder { 257 final Set<ComputationMetadataMapping> metadataSet = new HashSet<>(); 258 259 final Map<String, Supplier<Computation>> suppliersMap = new HashMap<>(); 260 261 public Builder addComputation(Supplier<Computation> supplier, List<String> mapping) { 262 Map<String, String> map = new HashMap<>(mapping.size()); 263 mapping.stream().filter(m -> m.contains(":")).forEach(m -> map.put(m.split(":")[0], m.split(":")[1])); 264 ComputationMetadataMapping meta = new ComputationMetadataMapping(supplier.get().metadata(), map); 265 metadataSet.add(meta); 266 suppliersMap.put(meta.name, supplier); 267 return this; 268 } 269 270 public Topology build() { 271 return new Topology(this); 272 } 273 } 274 275 public static class Vertex { 276 protected final String name; 277 278 protected final VertexType type; 279 280 public Vertex(VertexType type, String name) { 281 this.type = type; 282 this.name = name; 283 } 284 285 public String getName() { 286 return name; 287 } 288 289 public VertexType getType() { 290 return type; 291 } 292 293 @Override 294 public boolean equals(Object o) { 295 if (this == o) 296 return true; 297 if (o == null || getClass() != o.getClass()) 298 return false; 299 300 Vertex myVertex = (Vertex) o; 301 302 return name.equals(myVertex.name) && type == myVertex.type; 303 304 } 305 306 @Override 307 public String toString() { 308 return "Vertex{" + "name='" + name + '\'' + ", type=" + type + '}'; 309 } 310 311 @Override 312 public int hashCode() { 313 int result = name.hashCode(); 314 result = 31 * result + type.hashCode(); 315 return result; 316 } 317 } 318}