001/*
002 * (C) Copyright 2012-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.platform.routing.core.impl;
020
021import java.io.Serializable;
022import java.util.ArrayList;
023import java.util.Date;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.LinkedHashSet;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031
032import org.apache.commons.lang3.StringUtils;
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.nuxeo.ecm.automation.core.Constants;
036import org.nuxeo.ecm.core.api.CoreSession;
037import org.nuxeo.ecm.core.api.DocumentModel;
038import org.nuxeo.ecm.core.api.DocumentModelList;
039import org.nuxeo.ecm.core.api.IdRef;
040import org.nuxeo.ecm.core.api.NuxeoPrincipal;
041import org.nuxeo.ecm.core.event.EventProducer;
042import org.nuxeo.ecm.core.event.impl.DocumentEventContext;
043import org.nuxeo.ecm.platform.routing.api.DocumentRoute;
044import org.nuxeo.ecm.platform.routing.api.DocumentRouteElement;
045import org.nuxeo.ecm.platform.routing.api.DocumentRoutingConstants;
046import org.nuxeo.ecm.platform.routing.api.DocumentRoutingService;
047import org.nuxeo.ecm.platform.routing.api.exception.DocumentRouteException;
048import org.nuxeo.ecm.platform.routing.core.audit.RoutingAuditHelper;
049import org.nuxeo.ecm.platform.routing.core.impl.GraphNode.State;
050import org.nuxeo.ecm.platform.routing.core.impl.GraphNode.Transition;
051import org.nuxeo.ecm.platform.task.Task;
052import org.nuxeo.ecm.platform.task.TaskComment;
053import org.nuxeo.ecm.platform.task.TaskEventNames;
054import org.nuxeo.ecm.platform.task.TaskService;
055import org.nuxeo.runtime.api.Framework;
056
057/**
058 * Runs the proper nodes depending on the graph state.
059 *
060 * @since 5.6
061 */
062public class GraphRunner extends AbstractRunner implements ElementRunner, Serializable {
063
064    private static final Log log = LogFactory.getLog(GraphRunner.class);
065
066    /**
067     * Maximum number of steps we do before deciding that this graph is looping.
068     */
069    public static final int MAX_LOOPS = 100;
070
071    @Override
072    public void run(CoreSession session, DocumentRouteElement element, Map<String, Serializable> map) {
073        GraphRoute graph = (GraphRoute) element;
074        element.setRunning(session);
075        if (map != null) {
076            graph.setVariables(map);
077        }
078        runGraph(session, element, graph.getStartNode());
079    }
080
081    @Override
082    public void run(CoreSession session, DocumentRouteElement element) {
083        run(session, element, null);
084    }
085
086    @SuppressWarnings("unchecked")
087    @Override
088    public void resume(CoreSession session, DocumentRouteElement element, String nodeId, String taskId,
089            Map<String, Object> varData, String status) {
090        GraphRoute graph = (GraphRoute) element;
091        Task task = null;
092        if (taskId == null) {
093            if (nodeId == null) {
094                throw new DocumentRouteException("nodeId and taskId both missing");
095            }
096        } else {
097            DocumentModel taskDoc = session.getDocument(new IdRef(taskId));
098            task = taskDoc.getAdapter(Task.class);
099            if (task == null) {
100                throw new DocumentRouteException("Invalid taskId: " + taskId);
101            }
102            if (nodeId == null) {
103                nodeId = task.getVariable(DocumentRoutingConstants.TASK_NODE_ID_KEY);
104                if (StringUtils.isEmpty(nodeId)) {
105                    throw new DocumentRouteException("No nodeId found on task: " + taskId);
106                }
107            }
108        }
109        GraphNode node = graph.getNode(nodeId);
110        if (node == null) {
111            throw new DocumentRouteException("Invalid nodeId: " + nodeId);
112        }
113        boolean forceResume = (varData != null && varData.get(DocumentRoutingConstants.WORKFLOW_FORCE_RESUME) != null
114                && (Boolean) varData.get(DocumentRoutingConstants.WORKFLOW_FORCE_RESUME));
115
116        if (forceResume && node.getState() != State.SUSPENDED && node.getState() != State.WAITING) {
117            throw new DocumentRouteException("Cannot force resume on non-suspended or non-waiting node: " + node);
118        }
119        if (!forceResume && node.getState() != State.SUSPENDED) {
120            throw new DocumentRouteException("Cannot resume on non-suspended node: " + node);
121        }
122        node.setAllVariables(varData, false);
123        if (StringUtils.isNotEmpty(status)) {
124            node.setButton(status);
125        }
126        if (task != null) {
127            finishTask(session, graph, node, task, false, status);
128            // don't delete (yet)
129            if (task != null) {
130                Map<String, Serializable> eventProperties = new HashMap<>();
131                eventProperties.put(DocumentEventContext.CATEGORY_PROPERTY_KEY, DocumentRoutingConstants.ROUTING_CATEGORY);
132                eventProperties.put("taskName", task.getName());
133                eventProperties.put("modelName", graph.getModelName());
134                eventProperties.put("action", status);
135                eventProperties.put("data", (Serializable) varData);
136                eventProperties.put(RoutingAuditHelper.WORKFLOW_INITATIOR, graph.getInitiator());
137                eventProperties.put(RoutingAuditHelper.TASK_ACTOR, session.getPrincipal().getActingUser());
138                eventProperties.put("nodeVariables", (Serializable) node.getVariables());
139                eventProperties.put("workflowVariables", (Serializable) graph.getVariables());
140
141                // Compute duration of the task itself
142                long duration = RoutingAuditHelper.computeDurationSinceTaskStarted(task.getId());
143                if (duration >= 0) {
144                    eventProperties.put(RoutingAuditHelper.TIME_SINCE_TASK_STARTED, duration);
145                }
146
147                // Then compute duration since workflow started
148                long timeSinceWfStarted = RoutingAuditHelper.computeDurationSinceWfStarted(task.getProcessId());
149                if (timeSinceWfStarted >= 0) {
150                    eventProperties.put(RoutingAuditHelper.TIME_SINCE_WF_STARTED, timeSinceWfStarted);
151                }
152
153                DocumentEventContext envContext = new DocumentEventContext(session, session.getPrincipal(), task.getDocument());
154                envContext.setProperties(eventProperties);
155                EventProducer eventProducer = Framework.getService(EventProducer.class);
156                eventProducer.fireEvent(envContext.newEvent(DocumentRoutingConstants.Events.afterWorkflowTaskEnded.name()));
157            }
158        } else {
159            // cancel any remaing tasks on this node
160            node.cancelTasks();
161        }
162        if (node.hasOpenTasks()) {
163            log.info("Node " + node.getId() + "has open tasks, the workflow can not be resumed for now.");
164            // do nothing, the workflow is resumed only when all the tasks
165            // created from
166            // this node are processed
167            // as this is a multi-task node, reset comment if it was
168            // previously set
169            if (varData != null && varData.get(Constants.VAR_WORKFLOW_NODE) != null
170                    && ((Map<String, Serializable>) varData.get(Constants.VAR_WORKFLOW_NODE)).containsKey(
171                            GraphNode.NODE_VARIABLE_COMMENT)) {
172                node.setVariable(GraphNode.NODE_VARIABLE_COMMENT, "");
173            }
174            return;
175        }
176        runGraph(session, element, node);
177    }
178
179    @Override
180    public void cancel(CoreSession session, DocumentRouteElement element) {
181        GraphRoute graph = element instanceof GraphRoute ? (GraphRoute) element : null;
182
183        Map<String, Serializable> eventProperties = new HashMap<>();
184        if (graph != null) {
185            eventProperties.put("modelId", graph.getModelId());
186            eventProperties.put("modelName", graph.getModelName());
187            eventProperties.put(RoutingAuditHelper.WORKFLOW_VARIABLES, (Serializable) graph.getVariables());
188            eventProperties.put(RoutingAuditHelper.WORKFLOW_INITATIOR, graph.getInitiator());
189            // Get the list of pending node
190            List<String> pendingNodeNames = new ArrayList<>();
191            for (GraphNode suspendedNode : graph.getSuspendedNodes()) {
192                pendingNodeNames.add(suspendedNode.getId());
193            }
194            eventProperties.put("pendingNodes", (Serializable) pendingNodeNames);
195        }
196        EventFirer.fireEvent(session, element, eventProperties, DocumentRoutingConstants.Events.beforeWorkflowCanceled.name());
197
198        super.cancel(session, element);
199        if (graph == null) {
200            return;
201        }
202        // also cancel tasks
203        // also cancel sub-workflows
204        for (GraphNode node : graph.getNodes()) {
205            node.cancelTasks();
206            node.cancelSubRoute();
207        }
208    }
209
210    /**
211     * Runs the graph starting with the given node.
212     *
213     * @param initialNode the initial node to run
214     */
215    protected void runGraph(CoreSession session, DocumentRouteElement element, GraphNode initialNode)
216            throws DocumentRouteException {
217        GraphRoute graph = (GraphRoute) element;
218        List<GraphNode> pendingSubRoutes = new LinkedList<>();
219        LinkedList<GraphNode> pendingNodes = new LinkedList<>();
220        pendingNodes.add(initialNode);
221        boolean done = false;
222        int count = 0;
223        while (!pendingNodes.isEmpty()) {
224            GraphNode node = pendingNodes.pop();
225            count++;
226            if (count > MAX_LOOPS) {
227                throw new DocumentRouteException("Execution is looping, node: " + node);
228            }
229            State jump = null;
230            switch (node.getState()) {
231            case READY:
232                log.debug("Doing node " + node);
233                if (node.isMerge()) {
234                    jump = State.WAITING;
235                } else {
236                    jump = State.RUNNING_INPUT;
237                }
238                break;
239            case WAITING:
240                if (node.canMerge()) {
241                    recursiveCancelInput(graph, node, pendingNodes);
242                    jump = State.RUNNING_INPUT;
243                }
244                // else leave state to WAITING
245                break;
246            case RUNNING_INPUT:
247                node.starting();
248                node.executeChain(node.getInputChain());
249                if (node.hasTask() || node.hasMultipleTasks()) {
250                    createTask(session, graph, node); // may create several
251                    node.setState(State.SUSPENDED);
252                }
253                if (node.hasSubRoute()) {
254                    if (!pendingSubRoutes.contains(node)) {
255                        pendingSubRoutes.add(node);
256                    }
257                    node.setState(State.SUSPENDED);
258                }
259                if (node.getState() != State.SUSPENDED) {
260                    jump = State.RUNNING_OUTPUT;
261                }
262                // else this node is suspended,
263                // remove it from queue of nodes to process
264                break;
265            case SUSPENDED:
266                if (node != initialNode) {
267                    throw new DocumentRouteException("Executing unexpected SUSPENDED state");
268                }
269                // actor
270                NuxeoPrincipal principal = session.getPrincipal();
271                String actor = principal.getActingUser();
272                node.setLastActor(actor);
273                // resuming, variables have been set by resumeGraph
274                jump = State.RUNNING_OUTPUT;
275                break;
276            case RUNNING_OUTPUT:
277                node.executeChain(node.getOutputChain());
278                List<Transition> trueTrans = node.evaluateTransitions();
279                node.ending();
280                node.setState(State.READY);
281                if (node.isStop()) {
282                    if (!pendingNodes.isEmpty()) {
283                        throw new DocumentRouteException(String.format("Route %s stopped with still pending nodes: %s",
284                                graph, pendingNodes));
285                    }
286                    done = true;
287                } else {
288                    if (trueTrans.isEmpty()) {
289                        throw new DocumentRouteException("No transition evaluated to true from node " + node);
290                    }
291                    for (Transition t : trueTrans) {
292                        node.executeTransitionChain(t);
293                        GraphNode target = graph.getNode(t.target);
294                        if (!pendingNodes.contains(target)) {
295                            pendingNodes.add(target);
296                        }
297                    }
298                }
299                break;
300            }
301            if (jump != null) {
302                node.setState(jump);
303                // loop again on this node
304                count--;
305                pendingNodes.addFirst(node);
306            }
307        }
308        if (done) {
309            element.setDone(session);
310            /*
311             * Resume the parent route if this is a sub-route.
312             */
313            if (graph.hasParentRoute()) {
314                graph.resumeParentRoute(session);
315            }
316        }
317        /*
318         * Now run the sub-routes. If they are done, they'll call back into the routing service to resume the parent
319         * node (above code).
320         */
321        for (GraphNode node : pendingSubRoutes) {
322            DocumentRoute subRoute = node.startSubRoute();
323        }
324        session.save();
325    }
326
327    protected void recursiveCancelInput(GraphRoute graph, GraphNode originalNode, LinkedList<GraphNode> pendingNodes) {
328        LinkedList<GraphNode> todo = new LinkedList<>();
329        todo.add(originalNode);
330        Set<String> done = new HashSet<>();
331        while (!todo.isEmpty()) {
332            GraphNode node = todo.pop();
333            done.add(node.getId());
334            for (Transition t : node.getInputTransitions()) {
335                if (t.loop) {
336                    // don't recurse through loop transitions
337                    continue;
338                }
339                GraphNode source = t.source;
340                if (done.contains(source.getId())) {
341                    // looping somewhere TODO check it's not happening
342                    continue;
343                }
344                source.setCanceled();
345                State state = source.getState();
346                source.setState(State.READY);
347                pendingNodes.remove(node);
348                if (state == State.SUSPENDED) {
349                    // we're suspended on a task, cancel it and stop recursion
350                    source.cancelTasks();
351                } else {
352                    // else recurse
353                    todo.add(source);
354                }
355            }
356        }
357    }
358
359    protected void createTask(CoreSession session, GraphRoute graph, GraphNode node) throws DocumentRouteException {
360        DocumentRouteElement routeInstance = graph;
361        Map<String, String> taskVariables = new HashMap<>();
362        taskVariables.put(DocumentRoutingConstants.TASK_ROUTE_INSTANCE_DOCUMENT_ID_KEY,
363                routeInstance.getDocument().getId());
364        taskVariables.put(DocumentRoutingConstants.TASK_NODE_ID_KEY, node.getId());
365        taskVariables.put(DocumentRoutingConstants.OPERATION_STEP_DOCUMENT_KEY, node.getDocument().getId());
366        String taskNotiftemplate = node.getTaskNotificationTemplate();
367        if (!StringUtils.isEmpty(taskNotiftemplate)) {
368            taskVariables.put(DocumentRoutingConstants.TASK_ASSIGNED_NOTIFICATION_TEMPLATE, taskNotiftemplate);
369        } else {
370            // disable notification service
371            taskVariables.put(TaskEventNames.DISABLE_NOTIFICATION_SERVICE, "true");
372        }
373        // evaluate task assignees from taskVar if any
374        HashSet<String> actors = new LinkedHashSet<>();
375        actors.addAll(node.evaluateTaskAssignees());
376        actors.addAll(node.getTaskAssignees());
377        // evaluate taskDueDate from the taskDueDateExpr;
378        Date dueDate = node.computeTaskDueDate();
379        DocumentModelList docs = graph.getAttachedDocumentModels();
380        TaskService taskService = Framework.getService(TaskService.class);
381        DocumentRoutingService routing = Framework.getService(DocumentRoutingService.class);
382        // TODO documents other than the first are not attached to the task
383        // (task API allows only one document)
384        // we may get several tasks if there's one per actor when the node
385        // has the property
386        // hasMultipleTasks set to true
387        List<Task> tasks = taskService.createTask(session, session.getPrincipal(), docs,
388                                                  node.getTaskDocType(), node.getDocument().getTitle(), node.getId(), routeInstance.getDocument().getId(),
389                                                  new ArrayList<>(actors), node.hasMultipleTasks(), node.getTaskDirective(), null, dueDate,
390                                                  taskVariables, null, node.getWorkflowContextualInfo(session, true));
391
392        // Audit task assignment
393        for (Task task : tasks) {
394            Map<String, Serializable> eventProperties = new HashMap<>();
395            eventProperties.put(DocumentEventContext.CATEGORY_PROPERTY_KEY, DocumentRoutingConstants.ROUTING_CATEGORY);
396            eventProperties.put("taskName", node.getDocument().getTitle());
397            eventProperties.put("actors", actors);
398            eventProperties.put("modelId", graph.getModelId());
399            eventProperties.put("modelName", graph.getModelName());
400            eventProperties.put(RoutingAuditHelper.WORKFLOW_INITATIOR, graph.getInitiator());
401            eventProperties.put(RoutingAuditHelper.TASK_ACTOR, session.getPrincipal().getOriginatingUser());
402            eventProperties.put("nodeVariables", (Serializable) node.getVariables());
403            if (routeInstance instanceof GraphRoute) {
404                eventProperties.put("workflowVariables", (Serializable) ((GraphRoute) routeInstance).getVariables());
405            }
406
407            // compute duration since workflow started
408            long timeSinceWfStarted = RoutingAuditHelper.computeDurationSinceWfStarted(task.getProcessId());
409            if (timeSinceWfStarted >= 0) {
410                eventProperties.put(RoutingAuditHelper.TIME_SINCE_WF_STARTED, timeSinceWfStarted);
411            }
412
413            DocumentEventContext envContext = new DocumentEventContext(session, session.getPrincipal(), task.getDocument());
414            envContext.setProperties(eventProperties);
415            EventProducer eventProducer = Framework.getService(EventProducer.class);
416            eventProducer.fireEvent(envContext.newEvent(DocumentRoutingConstants.Events.afterWorkflowTaskCreated.name()));
417        }
418
419        // routing.makeRoutingTasks(session, tasks);
420        for (Task task : tasks) {
421            node.addTaskInfo(task.getId());
422        }
423        String taskAssigneesPermission = node.getTaskAssigneesPermission();
424        if (StringUtils.isEmpty(taskAssigneesPermission)) {
425            return;
426        }
427        for (Task task : tasks) {
428            routing.grantPermissionToTaskAssignees(session, taskAssigneesPermission, docs, task);
429        }
430    }
431
432    protected void finishTask(CoreSession session, GraphRoute graph, GraphNode node, Task task, boolean delete)
433            throws DocumentRouteException {
434        finishTask(session, graph, node, task, delete, null);
435    }
436
437    protected void finishTask(CoreSession session, GraphRoute graph, GraphNode node, Task task, boolean delete,
438            String status) throws DocumentRouteException {
439        DocumentRoutingService routing = Framework.getService(DocumentRoutingService.class);
440        DocumentModelList docs = graph.getAttachedDocumentModels();
441        routing.removePermissionsForTaskActors(session, docs, task);
442        // delete task
443        if (delete) {
444            session.removeDocument(new IdRef(task.getId()));
445        }
446        // get the last comment on the task, if there are several:
447        // task might have been previously reassigned or delegated
448        List<TaskComment> comments = task.getComments();
449        String comment = comments.size() > 0 ? comments.get(comments.size() - 1).getText() : "";
450        // actor
451        NuxeoPrincipal principal = session.getPrincipal();
452        String actor = principal.getActingUser();
453        node.updateTaskInfo(task.getId(), true, status, actor, comment);
454    }
455
456}