001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.importer.stream.consumer; 020 021import org.apache.commons.logging.Log; 022import org.apache.commons.logging.LogFactory; 023import org.nuxeo.ecm.core.event.EventServiceAdmin; 024import org.nuxeo.ecm.core.event.impl.EventListenerDescriptor; 025import org.nuxeo.lib.stream.log.LogManager; 026import org.nuxeo.lib.stream.pattern.Message; 027import org.nuxeo.lib.stream.pattern.consumer.ConsumerFactory; 028import org.nuxeo.lib.stream.pattern.consumer.ConsumerPolicy; 029import org.nuxeo.lib.stream.pattern.consumer.ConsumerPool; 030import org.nuxeo.runtime.api.Framework; 031 032/** 033 * Consumer Pool that block Nuxeo listeners during import. 034 * 035 * @since 9.1 036 */ 037public class DocumentConsumerPool<M extends Message> extends ConsumerPool<M> { 038 private static final Log log = LogFactory.getLog(DocumentConsumerPool.class); 039 040 protected static final String NOTIF_LISTENER = "notificationListener"; 041 042 protected static final String MIME_LISTENER = "mimetypeIconUpdater"; 043 044 protected static final String INDEXING_LISTENER = "elasticSearchInlineListener"; 045 046 protected static final String DUBLICORE_LISTENER = "dclistener"; 047 048 protected static final String TPL_LISTENER = "templateCreator"; 049 050 protected static final String BINARY_LISTENER = "binaryMetadataSyncListener"; 051 052 protected static final String UID_LISTENER = "uidlistener"; 053 054 protected boolean blockAsync; 055 056 protected final DocumentConsumerPolicy policy; 057 058 protected boolean blockPostCommit; 059 060 protected boolean bulkMode; 061 062 protected boolean listenerIndexingEnabled; 063 064 protected boolean listenerNotifEnabled; 065 066 protected boolean listenerMimeEnabled; 067 068 protected boolean listenerDublincoreEnabled; 069 070 protected boolean listenerTplEnabled; 071 072 protected boolean listenerBinaryEnabled; 073 074 protected boolean listenerUidEnabled; 075 076 public DocumentConsumerPool(String logName, LogManager manager, ConsumerFactory<M> factory, 077 ConsumerPolicy consumerPolicy) { 078 super(logName, manager, factory, consumerPolicy); 079 EventServiceAdmin eventAdmin = Framework.getService(EventServiceAdmin.class); 080 policy = (DocumentConsumerPolicy) consumerPolicy; 081 if (eventAdmin == null) { 082 log.info("Can not apply document policy there is no event service available"); 083 return; 084 } 085 if (policy.blockAsyncListeners()) { 086 blockAsync = eventAdmin.isBlockAsyncHandlers(); 087 eventAdmin.setBlockAsyncHandlers(true); 088 log.debug("Block asynchronous listeners"); 089 } 090 if (policy.blockPostCommitListeners()) { 091 blockPostCommit = eventAdmin.isBlockSyncPostCommitHandlers(); 092 eventAdmin.setBlockSyncPostCommitHandlers(true); 093 log.debug("Block post commit listeners"); 094 } 095 if (policy.bulkMode()) { 096 bulkMode = eventAdmin.isBulkModeEnabled(); 097 eventAdmin.setBulkModeEnabled(true); 098 log.debug("Enable bulk mode"); 099 } 100 if (policy.blockIndexing()) { 101 listenerIndexingEnabled = disableSyncListener(eventAdmin, INDEXING_LISTENER); 102 log.debug("Block ES indexing"); 103 } 104 if (policy.blockDefaultSyncListeners()) { 105 listenerNotifEnabled = disableSyncListener(eventAdmin, NOTIF_LISTENER); 106 listenerMimeEnabled = disableSyncListener(eventAdmin, MIME_LISTENER); 107 listenerDublincoreEnabled = disableSyncListener(eventAdmin, DUBLICORE_LISTENER); 108 listenerTplEnabled = disableSyncListener(eventAdmin, TPL_LISTENER); 109 listenerBinaryEnabled = disableSyncListener(eventAdmin, BINARY_LISTENER); 110 listenerUidEnabled = disableSyncListener(eventAdmin, UID_LISTENER); 111 log.debug("Block some default synchronous listener"); 112 } 113 } 114 115 protected boolean disableSyncListener(EventServiceAdmin eventAdmin, String name) { 116 EventListenerDescriptor desc = eventAdmin.getListenerList().getDescriptor(name); 117 if (desc != null && desc.isEnabled()) { 118 eventAdmin.setListenerEnabledFlag(name, false); 119 return true; 120 } 121 return false; 122 } 123 124 @Override 125 public void close() throws Exception { 126 super.close(); 127 128 EventServiceAdmin eventAdmin = Framework.getService(EventServiceAdmin.class); 129 if (eventAdmin == null) { 130 return; 131 } 132 if (policy.blockAsyncListeners()) { 133 eventAdmin.setBlockAsyncHandlers(blockAsync); 134 log.debug("Restore asynchronous listeners blocking state: " + blockAsync); 135 } 136 if (policy.blockPostCommitListeners()) { 137 eventAdmin.setBlockSyncPostCommitHandlers(blockPostCommit); 138 log.debug("Restore post commit listeners blocking state: " + blockPostCommit); 139 } 140 if (policy.bulkMode()) { 141 eventAdmin.setBulkModeEnabled(bulkMode); 142 log.debug("Restore bulk mode: " + bulkMode); 143 } 144 if (policy.blockIndexing() && listenerIndexingEnabled) { 145 eventAdmin.setListenerEnabledFlag(INDEXING_LISTENER, true); 146 log.debug("Unblock ES indexing"); 147 } 148 if (policy.blockDefaultSyncListeners()) { 149 if (listenerNotifEnabled) { 150 eventAdmin.setListenerEnabledFlag(NOTIF_LISTENER, true); 151 } 152 if (listenerMimeEnabled) { 153 eventAdmin.setListenerEnabledFlag(MIME_LISTENER, true); 154 } 155 if (listenerDublincoreEnabled) { 156 eventAdmin.setListenerEnabledFlag(DUBLICORE_LISTENER, true); 157 } 158 if (listenerTplEnabled) { 159 eventAdmin.setListenerEnabledFlag(TPL_LISTENER, true); 160 } 161 if (listenerBinaryEnabled) { 162 eventAdmin.setListenerEnabledFlag(BINARY_LISTENER, true); 163 } 164 if (listenerUidEnabled) { 165 eventAdmin.setListenerEnabledFlag(UID_LISTENER, true); 166 } 167 log.debug("Unblock some default synchronous listener"); 168 } 169 } 170 171}