001/*
002 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Vladimir Pasquier <vpasquier@nuxeo.com>
018 */
019
020package org.nuxeo.ecm.platform.routing.api.operation;
021
022import java.io.Serializable;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Map;
026
027import org.apache.commons.lang.StringUtils;
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.javasimon.SimonManager;
031import org.javasimon.Split;
032import org.nuxeo.ecm.automation.OperationContext;
033import org.nuxeo.ecm.automation.core.Constants;
034import org.nuxeo.ecm.automation.core.annotations.Context;
035import org.nuxeo.ecm.automation.core.annotations.Operation;
036import org.nuxeo.ecm.automation.core.annotations.OperationMethod;
037import org.nuxeo.ecm.automation.core.annotations.Param;
038import org.nuxeo.ecm.core.api.CoreInstance;
039import org.nuxeo.ecm.core.api.CoreSession;
040import org.nuxeo.ecm.core.api.DocumentModel;
041import org.nuxeo.ecm.core.api.IdRef;
042import org.nuxeo.ecm.core.api.IterableQueryResult;
043import org.nuxeo.ecm.core.api.NuxeoException;
044import org.nuxeo.ecm.platform.routing.api.DocumentRoute;
045import org.nuxeo.ecm.platform.routing.api.DocumentRoutingService;
046import org.nuxeo.runtime.api.Framework;
047import org.nuxeo.runtime.transaction.TransactionHelper;
048
049/**
050 * Bulk operation to cancel and restart all the workflow instances of the workflow model with the id
051 * <param>workflowId</param>. If the <param> nodeId</param> parameter is specified, then only the workflows suspened on
052 * that node are restarted.
053 *
054 * @since 5.7
055 */
056@Operation(id = BulkRestartWorkflow.ID, category = Constants.CAT_WORKFLOW, label = "Bulk Restart Workflow", description = "Bulk operation to restart workflows.", aliases = { "BulkRestartWorkflow" })
057public class BulkRestartWorkflow {
058
059    public static final String ID = "WorkflowModel.BulkRestartInstances";
060
061    private static final Log log = LogFactory.getLog(BulkRestartWorkflow.class);
062
063    @Param(name = "workflowId", required = true)
064    protected String workflowId;
065
066    @Param(name = "nodeId", required = false)
067    protected String nodeId;
068
069    @Param(name = "reinitLifecycle", required = false)
070    protected boolean reinitLifecycle;
071
072    @Param(name = "batchSize", required = false)
073    protected Integer batchSize;
074
075    @Context
076    protected OperationContext ctx;
077
078    public static final int DEFAULT_BATCH_SIZE = 1000;
079
080    @OperationMethod
081    public void run() {
082        CoreSession session = null;
083        boolean transactionStarted = false;
084        Split split = SimonManager.getStopwatch(ID).start();
085        try {
086            session = CoreInstance.openCoreSession(null);
087
088            // Fetching all routes
089            // If the nodeId parameter is null, fetch all the workflow routes
090            // with
091            // the given workflowId
092            String query = "Select %s from DocumentRoute where (ecm:name like '%s.%%' OR  ecm:name like '%s') and ecm:currentLifeCycleState = 'running'";
093            String key = "ecm:uuid";
094            if (StringUtils.isEmpty(nodeId)) {
095                if (StringUtils.isEmpty(workflowId)) {
096                    log.error("Need to specify either the workflowModelId either the nodeId to query the workflows");
097                    return;
098                }
099                query = String.format(query, key, workflowId, workflowId);
100            } else {
101                query = "Select %s from RouteNode  where rnode:nodeId = '%s' and ecm:currentLifeCycleState = 'suspended'";
102                key = "ecm:parentId";
103                if (StringUtils.isEmpty(nodeId)) {
104                    log.error("Need to specify either the workflowModelId either the nodeId to query the workflows");
105                    return;
106                }
107                query = String.format(query, key, nodeId);
108            }
109
110            IterableQueryResult results = session.queryAndFetch(query, "NXQL");
111            List<String> routeIds = new ArrayList<String>();
112            for (Map<String, Serializable> result : results) {
113                routeIds.add(result.get(key).toString());
114            }
115            results.close();
116            DocumentRoutingService routingService = Framework.getLocalService(DocumentRoutingService.class);
117            // Batching initialization
118            if (batchSize == null) {
119                batchSize = DEFAULT_BATCH_SIZE;
120            }
121
122            if (!TransactionHelper.isTransactionActive()) {
123                TransactionHelper.startTransaction();
124                transactionStarted = true;
125            }
126            long routesRestartedCount = 0;
127            for (String routeId : routeIds) {
128                try {
129                    DocumentModel docRoute = session.getDocument(new IdRef(routeId));
130                    DocumentRoute route = docRoute.getAdapter(DocumentRoute.class);
131                    List<String> relatedDocIds = route.getAttachedDocuments();
132                    route.cancel(session);
133
134                    log.debug("Canceling workflow  " + route.getDocument().getName());
135
136                    if (reinitLifecycle) {
137                        reinitLifecycle(relatedDocIds, session);
138                    }
139                    routingService.createNewInstance(workflowId, relatedDocIds, session, true);
140                    for (String string : relatedDocIds) {
141                        log.debug("Starting workflow for " + string);
142                    }
143                    // removing old workflow instance
144                    session.removeDocument(route.getDocument().getRef());
145
146                    routesRestartedCount++;
147                    if (routesRestartedCount % batchSize == 0) {
148                        session.close();
149                        TransactionHelper.commitOrRollbackTransaction();
150                        TransactionHelper.startTransaction();
151                        session = CoreInstance.openCoreSession(null);
152                    }
153                } catch (NuxeoException e) {
154                    Throwable t = unwrapException(e);
155                    log.error(t.getClass().getSimpleName() + ": " + t.getMessage());
156                    log.error("Workflow with the docId '" + routeId + "' cannot be canceled. " + routesRestartedCount
157                            + " workflows have been processed.");
158                }
159            }
160        } finally {
161            if (session != null) {
162                session.close();
163            }
164            TransactionHelper.commitOrRollbackTransaction();
165            if (!transactionStarted) {
166                TransactionHelper.startTransaction();
167            }
168            split.stop();
169            log.info(split.toString());
170        }
171    }
172
173    public static Throwable unwrapException(Throwable t) {
174        Throwable cause = null;
175        if (t != null) {
176            cause = t.getCause();
177        }
178        if (cause == null) {
179            return t;
180        } else {
181            return unwrapException(cause);
182        }
183    }
184
185    protected void reinitLifecycle(List<String> docIds, CoreSession session) {
186        for (String docId : docIds) {
187            session.reinitLifeCycleState(new IdRef(docId));
188        }
189    }
190
191}