001/* 002 * (C) Copyright 2012-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.ecm.platform.routing.core.impl; 020 021import java.io.Serializable; 022import java.util.ArrayList; 023import java.util.Date; 024import java.util.HashMap; 025import java.util.HashSet; 026import java.util.LinkedHashSet; 027import java.util.LinkedList; 028import java.util.List; 029import java.util.Map; 030import java.util.Set; 031 032import org.apache.commons.lang3.StringUtils; 033import org.apache.commons.logging.Log; 034import org.apache.commons.logging.LogFactory; 035import org.nuxeo.ecm.automation.core.Constants; 036import org.nuxeo.ecm.core.api.CoreSession; 037import org.nuxeo.ecm.core.api.DocumentModel; 038import org.nuxeo.ecm.core.api.DocumentModelList; 039import org.nuxeo.ecm.core.api.IdRef; 040import org.nuxeo.ecm.core.api.NuxeoPrincipal; 041import org.nuxeo.ecm.core.event.EventProducer; 042import org.nuxeo.ecm.core.event.impl.DocumentEventContext; 043import org.nuxeo.ecm.platform.routing.api.DocumentRoute; 044import org.nuxeo.ecm.platform.routing.api.DocumentRouteElement; 045import org.nuxeo.ecm.platform.routing.api.DocumentRoutingConstants; 046import org.nuxeo.ecm.platform.routing.api.DocumentRoutingService; 047import org.nuxeo.ecm.platform.routing.api.exception.DocumentRouteException; 048import org.nuxeo.ecm.platform.routing.core.audit.RoutingAuditHelper; 049import org.nuxeo.ecm.platform.routing.core.impl.GraphNode.State; 050import org.nuxeo.ecm.platform.routing.core.impl.GraphNode.Transition; 051import org.nuxeo.ecm.platform.task.Task; 052import org.nuxeo.ecm.platform.task.TaskComment; 053import org.nuxeo.ecm.platform.task.TaskEventNames; 054import org.nuxeo.ecm.platform.task.TaskService; 055import org.nuxeo.runtime.api.Framework; 056 057/** 058 * Runs the proper nodes depending on the graph state. 059 * 060 * @since 5.6 061 */ 062public class GraphRunner extends AbstractRunner implements ElementRunner, Serializable { 063 064 private static final long serialVersionUID = 1L; 065 066 private static final Log log = LogFactory.getLog(GraphRunner.class); 067 068 /** 069 * Maximum number of steps we do before deciding that this graph is looping. 070 */ 071 public static final int MAX_LOOPS = 100; 072 073 @Override 074 public void run(CoreSession session, DocumentRouteElement element, Map<String, Serializable> map) { 075 GraphRoute graph = (GraphRoute) element; 076 element.setRunning(session); 077 if (map != null) { 078 graph.setVariables(map); 079 } 080 runGraph(session, element, graph.getStartNode()); 081 } 082 083 @Override 084 public void run(CoreSession session, DocumentRouteElement element) { 085 run(session, element, null); 086 } 087 088 @SuppressWarnings("unchecked") 089 @Override 090 public void resume(CoreSession session, DocumentRouteElement element, String nodeId, String taskId, 091 Map<String, Object> varData, String status) { 092 GraphRoute graph = (GraphRoute) element; 093 Task task = null; 094 if (taskId == null) { 095 if (nodeId == null) { 096 throw new DocumentRouteException("nodeId and taskId both missing"); 097 } 098 } else { 099 DocumentModel taskDoc = session.getDocument(new IdRef(taskId)); 100 task = taskDoc.getAdapter(Task.class); 101 if (task == null) { 102 throw new DocumentRouteException("Invalid taskId: " + taskId); 103 } 104 if (nodeId == null) { 105 nodeId = task.getVariable(DocumentRoutingConstants.TASK_NODE_ID_KEY); 106 if (StringUtils.isEmpty(nodeId)) { 107 throw new DocumentRouteException("No nodeId found on task: " + taskId); 108 } 109 } 110 } 111 GraphNode node = graph.getNode(nodeId); 112 if (node == null) { 113 throw new DocumentRouteException("Invalid nodeId: " + nodeId); 114 } 115 boolean forceResume = (varData != null && varData.get(DocumentRoutingConstants.WORKFLOW_FORCE_RESUME) != null 116 && (Boolean) varData.get(DocumentRoutingConstants.WORKFLOW_FORCE_RESUME)); 117 118 if (forceResume && node.getState() != State.SUSPENDED && node.getState() != State.WAITING) { 119 throw new DocumentRouteException("Cannot force resume on non-suspended or non-waiting node: " + node); 120 } 121 if (!forceResume && node.getState() != State.SUSPENDED) { 122 throw new DocumentRouteException("Cannot resume on non-suspended node: " + node); 123 } 124 node.setAllVariables(varData, false); 125 if (StringUtils.isNotEmpty(status)) { 126 node.setButton(status); 127 } 128 if (task != null) { 129 finishTask(session, graph, node, task, false, status); 130 // don't delete (yet) 131 if (task != null) { 132 Map<String, Serializable> eventProperties = new HashMap<>(); 133 eventProperties.put(DocumentEventContext.CATEGORY_PROPERTY_KEY, DocumentRoutingConstants.ROUTING_CATEGORY); 134 eventProperties.put("taskName", task.getName()); 135 eventProperties.put("modelName", graph.getModelName()); 136 eventProperties.put("action", status); 137 eventProperties.put("data", (Serializable) varData); 138 eventProperties.put(RoutingAuditHelper.WORKFLOW_INITATIOR, graph.getInitiator()); 139 eventProperties.put(RoutingAuditHelper.TASK_ACTOR, session.getPrincipal().getActingUser()); 140 eventProperties.put("nodeVariables", (Serializable) node.getVariables()); 141 eventProperties.put("workflowVariables", (Serializable) graph.getVariables()); 142 143 // Compute duration of the task itself 144 long duration = RoutingAuditHelper.computeDurationSinceTaskStarted(task.getId()); 145 if (duration >= 0) { 146 eventProperties.put(RoutingAuditHelper.TIME_SINCE_TASK_STARTED, duration); 147 } 148 149 // Then compute duration since workflow started 150 long timeSinceWfStarted = RoutingAuditHelper.computeDurationSinceWfStarted(task.getProcessId()); 151 if (timeSinceWfStarted >= 0) { 152 eventProperties.put(RoutingAuditHelper.TIME_SINCE_WF_STARTED, timeSinceWfStarted); 153 } 154 155 DocumentEventContext envContext = new DocumentEventContext(session, session.getPrincipal(), task.getDocument()); 156 envContext.setProperties(eventProperties); 157 EventProducer eventProducer = Framework.getService(EventProducer.class); 158 eventProducer.fireEvent(envContext.newEvent(DocumentRoutingConstants.Events.afterWorkflowTaskEnded.name())); 159 } 160 } else { 161 // cancel any remaing tasks on this node 162 node.cancelTasks(); 163 } 164 if (node.hasOpenTasks()) { 165 log.info("Node " + node.getId() + "has open tasks, the workflow can not be resumed for now."); 166 // do nothing, the workflow is resumed only when all the tasks 167 // created from 168 // this node are processed 169 // as this is a multi-task node, reset comment if it was 170 // previously set 171 if (varData != null && varData.get(Constants.VAR_WORKFLOW_NODE) != null 172 && ((Map<String, Serializable>) varData.get(Constants.VAR_WORKFLOW_NODE)).containsKey( 173 GraphNode.NODE_VARIABLE_COMMENT)) { 174 node.setVariable(GraphNode.NODE_VARIABLE_COMMENT, ""); 175 } 176 return; 177 } 178 runGraph(session, element, node); 179 } 180 181 @Override 182 public void cancel(CoreSession session, DocumentRouteElement element) { 183 GraphRoute graph = element instanceof GraphRoute ? (GraphRoute) element : null; 184 185 Map<String, Serializable> eventProperties = new HashMap<>(); 186 if (graph != null) { 187 eventProperties.put("modelId", graph.getModelId()); 188 eventProperties.put("modelName", graph.getModelName()); 189 eventProperties.put(RoutingAuditHelper.WORKFLOW_VARIABLES, (Serializable) graph.getVariables()); 190 eventProperties.put(RoutingAuditHelper.WORKFLOW_INITATIOR, graph.getInitiator()); 191 // Get the list of pending node 192 List<String> pendingNodeNames = new ArrayList<>(); 193 for (GraphNode suspendedNode : graph.getSuspendedNodes()) { 194 pendingNodeNames.add(suspendedNode.getId()); 195 } 196 eventProperties.put("pendingNodes", (Serializable) pendingNodeNames); 197 } 198 EventFirer.fireEvent(session, element, eventProperties, DocumentRoutingConstants.Events.beforeWorkflowCanceled.name()); 199 200 super.cancel(session, element); 201 if (graph == null) { 202 return; 203 } 204 // also cancel tasks 205 // also cancel sub-workflows 206 for (GraphNode node : graph.getNodes()) { 207 node.cancelTasks(); 208 node.cancelSubRoute(); 209 } 210 } 211 212 /** 213 * Runs the graph starting with the given node. 214 * 215 * @param initialNode the initial node to run 216 */ 217 protected void runGraph(CoreSession session, DocumentRouteElement element, GraphNode initialNode) 218 throws DocumentRouteException { 219 GraphRoute graph = (GraphRoute) element; 220 List<GraphNode> pendingSubRoutes = new LinkedList<>(); 221 LinkedList<GraphNode> pendingNodes = new LinkedList<>(); 222 pendingNodes.add(initialNode); 223 boolean done = false; 224 int count = 0; 225 while (!pendingNodes.isEmpty()) { 226 GraphNode node = pendingNodes.pop(); 227 count++; 228 if (count > MAX_LOOPS) { 229 throw new DocumentRouteException("Execution is looping, node: " + node); 230 } 231 State jump = null; 232 switch (node.getState()) { 233 case READY: 234 log.debug("Doing node " + node); 235 if (node.isMerge()) { 236 jump = State.WAITING; 237 } else { 238 jump = State.RUNNING_INPUT; 239 } 240 break; 241 case WAITING: 242 if (node.canMerge()) { 243 recursiveCancelInput(graph, node, pendingNodes); 244 jump = State.RUNNING_INPUT; 245 } 246 // else leave state to WAITING 247 break; 248 case RUNNING_INPUT: 249 node.starting(); 250 node.executeChain(node.getInputChain()); 251 if (node.hasTask() || node.hasMultipleTasks()) { 252 createTask(session, graph, node); // may create several 253 node.setState(State.SUSPENDED); 254 } 255 if (node.hasSubRoute()) { 256 if (!pendingSubRoutes.contains(node)) { 257 pendingSubRoutes.add(node); 258 } 259 node.setState(State.SUSPENDED); 260 } 261 if (node.getState() != State.SUSPENDED) { 262 jump = State.RUNNING_OUTPUT; 263 } 264 // else this node is suspended, 265 // remove it from queue of nodes to process 266 break; 267 case SUSPENDED: 268 if (node != initialNode) { 269 throw new DocumentRouteException("Executing unexpected SUSPENDED state"); 270 } 271 // actor 272 NuxeoPrincipal principal = session.getPrincipal(); 273 String actor = principal.getActingUser(); 274 node.setLastActor(actor); 275 // resuming, variables have been set by resumeGraph 276 jump = State.RUNNING_OUTPUT; 277 break; 278 case RUNNING_OUTPUT: 279 node.executeChain(node.getOutputChain()); 280 List<Transition> trueTrans = node.evaluateTransitions(); 281 node.ending(); 282 node.setState(State.READY); 283 if (node.isStop()) { 284 if (!pendingNodes.isEmpty()) { 285 throw new DocumentRouteException(String.format("Route %s stopped with still pending nodes: %s", 286 graph, pendingNodes)); 287 } 288 done = true; 289 } else { 290 if (trueTrans.isEmpty()) { 291 throw new DocumentRouteException("No transition evaluated to true from node " + node); 292 } 293 for (Transition t : trueTrans) { 294 node.executeTransitionChain(t); 295 GraphNode target = graph.getNode(t.target); 296 if (!pendingNodes.contains(target)) { 297 pendingNodes.add(target); 298 } 299 } 300 } 301 break; 302 } 303 if (jump != null) { 304 node.setState(jump); 305 // loop again on this node 306 count--; 307 pendingNodes.addFirst(node); 308 } 309 } 310 if (done) { 311 element.setDone(session); 312 /* 313 * Resume the parent route if this is a sub-route. 314 */ 315 if (graph.hasParentRoute()) { 316 graph.resumeParentRoute(session); 317 } 318 } 319 /* 320 * Now run the sub-routes. If they are done, they'll call back into the routing service to resume the parent 321 * node (above code). 322 */ 323 for (GraphNode node : pendingSubRoutes) { 324 node.startSubRoute(); 325 } 326 session.save(); 327 } 328 329 protected void recursiveCancelInput(GraphRoute graph, GraphNode originalNode, LinkedList<GraphNode> pendingNodes) { 330 LinkedList<GraphNode> todo = new LinkedList<>(); 331 todo.add(originalNode); 332 Set<String> done = new HashSet<>(); 333 while (!todo.isEmpty()) { 334 GraphNode node = todo.pop(); 335 done.add(node.getId()); 336 for (Transition t : node.getInputTransitions()) { 337 if (t.loop) { 338 // don't recurse through loop transitions 339 continue; 340 } 341 GraphNode source = t.source; 342 if (done.contains(source.getId())) { 343 // looping somewhere TODO check it's not happening 344 continue; 345 } 346 source.setCanceled(); 347 State state = source.getState(); 348 source.setState(State.READY); 349 pendingNodes.remove(node); 350 if (state == State.SUSPENDED) { 351 // we're suspended on a task, cancel it and stop recursion 352 source.cancelTasks(); 353 } else { 354 // else recurse 355 todo.add(source); 356 } 357 } 358 } 359 } 360 361 protected void createTask(CoreSession session, GraphRoute graph, GraphNode node) throws DocumentRouteException { 362 DocumentRouteElement routeInstance = graph; 363 Map<String, String> taskVariables = new HashMap<>(); 364 taskVariables.put(DocumentRoutingConstants.TASK_ROUTE_INSTANCE_DOCUMENT_ID_KEY, 365 routeInstance.getDocument().getId()); 366 taskVariables.put(DocumentRoutingConstants.TASK_NODE_ID_KEY, node.getId()); 367 taskVariables.put(DocumentRoutingConstants.OPERATION_STEP_DOCUMENT_KEY, node.getDocument().getId()); 368 String taskNotiftemplate = node.getTaskNotificationTemplate(); 369 if (!StringUtils.isEmpty(taskNotiftemplate)) { 370 taskVariables.put(DocumentRoutingConstants.TASK_ASSIGNED_NOTIFICATION_TEMPLATE, taskNotiftemplate); 371 } else { 372 // disable notification service 373 taskVariables.put(TaskEventNames.DISABLE_NOTIFICATION_SERVICE, "true"); 374 } 375 // evaluate task assignees from taskVar if any 376 HashSet<String> actors = new LinkedHashSet<>(); 377 actors.addAll(node.evaluateTaskAssignees()); 378 actors.addAll(node.getTaskAssignees()); 379 // evaluate taskDueDate from the taskDueDateExpr; 380 Date dueDate = node.computeTaskDueDate(); 381 DocumentModelList docs = graph.getAttachedDocumentModels(); 382 TaskService taskService = Framework.getService(TaskService.class); 383 DocumentRoutingService routing = Framework.getService(DocumentRoutingService.class); 384 // TODO documents other than the first are not attached to the task 385 // (task API allows only one document) 386 // we may get several tasks if there's one per actor when the node 387 // has the property 388 // hasMultipleTasks set to true 389 List<Task> tasks = taskService.createTask(session, session.getPrincipal(), docs, 390 node.getTaskDocType(), node.getDocument().getTitle(), node.getId(), routeInstance.getDocument().getId(), 391 new ArrayList<>(actors), node.hasMultipleTasks(), node.getTaskDirective(), null, dueDate, 392 taskVariables, null, node.getWorkflowContextualInfo(session, true)); 393 394 // Audit task assignment 395 for (Task task : tasks) { 396 Map<String, Serializable> eventProperties = new HashMap<>(); 397 eventProperties.put(DocumentEventContext.CATEGORY_PROPERTY_KEY, DocumentRoutingConstants.ROUTING_CATEGORY); 398 eventProperties.put("taskName", node.getDocument().getTitle()); 399 eventProperties.put("actors", actors); 400 eventProperties.put("modelId", graph.getModelId()); 401 eventProperties.put("modelName", graph.getModelName()); 402 eventProperties.put(RoutingAuditHelper.WORKFLOW_INITATIOR, graph.getInitiator()); 403 eventProperties.put(RoutingAuditHelper.TASK_ACTOR, session.getPrincipal().getOriginatingUser()); 404 eventProperties.put("nodeVariables", (Serializable) node.getVariables()); 405 if (routeInstance instanceof GraphRoute) { 406 eventProperties.put("workflowVariables", (Serializable) ((GraphRoute) routeInstance).getVariables()); 407 } 408 409 // compute duration since workflow started 410 long timeSinceWfStarted = RoutingAuditHelper.computeDurationSinceWfStarted(task.getProcessId()); 411 if (timeSinceWfStarted >= 0) { 412 eventProperties.put(RoutingAuditHelper.TIME_SINCE_WF_STARTED, timeSinceWfStarted); 413 } 414 415 DocumentEventContext envContext = new DocumentEventContext(session, session.getPrincipal(), task.getDocument()); 416 envContext.setProperties(eventProperties); 417 EventProducer eventProducer = Framework.getService(EventProducer.class); 418 eventProducer.fireEvent(envContext.newEvent(DocumentRoutingConstants.Events.afterWorkflowTaskCreated.name())); 419 } 420 421 // routing.makeRoutingTasks(session, tasks); 422 for (Task task : tasks) { 423 node.addTaskInfo(task.getId()); 424 } 425 String taskAssigneesPermission = node.getTaskAssigneesPermission(); 426 if (StringUtils.isEmpty(taskAssigneesPermission)) { 427 return; 428 } 429 for (Task task : tasks) { 430 routing.grantPermissionToTaskAssignees(session, taskAssigneesPermission, docs, task); 431 } 432 } 433 434 protected void finishTask(CoreSession session, GraphRoute graph, GraphNode node, Task task, boolean delete) 435 throws DocumentRouteException { 436 finishTask(session, graph, node, task, delete, null); 437 } 438 439 protected void finishTask(CoreSession session, GraphRoute graph, GraphNode node, Task task, boolean delete, 440 String status) throws DocumentRouteException { 441 DocumentRoutingService routing = Framework.getService(DocumentRoutingService.class); 442 DocumentModelList docs = graph.getAttachedDocumentModels(); 443 routing.removePermissionsForTaskActors(session, docs, task); 444 // delete task 445 if (delete) { 446 session.removeDocument(new IdRef(task.getId())); 447 } 448 // get the last comment on the task, if there are several: 449 // task might have been previously reassigned or delegated 450 List<TaskComment> comments = task.getComments(); 451 String comment = comments.size() > 0 ? comments.get(comments.size() - 1).getText() : ""; 452 // actor 453 NuxeoPrincipal principal = session.getPrincipal(); 454 String actor = principal.getActingUser(); 455 node.updateTaskInfo(task.getId(), true, status, actor, comment); 456 } 457 458}