001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation;
020
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import java.util.function.Supplier;
028import java.util.stream.Collectors;
029
030import org.jgrapht.experimental.dag.DirectedAcyclicGraph;
031import org.jgrapht.graph.DefaultEdge;
032
033/**
034 * Represent a Directed Acyclic Graph (DAG) of computations.
035 *
036 * @since 9.3
037 */
038public class Topology {
039
040    protected final List<ComputationMetadataMapping> metadataList; // use a list because computation are ordered using
041
042    protected final Map<String, ComputationMetadataMapping> metadataMap = new HashMap<>();
043
044    // dag
045    protected final Map<String, Supplier<Computation>> supplierMap = new HashMap<>();
046
047    protected final DirectedAcyclicGraph<Vertex, DefaultEdge> dag = new DirectedAcyclicGraph<>(DefaultEdge.class);
048
049    protected Topology(Builder builder) {
050        this.supplierMap.putAll(builder.suppliersMap);
051        builder.metadataSet.forEach(meta -> metadataMap.put(meta.name, meta));
052        this.metadataList = new ArrayList<>(builder.metadataSet.size());
053        try {
054            generateDag(builder.metadataSet);
055        } catch (DirectedAcyclicGraph.CycleFoundException e) {
056            throw new IllegalStateException("Cycle found in topology: " + e.getMessage(), e);
057        }
058    }
059
060    public static Builder builder() {
061        return new Builder();
062    }
063
064    /**
065     * A plantuml representation of the topology.
066     */
067    public String toPlantuml() {
068        return toPlantuml(new Settings(0, 0));
069    }
070
071    @SuppressWarnings("SpellCheckingInspection")
072    public String toPlantuml(Settings settings) {
073        StringBuilder ret = new StringBuilder();
074        ret.append("@startuml\n");
075        for (Vertex vertex : dag) {
076            if (VertexType.COMPUTATION.equals(vertex.getType())) {
077                String name = vertex.getName();
078                int concurrency = settings.getConcurrency(vertex.getName());
079                ret.append(String.format("node %s [%s%n----%n%s%nConcurrency: %d threads]%n", getPumlName(vertex), name,
080                        compactPolicy(settings.getPolicy(vertex.getName())), concurrency));
081            } else if (VertexType.STREAM.equals(vertex.getType())) {
082                String name = vertex.getName();
083                int partitions = settings.getPartitions(vertex.getName());
084                ret.append(String.format("queue %s [%s%n----%n%d partitions%ncodec: %s]%n", getPumlName(vertex), name,
085                        partitions, settings.getCodec(name).getName()));
086            }
087        }
088        for (DefaultEdge edge : dag.edgeSet()) {
089            ret.append(
090                    String.format("%s==>%s%n", getPumlName(dag.getEdgeSource(edge)), getPumlName(dag.getEdgeTarget(edge))));
091        }
092        ret.append("@enduml\n");
093        return ret.toString().replace("%n1 partitions", "%n1 partition").replace("%n1 threads", "%n1 thread");
094    }
095
096    protected String compactPolicy(ComputationPolicy policy) {
097        return String.format("Continue on failure: %s%nRetries: %d, %d ms%nBatch: %d, %dms", policy.continueOnFailure(),
098                policy.getRetryPolicy().getMaxRetries(), policy.getRetryPolicy().getDelay().toMillis(), policy.getBatchCapacity(),
099                policy.getBatchThreshold().toMillis());
100    }
101
102    protected String getPumlName(Topology.Vertex vertex) {
103        if (VertexType.COMPUTATION.equals(vertex.getType())) {
104            return getPumlIdentifier(vertex.getName()) + "Comp";
105        }
106        return getPumlIdentifier(vertex.getName());
107    }
108
109    protected String getPumlIdentifier(String name) {
110        return name.replaceAll("[^a-zA-Z]", ".");
111    }
112
113    protected void generateDag(Set<ComputationMetadataMapping> metadataSet)
114            throws DirectedAcyclicGraph.CycleFoundException {
115        for (ComputationMetadata metadata : metadataSet) {
116            Vertex computationVertex = new Vertex(VertexType.COMPUTATION, metadata.name);
117            dag.addVertex(computationVertex);
118            if (metadata.outputStreams != null) {
119                for (String stream : metadata.outputStreams) {
120                    Vertex streamVertex = new Vertex(VertexType.STREAM, stream);
121                    dag.addVertex(streamVertex);
122                    dag.addDagEdge(computationVertex, streamVertex);
123                }
124            }
125            if (metadata.inputStreams() != null) {
126                for (String streamName : metadata.inputStreams()) {
127                    Vertex streamVertex = new Vertex(VertexType.STREAM, streamName);
128                    dag.addVertex(streamVertex);
129                    dag.addDagEdge(streamVertex, computationVertex);
130                }
131            }
132        }
133        generateMetadataMapping(metadataSet);
134    }
135
136    protected void generateMetadataMapping(Set<ComputationMetadataMapping> metadataSet) {
137        for (Vertex vertex : dag) {
138            if (VertexType.COMPUTATION.equals(vertex.getType())) {
139                for (ComputationMetadataMapping metadata : metadataSet) {
140                    if (vertex.getName().equals(metadata.name)) {
141                        metadataList.add(metadata);
142                        break;
143                    }
144                }
145            }
146        }
147    }
148
149    public ComputationMetadataMapping getMetadata(String name) {
150        return metadataMap.get(name);
151    }
152
153    public Supplier<Computation> getSupplier(String name) {
154        return supplierMap.get(name);
155    }
156
157    public boolean isSource(String name) {
158        return getParents(name).isEmpty();
159    }
160
161    public boolean isSink(String name) {
162        return getChildren(name).isEmpty();
163    }
164
165    public Set<String> streamsSet() {
166        Set<String> ret = new HashSet<>();
167        for (ComputationMetadata metadata : this.metadataList) {
168            ret.addAll(metadata.inputStreams());
169            ret.addAll(metadata.outputStreams());
170        }
171        return ret;
172    }
173
174    public Set<String> streamsSet(String root) {
175        Set<String> ret = new HashSet<>();
176        for (String name : getDescendantComputationNames(root)) {
177            ComputationMetadataMapping meta = getMetadata(name);
178            ret.addAll(meta.inputStreams());
179            ret.addAll(meta.outputStreams());
180        }
181        return ret;
182    }
183
184    public List<ComputationMetadataMapping> metadataList() {
185        return metadataList;
186    }
187
188    protected Vertex getVertex(String name) {
189        Vertex ret;
190        if (metadataMap.containsKey(name)) {
191            ret = new Vertex(VertexType.COMPUTATION, name);
192        } else if (streamsSet().contains(name)) {
193            ret = new Vertex(VertexType.STREAM, name);
194        } else {
195            throw new IllegalArgumentException("Unknown vertex name: " + name + " for dag: " + dag);
196        }
197        return ret;
198    }
199
200    public Set<String> getDescendants(String name) {
201        Vertex start = getVertex(name);
202        return dag.getDescendants(dag, start).stream().map(Vertex::getName).collect(Collectors.toSet());
203    }
204
205    public Set<String> getDescendantComputationNames(String name) {
206        Vertex start = getVertex(name);
207        return dag.getDescendants(dag, start)
208                  .stream()
209                  .filter(vertex -> vertex.type == VertexType.COMPUTATION)
210                  .map(vertex -> vertex.name)
211                  .collect(Collectors.toSet());
212    }
213
214    public Set<String> getChildren(String name) {
215        Vertex start = getVertex(name);
216        return dag.outgoingEdgesOf(start)
217                  .stream()
218                  .map(edge -> dag.getEdgeTarget(edge).getName())
219                  .collect(Collectors.toSet());
220    }
221
222    public Set<String> getChildrenComputationNames(String name) {
223        Vertex start = getVertex(name);
224        Set<String> children = getChildren(name);
225        if (start.type == VertexType.STREAM) {
226            return children;
227        }
228        Set<String> ret = new HashSet<>();
229        children.forEach(child -> ret.addAll(getChildren(child)));
230        return ret;
231    }
232
233    public Set<String> getParents(String name) {
234        Vertex start = getVertex(name);
235        return dag.incomingEdgesOf(start)
236                  .stream()
237                  .map(edge -> dag.getEdgeSource(edge).getName())
238                  .collect(Collectors.toSet());
239    }
240
241    public Set<String> getParentComputationsNames(String name) {
242        Vertex start = getVertex(name);
243        Set<String> parents = getParents(name);
244        if (start.type == VertexType.STREAM) {
245            return parents;
246        }
247        Set<String> ret = new HashSet<>();
248        parents.forEach(parent -> ret.addAll(getParents(parent)));
249        return ret;
250    }
251
252    public Set<String> getAncestorComputationNames(String name) {
253        Set<Vertex> ancestors = dag.getAncestors(dag, new Vertex(VertexType.COMPUTATION, name));
254        return ancestors.stream()
255                        .filter(vertex -> vertex.type == VertexType.COMPUTATION)
256                        .map(vertex -> vertex.name)
257                        .collect(Collectors.toSet());
258    }
259
260    public Set<String> getAncestors(String name) {
261        Vertex start = getVertex(name);
262        return dag.getAncestors(dag, start).stream().map(Vertex::getName).collect(Collectors.toSet());
263    }
264
265    public Set<String> getRoots() {
266        Set<String> ret = new HashSet<>();
267        for (Vertex vertex : dag) {
268            if (dag.getAncestors(dag, vertex).isEmpty()) {
269                ret.add(vertex.getName());
270            }
271        }
272        return ret;
273    }
274
275    public DirectedAcyclicGraph<Vertex, DefaultEdge> getDag() {
276        return dag;
277    }
278
279    public enum VertexType {
280        COMPUTATION, STREAM
281    }
282
283    public static class Builder {
284        final Set<ComputationMetadataMapping> metadataSet = new HashSet<>();
285
286        final Map<String, Supplier<Computation>> suppliersMap = new HashMap<>();
287
288        public Builder addComputation(Supplier<Computation> supplier, List<String> mapping) {
289            Map<String, String> map = new HashMap<>(mapping.size());
290            mapping.stream().filter(m -> m.contains(":")).forEach(m -> map.put(m.split(":")[0], m.split(":")[1]));
291            ComputationMetadataMapping meta = new ComputationMetadataMapping(supplier.get().metadata(), map);
292            metadataSet.add(meta);
293            suppliersMap.put(meta.name, supplier);
294            return this;
295        }
296
297        public Topology build() {
298            return new Topology(this);
299        }
300    }
301
302    public static class Vertex {
303        protected final String name;
304
305        protected final VertexType type;
306
307        public Vertex(VertexType type, String name) {
308            this.type = type;
309            this.name = name;
310        }
311
312        public String getName() {
313            return name;
314        }
315
316        public VertexType getType() {
317            return type;
318        }
319
320        @Override
321        public boolean equals(Object o) {
322            if (this == o)
323                return true;
324            if (o == null || getClass() != o.getClass())
325                return false;
326
327            Vertex myVertex = (Vertex) o;
328
329            return name.equals(myVertex.name) && type == myVertex.type;
330
331        }
332
333        @Override
334        public String toString() {
335            return "Vertex{" + "name='" + name + '\'' + ", type=" + type + '}';
336        }
337
338        @Override
339        public int hashCode() {
340            int result = name.hashCode();
341            result = 31 * result + type.hashCode();
342            return result;
343        }
344    }
345}