001/*
002 * (C) Copyright 2012-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Vladimir Pasquier <vpasquier@nuxeo.com>
018 */
019
020package org.nuxeo.ecm.platform.routing.api.operation;
021
022import java.io.Serializable;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Map;
026
027import org.apache.commons.lang3.StringUtils;
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.javasimon.SimonManager;
031import org.javasimon.Split;
032import org.nuxeo.ecm.automation.OperationContext;
033import org.nuxeo.ecm.automation.core.Constants;
034import org.nuxeo.ecm.automation.core.annotations.Context;
035import org.nuxeo.ecm.automation.core.annotations.Operation;
036import org.nuxeo.ecm.automation.core.annotations.OperationMethod;
037import org.nuxeo.ecm.automation.core.annotations.Param;
038import org.nuxeo.ecm.core.api.CloseableCoreSession;
039import org.nuxeo.ecm.core.api.CoreInstance;
040import org.nuxeo.ecm.core.api.CoreSession;
041import org.nuxeo.ecm.core.api.DocumentModel;
042import org.nuxeo.ecm.core.api.IdRef;
043import org.nuxeo.ecm.core.api.IterableQueryResult;
044import org.nuxeo.ecm.core.api.NuxeoException;
045import org.nuxeo.ecm.platform.routing.api.DocumentRoute;
046import org.nuxeo.ecm.platform.routing.api.DocumentRoutingService;
047import org.nuxeo.runtime.api.Framework;
048import org.nuxeo.runtime.transaction.TransactionHelper;
049
050/**
051 * Bulk operation to cancel and restart all the workflow instances of the workflow model with the id
052 * <param>workflowId</param>. If the <param> nodeId</param> parameter is specified, then only the workflows suspened on
053 * that node are restarted.
054 *
055 * @since 5.7
056 */
057@Operation(id = BulkRestartWorkflow.ID, category = Constants.CAT_WORKFLOW, label = "Bulk Restart Workflow", description = "Bulk operation to restart workflows.", aliases = { "BulkRestartWorkflow" })
058public class BulkRestartWorkflow {
059
060    public static final String ID = "WorkflowModel.BulkRestartInstances";
061
062    private static final Log log = LogFactory.getLog(BulkRestartWorkflow.class);
063
064    @Param(name = "workflowId", required = true)
065    protected String workflowId;
066
067    @Param(name = "nodeId", required = false)
068    protected String nodeId;
069
070    @Param(name = "reinitLifecycle", required = false)
071    protected boolean reinitLifecycle;
072
073    @Param(name = "batchSize", required = false)
074    protected Integer batchSize;
075
076    @Context
077    protected OperationContext ctx;
078
079    public static final int DEFAULT_BATCH_SIZE = 1000;
080
081    @OperationMethod
082    public void run() {
083        CloseableCoreSession session = null;
084        boolean transactionStarted = false;
085        Split split = SimonManager.getStopwatch(ID).start();
086        try {
087            session = CoreInstance.openCoreSession(null);
088
089            // Fetching all routes
090            // If the nodeId parameter is null, fetch all the workflow routes
091            // with
092            // the given workflowId
093            String query = "Select %s from DocumentRoute where (ecm:name like '%s.%%' OR  ecm:name like '%s') and ecm:currentLifeCycleState = 'running'";
094            String key = "ecm:uuid";
095            if (StringUtils.isEmpty(nodeId)) {
096                if (StringUtils.isEmpty(workflowId)) {
097                    log.error("Need to specify either the workflowModelId either the nodeId to query the workflows");
098                    return;
099                }
100                query = String.format(query, key, workflowId, workflowId);
101            } else {
102                query = "Select %s from RouteNode  where rnode:nodeId = '%s' and ecm:currentLifeCycleState = 'suspended'";
103                key = "ecm:parentId";
104                if (StringUtils.isEmpty(nodeId)) {
105                    log.error("Need to specify either the workflowModelId either the nodeId to query the workflows");
106                    return;
107                }
108                query = String.format(query, key, nodeId);
109            }
110
111            IterableQueryResult results = session.queryAndFetch(query, "NXQL");
112            List<String> routeIds = new ArrayList<>();
113            for (Map<String, Serializable> result : results) {
114                routeIds.add(result.get(key).toString());
115            }
116            results.close();
117            DocumentRoutingService routingService = Framework.getService(DocumentRoutingService.class);
118            // Batching initialization
119            if (batchSize == null) {
120                batchSize = DEFAULT_BATCH_SIZE;
121            }
122
123            if (!TransactionHelper.isTransactionActive()) {
124                TransactionHelper.startTransaction();
125                transactionStarted = true;
126            }
127            long routesRestartedCount = 0;
128            for (String routeId : routeIds) {
129                try {
130                    DocumentModel docRoute = session.getDocument(new IdRef(routeId));
131                    DocumentRoute route = docRoute.getAdapter(DocumentRoute.class);
132                    List<String> relatedDocIds = route.getAttachedDocuments();
133                    route.cancel(session);
134
135                    log.debug("Canceling workflow  " + route.getDocument().getName());
136
137                    if (reinitLifecycle) {
138                        reinitLifecycle(relatedDocIds, session);
139                    }
140                    routingService.createNewInstance(workflowId, relatedDocIds, session, true);
141                    for (String string : relatedDocIds) {
142                        log.debug("Starting workflow for " + string);
143                    }
144                    // removing old workflow instance
145                    session.removeDocument(route.getDocument().getRef());
146
147                    routesRestartedCount++;
148                    if (routesRestartedCount % batchSize == 0) {
149                        session.close();
150                        TransactionHelper.commitOrRollbackTransaction();
151                        TransactionHelper.startTransaction();
152                        session = CoreInstance.openCoreSession(null);
153                    }
154                } catch (NuxeoException e) {
155                    Throwable t = unwrapException(e);
156                    log.error(t.getClass().getSimpleName() + ": " + t.getMessage());
157                    log.error("Workflow with the docId '" + routeId + "' cannot be canceled. " + routesRestartedCount
158                            + " workflows have been processed.");
159                }
160            }
161        } finally {
162            if (session != null) {
163                session.close();
164            }
165            TransactionHelper.commitOrRollbackTransaction();
166            if (!transactionStarted) {
167                TransactionHelper.startTransaction();
168            }
169            split.stop();
170            log.info(split.toString());
171        }
172    }
173
174    public static Throwable unwrapException(Throwable t) {
175        Throwable cause = null;
176        if (t != null) {
177            cause = t.getCause();
178        }
179        if (cause == null) {
180            return t;
181        } else {
182            return unwrapException(cause);
183        }
184    }
185
186    protected void reinitLifecycle(List<String> docIds, CoreSession session) {
187        for (String docId : docIds) {
188            session.reinitLifeCycleState(new IdRef(docId));
189        }
190    }
191
192}