/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.ecm.platform.importer.mqueues.computation;

import org.jgrapht.experimental.dag.DirectedAcyclicGraph;
import org.jgrapht.graph.DefaultEdge;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;


/**
 * Describes a set of computations and the streams that connect them as a directed acyclic graph.
 * <p>
 * Every computation and every stream is a {@link Vertex}. An edge goes from a computation to each of its output
 * streams and from each input stream to the computation reading it, so edges always link a computation with a stream.
 * Instances are created with {@link #builder()}.
 *
 * @since 9.2
 */
public class Topology {

    /**
     * Kind of node stored in the graph.
     */
    public enum VertexType {
        COMPUTATION, STREAM
    }

    private final List<ComputationMetadataMapping> metadataList; // use a list because computation are ordered using dag
    private final Map<String, ComputationMetadataMapping> metadataMap = new HashMap<>();
    private final Map<String, Supplier<Computation>> supplierMap = new HashMap<>();
    private final DirectedAcyclicGraph<Vertex, DefaultEdge> dag = new DirectedAcyclicGraph<>(DefaultEdge.class);


    /**
     * Copies the builder content and generates the graph.
     *
     * @throws IllegalStateException if the declared computations and streams form a cycle
     */
    private Topology(Builder builder) {
        this.supplierMap.putAll(builder.suppliersMap);
        // the map must be filled before generateDag, which uses it to resolve metadata by name
        builder.metadataSet.forEach(meta -> metadataMap.put(meta.name, meta));
        this.metadataList = new ArrayList<>(builder.metadataSet.size());
        try {
            generateDag(builder.metadataSet);
        } catch (DirectedAcyclicGraph.CycleFoundException e) {
            throw new IllegalStateException("Cycle found in topology: " + e.getMessage(), e);
        }
    }

    /**
     * A plantuml representation of the topology.
     */
    public String toPlantuml() {
        return toPlantuml(new Settings(0, 0));
    }

    /**
     * A plantuml representation of the topology, where each computation with a strictly positive concurrency in the
     * given settings is annotated with a note.
     *
     * @param settings used to look up the concurrency of each computation by name
     * @return the plantuml source, enclosed between {@code @startuml} and {@code @enduml}
     */
    public String toPlantuml(Settings settings) {
        StringBuilder ret = new StringBuilder();
        ret.append("@startuml\n");
        for (Vertex vertex : dag) {
            if (VertexType.COMPUTATION.equals(vertex.getType())) {
                ret.append("node ").append(vertex.getName()).append("\n");
                int concurrency = settings.getConcurrency(vertex.getName());
                if (concurrency > 0) {
                    ret.append("note right: x").append(concurrency).append("\n");
                }
            } else if (VertexType.STREAM.equals(vertex.getType())) {
                ret.append("database ").append(vertex.getName()).append("\n");
            }
        }
        for (DefaultEdge edge : dag.edgeSet()) {
            ret.append(dag.getEdgeSource(edge).getName())
               .append("==>")
               .append(dag.getEdgeTarget(edge).getName())
               .append("\n");
        }
        ret.append("@enduml\n");
        return ret.toString();
    }

    /**
     * Adds a vertex per computation and per stream, links them, then fills {@link #metadataList} with the
     * computation metadata following the iteration order of the graph.
     *
     * @throws DirectedAcyclicGraph.CycleFoundException if adding an edge would create a cycle
     */
    private void generateDag(Set<ComputationMetadataMapping> metadataSet) throws DirectedAcyclicGraph.CycleFoundException {
        for (ComputationMetadata metadata : metadataSet) {
            Vertex computationVertex = new Vertex(VertexType.COMPUTATION, metadata.name);
            dag.addVertex(computationVertex);
            if (metadata.ostreams != null) {
                for (String streamName : metadata.ostreams) {
                    Vertex streamVertex = new Vertex(VertexType.STREAM, streamName);
                    dag.addVertex(streamVertex);
                    dag.addDagEdge(computationVertex, streamVertex);
                }
            }
            if (metadata.istreams != null) {
                for (String streamName : metadata.istreams) {
                    Vertex streamVertex = new Vertex(VertexType.STREAM, streamName);
                    dag.addVertex(streamVertex);
                    dag.addDagEdge(streamVertex, computationVertex);
                }
            }
        }
        for (Vertex vertex : dag) {
            if (vertex.getType() == VertexType.COMPUTATION) {
                // every computation vertex was created from an entry registered in metadataMap by the constructor
                metadataList.add(metadataMap.get(vertex.getName()));
            }
        }
    }

    /**
     * Returns the metadata of the computation with the given name, or {@code null} if there is none.
     */
    public ComputationMetadataMapping getMetadata(String name) {
        return metadataMap.get(name);
    }

