001/* 002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * pierre 018 */ 019package org.nuxeo.ecm.core.bulk; 020 021import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.BULK_LOG_MANAGER_NAME; 022import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.COMMAND_STREAM; 023import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.DONE_STREAM; 024import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.RECORD_CODEC; 025import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.STATUS_STREAM; 026import static org.nuxeo.lib.stream.computation.AbstractComputation.INPUT_1; 027import static org.nuxeo.lib.stream.computation.AbstractComputation.OUTPUT_1; 028 029import java.time.Duration; 030import java.util.ArrayList; 031import java.util.Arrays; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035import java.util.concurrent.TimeUnit; 036import java.util.stream.Collectors; 037 038import org.nuxeo.ecm.core.bulk.computation.BulkScrollerComputation; 039import org.nuxeo.ecm.core.bulk.computation.BulkStatusComputation; 040import org.nuxeo.lib.stream.codec.Codec; 041import org.nuxeo.lib.stream.computation.ComputationPolicy; 042import org.nuxeo.lib.stream.computation.ComputationPolicyBuilder; 043import org.nuxeo.lib.stream.computation.Record; 044import org.nuxeo.lib.stream.computation.Settings; 045import org.nuxeo.lib.stream.computation.StreamProcessor; 046import org.nuxeo.lib.stream.computation.Topology; 047import org.nuxeo.lib.stream.computation.log.LogStreamProcessor; 048import org.nuxeo.runtime.api.Framework; 049import org.nuxeo.runtime.codec.CodecService; 050import org.nuxeo.runtime.model.Descriptor; 051import org.nuxeo.runtime.services.config.ConfigurationService; 052import org.nuxeo.runtime.stream.StreamService; 053 054import net.jodah.failsafe.RetryPolicy; 055 056/** 057 * @since 10.3 058 */ 059public class BulkAdminServiceImpl implements BulkAdminService { 060 061 public static final String SCROLLER_NAME = "scroller"; 062 063 public static final String STATUS_NAME = "status"; 064 065 public static final String BULK_SCROLLER_CONCURRENCY_PROPERTY = "nuxeo.core.bulk.scroller.concurrency"; 066 067 public static final String BULK_STATUS_CONCURRENCY_PROPERTY = "nuxeo.core.bulk.status.concurrency"; 068 069 public static final String BULK_STATUS_CONTINUE_ON_FAILURE_PROPERTY = "nuxeo.core.bulk.status.continueOnFailure"; 070 071 public static final String BULK_STATUS_MAX_RETRIES_PROPERTY = "nuxeo.core.bulk.status.maxRetries"; 072 073 public static final String BULK_STATUS_DELAY_PROPERTY = "nuxeo.core.bulk.status.delayMillis"; 074 075 public static final String BULK_STATUS_MAX_DELAY_PROPERTY = "nuxeo.core.bulk.status.maxDelayMillis"; 076 077 public static final String BULK_SCROLL_SIZE_PROPERTY = "nuxeo.core.bulk.scroller.scroll.size"; 078 079 public static final String BULK_SCROLL_KEEP_ALIVE_PROPERTY = "nuxeo.core.bulk.scroller.scroll.keepAliveSeconds"; 080 081 public static final String BULK_SCROLL_PRODUCE_IMMEDIATE_PROPERTY = "nuxeo.core.bulk.scroller.produceImmediate"; 082 083 public static final String BULK_SCROLL_CONTINUE_ON_FAILURE_PROPERTY = "nuxeo.core.bulk.scroller.continueOnFailure"; 084 085 public static final String DEFAULT_STATUS_CONCURRENCY = "1"; 086 087 public static final String DEFAULT_STATUS_MAX_RETRIES = "3"; 088 089 public static final String DEFAULT_STATUS_DELAY_MILLIS = "500"; 090 091 public static final String DEFAULT_STATUS_MAX_DELAY_MILLIS = "10000"; 092 093 public static final String DEFAULT_SCROLLER_CONCURRENCY = "1"; 094 095 public static final String DEFAULT_SCROLL_SIZE = "100"; 096 097 public static final String DEFAULT_SCROLL_KEEP_ALIVE = "60"; 098 099 public static final String DEFAULT_SCROLL_PRODUCE_IMMEDIATE = "false"; 100 101 public static final Duration STOP_DURATION = Duration.ofSeconds(1); 102 103 protected final Map<String, BulkActionDescriptor> descriptors; 104 105 protected final List<String> actions; 106 107 protected StreamProcessor streamProcessor; 108 109 protected Map<String, BulkActionValidation> actionValidations; 110 111 public BulkAdminServiceImpl(List<BulkActionDescriptor> descriptorsList) { 112 this.actions = descriptorsList.stream().map(Descriptor::getId).collect(Collectors.toList()); 113 this.descriptors = new HashMap<>(descriptorsList.size()); 114 descriptorsList.forEach(descriptor -> descriptors.put(descriptor.name, descriptor)); 115 actionValidations = descriptorsList.stream().collect(HashMap::new, 116 (map, desc) -> map.put(desc.name, desc.validationClass != null ? desc.newValidationInstance() : null), 117 HashMap::putAll); 118 } 119 120 protected void initProcessor() { 121 StreamService service = Framework.getService(StreamService.class); 122 ConfigurationService confService = Framework.getService(ConfigurationService.class); 123 streamProcessor = new LogStreamProcessor(service.getLogManager(BULK_LOG_MANAGER_NAME)); 124 CodecService codecService = Framework.getService(CodecService.class); 125 Codec<Record> codec = codecService.getCodec(RECORD_CODEC, Record.class); 126 // we don't set any partitioning because it is already defined by logConfig contribution 127 Settings settings = new Settings(1, 1, codec); 128 settings.setConcurrency(SCROLLER_NAME, Integer.parseInt( 129 confService.getProperty(BULK_SCROLLER_CONCURRENCY_PROPERTY, DEFAULT_SCROLLER_CONCURRENCY))); 130 settings.setConcurrency(STATUS_NAME, Integer.parseInt( 131 confService.getProperty(BULK_STATUS_CONCURRENCY_PROPERTY, DEFAULT_STATUS_CONCURRENCY))); 132 // we don't want any retry on scroller this creates duplicates 133 ComputationPolicy scrollerPolicy = new ComputationPolicyBuilder().continueOnFailure( 134 confService.isBooleanPropertyTrue(BULK_SCROLL_CONTINUE_ON_FAILURE_PROPERTY)) 135 .retryPolicy(ComputationPolicy.NO_RETRY) 136 .build(); 137 settings.setPolicy(SCROLLER_NAME, scrollerPolicy); 138 // status policy is configurable 139 RetryPolicy retryPolicy = new RetryPolicy().withMaxRetries( 140 Integer.parseInt(confService.getProperty(BULK_STATUS_MAX_RETRIES_PROPERTY, DEFAULT_STATUS_MAX_RETRIES))) 141 .withBackoff( 142 Integer.parseInt( 143 confService.getProperty(BULK_STATUS_DELAY_PROPERTY, 144 DEFAULT_STATUS_DELAY_MILLIS)), 145 Integer.parseInt(confService.getProperty( 146 BULK_STATUS_MAX_DELAY_PROPERTY, 147 DEFAULT_STATUS_MAX_DELAY_MILLIS)), 148 TimeUnit.MILLISECONDS); 149 ComputationPolicy statusPolicy = new ComputationPolicyBuilder().continueOnFailure( 150 confService.isBooleanPropertyTrue(BULK_STATUS_CONTINUE_ON_FAILURE_PROPERTY)) 151 .retryPolicy(retryPolicy) 152 .build(); 153 settings.setPolicy(SCROLLER_NAME, statusPolicy); 154 int scrollSize = Integer.parseInt(confService.getProperty(BULK_SCROLL_SIZE_PROPERTY, DEFAULT_SCROLL_SIZE)); 155 int scrollKeepAlive = Integer.parseInt( 156 confService.getProperty(BULK_SCROLL_KEEP_ALIVE_PROPERTY, DEFAULT_SCROLL_KEEP_ALIVE)); 157 boolean scrollProduceImmediate = Boolean.parseBoolean( 158 confService.getProperty(BULK_SCROLL_PRODUCE_IMMEDIATE_PROPERTY, DEFAULT_SCROLL_PRODUCE_IMMEDIATE)); 159 160 streamProcessor.init(getTopology(scrollSize, scrollKeepAlive, scrollProduceImmediate), settings); 161 } 162 163 protected Topology getTopology(int scrollBatchSize, int scrollKeepAlive, boolean scrollProduceImmediate) { 164 List<String> mapping = new ArrayList<>(); 165 mapping.add(INPUT_1 + ":" + COMMAND_STREAM); 166 int i = 1; 167 for (String action : actions) { 168 mapping.add(String.format("o%s:%s", i, action)); 169 i++; 170 } 171 mapping.add(String.format("o%s:%s", i, STATUS_STREAM)); 172 return Topology.builder() 173 .addComputation( // 174 () -> new BulkScrollerComputation(SCROLLER_NAME, actions.size() + 1, scrollBatchSize, 175 scrollKeepAlive, scrollProduceImmediate), // 176 mapping) 177 .addComputation(() -> new BulkStatusComputation(STATUS_NAME), 178 Arrays.asList(INPUT_1 + ":" + STATUS_STREAM, // 179 OUTPUT_1 + ":" + DONE_STREAM)) 180 .build(); 181 } 182 183 @Override 184 public List<String> getActions() { 185 return actions; 186 } 187 188 @Override 189 public int getBucketSize(String action) { 190 return descriptors.get(action).getBucketSize(); 191 } 192 193 @Override 194 public int getBatchSize(String action) { 195 return descriptors.get(action).getBatchSize(); 196 } 197 198 @Override 199 public boolean isHttpEnabled(String actionId) { 200 return descriptors.get(actionId).httpEnabled; 201 } 202 203 @Override 204 public boolean isSequentialCommands(String actionId) { 205 return descriptors.get(actionId).sequentialCommands; 206 } 207 208 @Override 209 public BulkActionValidation getActionValidation(String action) { 210 return actionValidations.get(action); 211 } 212 213 public void afterStart() { 214 initProcessor(); 215 streamProcessor.start(); 216 } 217 218 public void beforeStop() { 219 if (streamProcessor != null) { 220 streamProcessor.stop(STOP_DURATION); 221 } 222 } 223 224}