001/*
002 * (C) Copyright 2012-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.platform.routing.core.impl;
020
021import java.io.Serializable;
022import java.util.ArrayList;
023import java.util.Date;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.LinkedHashSet;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031
032import org.apache.commons.lang3.StringUtils;
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.nuxeo.ecm.automation.core.Constants;
036import org.nuxeo.ecm.core.api.CoreSession;
037import org.nuxeo.ecm.core.api.DocumentModel;
038import org.nuxeo.ecm.core.api.DocumentModelList;
039import org.nuxeo.ecm.core.api.IdRef;
040import org.nuxeo.ecm.core.api.NuxeoPrincipal;
041import org.nuxeo.ecm.core.event.EventProducer;
042import org.nuxeo.ecm.core.event.impl.DocumentEventContext;
043import org.nuxeo.ecm.platform.routing.api.DocumentRoute;
044import org.nuxeo.ecm.platform.routing.api.DocumentRouteElement;
045import org.nuxeo.ecm.platform.routing.api.DocumentRoutingConstants;
046import org.nuxeo.ecm.platform.routing.api.DocumentRoutingService;
047import org.nuxeo.ecm.platform.routing.api.exception.DocumentRouteException;
048import org.nuxeo.ecm.platform.routing.core.audit.RoutingAuditHelper;
049import org.nuxeo.ecm.platform.routing.core.impl.GraphNode.State;
050import org.nuxeo.ecm.platform.routing.core.impl.GraphNode.Transition;
051import org.nuxeo.ecm.platform.task.Task;
052import org.nuxeo.ecm.platform.task.TaskComment;
053import org.nuxeo.ecm.platform.task.TaskEventNames;
054import org.nuxeo.ecm.platform.task.TaskService;
055import org.nuxeo.runtime.api.Framework;
056
057/**
058 * Runs the proper nodes depending on the graph state.
059 *
060 * @since 5.6
061 */
062public class GraphRunner extends AbstractRunner implements ElementRunner, Serializable {
063
064    private static final long serialVersionUID = 1L;
065
066    private static final Log log = LogFactory.getLog(GraphRunner.class);
067
068    /**
069     * Maximum number of steps we do before deciding that this graph is looping.
070     */
071    public static final int MAX_LOOPS = 100;
072
073    @Override
074    public void run(CoreSession session, DocumentRouteElement element, Map<String, Serializable> map) {
075        GraphRoute graph = (GraphRoute) element;
076        element.setRunning(session);
077        if (map != null) {
078            graph.setVariables(map);
079        }
080        runGraph(session, element, graph.getStartNode());
081    }
082
083    @Override
084    public void run(CoreSession session, DocumentRouteElement element) {
085        run(session, element, null);
086    }
087
088    @SuppressWarnings("unchecked")
089    @Override
090    public void resume(CoreSession session, DocumentRouteElement element, String nodeId, String taskId,
091            Map<String, Object> varData, String status) {
092        GraphRoute graph = (GraphRoute) element;
093        Task task = null;
094        if (taskId == null) {
095            if (nodeId == null) {
096                throw new DocumentRouteException("nodeId and taskId both missing");
097            }
098        } else {
099            DocumentModel taskDoc = session.getDocument(new IdRef(taskId));
100            task = taskDoc.getAdapter(Task.class);
101            if (task == null) {
102                throw new DocumentRouteException("Invalid taskId: " + taskId);
103            }
104            if (nodeId == null) {
105                nodeId = task.getVariable(DocumentRoutingConstants.TASK_NODE_ID_KEY);
106                if (StringUtils.isEmpty(nodeId)) {
107                    throw new DocumentRouteException("No nodeId found on task: " + taskId);
108                }
109            }
110        }
111        GraphNode node = graph.getNode(nodeId);
112        if (node == null) {
113            throw new DocumentRouteException("Invalid nodeId: " + nodeId);
114        }
115        boolean forceResume = (varData != null && varData.get(DocumentRoutingConstants.WORKFLOW_FORCE_RESUME) != null
116                && (Boolean) varData.get(DocumentRoutingConstants.WORKFLOW_FORCE_RESUME));
117
118        if (forceResume && node.getState() != State.SUSPENDED && node.getState() != State.WAITING) {
119            throw new DocumentRouteException("Cannot force resume on non-suspended or non-waiting node: " + node);
120        }
121        if (!forceResume && node.getState() != State.SUSPENDED) {
122            throw new DocumentRouteException("Cannot resume on non-suspended node: " + node);
123        }
124        node.setAllVariables(varData, false);
125        if (StringUtils.isNotEmpty(status)) {
126            node.setButton(status);
127        }
128        if (task != null) {
129            finishTask(session, graph, node, task, false, status);
130            // don't delete (yet)
131            if (task != null) {
132                Map<String, Serializable> eventProperties = new HashMap<>();
133                eventProperties.put(DocumentEventContext.CATEGORY_PROPERTY_KEY, DocumentRoutingConstants.ROUTING_CATEGORY);
134                eventProperties.put("taskName", task.getName());
135                eventProperties.put("modelName", graph.getModelName());
136                eventProperties.put("action", status);
137                eventProperties.put("data", (Serializable) varData);
138                eventProperties.put(RoutingAuditHelper.WORKFLOW_INITATIOR, graph.getInitiator());
139                eventProperties.put(RoutingAuditHelper.TASK_ACTOR, session.getPrincipal().getActingUser());
140                eventProperties.put("nodeVariables", (Serializable) node.getVariables());
141                eventProperties.put("workflowVariables", (Serializable) graph.getVariables());
142
143                // Compute duration of the task itself
144                long duration = RoutingAuditHelper.computeDurationSinceTaskStarted(task.getId());
145                if (duration >= 0) {
146                    eventProperties.put(RoutingAuditHelper.TIME_SINCE_TASK_STARTED, duration);
147                }
148
149                // Then compute duration since workflow started
150                long timeSinceWfStarted = RoutingAuditHelper.computeDurationSinceWfStarted(task.getProcessId());
151                if (timeSinceWfStarted >= 0) {
152                    eventProperties.put(RoutingAuditHelper.TIME_SINCE_WF_STARTED, timeSinceWfStarted);
153                }
154
155                DocumentEventContext envContext = new DocumentEventContext(session, session.getPrincipal(), task.getDocument());
156                envContext.setProperties(eventProperties);
157                EventProducer eventProducer = Framework.getService(EventProducer.class);
158                eventProducer.fireEvent(envContext.newEvent(DocumentRoutingConstants.Events.afterWorkflowTaskEnded.name()));
159            }
160        } else {
161            // cancel any remaing tasks on this node
162            node.cancelTasks();
163        }
164        if (node.hasOpenTasks()) {
165            log.info("Node " + node.getId() + "has open tasks, the workflow can not be resumed for now.");
166            // do nothing, the workflow is resumed only when all the tasks
167            // created from
168            // this node are processed
169            // as this is a multi-task node, reset comment if it was
170            // previously set
171            if (varData != null && varData.get(Constants.VAR_WORKFLOW_NODE) != null
172                    && ((Map<String, Serializable>) varData.get(Constants.VAR_WORKFLOW_NODE)).containsKey(
173                            GraphNode.NODE_VARIABLE_COMMENT)) {
174                node.setVariable(GraphNode.NODE_VARIABLE_COMMENT, "");
175            }
176            return;
177        }
178        runGraph(session, element, node);
179    }
180
181    @Override
182    public void cancel(CoreSession session, DocumentRouteElement element) {
183        GraphRoute graph = element instanceof GraphRoute ? (GraphRoute) element : null;
184
185        Map<String, Serializable> eventProperties = new HashMap<>();
186        if (graph != null) {
187            eventProperties.put("modelId", graph.getModelId());
188            eventProperties.put("modelName", graph.getModelName());
189            eventProperties.put(RoutingAuditHelper.WORKFLOW_VARIABLES, (Serializable) graph.getVariables());
190            eventProperties.put(RoutingAuditHelper.WORKFLOW_INITATIOR, graph.getInitiator());
191            // Get the list of pending node
192            List<String> pendingNodeNames = new ArrayList<>();
193            for (GraphNode suspendedNode : graph.getSuspendedNodes()) {
194                pendingNodeNames.add(suspendedNode.getId());
195            }
196            eventProperties.put("pendingNodes", (Serializable) pendingNodeNames);
197        }
198        EventFirer.fireEvent(session, element, eventProperties, DocumentRoutingConstants.Events.beforeWorkflowCanceled.name());
199
200        super.cancel(session, element);
201        if (graph == null) {
202            return;
203        }
204        // also cancel tasks
205        // also cancel sub-workflows
206        for (GraphNode node : graph.getNodes()) {
207            node.cancelTasks();
208            node.cancelSubRoute();
209        }
210    }
211
212    /**
213     * Runs the graph starting with the given node.
214     *
215     * @param initialNode the initial node to run
216     */
217    protected void runGraph(CoreSession session, DocumentRouteElement element, GraphNode initialNode)
218            throws DocumentRouteException {
219        GraphRoute graph = (GraphRoute) element;
220        List<GraphNode> pendingSubRoutes = new LinkedList<>();
221        LinkedList<GraphNode> pendingNodes = new LinkedList<>();
222        pendingNodes.add(initialNode);
223        boolean done = false;
224        int count = 0;
225        while (!pendingNodes.isEmpty()) {
226            GraphNode node = pendingNodes.pop();
227            count++;
228            if (count > MAX_LOOPS) {
229                throw new DocumentRouteException("Execution is looping, node: " + node);
230            }
231            State jump = null;
232            switch (node.getState()) {
233            case READY:
234                log.debug("Doing node " + node);
235                if (node.isMerge()) {
236                    jump = State.WAITING;
237                } else {
238                    jump = State.RUNNING_INPUT;
239                }
240                break;
241            case WAITING:
242                if (node.canMerge()) {
243                    recursiveCancelInput(graph, node, pendingNodes);
244                    jump = State.RUNNING_INPUT;
245                }
246                // else leave state to WAITING
247                break;
248            case RUNNING_INPUT:
249                node.starting();
250                node.executeChain(node.getInputChain());
251                if (node.hasTask() || node.hasMultipleTasks()) {
252                    createTask(session, graph, node); // may create several
253                    node.setState(State.SUSPENDED);
254                }
255                if (node.hasSubRoute()) {
256                    if (!pendingSubRoutes.contains(node)) {
257                        pendingSubRoutes.add(node);
258                    }
259                    node.setState(State.SUSPENDED);
260                }
261                if (node.getState() != State.SUSPENDED) {
262                    jump = State.RUNNING_OUTPUT;
263                }
264                // else this node is suspended,
265                // remove it from queue of nodes to process
266                break;
267            case SUSPENDED:
268                if (node != initialNode) {
269                    throw new DocumentRouteException("Executing unexpected SUSPENDED state");
270                }
271                // actor
272                NuxeoPrincipal principal = session.getPrincipal();
273                String actor = principal.getActingUser();
274                node.setLastActor(actor);
275                // resuming, variables have been set by resumeGraph
276                jump = State.RUNNING_OUTPUT;
277                break;
278            case RUNNING_OUTPUT:
279                node.executeChain(node.getOutputChain());
280                List<Transition> trueTrans = node.evaluateTransitions();
281                node.ending();
282                node.setState(State.READY);
283                if (node.isStop()) {
284                    if (!pendingNodes.isEmpty()) {
285                        throw new DocumentRouteException(String.format("Route %s stopped with still pending nodes: %s",
286                                graph, pendingNodes));
287                    }
288                    done = true;
289                } else {
290                    if (trueTrans.isEmpty()) {
291                        throw new DocumentRouteException("No transition evaluated to true from node " + node);
292                    }
293                    for (Transition t : trueTrans) {
294                        node.executeTransitionChain(t);
295                        GraphNode target = graph.getNode(t.target);
296                        if (!pendingNodes.contains(target)) {
297                            pendingNodes.add(target);
298                        }
299                    }
300                }
301                break;
302            }
303            if (jump != null) {
304                node.setState(jump);
305                // loop again on this node
306                count--;
307                pendingNodes.addFirst(node);
308            }
309        }
310        if (done) {
311            element.setDone(session);
312            /*
313             * Resume the parent route if this is a sub-route.
314             */
315            if (graph.hasParentRoute()) {
316                graph.resumeParentRoute(session);
317            }
318        }
319        /*
320         * Now run the sub-routes. If they are done, they'll call back into the routing service to resume the parent
321         * node (above code).
322         */
323        for (GraphNode node : pendingSubRoutes) {
324            node.startSubRoute();
325        }
326        session.save();
327    }
328
329    protected void recursiveCancelInput(GraphRoute graph, GraphNode originalNode, LinkedList<GraphNode> pendingNodes) {
330        LinkedList<GraphNode> todo = new LinkedList<>();
331        todo.add(originalNode);
332        Set<String> done = new HashSet<>();
333        while (!todo.isEmpty()) {
334            GraphNode node = todo.pop();
335            done.add(node.getId());
336            for (Transition t : node.getInputTransitions()) {
337                if (t.loop) {
338                    // don't recurse through loop transitions
339                    continue;
340                }
341                GraphNode source = t.source;
342                if (done.contains(source.getId())) {
343                    // looping somewhere TODO check it's not happening
344                    continue;
345                }
346                source.setCanceled();
347                State state = source.getState();
348                source.setState(State.READY);
349                pendingNodes.remove(node);
350                if (state == State.SUSPENDED) {
351                    // we're suspended on a task, cancel it and stop recursion
352                    source.cancelTasks();
353                } else {
354                    // else recurse
355                    todo.add(source);
356                }
357            }
358        }
359    }
360
361    protected void createTask(CoreSession session, GraphRoute graph, GraphNode node) throws DocumentRouteException {
362        DocumentRouteElement routeInstance = graph;
363        Map<String, String> taskVariables = new HashMap<>();
364        taskVariables.put(DocumentRoutingConstants.TASK_ROUTE_INSTANCE_DOCUMENT_ID_KEY,
365                routeInstance.getDocument().getId());
366        taskVariables.put(DocumentRoutingConstants.TASK_NODE_ID_KEY, node.getId());
367        taskVariables.put(DocumentRoutingConstants.OPERATION_STEP_DOCUMENT_KEY, node.getDocument().getId());
368        String taskNotiftemplate = node.getTaskNotificationTemplate();
369        if (!StringUtils.isEmpty(taskNotiftemplate)) {
370            taskVariables.put(DocumentRoutingConstants.TASK_ASSIGNED_NOTIFICATION_TEMPLATE, taskNotiftemplate);
371        } else {
372            // disable notification service
373            taskVariables.put(TaskEventNames.DISABLE_NOTIFICATION_SERVICE, "true");
374        }
375        // evaluate task assignees from taskVar if any
376        HashSet<String> actors = new LinkedHashSet<>();
377        actors.addAll(node.evaluateTaskAssignees());
378        actors.addAll(node.getTaskAssignees());
379        // evaluate taskDueDate from the taskDueDateExpr;
380        Date dueDate = node.computeTaskDueDate();
381        DocumentModelList docs = graph.getAttachedDocumentModels();
382        TaskService taskService = Framework.getService(TaskService.class);
383        DocumentRoutingService routing = Framework.getService(DocumentRoutingService.class);
384        // TODO documents other than the first are not attached to the task
385        // (task API allows only one document)
386        // we may get several tasks if there's one per actor when the node
387        // has the property
388        // hasMultipleTasks set to true
389        List<Task> tasks = taskService.createTask(session, session.getPrincipal(), docs,
390                                                  node.getTaskDocType(), node.getDocument().getTitle(), node.getId(), routeInstance.getDocument().getId(),
391                                                  new ArrayList<>(actors), node.hasMultipleTasks(), node.getTaskDirective(), null, dueDate,
392                                                  taskVariables, null, node.getWorkflowContextualInfo(session, true));
393
394        // Audit task assignment
395        for (Task task : tasks) {
396            Map<String, Serializable> eventProperties = new HashMap<>();
397            eventProperties.put(DocumentEventContext.CATEGORY_PROPERTY_KEY, DocumentRoutingConstants.ROUTING_CATEGORY);
398            eventProperties.put("taskName", node.getDocument().getTitle());
399            eventProperties.put("actors", actors);
400            eventProperties.put("modelId", graph.getModelId());
401            eventProperties.put("modelName", graph.getModelName());
402            eventProperties.put(RoutingAuditHelper.WORKFLOW_INITATIOR, graph.getInitiator());
403            eventProperties.put(RoutingAuditHelper.TASK_ACTOR, session.getPrincipal().getOriginatingUser());
404            eventProperties.put("nodeVariables", (Serializable) node.getVariables());
405            if (routeInstance instanceof GraphRoute) {
406                eventProperties.put("workflowVariables", (Serializable) ((GraphRoute) routeInstance).getVariables());
407            }
408
409            // compute duration since workflow started
410            long timeSinceWfStarted = RoutingAuditHelper.computeDurationSinceWfStarted(task.getProcessId());
411            if (timeSinceWfStarted >= 0) {
412                eventProperties.put(RoutingAuditHelper.TIME_SINCE_WF_STARTED, timeSinceWfStarted);
413            }
414
415            DocumentEventContext envContext = new DocumentEventContext(session, session.getPrincipal(), task.getDocument());
416            envContext.setProperties(eventProperties);
417            EventProducer eventProducer = Framework.getService(EventProducer.class);
418            eventProducer.fireEvent(envContext.newEvent(DocumentRoutingConstants.Events.afterWorkflowTaskCreated.name()));
419        }
420
421        // routing.makeRoutingTasks(session, tasks);
422        for (Task task : tasks) {
423            node.addTaskInfo(task.getId());
424        }
425        String taskAssigneesPermission = node.getTaskAssigneesPermission();
426        if (StringUtils.isEmpty(taskAssigneesPermission)) {
427            return;
428        }
429        for (Task task : tasks) {
430            routing.grantPermissionToTaskAssignees(session, taskAssigneesPermission, docs, task);
431        }
432    }
433
434    protected void finishTask(CoreSession session, GraphRoute graph, GraphNode node, Task task, boolean delete)
435            throws DocumentRouteException {
436        finishTask(session, graph, node, task, delete, null);
437    }
438
439    protected void finishTask(CoreSession session, GraphRoute graph, GraphNode node, Task task, boolean delete,
440            String status) throws DocumentRouteException {
441        DocumentRoutingService routing = Framework.getService(DocumentRoutingService.class);
442        DocumentModelList docs = graph.getAttachedDocumentModels();
443        routing.removePermissionsForTaskActors(session, docs, task);
444        // delete task
445        if (delete) {
446            session.removeDocument(new IdRef(task.getId()));
447        }
448        // get the last comment on the task, if there are several:
449        // task might have been previously reassigned or delegated
450        List<TaskComment> comments = task.getComments();
451        String comment = comments.size() > 0 ? comments.get(comments.size() - 1).getText() : "";
452        // actor
453        NuxeoPrincipal principal = session.getPrincipal();
454        String actor = principal.getActingUser();
455        node.updateTaskInfo(task.getId(), true, status, actor, comment);
456    }
457
458}