001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.importer.stream.consumer;
020
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023import org.nuxeo.ecm.core.event.EventServiceAdmin;
024import org.nuxeo.ecm.core.event.impl.EventListenerDescriptor;
025import org.nuxeo.lib.stream.log.LogManager;
026import org.nuxeo.lib.stream.pattern.Message;
027import org.nuxeo.lib.stream.pattern.consumer.ConsumerFactory;
028import org.nuxeo.lib.stream.pattern.consumer.ConsumerPolicy;
029import org.nuxeo.lib.stream.pattern.consumer.ConsumerPool;
030import org.nuxeo.runtime.api.Framework;
031
032/**
033 * Consumer Pool that block Nuxeo listeners during import.
034 *
035 * @since 9.1
036 */
037public class DocumentConsumerPool<M extends Message> extends ConsumerPool<M> {
038    private static final Log log = LogFactory.getLog(DocumentConsumerPool.class);
039
040    protected static final String NOTIF_LISTENER = "notificationListener";
041
042    protected static final String MIME_LISTENER = "mimetypeIconUpdater";
043
044    protected static final String INDEXING_LISTENER = "elasticSearchInlineListener";
045
046    protected static final String DUBLICORE_LISTENER = "dclistener";
047
048    protected static final String TPL_LISTENER = "templateCreator";
049
050    protected static final String BINARY_LISTENER = "binaryMetadataSyncListener";
051
052    protected static final String UID_LISTENER = "uidlistener";
053
054    protected static final String VIDEO_LISTENER = "videoChangedListener";
055
056    protected static final String PICTURE_LISTENER = "pictureViewsGenerationListener";
057
058    protected static final String BLOB_LISTENER = "checkBlobUpdate";
059
060    protected boolean blockAsync;
061
062    protected final DocumentConsumerPolicy policy;
063
064    protected boolean blockPostCommit;
065
066    protected boolean bulkMode;
067
068    protected boolean listenerIndexingEnabled;
069
070    protected boolean listenerNotifEnabled;
071
072    protected boolean listenerMimeEnabled;
073
074    protected boolean listenerDublincoreEnabled;
075
076    protected boolean listenerTplEnabled;
077
078    protected boolean listenerBinaryEnabled;
079
080    protected boolean listenerUidEnabled;
081
082    protected boolean listenerVideoEnabled;
083
084    protected boolean listenerPictureEnabled;
085
086    protected boolean listenerBlobEnabled;
087
088    public DocumentConsumerPool(String logName, LogManager manager, ConsumerFactory<M> factory,
089            ConsumerPolicy consumerPolicy) {
090        super(logName, manager, factory, consumerPolicy);
091        EventServiceAdmin eventAdmin = Framework.getService(EventServiceAdmin.class);
092        policy = (DocumentConsumerPolicy) consumerPolicy;
093        if (eventAdmin == null) {
094            log.info("Can not apply document policy there is no event service available");
095            return;
096        }
097        if (policy.blockAsyncListeners()) {
098            blockAsync = eventAdmin.isBlockAsyncHandlers();
099            eventAdmin.setBlockAsyncHandlers(true);
100            log.debug("Block asynchronous listeners");
101        }
102        if (policy.blockPostCommitListeners()) {
103            blockPostCommit = eventAdmin.isBlockSyncPostCommitHandlers();
104            eventAdmin.setBlockSyncPostCommitHandlers(true);
105            log.debug("Block post commit listeners");
106        }
107        if (policy.bulkMode()) {
108            bulkMode = eventAdmin.isBulkModeEnabled();
109            eventAdmin.setBulkModeEnabled(true);
110            log.debug("Enable bulk mode");
111        }
112        if (policy.blockIndexing()) {
113            listenerIndexingEnabled = disableSyncListener(eventAdmin, INDEXING_LISTENER);
114            log.debug("Block ES indexing");
115        }
116        if (policy.blockDefaultSyncListeners()) {
117            listenerNotifEnabled = disableSyncListener(eventAdmin, NOTIF_LISTENER);
118            listenerMimeEnabled = disableSyncListener(eventAdmin, MIME_LISTENER);
119            listenerDublincoreEnabled = disableSyncListener(eventAdmin, DUBLICORE_LISTENER);
120            listenerTplEnabled = disableSyncListener(eventAdmin, TPL_LISTENER);
121            listenerBinaryEnabled = disableSyncListener(eventAdmin, BINARY_LISTENER);
122            listenerUidEnabled = disableSyncListener(eventAdmin, UID_LISTENER);
123            listenerVideoEnabled = disableSyncListener(eventAdmin, VIDEO_LISTENER);
124            listenerPictureEnabled = disableSyncListener(eventAdmin, PICTURE_LISTENER);
125            listenerBlobEnabled = disableSyncListener(eventAdmin, BLOB_LISTENER);
126            log.debug("Block some default synchronous listener");
127        }
128    }
129
130    protected boolean disableSyncListener(EventServiceAdmin eventAdmin, String name) {
131        EventListenerDescriptor desc = eventAdmin.getListenerList().getDescriptor(name);
132        if (desc != null && desc.isEnabled()) {
133            eventAdmin.setListenerEnabledFlag(name, false);
134            return true;
135        }
136        return false;
137    }
138
139    @Override
140    public void close() {
141        super.close();
142
143        EventServiceAdmin eventAdmin = Framework.getService(EventServiceAdmin.class);
144        if (eventAdmin == null) {
145            return;
146        }
147        if (policy.blockAsyncListeners()) {
148            eventAdmin.setBlockAsyncHandlers(blockAsync);
149            log.debug("Restore asynchronous listeners blocking state: " + blockAsync);
150        }
151        if (policy.blockPostCommitListeners()) {
152            eventAdmin.setBlockSyncPostCommitHandlers(blockPostCommit);
153            log.debug("Restore post commit listeners blocking state: " + blockPostCommit);
154        }
155        if (policy.bulkMode()) {
156            eventAdmin.setBulkModeEnabled(bulkMode);
157            log.debug("Restore bulk mode: " + bulkMode);
158        }
159        if (policy.blockIndexing() && listenerIndexingEnabled) {
160            eventAdmin.setListenerEnabledFlag(INDEXING_LISTENER, true);
161            log.debug("Unblock ES indexing");
162        }
163        if (policy.blockDefaultSyncListeners()) {
164            if (listenerNotifEnabled) {
165                eventAdmin.setListenerEnabledFlag(NOTIF_LISTENER, true);
166            }
167            if (listenerMimeEnabled) {
168                eventAdmin.setListenerEnabledFlag(MIME_LISTENER, true);
169            }
170            if (listenerDublincoreEnabled) {
171                eventAdmin.setListenerEnabledFlag(DUBLICORE_LISTENER, true);
172            }
173            if (listenerTplEnabled) {
174                eventAdmin.setListenerEnabledFlag(TPL_LISTENER, true);
175            }
176            if (listenerBinaryEnabled) {
177                eventAdmin.setListenerEnabledFlag(BINARY_LISTENER, true);
178            }
179            if (listenerUidEnabled) {
180                eventAdmin.setListenerEnabledFlag(UID_LISTENER, true);
181            }
182            if (listenerVideoEnabled) {
183                eventAdmin.setListenerEnabledFlag(VIDEO_LISTENER, true);
184            }
185            if (listenerPictureEnabled) {
186                eventAdmin.setListenerEnabledFlag(PICTURE_LISTENER, true);
187            }
188            if (listenerBlobEnabled) {
189                eventAdmin.setListenerEnabledFlag(BLOB_LISTENER, true);
190            }
191            log.debug("Unblock some default synchronous listener");
192        }
193    }
194
195}