001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.computation; 020 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Map; 026import java.util.Set; 027import java.util.function.Supplier; 028import java.util.stream.Collectors; 029 030import org.jgrapht.experimental.dag.DirectedAcyclicGraph; 031import org.jgrapht.graph.DefaultEdge; 032 033/** 034 * Represent a Directed Acyclic Graph (DAG) of computations. 035 * 036 * @since 9.3 037 */ 038public class Topology { 039 040 protected final List<ComputationMetadataMapping> metadataList; // use a list because computation are ordered using 041 042 protected final Map<String, ComputationMetadataMapping> metadataMap = new HashMap<>(); 043 044 // dag 045 protected final Map<String, Supplier<Computation>> supplierMap = new HashMap<>(); 046 047 protected final DirectedAcyclicGraph<Vertex, DefaultEdge> dag = new DirectedAcyclicGraph<>(DefaultEdge.class); 048 049 protected Topology(Builder builder) { 050 this.supplierMap.putAll(builder.suppliersMap); 051 builder.metadataSet.forEach(meta -> metadataMap.put(meta.name, meta)); 052 this.metadataList = new ArrayList<>(builder.metadataSet.size()); 053 try { 054 generateDag(builder.metadataSet); 055 } catch (DirectedAcyclicGraph.CycleFoundException e) { 056 throw new IllegalStateException("Cycle found in topology: " + e.getMessage(), e); 057 } 058 } 059 060 public static Builder builder() { 061 return new Builder(); 062 } 063 064 /** 065 * A plantuml representation of the topology. 066 */ 067 public String toPlantuml() { 068 return toPlantuml(new Settings(0, 0)); 069 } 070 071 @SuppressWarnings("SpellCheckingInspection") 072 public String toPlantuml(Settings settings) { 073 StringBuilder ret = new StringBuilder(); 074 ret.append("@startuml\n"); 075 for (Vertex vertex : dag) { 076 if (VertexType.COMPUTATION.equals(vertex.getType())) { 077 String name = vertex.getName(); 078 int concurrency = settings.getConcurrency(vertex.getName()); 079 ret.append(String.format("node %s [%s%n----%n%s%nConcurrency: %d threads]%n", getPumlName(vertex), name, 080 compactPolicy(settings.getPolicy(vertex.getName())), concurrency)); 081 } else if (VertexType.STREAM.equals(vertex.getType())) { 082 String name = vertex.getName(); 083 int partitions = settings.getPartitions(vertex.getName()); 084 ret.append(String.format("queue %s [%s%n----%n%d partitions%ncodec: %s]%n", getPumlName(vertex), name, 085 partitions, settings.getCodec(name).getName())); 086 } 087 } 088 for (DefaultEdge edge : dag.edgeSet()) { 089 ret.append( 090 String.format("%s==>%s%n", getPumlName(dag.getEdgeSource(edge)), getPumlName(dag.getEdgeTarget(edge)))); 091 } 092 ret.append("@enduml\n"); 093 return ret.toString().replace("%n1 partitions", "%n1 partition").replace("%n1 threads", "%n1 thread"); 094 } 095 096 protected String compactPolicy(ComputationPolicy policy) { 097 return String.format("Continue on failure: %s%nRetries: %d, %d ms%nBatch: %d, %dms", policy.continueOnFailure(), 098 policy.getRetryPolicy().getMaxRetries(), policy.getRetryPolicy().getDelay().toMillis(), policy.getBatchCapacity(), 099 policy.getBatchThreshold().toMillis()); 100 } 101 102 protected String getPumlName(Topology.Vertex vertex) { 103 if (VertexType.COMPUTATION.equals(vertex.getType())) { 104 return getPumlIdentifier(vertex.getName()) + "Comp"; 105 } 106 return getPumlIdentifier(vertex.getName()); 107 } 108 109 protected String getPumlIdentifier(String name) { 110 return name.replaceAll("[^a-zA-Z]", "."); 111 } 112 113 protected void generateDag(Set<ComputationMetadataMapping> metadataSet) 114 throws DirectedAcyclicGraph.CycleFoundException { 115 for (ComputationMetadata metadata : metadataSet) { 116 Vertex computationVertex = new Vertex(VertexType.COMPUTATION, metadata.name); 117 dag.addVertex(computationVertex); 118 if (metadata.outputStreams != null) { 119 for (String stream : metadata.outputStreams) { 120 Vertex streamVertex = new Vertex(VertexType.STREAM, stream); 121 dag.addVertex(streamVertex); 122 dag.addDagEdge(computationVertex, streamVertex); 123 } 124 } 125 if (metadata.inputStreams() != null) { 126 for (String streamName : metadata.inputStreams()) { 127 Vertex streamVertex = new Vertex(VertexType.STREAM, streamName); 128 dag.addVertex(streamVertex); 129 dag.addDagEdge(streamVertex, computationVertex); 130 } 131 } 132 } 133 generateMetadataMapping(metadataSet); 134 } 135 136 protected void generateMetadataMapping(Set<ComputationMetadataMapping> metadataSet) { 137 for (Vertex vertex : dag) { 138 if (VertexType.COMPUTATION.equals(vertex.getType())) { 139 for (ComputationMetadataMapping metadata : metadataSet) { 140 if (vertex.getName().equals(metadata.name)) { 141 metadataList.add(metadata); 142 break; 143 } 144 } 145 } 146 } 147 } 148 149 public ComputationMetadataMapping getMetadata(String name) { 150 return metadataMap.get(name); 151 } 152 153 public Supplier<Computation> getSupplier(String name) { 154 return supplierMap.get(name); 155 } 156 157 public boolean isSource(String name) { 158 return getParents(name).isEmpty(); 159 } 160 161 public boolean isSink(String name) { 162 return getChildren(name).isEmpty(); 163 } 164 165 public Set<String> streamsSet() { 166 Set<String> ret = new HashSet<>(); 167 for (ComputationMetadata metadata : this.metadataList) { 168 ret.addAll(metadata.inputStreams()); 169 ret.addAll(metadata.outputStreams()); 170 } 171 return ret; 172 } 173 174 public Set<String> streamsSet(String root) { 175 Set<String> ret = new HashSet<>(); 176 for (String name : getDescendantComputationNames(root)) { 177 ComputationMetadataMapping meta = getMetadata(name); 178 ret.addAll(meta.inputStreams()); 179 ret.addAll(meta.outputStreams()); 180 } 181 return ret; 182 } 183 184 public List<ComputationMetadataMapping> metadataList() { 185 return metadataList; 186 } 187 188 protected Vertex getVertex(String name) { 189 Vertex ret; 190 if (metadataMap.containsKey(name)) { 191 ret = new Vertex(VertexType.COMPUTATION, name); 192 } else if (streamsSet().contains(name)) { 193 ret = new Vertex(VertexType.STREAM, name); 194 } else { 195 throw new IllegalArgumentException("Unknown vertex name: " + name + " for dag: " + dag); 196 } 197 return ret; 198 } 199 200 public Set<String> getDescendants(String name) { 201 Vertex start = getVertex(name); 202 return dag.getDescendants(dag, start).stream().map(Vertex::getName).collect(Collectors.toSet()); 203 } 204 205 public Set<String> getDescendantComputationNames(String name) { 206 Vertex start = getVertex(name); 207 return dag.getDescendants(dag, start) 208 .stream() 209 .filter(vertex -> vertex.type == VertexType.COMPUTATION) 210 .map(vertex -> vertex.name) 211 .collect(Collectors.toSet()); 212 } 213 214 public Set<String> getChildren(String name) { 215 Vertex start = getVertex(name); 216 return dag.outgoingEdgesOf(start) 217 .stream() 218 .map(edge -> dag.getEdgeTarget(edge).getName()) 219 .collect(Collectors.toSet()); 220 } 221 222 public Set<String> getChildrenComputationNames(String name) { 223 Vertex start = getVertex(name); 224 Set<String> children = getChildren(name); 225 if (start.type == VertexType.STREAM) { 226 return children; 227 } 228 Set<String> ret = new HashSet<>(); 229 children.forEach(child -> ret.addAll(getChildren(child))); 230 return ret; 231 } 232 233 public Set<String> getParents(String name) { 234 Vertex start = getVertex(name); 235 return dag.incomingEdgesOf(start) 236 .stream() 237 .map(edge -> dag.getEdgeSource(edge).getName()) 238 .collect(Collectors.toSet()); 239 } 240 241 public Set<String> getParentComputationsNames(String name) { 242 Vertex start = getVertex(name); 243 Set<String> parents = getParents(name); 244 if (start.type == VertexType.STREAM) { 245 return parents; 246 } 247 Set<String> ret = new HashSet<>(); 248 parents.forEach(parent -> ret.addAll(getParents(parent))); 249 return ret; 250 } 251 252 public Set<String> getAncestorComputationNames(String name) { 253 Set<Vertex> ancestors = dag.getAncestors(dag, new Vertex(VertexType.COMPUTATION, name)); 254 return ancestors.stream() 255 .filter(vertex -> vertex.type == VertexType.COMPUTATION) 256 .map(vertex -> vertex.name) 257 .collect(Collectors.toSet()); 258 } 259 260 public Set<String> getAncestors(String name) { 261 Vertex start = getVertex(name); 262 return dag.getAncestors(dag, start).stream().map(Vertex::getName).collect(Collectors.toSet()); 263 } 264 265 public Set<String> getRoots() { 266 Set<String> ret = new HashSet<>(); 267 for (Vertex vertex : dag) { 268 if (dag.getAncestors(dag, vertex).isEmpty()) { 269 ret.add(vertex.getName()); 270 } 271 } 272 return ret; 273 } 274 275 public DirectedAcyclicGraph<Vertex, DefaultEdge> getDag() { 276 return dag; 277 } 278 279 public enum VertexType { 280 COMPUTATION, STREAM 281 } 282 283 public static class Builder { 284 final Set<ComputationMetadataMapping> metadataSet = new HashSet<>(); 285 286 final Map<String, Supplier<Computation>> suppliersMap = new HashMap<>(); 287 288 public Builder addComputation(Supplier<Computation> supplier, List<String> mapping) { 289 Map<String, String> map = new HashMap<>(mapping.size()); 290 mapping.stream().filter(m -> m.contains(":")).forEach(m -> map.put(m.split(":")[0], m.split(":")[1])); 291 ComputationMetadataMapping meta = new ComputationMetadataMapping(supplier.get().metadata(), map); 292 metadataSet.add(meta); 293 suppliersMap.put(meta.name, supplier); 294 return this; 295 } 296 297 public Topology build() { 298 return new Topology(this); 299 } 300 } 301 302 public static class Vertex { 303 protected final String name; 304 305 protected final VertexType type; 306 307 public Vertex(VertexType type, String name) { 308 this.type = type; 309 this.name = name; 310 } 311 312 public String getName() { 313 return name; 314 } 315 316 public VertexType getType() { 317 return type; 318 } 319 320 @Override 321 public boolean equals(Object o) { 322 if (this == o) 323 return true; 324 if (o == null || getClass() != o.getClass()) 325 return false; 326 327 Vertex myVertex = (Vertex) o; 328 329 return name.equals(myVertex.name) && type == myVertex.type; 330 331 } 332 333 @Override 334 public String toString() { 335 return "Vertex{" + "name='" + name + '\'' + ", type=" + type + '}'; 336 } 337 338 @Override 339 public int hashCode() { 340 int result = name.hashCode(); 341 result = 31 * result + type.hashCode(); 342 return result; 343 } 344 } 345}