    /**
     * Returns the supplier registered for the computation with the given name, or {@code null} if there is none.
     */
    public Supplier<Computation> getSupplier(String name) {
        return supplierMap.get(name);
    }


    /**
     * Returns {@code true} if the vertex has no parent.
     *
     * @throws IllegalArgumentException if the name is neither a known computation nor a known stream
     */
    public boolean isSource(String name) {
        return getParents(name).isEmpty();
    }

    /**
     * Returns {@code true} if the vertex has no child.
     *
     * @throws IllegalArgumentException if the name is neither a known computation nor a known stream
     */
    public boolean isSink(String name) {
        return getChildren(name).isEmpty();
    }

    /**
     * Returns the names of all input and output streams of all computations.
     */
    public Set<String> streamsSet() {
        Set<String> ret = new HashSet<>();
        for (ComputationMetadata metadata : this.metadataList) {
            addStreams(ret, metadata);
        }
        return ret;
    }

    /**
     * Returns the names of the input and output streams of the computations that are descendants of {@code root}.
     *
     * @throws IllegalArgumentException if the root is neither a known computation nor a known stream
     */
    public Set<String> streamsSet(String root) {
        Set<String> ret = new HashSet<>();
        for (String name : getDescendantComputationNames(root)) {
            addStreams(ret, getMetadata(name));
        }
        return ret;
    }

    /**
     * Adds the input and output stream names of the metadata to the target, skipping null collections the same way
     * the graph generation does.
     */
    private static void addStreams(Set<String> target, ComputationMetadata metadata) {
        if (metadata.istreams != null) {
            target.addAll(metadata.istreams);
        }
        if (metadata.ostreams != null) {
            target.addAll(metadata.ostreams);
        }
    }


    /**
     * Returns the computation metadata, in the order computed from the graph when the topology was built.
     */
    public List<ComputationMetadataMapping> metadataList() {
        return metadataList;
    }

    /**
     * Returns a new, empty topology builder.
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Resolves a name to its vertex; a computation name takes precedence over a stream name.
     *
     * @throws IllegalArgumentException if the name is neither a known computation nor a known stream
     */
    private Vertex getVertex(String name) {
        if (metadataMap.containsKey(name)) {
            return new Vertex(VertexType.COMPUTATION, name);
        }
        Vertex stream = new Vertex(VertexType.STREAM, name);
        if (dag.containsVertex(stream)) {
            return stream;
        }
        throw new IllegalArgumentException("Unknown vertex name: " + name + " for dag: " + dag);
    }

    /**
     * Returns the names of all vertices (computations and streams) reachable from the given vertex.
     *
     * @throws IllegalArgumentException if the name is neither a known computation nor a known stream
     */
    public Set<String> getDescendants(String name) {
        Vertex start = getVertex(name);
        return dag.getDescendants(dag, start).stream().map(Vertex::getName).collect(Collectors.toSet());
    }

    /**
     * Returns the names of the computations reachable from the given vertex.
     *
     * @throws IllegalArgumentException if the name is neither a known computation nor a known stream
     */
    public Set<String> getDescendantComputationNames(String name) {
        Vertex start = getVertex(name);
        return dag.getDescendants(dag, start)
                  .stream()
                  .filter(vertex -> vertex.type == VertexType.COMPUTATION)
                  .map(vertex -> vertex.name)
                  .collect(Collectors.toSet());
    }

    /**
     * Returns the names of the vertices that are the target of an outgoing edge of the given vertex.
     *
     * @throws IllegalArgumentException if the name is neither a known computation nor a known stream
     */
    public Set<String> getChildren(String name) {
        Vertex start = getVertex(name);
        return dag.outgoingEdgesOf(start)
                  .stream()
                  .map(edge -> dag.getEdgeTarget(edge).getName())
                  .collect(Collectors.toSet());
    }

    /**
     * Returns the names of the closest computations downstream of the given vertex: the direct children for a stream,
     * the children of its children for a computation.
     *
     * @throws IllegalArgumentException if the name is neither a known computation nor a known stream
     */
    public Set<String> getChildrenComputationNames(String name) {
        Vertex start = getVertex(name);
        Set<String> children = getChildren(name);
        if (start.type == VertexType.STREAM) {
            return children;
        }
        Set<String> ret = new HashSet<>();
        children.forEach(child -> ret.addAll(getChildren(child)));
        return ret;
    }

