001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.importer.stream.consumer; 020 021import org.apache.commons.logging.Log; 022import org.apache.commons.logging.LogFactory; 023import org.nuxeo.ecm.core.event.EventServiceAdmin; 024import org.nuxeo.ecm.core.event.impl.EventListenerDescriptor; 025import org.nuxeo.lib.stream.log.LogManager; 026import org.nuxeo.lib.stream.pattern.Message; 027import org.nuxeo.lib.stream.pattern.consumer.ConsumerFactory; 028import org.nuxeo.lib.stream.pattern.consumer.ConsumerPolicy; 029import org.nuxeo.lib.stream.pattern.consumer.ConsumerPool; 030import org.nuxeo.runtime.api.Framework; 031 032/** 033 * Consumer Pool that block Nuxeo listeners during import. 034 * 035 * @since 9.1 036 */ 037public class DocumentConsumerPool<M extends Message> extends ConsumerPool<M> { 038 private static final Log log = LogFactory.getLog(DocumentConsumerPool.class); 039 040 protected static final String NOTIF_LISTENER = "notificationListener"; 041 042 protected static final String MIME_LISTENER = "mimetypeIconUpdater"; 043 044 protected static final String INDEXING_LISTENER = "elasticSearchInlineListener"; 045 046 protected static final String DUBLICORE_LISTENER = "dclistener"; 047 048 protected static final String TPL_LISTENER = "templateCreator"; 049 050 protected static final String BINARY_LISTENER = "binaryMetadataSyncListener"; 051 052 protected static final String UID_LISTENER = "uidlistener"; 053 054 protected static final String VIDEO_LISTENER = "videoChangedListener"; 055 056 protected static final String PICTURE_LISTENER = "pictureViewsGenerationListener"; 057 058 protected static final String BLOB_LISTENER = "checkBlobUpdate"; 059 060 protected boolean blockAsync; 061 062 protected final DocumentConsumerPolicy policy; 063 064 protected boolean blockPostCommit; 065 066 protected boolean bulkMode; 067 068 protected boolean listenerIndexingEnabled; 069 070 protected boolean listenerNotifEnabled; 071 072 protected boolean listenerMimeEnabled; 073 074 protected boolean listenerDublincoreEnabled; 075 076 protected boolean listenerTplEnabled; 077 078 protected boolean listenerBinaryEnabled; 079 080 protected boolean listenerUidEnabled; 081 082 protected boolean listenerVideoEnabled; 083 084 protected boolean listenerPictureEnabled; 085 086 protected boolean listenerBlobEnabled; 087 088 public DocumentConsumerPool(String logName, LogManager manager, ConsumerFactory<M> factory, 089 ConsumerPolicy consumerPolicy) { 090 super(logName, manager, factory, consumerPolicy); 091 EventServiceAdmin eventAdmin = Framework.getService(EventServiceAdmin.class); 092 policy = (DocumentConsumerPolicy) consumerPolicy; 093 if (eventAdmin == null) { 094 log.info("Can not apply document policy there is no event service available"); 095 return; 096 } 097 if (policy.blockAsyncListeners()) { 098 blockAsync = eventAdmin.isBlockAsyncHandlers(); 099 eventAdmin.setBlockAsyncHandlers(true); 100 log.debug("Block asynchronous listeners"); 101 } 102 if (policy.blockPostCommitListeners()) { 103 blockPostCommit = eventAdmin.isBlockSyncPostCommitHandlers(); 104 eventAdmin.setBlockSyncPostCommitHandlers(true); 105 log.debug("Block post commit listeners"); 106 } 107 if (policy.bulkMode()) { 108 bulkMode = eventAdmin.isBulkModeEnabled(); 109 eventAdmin.setBulkModeEnabled(true); 110 log.debug("Enable bulk mode"); 111 } 112 if (policy.blockIndexing()) { 113 listenerIndexingEnabled = disableSyncListener(eventAdmin, INDEXING_LISTENER); 114 log.debug("Block ES indexing"); 115 } 116 if (policy.blockDefaultSyncListeners()) { 117 listenerNotifEnabled = disableSyncListener(eventAdmin, NOTIF_LISTENER); 118 listenerMimeEnabled = disableSyncListener(eventAdmin, MIME_LISTENER); 119 listenerDublincoreEnabled = disableSyncListener(eventAdmin, DUBLICORE_LISTENER); 120 listenerTplEnabled = disableSyncListener(eventAdmin, TPL_LISTENER); 121 listenerBinaryEnabled = disableSyncListener(eventAdmin, BINARY_LISTENER); 122 listenerUidEnabled = disableSyncListener(eventAdmin, UID_LISTENER); 123 listenerVideoEnabled = disableSyncListener(eventAdmin, VIDEO_LISTENER); 124 listenerPictureEnabled = disableSyncListener(eventAdmin, PICTURE_LISTENER); 125 listenerBlobEnabled = disableSyncListener(eventAdmin, BLOB_LISTENER); 126 log.debug("Block some default synchronous listener"); 127 } 128 } 129 130 protected boolean disableSyncListener(EventServiceAdmin eventAdmin, String name) { 131 EventListenerDescriptor desc = eventAdmin.getListenerList().getDescriptor(name); 132 if (desc != null && desc.isEnabled()) { 133 eventAdmin.setListenerEnabledFlag(name, false); 134 return true; 135 } 136 return false; 137 } 138 139 @Override 140 public void close() { 141 super.close(); 142 143 EventServiceAdmin eventAdmin = Framework.getService(EventServiceAdmin.class); 144 if (eventAdmin == null) { 145 return; 146 } 147 if (policy.blockAsyncListeners()) { 148 eventAdmin.setBlockAsyncHandlers(blockAsync); 149 log.debug("Restore asynchronous listeners blocking state: " + blockAsync); 150 } 151 if (policy.blockPostCommitListeners()) { 152 eventAdmin.setBlockSyncPostCommitHandlers(blockPostCommit); 153 log.debug("Restore post commit listeners blocking state: " + blockPostCommit); 154 } 155 if (policy.bulkMode()) { 156 eventAdmin.setBulkModeEnabled(bulkMode); 157 log.debug("Restore bulk mode: " + bulkMode); 158 } 159 if (policy.blockIndexing() && listenerIndexingEnabled) { 160 eventAdmin.setListenerEnabledFlag(INDEXING_LISTENER, true); 161 log.debug("Unblock ES indexing"); 162 } 163 if (policy.blockDefaultSyncListeners()) { 164 if (listenerNotifEnabled) { 165 eventAdmin.setListenerEnabledFlag(NOTIF_LISTENER, true); 166 } 167 if (listenerMimeEnabled) { 168 eventAdmin.setListenerEnabledFlag(MIME_LISTENER, true); 169 } 170 if (listenerDublincoreEnabled) { 171 eventAdmin.setListenerEnabledFlag(DUBLICORE_LISTENER, true); 172 } 173 if (listenerTplEnabled) { 174 eventAdmin.setListenerEnabledFlag(TPL_LISTENER, true); 175 } 176 if (listenerBinaryEnabled) { 177 eventAdmin.setListenerEnabledFlag(BINARY_LISTENER, true); 178 } 179 if (listenerUidEnabled) { 180 eventAdmin.setListenerEnabledFlag(UID_LISTENER, true); 181 } 182 if (listenerVideoEnabled) { 183 eventAdmin.setListenerEnabledFlag(VIDEO_LISTENER, true); 184 } 185 if (listenerPictureEnabled) { 186 eventAdmin.setListenerEnabledFlag(PICTURE_LISTENER, true); 187 } 188 if (listenerBlobEnabled) { 189 eventAdmin.setListenerEnabledFlag(BLOB_LISTENER, true); 190 } 191 log.debug("Unblock some default synchronous listener"); 192 } 193 } 194 195}