001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.importer.stream.consumer;
020
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023import org.nuxeo.ecm.core.event.EventServiceAdmin;
024import org.nuxeo.ecm.core.event.impl.EventListenerDescriptor;
025import org.nuxeo.lib.stream.codec.Codec;
026import org.nuxeo.lib.stream.log.LogManager;
027import org.nuxeo.lib.stream.pattern.Message;
028import org.nuxeo.lib.stream.pattern.consumer.ConsumerFactory;
029import org.nuxeo.lib.stream.pattern.consumer.ConsumerPolicy;
030import org.nuxeo.lib.stream.pattern.consumer.ConsumerPool;
031import org.nuxeo.runtime.api.Framework;
032
033import static org.nuxeo.lib.stream.codec.NoCodec.NO_CODEC;
034
035/**
036 * Consumer Pool that block Nuxeo listeners during import.
037 *
038 * @since 9.1
039 */
040public class DocumentConsumerPool<M extends Message> extends ConsumerPool<M> {
041    private static final Log log = LogFactory.getLog(DocumentConsumerPool.class);
042
043    protected static final String NOTIF_LISTENER = "notificationListener";
044
045    protected static final String MIME_LISTENER = "mimetypeIconUpdater";
046
047    protected static final String INDEXING_LISTENER = "elasticSearchInlineListener";
048
049    protected static final String DUBLICORE_LISTENER = "dclistener";
050
051    protected static final String TPL_LISTENER = "templateCreator";
052
053    protected static final String BINARY_LISTENER = "binaryMetadataSyncListener";
054
055    protected static final String UID_LISTENER = "uidlistener";
056
057    protected static final String VIDEO_LISTENER = "videoChangedListener";
058
059    protected static final String PICTURE_LISTENER = "pictureViewsGenerationListener";
060
061    protected static final String BLOB_LISTENER = "checkBlobUpdate";
062
063    protected boolean blockAsync;
064
065    protected final DocumentConsumerPolicy policy;
066
067    protected boolean blockPostCommit;
068
069    protected boolean bulkMode;
070
071    protected boolean listenerIndexingEnabled;
072
073    protected boolean listenerNotifEnabled;
074
075    protected boolean listenerMimeEnabled;
076
077    protected boolean listenerDublincoreEnabled;
078
079    protected boolean listenerTplEnabled;
080
081    protected boolean listenerBinaryEnabled;
082
083    protected boolean listenerUidEnabled;
084
085    protected boolean listenerVideoEnabled;
086
087    protected boolean listenerPictureEnabled;
088
089    protected boolean listenerBlobEnabled;
090
091    /**
092     * @deprecated since 11.1, due to serialization issue with java 11, use
093     *             {@link #DocumentConsumerPool(String, LogManager, Codec, ConsumerFactory, ConsumerPolicy)} which
094     *             allows to give a {@link org.nuxeo.lib.stream.codec.Codec codec} to
095     *             {@link org.nuxeo.lib.stream.log.LogTailer tailer}.
096     */
097    @Deprecated
098    @SuppressWarnings("unchecked")
099    public DocumentConsumerPool(String logName, LogManager manager, ConsumerFactory<M> factory,
100            ConsumerPolicy consumerPolicy) {
101        this(logName, manager, NO_CODEC, factory, consumerPolicy);
102    }
103
104    public DocumentConsumerPool(String logName, LogManager manager, Codec<M> codec, ConsumerFactory<M> factory,
105            ConsumerPolicy consumerPolicy) {
106        super(logName, manager, codec, factory, consumerPolicy);
107        EventServiceAdmin eventAdmin = Framework.getService(EventServiceAdmin.class);
108        policy = (DocumentConsumerPolicy) consumerPolicy;
109        if (eventAdmin == null) {
110            log.info("Can not apply document policy there is no event service available");
111            return;
112        }
113        if (policy.blockAsyncListeners()) {
114            blockAsync = eventAdmin.isBlockAsyncHandlers();
115            eventAdmin.setBlockAsyncHandlers(true);
116            log.debug("Block asynchronous listeners");
117        }
118        if (policy.blockPostCommitListeners()) {
119            blockPostCommit = eventAdmin.isBlockSyncPostCommitHandlers();
120            eventAdmin.setBlockSyncPostCommitHandlers(true);
121            log.debug("Block post commit listeners");
122        }
123        if (policy.bulkMode()) {
124            bulkMode = eventAdmin.isBulkModeEnabled();
125            eventAdmin.setBulkModeEnabled(true);
126            log.debug("Enable bulk mode");
127        }
128        if (policy.blockIndexing()) {
129            listenerIndexingEnabled = disableSyncListener(eventAdmin, INDEXING_LISTENER);
130            log.debug("Block ES indexing");
131        }
132        if (policy.blockDefaultSyncListeners()) {
133            listenerNotifEnabled = disableSyncListener(eventAdmin, NOTIF_LISTENER);
134            listenerMimeEnabled = disableSyncListener(eventAdmin, MIME_LISTENER);
135            listenerDublincoreEnabled = disableSyncListener(eventAdmin, DUBLICORE_LISTENER);
136            listenerTplEnabled = disableSyncListener(eventAdmin, TPL_LISTENER);
137            listenerBinaryEnabled = disableSyncListener(eventAdmin, BINARY_LISTENER);
138            listenerUidEnabled = disableSyncListener(eventAdmin, UID_LISTENER);
139            listenerVideoEnabled = disableSyncListener(eventAdmin, VIDEO_LISTENER);
140            listenerPictureEnabled = disableSyncListener(eventAdmin, PICTURE_LISTENER);
141            listenerBlobEnabled = disableSyncListener(eventAdmin, BLOB_LISTENER);
142            log.debug("Block some default synchronous listener");
143        }
144    }
145
146    protected boolean disableSyncListener(EventServiceAdmin eventAdmin, String name) {
147        EventListenerDescriptor desc = eventAdmin.getListenerList().getDescriptor(name);
148        if (desc != null && desc.isEnabled()) {
149            eventAdmin.setListenerEnabledFlag(name, false);
150            return true;
151        }
152        return false;
153    }
154
155    @Override
156    public void close() {
157        super.close();
158
159        EventServiceAdmin eventAdmin = Framework.getService(EventServiceAdmin.class);
160        if (eventAdmin == null) {
161            return;
162        }
163        if (policy.blockAsyncListeners()) {
164            eventAdmin.setBlockAsyncHandlers(blockAsync);
165            log.debug("Restore asynchronous listeners blocking state: " + blockAsync);
166        }
167        if (policy.blockPostCommitListeners()) {
168            eventAdmin.setBlockSyncPostCommitHandlers(blockPostCommit);
169            log.debug("Restore post commit listeners blocking state: " + blockPostCommit);
170        }
171        if (policy.bulkMode()) {
172            eventAdmin.setBulkModeEnabled(bulkMode);
173            log.debug("Restore bulk mode: " + bulkMode);
174        }
175        if (policy.blockIndexing() && listenerIndexingEnabled) {
176            eventAdmin.setListenerEnabledFlag(INDEXING_LISTENER, true);
177            log.debug("Unblock ES indexing");
178        }
179        if (policy.blockDefaultSyncListeners()) {
180            if (listenerNotifEnabled) {
181                eventAdmin.setListenerEnabledFlag(NOTIF_LISTENER, true);
182            }
183            if (listenerMimeEnabled) {
184                eventAdmin.setListenerEnabledFlag(MIME_LISTENER, true);
185            }
186            if (listenerDublincoreEnabled) {
187                eventAdmin.setListenerEnabledFlag(DUBLICORE_LISTENER, true);
188            }
189            if (listenerTplEnabled) {
190                eventAdmin.setListenerEnabledFlag(TPL_LISTENER, true);
191            }
192            if (listenerBinaryEnabled) {
193                eventAdmin.setListenerEnabledFlag(BINARY_LISTENER, true);
194            }
195            if (listenerUidEnabled) {
196                eventAdmin.setListenerEnabledFlag(UID_LISTENER, true);
197            }
198            if (listenerVideoEnabled) {
199                eventAdmin.setListenerEnabledFlag(VIDEO_LISTENER, true);
200            }
201            if (listenerPictureEnabled) {
202                eventAdmin.setListenerEnabledFlag(PICTURE_LISTENER, true);
203            }
204            if (listenerBlobEnabled) {
205                eventAdmin.setListenerEnabledFlag(BLOB_LISTENER, true);
206            }
207            log.debug("Unblock some default synchronous listener");
208        }
209    }
210
211}