001/* 002 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Florent Guillaume 016 */ 017package org.nuxeo.ecm.platform.routing.core.impl; 018 019import java.io.Serializable; 020import java.util.ArrayList; 021import java.util.Date; 022import java.util.HashMap; 023import java.util.HashSet; 024import java.util.LinkedHashSet; 025import java.util.LinkedList; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029 030import org.apache.commons.lang.StringUtils; 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033import org.nuxeo.ecm.automation.core.Constants; 034import org.nuxeo.ecm.core.api.CoreSession; 035import org.nuxeo.ecm.core.api.DocumentModel; 036import org.nuxeo.ecm.core.api.DocumentModelList; 037import org.nuxeo.ecm.core.api.IdRef; 038import org.nuxeo.ecm.core.api.NuxeoPrincipal; 039import org.nuxeo.ecm.core.event.EventProducer; 040import org.nuxeo.ecm.core.event.impl.DocumentEventContext; 041import org.nuxeo.ecm.platform.routing.api.DocumentRoute; 042import org.nuxeo.ecm.platform.routing.api.DocumentRouteElement; 043import org.nuxeo.ecm.platform.routing.api.DocumentRoutingConstants; 044import org.nuxeo.ecm.platform.routing.api.DocumentRoutingService; 045import org.nuxeo.ecm.platform.routing.api.exception.DocumentRouteException; 046import org.nuxeo.ecm.platform.routing.core.audit.RoutingAuditHelper; 047import org.nuxeo.ecm.platform.routing.core.impl.GraphNode.State; 048import org.nuxeo.ecm.platform.routing.core.impl.GraphNode.Transition; 049import org.nuxeo.ecm.platform.task.Task; 050import org.nuxeo.ecm.platform.task.TaskComment; 051import org.nuxeo.ecm.platform.task.TaskEventNames; 052import org.nuxeo.ecm.platform.task.TaskService; 053import org.nuxeo.runtime.api.Framework; 054 055/** 056 * Runs the proper nodes depending on the graph state. 057 * 058 * @since 5.6 059 */ 060public class GraphRunner extends AbstractRunner implements ElementRunner, Serializable { 061 062 private static final Log log = LogFactory.getLog(GraphRunner.class); 063 064 /** 065 * Maximum number of steps we do before deciding that this graph is looping. 066 */ 067 public static final int MAX_LOOPS = 100; 068 069 @Override 070 public void run(CoreSession session, DocumentRouteElement element, Map<String, Serializable> map) { 071 GraphRoute graph = (GraphRoute) element; 072 element.setRunning(session); 073 if (map != null) { 074 graph.setVariables(map); 075 } 076 runGraph(session, element, graph.getStartNode()); 077 } 078 079 @Override 080 public void run(CoreSession session, DocumentRouteElement element) { 081 run(session, element, null); 082 } 083 084 @SuppressWarnings("unchecked") 085 @Override 086 public void resume(CoreSession session, DocumentRouteElement element, String nodeId, String taskId, 087 Map<String, Object> varData, String status) { 088 GraphRoute graph = (GraphRoute) element; 089 Task task = null; 090 if (taskId == null) { 091 if (nodeId == null) { 092 throw new DocumentRouteException("nodeId and taskId both missing"); 093 } 094 } else { 095 DocumentModel taskDoc = session.getDocument(new IdRef(taskId)); 096 task = taskDoc.getAdapter(Task.class); 097 if (task == null) { 098 throw new DocumentRouteException("Invalid taskId: " + taskId); 099 } 100 if (nodeId == null) { 101 nodeId = task.getVariable(DocumentRoutingConstants.TASK_NODE_ID_KEY); 102 if (StringUtils.isEmpty(nodeId)) { 103 throw new DocumentRouteException("No nodeId found on task: " + taskId); 104 } 105 } 106 } 107 GraphNode node = graph.getNode(nodeId); 108 if (node == null) { 109 throw new DocumentRouteException("Invalid nodeId: " + nodeId); 110 } 111 boolean forceResume = (varData != null && varData.get(DocumentRoutingConstants.WORKFLOW_FORCE_RESUME) != null 112 && (Boolean) varData.get(DocumentRoutingConstants.WORKFLOW_FORCE_RESUME)); 113 114 if (forceResume && node.getState() != State.SUSPENDED && node.getState() != State.WAITING) { 115 throw new DocumentRouteException("Cannot force resume on non-suspended or non-waiting node: " + node); 116 } 117 if (!forceResume && node.getState() != State.SUSPENDED) { 118 throw new DocumentRouteException("Cannot resume on non-suspended node: " + node); 119 } 120 node.setAllVariables(varData, false); 121 if (StringUtils.isNotEmpty(status)) { 122 node.setButton(status); 123 } 124 if (task != null) { 125 finishTask(session, graph, node, task, false, status); 126 // don't delete (yet) 127 if (task != null) { 128 Map<String, Serializable> eventProperties = new HashMap<String, Serializable>(); 129 eventProperties.put(DocumentEventContext.CATEGORY_PROPERTY_KEY, DocumentRoutingConstants.ROUTING_CATEGORY); 130 eventProperties.put("taskName", task.getName()); 131 eventProperties.put("modelName", graph.getModelName()); 132 eventProperties.put("action", status); 133 eventProperties.put("data", (Serializable) varData); 134 eventProperties.put(RoutingAuditHelper.WORKFLOW_INITATIOR, graph.getInitiator()); 135 eventProperties.put(RoutingAuditHelper.TASK_ACTOR, ((NuxeoPrincipal) session.getPrincipal()).getActingUser()); 136 eventProperties.put("nodeVariables", (Serializable) node.getVariables()); 137 eventProperties.put("workflowVariables", (Serializable) graph.getVariables()); 138 139 // Compute duration of the task itself 140 long duration = RoutingAuditHelper.computeDurationSinceTaskStarted(task.getId()); 141 if (duration >= 0) { 142 eventProperties.put(RoutingAuditHelper.TIME_SINCE_TASK_STARTED, duration); 143 } 144 145 // Then compute duration since workflow started 146 long timeSinceWfStarted = RoutingAuditHelper.computeDurationSinceWfStarted(task.getProcessId()); 147 if (timeSinceWfStarted >= 0) { 148 eventProperties.put(RoutingAuditHelper.TIME_SINCE_WF_STARTED, timeSinceWfStarted); 149 } 150 151 DocumentEventContext envContext = new DocumentEventContext(session, session.getPrincipal(), task.getDocument()); 152 envContext.setProperties(eventProperties); 153 EventProducer eventProducer = Framework.getLocalService(EventProducer.class); 154 eventProducer.fireEvent(envContext.newEvent(DocumentRoutingConstants.Events.afterWorkflowTaskEnded.name())); 155 } 156 } else { 157 // cancel any remaing tasks on this node 158 node.cancelTasks(); 159 } 160 if (node.hasOpenTasks()) { 161 log.info("Node " + node.getId() + "has open tasks, the workflow can not be resumed for now."); 162 // do nothing, the workflow is resumed only when all the tasks 163 // created from 164 // this node are processed 165 // as this is a multi-task node, reset comment if it was 166 // previously set 167 if (varData != null && varData.get(Constants.VAR_WORKFLOW_NODE) != null 168 && ((Map<String, Serializable>) varData.get(Constants.VAR_WORKFLOW_NODE)).containsKey( 169 GraphNode.NODE_VARIABLE_COMMENT)) { 170 node.setVariable(GraphNode.NODE_VARIABLE_COMMENT, ""); 171 } 172 return; 173 } 174 runGraph(session, element, node); 175 } 176 177 @Override 178 public void cancel(CoreSession session, DocumentRouteElement element) { 179 GraphRoute graph = element instanceof GraphRoute ? (GraphRoute) element : null; 180 181 Map<String, Serializable> eventProperties = new HashMap<String, Serializable>(); 182 if (graph != null) { 183 eventProperties.put("modelId", graph.getModelId()); 184 eventProperties.put("modelName", graph.getModelName()); 185 eventProperties.put(RoutingAuditHelper.WORKFLOW_VARIABLES, (Serializable) graph.getVariables()); 186 eventProperties.put(RoutingAuditHelper.WORKFLOW_INITATIOR, graph.getInitiator()); 187 // Get the list of pending node 188 List<String> pendingNodeNames = new ArrayList<String>(); 189 for (GraphNode suspendedNode : graph.getSuspendedNodes()) { 190 pendingNodeNames.add(suspendedNode.getId()); 191 } 192 eventProperties.put("pendingNodes", (Serializable) pendingNodeNames); 193 } 194 EventFirer.fireEvent(session, element, eventProperties, DocumentRoutingConstants.Events.beforeWorkflowCanceled.name()); 195 196 super.cancel(session, element); 197 if (graph == null) { 198 return; 199 } 200 // also cancel tasks 201 // also cancel sub-workflows 202 for (GraphNode node : graph.getNodes()) { 203 node.cancelTasks(); 204 node.cancelSubRoute(); 205 } 206 } 207 208 /** 209 * Runs the graph starting with the given node. 210 * 211 * @param graph the graph 212 * @param initialNode the initial node to run 213 */ 214 protected void runGraph(CoreSession session, DocumentRouteElement element, GraphNode initialNode) 215 throws DocumentRouteException { 216 GraphRoute graph = (GraphRoute) element; 217 List<GraphNode> pendingSubRoutes = new LinkedList<GraphNode>(); 218 LinkedList<GraphNode> pendingNodes = new LinkedList<GraphNode>(); 219 pendingNodes.add(initialNode); 220 boolean done = false; 221 int count = 0; 222 while (!pendingNodes.isEmpty()) { 223 GraphNode node = pendingNodes.pop(); 224 count++; 225 if (count > MAX_LOOPS) { 226 throw new DocumentRouteException("Execution is looping, node: " + node); 227 } 228 State jump = null; 229 switch (node.getState()) { 230 case READY: 231 log.debug("Doing node " + node); 232 if (node.isMerge()) { 233 jump = State.WAITING; 234 } else { 235 jump = State.RUNNING_INPUT; 236 } 237 break; 238 case WAITING: 239 if (node.canMerge()) { 240 recursiveCancelInput(graph, node, pendingNodes); 241 jump = State.RUNNING_INPUT; 242 } 243 // else leave state to WAITING 244 break; 245 case RUNNING_INPUT: 246 node.starting(); 247 node.executeChain(node.getInputChain()); 248 if (node.hasTask() || node.hasMultipleTasks()) { 249 createTask(session, graph, node); // may create several 250 node.setState(State.SUSPENDED); 251 } 252 if (node.hasSubRoute()) { 253 if (!pendingSubRoutes.contains(node)) { 254 pendingSubRoutes.add(node); 255 } 256 node.setState(State.SUSPENDED); 257 } 258 if (node.getState() != State.SUSPENDED) { 259 jump = State.RUNNING_OUTPUT; 260 } 261 // else this node is suspended, 262 // remove it from queue of nodes to process 263 break; 264 case SUSPENDED: 265 if (node != initialNode) { 266 throw new DocumentRouteException("Executing unexpected SUSPENDED state"); 267 } 268 // actor 269 NuxeoPrincipal principal = (NuxeoPrincipal) session.getPrincipal(); 270 String actor = principal.getActingUser(); 271 node.setLastActor(actor); 272 // resuming, variables have been set by resumeGraph 273 jump = State.RUNNING_OUTPUT; 274 break; 275 case RUNNING_OUTPUT: 276 node.executeChain(node.getOutputChain()); 277 List<Transition> trueTrans = node.evaluateTransitions(); 278 node.ending(); 279 node.setState(State.READY); 280 if (node.isStop()) { 281 if (!pendingNodes.isEmpty()) { 282 throw new DocumentRouteException(String.format("Route %s stopped with still pending nodes: %s", 283 graph, pendingNodes)); 284 } 285 done = true; 286 } else { 287 if (trueTrans.isEmpty()) { 288 throw new DocumentRouteException("No transition evaluated to true from node " + node); 289 } 290 for (Transition t : trueTrans) { 291 node.executeTransitionChain(t); 292 GraphNode target = graph.getNode(t.target); 293 if (!pendingNodes.contains(target)) { 294 pendingNodes.add(target); 295 } 296 } 297 } 298 break; 299 } 300 if (jump != null) { 301 node.setState(jump); 302 // loop again on this node 303 count--; 304 pendingNodes.addFirst(node); 305 } 306 } 307 if (done) { 308 element.setDone(session); 309 /* 310 * Resume the parent route if this is a sub-route. 311 */ 312 if (graph.hasParentRoute()) { 313 graph.resumeParentRoute(session); 314 } 315 } 316 /* 317 * Now run the sub-routes. If they are done, they'll call back into the routing service to resume the parent 318 * node (above code). 319 */ 320 for (GraphNode node : pendingSubRoutes) { 321 DocumentRoute subRoute = node.startSubRoute(); 322 } 323 session.save(); 324 } 325 326 protected void recursiveCancelInput(GraphRoute graph, GraphNode originalNode, LinkedList<GraphNode> pendingNodes) { 327 LinkedList<GraphNode> todo = new LinkedList<GraphNode>(); 328 todo.add(originalNode); 329 Set<String> done = new HashSet<String>(); 330 while (!todo.isEmpty()) { 331 GraphNode node = todo.pop(); 332 done.add(node.getId()); 333 for (Transition t : node.getInputTransitions()) { 334 if (t.loop) { 335 // don't recurse through loop transitions 336 continue; 337 } 338 GraphNode source = t.source; 339 if (done.contains(source.getId())) { 340 // looping somewhere TODO check it's not happening 341 continue; 342 } 343 source.setCanceled(); 344 State state = source.getState(); 345 source.setState(State.READY); 346 pendingNodes.remove(node); 347 if (state == State.SUSPENDED) { 348 // we're suspended on a task, cancel it and stop recursion 349 source.cancelTasks(); 350 } else { 351 // else recurse 352 todo.add(source); 353 } 354 } 355 } 356 } 357 358 protected void createTask(CoreSession session, GraphRoute graph, GraphNode node) throws DocumentRouteException { 359 DocumentRouteElement routeInstance = graph; 360 Map<String, String> taskVariables = new HashMap<String, String>(); 361 taskVariables.put(DocumentRoutingConstants.TASK_ROUTE_INSTANCE_DOCUMENT_ID_KEY, 362 routeInstance.getDocument().getId()); 363 taskVariables.put(DocumentRoutingConstants.TASK_NODE_ID_KEY, node.getId()); 364 taskVariables.put(DocumentRoutingConstants.OPERATION_STEP_DOCUMENT_KEY, node.getDocument().getId()); 365 String taskNotiftemplate = node.getTaskNotificationTemplate(); 366 if (!StringUtils.isEmpty(taskNotiftemplate)) { 367 taskVariables.put(DocumentRoutingConstants.TASK_ASSIGNED_NOTIFICATION_TEMPLATE, taskNotiftemplate); 368 } else { 369 // disable notification service 370 taskVariables.put(TaskEventNames.DISABLE_NOTIFICATION_SERVICE, "true"); 371 } 372 // evaluate task assignees from taskVar if any 373 HashSet<String> actors = new LinkedHashSet<String>(); 374 actors.addAll(node.evaluateTaskAssignees()); 375 actors.addAll(node.getTaskAssignees()); 376 // evaluate taskDueDate from the taskDueDateExpr; 377 Date dueDate = node.computeTaskDueDate(); 378 DocumentModelList docs = graph.getAttachedDocumentModels(); 379 TaskService taskService = Framework.getLocalService(TaskService.class); 380 DocumentRoutingService routing = Framework.getLocalService(DocumentRoutingService.class); 381 // TODO documents other than the first are not attached to the task 382 // (task API allows only one document) 383 // we may get several tasks if there's one per actor when the node 384 // has the property 385 // hasMultipleTasks set to true 386 List<Task> tasks = taskService.createTask(session, (NuxeoPrincipal) session.getPrincipal(), docs, 387 node.getTaskDocType(), node.getDocument().getTitle(), node.getId(), routeInstance.getDocument().getId(), 388 new ArrayList<String>(actors), node.hasMultipleTasks(), node.getTaskDirective(), null, dueDate, 389 taskVariables, null, node.getWorkflowContextualInfo(session, true)); 390 391 // Audit task assignment 392 for (Task task : tasks) { 393 Map<String, Serializable> eventProperties = new HashMap<String, Serializable>(); 394 eventProperties.put(DocumentEventContext.CATEGORY_PROPERTY_KEY, DocumentRoutingConstants.ROUTING_CATEGORY); 395 eventProperties.put("taskName", node.getDocument().getTitle()); 396 eventProperties.put("actors", actors); 397 eventProperties.put("modelId", graph.getModelId()); 398 eventProperties.put("modelName", graph.getModelName()); 399 eventProperties.put(RoutingAuditHelper.WORKFLOW_INITATIOR, graph.getInitiator()); 400 eventProperties.put(RoutingAuditHelper.TASK_ACTOR, ((NuxeoPrincipal) session.getPrincipal()).getOriginatingUser()); 401 eventProperties.put("nodeVariables", (Serializable) node.getVariables()); 402 if (routeInstance instanceof GraphRoute) { 403 eventProperties.put("workflowVariables", (Serializable) ((GraphRoute) routeInstance).getVariables()); 404 } 405 406 // compute duration since workflow started 407 long timeSinceWfStarted = RoutingAuditHelper.computeDurationSinceWfStarted(task.getProcessId()); 408 if (timeSinceWfStarted >= 0) { 409 eventProperties.put(RoutingAuditHelper.TIME_SINCE_WF_STARTED, timeSinceWfStarted); 410 } 411 412 DocumentEventContext envContext = new DocumentEventContext(session, session.getPrincipal(), task.getDocument()); 413 envContext.setProperties(eventProperties); 414 EventProducer eventProducer = Framework.getLocalService(EventProducer.class); 415 eventProducer.fireEvent(envContext.newEvent(DocumentRoutingConstants.Events.afterWorkflowTaskCreated.name())); 416 } 417 418 // routing.makeRoutingTasks(session, tasks); 419 for (Task task : tasks) { 420 node.addTaskInfo(task.getId()); 421 } 422 String taskAssigneesPermission = node.getTaskAssigneesPermission(); 423 if (StringUtils.isEmpty(taskAssigneesPermission)) { 424 return; 425 } 426 for (Task task : tasks) { 427 routing.grantPermissionToTaskAssignees(session, taskAssigneesPermission, docs, task); 428 } 429 } 430 431 protected void finishTask(CoreSession session, GraphRoute graph, GraphNode node, Task task, boolean delete) 432 throws DocumentRouteException { 433 finishTask(session, graph, node, task, delete, null); 434 } 435 436 protected void finishTask(CoreSession session, GraphRoute graph, GraphNode node, Task task, boolean delete, 437 String status) throws DocumentRouteException { 438 DocumentRoutingService routing = Framework.getLocalService(DocumentRoutingService.class); 439 DocumentModelList docs = graph.getAttachedDocumentModels(); 440 routing.removePermissionsForTaskActors(session, docs, task); 441 // delete task 442 if (delete) { 443 session.removeDocument(new IdRef(task.getId())); 444 } 445 // get the last comment on the task, if there are several: 446 // task might have been previously reassigned or delegated 447 List<TaskComment> comments = task.getComments(); 448 String comment = comments.size() > 0 ? comments.get(comments.size() - 1).getText() : ""; 449 // actor 450 NuxeoPrincipal principal = (NuxeoPrincipal) session.getPrincipal(); 451 String actor = principal.getActingUser(); 452 node.updateTaskInfo(task.getId(), true, status, actor, comment); 453 } 454 455}