001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Benoit Delbosc 018 */ 019package org.nuxeo.ecm.platform.importer.mqueues.automation; 020 021import net.jodah.failsafe.RetryPolicy; 022import org.apache.commons.logging.Log; 023import org.apache.commons.logging.LogFactory; 024import org.nuxeo.ecm.automation.OperationContext; 025import org.nuxeo.ecm.automation.core.Constants; 026import org.nuxeo.ecm.automation.core.annotations.Context; 027import org.nuxeo.ecm.automation.core.annotations.Operation; 028import org.nuxeo.ecm.automation.core.annotations.OperationMethod; 029import org.nuxeo.ecm.automation.core.annotations.Param; 030import org.nuxeo.ecm.platform.importer.mqueues.consumer.BatchPolicy; 031import org.nuxeo.ecm.platform.importer.mqueues.consumer.ConsumerPolicy; 032import org.nuxeo.ecm.platform.importer.mqueues.consumer.DocumentConsumerPool; 033import org.nuxeo.ecm.platform.importer.mqueues.consumer.DocumentMessageConsumerFactory; 034import org.nuxeo.ecm.platform.importer.mqueues.message.DocumentMessage; 035import org.nuxeo.ecm.platform.importer.mqueues.mqueues.CQMQueues; 036import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQueues; 037 038import java.io.File; 039import java.time.Duration; 040import java.util.concurrent.TimeUnit; 041 042/** 043 * @since 9.1 044 */ 045@Operation(id = DocumentConsumers.ID, category = Constants.CAT_SERVICES, label = "Imports document", since = "9.1", 046 description = "Import mqueues document into repository.") 047public class DocumentConsumers { 048 private static final Log log = LogFactory.getLog(DocumentConsumers.class); 049 public static final String ID = "MQImporter.runDocumentConsumers"; 050 051 @Context 052 protected OperationContext ctx; 053 054 @Param(name = "rootFolder") 055 protected String rootFolder; 056 057 @Param(name = "repositoryName", required = false) 058 protected String repositoryName; 059 060 @Param(name = "batchSize", required = false) 061 protected Integer batchSize = 10; 062 063 @Param(name = "batchThresholdS", required = false) 064 protected Integer batchThresholdS = 20; 065 066 @Param(name = "retryMax", required = false) 067 protected Integer retryMax = 3; 068 069 @Param(name = "retryDelayS", required = false) 070 protected Integer retryDelayS = 2; 071 072 @Param(name = "queuePath", required = false) 073 protected String queuePath; 074 075 @OperationMethod 076 public void run() { 077 RandomBlobProducers.checkAccess(ctx); 078 queuePath = getQueuePath(); 079 repositoryName = getRepositoryName(); 080 try (MQueues<DocumentMessage> mQueues = new CQMQueues<>(new File(queuePath))) { 081 DocumentConsumerPool<DocumentMessage> consumers = new DocumentConsumerPool<>(mQueues, 082 new DocumentMessageConsumerFactory(repositoryName, rootFolder), 083 ConsumerPolicy.builder() 084 .batchPolicy(BatchPolicy.builder().capacity(batchSize) 085 .timeThreshold(Duration.ofSeconds(batchThresholdS)) 086 .build()) 087 .retryPolicy( 088 new RetryPolicy().withMaxRetries(retryMax).withDelay(retryDelayS, TimeUnit.SECONDS)) 089 .salted() 090 .build()); 091 consumers.start().get(); 092 } catch (Exception e) { 093 log.error(e.getMessage(), e); 094 } 095 } 096 097 private String getRepositoryName() { 098 if (repositoryName != null && !repositoryName.isEmpty()) { 099 return repositoryName; 100 } 101 return ctx.getCoreSession().getRepositoryName(); 102 } 103 104 private String getQueuePath() { 105 if (queuePath != null && !queuePath.isEmpty()) { 106 return queuePath; 107 } 108 return RandomDocumentProducers.getDefaultDocumentQueuePath(); 109 } 110 111}