001/*
002 * (C) Copyright 2012-2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Vladimir Pasquier <vpasquier@nuxeo.com>
018 */
019
020package org.nuxeo.ecm.platform.routing.api.operation;
021
022import java.io.Serializable;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Map;
026
027import org.apache.commons.lang3.StringUtils;
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.nuxeo.ecm.automation.OperationContext;
031import org.nuxeo.ecm.automation.core.Constants;
032import org.nuxeo.ecm.automation.core.annotations.Context;
033import org.nuxeo.ecm.automation.core.annotations.Operation;
034import org.nuxeo.ecm.automation.core.annotations.OperationMethod;
035import org.nuxeo.ecm.automation.core.annotations.Param;
036import org.nuxeo.ecm.core.api.CoreInstance;
037import org.nuxeo.ecm.core.api.CoreSession;
038import org.nuxeo.ecm.core.api.DocumentModel;
039import org.nuxeo.ecm.core.api.IdRef;
040import org.nuxeo.ecm.core.api.IterableQueryResult;
041import org.nuxeo.ecm.core.api.NuxeoException;
042import org.nuxeo.ecm.platform.routing.api.DocumentRoute;
043import org.nuxeo.ecm.platform.routing.api.DocumentRoutingService;
044import org.nuxeo.runtime.api.Framework;
045import org.nuxeo.runtime.transaction.TransactionHelper;
046
047/**
048 * Bulk operation to cancel and restart all the workflow instances of the workflow model with the id
049 * <code>workflowId</code>. If the <code> nodeId</code> parameter is specified, then only the workflows suspended on
050 * that node are restarted.
051 *
052 * @since 5.7
053 */
054@Operation(id = BulkRestartWorkflow.ID, category = Constants.CAT_WORKFLOW, label = "Bulk Restart Workflow", description = "Bulk operation to restart workflows.", aliases = { "BulkRestartWorkflow" })
055public class BulkRestartWorkflow {
056
057    public static final String ID = "WorkflowModel.BulkRestartInstances";
058
059    private static final Log log = LogFactory.getLog(BulkRestartWorkflow.class);
060
061    @Param(name = "workflowId")
062    protected String workflowId;
063
064    @Param(name = "nodeId", required = false)
065    protected String nodeId;
066
067    @Param(name = "reinitLifecycle", required = false)
068    protected boolean reinitLifecycle;
069
070    @Param(name = "batchSize", required = false)
071    protected Integer batchSize;
072
073    @Context
074    protected OperationContext ctx;
075
076    public static final int DEFAULT_BATCH_SIZE = 1000;
077
078    @OperationMethod
079    public void run() {
080        boolean transactionStarted = false;
081        try {
082            CoreSession session = CoreInstance.getCoreSession(null);
083
084            // Fetching all routes
085            // If the nodeId parameter is null, fetch all the workflow routes
086            // with
087            // the given workflowId
088            String query = "Select %s from DocumentRoute where (ecm:name like '%s.%%' OR  ecm:name like '%s') and ecm:currentLifeCycleState = 'running'";
089            String key = "ecm:uuid";
090            if (StringUtils.isEmpty(nodeId)) {
091                if (StringUtils.isEmpty(workflowId)) {
092                    log.error("Need to specify either the workflowModelId either the nodeId to query the workflows");
093                    return;
094                }
095                query = String.format(query, key, workflowId, workflowId);
096            } else {
097                query = "Select %s from RouteNode  where rnode:nodeId = '%s' and ecm:currentLifeCycleState = 'suspended'";
098                key = "ecm:parentId";
099                if (StringUtils.isEmpty(nodeId)) {
100                    log.error("Need to specify either the workflowModelId either the nodeId to query the workflows");
101                    return;
102                }
103                query = String.format(query, key, nodeId);
104            }
105
106            IterableQueryResult results = session.queryAndFetch(query, "NXQL");
107            List<String> routeIds = new ArrayList<>();
108            for (Map<String, Serializable> result : results) {
109                routeIds.add(result.get(key).toString());
110            }
111            results.close();
112            DocumentRoutingService routingService = Framework.getService(DocumentRoutingService.class);
113            // Batching initialization
114            if (batchSize == null) {
115                batchSize = DEFAULT_BATCH_SIZE;
116            }
117
118            if (!TransactionHelper.isTransactionActive()) {
119                TransactionHelper.startTransaction();
120                transactionStarted = true;
121            }
122            long routesRestartedCount = 0;
123            for (String routeId : routeIds) {
124                try {
125                    DocumentModel docRoute = session.getDocument(new IdRef(routeId));
126                    DocumentRoute route = docRoute.getAdapter(DocumentRoute.class);
127                    List<String> relatedDocIds = route.getAttachedDocuments();
128                    route.cancel(session);
129
130                    log.debug("Canceling workflow  " + route.getDocument().getName());
131
132                    if (reinitLifecycle) {
133                        reinitLifecycle(relatedDocIds, session);
134                    }
135                    routingService.createNewInstance(workflowId, relatedDocIds, session, true);
136                    for (String string : relatedDocIds) {
137                        log.debug("Starting workflow for " + string);
138                    }
139                    // removing old workflow instance
140                    session.removeDocument(route.getDocument().getRef());
141
142                    routesRestartedCount++;
143                    if (routesRestartedCount % batchSize == 0) {
144                        TransactionHelper.commitOrRollbackTransaction();
145                        TransactionHelper.startTransaction();
146                        session = CoreInstance.getCoreSession(null);
147                    }
148                } catch (NuxeoException e) {
149                    Throwable t = unwrapException(e);
150                    log.error(t.getClass().getSimpleName() + ": " + t.getMessage());
151                    log.error("Workflow with the docId '" + routeId + "' cannot be canceled. " + routesRestartedCount
152                            + " workflows have been processed.");
153                }
154            }
155        } finally {
156            TransactionHelper.commitOrRollbackTransaction();
157            if (!transactionStarted) {
158                TransactionHelper.startTransaction();
159            }
160        }
161    }
162
163    public static Throwable unwrapException(Throwable t) {
164        Throwable cause = null;
165        if (t != null) {
166            cause = t.getCause();
167        }
168        if (cause == null) {
169            return t;
170        } else {
171            return unwrapException(cause);
172        }
173    }
174
175    protected void reinitLifecycle(List<String> docIds, CoreSession session) {
176        for (String docId : docIds) {
177            session.reinitLifeCycleState(new IdRef(docId));
178        }
179    }
180
181}