001/*
002 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Florent Guillaume
016 */
017package org.nuxeo.ecm.platform.routing.core.impl;
018
019import java.io.Serializable;
020import java.util.ArrayList;
021import java.util.Date;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.LinkedHashSet;
025import java.util.LinkedList;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029
030import org.apache.commons.lang.StringUtils;
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.nuxeo.ecm.automation.core.Constants;
034import org.nuxeo.ecm.core.api.CoreSession;
035import org.nuxeo.ecm.core.api.DocumentModel;
036import org.nuxeo.ecm.core.api.DocumentModelList;
037import org.nuxeo.ecm.core.api.IdRef;
038import org.nuxeo.ecm.core.api.NuxeoPrincipal;
039import org.nuxeo.ecm.core.event.EventProducer;
040import org.nuxeo.ecm.core.event.impl.DocumentEventContext;
041import org.nuxeo.ecm.platform.routing.api.DocumentRoute;
042import org.nuxeo.ecm.platform.routing.api.DocumentRouteElement;
043import org.nuxeo.ecm.platform.routing.api.DocumentRoutingConstants;
044import org.nuxeo.ecm.platform.routing.api.DocumentRoutingService;
045import org.nuxeo.ecm.platform.routing.api.exception.DocumentRouteException;
046import org.nuxeo.ecm.platform.routing.core.audit.RoutingAuditHelper;
047import org.nuxeo.ecm.platform.routing.core.impl.GraphNode.State;
048import org.nuxeo.ecm.platform.routing.core.impl.GraphNode.Transition;
049import org.nuxeo.ecm.platform.task.Task;
050import org.nuxeo.ecm.platform.task.TaskComment;
051import org.nuxeo.ecm.platform.task.TaskEventNames;
052import org.nuxeo.ecm.platform.task.TaskService;
053import org.nuxeo.runtime.api.Framework;
054
055/**
056 * Runs the proper nodes depending on the graph state.
057 *
058 * @since 5.6
059 */
060public class GraphRunner extends AbstractRunner implements ElementRunner, Serializable {
061
062    private static final Log log = LogFactory.getLog(GraphRunner.class);
063
064    /**
065     * Maximum number of steps we do before deciding that this graph is looping.
066     */
067    public static final int MAX_LOOPS = 100;
068
069    @Override
070    public void run(CoreSession session, DocumentRouteElement element, Map<String, Serializable> map) {
071        GraphRoute graph = (GraphRoute) element;
072        element.setRunning(session);
073        if (map != null) {
074            graph.setVariables(map);
075        }
076        runGraph(session, element, graph.getStartNode());
077    }
078
079    @Override
080    public void run(CoreSession session, DocumentRouteElement element) {
081        run(session, element, null);
082    }
083
084    @SuppressWarnings("unchecked")
085    @Override
086    public void resume(CoreSession session, DocumentRouteElement element, String nodeId, String taskId,
087            Map<String, Object> varData, String status) {
088        GraphRoute graph = (GraphRoute) element;
089        Task task = null;
090        if (taskId == null) {
091            if (nodeId == null) {
092                throw new DocumentRouteException("nodeId and taskId both missing");
093            }
094        } else {
095            DocumentModel taskDoc = session.getDocument(new IdRef(taskId));
096            task = taskDoc.getAdapter(Task.class);
097            if (task == null) {
098                throw new DocumentRouteException("Invalid taskId: " + taskId);
099            }
100            if (nodeId == null) {
101                nodeId = task.getVariable(DocumentRoutingConstants.TASK_NODE_ID_KEY);
102                if (StringUtils.isEmpty(nodeId)) {
103                    throw new DocumentRouteException("No nodeId found on task: " + taskId);
104                }
105            }
106        }
107        GraphNode node = graph.getNode(nodeId);
108        if (node == null) {
109            throw new DocumentRouteException("Invalid nodeId: " + nodeId);
110        }
111        boolean forceResume = (varData != null && varData.get(DocumentRoutingConstants.WORKFLOW_FORCE_RESUME) != null
112                && (Boolean) varData.get(DocumentRoutingConstants.WORKFLOW_FORCE_RESUME));
113
114        if (forceResume && node.getState() != State.SUSPENDED && node.getState() != State.WAITING) {
115            throw new DocumentRouteException("Cannot force resume on non-suspended or non-waiting node: " + node);
116        }
117        if (!forceResume && node.getState() != State.SUSPENDED) {
118            throw new DocumentRouteException("Cannot resume on non-suspended node: " + node);
119        }
120        node.setAllVariables(varData, false);
121        if (StringUtils.isNotEmpty(status)) {
122            node.setButton(status);
123        }
124        if (task != null) {
125            finishTask(session, graph, node, task, false, status);
126            // don't delete (yet)
127            if (task != null) {
128                Map<String, Serializable> eventProperties = new HashMap<String, Serializable>();
129                eventProperties.put(DocumentEventContext.CATEGORY_PROPERTY_KEY, DocumentRoutingConstants.ROUTING_CATEGORY);
130                eventProperties.put("taskName", task.getName());
131                eventProperties.put("modelName", graph.getModelName());
132                eventProperties.put("action", status);
133                eventProperties.put("data", (Serializable) varData);
134                eventProperties.put(RoutingAuditHelper.WORKFLOW_INITATIOR, graph.getInitiator());
135                eventProperties.put(RoutingAuditHelper.TASK_ACTOR, ((NuxeoPrincipal) session.getPrincipal()).getActingUser());
136                eventProperties.put("nodeVariables", (Serializable) node.getVariables());
137                eventProperties.put("workflowVariables", (Serializable) graph.getVariables());
138
139                // Compute duration of the task itself
140                long duration = RoutingAuditHelper.computeDurationSinceTaskStarted(task.getId());
141                if (duration >= 0) {
142                    eventProperties.put(RoutingAuditHelper.TIME_SINCE_TASK_STARTED, duration);
143                }
144
145                // Then compute duration since workflow started
146                long timeSinceWfStarted = RoutingAuditHelper.computeDurationSinceWfStarted(task.getProcessId());
147                if (timeSinceWfStarted >= 0) {
148                    eventProperties.put(RoutingAuditHelper.TIME_SINCE_WF_STARTED, timeSinceWfStarted);
149                }
150
151                DocumentEventContext envContext = new DocumentEventContext(session, session.getPrincipal(), task.getDocument());
152                envContext.setProperties(eventProperties);
153                EventProducer eventProducer = Framework.getLocalService(EventProducer.class);
154                eventProducer.fireEvent(envContext.newEvent(DocumentRoutingConstants.Events.afterWorkflowTaskEnded.name()));
155            }
156        } else {
157            // cancel any remaing tasks on this node
158            node.cancelTasks();
159        }
160        if (node.hasOpenTasks()) {
161            log.info("Node " + node.getId() + "has open tasks, the workflow can not be resumed for now.");
162            // do nothing, the workflow is resumed only when all the tasks
163            // created from
164            // this node are processed
165            // as this is a multi-task node, reset comment if it was
166            // previously set
167            if (varData != null && varData.get(Constants.VAR_WORKFLOW_NODE) != null
168                    && ((Map<String, Serializable>) varData.get(Constants.VAR_WORKFLOW_NODE)).containsKey(
169                            GraphNode.NODE_VARIABLE_COMMENT)) {
170                node.setVariable(GraphNode.NODE_VARIABLE_COMMENT, "");
171            }
172            return;
173        }
174        runGraph(session, element, node);
175    }
176
177    @Override
178    public void cancel(CoreSession session, DocumentRouteElement element) {
179        GraphRoute graph = element instanceof GraphRoute ? (GraphRoute) element : null;
180
181        Map<String, Serializable> eventProperties = new HashMap<String, Serializable>();
182        if (graph != null) {
183            eventProperties.put("modelId", graph.getModelId());
184            eventProperties.put("modelName", graph.getModelName());
185            eventProperties.put(RoutingAuditHelper.WORKFLOW_VARIABLES, (Serializable) graph.getVariables());
186            eventProperties.put(RoutingAuditHelper.WORKFLOW_INITATIOR, graph.getInitiator());
187            // Get the list of pending node
188            List<String> pendingNodeNames = new ArrayList<String>();
189            for (GraphNode suspendedNode : graph.getSuspendedNodes()) {
190                pendingNodeNames.add(suspendedNode.getId());
191            }
192            eventProperties.put("pendingNodes", (Serializable) pendingNodeNames);
193        }
194        EventFirer.fireEvent(session, element, eventProperties, DocumentRoutingConstants.Events.beforeWorkflowCanceled.name());
195
196        super.cancel(session, element);
197        if (graph == null) {
198            return;
199        }
200        // also cancel tasks
201        // also cancel sub-workflows
202        for (GraphNode node : graph.getNodes()) {
203            node.cancelTasks();
204            node.cancelSubRoute();
205        }
206    }
207
208    /**
209     * Runs the graph starting with the given node.
210     *
211     * @param graph the graph
212     * @param initialNode the initial node to run
213     */
214    protected void runGraph(CoreSession session, DocumentRouteElement element, GraphNode initialNode)
215            throws DocumentRouteException {
216        GraphRoute graph = (GraphRoute) element;
217        List<GraphNode> pendingSubRoutes = new LinkedList<GraphNode>();
218        LinkedList<GraphNode> pendingNodes = new LinkedList<GraphNode>();
219        pendingNodes.add(initialNode);
220        boolean done = false;
221        int count = 0;
222        while (!pendingNodes.isEmpty()) {
223            GraphNode node = pendingNodes.pop();
224            count++;
225            if (count > MAX_LOOPS) {
226                throw new DocumentRouteException("Execution is looping, node: " + node);
227            }
228            State jump = null;
229            switch (node.getState()) {
230            case READY:
231                log.debug("Doing node " + node);
232                if (node.isMerge()) {
233                    jump = State.WAITING;
234                } else {
235                    jump = State.RUNNING_INPUT;
236                }
237                break;
238            case WAITING:
239                if (node.canMerge()) {
240                    recursiveCancelInput(graph, node, pendingNodes);
241                    jump = State.RUNNING_INPUT;
242                }
243                // else leave state to WAITING
244                break;
245            case RUNNING_INPUT:
246                node.starting();
247                node.executeChain(node.getInputChain());
248                if (node.hasTask() || node.hasMultipleTasks()) {
249                    createTask(session, graph, node); // may create several
250                    node.setState(State.SUSPENDED);
251                }
252                if (node.hasSubRoute()) {
253                    if (!pendingSubRoutes.contains(node)) {
254                        pendingSubRoutes.add(node);
255                    }
256                    node.setState(State.SUSPENDED);
257                }
258                if (node.getState() != State.SUSPENDED) {
259                    jump = State.RUNNING_OUTPUT;
260                }
261                // else this node is suspended,
262                // remove it from queue of nodes to process
263                break;
264            case SUSPENDED:
265                if (node != initialNode) {
266                    throw new DocumentRouteException("Executing unexpected SUSPENDED state");
267                }
268                // actor
269                NuxeoPrincipal principal = (NuxeoPrincipal) session.getPrincipal();
270                String actor = principal.getActingUser();
271                node.setLastActor(actor);
272                // resuming, variables have been set by resumeGraph
273                jump = State.RUNNING_OUTPUT;
274                break;
275            case RUNNING_OUTPUT:
276                node.executeChain(node.getOutputChain());
277                List<Transition> trueTrans = node.evaluateTransitions();
278                node.ending();
279                node.setState(State.READY);
280                if (node.isStop()) {
281                    if (!pendingNodes.isEmpty()) {
282                        throw new DocumentRouteException(String.format("Route %s stopped with still pending nodes: %s",
283                                graph, pendingNodes));
284                    }
285                    done = true;
286                } else {
287                    if (trueTrans.isEmpty()) {
288                        throw new DocumentRouteException("No transition evaluated to true from node " + node);
289                    }
290                    for (Transition t : trueTrans) {
291                        node.executeTransitionChain(t);
292                        GraphNode target = graph.getNode(t.target);
293                        if (!pendingNodes.contains(target)) {
294                            pendingNodes.add(target);
295                        }
296                    }
297                }
298                break;
299            }
300            if (jump != null) {
301                node.setState(jump);
302                // loop again on this node
303                count--;
304                pendingNodes.addFirst(node);
305            }
306        }
307        if (done) {
308            element.setDone(session);
309            /*
310             * Resume the parent route if this is a sub-route.
311             */
312            if (graph.hasParentRoute()) {
313                graph.resumeParentRoute(session);
314            }
315        }
316        /*
317         * Now run the sub-routes. If they are done, they'll call back into the routing service to resume the parent
318         * node (above code).
319         */
320        for (GraphNode node : pendingSubRoutes) {
321            DocumentRoute subRoute = node.startSubRoute();
322        }
323        session.save();
324    }
325
326    protected void recursiveCancelInput(GraphRoute graph, GraphNode originalNode, LinkedList<GraphNode> pendingNodes) {
327        LinkedList<GraphNode> todo = new LinkedList<GraphNode>();
328        todo.add(originalNode);
329        Set<String> done = new HashSet<String>();
330        while (!todo.isEmpty()) {
331            GraphNode node = todo.pop();
332            done.add(node.getId());
333            for (Transition t : node.getInputTransitions()) {
334                if (t.loop) {
335                    // don't recurse through loop transitions
336                    continue;
337                }
338                GraphNode source = t.source;
339                if (done.contains(source.getId())) {
340                    // looping somewhere TODO check it's not happening
341                    continue;
342                }
343                source.setCanceled();
344                State state = source.getState();
345                source.setState(State.READY);
346                pendingNodes.remove(node);
347                if (state == State.SUSPENDED) {
348                    // we're suspended on a task, cancel it and stop recursion
349                    source.cancelTasks();
350                } else {
351                    // else recurse
352                    todo.add(source);
353                }
354            }
355        }
356    }
357
358    protected void createTask(CoreSession session, GraphRoute graph, GraphNode node) throws DocumentRouteException {
359        DocumentRouteElement routeInstance = graph;
360        Map<String, String> taskVariables = new HashMap<String, String>();
361        taskVariables.put(DocumentRoutingConstants.TASK_ROUTE_INSTANCE_DOCUMENT_ID_KEY,
362                routeInstance.getDocument().getId());
363        taskVariables.put(DocumentRoutingConstants.TASK_NODE_ID_KEY, node.getId());
364        taskVariables.put(DocumentRoutingConstants.OPERATION_STEP_DOCUMENT_KEY, node.getDocument().getId());
365        String taskNotiftemplate = node.getTaskNotificationTemplate();
366        if (!StringUtils.isEmpty(taskNotiftemplate)) {
367            taskVariables.put(DocumentRoutingConstants.TASK_ASSIGNED_NOTIFICATION_TEMPLATE, taskNotiftemplate);
368        } else {
369            // disable notification service
370            taskVariables.put(TaskEventNames.DISABLE_NOTIFICATION_SERVICE, "true");
371        }
372        // evaluate task assignees from taskVar if any
373        HashSet<String> actors = new LinkedHashSet<String>();
374        actors.addAll(node.evaluateTaskAssignees());
375        actors.addAll(node.getTaskAssignees());
376        // evaluate taskDueDate from the taskDueDateExpr;
377        Date dueDate = node.computeTaskDueDate();
378        DocumentModelList docs = graph.getAttachedDocumentModels();
379        TaskService taskService = Framework.getLocalService(TaskService.class);
380        DocumentRoutingService routing = Framework.getLocalService(DocumentRoutingService.class);
381        // TODO documents other than the first are not attached to the task
382        // (task API allows only one document)
383        // we may get several tasks if there's one per actor when the node
384        // has the property
385        // hasMultipleTasks set to true
386        List<Task> tasks = taskService.createTask(session, (NuxeoPrincipal) session.getPrincipal(), docs,
387                node.getTaskDocType(), node.getDocument().getTitle(), node.getId(), routeInstance.getDocument().getId(),
388                new ArrayList<String>(actors), node.hasMultipleTasks(), node.getTaskDirective(), null, dueDate,
389                taskVariables, null, node.getWorkflowContextualInfo(session, true));
390
391        // Audit task assignment
392        for (Task task : tasks) {
393            Map<String, Serializable> eventProperties = new HashMap<String, Serializable>();
394            eventProperties.put(DocumentEventContext.CATEGORY_PROPERTY_KEY, DocumentRoutingConstants.ROUTING_CATEGORY);
395            eventProperties.put("taskName", node.getDocument().getTitle());
396            eventProperties.put("actors", actors);
397            eventProperties.put("modelId", graph.getModelId());
398            eventProperties.put("modelName", graph.getModelName());
399            eventProperties.put(RoutingAuditHelper.WORKFLOW_INITATIOR, graph.getInitiator());
400            eventProperties.put(RoutingAuditHelper.TASK_ACTOR, ((NuxeoPrincipal) session.getPrincipal()).getOriginatingUser());
401            eventProperties.put("nodeVariables", (Serializable) node.getVariables());
402            if (routeInstance instanceof GraphRoute) {
403                eventProperties.put("workflowVariables", (Serializable) ((GraphRoute) routeInstance).getVariables());
404            }
405
406            // compute duration since workflow started
407            long timeSinceWfStarted = RoutingAuditHelper.computeDurationSinceWfStarted(task.getProcessId());
408            if (timeSinceWfStarted >= 0) {
409                eventProperties.put(RoutingAuditHelper.TIME_SINCE_WF_STARTED, timeSinceWfStarted);
410            }
411
412            DocumentEventContext envContext = new DocumentEventContext(session, session.getPrincipal(), task.getDocument());
413            envContext.setProperties(eventProperties);
414            EventProducer eventProducer = Framework.getLocalService(EventProducer.class);
415            eventProducer.fireEvent(envContext.newEvent(DocumentRoutingConstants.Events.afterWorkflowTaskCreated.name()));
416        }
417
418        // routing.makeRoutingTasks(session, tasks);
419        for (Task task : tasks) {
420            node.addTaskInfo(task.getId());
421        }
422        String taskAssigneesPermission = node.getTaskAssigneesPermission();
423        if (StringUtils.isEmpty(taskAssigneesPermission)) {
424            return;
425        }
426        for (Task task : tasks) {
427            routing.grantPermissionToTaskAssignees(session, taskAssigneesPermission, docs, task);
428        }
429    }
430
431    protected void finishTask(CoreSession session, GraphRoute graph, GraphNode node, Task task, boolean delete)
432            throws DocumentRouteException {
433        finishTask(session, graph, node, task, delete, null);
434    }
435
436    protected void finishTask(CoreSession session, GraphRoute graph, GraphNode node, Task task, boolean delete,
437            String status) throws DocumentRouteException {
438        DocumentRoutingService routing = Framework.getLocalService(DocumentRoutingService.class);
439        DocumentModelList docs = graph.getAttachedDocumentModels();
440        routing.removePermissionsForTaskActors(session, docs, task);
441        // delete task
442        if (delete) {
443            session.removeDocument(new IdRef(task.getId()));
444        }
445        // get the last comment on the task, if there are several:
446        // task might have been previously reassigned or delegated
447        List<TaskComment> comments = task.getComments();
448        String comment = comments.size() > 0 ? comments.get(comments.size() - 1).getText() : "";
449        // actor
450        NuxeoPrincipal principal = (NuxeoPrincipal) session.getPrincipal();
451        String actor = principal.getActingUser();
452        node.updateTaskInfo(task.getId(), true, status, actor, comment);
453    }
454
455}