001/* 002 * (C) Copyright 2019 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.core.bulk; 020 021import static org.nuxeo.ecm.core.bulk.BulkAdminServiceImpl.BULK_SCROLL_KEEP_ALIVE_PROPERTY; 022import static org.nuxeo.ecm.core.bulk.BulkAdminServiceImpl.BULK_SCROLL_PRODUCE_IMMEDIATE_PROPERTY; 023import static org.nuxeo.ecm.core.bulk.BulkAdminServiceImpl.BULK_SCROLL_PRODUCE_IMMEDIATE_THRESHOLD_PROPERTY; 024import static org.nuxeo.ecm.core.bulk.BulkAdminServiceImpl.BULK_SCROLL_SIZE_PROPERTY; 025import static org.nuxeo.ecm.core.bulk.BulkAdminServiceImpl.BULK_SCROLL_TRANSACTION_TIMEOUT_PROPERTY; 026import static org.nuxeo.ecm.core.bulk.BulkAdminServiceImpl.DEFAULT_PRODUCE_IMMEDIATE_THRESHOLD_PROPERTY; 027import static org.nuxeo.ecm.core.bulk.BulkAdminServiceImpl.DEFAULT_SCROLL_KEEP_ALIVE; 028import static org.nuxeo.ecm.core.bulk.BulkAdminServiceImpl.DEFAULT_SCROLL_SIZE; 029import static org.nuxeo.ecm.core.bulk.BulkAdminServiceImpl.DEFAULT_SCROLL_TRANSACTION_TIMEOUT; 030import static org.nuxeo.ecm.core.bulk.BulkAdminServiceImpl.SCROLLER_NAME; 031import static org.nuxeo.ecm.core.bulk.BulkAdminServiceImpl.STATUS_NAME; 032import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.COMMAND_STREAM; 033import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.DONE_STREAM; 034import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.STATUS_STREAM; 035import static org.nuxeo.lib.stream.computation.AbstractComputation.INPUT_1; 036import static org.nuxeo.lib.stream.computation.AbstractComputation.OUTPUT_1; 037 038import java.time.Duration; 039import java.util.ArrayList; 040import java.util.Arrays; 041import java.util.List; 042import java.util.Map; 043 044import org.nuxeo.ecm.core.bulk.computation.BulkScrollerComputation; 045import org.nuxeo.ecm.core.bulk.computation.BulkStatusComputation; 046import org.nuxeo.lib.stream.computation.Topology; 047import org.nuxeo.runtime.api.Framework; 048import org.nuxeo.runtime.services.config.ConfigurationService; 049import org.nuxeo.runtime.stream.StreamProcessorTopology; 050 051/** 052 * @since 11.1 053 */ 054public class BulkServiceProcessor implements StreamProcessorTopology { 055 056 @Override 057 public Topology getTopology(Map<String, String> options) { 058 List<String> mapping = new ArrayList<>(); 059 mapping.add(INPUT_1 + ":" + COMMAND_STREAM); 060 BulkAdminService actionService = Framework.getService(BulkAdminService.class); 061 List<String> actions = actionService.getActions(); 062 int i = 1; 063 for (String action : actions) { 064 mapping.add(String.format("o%s:%s", i, actionService.getInputStream(action))); 065 i++; 066 } 067 mapping.add(String.format("o%s:%s", i, STATUS_STREAM)); 068 ConfigurationService confService = Framework.getService(ConfigurationService.class); 069 int scrollBatchSize = confService.getInteger(BULK_SCROLL_SIZE_PROPERTY, DEFAULT_SCROLL_SIZE); 070 int scrollKeepAlive = confService.getInteger(BULK_SCROLL_KEEP_ALIVE_PROPERTY, DEFAULT_SCROLL_KEEP_ALIVE); 071 Duration transactionTimeout = confService.getDuration(BULK_SCROLL_TRANSACTION_TIMEOUT_PROPERTY, 072 DEFAULT_SCROLL_TRANSACTION_TIMEOUT); 073 074 boolean scrollProduceImmediate = confService.isBooleanTrue(BULK_SCROLL_PRODUCE_IMMEDIATE_PROPERTY); 075 int scrollProduceImmediateThreshold = confService.getInteger(BULK_SCROLL_PRODUCE_IMMEDIATE_THRESHOLD_PROPERTY) 076 .orElse(DEFAULT_PRODUCE_IMMEDIATE_THRESHOLD_PROPERTY); 077 return Topology.builder() 078 .addComputation( 079 () -> BulkScrollerComputation.builder(SCROLLER_NAME, actions.size() + 1) 080 .setScrollBatchSize(scrollBatchSize) 081 .setScrollKeepAliveSeconds(scrollKeepAlive) 082 .setTransactionTimeout(transactionTimeout) 083 .setProduceImmediate(scrollProduceImmediate) 084 .setProduceImmediateThreshold( 085 scrollProduceImmediateThreshold) 086 .build(), 087 mapping) 088 .addComputation(() -> new BulkStatusComputation(STATUS_NAME), 089 Arrays.asList(INPUT_1 + ":" + STATUS_STREAM, // 090 OUTPUT_1 + ":" + DONE_STREAM)) 091 .build(); 092 } 093}