001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.computation; 020 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Map; 026import java.util.Set; 027import java.util.function.Supplier; 028import java.util.stream.Collectors; 029 030import org.jgrapht.experimental.dag.DirectedAcyclicGraph; 031import org.jgrapht.graph.DefaultEdge; 032 033/** 034 * Represent a Directed Acyclic Graph (DAG) of computations. 035 * 036 * @since 9.3 037 */ 038public class Topology { 039 040 protected final List<ComputationMetadataMapping> metadataList; // use a list because computation are ordered using 041 042 protected final Map<String, ComputationMetadataMapping> metadataMap = new HashMap<>(); 043 044 // dag 045 protected final Map<String, Supplier<Computation>> supplierMap = new HashMap<>(); 046 047 protected final DirectedAcyclicGraph<Vertex, DefaultEdge> dag = new DirectedAcyclicGraph<>(DefaultEdge.class); 048 049 protected Topology(Builder builder) { 050 this.supplierMap.putAll(builder.suppliersMap); 051 builder.metadataSet.forEach(meta -> metadataMap.put(meta.name, meta)); 052 this.metadataList = new ArrayList<>(builder.metadataSet.size()); 053 try { 054 generateDag(builder.metadataSet); 055 } catch (DirectedAcyclicGraph.CycleFoundException e) { 056 throw new IllegalStateException("Cycle found in topology: " + e.getMessage(), e); 057 } 058 } 059 060 public static Builder builder() { 061 return new Builder(); 062 } 063 064 /** 065 * A plantuml representation of the topology. 066 */ 067 public String toPlantuml() { 068 return toPlantuml(new Settings(0, 0)); 069 } 070 071 @SuppressWarnings("SpellCheckingInspection") 072 public String toPlantuml(Settings settings) { 073 StringBuilder ret = new StringBuilder(); 074 ret.append("@startuml\n"); 075 for (Vertex vertex : dag) { 076 if (VertexType.COMPUTATION.equals(vertex.getType())) { 077 String name = vertex.getName(); 078 int concurrency = settings.getConcurrency(vertex.getName()); 079 ret.append(String.format("node %s [%s%n----%n%d threads]%n", name, name, concurrency)); 080 } else if (VertexType.STREAM.equals(vertex.getType())) { 081 String name = vertex.getName(); 082 int partitions = settings.getPartitions(vertex.getName()); 083 ret.append(String.format("queue %s [%s%n----%n%d partitions]%n", name, name, partitions)); 084 } 085 } 086 for (DefaultEdge edge : dag.edgeSet()) { 087 ret.append( 088 String.format("%s==>%s%n", dag.getEdgeSource(edge).getName(), dag.getEdgeTarget(edge).getName())); 089 } 090 ret.append("@enduml\n"); 091 return ret.toString().replace("%n1 partitions", "%n1 partition").replace("%n1 threads", "%n1 thread"); 092 } 093 094 protected void generateDag(Set<ComputationMetadataMapping> metadataSet) 095 throws DirectedAcyclicGraph.CycleFoundException { 096 for (ComputationMetadata metadata : metadataSet) { 097 Vertex computationVertex = new Vertex(VertexType.COMPUTATION, metadata.name); 098 dag.addVertex(computationVertex); 099 if (metadata.outputStreams != null) { 100 for (String stream : metadata.outputStreams) { 101 Vertex streamVertex = new Vertex(VertexType.STREAM, stream); 102 dag.addVertex(streamVertex); 103 dag.addDagEdge(computationVertex, streamVertex); 104 } 105 } 106 if (metadata.inputStreams() != null) { 107 for (String streamName : metadata.inputStreams()) { 108 Vertex streamVertex = new Vertex(VertexType.STREAM, streamName); 109 dag.addVertex(streamVertex); 110 dag.addDagEdge(streamVertex, computationVertex); 111 } 112 } 113 } 114 generateMetadataMapping(metadataSet); 115 } 116 117 protected void generateMetadataMapping(Set<ComputationMetadataMapping> metadataSet) { 118 for (Vertex vertex : dag) { 119 if (VertexType.COMPUTATION.equals(vertex.getType())) { 120 for (ComputationMetadataMapping metadata : metadataSet) { 121 if (vertex.getName().equals(metadata.name)) { 122 metadataList.add(metadata); 123 break; 124 } 125 } 126 } 127 } 128 } 129 130 public ComputationMetadataMapping getMetadata(String name) { 131 return metadataMap.get(name); 132 } 133 134 public Supplier<Computation> getSupplier(String name) { 135 return supplierMap.get(name); 136 } 137 138 public boolean isSource(String name) { 139 return getParents(name).isEmpty(); 140 } 141 142 public boolean isSink(String name) { 143 return getChildren(name).isEmpty(); 144 } 145 146 public Set<String> streamsSet() { 147 Set<String> ret = new HashSet<>(); 148 for (ComputationMetadata metadata : this.metadataList) { 149 ret.addAll(metadata.inputStreams()); 150 ret.addAll(metadata.outputStreams()); 151 } 152 return ret; 153 } 154 155 public Set<String> streamsSet(String root) { 156 Set<String> ret = new HashSet<>(); 157 for (String name : getDescendantComputationNames(root)) { 158 ComputationMetadataMapping meta = getMetadata(name); 159 ret.addAll(meta.inputStreams()); 160 ret.addAll(meta.outputStreams()); 161 } 162 return ret; 163 } 164 165 public List<ComputationMetadataMapping> metadataList() { 166 return metadataList; 167 } 168 169 protected Vertex getVertex(String name) { 170 Vertex ret; 171 if (metadataMap.containsKey(name)) { 172 ret = new Vertex(VertexType.COMPUTATION, name); 173 } else if (streamsSet().contains(name)) { 174 ret = new Vertex(VertexType.STREAM, name); 175 } else { 176 throw new IllegalArgumentException("Unknown vertex name: " + name + " for dag: " + dag); 177 } 178 return ret; 179 } 180 181 public Set<String> getDescendants(String name) { 182 Vertex start = getVertex(name); 183 return dag.getDescendants(dag, start).stream().map(Vertex::getName).collect(Collectors.toSet()); 184 } 185 186 public Set<String> getDescendantComputationNames(String name) { 187 Vertex start = getVertex(name); 188 return dag.getDescendants(dag, start) 189 .stream() 190 .filter(vertex -> vertex.type == VertexType.COMPUTATION) 191 .map(vertex -> vertex.name) 192 .collect(Collectors.toSet()); 193 } 194 195 public Set<String> getChildren(String name) { 196 Vertex start = getVertex(name); 197 return dag.outgoingEdgesOf(start) 198 .stream() 199 .map(edge -> dag.getEdgeTarget(edge).getName()) 200 .collect(Collectors.toSet()); 201 } 202 203 public Set<String> getChildrenComputationNames(String name) { 204 Vertex start = getVertex(name); 205 Set<String> children = getChildren(name); 206 if (start.type == VertexType.STREAM) { 207 return children; 208 } 209 Set<String> ret = new HashSet<>(); 210 children.forEach(child -> ret.addAll(getChildren(child))); 211 return ret; 212 } 213 214 public Set<String> getParents(String name) { 215 Vertex start = getVertex(name); 216 return dag.incomingEdgesOf(start) 217 .stream() 218 .map(edge -> dag.getEdgeSource(edge).getName()) 219 .collect(Collectors.toSet()); 220 } 221 222 public Set<String> getParentComputationsNames(String name) { 223 Vertex start = getVertex(name); 224 Set<String> parents = getParents(name); 225 if (start.type == VertexType.STREAM) { 226 return parents; 227 } 228 Set<String> ret = new HashSet<>(); 229 parents.forEach(parent -> ret.addAll(getParents(parent))); 230 return ret; 231 } 232 233 public Set<String> getAncestorComputationNames(String name) { 234 Set<Vertex> ancestors = dag.getAncestors(dag, new Vertex(VertexType.COMPUTATION, name)); 235 return ancestors.stream() 236 .filter(vertex -> vertex.type == VertexType.COMPUTATION) 237 .map(vertex -> vertex.name) 238 .collect(Collectors.toSet()); 239 } 240 241 public Set<String> getAncestors(String name) { 242 Vertex start = getVertex(name); 243 return dag.getAncestors(dag, start).stream().map(Vertex::getName).collect(Collectors.toSet()); 244 } 245 246 public Set<String> getRoots() { 247 Set<String> ret = new HashSet<>(); 248 for (Vertex vertex : dag) { 249 if (dag.getAncestors(dag, vertex).isEmpty()) { 250 ret.add(vertex.getName()); 251 } 252 } 253 return ret; 254 } 255 256 public DirectedAcyclicGraph<Vertex, DefaultEdge> getDag() { 257 return dag; 258 } 259 260 protected enum VertexType { 261 COMPUTATION, STREAM 262 } 263 264 public static class Builder { 265 final Set<ComputationMetadataMapping> metadataSet = new HashSet<>(); 266 267 final Map<String, Supplier<Computation>> suppliersMap = new HashMap<>(); 268 269 public Builder addComputation(Supplier<Computation> supplier, List<String> mapping) { 270 Map<String, String> map = new HashMap<>(mapping.size()); 271 mapping.stream().filter(m -> m.contains(":")).forEach(m -> map.put(m.split(":")[0], m.split(":")[1])); 272 ComputationMetadataMapping meta = new ComputationMetadataMapping(supplier.get().metadata(), map); 273 metadataSet.add(meta); 274 suppliersMap.put(meta.name, supplier); 275 return this; 276 } 277 278 public Topology build() { 279 return new Topology(this); 280 } 281 } 282 283 public static class Vertex { 284 protected final String name; 285 286 protected final VertexType type; 287 288 public Vertex(VertexType type, String name) { 289 this.type = type; 290 this.name = name; 291 } 292 293 public String getName() { 294 return name; 295 } 296 297 public VertexType getType() { 298 return type; 299 } 300 301 @Override 302 public boolean equals(Object o) { 303 if (this == o) 304 return true; 305 if (o == null || getClass() != o.getClass()) 306 return false; 307 308 Vertex myVertex = (Vertex) o; 309 310 return name.equals(myVertex.name) && type == myVertex.type; 311 312 } 313 314 @Override 315 public String toString() { 316 return "Vertex{" + "name='" + name + '\'' + ", type=" + type + '}'; 317 } 318 319 @Override 320 public int hashCode() { 321 int result = name.hashCode(); 322 result = 31 * result + type.hashCode(); 323 return result; 324 } 325 } 326}