001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     pierre
018 */
019package org.nuxeo.ecm.core.bulk;
020
021import java.time.Duration;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.stream.Collectors;
026
027import org.nuxeo.lib.stream.computation.StreamManager;
028import org.nuxeo.lib.stream.computation.StreamProcessor;
029import org.nuxeo.runtime.api.Framework;
030import org.nuxeo.runtime.model.Descriptor;
031import org.nuxeo.runtime.stream.StreamService;
032
033/**
034 * @since 10.3
035 */
036public class BulkAdminServiceImpl implements BulkAdminService {
037
038    public static final String SCROLLER_NAME = "bulk/scroller";
039
040    public static final String STATUS_NAME = "bulk/status";
041
042    public static final String BULK_SERVICE_PROCESSOR_NAME = "bulkServiceProcessor";
043
044    public static final String BULK_SCROLL_SIZE_PROPERTY = "nuxeo.core.bulk.scroller.scroll.size";
045
046    public static final String BULK_SCROLL_KEEP_ALIVE_PROPERTY = "nuxeo.core.bulk.scroller.scroll.keepAliveSeconds";
047
048    public static final String BULK_SCROLL_PRODUCE_IMMEDIATE_PROPERTY = "nuxeo.core.bulk.scroller.produceImmediate";
049
050    // @since 11.4
051    public static final String BULK_SCROLL_PRODUCE_IMMEDIATE_THRESHOLD_PROPERTY = "nuxeo.core.bulk.scroller.produceImmediateThreshold";
052
053    // by default switch to produce immediate when there are more than 1m ids
054    // @since 11.4
055    public static final int DEFAULT_PRODUCE_IMMEDIATE_THRESHOLD_PROPERTY = 1_000_000;
056
057    public static final int DEFAULT_SCROLL_SIZE = 100;
058
059    public static final int DEFAULT_SCROLL_KEEP_ALIVE = 60;
060
061    // @since 11.2
062    public static final String BULK_SCROLL_TRANSACTION_TIMEOUT_PROPERTY = "nuxeo.core.bulk.scroller.transactionTimeout";
063
064    // @since 11.2
065    public static final Duration DEFAULT_SCROLL_TRANSACTION_TIMEOUT = Duration.ofDays(2);
066
067    public static final Duration STOP_DURATION = Duration.ofSeconds(1);
068
069    protected final Map<String, BulkActionDescriptor> descriptors;
070
071    protected final List<String> actions;
072
073    protected StreamProcessor streamProcessor;
074
075    protected Map<String, BulkActionValidation> actionValidations;
076
077    public BulkAdminServiceImpl(List<BulkActionDescriptor> descriptorsList) {
078        this.actions = descriptorsList.stream().map(Descriptor::getId).collect(Collectors.toList());
079        this.descriptors = new HashMap<>(descriptorsList.size());
080        descriptorsList.forEach(descriptor -> descriptors.put(descriptor.name, descriptor));
081        actionValidations = descriptorsList.stream()
082                                           .collect(HashMap::new, (map, desc) -> map.put(desc.name,
083                                                   desc.validationClass != null ? desc.newValidationInstance() : null),
084                                                   HashMap::putAll);
085    }
086
087    @Override
088    public List<String> getActions() {
089        return actions;
090    }
091
092    @Override
093    public int getBucketSize(String action) {
094        return descriptors.get(action).getBucketSize();
095    }
096
097    @Override
098    public int getBatchSize(String action) {
099        return descriptors.get(action).getBatchSize();
100    }
101
102    @Override
103    public Long getQueryLimit(String action) {
104        return descriptors.get(action).getDefaultQueryLimit();
105    }
106
107    @Override
108    public String getDefaultScroller(String action) {
109        return descriptors.get(action).getDefaultScroller();
110    }
111
112    @Override
113    public String getInputStream(String action) {
114        return descriptors.get(action).getInputStream();
115    }
116
117    @Override
118    public boolean isHttpEnabled(String actionId) {
119        return descriptors.get(actionId).httpEnabled;
120    }
121
122    @Override
123    public boolean isSequentialCommands(String actionId) {
124        return descriptors.get(actionId).sequentialCommands;
125    }
126
127    @Override
128    public BulkActionValidation getActionValidation(String action) {
129        return actionValidations.get(action);
130    }
131
132    public void afterStart() {
133        StreamManager manager = Framework.getService(StreamService.class).getStreamManager();
134        streamProcessor = manager.createStreamProcessor(BULK_SERVICE_PROCESSOR_NAME);
135        streamProcessor.start();
136    }
137
138    public void beforeStop() {
139        if (streamProcessor != null) {
140            streamProcessor.stop(STOP_DURATION);
141        }
142    }
143
144}