001/* 002 * (C) Copyright 2012-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.platform.routing.core.impl; 020 021import java.io.Serializable; 022import java.util.ArrayList; 023import java.util.Date; 024import java.util.HashMap; 025import java.util.HashSet; 026import java.util.LinkedHashSet; 027import java.util.LinkedList; 028import java.util.List; 029import java.util.Map; 030import java.util.Set; 031 032import org.apache.commons.lang3.StringUtils; 033import org.apache.commons.logging.Log; 034import org.apache.commons.logging.LogFactory; 035import org.nuxeo.ecm.automation.core.Constants; 036import org.nuxeo.ecm.core.api.CoreSession; 037import org.nuxeo.ecm.core.api.DocumentModel; 038import org.nuxeo.ecm.core.api.DocumentModelList; 039import org.nuxeo.ecm.core.api.IdRef; 040import org.nuxeo.ecm.core.api.NuxeoPrincipal; 041import org.nuxeo.ecm.core.event.EventProducer; 042import org.nuxeo.ecm.core.event.impl.DocumentEventContext; 043import org.nuxeo.ecm.platform.routing.api.DocumentRoute; 044import org.nuxeo.ecm.platform.routing.api.DocumentRouteElement; 045import org.nuxeo.ecm.platform.routing.api.DocumentRoutingConstants; 046import org.nuxeo.ecm.platform.routing.api.DocumentRoutingService; 047import org.nuxeo.ecm.platform.routing.api.exception.DocumentRouteException; 048import org.nuxeo.ecm.platform.routing.core.audit.RoutingAuditHelper; 049import org.nuxeo.ecm.platform.routing.core.impl.GraphNode.State; 050import org.nuxeo.ecm.platform.routing.core.impl.GraphNode.Transition; 051import org.nuxeo.ecm.platform.task.Task; 052import org.nuxeo.ecm.platform.task.TaskComment; 053import org.nuxeo.ecm.platform.task.TaskEventNames; 054import org.nuxeo.ecm.platform.task.TaskService; 055import org.nuxeo.runtime.api.Framework; 056 057/** 058 * Runs the proper nodes depending on the graph state. 059 * 060 * @since 5.6 061 */ 062public class GraphRunner extends AbstractRunner implements ElementRunner, Serializable { 063 064 private static final Log log = LogFactory.getLog(GraphRunner.class); 065 066 /** 067 * Maximum number of steps we do before deciding that this graph is looping. 068 */ 069 public static final int MAX_LOOPS = 100; 070 071 @Override 072 public void run(CoreSession session, DocumentRouteElement element, Map<String, Serializable> map) { 073 GraphRoute graph = (GraphRoute) element; 074 element.setRunning(session); 075 if (map != null) { 076 graph.setVariables(map); 077 } 078 runGraph(session, element, graph.getStartNode()); 079 } 080 081 @Override 082 public void run(CoreSession session, DocumentRouteElement element) { 083 run(session, element, null); 084 } 085 086 @SuppressWarnings("unchecked") 087 @Override 088 public void resume(CoreSession session, DocumentRouteElement element, String nodeId, String taskId, 089 Map<String, Object> varData, String status) { 090 GraphRoute graph = (GraphRoute) element; 091 Task task = null; 092 if (taskId == null) { 093 if (nodeId == null) { 094 throw new DocumentRouteException("nodeId and taskId both missing"); 095 } 096 } else { 097 DocumentModel taskDoc = session.getDocument(new IdRef(taskId)); 098 task = taskDoc.getAdapter(Task.class); 099 if (task == null) { 100 throw new DocumentRouteException("Invalid taskId: " + taskId); 101 } 102 if (nodeId == null) { 103 nodeId = task.getVariable(DocumentRoutingConstants.TASK_NODE_ID_KEY); 104 if (StringUtils.isEmpty(nodeId)) { 105 throw new DocumentRouteException("No nodeId found on task: " + taskId); 106 } 107 } 108 } 109 GraphNode node = graph.getNode(nodeId); 110 if (node == null) { 111 throw new DocumentRouteException("Invalid nodeId: " + nodeId); 112 } 113 boolean forceResume = (varData != null && varData.get(DocumentRoutingConstants.WORKFLOW_FORCE_RESUME) != null 114 && (Boolean) varData.get(DocumentRoutingConstants.WORKFLOW_FORCE_RESUME)); 115 116 if (forceResume && node.getState() != State.SUSPENDED && node.getState() != State.WAITING) { 117 throw new DocumentRouteException("Cannot force resume on non-suspended or non-waiting node: " + node); 118 } 119 if (!forceResume && node.getState() != State.SUSPENDED) { 120 throw new DocumentRouteException("Cannot resume on non-suspended node: " + node); 121 } 122 node.setAllVariables(varData, false); 123 if (StringUtils.isNotEmpty(status)) { 124 node.setButton(status); 125 } 126 if (task != null) { 127 finishTask(session, graph, node, task, false, status); 128 // don't delete (yet) 129 if (task != null) { 130 Map<String, Serializable> eventProperties = new HashMap<>(); 131 eventProperties.put(DocumentEventContext.CATEGORY_PROPERTY_KEY, DocumentRoutingConstants.ROUTING_CATEGORY); 132 eventProperties.put("taskName", task.getName()); 133 eventProperties.put("modelName", graph.getModelName()); 134 eventProperties.put("action", status); 135 eventProperties.put("data", (Serializable) varData); 136 eventProperties.put(RoutingAuditHelper.WORKFLOW_INITATIOR, graph.getInitiator()); 137 eventProperties.put(RoutingAuditHelper.TASK_ACTOR, session.getPrincipal().getActingUser()); 138 eventProperties.put("nodeVariables", (Serializable) node.getVariables()); 139 eventProperties.put("workflowVariables", (Serializable) graph.getVariables()); 140 141 // Compute duration of the task itself 142 long duration = RoutingAuditHelper.computeDurationSinceTaskStarted(task.getId()); 143 if (duration >= 0) { 144 eventProperties.put(RoutingAuditHelper.TIME_SINCE_TASK_STARTED, duration); 145 } 146 147 // Then compute duration since workflow started 148 long timeSinceWfStarted = RoutingAuditHelper.computeDurationSinceWfStarted(task.getProcessId()); 149 if (timeSinceWfStarted >= 0) { 150 eventProperties.put(RoutingAuditHelper.TIME_SINCE_WF_STARTED, timeSinceWfStarted); 151 } 152 153 DocumentEventContext envContext = new DocumentEventContext(session, session.getPrincipal(), task.getDocument()); 154 envContext.setProperties(eventProperties); 155 EventProducer eventProducer = Framework.getService(EventProducer.class); 156 eventProducer.fireEvent(envContext.newEvent(DocumentRoutingConstants.Events.afterWorkflowTaskEnded.name())); 157 } 158 } else { 159 // cancel any remaing tasks on this node 160 node.cancelTasks(); 161 } 162 if (node.hasOpenTasks()) { 163 log.info("Node " + node.getId() + "has open tasks, the workflow can not be resumed for now."); 164 // do nothing, the workflow is resumed only when all the tasks 165 // created from 166 // this node are processed 167 // as this is a multi-task node, reset comment if it was 168 // previously set 169 if (varData != null && varData.get(Constants.VAR_WORKFLOW_NODE) != null 170 && ((Map<String, Serializable>) varData.get(Constants.VAR_WORKFLOW_NODE)).containsKey( 171 GraphNode.NODE_VARIABLE_COMMENT)) { 172 node.setVariable(GraphNode.NODE_VARIABLE_COMMENT, ""); 173 } 174 return; 175 } 176 runGraph(session, element, node); 177 } 178 179 @Override 180 public void cancel(CoreSession session, DocumentRouteElement element) { 181 GraphRoute graph = element instanceof GraphRoute ? (GraphRoute) element : null; 182 183 Map<String, Serializable> eventProperties = new HashMap<>(); 184 if (graph != null) { 185 eventProperties.put("modelId", graph.getModelId()); 186 eventProperties.put("modelName", graph.getModelName()); 187 eventProperties.put(RoutingAuditHelper.WORKFLOW_VARIABLES, (Serializable) graph.getVariables()); 188 eventProperties.put(RoutingAuditHelper.WORKFLOW_INITATIOR, graph.getInitiator()); 189 // Get the list of pending node 190 List<String> pendingNodeNames = new ArrayList<>(); 191 for (GraphNode suspendedNode : graph.getSuspendedNodes()) { 192 pendingNodeNames.add(suspendedNode.getId()); 193 } 194 eventProperties.put("pendingNodes", (Serializable) pendingNodeNames); 195 } 196 EventFirer.fireEvent(session, element, eventProperties, DocumentRoutingConstants.Events.beforeWorkflowCanceled.name()); 197 198 super.cancel(session, element); 199 if (graph == null) { 200 return; 201 } 202 // also cancel tasks 203 // also cancel sub-workflows 204 for (GraphNode node : graph.getNodes()) { 205 node.cancelTasks(); 206 node.cancelSubRoute(); 207 } 208 } 209 210 /** 211 * Runs the graph starting with the given node. 212 * 213 * @param initialNode the initial node to run 214 */ 215 protected void runGraph(CoreSession session, DocumentRouteElement element, GraphNode initialNode) 216 throws DocumentRouteException { 217 GraphRoute graph = (GraphRoute) element; 218 List<GraphNode> pendingSubRoutes = new LinkedList<>(); 219 LinkedList<GraphNode> pendingNodes = new LinkedList<>(); 220 pendingNodes.add(initialNode); 221 boolean done = false; 222 int count = 0; 223 while (!pendingNodes.isEmpty()) { 224 GraphNode node = pendingNodes.pop(); 225 count++; 226 if (count > MAX_LOOPS) { 227 throw new DocumentRouteException("Execution is looping, node: " + node); 228 } 229 State jump = null; 230 switch (node.getState()) { 231 case READY: 232 log.debug("Doing node " + node); 233 if (node.isMerge()) { 234 jump = State.WAITING; 235 } else { 236 jump = State.RUNNING_INPUT; 237 } 238 break; 239 case WAITING: 240 if (node.canMerge()) { 241 recursiveCancelInput(graph, node, pendingNodes); 242 jump = State.RUNNING_INPUT; 243 } 244 // else leave state to WAITING 245 break; 246 case RUNNING_INPUT: 247 node.starting(); 248 node.executeChain(node.getInputChain()); 249 if (node.hasTask() || node.hasMultipleTasks()) { 250 createTask(session, graph, node); // may create several 251 node.setState(State.SUSPENDED); 252 } 253 if (node.hasSubRoute()) { 254 if (!pendingSubRoutes.contains(node)) { 255 pendingSubRoutes.add(node); 256 } 257 node.setState(State.SUSPENDED); 258 } 259 if (node.getState() != State.SUSPENDED) { 260 jump = State.RUNNING_OUTPUT; 261 } 262 // else this node is suspended, 263 // remove it from queue of nodes to process 264 break; 265 case SUSPENDED: 266 if (node != initialNode) { 267 throw new DocumentRouteException("Executing unexpected SUSPENDED state"); 268 } 269 // actor 270 NuxeoPrincipal principal = session.getPrincipal(); 271 String actor = principal.getActingUser(); 272 node.setLastActor(actor); 273 // resuming, variables have been set by resumeGraph 274 jump = State.RUNNING_OUTPUT; 275 break; 276 case RUNNING_OUTPUT: 277 node.executeChain(node.getOutputChain()); 278 List<Transition> trueTrans = node.evaluateTransitions(); 279 node.ending(); 280 node.setState(State.READY); 281 if (node.isStop()) { 282 if (!pendingNodes.isEmpty()) { 283 throw new DocumentRouteException(String.format("Route %s stopped with still pending nodes: %s", 284 graph, pendingNodes)); 285 } 286 done = true; 287 } else { 288 if (trueTrans.isEmpty()) { 289 throw new DocumentRouteException("No transition evaluated to true from node " + node); 290 } 291 for (Transition t : trueTrans) { 292 node.executeTransitionChain(t); 293 GraphNode target = graph.getNode(t.target); 294 if (!pendingNodes.contains(target)) { 295 pendingNodes.add(target); 296 } 297 } 298 } 299 break; 300 } 301 if (jump != null) { 302 node.setState(jump); 303 // loop again on this node 304 count--; 305 pendingNodes.addFirst(node); 306 } 307 } 308 if (done) { 309 element.setDone(session); 310 /* 311 * Resume the parent route if this is a sub-route. 312 */ 313 if (graph.hasParentRoute()) { 314 graph.resumeParentRoute(session); 315 } 316 } 317 /* 318 * Now run the sub-routes. If they are done, they'll call back into the routing service to resume the parent 319 * node (above code). 320 */ 321 for (GraphNode node : pendingSubRoutes) { 322 DocumentRoute subRoute = node.startSubRoute(); 323 } 324 session.save(); 325 } 326 327 protected void recursiveCancelInput(GraphRoute graph, GraphNode originalNode, LinkedList<GraphNode> pendingNodes) { 328 LinkedList<GraphNode> todo = new LinkedList<>(); 329 todo.add(originalNode); 330 Set<String> done = new HashSet<>(); 331 while (!todo.isEmpty()) { 332 GraphNode node = todo.pop(); 333 done.add(node.getId()); 334 for (Transition t : node.getInputTransitions()) { 335 if (t.loop) { 336 // don't recurse through loop transitions 337 continue; 338 } 339 GraphNode source = t.source; 340 if (done.contains(source.getId())) { 341 // looping somewhere TODO check it's not happening 342 continue; 343 } 344 source.setCanceled(); 345 State state = source.getState(); 346 source.setState(State.READY); 347 pendingNodes.remove(node); 348 if (state == State.SUSPENDED) { 349 // we're suspended on a task, cancel it and stop recursion 350 source.cancelTasks(); 351 } else { 352 // else recurse 353 todo.add(source); 354 } 355 } 356 } 357 } 358 359 protected void createTask(CoreSession session, GraphRoute graph, GraphNode node) throws DocumentRouteException { 360 DocumentRouteElement routeInstance = graph; 361 Map<String, String> taskVariables = new HashMap<>(); 362 taskVariables.put(DocumentRoutingConstants.TASK_ROUTE_INSTANCE_DOCUMENT_ID_KEY, 363 routeInstance.getDocument().getId()); 364 taskVariables.put(DocumentRoutingConstants.TASK_NODE_ID_KEY, node.getId()); 365 taskVariables.put(DocumentRoutingConstants.OPERATION_STEP_DOCUMENT_KEY, node.getDocument().getId()); 366 String taskNotiftemplate = node.getTaskNotificationTemplate(); 367 if (!StringUtils.isEmpty(taskNotiftemplate)) { 368 taskVariables.put(DocumentRoutingConstants.TASK_ASSIGNED_NOTIFICATION_TEMPLATE, taskNotiftemplate); 369 } else { 370 // disable notification service 371 taskVariables.put(TaskEventNames.DISABLE_NOTIFICATION_SERVICE, "true"); 372 } 373 // evaluate task assignees from taskVar if any 374 HashSet<String> actors = new LinkedHashSet<>(); 375 actors.addAll(node.evaluateTaskAssignees()); 376 actors.addAll(node.getTaskAssignees()); 377 // evaluate taskDueDate from the taskDueDateExpr; 378 Date dueDate = node.computeTaskDueDate(); 379 DocumentModelList docs = graph.getAttachedDocumentModels(); 380 TaskService taskService = Framework.getService(TaskService.class); 381 DocumentRoutingService routing = Framework.getService(DocumentRoutingService.class); 382 // TODO documents other than the first are not attached to the task 383 // (task API allows only one document) 384 // we may get several tasks if there's one per actor when the node 385 // has the property 386 // hasMultipleTasks set to true 387 List<Task> tasks = taskService.createTask(session, session.getPrincipal(), docs, 388 node.getTaskDocType(), node.getDocument().getTitle(), node.getId(), routeInstance.getDocument().getId(), 389 new ArrayList<>(actors), node.hasMultipleTasks(), node.getTaskDirective(), null, dueDate, 390 taskVariables, null, node.getWorkflowContextualInfo(session, true)); 391 392 // Audit task assignment 393 for (Task task : tasks) { 394 Map<String, Serializable> eventProperties = new HashMap<>(); 395 eventProperties.put(DocumentEventContext.CATEGORY_PROPERTY_KEY, DocumentRoutingConstants.ROUTING_CATEGORY); 396 eventProperties.put("taskName", node.getDocument().getTitle()); 397 eventProperties.put("actors", actors); 398 eventProperties.put("modelId", graph.getModelId()); 399 eventProperties.put("modelName", graph.getModelName()); 400 eventProperties.put(RoutingAuditHelper.WORKFLOW_INITATIOR, graph.getInitiator()); 401 eventProperties.put(RoutingAuditHelper.TASK_ACTOR, session.getPrincipal().getOriginatingUser()); 402 eventProperties.put("nodeVariables", (Serializable) node.getVariables()); 403 if (routeInstance instanceof GraphRoute) { 404 eventProperties.put("workflowVariables", (Serializable) ((GraphRoute) routeInstance).getVariables()); 405 } 406 407 // compute duration since workflow started 408 long timeSinceWfStarted = RoutingAuditHelper.computeDurationSinceWfStarted(task.getProcessId()); 409 if (timeSinceWfStarted >= 0) { 410 eventProperties.put(RoutingAuditHelper.TIME_SINCE_WF_STARTED, timeSinceWfStarted); 411 } 412 413 DocumentEventContext envContext = new DocumentEventContext(session, session.getPrincipal(), task.getDocument()); 414 envContext.setProperties(eventProperties); 415 EventProducer eventProducer = Framework.getService(EventProducer.class); 416 eventProducer.fireEvent(envContext.newEvent(DocumentRoutingConstants.Events.afterWorkflowTaskCreated.name())); 417 } 418 419 // routing.makeRoutingTasks(session, tasks); 420 for (Task task : tasks) { 421 node.addTaskInfo(task.getId()); 422 } 423 String taskAssigneesPermission = node.getTaskAssigneesPermission(); 424 if (StringUtils.isEmpty(taskAssigneesPermission)) { 425 return; 426 } 427 for (Task task : tasks) { 428 routing.grantPermissionToTaskAssignees(session, taskAssigneesPermission, docs, task); 429 } 430 } 431 432 protected void finishTask(CoreSession session, GraphRoute graph, GraphNode node, Task task, boolean delete) 433 throws DocumentRouteException { 434 finishTask(session, graph, node, task, delete, null); 435 } 436 437 protected void finishTask(CoreSession session, GraphRoute graph, GraphNode node, Task task, boolean delete, 438 String status) throws DocumentRouteException { 439 DocumentRoutingService routing = Framework.getService(DocumentRoutingService.class); 440 DocumentModelList docs = graph.getAttachedDocumentModels(); 441 routing.removePermissionsForTaskActors(session, docs, task); 442 // delete task 443 if (delete) { 444 session.removeDocument(new IdRef(task.getId())); 445 } 446 // get the last comment on the task, if there are several: 447 // task might have been previously reassigned or delegated 448 List<TaskComment> comments = task.getComments(); 449 String comment = comments.size() > 0 ? comments.get(comments.size() - 1).getText() : ""; 450 // actor 451 NuxeoPrincipal principal = session.getPrincipal(); 452 String actor = principal.getActingUser(); 453 node.updateTaskInfo(task.getId(), true, status, actor, comment); 454 } 455 456}