001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer; 020 021import org.apache.commons.logging.Log; 022import org.apache.commons.logging.LogFactory; 023import org.nuxeo.ecm.core.event.EventServiceAdmin; 024import org.nuxeo.ecm.core.event.impl.EventListenerDescriptor; 025import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQManager; 026import org.nuxeo.ecm.platform.importer.mqueues.pattern.Message; 027import org.nuxeo.runtime.api.Framework; 028 029/** 030 * Consumer Pool that block Nuxeo listeners during import. 031 * 032 * @since 9.1 033 */ 034public class DocumentConsumerPool<M extends Message> extends ConsumerPool<M> { 035 private static final Log log = LogFactory.getLog(DocumentConsumerPool.class); 036 protected static final String NOTIF_LISTENER = "notificationListener"; 037 protected static final String MIME_LISTENER = "mimetypeIconUpdater"; 038 protected static final String INDEXING_LISTENER = "elasticSearchInlineListener"; 039 protected static final String DUBLICORE_LISTENER = "dclistener"; 040 protected static final String TPL_LISTENER = "templateCreator"; 041 protected static final String BINARY_LISTENER = "binaryMetadataSyncListener"; 042 protected static final String UID_LISTENER = "uidlistener"; 043 protected boolean blockAsync; 044 045 protected final DocumentConsumerPolicy policy; 046 protected boolean blockPostCommit; 047 protected boolean bulkMode; 048 protected boolean listenerIndexingEnabled; 049 protected boolean listenerNotifEnabled; 050 protected boolean listenerMimeEnabled; 051 protected boolean listenerDublincoreEnabled; 052 protected boolean listenerTplEnabled; 053 protected boolean listenerBinaryEnabled; 054 protected boolean listenerUidEnabled; 055 056 public DocumentConsumerPool(String mqName, MQManager<M> manager, ConsumerFactory<M> factory, ConsumerPolicy consumerPolicy) { 057 super(mqName, manager, factory, consumerPolicy); 058 EventServiceAdmin eventAdmin = Framework.getLocalService(EventServiceAdmin.class); 059 policy = (DocumentConsumerPolicy) consumerPolicy; 060 if (eventAdmin == null) { 061 log.info("Can not apply document policy there is no event service available"); 062 return; 063 } 064 if (policy.blockAsyncListeners()) { 065 blockAsync = eventAdmin.isBlockAsyncHandlers(); 066 eventAdmin.setBlockAsyncHandlers(true); 067 log.debug("Block asynchronous listeners"); 068 } 069 if (policy.blockPostCommitListeners()) { 070 blockPostCommit = eventAdmin.isBlockSyncPostCommitHandlers(); 071 eventAdmin.setBlockSyncPostCommitHandlers(true); 072 log.debug("Block post commit listeners"); 073 } 074 if (policy.bulkMode()) { 075 bulkMode = eventAdmin.isBulkModeEnabled(); 076 eventAdmin.setBulkModeEnabled(true); 077 log.debug("Enable bulk mode"); 078 } 079 if (policy.blockIndexing()) { 080 listenerIndexingEnabled = disableSyncListner(eventAdmin, INDEXING_LISTENER); 081 log.debug("Block ES indexing"); 082 } 083 if (policy.blockDefaultSyncListeners()) { 084 listenerNotifEnabled = disableSyncListner(eventAdmin, NOTIF_LISTENER); 085 listenerMimeEnabled = disableSyncListner(eventAdmin, MIME_LISTENER); 086 listenerDublincoreEnabled = disableSyncListner(eventAdmin, DUBLICORE_LISTENER); 087 listenerTplEnabled = disableSyncListner(eventAdmin, TPL_LISTENER); 088 listenerBinaryEnabled = disableSyncListner(eventAdmin, BINARY_LISTENER); 089 listenerUidEnabled = disableSyncListner(eventAdmin, UID_LISTENER); 090 log.debug("Block some default synchronous listener"); 091 } 092 } 093 094 protected boolean disableSyncListner(EventServiceAdmin eventAdmin, String name) { 095 EventListenerDescriptor desc = eventAdmin.getListenerList().getDescriptor(name); 096 if (desc != null && desc.isEnabled()) { 097 eventAdmin.setListenerEnabledFlag(name, false); 098 return true; 099 } 100 return false; 101 } 102 103 @Override 104 public void close() throws Exception { 105 super.close(); 106 107 EventServiceAdmin eventAdmin = Framework.getLocalService(EventServiceAdmin.class); 108 if (eventAdmin == null) { 109 return; 110 } 111 if (policy.blockAsyncListeners()) { 112 eventAdmin.setBlockAsyncHandlers(blockAsync); 113 log.debug("Restore asynchronous listeners blocking state: " + blockAsync); 114 } 115 if (policy.blockPostCommitListeners()) { 116 eventAdmin.setBlockSyncPostCommitHandlers(blockPostCommit); 117 log.debug("Restore post commit listeners blocking state: " + blockPostCommit); 118 } 119 if (policy.bulkMode()) { 120 eventAdmin.setBulkModeEnabled(bulkMode); 121 log.debug("Restore bulk mode: " + bulkMode); 122 } 123 if (policy.blockIndexing() && listenerIndexingEnabled) { 124 eventAdmin.setListenerEnabledFlag(INDEXING_LISTENER, true); 125 log.debug("Unblock ES indexing"); 126 } 127 if (policy.blockDefaultSyncListeners()) { 128 if (listenerNotifEnabled) { 129 eventAdmin.setListenerEnabledFlag(NOTIF_LISTENER, true); 130 } 131 if (listenerMimeEnabled) { 132 eventAdmin.setListenerEnabledFlag(MIME_LISTENER, true); 133 } 134 if (listenerDublincoreEnabled) { 135 eventAdmin.setListenerEnabledFlag(DUBLICORE_LISTENER, true); 136 } 137 if (listenerTplEnabled) { 138 eventAdmin.setListenerEnabledFlag(TPL_LISTENER, true); 139 } 140 if (listenerBinaryEnabled) { 141 eventAdmin.setListenerEnabledFlag(BINARY_LISTENER, true); 142 } 143 if (listenerUidEnabled) { 144 eventAdmin.setListenerEnabledFlag(UID_LISTENER, true); 145 } 146 log.debug("Unblock some default synchronous listener"); 147 } 148 } 149 150 151}