001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.importer.stream.consumer;
020
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023import org.nuxeo.ecm.core.event.EventServiceAdmin;
024import org.nuxeo.ecm.core.event.impl.EventListenerDescriptor;
025import org.nuxeo.lib.stream.log.LogManager;
026import org.nuxeo.lib.stream.pattern.Message;
027import org.nuxeo.lib.stream.pattern.consumer.ConsumerFactory;
028import org.nuxeo.lib.stream.pattern.consumer.ConsumerPolicy;
029import org.nuxeo.lib.stream.pattern.consumer.ConsumerPool;
030import org.nuxeo.runtime.api.Framework;
031
032/**
033 * Consumer Pool that block Nuxeo listeners during import.
034 *
035 * @since 9.1
036 */
037public class DocumentConsumerPool<M extends Message> extends ConsumerPool<M> {
038    private static final Log log = LogFactory.getLog(DocumentConsumerPool.class);
039
040    protected static final String NOTIF_LISTENER = "notificationListener";
041
042    protected static final String MIME_LISTENER = "mimetypeIconUpdater";
043
044    protected static final String INDEXING_LISTENER = "elasticSearchInlineListener";
045
046    protected static final String DUBLICORE_LISTENER = "dclistener";
047
048    protected static final String TPL_LISTENER = "templateCreator";
049
050    protected static final String BINARY_LISTENER = "binaryMetadataSyncListener";
051
052    protected static final String UID_LISTENER = "uidlistener";
053
054    protected boolean blockAsync;
055
056    protected final DocumentConsumerPolicy policy;
057
058    protected boolean blockPostCommit;
059
060    protected boolean bulkMode;
061
062    protected boolean listenerIndexingEnabled;
063
064    protected boolean listenerNotifEnabled;
065
066    protected boolean listenerMimeEnabled;
067
068    protected boolean listenerDublincoreEnabled;
069
070    protected boolean listenerTplEnabled;
071
072    protected boolean listenerBinaryEnabled;
073
074    protected boolean listenerUidEnabled;
075
076    public DocumentConsumerPool(String logName, LogManager manager, ConsumerFactory<M> factory,
077            ConsumerPolicy consumerPolicy) {
078        super(logName, manager, factory, consumerPolicy);
079        EventServiceAdmin eventAdmin = Framework.getService(EventServiceAdmin.class);
080        policy = (DocumentConsumerPolicy) consumerPolicy;
081        if (eventAdmin == null) {
082            log.info("Can not apply document policy there is no event service available");
083            return;
084        }
085        if (policy.blockAsyncListeners()) {
086            blockAsync = eventAdmin.isBlockAsyncHandlers();
087            eventAdmin.setBlockAsyncHandlers(true);
088            log.debug("Block asynchronous listeners");
089        }
090        if (policy.blockPostCommitListeners()) {
091            blockPostCommit = eventAdmin.isBlockSyncPostCommitHandlers();
092            eventAdmin.setBlockSyncPostCommitHandlers(true);
093            log.debug("Block post commit listeners");
094        }
095        if (policy.bulkMode()) {
096            bulkMode = eventAdmin.isBulkModeEnabled();
097            eventAdmin.setBulkModeEnabled(true);
098            log.debug("Enable bulk mode");
099        }
100        if (policy.blockIndexing()) {
101            listenerIndexingEnabled = disableSyncListener(eventAdmin, INDEXING_LISTENER);
102            log.debug("Block ES indexing");
103        }
104        if (policy.blockDefaultSyncListeners()) {
105            listenerNotifEnabled = disableSyncListener(eventAdmin, NOTIF_LISTENER);
106            listenerMimeEnabled = disableSyncListener(eventAdmin, MIME_LISTENER);
107            listenerDublincoreEnabled = disableSyncListener(eventAdmin, DUBLICORE_LISTENER);
108            listenerTplEnabled = disableSyncListener(eventAdmin, TPL_LISTENER);
109            listenerBinaryEnabled = disableSyncListener(eventAdmin, BINARY_LISTENER);
110            listenerUidEnabled = disableSyncListener(eventAdmin, UID_LISTENER);
111            log.debug("Block some default synchronous listener");
112        }
113    }
114
115    protected boolean disableSyncListener(EventServiceAdmin eventAdmin, String name) {
116        EventListenerDescriptor desc = eventAdmin.getListenerList().getDescriptor(name);
117        if (desc != null && desc.isEnabled()) {
118            eventAdmin.setListenerEnabledFlag(name, false);
119            return true;
120        }
121        return false;
122    }
123
124    @Override
125    public void close() throws Exception {
126        super.close();
127
128        EventServiceAdmin eventAdmin = Framework.getService(EventServiceAdmin.class);
129        if (eventAdmin == null) {
130            return;
131        }
132        if (policy.blockAsyncListeners()) {
133            eventAdmin.setBlockAsyncHandlers(blockAsync);
134            log.debug("Restore asynchronous listeners blocking state: " + blockAsync);
135        }
136        if (policy.blockPostCommitListeners()) {
137            eventAdmin.setBlockSyncPostCommitHandlers(blockPostCommit);
138            log.debug("Restore post commit listeners blocking state: " + blockPostCommit);
139        }
140        if (policy.bulkMode()) {
141            eventAdmin.setBulkModeEnabled(bulkMode);
142            log.debug("Restore bulk mode: " + bulkMode);
143        }
144        if (policy.blockIndexing() && listenerIndexingEnabled) {
145            eventAdmin.setListenerEnabledFlag(INDEXING_LISTENER, true);
146            log.debug("Unblock ES indexing");
147        }
148        if (policy.blockDefaultSyncListeners()) {
149            if (listenerNotifEnabled) {
150                eventAdmin.setListenerEnabledFlag(NOTIF_LISTENER, true);
151            }
152            if (listenerMimeEnabled) {
153                eventAdmin.setListenerEnabledFlag(MIME_LISTENER, true);
154            }
155            if (listenerDublincoreEnabled) {
156                eventAdmin.setListenerEnabledFlag(DUBLICORE_LISTENER, true);
157            }
158            if (listenerTplEnabled) {
159                eventAdmin.setListenerEnabledFlag(TPL_LISTENER, true);
160            }
161            if (listenerBinaryEnabled) {
162                eventAdmin.setListenerEnabledFlag(BINARY_LISTENER, true);
163            }
164            if (listenerUidEnabled) {
165                eventAdmin.setListenerEnabledFlag(UID_LISTENER, true);
166            }
167            log.debug("Unblock some default synchronous listener");
168        }
169    }
170
171}