001/* 002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.automation.core.operations.services.bulk; 020 021import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.STATUS_STREAM; 022import static org.nuxeo.lib.stream.computation.AbstractComputation.INPUT_1; 023import static org.nuxeo.lib.stream.computation.AbstractComputation.OUTPUT_1; 024 025import java.io.Serializable; 026import java.util.Arrays; 027import java.util.HashMap; 028import java.util.List; 029import java.util.Map; 030 031import org.apache.commons.lang3.StringUtils; 032import org.apache.logging.log4j.LogManager; 033import org.apache.logging.log4j.Logger; 034import org.nuxeo.ecm.automation.AutomationService; 035import org.nuxeo.ecm.automation.OperationContext; 036import org.nuxeo.ecm.automation.OperationException; 037import org.nuxeo.ecm.automation.OperationNotFoundException; 038import org.nuxeo.ecm.automation.OperationType; 039import org.nuxeo.ecm.core.api.CoreSession; 040import org.nuxeo.ecm.core.api.DocumentModel; 041import org.nuxeo.ecm.core.api.DocumentModelList; 042import org.nuxeo.ecm.core.api.NuxeoException; 043import org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation; 044import org.nuxeo.lib.stream.computation.Topology; 045import org.nuxeo.runtime.api.Framework; 046import org.nuxeo.runtime.stream.StreamProcessorTopology; 047 048/** 049 * Bulk Action that runs an automation operation 050 * 051 * @since 10.3 052 */ 053public class AutomationBulkAction implements StreamProcessorTopology { 054 055 private static final Logger log = LogManager.getLogger(AutomationBulkAction.class); 056 057 public static final String ACTION_NAME = "automation"; 058 059 public static final String OPERATION_ID = "operationId"; 060 061 public static final String OPERATION_PARAMETERS = "parameters"; 062 063 @Override 064 public Topology getTopology(Map<String, String> options) { 065 return Topology.builder() 066 .addComputation(AutomationComputation::new, Arrays.asList(INPUT_1 + ":" + ACTION_NAME, // 067 OUTPUT_1 + ":" + STATUS_STREAM)) 068 .build(); 069 } 070 071 public static class AutomationComputation extends AbstractBulkComputation { 072 public static final String DOC_INPUT_TYPE = "document"; 073 074 public static final String DOCS_INPUT_TYPE = "documents"; 075 076 protected AutomationService service; 077 078 protected String operationId; 079 080 protected String inputType; 081 082 protected Map<String, ?> params; 083 084 public AutomationComputation() { 085 super(ACTION_NAME); 086 } 087 088 @Override 089 public void startBucket(String bucketKey) { 090 operationId = null; 091 service = Framework.getService(AutomationService.class); 092 Map<String, Serializable> commandParams = getCurrentCommand().getParams(); 093 checkOperation((String) commandParams.get(OPERATION_ID)); 094 checkParams(commandParams.get(OPERATION_PARAMETERS)); 095 } 096 097 @Override 098 protected void compute(CoreSession session, List<String> ids, Map<String, Serializable> properties) { 099 if (operationId == null) { 100 return; 101 } 102 DocumentModelList documents = loadDocuments(session, ids); 103 if (DOCS_INPUT_TYPE.equals(inputType)) { 104 runOperationOnAllDocuments(session, documents); 105 } else { 106 runOperationOnEachDocument(session, documents); 107 } 108 } 109 110 protected void runOperationOnAllDocuments(CoreSession session, DocumentModelList documents) { 111 try (OperationContext ctx = new OperationContext(session)) { 112 ctx.setInput(documents); 113 service.run(ctx, operationId, params); 114 } catch (OperationException e) { 115 throw new NuxeoException("Operation fails on documents: " + documents, e); 116 } 117 } 118 119 protected void runOperationOnEachDocument(CoreSession session, DocumentModelList documents) { 120 for (DocumentModel doc : documents) { 121 try (OperationContext ctx = new OperationContext(session)) { 122 ctx.setInput(doc); 123 service.run(ctx, operationId, params); 124 } catch (OperationException e) { 125 throw new NuxeoException("Operation fails on doc: " + doc.getId(), e); 126 } 127 } 128 } 129 130 protected void checkOperation(String operationId) { 131 if (StringUtils.isBlank(operationId)) { 132 log.warn("No operationId provided skipping command: " + getCurrentCommand().getId()); 133 return; 134 } 135 try { 136 OperationType op = service.getOperation(operationId); 137 inputType = op.getInputType(); 138 if (inputType == null || DOC_INPUT_TYPE.equals(inputType)) { 139 inputType = DOC_INPUT_TYPE; 140 } else if (DOCS_INPUT_TYPE.equals(inputType)) { 141 inputType = DOCS_INPUT_TYPE; 142 } else { 143 log.warn(String.format("Unsupported operation input type %s for command: %s", inputType, 144 getCurrentCommand().getId())); 145 return; 146 } 147 } catch (OperationNotFoundException e) { 148 log.warn(String.format("Operation '%s' not found, skipping command: %s", operationId, 149 getCurrentCommand().getId())); 150 return; 151 } 152 this.operationId = operationId; 153 } 154 155 protected void checkParams(Serializable serializable) { 156 if (serializable == null) { 157 params = null; 158 } else if (serializable instanceof HashMap) { 159 params = (Map<String, ?>) serializable; 160 } else { 161 log.warn("Unknown operation parameters type: " + serializable.getClass() + " for command: " + command); 162 operationId = null; 163 } 164 } 165 } 166 167}