001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation;
020
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import java.util.function.Supplier;
028import java.util.stream.Collectors;
029
030import org.jgrapht.experimental.dag.DirectedAcyclicGraph;
031import org.jgrapht.graph.DefaultEdge;
032
033/**
034 * Represent a Directed Acyclic Graph (DAG) of computations.
035 *
036 * @since 9.3
037 */
038public class Topology {
039
040    protected final List<ComputationMetadataMapping> metadataList; // use a list because computation are ordered using
041
042    protected final Map<String, ComputationMetadataMapping> metadataMap = new HashMap<>();
043
044    // dag
045    protected final Map<String, Supplier<Computation>> supplierMap = new HashMap<>();
046
047    protected final DirectedAcyclicGraph<Vertex, DefaultEdge> dag = new DirectedAcyclicGraph<>(DefaultEdge.class);
048
049    protected Topology(Builder builder) {
050        this.supplierMap.putAll(builder.suppliersMap);
051        builder.metadataSet.forEach(meta -> metadataMap.put(meta.name, meta));
052        this.metadataList = new ArrayList<>(builder.metadataSet.size());
053        try {
054            generateDag(builder.metadataSet);
055        } catch (DirectedAcyclicGraph.CycleFoundException e) {
056            throw new IllegalStateException("Cycle found in topology: " + e.getMessage(), e);
057        }
058    }
059
060    public static Builder builder() {
061        return new Builder();
062    }
063
064    /**
065     * A plantuml representation of the topology.
066     */
067    public String toPlantuml() {
068        return toPlantuml(new Settings(0, 0));
069    }
070
071    @SuppressWarnings("SpellCheckingInspection")
072    public String toPlantuml(Settings settings) {
073        StringBuilder ret = new StringBuilder();
074        ret.append("@startuml\n");
075        for (Vertex vertex : dag) {
076            if (VertexType.COMPUTATION.equals(vertex.getType())) {
077                String name = vertex.getName();
078                int concurrency = settings.getConcurrency(vertex.getName());
079                ret.append(String.format("node %s [%s%n----%n%d threads]%n", name, name, concurrency));
080            } else if (VertexType.STREAM.equals(vertex.getType())) {
081                String name = vertex.getName();
082                int partitions = settings.getPartitions(vertex.getName());
083                ret.append(String.format("queue %s [%s%n----%n%d partitions]%n", name, name, partitions));
084            }
085        }
086        for (DefaultEdge edge : dag.edgeSet()) {
087            ret.append(
088                    String.format("%s==>%s%n", dag.getEdgeSource(edge).getName(), dag.getEdgeTarget(edge).getName()));
089        }
090        ret.append("@enduml\n");
091        return ret.toString().replace("%n1 partitions", "%n1 partition").replace("%n1 threads", "%n1 thread");
092    }
093
094    protected void generateDag(Set<ComputationMetadataMapping> metadataSet)
095            throws DirectedAcyclicGraph.CycleFoundException {
096        for (ComputationMetadata metadata : metadataSet) {
097            Vertex computationVertex = new Vertex(VertexType.COMPUTATION, metadata.name);
098            dag.addVertex(computationVertex);
099            if (metadata.outputStreams != null) {
100                for (String stream : metadata.outputStreams) {
101                    Vertex streamVertex = new Vertex(VertexType.STREAM, stream);
102                    dag.addVertex(streamVertex);
103                    dag.addDagEdge(computationVertex, streamVertex);
104                }
105            }
106            if (metadata.inputStreams() != null) {
107                for (String streamName : metadata.inputStreams()) {
108                    Vertex streamVertex = new Vertex(VertexType.STREAM, streamName);
109                    dag.addVertex(streamVertex);
110                    dag.addDagEdge(streamVertex, computationVertex);
111                }
112            }
113        }
114        generateMetadataMapping(metadataSet);
115    }
116
117    protected void generateMetadataMapping(Set<ComputationMetadataMapping> metadataSet) {
118        for (Vertex vertex : dag) {
119            if (VertexType.COMPUTATION.equals(vertex.getType())) {
120                for (ComputationMetadataMapping metadata : metadataSet) {
121                    if (vertex.getName().equals(metadata.name)) {
122                        metadataList.add(metadata);
123                        break;
124                    }
125                }
126            }
127        }
128    }
129
130    public ComputationMetadataMapping getMetadata(String name) {
131        return metadataMap.get(name);
132    }
133
134    public Supplier<Computation> getSupplier(String name) {
135        return supplierMap.get(name);
136    }
137
138    public boolean isSource(String name) {
139        return getParents(name).isEmpty();
140    }
141
142    public boolean isSink(String name) {
143        return getChildren(name).isEmpty();
144    }
145
146    public Set<String> streamsSet() {
147        Set<String> ret = new HashSet<>();
148        for (ComputationMetadata metadata : this.metadataList) {
149            ret.addAll(metadata.inputStreams());
150            ret.addAll(metadata.outputStreams());
151        }
152        return ret;
153    }
154
155    public Set<String> streamsSet(String root) {
156        Set<String> ret = new HashSet<>();
157        for (String name : getDescendantComputationNames(root)) {
158            ComputationMetadataMapping meta = getMetadata(name);
159            ret.addAll(meta.inputStreams());
160            ret.addAll(meta.outputStreams());
161        }
162        return ret;
163    }
164
165    public List<ComputationMetadataMapping> metadataList() {
166        return metadataList;
167    }
168
169    protected Vertex getVertex(String name) {
170        Vertex ret;
171        if (metadataMap.containsKey(name)) {
172            ret = new Vertex(VertexType.COMPUTATION, name);
173        } else if (streamsSet().contains(name)) {
174            ret = new Vertex(VertexType.STREAM, name);
175        } else {
176            throw new IllegalArgumentException("Unknown vertex name: " + name + " for dag: " + dag);
177        }
178        return ret;
179    }
180
181    public Set<String> getDescendants(String name) {
182        Vertex start = getVertex(name);
183        return dag.getDescendants(dag, start).stream().map(Vertex::getName).collect(Collectors.toSet());
184    }
185
186    public Set<String> getDescendantComputationNames(String name) {
187        Vertex start = getVertex(name);
188        return dag.getDescendants(dag, start)
189                  .stream()
190                  .filter(vertex -> vertex.type == VertexType.COMPUTATION)
191                  .map(vertex -> vertex.name)
192                  .collect(Collectors.toSet());
193    }
194
195    public Set<String> getChildren(String name) {
196        Vertex start = getVertex(name);
197        return dag.outgoingEdgesOf(start)
198                  .stream()
199                  .map(edge -> dag.getEdgeTarget(edge).getName())
200                  .collect(Collectors.toSet());
201    }
202
203    public Set<String> getChildrenComputationNames(String name) {
204        Vertex start = getVertex(name);
205        Set<String> children = getChildren(name);
206        if (start.type == VertexType.STREAM) {
207            return children;
208        }
209        Set<String> ret = new HashSet<>();
210        children.forEach(child -> ret.addAll(getChildren(child)));
211        return ret;
212    }
213
214    public Set<String> getParents(String name) {
215        Vertex start = getVertex(name);
216        return dag.incomingEdgesOf(start)
217                  .stream()
218                  .map(edge -> dag.getEdgeSource(edge).getName())
219                  .collect(Collectors.toSet());
220    }
221
222    public Set<String> getParentComputationsNames(String name) {
223        Vertex start = getVertex(name);
224        Set<String> parents = getParents(name);
225        if (start.type == VertexType.STREAM) {
226            return parents;
227        }
228        Set<String> ret = new HashSet<>();
229        parents.forEach(parent -> ret.addAll(getParents(parent)));
230        return ret;
231    }
232
233    public Set<String> getAncestorComputationNames(String name) {
234        Set<Vertex> ancestors = dag.getAncestors(dag, new Vertex(VertexType.COMPUTATION, name));
235        return ancestors.stream()
236                        .filter(vertex -> vertex.type == VertexType.COMPUTATION)
237                        .map(vertex -> vertex.name)
238                        .collect(Collectors.toSet());
239    }
240
241    public Set<String> getAncestors(String name) {
242        Vertex start = getVertex(name);
243        return dag.getAncestors(dag, start).stream().map(Vertex::getName).collect(Collectors.toSet());
244    }
245
246    public Set<String> getRoots() {
247        Set<String> ret = new HashSet<>();
248        for (Vertex vertex : dag) {
249            if (dag.getAncestors(dag, vertex).isEmpty()) {
250                ret.add(vertex.getName());
251            }
252        }
253        return ret;
254    }
255
256    public DirectedAcyclicGraph<Vertex, DefaultEdge> getDag() {
257        return dag;
258    }
259
260    protected enum VertexType {
261        COMPUTATION, STREAM
262    }
263
264    public static class Builder {
265        final Set<ComputationMetadataMapping> metadataSet = new HashSet<>();
266
267        final Map<String, Supplier<Computation>> suppliersMap = new HashMap<>();
268
269        public Builder addComputation(Supplier<Computation> supplier, List<String> mapping) {
270            Map<String, String> map = new HashMap<>(mapping.size());
271            mapping.stream().filter(m -> m.contains(":")).forEach(m -> map.put(m.split(":")[0], m.split(":")[1]));
272            ComputationMetadataMapping meta = new ComputationMetadataMapping(supplier.get().metadata(), map);
273            metadataSet.add(meta);
274            suppliersMap.put(meta.name, supplier);
275            return this;
276        }
277
278        public Topology build() {
279            return new Topology(this);
280        }
281    }
282
283    public static class Vertex {
284        protected final String name;
285
286        protected final VertexType type;
287
288        public Vertex(VertexType type, String name) {
289            this.type = type;
290            this.name = name;
291        }
292
293        public String getName() {
294            return name;
295        }
296
297        public VertexType getType() {
298            return type;
299        }
300
301        @Override
302        public boolean equals(Object o) {
303            if (this == o)
304                return true;
305            if (o == null || getClass() != o.getClass())
306                return false;
307
308            Vertex myVertex = (Vertex) o;
309
310            return name.equals(myVertex.name) && type == myVertex.type;
311
312        }
313
314        @Override
315        public String toString() {
316            return "Vertex{" + "name='" + name + '\'' + ", type=" + type + '}';
317        }
318
319        @Override
320        public int hashCode() {
321            int result = name.hashCode();
322            result = 31 * result + type.hashCode();
323            return result;
324        }
325    }
326}