001/* 002 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.platform.routing.core.impl; 020 021import java.io.Serializable; 022import java.util.ArrayList; 023import java.util.Date; 024import java.util.HashMap; 025import java.util.HashSet; 026import java.util.LinkedHashSet; 027import java.util.LinkedList; 028import java.util.List; 029import java.util.Map; 030import java.util.Set; 031 032import org.apache.commons.lang.StringUtils; 033import org.apache.commons.logging.Log; 034import org.apache.commons.logging.LogFactory; 035import org.nuxeo.ecm.automation.core.Constants; 036import org.nuxeo.ecm.core.api.CoreSession; 037import org.nuxeo.ecm.core.api.DocumentModel; 038import org.nuxeo.ecm.core.api.DocumentModelList; 039import org.nuxeo.ecm.core.api.IdRef; 040import org.nuxeo.ecm.core.api.NuxeoPrincipal; 041import org.nuxeo.ecm.core.event.EventProducer; 042import org.nuxeo.ecm.core.event.impl.DocumentEventContext; 043import org.nuxeo.ecm.platform.routing.api.DocumentRoute; 044import org.nuxeo.ecm.platform.routing.api.DocumentRouteElement; 045import org.nuxeo.ecm.platform.routing.api.DocumentRoutingConstants; 046import org.nuxeo.ecm.platform.routing.api.DocumentRoutingService; 047import org.nuxeo.ecm.platform.routing.api.exception.DocumentRouteException; 048import org.nuxeo.ecm.platform.routing.core.audit.RoutingAuditHelper; 049import org.nuxeo.ecm.platform.routing.core.impl.GraphNode.State; 050import org.nuxeo.ecm.platform.routing.core.impl.GraphNode.Transition; 051import org.nuxeo.ecm.platform.task.Task; 052import org.nuxeo.ecm.platform.task.TaskComment; 053import org.nuxeo.ecm.platform.task.TaskEventNames; 054import org.nuxeo.ecm.platform.task.TaskService; 055import org.nuxeo.runtime.api.Framework; 056 057/** 058 * Runs the proper nodes depending on the graph state. 059 * 060 * @since 5.6 061 */ 062public class GraphRunner extends AbstractRunner implements ElementRunner, Serializable { 063 064 private static final Log log = LogFactory.getLog(GraphRunner.class); 065 066 /** 067 * Maximum number of steps we do before deciding that this graph is looping. 068 */ 069 public static final int MAX_LOOPS = 100; 070 071 @Override 072 public void run(CoreSession session, DocumentRouteElement element, Map<String, Serializable> map) { 073 GraphRoute graph = (GraphRoute) element; 074 element.setRunning(session); 075 if (map != null) { 076 graph.setVariables(map); 077 } 078 runGraph(session, element, graph.getStartNode()); 079 } 080 081 @Override 082 public void run(CoreSession session, DocumentRouteElement element) { 083 run(session, element, null); 084 } 085 086 @SuppressWarnings("unchecked") 087 @Override 088 public void resume(CoreSession session, DocumentRouteElement element, String nodeId, String taskId, 089 Map<String, Object> varData, String status) { 090 GraphRoute graph = (GraphRoute) element; 091 Task task = null; 092 if (taskId == null) { 093 if (nodeId == null) { 094 throw new DocumentRouteException("nodeId and taskId both missing"); 095 } 096 } else { 097 DocumentModel taskDoc = session.getDocument(new IdRef(taskId)); 098 task = taskDoc.getAdapter(Task.class); 099 if (task == null) { 100 throw new DocumentRouteException("Invalid taskId: " + taskId); 101 } 102 if (nodeId == null) { 103 nodeId = task.getVariable(DocumentRoutingConstants.TASK_NODE_ID_KEY); 104 if (StringUtils.isEmpty(nodeId)) { 105 throw new DocumentRouteException("No nodeId found on task: " + taskId); 106 } 107 } 108 } 109 GraphNode node = graph.getNode(nodeId); 110 if (node == null) { 111 throw new DocumentRouteException("Invalid nodeId: " + nodeId); 112 } 113 boolean forceResume = (varData != null && varData.get(DocumentRoutingConstants.WORKFLOW_FORCE_RESUME) != null 114 && (Boolean) varData.get(DocumentRoutingConstants.WORKFLOW_FORCE_RESUME)); 115 116 if (forceResume && node.getState() != State.SUSPENDED && node.getState() != State.WAITING) { 117 throw new DocumentRouteException("Cannot force resume on non-suspended or non-waiting node: " + node); 118 } 119 if (!forceResume && node.getState() != State.SUSPENDED) { 120 throw new DocumentRouteException("Cannot resume on non-suspended node: " + node); 121 } 122 node.setAllVariables(varData, false); 123 if (StringUtils.isNotEmpty(status)) { 124 node.setButton(status); 125 } 126 if (task != null) { 127 finishTask(session, graph, node, task, false, status); 128 // don't delete (yet) 129 if (task != null) { 130 Map<String, Serializable> eventProperties = new HashMap<String, Serializable>(); 131 eventProperties.put(DocumentEventContext.CATEGORY_PROPERTY_KEY, DocumentRoutingConstants.ROUTING_CATEGORY); 132 eventProperties.put("taskName", task.getName()); 133 eventProperties.put("modelName", graph.getModelName()); 134 eventProperties.put("action", status); 135 eventProperties.put("data", (Serializable) varData); 136 eventProperties.put(RoutingAuditHelper.WORKFLOW_INITATIOR, graph.getInitiator()); 137 eventProperties.put(RoutingAuditHelper.TASK_ACTOR, ((NuxeoPrincipal) session.getPrincipal()).getActingUser()); 138 eventProperties.put("nodeVariables", (Serializable) node.getVariables()); 139 eventProperties.put("workflowVariables", (Serializable) graph.getVariables()); 140 141 // Compute duration of the task itself 142 long duration = RoutingAuditHelper.computeDurationSinceTaskStarted(task.getId()); 143 if (duration >= 0) { 144 eventProperties.put(RoutingAuditHelper.TIME_SINCE_TASK_STARTED, duration); 145 } 146 147 // Then compute duration since workflow started 148 long timeSinceWfStarted = RoutingAuditHelper.computeDurationSinceWfStarted(task.getProcessId()); 149 if (timeSinceWfStarted >= 0) { 150 eventProperties.put(RoutingAuditHelper.TIME_SINCE_WF_STARTED, timeSinceWfStarted); 151 } 152 153 DocumentEventContext envContext = new DocumentEventContext(session, session.getPrincipal(), task.getDocument()); 154 envContext.setProperties(eventProperties); 155 EventProducer eventProducer = Framework.getLocalService(EventProducer.class); 156 eventProducer.fireEvent(envContext.newEvent(DocumentRoutingConstants.Events.afterWorkflowTaskEnded.name())); 157 } 158 } else { 159 // cancel any remaing tasks on this node 160 node.cancelTasks(); 161 } 162 if (node.hasOpenTasks()) { 163 log.info("Node " + node.getId() + "has open tasks, the workflow can not be resumed for now."); 164 // do nothing, the workflow is resumed only when all the tasks 165 // created from 166 // this node are processed 167 // as this is a multi-task node, reset comment if it was 168 // previously set 169 if (varData != null && varData.get(Constants.VAR_WORKFLOW_NODE) != null 170 && ((Map<String, Serializable>) varData.get(Constants.VAR_WORKFLOW_NODE)).containsKey( 171 GraphNode.NODE_VARIABLE_COMMENT)) { 172 node.setVariable(GraphNode.NODE_VARIABLE_COMMENT, ""); 173 } 174 return; 175 } 176 runGraph(session, element, node); 177 } 178 179 @Override 180 public void cancel(CoreSession session, DocumentRouteElement element) { 181 GraphRoute graph = element instanceof GraphRoute ? (GraphRoute) element : null; 182 183 Map<String, Serializable> eventProperties = new HashMap<String, Serializable>(); 184 if (graph != null) { 185 eventProperties.put("modelId", graph.getModelId()); 186 eventProperties.put("modelName", graph.getModelName()); 187 eventProperties.put(RoutingAuditHelper.WORKFLOW_VARIABLES, (Serializable) graph.getVariables()); 188 eventProperties.put(RoutingAuditHelper.WORKFLOW_INITATIOR, graph.getInitiator()); 189 // Get the list of pending node 190 List<String> pendingNodeNames = new ArrayList<String>(); 191 for (GraphNode suspendedNode : graph.getSuspendedNodes()) { 192 pendingNodeNames.add(suspendedNode.getId()); 193 } 194 eventProperties.put("pendingNodes", (Serializable) pendingNodeNames); 195 } 196 EventFirer.fireEvent(session, element, eventProperties, DocumentRoutingConstants.Events.beforeWorkflowCanceled.name()); 197 198 super.cancel(session, element); 199 if (graph == null) { 200 return; 201 } 202 // also cancel tasks 203 // also cancel sub-workflows 204 for (GraphNode node : graph.getNodes()) { 205 node.cancelTasks(); 206 node.cancelSubRoute(); 207 } 208 } 209 210 /** 211 * Runs the graph starting with the given node. 212 * 213 * @param graph the graph 214 * @param initialNode the initial node to run 215 */ 216 protected void runGraph(CoreSession session, DocumentRouteElement element, GraphNode initialNode) 217 throws DocumentRouteException { 218 GraphRoute graph = (GraphRoute) element; 219 List<GraphNode> pendingSubRoutes = new LinkedList<GraphNode>(); 220 LinkedList<GraphNode> pendingNodes = new LinkedList<GraphNode>(); 221 pendingNodes.add(initialNode); 222 boolean done = false; 223 int count = 0; 224 while (!pendingNodes.isEmpty()) { 225 GraphNode node = pendingNodes.pop(); 226 count++; 227 if (count > MAX_LOOPS) { 228 throw new DocumentRouteException("Execution is looping, node: " + node); 229 } 230 State jump = null; 231 switch (node.getState()) { 232 case READY: 233 log.debug("Doing node " + node); 234 if (node.isMerge()) { 235 jump = State.WAITING; 236 } else { 237 jump = State.RUNNING_INPUT; 238 } 239 break; 240 case WAITING: 241 if (node.canMerge()) { 242 recursiveCancelInput(graph, node, pendingNodes); 243 jump = State.RUNNING_INPUT; 244 } 245 // else leave state to WAITING 246 break; 247 case RUNNING_INPUT: 248 node.starting(); 249 node.executeChain(node.getInputChain()); 250 if (node.hasTask() || node.hasMultipleTasks()) { 251 createTask(session, graph, node); // may create several 252 node.setState(State.SUSPENDED); 253 } 254 if (node.hasSubRoute()) { 255 if (!pendingSubRoutes.contains(node)) { 256 pendingSubRoutes.add(node); 257 } 258 node.setState(State.SUSPENDED); 259 } 260 if (node.getState() != State.SUSPENDED) { 261 jump = State.RUNNING_OUTPUT; 262 } 263 // else this node is suspended, 264 // remove it from queue of nodes to process 265 break; 266 case SUSPENDED: 267 if (node != initialNode) { 268 throw new DocumentRouteException("Executing unexpected SUSPENDED state"); 269 } 270 // actor 271 NuxeoPrincipal principal = (NuxeoPrincipal) session.getPrincipal(); 272 String actor = principal.getActingUser(); 273 node.setLastActor(actor); 274 // resuming, variables have been set by resumeGraph 275 jump = State.RUNNING_OUTPUT; 276 break; 277 case RUNNING_OUTPUT: 278 node.executeChain(node.getOutputChain()); 279 List<Transition> trueTrans = node.evaluateTransitions(); 280 node.ending(); 281 node.setState(State.READY); 282 if (node.isStop()) { 283 if (!pendingNodes.isEmpty()) { 284 throw new DocumentRouteException(String.format("Route %s stopped with still pending nodes: %s", 285 graph, pendingNodes)); 286 } 287 done = true; 288 } else { 289 if (trueTrans.isEmpty()) { 290 throw new DocumentRouteException("No transition evaluated to true from node " + node); 291 } 292 for (Transition t : trueTrans) { 293 node.executeTransitionChain(t); 294 GraphNode target = graph.getNode(t.target); 295 if (!pendingNodes.contains(target)) { 296 pendingNodes.add(target); 297 } 298 } 299 } 300 break; 301 } 302 if (jump != null) { 303 node.setState(jump); 304 // loop again on this node 305 count--; 306 pendingNodes.addFirst(node); 307 } 308 } 309 if (done) { 310 element.setDone(session); 311 /* 312 * Resume the parent route if this is a sub-route. 313 */ 314 if (graph.hasParentRoute()) { 315 graph.resumeParentRoute(session); 316 } 317 } 318 /* 319 * Now run the sub-routes. If they are done, they'll call back into the routing service to resume the parent 320 * node (above code). 321 */ 322 for (GraphNode node : pendingSubRoutes) { 323 DocumentRoute subRoute = node.startSubRoute(); 324 } 325 session.save(); 326 } 327 328 protected void recursiveCancelInput(GraphRoute graph, GraphNode originalNode, LinkedList<GraphNode> pendingNodes) { 329 LinkedList<GraphNode> todo = new LinkedList<GraphNode>(); 330 todo.add(originalNode); 331 Set<String> done = new HashSet<String>(); 332 while (!todo.isEmpty()) { 333 GraphNode node = todo.pop(); 334 done.add(node.getId()); 335 for (Transition t : node.getInputTransitions()) { 336 if (t.loop) { 337 // don't recurse through loop transitions 338 continue; 339 } 340 GraphNode source = t.source; 341 if (done.contains(source.getId())) { 342 // looping somewhere TODO check it's not happening 343 continue; 344 } 345 source.setCanceled(); 346 State state = source.getState(); 347 source.setState(State.READY); 348 pendingNodes.remove(node); 349 if (state == State.SUSPENDED) { 350 // we're suspended on a task, cancel it and stop recursion 351 source.cancelTasks(); 352 } else { 353 // else recurse 354 todo.add(source); 355 } 356 } 357 } 358 } 359 360 protected void createTask(CoreSession session, GraphRoute graph, GraphNode node) throws DocumentRouteException { 361 DocumentRouteElement routeInstance = graph; 362 Map<String, String> taskVariables = new HashMap<String, String>(); 363 taskVariables.put(DocumentRoutingConstants.TASK_ROUTE_INSTANCE_DOCUMENT_ID_KEY, 364 routeInstance.getDocument().getId()); 365 taskVariables.put(DocumentRoutingConstants.TASK_NODE_ID_KEY, node.getId()); 366 taskVariables.put(DocumentRoutingConstants.OPERATION_STEP_DOCUMENT_KEY, node.getDocument().getId()); 367 String taskNotiftemplate = node.getTaskNotificationTemplate(); 368 if (!StringUtils.isEmpty(taskNotiftemplate)) { 369 taskVariables.put(DocumentRoutingConstants.TASK_ASSIGNED_NOTIFICATION_TEMPLATE, taskNotiftemplate); 370 } else { 371 // disable notification service 372 taskVariables.put(TaskEventNames.DISABLE_NOTIFICATION_SERVICE, "true"); 373 } 374 // evaluate task assignees from taskVar if any 375 HashSet<String> actors = new LinkedHashSet<String>(); 376 actors.addAll(node.evaluateTaskAssignees()); 377 actors.addAll(node.getTaskAssignees()); 378 // evaluate taskDueDate from the taskDueDateExpr; 379 Date dueDate = node.computeTaskDueDate(); 380 DocumentModelList docs = graph.getAttachedDocumentModels(); 381 TaskService taskService = Framework.getLocalService(TaskService.class); 382 DocumentRoutingService routing = Framework.getLocalService(DocumentRoutingService.class); 383 // TODO documents other than the first are not attached to the task 384 // (task API allows only one document) 385 // we may get several tasks if there's one per actor when the node 386 // has the property 387 // hasMultipleTasks set to true 388 List<Task> tasks = taskService.createTask(session, (NuxeoPrincipal) session.getPrincipal(), docs, 389 node.getTaskDocType(), node.getDocument().getTitle(), node.getId(), routeInstance.getDocument().getId(), 390 new ArrayList<String>(actors), node.hasMultipleTasks(), node.getTaskDirective(), null, dueDate, 391 taskVariables, null, node.getWorkflowContextualInfo(session, true)); 392 393 // Audit task assignment 394 for (Task task : tasks) { 395 Map<String, Serializable> eventProperties = new HashMap<String, Serializable>(); 396 eventProperties.put(DocumentEventContext.CATEGORY_PROPERTY_KEY, DocumentRoutingConstants.ROUTING_CATEGORY); 397 eventProperties.put("taskName", node.getDocument().getTitle()); 398 eventProperties.put("actors", actors); 399 eventProperties.put("modelId", graph.getModelId()); 400 eventProperties.put("modelName", graph.getModelName()); 401 eventProperties.put(RoutingAuditHelper.WORKFLOW_INITATIOR, graph.getInitiator()); 402 eventProperties.put(RoutingAuditHelper.TASK_ACTOR, ((NuxeoPrincipal) session.getPrincipal()).getOriginatingUser()); 403 eventProperties.put("nodeVariables", (Serializable) node.getVariables()); 404 if (routeInstance instanceof GraphRoute) { 405 eventProperties.put("workflowVariables", (Serializable) ((GraphRoute) routeInstance).getVariables()); 406 } 407 408 // compute duration since workflow started 409 long timeSinceWfStarted = RoutingAuditHelper.computeDurationSinceWfStarted(task.getProcessId()); 410 if (timeSinceWfStarted >= 0) { 411 eventProperties.put(RoutingAuditHelper.TIME_SINCE_WF_STARTED, timeSinceWfStarted); 412 } 413 414 DocumentEventContext envContext = new DocumentEventContext(session, session.getPrincipal(), task.getDocument()); 415 envContext.setProperties(eventProperties); 416 EventProducer eventProducer = Framework.getLocalService(EventProducer.class); 417 eventProducer.fireEvent(envContext.newEvent(DocumentRoutingConstants.Events.afterWorkflowTaskCreated.name())); 418 } 419 420 // routing.makeRoutingTasks(session, tasks); 421 for (Task task : tasks) { 422 node.addTaskInfo(task.getId()); 423 } 424 String taskAssigneesPermission = node.getTaskAssigneesPermission(); 425 if (StringUtils.isEmpty(taskAssigneesPermission)) { 426 return; 427 } 428 for (Task task : tasks) { 429 routing.grantPermissionToTaskAssignees(session, taskAssigneesPermission, docs, task); 430 } 431 } 432 433 protected void finishTask(CoreSession session, GraphRoute graph, GraphNode node, Task task, boolean delete) 434 throws DocumentRouteException { 435 finishTask(session, graph, node, task, delete, null); 436 } 437 438 protected void finishTask(CoreSession session, GraphRoute graph, GraphNode node, Task task, boolean delete, 439 String status) throws DocumentRouteException { 440 DocumentRoutingService routing = Framework.getLocalService(DocumentRoutingService.class); 441 DocumentModelList docs = graph.getAttachedDocumentModels(); 442 routing.removePermissionsForTaskActors(session, docs, task); 443 // delete task 444 if (delete) { 445 session.removeDocument(new IdRef(task.getId())); 446 } 447 // get the last comment on the task, if there are several: 448 // task might have been previously reassigned or delegated 449 List<TaskComment> comments = task.getComments(); 450 String comment = comments.size() > 0 ? comments.get(comments.size() - 1).getText() : ""; 451 // actor 452 NuxeoPrincipal principal = (NuxeoPrincipal) session.getPrincipal(); 453 String actor = principal.getActingUser(); 454 node.updateTaskInfo(task.getId(), true, status, actor, comment); 455 } 456 457}