001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Benoit Delbosc
018 */
019package org.nuxeo.importer.stream.automation;
020
021import static org.nuxeo.importer.stream.StreamImporters.DEFAULT_LOG_CONFIG;
022import static org.nuxeo.importer.stream.StreamImporters.DEFAULT_LOG_DOC_NAME;
023
024import java.time.Duration;
025import java.util.concurrent.ExecutionException;
026import java.util.concurrent.TimeUnit;
027
028import org.apache.commons.logging.Log;
029import org.apache.commons.logging.LogFactory;
030import org.nuxeo.ecm.automation.OperationContext;
031import org.nuxeo.ecm.automation.OperationException;
032import org.nuxeo.ecm.automation.core.Constants;
033import org.nuxeo.ecm.automation.core.annotations.Context;
034import org.nuxeo.ecm.automation.core.annotations.Operation;
035import org.nuxeo.ecm.automation.core.annotations.OperationMethod;
036import org.nuxeo.ecm.automation.core.annotations.Param;
037import org.nuxeo.importer.stream.StreamImporters;
038import org.nuxeo.importer.stream.consumer.DocumentConsumerPolicy;
039import org.nuxeo.importer.stream.consumer.DocumentConsumerPool;
040import org.nuxeo.importer.stream.consumer.DocumentMessageConsumerFactory;
041import org.nuxeo.importer.stream.message.DocumentMessage;
042import org.nuxeo.lib.stream.codec.Codec;
043import org.nuxeo.lib.stream.log.LogManager;
044import org.nuxeo.lib.stream.pattern.consumer.BatchPolicy;
045import org.nuxeo.lib.stream.pattern.consumer.ConsumerPolicy;
046import org.nuxeo.runtime.api.Framework;
047import org.nuxeo.runtime.stream.StreamService;
048
049import net.jodah.failsafe.RetryPolicy;
050
051/**
052 * @since 9.1
053 */
054@Operation(id = DocumentConsumers.ID, category = Constants.CAT_SERVICES, label = "Imports document", since = "9.1", description = "Import documents into repository.")
055public class DocumentConsumers {
056    private static final Log log = LogFactory.getLog(DocumentConsumers.class);
057
058    public static final String ID = "StreamImporter.runDocumentConsumers";
059
060    @Context
061    protected OperationContext ctx;
062
063    @Param(name = "nbThreads", required = false)
064    protected Integer nbThreads;
065
066    @Param(name = "rootFolder")
067    protected String rootFolder;
068
069    @Param(name = "repositoryName", required = false)
070    protected String repositoryName;
071
072    @Param(name = "batchSize", required = false)
073    protected Integer batchSize = 10;
074
075    @Param(name = "batchThresholdS", required = false)
076    protected Integer batchThresholdS = 5;
077
078    @Param(name = "retryMax", required = false)
079    protected Integer retryMax = 3;
080
081    @Param(name = "retryDelayS", required = false)
082    protected Integer retryDelayS = 2;
083
084    @Param(name = "logName", required = false)
085    protected String logName = DEFAULT_LOG_DOC_NAME;
086
087    @Param(name = "blockIndexing", required = false)
088    protected Boolean blockIndexing = false;
089
090    @Param(name = "blockAsyncListeners", required = false)
091    protected Boolean blockAsyncListeners = false;
092
093    @Param(name = "blockPostCommitListeners", required = false)
094    protected Boolean blockPostCommitListeners = false;
095
096    @Param(name = "blockDefaultSyncListeners", required = false)
097    protected Boolean blockSyncListeners = false;
098
099    @Param(name = "useBulkMode", required = false)
100    protected Boolean useBulkMode = false;
101
102    @Param(name = "waitMessageTimeoutSeconds", required = false)
103    protected Integer waitMessageTimeoutSeconds = 20;
104
105    @OperationMethod
106    public void run() throws OperationException {
107        RandomBlobProducers.checkAccess(ctx);
108        repositoryName = getRepositoryName();
109        ConsumerPolicy consumerPolicy = DocumentConsumerPolicy.builder()
110                                                              .blockIndexing(blockIndexing)
111                                                              .blockAsyncListeners(blockAsyncListeners)
112                                                              .blockPostCommitListeners(blockPostCommitListeners)
113                                                              .blockDefaultSyncListener(blockSyncListeners)
114                                                              .useBulkMode(useBulkMode)
115                                                              .name(ID)
116                                                              .batchPolicy(BatchPolicy.builder()
117                                                                                      .capacity(batchSize)
118                                                                                      .timeThreshold(Duration.ofSeconds(
119                                                                                              batchThresholdS))
120                                                                                      .build())
121                                                              .retryPolicy(new RetryPolicy().withMaxRetries(retryMax)
122                                                                                            .withDelay(retryDelayS,
123                                                                                                    TimeUnit.SECONDS))
124                                                              .maxThreads(getNbThreads())
125                                                              .waitMessageTimeout(
126                                                                      Duration.ofSeconds(waitMessageTimeoutSeconds))
127                                                              .salted()
128                                                              .build();
129        log.warn(String.format("Import documents from log: %s into: %s/%s, with policy: %s", logName, repositoryName,
130                rootFolder, consumerPolicy));
131        LogManager manager = Framework.getService(StreamService.class).getLogManager();
132        Codec<DocumentMessage> codec = StreamImporters.getDocCodec();
133        try (DocumentConsumerPool<DocumentMessage> consumers = new DocumentConsumerPool<>(logName, manager, codec,
134                new DocumentMessageConsumerFactory(repositoryName, rootFolder), consumerPolicy)) {
135            consumers.start().get();
136        } catch (InterruptedException e) {
137            Thread.currentThread().interrupt();
138            log.warn("Operation interrupted");
139            throw new RuntimeException(e);
140        } catch (ExecutionException e) {
141            log.error("Operation fails", e);
142            throw new OperationException(e);
143        }
144    }
145
146    protected short getNbThreads() {
147        if (nbThreads != null) {
148            return nbThreads.shortValue();
149        }
150        return 0;
151    }
152
153    protected String getRepositoryName() {
154        if (repositoryName != null && !repositoryName.isEmpty()) {
155            return repositoryName;
156        }
157        return ctx.getCoreSession().getRepositoryName();
158    }
159}