001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.computation;
020
021import org.jgrapht.experimental.dag.DirectedAcyclicGraph;
022import org.jgrapht.graph.DefaultEdge;
023
024import java.util.ArrayList;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.function.Supplier;
031import java.util.stream.Collectors;
032
033
034/**
035 * @since 9.2
036 */
037public class Topology {
038
039    private enum VertexType {
040        COMPUTATION, STREAM
041    }
042
043    private final List<ComputationMetadataMapping> metadataList; // use a list because computation are ordered using dag
044    private final Map<String, ComputationMetadataMapping> metadataMap = new HashMap<>();
045    private final Map<String, Supplier<Computation>> supplierMap = new HashMap<>();
046    private final DirectedAcyclicGraph<Vertex, DefaultEdge> dag = new DirectedAcyclicGraph<>(DefaultEdge.class);
047
048
049    private Topology(Builder builder) {
050        this.supplierMap.putAll(builder.suppliersMap);
051        builder.metadataSet.forEach(meta -> metadataMap.put(meta.name, meta));
052        this.metadataList = new ArrayList<>(builder.metadataSet.size());
053        try {
054            generateDag(builder.metadataSet);
055        } catch (DirectedAcyclicGraph.CycleFoundException e) {
056            throw new IllegalStateException("Cycle found in topology: " + e.getMessage(), e);
057        }
058    }
059
060    /**
061     * A plantuml representation of the topology.
062     */
063    public String toPlantuml() {
064        return toPlantuml(new Settings(0, 0));
065    }
066
067    public String toPlantuml(Settings settings) {
068        StringBuilder ret = new StringBuilder();
069        ret.append("@startuml\n");
070        for (Vertex vertex : dag) {
071            if (VertexType.COMPUTATION.equals(vertex.getType())) {
072                ret.append("node " + vertex.getName() + "\n");
073                int concurrency = settings.getConcurrency(vertex.getName());
074                if (concurrency > 0) {
075                    ret.append("note right: x" + concurrency + "\n");
076                }
077            } else if (VertexType.STREAM.equals(vertex.getType())) {
078                ret.append("database " + vertex.getName() + "\n");
079            }
080        }
081        for (DefaultEdge edge : dag.edgeSet()) {
082            ret.append(dag.getEdgeSource(edge).getName() + "==>" + dag.getEdgeTarget(edge).getName() + "\n");
083        }
084        ret.append("@enduml\n");
085        return ret.toString();
086    }
087
088    private void generateDag(Set<ComputationMetadataMapping> metadataSet) throws DirectedAcyclicGraph.CycleFoundException {
089        for (ComputationMetadata metadata : metadataSet) {
090            Vertex computationVertex = new Vertex(VertexType.COMPUTATION, metadata.name);
091            dag.addVertex(computationVertex);
092            if (metadata.ostreams != null) {
093                for (String stream : metadata.ostreams) {
094                    Vertex streamVertex = new Vertex(VertexType.STREAM, stream);
095                    dag.addVertex(streamVertex);
096                    dag.addDagEdge(computationVertex, streamVertex);
097                }
098            }
099            if (metadata.istreams != null) {
100                for (String streamName : metadata.istreams) {
101                    Vertex streamVertex = new Vertex(VertexType.STREAM, streamName);
102                    dag.addVertex(streamVertex);
103                    dag.addDagEdge(streamVertex, computationVertex);
104                }
105            }
106        }
107        for (Vertex vertex : dag) {
108            if (VertexType.COMPUTATION.equals(vertex.getType())) {
109                for (ComputationMetadataMapping metadata : metadataSet) {
110                    if (vertex.getName().equals(metadata.name)) {
111                        metadataList.add(metadata);
112                        break;
113                    }
114                }
115            }
116        }
117    }
118
119    public ComputationMetadataMapping getMetadata(String name) {
120        return metadataMap.get(name);
121    }
122
123    public Supplier<Computation> getSupplier(String name) {
124        return supplierMap.get(name);
125    }
126
127
128    public boolean isSource(String name) {
129        return getParents(name).isEmpty();
130    }
131
132    public boolean isSink(String name) {
133        return getChildren(name).isEmpty();
134    }
135
136    public Set<String> streamsSet() {
137        Set<String> ret = new HashSet<>();
138        for (ComputationMetadata metadata : this.metadataList) {
139            ret.addAll(metadata.istreams);
140            ret.addAll(metadata.ostreams);
141        }
142        return ret;
143    }
144
145    public Set<String> streamsSet(String root) {
146        Set<String> ret = new HashSet<>();
147        for (String name : getDescendantComputationNames(root)) {
148            ComputationMetadataMapping meta = getMetadata(name);
149            ret.addAll(meta.istreams);
150            ret.addAll(meta.ostreams);
151        }
152        return ret;
153    }
154
155
156    public List<ComputationMetadataMapping> metadataList() {
157        return metadataList;
158    }
159
160    public static Builder builder() {
161        return new Builder();
162    }
163
164    private Vertex getVertex(String name) {
165        Vertex ret;
166        if (metadataMap.containsKey(name)) {
167            ret = new Vertex(VertexType.COMPUTATION, name);
168        } else if (streamsSet().contains(name)) {
169            ret = new Vertex(VertexType.STREAM, name);
170        } else {
171            throw new IllegalArgumentException("Unknown vertex name: " + name + " for dag: " + dag);
172        }
173        return ret;
174    }
175
176    public Set<String> getDescendants(String name) {
177        Vertex start = getVertex(name);
178        return dag.getDescendants(dag, start).stream().map(Vertex::getName).collect(Collectors.toSet());
179    }
180
181    public Set<String> getDescendantComputationNames(String name) {
182        Vertex start = getVertex(name);
183        return dag.getDescendants(dag, start).stream().filter(vertex -> vertex.type == VertexType.COMPUTATION).map(vertex -> vertex.name).collect(Collectors.toSet());
184    }
185
186    public Set<String> getChildren(String name) {
187        Vertex start = getVertex(name);
188        return dag.outgoingEdgesOf(start).stream().map(edge -> dag.getEdgeTarget(edge).getName()).collect(Collectors.toSet());
189    }
190
191    public Set<String> getChildrenComputationNames(String name) {
192        Vertex start = getVertex(name);
193        Set<String> children = getChildren(name);
194        if (start.type == VertexType.STREAM) {
195            return children;
196        }
197        Set<String> ret = new HashSet<>();
198        children.forEach(child -> ret.addAll(getChildren(child)));
199        return ret;
200    }
201
202    public Set<String> getParents(String name) {
203        Vertex start = getVertex(name);
204        return dag.incomingEdgesOf(start).stream().map(edge -> dag.getEdgeSource(edge).getName()).collect(Collectors.toSet());
205    }
206
207    public Set<String> getParentComputationsNames(String name) {
208        Vertex start = getVertex(name);
209        Set<String> parents = getParents(name);
210        if (start.type == VertexType.STREAM) {
211            return parents;
212        }
213        Set<String> ret = new HashSet<>();
214        parents.forEach(parent -> ret.addAll(getParents(parent)));
215        return ret;
216    }
217
218    public Set<String> getAncestorComputationNames(String name) {
219        Set<Vertex> ancestors = dag.getAncestors(dag, new Vertex(VertexType.COMPUTATION, name));
220        return ancestors.stream().filter(vertex -> vertex.type == VertexType.COMPUTATION).map(vertex -> vertex.name).collect(Collectors.toSet());
221    }
222
223    public Set<String> getAncestors(String name) {
224        Vertex start = getVertex(name);
225        return dag.getAncestors(dag, start).stream().map(Vertex::getName).collect(Collectors.toSet());
226    }
227
228    public Set<String> getRoots() {
229        Set<String> ret = new HashSet<>();
230        for (Vertex vertex : dag) {
231            if (dag.getAncestors(dag, vertex).isEmpty()) {
232                ret.add(vertex.getName());
233            }
234        }
235        return ret;
236    }
237
238    public static class Builder {
239        final Set<ComputationMetadataMapping> metadataSet = new HashSet<>();
240        final Map<String, Supplier<Computation>> suppliersMap = new HashMap<>();
241
242        public Builder addComputation(Supplier<Computation> supplier, List<String> mapping) {
243            Map<String, String> map = new HashMap<>(mapping.size());
244            mapping.stream().filter(m -> m.contains(":")).forEach(m -> map.put(m.split(":")[0], m.split(":")[1]));
245            ComputationMetadataMapping meta = new ComputationMetadataMapping(supplier.get().metadata(), map);
246            metadataSet.add(meta);
247            suppliersMap.put(meta.name, supplier);
248            return this;
249        }
250
251        public Topology build() {
252            return new Topology(this);
253        }
254    }
255
256    public class Vertex {
257        private final String name;
258        private final VertexType type;
259
260        public Vertex(VertexType type, String name) {
261            this.type = type;
262            this.name = name;
263        }
264
265        public String getName() {
266            return name;
267        }
268
269        public VertexType getType() {
270            return type;
271        }
272
273        @Override
274        public boolean equals(Object o) {
275            if (this == o) return true;
276            if (o == null || getClass() != o.getClass()) return false;
277
278            Vertex myVertex = (Vertex) o;
279
280            if (!name.equals(myVertex.name)) return false;
281            return type == myVertex.type;
282
283        }
284
285        @Override
286        public String toString() {
287            return "Vertex{" +
288                    "name='" + name + '\'' +
289                    ", type=" + type +
290                    '}';
291        }
292
293        @Override
294        public int hashCode() {
295            int result = name.hashCode();
296            result = 31 * result + type.hashCode();
297            return result;
298        }
299    }
300
301    public DirectedAcyclicGraph<Vertex, DefaultEdge> getDag() {
302        return dag;
303    }
304}
305