001/* 002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * pierre 018 */ 019package org.nuxeo.ecm.core.bulk; 020 021import java.util.List; 022import java.util.stream.Collectors; 023 024import org.nuxeo.runtime.api.Framework; 025import org.nuxeo.runtime.kafka.KafkaConfigServiceImpl; 026import org.nuxeo.runtime.model.ComponentContext; 027import org.nuxeo.runtime.model.ComponentManager; 028import org.nuxeo.runtime.model.DefaultComponent; 029 030/** 031 * The bulk component. 032 * 033 * @since 10.2 034 */ 035public class BulkComponent extends DefaultComponent { 036 037 public static final String XP_ACTIONS = "actions"; 038 039 protected BulkService bulkService; 040 041 protected BulkAdminService bulkAdminService; 042 043 @Override 044 @SuppressWarnings("unchecked") 045 public <T> T getAdapter(Class<T> adapter) { 046 if (adapter.isAssignableFrom(BulkService.class)) { 047 return (T) bulkService; 048 } else if (adapter.isAssignableFrom(BulkAdminService.class)) { 049 return (T) bulkAdminService; 050 } 051 return null; 052 } 053 054 @Override 055 public int getApplicationStartedOrder() { 056 // The Bulk Service uses a processor. The processor's topology is built using the BulkAdminService that defines 057 // the Bulk Actions. Processor being contributed to the StreamService, the BulkAdminService must be started 058 // before the StreamService. The StreamService is started after the KafkaConfigService, so we use the same 059 // level. 060 return KafkaConfigServiceImpl.APPLICATION_STARTED_ORDER; 061 } 062 063 @Override 064 public void start(ComponentContext context) { 065 super.start(context); 066 bulkAdminService = new BulkAdminServiceImpl(getEnabledDescriptors()); 067 bulkService = new BulkServiceImpl(); 068 new ComponentListener().install(); 069 } 070 071 protected List<BulkActionDescriptor> getEnabledDescriptors() { 072 List<BulkActionDescriptor> descriptors = getDescriptors(XP_ACTIONS); 073 return descriptors.stream().filter(BulkActionDescriptor::isEnabled).collect(Collectors.toList()); 074 } 075 076 protected class ComponentListener implements ComponentManager.Listener { 077 @Override 078 public void afterStart(ComponentManager mgr, boolean isResume) { 079 // this is called once all components are started and ready 080 ((BulkAdminServiceImpl) bulkAdminService).afterStart(); 081 } 082 083 @Override 084 public void beforeStop(ComponentManager mgr, boolean isStandby) { 085 // this is called before components are stopped 086 if (bulkAdminService != null) { 087 ((BulkAdminServiceImpl) bulkAdminService).beforeStop(); 088 bulkAdminService = null; 089 } 090 bulkService = null; 091 Framework.getRuntime().getComponentManager().removeListener(this); 092 } 093 } 094}