001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Benoit Delbosc
018 */
019package org.nuxeo.importer.stream.automation;
020
021import static org.nuxeo.importer.stream.automation.BlobConsumers.DEFAULT_LOG_CONFIG;
022
023import java.time.Duration;
024import java.util.concurrent.ExecutionException;
025import java.util.concurrent.TimeUnit;
026
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import org.nuxeo.ecm.automation.OperationContext;
030import org.nuxeo.ecm.automation.OperationException;
031import org.nuxeo.ecm.automation.core.Constants;
032import org.nuxeo.ecm.automation.core.annotations.Context;
033import org.nuxeo.ecm.automation.core.annotations.Operation;
034import org.nuxeo.ecm.automation.core.annotations.OperationMethod;
035import org.nuxeo.ecm.automation.core.annotations.Param;
036import org.nuxeo.importer.stream.consumer.DocumentConsumerPolicy;
037import org.nuxeo.importer.stream.consumer.DocumentConsumerPool;
038import org.nuxeo.importer.stream.consumer.DocumentMessageConsumerFactory;
039import org.nuxeo.importer.stream.message.DocumentMessage;
040import org.nuxeo.lib.stream.log.LogManager;
041import org.nuxeo.lib.stream.pattern.consumer.BatchPolicy;
042import org.nuxeo.lib.stream.pattern.consumer.ConsumerPolicy;
043import org.nuxeo.runtime.api.Framework;
044import org.nuxeo.runtime.stream.StreamService;
045
046import net.jodah.failsafe.RetryPolicy;
047
048/**
049 * @since 9.1
050 */
051@Operation(id = DocumentConsumers.ID, category = Constants.CAT_SERVICES, label = "Imports document", since = "9.1", description = "Import documents into repository.")
052public class DocumentConsumers {
053    private static final Log log = LogFactory.getLog(DocumentConsumers.class);
054
055    public static final String ID = "StreamImporter.runDocumentConsumers";
056
057    @Context
058    protected OperationContext ctx;
059
060    @Param(name = "nbThreads", required = false)
061    protected Integer nbThreads;
062
063    @Param(name = "rootFolder")
064    protected String rootFolder;
065
066    @Param(name = "repositoryName", required = false)
067    protected String repositoryName;
068
069    @Param(name = "batchSize", required = false)
070    protected Integer batchSize = 10;
071
072    @Param(name = "batchThresholdS", required = false)
073    protected Integer batchThresholdS = 20;
074
075    @Param(name = "retryMax", required = false)
076    protected Integer retryMax = 3;
077
078    @Param(name = "retryDelayS", required = false)
079    protected Integer retryDelayS = 2;
080
081    @Param(name = "logName", required = false)
082    protected String logName;
083
084    @Param(name = "logConfig", required = false)
085    protected String logConfig;
086
087    @Param(name = "blockIndexing", required = false)
088    protected Boolean blockIndexing = false;
089
090    @Param(name = "blockAsyncListeners", required = false)
091    protected Boolean blockAsyncListeners = false;
092
093    @Param(name = "blockPostCommitListeners", required = false)
094    protected Boolean blockPostCommitListeners = false;
095
096    @Param(name = "blockDefaultSyncListeners", required = false)
097    protected Boolean blockSyncListeners = false;
098
099    @Param(name = "useBulkMode", required = false)
100    protected Boolean useBulkMode = false;
101
102    @Param(name = "waitMessageTimeoutSeconds", required = false)
103    protected Integer waitMessageTimeoutSeconds = 20;
104
105    @OperationMethod
106    public void run() throws OperationException {
107        RandomBlobProducers.checkAccess(ctx);
108        repositoryName = getRepositoryName();
109        ConsumerPolicy consumerPolicy = DocumentConsumerPolicy.builder()
110                                                              .blockIndexing(blockIndexing)
111                                                              .blockAsyncListeners(blockAsyncListeners)
112                                                              .blockPostCommitListeners(blockPostCommitListeners)
113                                                              .blockDefaultSyncListener(blockSyncListeners)
114                                                              .useBulkMode(useBulkMode)
115                                                              .name(ID)
116                                                              .batchPolicy(BatchPolicy.builder()
117                                                                                      .capacity(batchSize)
118                                                                                      .timeThreshold(Duration.ofSeconds(
119                                                                                              batchThresholdS))
120                                                                                      .build())
121                                                              .retryPolicy(new RetryPolicy().withMaxRetries(retryMax)
122                                                                                            .withDelay(retryDelayS,
123                                                                                                    TimeUnit.SECONDS))
124                                                              .maxThreads(getNbThreads())
125                                                              .waitMessageTimeout(
126                                                                      Duration.ofSeconds(waitMessageTimeoutSeconds))
127                                                              .salted()
128                                                              .build();
129        log.warn(String.format("Import documents from log: %s into: %s/%s, with policy: %s", getLogName(),
130                repositoryName, rootFolder, (DocumentConsumerPolicy) consumerPolicy));
131        StreamService service = Framework.getService(StreamService.class);
132        LogManager manager = service.getLogManager(getLogConfig());
133        try (DocumentConsumerPool<DocumentMessage> consumers = new DocumentConsumerPool<>(getLogName(), manager,
134                new DocumentMessageConsumerFactory(repositoryName, rootFolder), consumerPolicy)) {
135            consumers.start().get();
136        } catch (InterruptedException e) {
137            Thread.currentThread().interrupt();
138            log.warn("Operation interrupted");
139            throw new RuntimeException(e);
140        } catch (ExecutionException e) {
141            log.error("Operation fails", e);
142            throw new OperationException(e);
143        }
144    }
145
146    protected short getNbThreads() {
147        if (nbThreads != null) {
148            return nbThreads.shortValue();
149        }
150        return 0;
151    }
152
153    protected String getRepositoryName() {
154        if (repositoryName != null && !repositoryName.isEmpty()) {
155            return repositoryName;
156        }
157        return ctx.getCoreSession().getRepositoryName();
158    }
159
160    protected String getLogName() {
161        if (logName != null) {
162            return logName;
163        }
164        return RandomDocumentProducers.DEFAULT_DOC_LOG_NAME;
165    }
166
167    protected String getLogConfig() {
168        if (logConfig != null) {
169            return logConfig;
170        }
171        return DEFAULT_LOG_CONFIG;
172    }
173}