/*
 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Vladimir Pasquier <vpasquier@nuxeo.com>
 */

package org.nuxeo.ecm.platform.routing.api.operation;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.javasimon.SimonManager;
import org.javasimon.Split;
import org.nuxeo.ecm.automation.OperationContext;
import org.nuxeo.ecm.automation.core.Constants;
import org.nuxeo.ecm.automation.core.annotations.Context;
import org.nuxeo.ecm.automation.core.annotations.Operation;
import org.nuxeo.ecm.automation.core.annotations.OperationMethod;
import org.nuxeo.ecm.automation.core.annotations.Param;
import org.nuxeo.ecm.core.api.CoreInstance;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.IdRef;
import org.nuxeo.ecm.core.api.IterableQueryResult;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.platform.routing.api.DocumentRoute;
import org.nuxeo.ecm.platform.routing.api.DocumentRoutingService;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.transaction.TransactionHelper;

/**
 * Bulk operation to cancel and restart all the workflow instances of the workflow model with the id
 * {@code workflowId}. If the {@code nodeId} parameter is specified, then only the workflows suspended on that node are
 * restarted.
 * <p>
 * Routes are processed one by one; every {@code batchSize} successfully restarted routes the session is closed, the
 * transaction is committed (or rolled back) and a fresh transaction and session are opened.
 *
 * @since 5.7
 */
@Operation(id = BulkRestartWorkflow.ID, category = Constants.CAT_WORKFLOW, label = "Bulk Restart Workflow",
        description = "Bulk operation to restart workflows.", aliases = { "BulkRestartWorkflow" })
public class BulkRestartWorkflow {

    public static final String ID = "WorkflowModel.BulkRestartInstances";

    /** Number of restarted routes per transaction when {@code batchSize} is missing or not positive. */
    public static final int DEFAULT_BATCH_SIZE = 1000;

    private static final Log log = LogFactory.getLog(BulkRestartWorkflow.class);

    @Param(name = "workflowId", required = true)
    protected String workflowId;

    @Param(name = "nodeId", required = false)
    protected String nodeId;

    @Param(name = "reinitLifecycle", required = false)
    protected boolean reinitLifecycle;

    @Param(name = "batchSize", required = false)
    protected Integer batchSize;

    @Context
    protected OperationContext ctx;

    /**
     * Cancels, re-creates and removes every matching workflow instance.
     * <p>
     * When {@code nodeId} is empty the routes are looked up by name from {@code workflowId}; otherwise the parents of
     * the suspended route nodes having that {@code nodeId} are used. A failure on one route is logged and the loop
     * moves on to the next route.
     * <p>
     * On exit the current transaction is committed or rolled back, and a new one is started only if a transaction
     * was active when the method was entered.
     */
    @OperationMethod
    public void run() {
        // Captured up front so that the finally block restores the caller's state even when the method exits
        // before reaching the point where it would start its own transaction.
        final boolean transactionWasActive = TransactionHelper.isTransactionActive();
        CoreSession session = null;
        Split split = SimonManager.getStopwatch(ID).start();
        try {
            // NOTE(review): null repository name - presumably resolves to the default repository; confirm.
            session = CoreInstance.openCoreSession(null);

            // NOTE(review): workflowId / nodeId are interpolated unescaped into the NXQL below; consider escaping
            // them with the platform NXQL helper if these parameters can carry untrusted input.
            final String key;
            final String query;
            if (StringUtils.isEmpty(nodeId)) {
                // No node given: fetch all the running workflow routes of the given workflow model.
                if (StringUtils.isEmpty(workflowId)) {
                    log.error("Need to specify either the workflowModelId either the nodeId to query the workflows");
                    return;
                }
                key = "ecm:uuid";
                query = String.format(
                        "Select %s from DocumentRoute where (ecm:name like '%s.%%' OR ecm:name like '%s') and ecm:currentLifeCycleState = 'running'",
                        key, workflowId, workflowId);
            } else {
                // Node given: the route is the parent of each suspended node with that id.
                key = "ecm:parentId";
                query = String.format(
                        "Select %s from RouteNode where rnode:nodeId = '%s' and ecm:currentLifeCycleState = 'suspended'",
                        key, nodeId);
            }

            List<String> routeIds = fetchRouteIds(session, query, key);
            DocumentRoutingService routingService = Framework.getLocalService(DocumentRoutingService.class);

            // A zero batch size would make the modulo below throw, a negative one would never commit.
            final int effectiveBatchSize;
            if (batchSize == null || batchSize.intValue() <= 0) {
                effectiveBatchSize = DEFAULT_BATCH_SIZE;
            } else {
                effectiveBatchSize = batchSize.intValue();
            }

            if (!transactionWasActive) {
                TransactionHelper.startTransaction();
            }

            long routesRestartedCount = 0;
            for (String routeId : routeIds) {
                try {
                    restartRoute(session, routingService, routeId);

                    routesRestartedCount++;
                    if (routesRestartedCount % effectiveBatchSize == 0) {
                        // Batch boundary: flush the work done so far and continue with a fresh session.
                        session.close();
                        TransactionHelper.commitOrRollbackTransaction();
                        TransactionHelper.startTransaction();
                        session = CoreInstance.openCoreSession(null);
                    }
                } catch (NuxeoException e) {
                    Throwable t = unwrapException(e);
                    // The exception is passed along so that the stack trace is not lost.
                    log.error(t.getClass().getSimpleName() + ": " + t.getMessage(), e);
                    log.error("Workflow with the docId '" + routeId + "' cannot be canceled. " + routesRestartedCount
                            + " workflows have been processed.");
                }
            }
        } finally {
            if (session != null) {
                session.close();
            }
            TransactionHelper.commitOrRollbackTransaction();
            if (transactionWasActive) {
                // Hand an active transaction back to a caller that had one on entry.
                TransactionHelper.startTransaction();
            }
            split.stop();
            log.info(split.toString());
        }
    }

