001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation;
020
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import java.util.function.Supplier;
028import java.util.stream.Collectors;
029
030import org.jgrapht.experimental.dag.DirectedAcyclicGraph;
031import org.jgrapht.graph.DefaultEdge;
032
033/**
034 * Represent a Directed Acyclic Graph (DAG) of computations.
035 *
036 * @since 9.3
037 */
038public class Topology {
039
040    protected final List<ComputationMetadataMapping> metadataList; // use a list because computation are ordered using
041
042    protected final Map<String, ComputationMetadataMapping> metadataMap = new HashMap<>();
043
044    // dag
045    protected final Map<String, Supplier<Computation>> supplierMap = new HashMap<>();
046
047    protected final DirectedAcyclicGraph<Vertex, DefaultEdge> dag = new DirectedAcyclicGraph<>(DefaultEdge.class);
048
049    protected Topology(Builder builder) {
050        this.supplierMap.putAll(builder.suppliersMap);
051        builder.metadataSet.forEach(meta -> metadataMap.put(meta.name, meta));
052        this.metadataList = new ArrayList<>(builder.metadataSet.size());
053        try {
054            generateDag(builder.metadataSet);
055        } catch (DirectedAcyclicGraph.CycleFoundException e) {
056            throw new IllegalStateException("Cycle found in topology: " + e.getMessage(), e);
057        }
058    }
059
060    public static Builder builder() {
061        return new Builder();
062    }
063
064    /**
065     * A plantuml representation of the topology.
066     */
067    public String toPlantuml() {
068        return toPlantuml(new Settings(0, 0));
069    }
070
071    @SuppressWarnings("SpellCheckingInspection")
072    public String toPlantuml(Settings settings) {
073        StringBuilder ret = new StringBuilder();
074        ret.append("@startuml\n");
075        for (Vertex vertex : dag) {
076            if (VertexType.COMPUTATION.equals(vertex.getType())) {
077                String name = vertex.getName();
078                int concurrency = settings.getConcurrency(vertex.getName());
079                ret.append(String.format("node %s [%s\n----\n%d threads]\n", name, name, concurrency));
080            } else if (VertexType.STREAM.equals(vertex.getType())) {
081                String name = vertex.getName();
082                int partitions = settings.getPartitions(vertex.getName());
083                ret.append(String.format("queue %s [%s\n----\n%d partitions]\n", name, name, partitions));
084            }
085        }
086        for (DefaultEdge edge : dag.edgeSet()) {
087            ret.append(
088                    String.format("%s==>%s\n", dag.getEdgeSource(edge).getName(), dag.getEdgeTarget(edge).getName()));
089        }
090        ret.append("@enduml\n");
091        return ret.toString().replace("\n1 partitions", "\n1 partition").replace("\n1 threads", "\n1 thread");
092    }
093
094    protected void generateDag(Set<ComputationMetadataMapping> metadataSet)
095            throws DirectedAcyclicGraph.CycleFoundException {
096        for (ComputationMetadata metadata : metadataSet) {
097            Vertex computationVertex = new Vertex(VertexType.COMPUTATION, metadata.name);
098            dag.addVertex(computationVertex);
099            if (metadata.outputStreams != null) {
100                for (String stream : metadata.outputStreams) {
101                    Vertex streamVertex = new Vertex(VertexType.STREAM, stream);
102                    dag.addVertex(streamVertex);
103                    dag.addDagEdge(computationVertex, streamVertex);
104                }
105            }
106            if (metadata.inputStreams() != null) {
107                for (String streamName : metadata.inputStreams()) {
108                    Vertex streamVertex = new Vertex(VertexType.STREAM, streamName);
109                    dag.addVertex(streamVertex);
110                    dag.addDagEdge(streamVertex, computationVertex);
111                }
112            }
113        }
114        for (Vertex vertex : dag) {
115            if (VertexType.COMPUTATION.equals(vertex.getType())) {
116                for (ComputationMetadataMapping metadata : metadataSet) {
117                    if (vertex.getName().equals(metadata.name)) {
118                        metadataList.add(metadata);
119                        break;
120                    }
121                }
122            }
123        }
124    }
125
126    public ComputationMetadataMapping getMetadata(String name) {
127        return metadataMap.get(name);
128    }
129
130    public Supplier<Computation> getSupplier(String name) {
131        return supplierMap.get(name);
132    }
133
134    public boolean isSource(String name) {
135        return getParents(name).isEmpty();
136    }
137
138    public boolean isSink(String name) {
139        return getChildren(name).isEmpty();
140    }
141
142    public Set<String> streamsSet() {
143        Set<String> ret = new HashSet<>();
144        for (ComputationMetadata metadata : this.metadataList) {
145            ret.addAll(metadata.inputStreams());
146            ret.addAll(metadata.outputStreams());
147        }
148        return ret;
149    }
150
151    public Set<String> streamsSet(String root) {
152        Set<String> ret = new HashSet<>();
153        for (String name : getDescendantComputationNames(root)) {
154            ComputationMetadataMapping meta = getMetadata(name);
155            ret.addAll(meta.inputStreams());
156            ret.addAll(meta.outputStreams());
157        }
158        return ret;
159    }
160
161    public List<ComputationMetadataMapping> metadataList() {
162        return metadataList;
163    }
164
165    protected Vertex getVertex(String name) {
166        Vertex ret;
167        if (metadataMap.containsKey(name)) {
168            ret = new Vertex(VertexType.COMPUTATION, name);
169        } else if (streamsSet().contains(name)) {
170            ret = new Vertex(VertexType.STREAM, name);
171        } else {
172            throw new IllegalArgumentException("Unknown vertex name: " + name + " for dag: " + dag);
173        }
174        return ret;
175    }
176
177    public Set<String> getDescendants(String name) {
178        Vertex start = getVertex(name);
179        return dag.getDescendants(dag, start).stream().map(Vertex::getName).collect(Collectors.toSet());
180    }
181
182    public Set<String> getDescendantComputationNames(String name) {
183        Vertex start = getVertex(name);
184        return dag.getDescendants(dag, start)
185                  .stream()
186                  .filter(vertex -> vertex.type == VertexType.COMPUTATION)
187                  .map(vertex -> vertex.name)
188                  .collect(Collectors.toSet());
189    }
190
191    public Set<String> getChildren(String name) {
192        Vertex start = getVertex(name);
193        return dag.outgoingEdgesOf(start).stream().map(edge -> dag.getEdgeTarget(edge).getName()).collect(
194                Collectors.toSet());
195    }
196
197    public Set<String> getChildrenComputationNames(String name) {
198        Vertex start = getVertex(name);
199        Set<String> children = getChildren(name);
200        if (start.type == VertexType.STREAM) {
201            return children;
202        }
203        Set<String> ret = new HashSet<>();
204        children.forEach(child -> ret.addAll(getChildren(child)));
205        return ret;
206    }
207
208    public Set<String> getParents(String name) {
209        Vertex start = getVertex(name);
210        return dag.incomingEdgesOf(start).stream().map(edge -> dag.getEdgeSource(edge).getName()).collect(
211                Collectors.toSet());
212    }
213
214    public Set<String> getParentComputationsNames(String name) {
215        Vertex start = getVertex(name);
216        Set<String> parents = getParents(name);
217        if (start.type == VertexType.STREAM) {
218            return parents;
219        }
220        Set<String> ret = new HashSet<>();
221        parents.forEach(parent -> ret.addAll(getParents(parent)));
222        return ret;
223    }
224
225    public Set<String> getAncestorComputationNames(String name) {
226        Set<Vertex> ancestors = dag.getAncestors(dag, new Vertex(VertexType.COMPUTATION, name));
227        return ancestors.stream()
228                        .filter(vertex -> vertex.type == VertexType.COMPUTATION)
229                        .map(vertex -> vertex.name)
230                        .collect(Collectors.toSet());
231    }
232
233    public Set<String> getAncestors(String name) {
234        Vertex start = getVertex(name);
235        return dag.getAncestors(dag, start).stream().map(Vertex::getName).collect(Collectors.toSet());
236    }
237
238    public Set<String> getRoots() {
239        Set<String> ret = new HashSet<>();
240        for (Vertex vertex : dag) {
241            if (dag.getAncestors(dag, vertex).isEmpty()) {
242                ret.add(vertex.getName());
243            }
244        }
245        return ret;
246    }
247
248    public DirectedAcyclicGraph<Vertex, DefaultEdge> getDag() {
249        return dag;
250    }
251
252    protected enum VertexType {
253        COMPUTATION, STREAM
254    }
255
256    public static class Builder {
257        final Set<ComputationMetadataMapping> metadataSet = new HashSet<>();
258
259        final Map<String, Supplier<Computation>> suppliersMap = new HashMap<>();
260
261        public Builder addComputation(Supplier<Computation> supplier, List<String> mapping) {
262            Map<String, String> map = new HashMap<>(mapping.size());
263            mapping.stream().filter(m -> m.contains(":")).forEach(m -> map.put(m.split(":")[0], m.split(":")[1]));
264            ComputationMetadataMapping meta = new ComputationMetadataMapping(supplier.get().metadata(), map);
265            metadataSet.add(meta);
266            suppliersMap.put(meta.name, supplier);
267            return this;
268        }
269
270        public Topology build() {
271            return new Topology(this);
272        }
273    }
274
275    public static class Vertex {
276        protected final String name;
277
278        protected final VertexType type;
279
280        public Vertex(VertexType type, String name) {
281            this.type = type;
282            this.name = name;
283        }
284
285        public String getName() {
286            return name;
287        }
288
289        public VertexType getType() {
290            return type;
291        }
292
293        @Override
294        public boolean equals(Object o) {
295            if (this == o)
296                return true;
297            if (o == null || getClass() != o.getClass())
298                return false;
299
300            Vertex myVertex = (Vertex) o;
301
302            return name.equals(myVertex.name) && type == myVertex.type;
303
304        }
305
306        @Override
307        public String toString() {
308            return "Vertex{" + "name='" + name + '\'' + ", type=" + type + '}';
309        }
310
311        @Override
312        public int hashCode() {
313            int result = name.hashCode();
314            result = 31 * result + type.hashCode();
315            return result;
316        }
317    }
318}