    /**
     * Returns the names of the vertices that are the source of an incoming edge of the given vertex.
     *
     * @throws IllegalArgumentException if the name is neither a known computation nor a known stream
     */
    public Set<String> getParents(String name) {
        Vertex start = getVertex(name);
        return dag.incomingEdgesOf(start)
                  .stream()
                  .map(edge -> dag.getEdgeSource(edge).getName())
                  .collect(Collectors.toSet());
    }

    /**
     * Returns the names of the closest computations upstream of the given vertex: the direct parents for a stream,
     * the parents of its parents for a computation.
     *
     * @throws IllegalArgumentException if the name is neither a known computation nor a known stream
     */
    public Set<String> getParentComputationsNames(String name) {
        Vertex start = getVertex(name);
        Set<String> parents = getParents(name);
        if (start.type == VertexType.STREAM) {
            return parents;
        }
        Set<String> ret = new HashSet<>();
        parents.forEach(parent -> ret.addAll(getParents(parent)));
        return ret;
    }

    /**
     * Returns the names of the computations from which the given vertex can be reached.
     *
     * @throws IllegalArgumentException if the name is neither a known computation nor a known stream
     */
    public Set<String> getAncestorComputationNames(String name) {
        // resolve through getVertex like the sibling methods so that stream names are supported too
        Vertex start = getVertex(name);
        return dag.getAncestors(dag, start)
                  .stream()
                  .filter(vertex -> vertex.type == VertexType.COMPUTATION)
                  .map(vertex -> vertex.name)
                  .collect(Collectors.toSet());
    }

    /**
     * Returns the names of all vertices (computations and streams) from which the given vertex can be reached.
     *
     * @throws IllegalArgumentException if the name is neither a known computation nor a known stream
     */
    public Set<String> getAncestors(String name) {
        Vertex start = getVertex(name);
        return dag.getAncestors(dag, start).stream().map(Vertex::getName).collect(Collectors.toSet());
    }

    /**
     * Returns the names of the vertices without any ancestor.
     */
    public Set<String> getRoots() {
        Set<String> ret = new HashSet<>();
        for (Vertex vertex : dag) {
            // a vertex has no ancestor exactly when it has no incoming edge
            if (dag.incomingEdgesOf(vertex).isEmpty()) {
                ret.add(vertex.getName());
            }
        }
        return ret;
    }

    /**
     * Collects computations and builds a {@link Topology}.
     */
    public static class Builder {
        final Set<ComputationMetadataMapping> metadataSet = new HashSet<>();
        final Map<String, Supplier<Computation>> suppliersMap = new HashMap<>();

        /**
         * Registers a computation.
         *
         * @param supplier creates the computation; it is invoked once here to read the computation metadata
         * @param mapping entries of the form {@code key:value} handed to the {@link ComputationMetadataMapping};
         *            entries without a colon are ignored
         * @return this builder
         * @throws IllegalArgumentException if an entry contains a colon but has no key or no value
         */
        public Builder addComputation(Supplier<Computation> supplier, List<String> mapping) {
            Map<String, String> map = new HashMap<>(mapping.size());
            for (String entry : mapping) {
                if (!entry.contains(":")) {
                    continue;
                }
                // String.split drops trailing empty strings, so "a:" or ":" yield less than two parts
                String[] parts = entry.split(":");
                if (parts.length < 2) {
                    throw new IllegalArgumentException("Invalid mapping, expecting 'key:value': " + entry);
                }
                map.put(parts[0], parts[1]);
            }
            ComputationMetadataMapping meta = new ComputationMetadataMapping(supplier.get().metadata(), map);
            metadataSet.add(meta);
            suppliersMap.put(meta.name, supplier);
            return this;
        }

        /**
         * Builds the topology from the registered computations.
         *
         * @throws IllegalStateException if the registered computations and streams form a cycle
         */
        public Topology build() {
            return new Topology(this);
        }
    }

    /**
     * A node of the graph, identified by its type and name.
     * <p>
     * Declared static so that a vertex does not hold a reference to a topology instance.
     */
    public static class Vertex {
        private final String name;
        private final VertexType type;

        public Vertex(VertexType type, String name) {
            this.type = type;
            this.name = name;
        }

        public String getName() {
            return name;
        }

        public VertexType getType() {
            return type;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            Vertex other = (Vertex) o;
            return name.equals(other.name) && type == other.type;
        }

        @Override
        public String toString() {
            return "Vertex{" +
                    "name='" + name + '\'' +
                    ", type=" + type +
                    '}';
        }

        @Override
        public int hashCode() {
            int result = name.hashCode();
            result = 31 * result + type.hashCode();
            return result;
        }
    }

    /**
     * Returns the underlying graph; this is the internal instance, not a copy.
     */
    public DirectedAcyclicGraph<Vertex, DefaultEdge> getDag() {
        return dag;
    }
}