001/*
002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Benoit Delbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.automation;
020
021import net.jodah.failsafe.RetryPolicy;
022import org.apache.commons.logging.Log;
023import org.apache.commons.logging.LogFactory;
024import org.nuxeo.ecm.automation.OperationContext;
025import org.nuxeo.ecm.automation.core.Constants;
026import org.nuxeo.ecm.automation.core.annotations.Context;
027import org.nuxeo.ecm.automation.core.annotations.Operation;
028import org.nuxeo.ecm.automation.core.annotations.OperationMethod;
029import org.nuxeo.ecm.automation.core.annotations.Param;
030import org.nuxeo.ecm.platform.importer.mqueues.consumer.BatchPolicy;
031import org.nuxeo.ecm.platform.importer.mqueues.consumer.ConsumerPolicy;
032import org.nuxeo.ecm.platform.importer.mqueues.consumer.DocumentConsumerPool;
033import org.nuxeo.ecm.platform.importer.mqueues.consumer.DocumentMessageConsumerFactory;
034import org.nuxeo.ecm.platform.importer.mqueues.message.DocumentMessage;
035import org.nuxeo.ecm.platform.importer.mqueues.mqueues.CQMQueues;
036import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQueues;
037
038import java.io.File;
039import java.time.Duration;
040import java.util.concurrent.TimeUnit;
041
042/**
043 * @since 9.1
044 */
045@Operation(id = DocumentConsumers.ID, category = Constants.CAT_SERVICES, label = "Imports document", since = "9.1",
046        description = "Import mqueues document into repository.")
047public class DocumentConsumers {
048    private static final Log log = LogFactory.getLog(DocumentConsumers.class);
049    public static final String ID = "MQImporter.runDocumentConsumers";
050
051    @Context
052    protected OperationContext ctx;
053
054    @Param(name = "rootFolder")
055    protected String rootFolder;
056
057    @Param(name = "repositoryName", required = false)
058    protected String repositoryName;
059
060    @Param(name = "batchSize", required = false)
061    protected Integer batchSize = 10;
062
063    @Param(name = "batchThresholdS", required = false)
064    protected Integer batchThresholdS = 20;
065
066    @Param(name = "retryMax", required = false)
067    protected Integer retryMax = 3;
068
069    @Param(name = "retryDelayS", required = false)
070    protected Integer retryDelayS = 2;
071
072    @Param(name = "queuePath", required = false)
073    protected String queuePath;
074
075    @OperationMethod
076    public void run() {
077        RandomBlobProducers.checkAccess(ctx);
078        queuePath = getQueuePath();
079        repositoryName = getRepositoryName();
080        try (MQueues<DocumentMessage> mQueues = new CQMQueues<>(new File(queuePath))) {
081            DocumentConsumerPool<DocumentMessage> consumers = new DocumentConsumerPool<>(mQueues,
082                    new DocumentMessageConsumerFactory(repositoryName, rootFolder),
083                    ConsumerPolicy.builder()
084                            .batchPolicy(BatchPolicy.builder().capacity(batchSize)
085                                    .timeThreshold(Duration.ofSeconds(batchThresholdS))
086                                    .build())
087                            .retryPolicy(
088                                    new RetryPolicy().withMaxRetries(retryMax).withDelay(retryDelayS, TimeUnit.SECONDS))
089                            .salted()
090                            .build());
091            consumers.start().get();
092        } catch (Exception e) {
093            log.error(e.getMessage(), e);
094        }
095    }
096
097    private String getRepositoryName() {
098        if (repositoryName != null && !repositoryName.isEmpty()) {
099            return repositoryName;
100        }
101        return ctx.getCoreSession().getRepositoryName();
102    }
103
104    private String getQueuePath() {
105        if (queuePath != null && !queuePath.isEmpty()) {
106            return queuePath;
107        }
108        return RandomDocumentProducers.getDefaultDocumentQueuePath();
109    }
110
111}