001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.importer.stream.consumer; 020 021import org.apache.commons.logging.Log; 022import org.apache.commons.logging.LogFactory; 023import org.nuxeo.ecm.core.event.EventServiceAdmin; 024import org.nuxeo.ecm.core.event.impl.EventListenerDescriptor; 025import org.nuxeo.lib.stream.codec.Codec; 026import org.nuxeo.lib.stream.log.LogManager; 027import org.nuxeo.lib.stream.pattern.Message; 028import org.nuxeo.lib.stream.pattern.consumer.ConsumerFactory; 029import org.nuxeo.lib.stream.pattern.consumer.ConsumerPolicy; 030import org.nuxeo.lib.stream.pattern.consumer.ConsumerPool; 031import org.nuxeo.runtime.api.Framework; 032 033import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC; 034 035/** 036 * Consumer Pool that block Nuxeo listeners during import. 037 * 038 * @since 9.1 039 */ 040public class DocumentConsumerPool<M extends Message> extends ConsumerPool<M> { 041 private static final Log log = LogFactory.getLog(DocumentConsumerPool.class); 042 043 protected static final String NOTIF_LISTENER = "notificationListener"; 044 045 protected static final String MIME_LISTENER = "mimetypeIconUpdater"; 046 047 protected static final String INDEXING_LISTENER = "elasticSearchInlineListener"; 048 049 protected static final String DUBLICORE_LISTENER = "dclistener"; 050 051 protected static final String TPL_LISTENER = "templateCreator"; 052 053 protected static final String BINARY_LISTENER = "binaryMetadataSyncListener"; 054 055 protected static final String UID_LISTENER = "uidlistener"; 056 057 protected static final String VIDEO_LISTENER = "videoChangedListener"; 058 059 protected static final String PICTURE_LISTENER = "pictureViewsGenerationListener"; 060 061 protected static final String BLOB_LISTENER = "checkBlobUpdate"; 062 063 protected boolean blockAsync; 064 065 protected final DocumentConsumerPolicy policy; 066 067 protected boolean blockPostCommit; 068 069 protected boolean bulkMode; 070 071 protected boolean listenerIndexingEnabled; 072 073 protected boolean listenerNotifEnabled; 074 075 protected boolean listenerMimeEnabled; 076 077 protected boolean listenerDublincoreEnabled; 078 079 protected boolean listenerTplEnabled; 080 081 protected boolean listenerBinaryEnabled; 082 083 protected boolean listenerUidEnabled; 084 085 protected boolean listenerVideoEnabled; 086 087 protected boolean listenerPictureEnabled; 088 089 protected boolean listenerBlobEnabled; 090 091 /** 092 * @deprecated since 11.1, due to serialization issue with java 11, use 093 * {@link #DocumentConsumerPool(String, LogManager, Codec, ConsumerFactory, ConsumerPolicy)} which 094 * allows to give a {@link org.nuxeo.lib.stream.codec.Codec codec} to 095 * {@link org.nuxeo.lib.stream.log.LogTailer tailer}. 096 */ 097 @Deprecated 098 @SuppressWarnings("unchecked") 099 public DocumentConsumerPool(String logName, LogManager manager, ConsumerFactory<M> factory, 100 ConsumerPolicy consumerPolicy) { 101 this(logName, manager, NO_CODEC, factory, consumerPolicy); 102 } 103 104 public DocumentConsumerPool(String logName, LogManager manager, Codec<M> codec, ConsumerFactory<M> factory, 105 ConsumerPolicy consumerPolicy) { 106 super(logName, manager, codec, factory, consumerPolicy); 107 EventServiceAdmin eventAdmin = Framework.getService(EventServiceAdmin.class); 108 policy = (DocumentConsumerPolicy) consumerPolicy; 109 if (eventAdmin == null) { 110 log.info("Can not apply document policy there is no event service available"); 111 return; 112 } 113 if (policy.blockAsyncListeners()) { 114 blockAsync = eventAdmin.isBlockAsyncHandlers(); 115 eventAdmin.setBlockAsyncHandlers(true); 116 log.debug("Block asynchronous listeners"); 117 } 118 if (policy.blockPostCommitListeners()) { 119 blockPostCommit = eventAdmin.isBlockSyncPostCommitHandlers(); 120 eventAdmin.setBlockSyncPostCommitHandlers(true); 121 log.debug("Block post commit listeners"); 122 } 123 if (policy.bulkMode()) { 124 bulkMode = eventAdmin.isBulkModeEnabled(); 125 eventAdmin.setBulkModeEnabled(true); 126 log.debug("Enable bulk mode"); 127 } 128 if (policy.blockIndexing()) { 129 listenerIndexingEnabled = disableSyncListener(eventAdmin, INDEXING_LISTENER); 130 log.debug("Block ES indexing"); 131 } 132 if (policy.blockDefaultSyncListeners()) { 133 listenerNotifEnabled = disableSyncListener(eventAdmin, NOTIF_LISTENER); 134 listenerMimeEnabled = disableSyncListener(eventAdmin, MIME_LISTENER); 135 listenerDublincoreEnabled = disableSyncListener(eventAdmin, DUBLICORE_LISTENER); 136 listenerTplEnabled = disableSyncListener(eventAdmin, TPL_LISTENER); 137 listenerBinaryEnabled = disableSyncListener(eventAdmin, BINARY_LISTENER); 138 listenerUidEnabled = disableSyncListener(eventAdmin, UID_LISTENER); 139 listenerVideoEnabled = disableSyncListener(eventAdmin, VIDEO_LISTENER); 140 listenerPictureEnabled = disableSyncListener(eventAdmin, PICTURE_LISTENER); 141 listenerBlobEnabled = disableSyncListener(eventAdmin, BLOB_LISTENER); 142 log.debug("Block some default synchronous listener"); 143 } 144 } 145 146 protected boolean disableSyncListener(EventServiceAdmin eventAdmin, String name) { 147 EventListenerDescriptor desc = eventAdmin.getListenerList().getDescriptor(name); 148 if (desc != null && desc.isEnabled()) { 149 eventAdmin.setListenerEnabledFlag(name, false); 150 return true; 151 } 152 return false; 153 } 154 155 @Override 156 public void close() { 157 super.close(); 158 159 EventServiceAdmin eventAdmin = Framework.getService(EventServiceAdmin.class); 160 if (eventAdmin == null) { 161 return; 162 } 163 if (policy.blockAsyncListeners()) { 164 eventAdmin.setBlockAsyncHandlers(blockAsync); 165 log.debug("Restore asynchronous listeners blocking state: " + blockAsync); 166 } 167 if (policy.blockPostCommitListeners()) { 168 eventAdmin.setBlockSyncPostCommitHandlers(blockPostCommit); 169 log.debug("Restore post commit listeners blocking state: " + blockPostCommit); 170 } 171 if (policy.bulkMode()) { 172 eventAdmin.setBulkModeEnabled(bulkMode); 173 log.debug("Restore bulk mode: " + bulkMode); 174 } 175 if (policy.blockIndexing() && listenerIndexingEnabled) { 176 eventAdmin.setListenerEnabledFlag(INDEXING_LISTENER, true); 177 log.debug("Unblock ES indexing"); 178 } 179 if (policy.blockDefaultSyncListeners()) { 180 if (listenerNotifEnabled) { 181 eventAdmin.setListenerEnabledFlag(NOTIF_LISTENER, true); 182 } 183 if (listenerMimeEnabled) { 184 eventAdmin.setListenerEnabledFlag(MIME_LISTENER, true); 185 } 186 if (listenerDublincoreEnabled) { 187 eventAdmin.setListenerEnabledFlag(DUBLICORE_LISTENER, true); 188 } 189 if (listenerTplEnabled) { 190 eventAdmin.setListenerEnabledFlag(TPL_LISTENER, true); 191 } 192 if (listenerBinaryEnabled) { 193 eventAdmin.setListenerEnabledFlag(BINARY_LISTENER, true); 194 } 195 if (listenerUidEnabled) { 196 eventAdmin.setListenerEnabledFlag(UID_LISTENER, true); 197 } 198 if (listenerVideoEnabled) { 199 eventAdmin.setListenerEnabledFlag(VIDEO_LISTENER, true); 200 } 201 if (listenerPictureEnabled) { 202 eventAdmin.setListenerEnabledFlag(PICTURE_LISTENER, true); 203 } 204 if (listenerBlobEnabled) { 205 eventAdmin.setListenerEnabledFlag(BLOB_LISTENER, true); 206 } 207 log.debug("Unblock some default synchronous listener"); 208 } 209 } 210 211}