001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Benoit Delbosc 018 */ 019package org.nuxeo.importer.stream.automation; 020 021import static org.nuxeo.importer.stream.automation.BlobConsumers.DEFAULT_LOG_CONFIG; 022 023import java.time.Duration; 024import java.util.concurrent.TimeUnit; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.nuxeo.ecm.automation.OperationContext; 029import org.nuxeo.ecm.automation.core.Constants; 030import org.nuxeo.ecm.automation.core.annotations.Context; 031import org.nuxeo.ecm.automation.core.annotations.Operation; 032import org.nuxeo.ecm.automation.core.annotations.OperationMethod; 033import org.nuxeo.ecm.automation.core.annotations.Param; 034import org.nuxeo.importer.stream.consumer.DocumentConsumerPolicy; 035import org.nuxeo.importer.stream.consumer.DocumentConsumerPool; 036import org.nuxeo.importer.stream.consumer.DocumentMessageConsumerFactory; 037import org.nuxeo.importer.stream.message.DocumentMessage; 038import org.nuxeo.lib.stream.log.LogManager; 039import org.nuxeo.lib.stream.pattern.consumer.BatchPolicy; 040import org.nuxeo.lib.stream.pattern.consumer.ConsumerPolicy; 041import org.nuxeo.runtime.api.Framework; 042import org.nuxeo.runtime.stream.StreamService; 043 044import net.jodah.failsafe.RetryPolicy; 045 046/** 047 * @since 9.1 048 */ 049@Operation(id = DocumentConsumers.ID, category = Constants.CAT_SERVICES, label = "Imports document", since = "9.1", description = "Import documents into repository.") 050public class DocumentConsumers { 051 private static final Log log = LogFactory.getLog(DocumentConsumers.class); 052 053 public static final String ID = "StreamImporter.runDocumentConsumers"; 054 055 @Context 056 protected OperationContext ctx; 057 058 @Param(name = "nbThreads", required = false) 059 protected Integer nbThreads; 060 061 @Param(name = "rootFolder") 062 protected String rootFolder; 063 064 @Param(name = "repositoryName", required = false) 065 protected String repositoryName; 066 067 @Param(name = "batchSize", required = false) 068 protected Integer batchSize = 10; 069 070 @Param(name = "batchThresholdS", required = false) 071 protected Integer batchThresholdS = 20; 072 073 @Param(name = "retryMax", required = false) 074 protected Integer retryMax = 3; 075 076 @Param(name = "retryDelayS", required = false) 077 protected Integer retryDelayS = 2; 078 079 @Param(name = "logName", required = false) 080 protected String logName; 081 082 @Param(name = "logConfig", required = false) 083 protected String logConfig; 084 085 @Param(name = "blockIndexing", required = false) 086 protected Boolean blockIndexing = false; 087 088 @Param(name = "blockAsyncListeners", required = false) 089 protected Boolean blockAsyncListeners = false; 090 091 @Param(name = "blockPostCommitListeners", required = false) 092 protected Boolean blockPostCommitListeners = false; 093 094 @Param(name = "blockDefaultSyncListeners", required = false) 095 protected Boolean blockSyncListeners = false; 096 097 @Param(name = "useBulkMode", required = false) 098 protected Boolean useBulkMode = false; 099 100 @Param(name = "waitMessageTimeoutSeconds", required = false) 101 protected Integer waitMessageTimeoutSeconds = 20; 102 103 @OperationMethod 104 public void run() { 105 RandomBlobProducers.checkAccess(ctx); 106 repositoryName = getRepositoryName(); 107 ConsumerPolicy consumerPolicy = DocumentConsumerPolicy.builder() 108 .blockIndexing(blockIndexing) 109 .blockAsyncListeners(blockAsyncListeners) 110 .blockPostCommitListeners(blockPostCommitListeners) 111 .blockDefaultSyncListener(blockSyncListeners) 112 .useBulkMode(useBulkMode) 113 .name(ID) 114 .batchPolicy(BatchPolicy.builder() 115 .capacity(batchSize) 116 .timeThreshold(Duration.ofSeconds( 117 batchThresholdS)) 118 .build()) 119 .retryPolicy(new RetryPolicy().withMaxRetries(retryMax) 120 .withDelay(retryDelayS, 121 TimeUnit.SECONDS)) 122 .maxThreads(getNbThreads()) 123 .waitMessageTimeout( 124 Duration.ofSeconds(waitMessageTimeoutSeconds)) 125 .salted() 126 .build(); 127 log.warn(String.format("Import documents from log: %s into: %s/%s, with policy: %s", getMQName(), 128 repositoryName, rootFolder, (DocumentConsumerPolicy) consumerPolicy)); 129 StreamService service = Framework.getService(StreamService.class); 130 LogManager manager = service.getLogManager(getMQConfig()); 131 try (DocumentConsumerPool<DocumentMessage> consumers = new DocumentConsumerPool<>(getMQName(), manager, 132 new DocumentMessageConsumerFactory(repositoryName, rootFolder), consumerPolicy)) { 133 consumers.start().get(); 134 } catch (Exception e) { 135 log.error(e.getMessage(), e); 136 } 137 } 138 139 protected short getNbThreads() { 140 if (nbThreads != null) { 141 return nbThreads.shortValue(); 142 } 143 return 0; 144 } 145 146 protected String getRepositoryName() { 147 if (repositoryName != null && !repositoryName.isEmpty()) { 148 return repositoryName; 149 } 150 return ctx.getCoreSession().getRepositoryName(); 151 } 152 153 protected String getMQName() { 154 if (logName != null) { 155 return logName; 156 } 157 return RandomDocumentProducers.DEFAULT_DOC_LOG_NAME; 158 } 159 160 protected String getMQConfig() { 161 if (logConfig != null) { 162 return logConfig; 163 } 164 return DEFAULT_LOG_CONFIG; 165 } 166}