001/*
002 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.platform.routing.core.impl;
020
021import java.io.Serializable;
022import java.util.ArrayList;
023import java.util.Date;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.LinkedHashSet;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031
032import org.apache.commons.lang.StringUtils;
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.nuxeo.ecm.automation.core.Constants;
036import org.nuxeo.ecm.core.api.CoreSession;
037import org.nuxeo.ecm.core.api.DocumentModel;
038import org.nuxeo.ecm.core.api.DocumentModelList;
039import org.nuxeo.ecm.core.api.IdRef;
040import org.nuxeo.ecm.core.api.NuxeoPrincipal;
041import org.nuxeo.ecm.core.event.EventProducer;
042import org.nuxeo.ecm.core.event.impl.DocumentEventContext;
043import org.nuxeo.ecm.platform.routing.api.DocumentRoute;
044import org.nuxeo.ecm.platform.routing.api.DocumentRouteElement;
045import org.nuxeo.ecm.platform.routing.api.DocumentRoutingConstants;
046import org.nuxeo.ecm.platform.routing.api.DocumentRoutingService;
047import org.nuxeo.ecm.platform.routing.api.exception.DocumentRouteException;
048import org.nuxeo.ecm.platform.routing.core.audit.RoutingAuditHelper;
049import org.nuxeo.ecm.platform.routing.core.impl.GraphNode.State;
050import org.nuxeo.ecm.platform.routing.core.impl.GraphNode.Transition;
051import org.nuxeo.ecm.platform.task.Task;
052import org.nuxeo.ecm.platform.task.TaskComment;
053import org.nuxeo.ecm.platform.task.TaskEventNames;
054import org.nuxeo.ecm.platform.task.TaskService;
055import org.nuxeo.runtime.api.Framework;
056
057/**
058 * Runs the proper nodes depending on the graph state.
059 *
060 * @since 5.6
061 */
062public class GraphRunner extends AbstractRunner implements ElementRunner, Serializable {
063
064    private static final Log log = LogFactory.getLog(GraphRunner.class);
065
066    /**
067     * Maximum number of steps we do before deciding that this graph is looping.
068     */
069    public static final int MAX_LOOPS = 100;
070
071    @Override
072    public void run(CoreSession session, DocumentRouteElement element, Map<String, Serializable> map) {
073        GraphRoute graph = (GraphRoute) element;
074        element.setRunning(session);
075        if (map != null) {
076            graph.setVariables(map);
077        }
078        runGraph(session, element, graph.getStartNode());
079    }
080
081    @Override
082    public void run(CoreSession session, DocumentRouteElement element) {
083        run(session, element, null);
084    }
085
086    @SuppressWarnings("unchecked")
087    @Override
088    public void resume(CoreSession session, DocumentRouteElement element, String nodeId, String taskId,
089            Map<String, Object> varData, String status) {
090        GraphRoute graph = (GraphRoute) element;
091        Task task = null;
092        if (taskId == null) {
093            if (nodeId == null) {
094                throw new DocumentRouteException("nodeId and taskId both missing");
095            }
096        } else {
097            DocumentModel taskDoc = session.getDocument(new IdRef(taskId));
098            task = taskDoc.getAdapter(Task.class);
099            if (task == null) {
100                throw new DocumentRouteException("Invalid taskId: " + taskId);
101            }
102            if (nodeId == null) {
103                nodeId = task.getVariable(DocumentRoutingConstants.TASK_NODE_ID_KEY);
104                if (StringUtils.isEmpty(nodeId)) {
105                    throw new DocumentRouteException("No nodeId found on task: " + taskId);
106                }
107            }
108        }
109        GraphNode node = graph.getNode(nodeId);
110        if (node == null) {
111            throw new DocumentRouteException("Invalid nodeId: " + nodeId);
112        }
113        boolean forceResume = (varData != null && varData.get(DocumentRoutingConstants.WORKFLOW_FORCE_RESUME) != null
114                && (Boolean) varData.get(DocumentRoutingConstants.WORKFLOW_FORCE_RESUME));
115
116        if (forceResume && node.getState() != State.SUSPENDED && node.getState() != State.WAITING) {
117            throw new DocumentRouteException("Cannot force resume on non-suspended or non-waiting node: " + node);
118        }
119        if (!forceResume && node.getState() != State.SUSPENDED) {
120            throw new DocumentRouteException("Cannot resume on non-suspended node: " + node);
121        }
122        node.setAllVariables(varData, false);
123        if (StringUtils.isNotEmpty(status)) {
124            node.setButton(status);
125        }
126        if (task != null) {
127            finishTask(session, graph, node, task, false, status);
128            // don't delete (yet)
129            if (task != null) {
130                Map<String, Serializable> eventProperties = new HashMap<String, Serializable>();
131                eventProperties.put(DocumentEventContext.CATEGORY_PROPERTY_KEY, DocumentRoutingConstants.ROUTING_CATEGORY);
132                eventProperties.put("taskName", task.getName());
133                eventProperties.put("modelName", graph.getModelName());
134                eventProperties.put("action", status);
135                eventProperties.put("data", (Serializable) varData);
136                eventProperties.put(RoutingAuditHelper.WORKFLOW_INITATIOR, graph.getInitiator());
137                eventProperties.put(RoutingAuditHelper.TASK_ACTOR, ((NuxeoPrincipal) session.getPrincipal()).getActingUser());
138                eventProperties.put("nodeVariables", (Serializable) node.getVariables());
139                eventProperties.put("workflowVariables", (Serializable) graph.getVariables());
140
141                // Compute duration of the task itself
142                long duration = RoutingAuditHelper.computeDurationSinceTaskStarted(task.getId());
143                if (duration >= 0) {
144                    eventProperties.put(RoutingAuditHelper.TIME_SINCE_TASK_STARTED, duration);
145                }
146
147                // Then compute duration since workflow started
148                long timeSinceWfStarted = RoutingAuditHelper.computeDurationSinceWfStarted(task.getProcessId());
149                if (timeSinceWfStarted >= 0) {
150                    eventProperties.put(RoutingAuditHelper.TIME_SINCE_WF_STARTED, timeSinceWfStarted);
151                }
152
153                DocumentEventContext envContext = new DocumentEventContext(session, session.getPrincipal(), task.getDocument());
154                envContext.setProperties(eventProperties);
155                EventProducer eventProducer = Framework.getLocalService(EventProducer.class);
156                eventProducer.fireEvent(envContext.newEvent(DocumentRoutingConstants.Events.afterWorkflowTaskEnded.name()));
157            }
158        } else {
159            // cancel any remaing tasks on this node
160            node.cancelTasks();
161        }
162        if (node.hasOpenTasks()) {
163            log.info("Node " + node.getId() + "has open tasks, the workflow can not be resumed for now.");
164            // do nothing, the workflow is resumed only when all the tasks
165            // created from
166            // this node are processed
167            // as this is a multi-task node, reset comment if it was
168            // previously set
169            if (varData != null && varData.get(Constants.VAR_WORKFLOW_NODE) != null
170                    && ((Map<String, Serializable>) varData.get(Constants.VAR_WORKFLOW_NODE)).containsKey(
171                            GraphNode.NODE_VARIABLE_COMMENT)) {
172                node.setVariable(GraphNode.NODE_VARIABLE_COMMENT, "");
173            }
174            return;
175        }
176        runGraph(session, element, node);
177    }
178
179    @Override
180    public void cancel(CoreSession session, DocumentRouteElement element) {
181        GraphRoute graph = element instanceof GraphRoute ? (GraphRoute) element : null;
182
183        Map<String, Serializable> eventProperties = new HashMap<String, Serializable>();
184        if (graph != null) {
185            eventProperties.put("modelId", graph.getModelId());
186            eventProperties.put("modelName", graph.getModelName());
187            eventProperties.put(RoutingAuditHelper.WORKFLOW_VARIABLES, (Serializable) graph.getVariables());
188            eventProperties.put(RoutingAuditHelper.WORKFLOW_INITATIOR, graph.getInitiator());
189            // Get the list of pending node
190            List<String> pendingNodeNames = new ArrayList<String>();
191            for (GraphNode suspendedNode : graph.getSuspendedNodes()) {
192                pendingNodeNames.add(suspendedNode.getId());
193            }
194            eventProperties.put("pendingNodes", (Serializable) pendingNodeNames);
195        }
196        EventFirer.fireEvent(session, element, eventProperties, DocumentRoutingConstants.Events.beforeWorkflowCanceled.name());
197
198        super.cancel(session, element);
199        if (graph == null) {
200            return;
201        }
202        // also cancel tasks
203        // also cancel sub-workflows
204        for (GraphNode node : graph.getNodes()) {
205            node.cancelTasks();
206            node.cancelSubRoute();
207        }
208    }
209
210    /**
211     * Runs the graph starting with the given node.
212     *
213     * @param graph the graph
214     * @param initialNode the initial node to run
215     */
216    protected void runGraph(CoreSession session, DocumentRouteElement element, GraphNode initialNode)
217            throws DocumentRouteException {
218        GraphRoute graph = (GraphRoute) element;
219        List<GraphNode> pendingSubRoutes = new LinkedList<GraphNode>();
220        LinkedList<GraphNode> pendingNodes = new LinkedList<GraphNode>();
221        pendingNodes.add(initialNode);
222        boolean done = false;
223        int count = 0;
224        while (!pendingNodes.isEmpty()) {
225            GraphNode node = pendingNodes.pop();
226            count++;
227            if (count > MAX_LOOPS) {
228                throw new DocumentRouteException("Execution is looping, node: " + node);
229            }
230            State jump = null;
231            switch (node.getState()) {
232            case READY:
233                log.debug("Doing node " + node);
234                if (node.isMerge()) {
235                    jump = State.WAITING;
236                } else {
237                    jump = State.RUNNING_INPUT;
238                }
239                break;
240            case WAITING:
241                if (node.canMerge()) {
242                    recursiveCancelInput(graph, node, pendingNodes);
243                    jump = State.RUNNING_INPUT;
244                }
245                // else leave state to WAITING
246                break;
247            case RUNNING_INPUT:
248                node.starting();
249                node.executeChain(node.getInputChain());
250                if (node.hasTask() || node.hasMultipleTasks()) {
251                    createTask(session, graph, node); // may create several
252                    node.setState(State.SUSPENDED);
253                }
254                if (node.hasSubRoute()) {
255                    if (!pendingSubRoutes.contains(node)) {
256                        pendingSubRoutes.add(node);
257                    }
258                    node.setState(State.SUSPENDED);
259                }
260                if (node.getState() != State.SUSPENDED) {
261                    jump = State.RUNNING_OUTPUT;
262                }
263                // else this node is suspended,
264                // remove it from queue of nodes to process
265                break;
266            case SUSPENDED:
267                if (node != initialNode) {
268                    throw new DocumentRouteException("Executing unexpected SUSPENDED state");
269                }
270                // actor
271                NuxeoPrincipal principal = (NuxeoPrincipal) session.getPrincipal();
272                String actor = principal.getActingUser();
273                node.setLastActor(actor);
274                // resuming, variables have been set by resumeGraph
275                jump = State.RUNNING_OUTPUT;
276                break;
277            case RUNNING_OUTPUT:
278                node.executeChain(node.getOutputChain());
279                List<Transition> trueTrans = node.evaluateTransitions();
280                node.ending();
281                node.setState(State.READY);
282                if (node.isStop()) {
283                    if (!pendingNodes.isEmpty()) {
284                        throw new DocumentRouteException(String.format("Route %s stopped with still pending nodes: %s",
285                                graph, pendingNodes));
286                    }
287                    done = true;
288                } else {
289                    if (trueTrans.isEmpty()) {
290                        throw new DocumentRouteException("No transition evaluated to true from node " + node);
291                    }
292                    for (Transition t : trueTrans) {
293                        node.executeTransitionChain(t);
294                        GraphNode target = graph.getNode(t.target);
295                        if (!pendingNodes.contains(target)) {
296                            pendingNodes.add(target);
297                        }
298                    }
299                }
300                break;
301            }
302            if (jump != null) {
303                node.setState(jump);
304                // loop again on this node
305                count--;
306                pendingNodes.addFirst(node);
307            }
308        }
309        if (done) {
310            element.setDone(session);
311            /*
312             * Resume the parent route if this is a sub-route.
313             */
314            if (graph.hasParentRoute()) {
315                graph.resumeParentRoute(session);
316            }
317        }
318        /*
319         * Now run the sub-routes. If they are done, they'll call back into the routing service to resume the parent
320         * node (above code).
321         */
322        for (GraphNode node : pendingSubRoutes) {
323            DocumentRoute subRoute = node.startSubRoute();
324        }
325        session.save();
326    }
327
328    protected void recursiveCancelInput(GraphRoute graph, GraphNode originalNode, LinkedList<GraphNode> pendingNodes) {
329        LinkedList<GraphNode> todo = new LinkedList<GraphNode>();
330        todo.add(originalNode);
331        Set<String> done = new HashSet<String>();
332        while (!todo.isEmpty()) {
333            GraphNode node = todo.pop();
334            done.add(node.getId());
335            for (Transition t : node.getInputTransitions()) {
336                if (t.loop) {
337                    // don't recurse through loop transitions
338                    continue;
339                }
340                GraphNode source = t.source;
341                if (done.contains(source.getId())) {
342                    // looping somewhere TODO check it's not happening
343                    continue;
344                }
345                source.setCanceled();
346                State state = source.getState();
347                source.setState(State.READY);
348                pendingNodes.remove(node);
349                if (state == State.SUSPENDED) {
350                    // we're suspended on a task, cancel it and stop recursion
351                    source.cancelTasks();
352                } else {
353                    // else recurse
354                    todo.add(source);
355                }
356            }
357        }
358    }
359
360    protected void createTask(CoreSession session, GraphRoute graph, GraphNode node) throws DocumentRouteException {
361        DocumentRouteElement routeInstance = graph;
362        Map<String, String> taskVariables = new HashMap<String, String>();
363        taskVariables.put(DocumentRoutingConstants.TASK_ROUTE_INSTANCE_DOCUMENT_ID_KEY,
364                routeInstance.getDocument().getId());
365        taskVariables.put(DocumentRoutingConstants.TASK_NODE_ID_KEY, node.getId());
366        taskVariables.put(DocumentRoutingConstants.OPERATION_STEP_DOCUMENT_KEY, node.getDocument().getId());
367        String taskNotiftemplate = node.getTaskNotificationTemplate();
368        if (!StringUtils.isEmpty(taskNotiftemplate)) {
369            taskVariables.put(DocumentRoutingConstants.TASK_ASSIGNED_NOTIFICATION_TEMPLATE, taskNotiftemplate);
370        } else {
371            // disable notification service
372            taskVariables.put(TaskEventNames.DISABLE_NOTIFICATION_SERVICE, "true");
373        }
374        // evaluate task assignees from taskVar if any
375        HashSet<String> actors = new LinkedHashSet<String>();
376        actors.addAll(node.evaluateTaskAssignees());
377        actors.addAll(node.getTaskAssignees());
378        // evaluate taskDueDate from the taskDueDateExpr;
379        Date dueDate = node.computeTaskDueDate();
380        DocumentModelList docs = graph.getAttachedDocumentModels();
381        TaskService taskService = Framework.getLocalService(TaskService.class);
382        DocumentRoutingService routing = Framework.getLocalService(DocumentRoutingService.class);
383        // TODO documents other than the first are not attached to the task
384        // (task API allows only one document)
385        // we may get several tasks if there's one per actor when the node
386        // has the property
387        // hasMultipleTasks set to true
388        List<Task> tasks = taskService.createTask(session, (NuxeoPrincipal) session.getPrincipal(), docs,
389                node.getTaskDocType(), node.getDocument().getTitle(), node.getId(), routeInstance.getDocument().getId(),
390                new ArrayList<String>(actors), node.hasMultipleTasks(), node.getTaskDirective(), null, dueDate,
391                taskVariables, null, node.getWorkflowContextualInfo(session, true));
392
393        // Audit task assignment
394        for (Task task : tasks) {
395            Map<String, Serializable> eventProperties = new HashMap<String, Serializable>();
396            eventProperties.put(DocumentEventContext.CATEGORY_PROPERTY_KEY, DocumentRoutingConstants.ROUTING_CATEGORY);
397            eventProperties.put("taskName", node.getDocument().getTitle());
398            eventProperties.put("actors", actors);
399            eventProperties.put("modelId", graph.getModelId());
400            eventProperties.put("modelName", graph.getModelName());
401            eventProperties.put(RoutingAuditHelper.WORKFLOW_INITATIOR, graph.getInitiator());
402            eventProperties.put(RoutingAuditHelper.TASK_ACTOR, ((NuxeoPrincipal) session.getPrincipal()).getOriginatingUser());
403            eventProperties.put("nodeVariables", (Serializable) node.getVariables());
404            if (routeInstance instanceof GraphRoute) {
405                eventProperties.put("workflowVariables", (Serializable) ((GraphRoute) routeInstance).getVariables());
406            }
407
408            // compute duration since workflow started
409            long timeSinceWfStarted = RoutingAuditHelper.computeDurationSinceWfStarted(task.getProcessId());
410            if (timeSinceWfStarted >= 0) {
411                eventProperties.put(RoutingAuditHelper.TIME_SINCE_WF_STARTED, timeSinceWfStarted);
412            }
413
414            DocumentEventContext envContext = new DocumentEventContext(session, session.getPrincipal(), task.getDocument());
415            envContext.setProperties(eventProperties);
416            EventProducer eventProducer = Framework.getLocalService(EventProducer.class);
417            eventProducer.fireEvent(envContext.newEvent(DocumentRoutingConstants.Events.afterWorkflowTaskCreated.name()));
418        }
419
420        // routing.makeRoutingTasks(session, tasks);
421        for (Task task : tasks) {
422            node.addTaskInfo(task.getId());
423        }
424        String taskAssigneesPermission = node.getTaskAssigneesPermission();
425        if (StringUtils.isEmpty(taskAssigneesPermission)) {
426            return;
427        }
428        for (Task task : tasks) {
429            routing.grantPermissionToTaskAssignees(session, taskAssigneesPermission, docs, task);
430        }
431    }
432
433    protected void finishTask(CoreSession session, GraphRoute graph, GraphNode node, Task task, boolean delete)
434            throws DocumentRouteException {
435        finishTask(session, graph, node, task, delete, null);
436    }
437
438    protected void finishTask(CoreSession session, GraphRoute graph, GraphNode node, Task task, boolean delete,
439            String status) throws DocumentRouteException {
440        DocumentRoutingService routing = Framework.getLocalService(DocumentRoutingService.class);
441        DocumentModelList docs = graph.getAttachedDocumentModels();
442        routing.removePermissionsForTaskActors(session, docs, task);
443        // delete task
444        if (delete) {
445            session.removeDocument(new IdRef(task.getId()));
446        }
447        // get the last comment on the task, if there are several:
448        // task might have been previously reassigned or delegated
449        List<TaskComment> comments = task.getComments();
450        String comment = comments.size() > 0 ? comments.get(comments.size() - 1).getText() : "";
451        // actor
452        NuxeoPrincipal principal = (NuxeoPrincipal) session.getPrincipal();
453        String actor = principal.getActingUser();
454        node.updateTaskInfo(task.getId(), true, status, actor, comment);
455    }
456
457}