001/*
002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.automation.core.operations.services.bulk;
020
021import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.STATUS_STREAM;
022import static org.nuxeo.lib.stream.computation.AbstractComputation.INPUT_1;
023import static org.nuxeo.lib.stream.computation.AbstractComputation.OUTPUT_1;
024
025import java.io.Serializable;
026import java.util.Arrays;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030
031import org.apache.commons.lang3.StringUtils;
032import org.apache.logging.log4j.LogManager;
033import org.apache.logging.log4j.Logger;
034import org.nuxeo.ecm.automation.AutomationService;
035import org.nuxeo.ecm.automation.OperationContext;
036import org.nuxeo.ecm.automation.OperationException;
037import org.nuxeo.ecm.automation.OperationNotFoundException;
038import org.nuxeo.ecm.automation.OperationType;
039import org.nuxeo.ecm.core.api.CoreSession;
040import org.nuxeo.ecm.core.api.DocumentModel;
041import org.nuxeo.ecm.core.api.DocumentModelList;
042import org.nuxeo.ecm.core.api.NuxeoException;
043import org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation;
044import org.nuxeo.lib.stream.computation.Topology;
045import org.nuxeo.runtime.api.Framework;
046import org.nuxeo.runtime.stream.StreamProcessorTopology;
047
048/**
049 * Bulk Action that runs an automation operation
050 *
051 * @since 10.3
052 */
053public class AutomationBulkAction implements StreamProcessorTopology {
054
055    private static final Logger log = LogManager.getLogger(AutomationBulkAction.class);
056
057    public static final String ACTION_NAME = "automation";
058
059    // @since 11.1
060    public static final String ACTION_FULL_NAME = "bulk/" + ACTION_NAME;
061
062    public static final String OPERATION_ID = "operationId";
063
064    public static final String OPERATION_PARAMETERS = "parameters";
065
066    @Override
067    public Topology getTopology(Map<String, String> options) {
068        return Topology.builder()
069                       .addComputation(AutomationComputation::new, Arrays.asList(INPUT_1 + ":" + ACTION_FULL_NAME, //
070                               OUTPUT_1 + ":" + STATUS_STREAM))
071                       .build();
072    }
073
074    public static class AutomationComputation extends AbstractBulkComputation {
075        public static final String DOC_INPUT_TYPE = "document";
076
077        public static final String DOCS_INPUT_TYPE = "documents";
078
079        protected AutomationService service;
080
081        protected String operationId;
082
083        protected String inputType;
084
085        protected Map<String, ?> params;
086
087        public AutomationComputation() {
088            super(ACTION_FULL_NAME);
089        }
090
091        @Override
092        public void startBucket(String bucketKey) {
093            operationId = null;
094            service = Framework.getService(AutomationService.class);
095            Map<String, Serializable> commandParams = getCurrentCommand().getParams();
096            checkOperation((String) commandParams.get(OPERATION_ID));
097            checkParams(commandParams.get(OPERATION_PARAMETERS));
098        }
099
100        @Override
101        protected void compute(CoreSession session, List<String> ids, Map<String, Serializable> properties) {
102            if (operationId == null) {
103                return;
104            }
105            DocumentModelList documents = loadDocuments(session, ids);
106            if (DOCS_INPUT_TYPE.equals(inputType)) {
107                runOperationOnAllDocuments(session, documents);
108            } else {
109                runOperationOnEachDocument(session, documents);
110            }
111        }
112
113        protected void runOperationOnAllDocuments(CoreSession session, DocumentModelList documents) {
114            try (OperationContext ctx = new OperationContext(session)) {
115                ctx.setInput(documents);
116                service.run(ctx, operationId, params);
117            } catch (OperationException e) {
118                throw new NuxeoException("Operation fails on documents: " + documents, e);
119            }
120        }
121
122        protected void runOperationOnEachDocument(CoreSession session, DocumentModelList documents) {
123            for (DocumentModel doc : documents) {
124                try (OperationContext ctx = new OperationContext(session)) {
125                    ctx.setInput(doc);
126                    service.run(ctx, operationId, params);
127                } catch (OperationException e) {
128                    throw new NuxeoException("Operation fails on doc: " + doc.getId(), e);
129                }
130            }
131        }
132
133        protected void checkOperation(String operationId) {
134            if (StringUtils.isBlank(operationId)) {
135                log.warn("No operationId provided skipping command: " + getCurrentCommand().getId());
136                return;
137            }
138            try {
139                OperationType op = service.getOperation(operationId);
140                inputType = op.getInputType();
141                if (inputType == null || DOC_INPUT_TYPE.equals(inputType)) {
142                    inputType = DOC_INPUT_TYPE;
143                } else if (DOCS_INPUT_TYPE.equals(inputType)) {
144                    inputType = DOCS_INPUT_TYPE;
145                } else {
146                    log.warn(String.format("Unsupported operation input type %s for command: %s", inputType,
147                            getCurrentCommand().getId()));
148                    return;
149                }
150            } catch (OperationNotFoundException e) {
151                log.warn(String.format("Operation '%s' not found, skipping command: %s", operationId,
152                        getCurrentCommand().getId()));
153                return;
154            }
155            this.operationId = operationId;
156        }
157
158        protected void checkParams(Serializable serializable) {
159            if (serializable == null) {
160                params = null;
161            } else if (serializable instanceof HashMap) {
162                params = (Map<String, ?>) serializable;
163            } else {
164                log.warn("Unknown operation parameters type: " + serializable.getClass() + " for command: " + command);
165                operationId = null;
166            }
167        }
168    }
169
170}