001/*
002 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and contributors.
003 *
004 * All rights reserved. This program and the accompanying materials
005 * are made available under the terms of the GNU Lesser General Public License
006 * (LGPL) version 2.1 which accompanies this distribution, and is available at
007 * http://www.gnu.org/licenses/lgpl.html
008 *
009 * This library is distributed in the hope that it will be useful,
010 * but WITHOUT ANY WARRANTY; without even the implied warranty of
011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012 * Lesser General Public License for more details.
013 *
014 * Contributors:
015 *     Vladimir Pasquier <vpasquier@nuxeo.com>
016 */
017
018package org.nuxeo.ecm.platform.routing.api.operation;
019
020import java.io.Serializable;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.Map;
024
025import org.apache.commons.lang.StringUtils;
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.javasimon.SimonManager;
029import org.javasimon.Split;
030import org.nuxeo.ecm.automation.OperationContext;
031import org.nuxeo.ecm.automation.core.Constants;
032import org.nuxeo.ecm.automation.core.annotations.Context;
033import org.nuxeo.ecm.automation.core.annotations.Operation;
034import org.nuxeo.ecm.automation.core.annotations.OperationMethod;
035import org.nuxeo.ecm.automation.core.annotations.Param;
036import org.nuxeo.ecm.core.api.CoreInstance;
037import org.nuxeo.ecm.core.api.CoreSession;
038import org.nuxeo.ecm.core.api.DocumentModel;
039import org.nuxeo.ecm.core.api.IdRef;
040import org.nuxeo.ecm.core.api.IterableQueryResult;
041import org.nuxeo.ecm.core.api.NuxeoException;
042import org.nuxeo.ecm.platform.routing.api.DocumentRoute;
043import org.nuxeo.ecm.platform.routing.api.DocumentRoutingService;
044import org.nuxeo.runtime.api.Framework;
045import org.nuxeo.runtime.transaction.TransactionHelper;
046
047/**
048 * Bulk operation to cancel and restart all the workflow instances of the workflow model with the id
049 * <param>workflowId</param>. If the <param> nodeId</param> parameter is specified, then only the workflows suspened on
050 * that node are restarted.
051 *
052 * @since 5.7
053 */
054@Operation(id = BulkRestartWorkflow.ID, category = Constants.CAT_WORKFLOW, label = "Bulk Restart Workflow", description = "Bulk operation to restart workflows.", aliases = { "BulkRestartWorkflow" })
055public class BulkRestartWorkflow {
056
057    public static final String ID = "WorkflowModel.BulkRestartInstances";
058
059    private static final Log log = LogFactory.getLog(BulkRestartWorkflow.class);
060
061    @Param(name = "workflowId", required = true)
062    protected String workflowId;
063
064    @Param(name = "nodeId", required = false)
065    protected String nodeId;
066
067    @Param(name = "reinitLifecycle", required = false)
068    protected boolean reinitLifecycle;
069
070    @Param(name = "batchSize", required = false)
071    protected Integer batchSize;
072
073    @Context
074    protected OperationContext ctx;
075
076    public static final int DEFAULT_BATCH_SIZE = 1000;
077
078    @OperationMethod
079    public void run() {
080        CoreSession session = null;
081        boolean transactionStarted = false;
082        Split split = SimonManager.getStopwatch(ID).start();
083        try {
084            session = CoreInstance.openCoreSession(null);
085
086            // Fetching all routes
087            // If the nodeId parameter is null, fetch all the workflow routes
088            // with
089            // the given workflowId
090            String query = "Select %s from DocumentRoute where (ecm:name like '%s.%%' OR  ecm:name like '%s') and ecm:currentLifeCycleState = 'running'";
091            String key = "ecm:uuid";
092            if (StringUtils.isEmpty(nodeId)) {
093                if (StringUtils.isEmpty(workflowId)) {
094                    log.error("Need to specify either the workflowModelId either the nodeId to query the workflows");
095                    return;
096                }
097                query = String.format(query, key, workflowId, workflowId);
098            } else {
099                query = "Select %s from RouteNode  where rnode:nodeId = '%s' and ecm:currentLifeCycleState = 'suspended'";
100                key = "ecm:parentId";
101                if (StringUtils.isEmpty(nodeId)) {
102                    log.error("Need to specify either the workflowModelId either the nodeId to query the workflows");
103                    return;
104                }
105                query = String.format(query, key, nodeId);
106            }
107
108            IterableQueryResult results = session.queryAndFetch(query, "NXQL");
109            List<String> routeIds = new ArrayList<String>();
110            for (Map<String, Serializable> result : results) {
111                routeIds.add(result.get(key).toString());
112            }
113            results.close();
114            DocumentRoutingService routingService = Framework.getLocalService(DocumentRoutingService.class);
115            // Batching initialization
116            if (batchSize == null) {
117                batchSize = DEFAULT_BATCH_SIZE;
118            }
119
120            if (!TransactionHelper.isTransactionActive()) {
121                TransactionHelper.startTransaction();
122                transactionStarted = true;
123            }
124            long routesRestartedCount = 0;
125            for (String routeId : routeIds) {
126                try {
127                    DocumentModel docRoute = session.getDocument(new IdRef(routeId));
128                    DocumentRoute route = docRoute.getAdapter(DocumentRoute.class);
129                    List<String> relatedDocIds = route.getAttachedDocuments();
130                    route.cancel(session);
131
132                    log.debug("Canceling workflow  " + route.getDocument().getName());
133
134                    if (reinitLifecycle) {
135                        reinitLifecycle(relatedDocIds, session);
136                    }
137                    routingService.createNewInstance(workflowId, relatedDocIds, session, true);
138                    for (String string : relatedDocIds) {
139                        log.debug("Starting workflow for " + string);
140                    }
141                    // removing old workflow instance
142                    session.removeDocument(route.getDocument().getRef());
143
144                    routesRestartedCount++;
145                    if (routesRestartedCount % batchSize == 0) {
146                        session.close();
147                        TransactionHelper.commitOrRollbackTransaction();
148                        TransactionHelper.startTransaction();
149                        session = CoreInstance.openCoreSession(null);
150                    }
151                } catch (NuxeoException e) {
152                    Throwable t = unwrapException(e);
153                    log.error(t.getClass().getSimpleName() + ": " + t.getMessage());
154                    log.error("Workflow with the docId '" + routeId + "' cannot be canceled. " + routesRestartedCount
155                            + " workflows have been processed.");
156                }
157            }
158        } finally {
159            if (session != null) {
160                session.close();
161            }
162            TransactionHelper.commitOrRollbackTransaction();
163            if (!transactionStarted) {
164                TransactionHelper.startTransaction();
165            }
166            split.stop();
167            log.info(split.toString());
168        }
169    }
170
171    public static Throwable unwrapException(Throwable t) {
172        Throwable cause = null;
173        if (t != null) {
174            cause = t.getCause();
175        }
176        if (cause == null) {
177            return t;
178        } else {
179            return unwrapException(cause);
180        }
181    }
182
183    protected void reinitLifecycle(List<String> docIds, CoreSession session) {
184        for (String docId : docIds) {
185            session.reinitLifeCycleState(new IdRef(docId));
186        }
187    }
188
189}