001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Benoit Delbosc
018 */
019package org.nuxeo.importer.stream.automation;
020
021import static org.nuxeo.importer.stream.automation.BlobConsumers.DEFAULT_LOG_CONFIG;
022
023import java.time.Duration;
024import java.util.concurrent.TimeUnit;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.nuxeo.ecm.automation.OperationContext;
029import org.nuxeo.ecm.automation.core.Constants;
030import org.nuxeo.ecm.automation.core.annotations.Context;
031import org.nuxeo.ecm.automation.core.annotations.Operation;
032import org.nuxeo.ecm.automation.core.annotations.OperationMethod;
033import org.nuxeo.ecm.automation.core.annotations.Param;
034import org.nuxeo.importer.stream.consumer.DocumentConsumerPolicy;
035import org.nuxeo.importer.stream.consumer.DocumentConsumerPool;
036import org.nuxeo.importer.stream.consumer.DocumentMessageConsumerFactory;
037import org.nuxeo.importer.stream.message.DocumentMessage;
038import org.nuxeo.lib.stream.log.LogManager;
039import org.nuxeo.lib.stream.pattern.consumer.BatchPolicy;
040import org.nuxeo.lib.stream.pattern.consumer.ConsumerPolicy;
041import org.nuxeo.runtime.api.Framework;
042import org.nuxeo.runtime.stream.StreamService;
043
044import net.jodah.failsafe.RetryPolicy;
045
046/**
047 * @since 9.1
048 */
049@Operation(id = DocumentConsumers.ID, category = Constants.CAT_SERVICES, label = "Imports document", since = "9.1", description = "Import documents into repository.")
050public class DocumentConsumers {
051    private static final Log log = LogFactory.getLog(DocumentConsumers.class);
052
053    public static final String ID = "StreamImporter.runDocumentConsumers";
054
055    @Context
056    protected OperationContext ctx;
057
058    @Param(name = "nbThreads", required = false)
059    protected Integer nbThreads;
060
061    @Param(name = "rootFolder")
062    protected String rootFolder;
063
064    @Param(name = "repositoryName", required = false)
065    protected String repositoryName;
066
067    @Param(name = "batchSize", required = false)
068    protected Integer batchSize = 10;
069
070    @Param(name = "batchThresholdS", required = false)
071    protected Integer batchThresholdS = 20;
072
073    @Param(name = "retryMax", required = false)
074    protected Integer retryMax = 3;
075
076    @Param(name = "retryDelayS", required = false)
077    protected Integer retryDelayS = 2;
078
079    @Param(name = "logName", required = false)
080    protected String logName;
081
082    @Param(name = "logConfig", required = false)
083    protected String logConfig;
084
085    @Param(name = "blockIndexing", required = false)
086    protected Boolean blockIndexing = false;
087
088    @Param(name = "blockAsyncListeners", required = false)
089    protected Boolean blockAsyncListeners = false;
090
091    @Param(name = "blockPostCommitListeners", required = false)
092    protected Boolean blockPostCommitListeners = false;
093
094    @Param(name = "blockDefaultSyncListeners", required = false)
095    protected Boolean blockSyncListeners = false;
096
097    @Param(name = "useBulkMode", required = false)
098    protected Boolean useBulkMode = false;
099
100    @Param(name = "waitMessageTimeoutSeconds", required = false)
101    protected Integer waitMessageTimeoutSeconds = 20;
102
103    @OperationMethod
104    public void run() {
105        RandomBlobProducers.checkAccess(ctx);
106        repositoryName = getRepositoryName();
107        ConsumerPolicy consumerPolicy = DocumentConsumerPolicy.builder()
108                                                              .blockIndexing(blockIndexing)
109                                                              .blockAsyncListeners(blockAsyncListeners)
110                                                              .blockPostCommitListeners(blockPostCommitListeners)
111                                                              .blockDefaultSyncListener(blockSyncListeners)
112                                                              .useBulkMode(useBulkMode)
113                                                              .name(ID)
114                                                              .batchPolicy(BatchPolicy.builder()
115                                                                                      .capacity(batchSize)
116                                                                                      .timeThreshold(Duration.ofSeconds(
117                                                                                              batchThresholdS))
118                                                                                      .build())
119                                                              .retryPolicy(new RetryPolicy().withMaxRetries(retryMax)
120                                                                                            .withDelay(retryDelayS,
121                                                                                                    TimeUnit.SECONDS))
122                                                              .maxThreads(getNbThreads())
123                                                              .waitMessageTimeout(
124                                                                      Duration.ofSeconds(waitMessageTimeoutSeconds))
125                                                              .salted()
126                                                              .build();
127        log.warn(String.format("Import documents from log: %s into: %s/%s, with policy: %s", getMQName(),
128                repositoryName, rootFolder, (DocumentConsumerPolicy) consumerPolicy));
129        StreamService service = Framework.getService(StreamService.class);
130        LogManager manager = service.getLogManager(getMQConfig());
131        try (DocumentConsumerPool<DocumentMessage> consumers = new DocumentConsumerPool<>(getMQName(), manager,
132                new DocumentMessageConsumerFactory(repositoryName, rootFolder), consumerPolicy)) {
133            consumers.start().get();
134        } catch (Exception e) {
135            log.error(e.getMessage(), e);
136        }
137    }
138
139    protected short getNbThreads() {
140        if (nbThreads != null) {
141            return nbThreads.shortValue();
142        }
143        return 0;
144    }
145
146    protected String getRepositoryName() {
147        if (repositoryName != null && !repositoryName.isEmpty()) {
148            return repositoryName;
149        }
150        return ctx.getCoreSession().getRepositoryName();
151    }
152
153    protected String getMQName() {
154        if (logName != null) {
155            return logName;
156        }
157        return RandomDocumentProducers.DEFAULT_DOC_LOG_NAME;
158    }
159
160    protected String getMQConfig() {
161        if (logConfig != null) {
162            return logConfig;
163        }
164        return DEFAULT_LOG_CONFIG;
165    }
166}