001/* 002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * pierre 018 */ 019package org.nuxeo.ecm.core.bulk; 020 021import java.time.Duration; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.stream.Collectors; 026 027import org.nuxeo.lib.stream.computation.StreamManager; 028import org.nuxeo.lib.stream.computation.StreamProcessor; 029import org.nuxeo.runtime.api.Framework; 030import org.nuxeo.runtime.model.Descriptor; 031import org.nuxeo.runtime.stream.StreamService; 032 033/** 034 * @since 10.3 035 */ 036public class BulkAdminServiceImpl implements BulkAdminService { 037 038 public static final String SCROLLER_NAME = "bulk/scroller"; 039 040 public static final String STATUS_NAME = "bulk/status"; 041 042 public static final String BULK_SERVICE_PROCESSOR_NAME = "bulkServiceProcessor"; 043 044 public static final String BULK_SCROLL_SIZE_PROPERTY = "nuxeo.core.bulk.scroller.scroll.size"; 045 046 public static final String BULK_SCROLL_KEEP_ALIVE_PROPERTY = "nuxeo.core.bulk.scroller.scroll.keepAliveSeconds"; 047 048 public static final String BULK_SCROLL_PRODUCE_IMMEDIATE_PROPERTY = "nuxeo.core.bulk.scroller.produceImmediate"; 049 050 // @since 11.4 051 public static final String BULK_SCROLL_PRODUCE_IMMEDIATE_THRESHOLD_PROPERTY = "nuxeo.core.bulk.scroller.produceImmediateThreshold"; 052 053 // by default switch to produce immediate when there are more than 1m ids 054 // @since 11.4 055 public static final int DEFAULT_PRODUCE_IMMEDIATE_THRESHOLD_PROPERTY = 1_000_000; 056 057 public static final int DEFAULT_SCROLL_SIZE = 100; 058 059 public static final int DEFAULT_SCROLL_KEEP_ALIVE = 60; 060 061 // @since 11.2 062 public static final String BULK_SCROLL_TRANSACTION_TIMEOUT_PROPERTY = "nuxeo.core.bulk.scroller.transactionTimeout"; 063 064 // @since 11.2 065 public static final Duration DEFAULT_SCROLL_TRANSACTION_TIMEOUT = Duration.ofDays(2); 066 067 public static final Duration STOP_DURATION = Duration.ofSeconds(1); 068 069 protected final Map<String, BulkActionDescriptor> descriptors; 070 071 protected final List<String> actions; 072 073 protected StreamProcessor streamProcessor; 074 075 protected Map<String, BulkActionValidation> actionValidations; 076 077 public BulkAdminServiceImpl(List<BulkActionDescriptor> descriptorsList) { 078 this.actions = descriptorsList.stream().map(Descriptor::getId).collect(Collectors.toList()); 079 this.descriptors = new HashMap<>(descriptorsList.size()); 080 descriptorsList.forEach(descriptor -> descriptors.put(descriptor.name, descriptor)); 081 actionValidations = descriptorsList.stream() 082 .collect(HashMap::new, (map, desc) -> map.put(desc.name, 083 desc.validationClass != null ? desc.newValidationInstance() : null), 084 HashMap::putAll); 085 } 086 087 @Override 088 public List<String> getActions() { 089 return actions; 090 } 091 092 @Override 093 public int getBucketSize(String action) { 094 return descriptors.get(action).getBucketSize(); 095 } 096 097 @Override 098 public int getBatchSize(String action) { 099 return descriptors.get(action).getBatchSize(); 100 } 101 102 @Override 103 public Long getQueryLimit(String action) { 104 return descriptors.get(action).getDefaultQueryLimit(); 105 } 106 107 @Override 108 public String getDefaultScroller(String action) { 109 return descriptors.get(action).getDefaultScroller(); 110 } 111 112 @Override 113 public String getInputStream(String action) { 114 return descriptors.get(action).getInputStream(); 115 } 116 117 @Override 118 public boolean isHttpEnabled(String actionId) { 119 return descriptors.get(actionId).httpEnabled; 120 } 121 122 @Override 123 public boolean isSequentialCommands(String actionId) { 124 return descriptors.get(actionId).sequentialCommands; 125 } 126 127 @Override 128 public BulkActionValidation getActionValidation(String action) { 129 return actionValidations.get(action); 130 } 131 132 public void afterStart() { 133 StreamManager manager = Framework.getService(StreamService.class).getStreamManager(); 134 streamProcessor = manager.createStreamProcessor(BULK_SERVICE_PROCESSOR_NAME); 135 streamProcessor.start(); 136 } 137 138 public void beforeStop() { 139 if (streamProcessor != null) { 140 streamProcessor.stop(STOP_DURATION); 141 } 142 } 143 144}