    /**
     * Runs the given NXQL query and collects the value of the {@code key} column of every row.
     *
     * @param session the session used to run the query
     * @param query the NXQL query
     * @param key the name of the selected column holding the route id
     * @return the route ids, in result order
     */
    private List<String> fetchRouteIds(CoreSession session, String query, String key) {
        List<String> routeIds = new ArrayList<String>();
        IterableQueryResult results = session.queryAndFetch(query, "NXQL");
        try {
            for (Map<String, Serializable> result : results) {
                routeIds.add(result.get(key).toString());
            }
        } finally {
            // Always release the result set, even if reading a row fails.
            results.close();
        }
        return routeIds;
    }

    /**
     * Cancels the route with the given id, starts a new instance of {@code workflowId} on the same attached
     * documents and removes the old route document.
     *
     * @param session the session used for all document operations
     * @param routingService the service used to create the new workflow instance
     * @param routeId the id of the route document to restart
     */
    private void restartRoute(CoreSession session, DocumentRoutingService routingService, String routeId) {
        DocumentModel docRoute = session.getDocument(new IdRef(routeId));
        DocumentRoute route = docRoute.getAdapter(DocumentRoute.class);
        List<String> relatedDocIds = route.getAttachedDocuments();
        route.cancel(session);

        log.debug("Canceling workflow " + route.getDocument().getName());

        if (reinitLifecycle) {
            reinitLifecycle(relatedDocIds, session);
        }
        routingService.createNewInstance(workflowId, relatedDocIds, session, true);
        if (log.isDebugEnabled()) {
            for (String relatedDocId : relatedDocIds) {
                log.debug("Starting workflow for " + relatedDocId);
            }
        }
        // removing old workflow instance
        session.removeDocument(route.getDocument().getRef());
    }

    /**
     * Follows the cause chain of the given throwable down to its end.
     *
     * @param t the throwable to unwrap, may be {@code null}
     * @return the last throwable of the cause chain, {@code t} itself when it has no cause, or {@code null} when
     *         {@code t} is {@code null}
     */
    public static Throwable unwrapException(Throwable t) {
        if (t == null) {
            return null;
        }
        Throwable cause = t.getCause();
        return cause == null ? t : unwrapException(cause);
    }

    /**
     * Calls {@link CoreSession#reinitLifeCycleState} on each of the given documents.
     *
     * @param docIds the ids of the documents to reinitialize
     * @param session the session used to reach the documents
     */
    protected void reinitLifecycle(List<String> docIds, CoreSession session) {
        for (String docId : docIds) {
            session.reinitLifeCycleState(new IdRef(docId));
        }
    }

}