001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     pierre
018 */
019package org.nuxeo.ecm.core.bulk;
020
021import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.BULK_LOG_MANAGER_NAME;
022import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.COMMAND_STREAM;
023import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.DONE_STREAM;
024import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.RECORD_CODEC;
025import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.STATUS_STREAM;
026import static org.nuxeo.lib.stream.computation.AbstractComputation.INPUT_1;
027import static org.nuxeo.lib.stream.computation.AbstractComputation.OUTPUT_1;
028
029import java.time.Duration;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.concurrent.TimeUnit;
036import java.util.stream.Collectors;
037
038import org.nuxeo.ecm.core.bulk.computation.BulkScrollerComputation;
039import org.nuxeo.ecm.core.bulk.computation.BulkStatusComputation;
040import org.nuxeo.lib.stream.codec.Codec;
041import org.nuxeo.lib.stream.computation.ComputationPolicy;
042import org.nuxeo.lib.stream.computation.ComputationPolicyBuilder;
043import org.nuxeo.lib.stream.computation.Record;
044import org.nuxeo.lib.stream.computation.Settings;
045import org.nuxeo.lib.stream.computation.StreamProcessor;
046import org.nuxeo.lib.stream.computation.Topology;
047import org.nuxeo.lib.stream.computation.log.LogStreamProcessor;
048import org.nuxeo.runtime.api.Framework;
049import org.nuxeo.runtime.codec.CodecService;
050import org.nuxeo.runtime.model.Descriptor;
051import org.nuxeo.runtime.services.config.ConfigurationService;
052import org.nuxeo.runtime.stream.StreamService;
053
054import net.jodah.failsafe.RetryPolicy;
055
056/**
057 * @since 10.3
058 */
059public class BulkAdminServiceImpl implements BulkAdminService {
060
061    public static final String SCROLLER_NAME = "scroller";
062
063    public static final String STATUS_NAME = "status";
064
065    public static final String BULK_SCROLLER_CONCURRENCY_PROPERTY = "nuxeo.core.bulk.scroller.concurrency";
066
067    public static final String BULK_STATUS_CONCURRENCY_PROPERTY = "nuxeo.core.bulk.status.concurrency";
068
069    public static final String BULK_STATUS_CONTINUE_ON_FAILURE_PROPERTY = "nuxeo.core.bulk.status.continueOnFailure";
070
071    public static final String BULK_STATUS_MAX_RETRIES_PROPERTY = "nuxeo.core.bulk.status.maxRetries";
072
073    public static final String BULK_STATUS_DELAY_PROPERTY = "nuxeo.core.bulk.status.delayMillis";
074
075    public static final String BULK_STATUS_MAX_DELAY_PROPERTY = "nuxeo.core.bulk.status.maxDelayMillis";
076
077    public static final String BULK_SCROLL_SIZE_PROPERTY = "nuxeo.core.bulk.scroller.scroll.size";
078
079    public static final String BULK_SCROLL_KEEP_ALIVE_PROPERTY = "nuxeo.core.bulk.scroller.scroll.keepAliveSeconds";
080
081    public static final String BULK_SCROLL_PRODUCE_IMMEDIATE_PROPERTY = "nuxeo.core.bulk.scroller.produceImmediate";
082
083    public static final String BULK_SCROLL_CONTINUE_ON_FAILURE_PROPERTY = "nuxeo.core.bulk.scroller.continueOnFailure";
084
085    public static final String DEFAULT_STATUS_CONCURRENCY = "1";
086
087    public static final String DEFAULT_STATUS_MAX_RETRIES = "3";
088
089    public static final String DEFAULT_STATUS_DELAY_MILLIS = "500";
090
091    public static final String DEFAULT_STATUS_MAX_DELAY_MILLIS = "10000";
092
093    public static final String DEFAULT_SCROLLER_CONCURRENCY = "1";
094
095    public static final String DEFAULT_SCROLL_SIZE = "100";
096
097    public static final String DEFAULT_SCROLL_KEEP_ALIVE = "60";
098
099    public static final String DEFAULT_SCROLL_PRODUCE_IMMEDIATE = "false";
100
101    public static final Duration STOP_DURATION = Duration.ofSeconds(1);
102
103    protected final Map<String, BulkActionDescriptor> descriptors;
104
105    protected final List<String> actions;
106
107    protected StreamProcessor streamProcessor;
108
109    protected Map<String, BulkActionValidation> actionValidations;
110
111    public BulkAdminServiceImpl(List<BulkActionDescriptor> descriptorsList) {
112        this.actions = descriptorsList.stream().map(Descriptor::getId).collect(Collectors.toList());
113        this.descriptors = new HashMap<>(descriptorsList.size());
114        descriptorsList.forEach(descriptor -> descriptors.put(descriptor.name, descriptor));
115        actionValidations = descriptorsList.stream().collect(HashMap::new,
116                (map, desc) -> map.put(desc.name, desc.validationClass != null ? desc.newValidationInstance() : null),
117                 HashMap::putAll);
118    }
119
120    protected void initProcessor() {
121        StreamService service = Framework.getService(StreamService.class);
122        ConfigurationService confService = Framework.getService(ConfigurationService.class);
123        streamProcessor = new LogStreamProcessor(service.getLogManager(BULK_LOG_MANAGER_NAME));
124        CodecService codecService = Framework.getService(CodecService.class);
125        Codec<Record> codec = codecService.getCodec(RECORD_CODEC, Record.class);
126        // we don't set any partitioning because it is already defined by logConfig contribution
127        Settings settings = new Settings(1, 1, codec);
128        settings.setConcurrency(SCROLLER_NAME, Integer.parseInt(
129                confService.getProperty(BULK_SCROLLER_CONCURRENCY_PROPERTY, DEFAULT_SCROLLER_CONCURRENCY)));
130        settings.setConcurrency(STATUS_NAME, Integer.parseInt(
131                confService.getProperty(BULK_STATUS_CONCURRENCY_PROPERTY, DEFAULT_STATUS_CONCURRENCY)));
132        // we don't want any retry on scroller this creates duplicates
133        ComputationPolicy scrollerPolicy = new ComputationPolicyBuilder().continueOnFailure(
134                confService.isBooleanPropertyTrue(BULK_SCROLL_CONTINUE_ON_FAILURE_PROPERTY))
135                                                                         .retryPolicy(ComputationPolicy.NO_RETRY)
136                                                                         .build();
137        settings.setPolicy(SCROLLER_NAME, scrollerPolicy);
138        // status policy is configurable
139        RetryPolicy retryPolicy = new RetryPolicy().withMaxRetries(
140                Integer.parseInt(confService.getProperty(BULK_STATUS_MAX_RETRIES_PROPERTY, DEFAULT_STATUS_MAX_RETRIES)))
141                                                   .withBackoff(
142                                                           Integer.parseInt(
143                                                                   confService.getProperty(BULK_STATUS_DELAY_PROPERTY,
144                                                                           DEFAULT_STATUS_DELAY_MILLIS)),
145                                                           Integer.parseInt(confService.getProperty(
146                                                                   BULK_STATUS_MAX_DELAY_PROPERTY,
147                                                                   DEFAULT_STATUS_MAX_DELAY_MILLIS)),
148                                                           TimeUnit.MILLISECONDS);
149        ComputationPolicy statusPolicy = new ComputationPolicyBuilder().continueOnFailure(
150                confService.isBooleanPropertyTrue(BULK_STATUS_CONTINUE_ON_FAILURE_PROPERTY))
151                                                                       .retryPolicy(retryPolicy)
152                                                                       .build();
153        settings.setPolicy(SCROLLER_NAME, statusPolicy);
154        int scrollSize = Integer.parseInt(confService.getProperty(BULK_SCROLL_SIZE_PROPERTY, DEFAULT_SCROLL_SIZE));
155        int scrollKeepAlive = Integer.parseInt(
156                confService.getProperty(BULK_SCROLL_KEEP_ALIVE_PROPERTY, DEFAULT_SCROLL_KEEP_ALIVE));
157        boolean scrollProduceImmediate = Boolean.parseBoolean(
158                confService.getProperty(BULK_SCROLL_PRODUCE_IMMEDIATE_PROPERTY, DEFAULT_SCROLL_PRODUCE_IMMEDIATE));
159
160        streamProcessor.init(getTopology(scrollSize, scrollKeepAlive, scrollProduceImmediate), settings);
161    }
162
163    protected Topology getTopology(int scrollBatchSize, int scrollKeepAlive, boolean scrollProduceImmediate) {
164        List<String> mapping = new ArrayList<>();
165        mapping.add(INPUT_1 + ":" + COMMAND_STREAM);
166        int i = 1;
167        for (String action : actions) {
168            mapping.add(String.format("o%s:%s", i, action));
169            i++;
170        }
171        mapping.add(String.format("o%s:%s", i, STATUS_STREAM));
172        return Topology.builder()
173                       .addComputation( //
174                               () -> new BulkScrollerComputation(SCROLLER_NAME, actions.size() + 1, scrollBatchSize,
175                                       scrollKeepAlive, scrollProduceImmediate), //
176                               mapping)
177                       .addComputation(() -> new BulkStatusComputation(STATUS_NAME),
178                               Arrays.asList(INPUT_1 + ":" + STATUS_STREAM, //
179                                       OUTPUT_1 + ":" + DONE_STREAM))
180                       .build();
181    }
182
183    @Override
184    public List<String> getActions() {
185        return actions;
186    }
187
188    @Override
189    public int getBucketSize(String action) {
190        return descriptors.get(action).getBucketSize();
191    }
192
193    @Override
194    public int getBatchSize(String action) {
195        return descriptors.get(action).getBatchSize();
196    }
197
198    @Override
199    public boolean isHttpEnabled(String actionId) {
200        return descriptors.get(actionId).httpEnabled;
201    }
202
203    @Override
204    public boolean isSequentialCommands(String actionId) {
205        return descriptors.get(actionId).sequentialCommands;
206    }
207
208    @Override
209    public BulkActionValidation getActionValidation(String action) {
210        return actionValidations.get(action);
211    }
212
213    public void afterStart() {
214        initProcessor();
215        streamProcessor.start();
216    }
217
218    public void beforeStop() {
219        if (streamProcessor != null) {
220            streamProcessor.stop(STOP_DURATION);
221        }
222    }
223
224}