001/* 002 * (C) Copyright 2012-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Vladimir Pasquier <vpasquier@nuxeo.com> 018 */ 019 020package org.nuxeo.ecm.platform.routing.api.operation; 021 022import java.io.Serializable; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.Map; 026 027import org.apache.commons.lang3.StringUtils; 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.nuxeo.ecm.automation.OperationContext; 031import org.nuxeo.ecm.automation.core.Constants; 032import org.nuxeo.ecm.automation.core.annotations.Context; 033import org.nuxeo.ecm.automation.core.annotations.Operation; 034import org.nuxeo.ecm.automation.core.annotations.OperationMethod; 035import org.nuxeo.ecm.automation.core.annotations.Param; 036import org.nuxeo.ecm.core.api.CoreInstance; 037import org.nuxeo.ecm.core.api.CoreSession; 038import org.nuxeo.ecm.core.api.DocumentModel; 039import org.nuxeo.ecm.core.api.IdRef; 040import org.nuxeo.ecm.core.api.IterableQueryResult; 041import org.nuxeo.ecm.core.api.NuxeoException; 042import org.nuxeo.ecm.platform.routing.api.DocumentRoute; 043import org.nuxeo.ecm.platform.routing.api.DocumentRoutingService; 044import org.nuxeo.runtime.api.Framework; 045import org.nuxeo.runtime.transaction.TransactionHelper; 046 047/** 048 * Bulk operation to cancel and restart all the workflow instances of the workflow model with the id 049 * <code>workflowId</code>. If the <code> nodeId</code> parameter is specified, then only the workflows suspended on 050 * that node are restarted. 051 * 052 * @since 5.7 053 */ 054@Operation(id = BulkRestartWorkflow.ID, category = Constants.CAT_WORKFLOW, label = "Bulk Restart Workflow", description = "Bulk operation to restart workflows.", aliases = { "BulkRestartWorkflow" }) 055public class BulkRestartWorkflow { 056 057 public static final String ID = "WorkflowModel.BulkRestartInstances"; 058 059 private static final Log log = LogFactory.getLog(BulkRestartWorkflow.class); 060 061 @Param(name = "workflowId") 062 protected String workflowId; 063 064 @Param(name = "nodeId", required = false) 065 protected String nodeId; 066 067 @Param(name = "reinitLifecycle", required = false) 068 protected boolean reinitLifecycle; 069 070 @Param(name = "batchSize", required = false) 071 protected Integer batchSize; 072 073 @Context 074 protected OperationContext ctx; 075 076 public static final int DEFAULT_BATCH_SIZE = 1000; 077 078 @OperationMethod 079 public void run() { 080 boolean transactionStarted = false; 081 try { 082 CoreSession session = CoreInstance.getCoreSession(null); 083 084 // Fetching all routes 085 // If the nodeId parameter is null, fetch all the workflow routes 086 // with 087 // the given workflowId 088 String query = "Select %s from DocumentRoute where (ecm:name like '%s.%%' OR ecm:name like '%s') and ecm:currentLifeCycleState = 'running'"; 089 String key = "ecm:uuid"; 090 if (StringUtils.isEmpty(nodeId)) { 091 if (StringUtils.isEmpty(workflowId)) { 092 log.error("Need to specify either the workflowModelId either the nodeId to query the workflows"); 093 return; 094 } 095 query = String.format(query, key, workflowId, workflowId); 096 } else { 097 query = "Select %s from RouteNode where rnode:nodeId = '%s' and ecm:currentLifeCycleState = 'suspended'"; 098 key = "ecm:parentId"; 099 if (StringUtils.isEmpty(nodeId)) { 100 log.error("Need to specify either the workflowModelId either the nodeId to query the workflows"); 101 return; 102 } 103 query = String.format(query, key, nodeId); 104 } 105 106 IterableQueryResult results = session.queryAndFetch(query, "NXQL"); 107 List<String> routeIds = new ArrayList<>(); 108 for (Map<String, Serializable> result : results) { 109 routeIds.add(result.get(key).toString()); 110 } 111 results.close(); 112 DocumentRoutingService routingService = Framework.getService(DocumentRoutingService.class); 113 // Batching initialization 114 if (batchSize == null) { 115 batchSize = DEFAULT_BATCH_SIZE; 116 } 117 118 if (!TransactionHelper.isTransactionActive()) { 119 TransactionHelper.startTransaction(); 120 transactionStarted = true; 121 } 122 long routesRestartedCount = 0; 123 for (String routeId : routeIds) { 124 try { 125 DocumentModel docRoute = session.getDocument(new IdRef(routeId)); 126 DocumentRoute route = docRoute.getAdapter(DocumentRoute.class); 127 List<String> relatedDocIds = route.getAttachedDocuments(); 128 route.cancel(session); 129 130 log.debug("Canceling workflow " + route.getDocument().getName()); 131 132 if (reinitLifecycle) { 133 reinitLifecycle(relatedDocIds, session); 134 } 135 routingService.createNewInstance(workflowId, relatedDocIds, session, true); 136 for (String string : relatedDocIds) { 137 log.debug("Starting workflow for " + string); 138 } 139 // removing old workflow instance 140 session.removeDocument(route.getDocument().getRef()); 141 142 routesRestartedCount++; 143 if (routesRestartedCount % batchSize == 0) { 144 TransactionHelper.commitOrRollbackTransaction(); 145 TransactionHelper.startTransaction(); 146 session = CoreInstance.getCoreSession(null); 147 } 148 } catch (NuxeoException e) { 149 Throwable t = unwrapException(e); 150 log.error(t.getClass().getSimpleName() + ": " + t.getMessage()); 151 log.error("Workflow with the docId '" + routeId + "' cannot be canceled. " + routesRestartedCount 152 + " workflows have been processed."); 153 } 154 } 155 } finally { 156 TransactionHelper.commitOrRollbackTransaction(); 157 if (!transactionStarted) { 158 TransactionHelper.startTransaction(); 159 } 160 } 161 } 162 163 public static Throwable unwrapException(Throwable t) { 164 Throwable cause = null; 165 if (t != null) { 166 cause = t.getCause(); 167 } 168 if (cause == null) { 169 return t; 170 } else { 171 return unwrapException(cause); 172 } 173 } 174 175 protected void reinitLifecycle(List<String> docIds, CoreSession session) { 176 for (String docId : docIds) { 177 session.reinitLifeCycleState(new IdRef(docId)); 178 } 179 } 180 181}