001/* 002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.automation.core.operations.services.bulk; 020 021import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.STATUS_STREAM; 022import static org.nuxeo.lib.stream.computation.AbstractComputation.INPUT_1; 023import static org.nuxeo.lib.stream.computation.AbstractComputation.OUTPUT_1; 024 025import java.io.Serializable; 026import java.util.Arrays; 027import java.util.HashMap; 028import java.util.List; 029import java.util.Map; 030 031import org.apache.commons.lang3.StringUtils; 032import org.apache.logging.log4j.LogManager; 033import org.apache.logging.log4j.Logger; 034import org.nuxeo.ecm.automation.AutomationService; 035import org.nuxeo.ecm.automation.OperationContext; 036import org.nuxeo.ecm.automation.OperationException; 037import org.nuxeo.ecm.automation.OperationNotFoundException; 038import org.nuxeo.ecm.automation.OperationType; 039import org.nuxeo.ecm.core.api.CoreSession; 040import org.nuxeo.ecm.core.api.DocumentModel; 041import org.nuxeo.ecm.core.api.DocumentModelList; 042import org.nuxeo.ecm.core.api.NuxeoException; 043import org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation; 044import org.nuxeo.lib.stream.computation.Topology; 045import org.nuxeo.runtime.api.Framework; 046import org.nuxeo.runtime.stream.StreamProcessorTopology; 047 048/** 049 * Bulk Action that runs an automation operation 050 * 051 * @since 10.3 052 */ 053public class AutomationBulkAction implements StreamProcessorTopology { 054 055 private static final Logger log = LogManager.getLogger(AutomationBulkAction.class); 056 057 public static final String ACTION_NAME = "automation"; 058 059 // @since 11.1 060 public static final String ACTION_FULL_NAME = "bulk/" + ACTION_NAME; 061 062 public static final String OPERATION_ID = "operationId"; 063 064 public static final String OPERATION_PARAMETERS = "parameters"; 065 066 @Override 067 public Topology getTopology(Map<String, String> options) { 068 return Topology.builder() 069 .addComputation(AutomationComputation::new, Arrays.asList(INPUT_1 + ":" + ACTION_FULL_NAME, // 070 OUTPUT_1 + ":" + STATUS_STREAM)) 071 .build(); 072 } 073 074 public static class AutomationComputation extends AbstractBulkComputation { 075 public static final String DOC_INPUT_TYPE = "document"; 076 077 public static final String DOCS_INPUT_TYPE = "documents"; 078 079 protected AutomationService service; 080 081 protected String operationId; 082 083 protected String inputType; 084 085 protected Map<String, ?> params; 086 087 public AutomationComputation() { 088 super(ACTION_FULL_NAME); 089 } 090 091 @Override 092 public void startBucket(String bucketKey) { 093 operationId = null; 094 service = Framework.getService(AutomationService.class); 095 Map<String, Serializable> commandParams = getCurrentCommand().getParams(); 096 checkOperation((String) commandParams.get(OPERATION_ID)); 097 checkParams(commandParams.get(OPERATION_PARAMETERS)); 098 } 099 100 @Override 101 protected void compute(CoreSession session, List<String> ids, Map<String, Serializable> properties) { 102 if (operationId == null) { 103 return; 104 } 105 DocumentModelList documents = loadDocuments(session, ids); 106 if (DOCS_INPUT_TYPE.equals(inputType)) { 107 runOperationOnAllDocuments(session, documents); 108 } else { 109 runOperationOnEachDocument(session, documents); 110 } 111 } 112 113 protected void runOperationOnAllDocuments(CoreSession session, DocumentModelList documents) { 114 try (OperationContext ctx = new OperationContext(session)) { 115 ctx.setInput(documents); 116 service.run(ctx, operationId, params); 117 } catch (OperationException e) { 118 throw new NuxeoException("Operation fails on documents: " + documents, e); 119 } 120 } 121 122 protected void runOperationOnEachDocument(CoreSession session, DocumentModelList documents) { 123 for (DocumentModel doc : documents) { 124 try (OperationContext ctx = new OperationContext(session)) { 125 ctx.setInput(doc); 126 service.run(ctx, operationId, params); 127 } catch (OperationException e) { 128 throw new NuxeoException("Operation fails on doc: " + doc.getId(), e); 129 } 130 } 131 } 132 133 protected void checkOperation(String operationId) { 134 if (StringUtils.isBlank(operationId)) { 135 log.warn("No operationId provided skipping command: " + getCurrentCommand().getId()); 136 return; 137 } 138 try { 139 OperationType op = service.getOperation(operationId); 140 inputType = op.getInputType(); 141 if (inputType == null || DOC_INPUT_TYPE.equals(inputType)) { 142 inputType = DOC_INPUT_TYPE; 143 } else if (DOCS_INPUT_TYPE.equals(inputType)) { 144 inputType = DOCS_INPUT_TYPE; 145 } else { 146 log.warn(String.format("Unsupported operation input type %s for command: %s", inputType, 147 getCurrentCommand().getId())); 148 return; 149 } 150 } catch (OperationNotFoundException e) { 151 log.warn(String.format("Operation '%s' not found, skipping command: %s", operationId, 152 getCurrentCommand().getId())); 153 return; 154 } 155 this.operationId = operationId; 156 } 157 158 protected void checkParams(Serializable serializable) { 159 if (serializable == null) { 160 params = null; 161 } else if (serializable instanceof HashMap) { 162 params = (Map<String, ?>) serializable; 163 } else { 164 log.warn("Unknown operation parameters type: " + serializable.getClass() + " for command: " + command); 165 operationId = null; 166 } 167 } 168 } 169 170}