001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer;
020
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023import org.nuxeo.ecm.core.event.EventServiceAdmin;
024import org.nuxeo.ecm.core.event.impl.EventListenerDescriptor;
025import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQManager;
026import org.nuxeo.ecm.platform.importer.mqueues.pattern.Message;
027import org.nuxeo.runtime.api.Framework;
028
029/**
030 * Consumer Pool that block Nuxeo listeners during import.
031 *
032 * @since 9.1
033 */
034public class DocumentConsumerPool<M extends Message> extends ConsumerPool<M> {
035    private static final Log log = LogFactory.getLog(DocumentConsumerPool.class);
036    protected static final String NOTIF_LISTENER = "notificationListener";
037    protected static final String MIME_LISTENER = "mimetypeIconUpdater";
038    protected static final String INDEXING_LISTENER = "elasticSearchInlineListener";
039    protected static final String DUBLICORE_LISTENER = "dclistener";
040    protected static final String TPL_LISTENER = "templateCreator";
041    protected static final String BINARY_LISTENER = "binaryMetadataSyncListener";
042    protected static final String UID_LISTENER = "uidlistener";
043    protected boolean blockAsync;
044
045    protected final DocumentConsumerPolicy policy;
046    protected boolean blockPostCommit;
047    protected boolean bulkMode;
048    protected boolean listenerIndexingEnabled;
049    protected boolean listenerNotifEnabled;
050    protected boolean listenerMimeEnabled;
051    protected boolean listenerDublincoreEnabled;
052    protected boolean listenerTplEnabled;
053    protected boolean listenerBinaryEnabled;
054    protected boolean listenerUidEnabled;
055
056    public DocumentConsumerPool(String mqName, MQManager<M> manager, ConsumerFactory<M> factory, ConsumerPolicy consumerPolicy) {
057        super(mqName, manager, factory, consumerPolicy);
058        EventServiceAdmin eventAdmin = Framework.getLocalService(EventServiceAdmin.class);
059        policy = (DocumentConsumerPolicy) consumerPolicy;
060        if (eventAdmin == null) {
061            log.info("Can not apply document policy there is no event service available");
062            return;
063        }
064        if (policy.blockAsyncListeners()) {
065            blockAsync = eventAdmin.isBlockAsyncHandlers();
066            eventAdmin.setBlockAsyncHandlers(true);
067            log.debug("Block asynchronous listeners");
068        }
069        if (policy.blockPostCommitListeners()) {
070            blockPostCommit = eventAdmin.isBlockSyncPostCommitHandlers();
071            eventAdmin.setBlockSyncPostCommitHandlers(true);
072            log.debug("Block post commit listeners");
073        }
074        if (policy.bulkMode()) {
075            bulkMode = eventAdmin.isBulkModeEnabled();
076            eventAdmin.setBulkModeEnabled(true);
077            log.debug("Enable bulk mode");
078        }
079        if (policy.blockIndexing()) {
080            listenerIndexingEnabled = disableSyncListner(eventAdmin, INDEXING_LISTENER);
081            log.debug("Block ES indexing");
082        }
083        if (policy.blockDefaultSyncListeners()) {
084            listenerNotifEnabled = disableSyncListner(eventAdmin, NOTIF_LISTENER);
085            listenerMimeEnabled = disableSyncListner(eventAdmin, MIME_LISTENER);
086            listenerDublincoreEnabled = disableSyncListner(eventAdmin, DUBLICORE_LISTENER);
087            listenerTplEnabled = disableSyncListner(eventAdmin, TPL_LISTENER);
088            listenerBinaryEnabled = disableSyncListner(eventAdmin, BINARY_LISTENER);
089            listenerUidEnabled = disableSyncListner(eventAdmin, UID_LISTENER);
090            log.debug("Block some default synchronous listener");
091        }
092    }
093
094    protected boolean disableSyncListner(EventServiceAdmin eventAdmin, String name) {
095        EventListenerDescriptor desc = eventAdmin.getListenerList().getDescriptor(name);
096        if (desc != null && desc.isEnabled()) {
097            eventAdmin.setListenerEnabledFlag(name, false);
098            return true;
099        }
100        return false;
101    }
102
103    @Override
104    public void close() throws Exception {
105        super.close();
106
107        EventServiceAdmin eventAdmin = Framework.getLocalService(EventServiceAdmin.class);
108        if (eventAdmin == null) {
109            return;
110        }
111        if (policy.blockAsyncListeners()) {
112            eventAdmin.setBlockAsyncHandlers(blockAsync);
113            log.debug("Restore asynchronous listeners blocking state: " + blockAsync);
114        }
115        if (policy.blockPostCommitListeners()) {
116            eventAdmin.setBlockSyncPostCommitHandlers(blockPostCommit);
117            log.debug("Restore post commit listeners blocking state: " + blockPostCommit);
118        }
119        if (policy.bulkMode()) {
120            eventAdmin.setBulkModeEnabled(bulkMode);
121            log.debug("Restore bulk mode: " + bulkMode);
122        }
123        if (policy.blockIndexing() && listenerIndexingEnabled) {
124            eventAdmin.setListenerEnabledFlag(INDEXING_LISTENER, true);
125            log.debug("Unblock ES indexing");
126        }
127        if (policy.blockDefaultSyncListeners()) {
128            if (listenerNotifEnabled) {
129                eventAdmin.setListenerEnabledFlag(NOTIF_LISTENER, true);
130            }
131            if (listenerMimeEnabled) {
132                eventAdmin.setListenerEnabledFlag(MIME_LISTENER, true);
133            }
134            if (listenerDublincoreEnabled) {
135                eventAdmin.setListenerEnabledFlag(DUBLICORE_LISTENER, true);
136            }
137            if (listenerTplEnabled) {
138                eventAdmin.setListenerEnabledFlag(TPL_LISTENER, true);
139            }
140            if (listenerBinaryEnabled) {
141                eventAdmin.setListenerEnabledFlag(BINARY_LISTENER, true);
142            }
143            if (listenerUidEnabled) {
144                eventAdmin.setListenerEnabledFlag(UID_LISTENER, true);
145            }
146            log.debug("Unblock some default synchronous listener");
147        }
148    }
149
150
151}