001/* 002 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and contributors. 003 * 004 * All rights reserved. This program and the accompanying materials 005 * are made available under the terms of the GNU Lesser General Public License 006 * (LGPL) version 2.1 which accompanies this distribution, and is available at 007 * http://www.gnu.org/licenses/lgpl.html 008 * 009 * This library is distributed in the hope that it will be useful, 010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 012 * Lesser General Public License for more details. 013 * 014 * Contributors: 015 * Vladimir Pasquier <vpasquier@nuxeo.com> 016 */ 017 018package org.nuxeo.ecm.platform.routing.api.operation; 019 020import java.io.Serializable; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.Map; 024 025import org.apache.commons.lang.StringUtils; 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.javasimon.SimonManager; 029import org.javasimon.Split; 030import org.nuxeo.ecm.automation.OperationContext; 031import org.nuxeo.ecm.automation.core.Constants; 032import org.nuxeo.ecm.automation.core.annotations.Context; 033import org.nuxeo.ecm.automation.core.annotations.Operation; 034import org.nuxeo.ecm.automation.core.annotations.OperationMethod; 035import org.nuxeo.ecm.automation.core.annotations.Param; 036import org.nuxeo.ecm.core.api.CoreInstance; 037import org.nuxeo.ecm.core.api.CoreSession; 038import org.nuxeo.ecm.core.api.DocumentModel; 039import org.nuxeo.ecm.core.api.IdRef; 040import org.nuxeo.ecm.core.api.IterableQueryResult; 041import org.nuxeo.ecm.core.api.NuxeoException; 042import org.nuxeo.ecm.platform.routing.api.DocumentRoute; 043import org.nuxeo.ecm.platform.routing.api.DocumentRoutingService; 044import org.nuxeo.runtime.api.Framework; 045import org.nuxeo.runtime.transaction.TransactionHelper; 046 047/** 048 * Bulk operation to cancel and restart all the workflow instances of the workflow model with the id 049 * <param>workflowId</param>. If the <param> nodeId</param> parameter is specified, then only the workflows suspened on 050 * that node are restarted. 051 * 052 * @since 5.7 053 */ 054@Operation(id = BulkRestartWorkflow.ID, category = Constants.CAT_WORKFLOW, label = "Bulk Restart Workflow", description = "Bulk operation to restart workflows.", aliases = { "BulkRestartWorkflow" }) 055public class BulkRestartWorkflow { 056 057 public static final String ID = "WorkflowModel.BulkRestartInstances"; 058 059 private static final Log log = LogFactory.getLog(BulkRestartWorkflow.class); 060 061 @Param(name = "workflowId", required = true) 062 protected String workflowId; 063 064 @Param(name = "nodeId", required = false) 065 protected String nodeId; 066 067 @Param(name = "reinitLifecycle", required = false) 068 protected boolean reinitLifecycle; 069 070 @Param(name = "batchSize", required = false) 071 protected Integer batchSize; 072 073 @Context 074 protected OperationContext ctx; 075 076 public static final int DEFAULT_BATCH_SIZE = 1000; 077 078 @OperationMethod 079 public void run() { 080 CoreSession session = null; 081 boolean transactionStarted = false; 082 Split split = SimonManager.getStopwatch(ID).start(); 083 try { 084 session = CoreInstance.openCoreSession(null); 085 086 // Fetching all routes 087 // If the nodeId parameter is null, fetch all the workflow routes 088 // with 089 // the given workflowId 090 String query = "Select %s from DocumentRoute where (ecm:name like '%s.%%' OR ecm:name like '%s') and ecm:currentLifeCycleState = 'running'"; 091 String key = "ecm:uuid"; 092 if (StringUtils.isEmpty(nodeId)) { 093 if (StringUtils.isEmpty(workflowId)) { 094 log.error("Need to specify either the workflowModelId either the nodeId to query the workflows"); 095 return; 096 } 097 query = String.format(query, key, workflowId, workflowId); 098 } else { 099 query = "Select %s from RouteNode where rnode:nodeId = '%s' and ecm:currentLifeCycleState = 'suspended'"; 100 key = "ecm:parentId"; 101 if (StringUtils.isEmpty(nodeId)) { 102 log.error("Need to specify either the workflowModelId either the nodeId to query the workflows"); 103 return; 104 } 105 query = String.format(query, key, nodeId); 106 } 107 108 IterableQueryResult results = session.queryAndFetch(query, "NXQL"); 109 List<String> routeIds = new ArrayList<String>(); 110 for (Map<String, Serializable> result : results) { 111 routeIds.add(result.get(key).toString()); 112 } 113 results.close(); 114 DocumentRoutingService routingService = Framework.getLocalService(DocumentRoutingService.class); 115 // Batching initialization 116 if (batchSize == null) { 117 batchSize = DEFAULT_BATCH_SIZE; 118 } 119 120 if (!TransactionHelper.isTransactionActive()) { 121 TransactionHelper.startTransaction(); 122 transactionStarted = true; 123 } 124 long routesRestartedCount = 0; 125 for (String routeId : routeIds) { 126 try { 127 DocumentModel docRoute = session.getDocument(new IdRef(routeId)); 128 DocumentRoute route = docRoute.getAdapter(DocumentRoute.class); 129 List<String> relatedDocIds = route.getAttachedDocuments(); 130 route.cancel(session); 131 132 log.debug("Canceling workflow " + route.getDocument().getName()); 133 134 if (reinitLifecycle) { 135 reinitLifecycle(relatedDocIds, session); 136 } 137 routingService.createNewInstance(workflowId, relatedDocIds, session, true); 138 for (String string : relatedDocIds) { 139 log.debug("Starting workflow for " + string); 140 } 141 // removing old workflow instance 142 session.removeDocument(route.getDocument().getRef()); 143 144 routesRestartedCount++; 145 if (routesRestartedCount % batchSize == 0) { 146 session.close(); 147 TransactionHelper.commitOrRollbackTransaction(); 148 TransactionHelper.startTransaction(); 149 session = CoreInstance.openCoreSession(null); 150 } 151 } catch (NuxeoException e) { 152 Throwable t = unwrapException(e); 153 log.error(t.getClass().getSimpleName() + ": " + t.getMessage()); 154 log.error("Workflow with the docId '" + routeId + "' cannot be canceled. " + routesRestartedCount 155 + " workflows have been processed."); 156 } 157 } 158 } finally { 159 if (session != null) { 160 session.close(); 161 } 162 TransactionHelper.commitOrRollbackTransaction(); 163 if (!transactionStarted) { 164 TransactionHelper.startTransaction(); 165 } 166 split.stop(); 167 log.info(split.toString()); 168 } 169 } 170 171 public static Throwable unwrapException(Throwable t) { 172 Throwable cause = null; 173 if (t != null) { 174 cause = t.getCause(); 175 } 176 if (cause == null) { 177 return t; 178 } else { 179 return unwrapException(cause); 180 } 181 } 182 183 protected void reinitLifecycle(List<String> docIds, CoreSession session) { 184 for (String docId : docIds) { 185 session.reinitLifeCycleState(new IdRef(docId)); 186 } 187 } 188 189}