001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Benoit Delbosc 018 */ 019package org.nuxeo.importer.stream.automation; 020 021import static org.nuxeo.importer.stream.automation.BlobConsumers.DEFAULT_LOG_CONFIG; 022 023import java.time.Duration; 024import java.util.concurrent.ExecutionException; 025import java.util.concurrent.TimeUnit; 026 027import org.apache.commons.logging.Log; 028import org.apache.commons.logging.LogFactory; 029import org.nuxeo.ecm.automation.OperationContext; 030import org.nuxeo.ecm.automation.OperationException; 031import org.nuxeo.ecm.automation.core.Constants; 032import org.nuxeo.ecm.automation.core.annotations.Context; 033import org.nuxeo.ecm.automation.core.annotations.Operation; 034import org.nuxeo.ecm.automation.core.annotations.OperationMethod; 035import org.nuxeo.ecm.automation.core.annotations.Param; 036import org.nuxeo.importer.stream.consumer.DocumentConsumerPolicy; 037import org.nuxeo.importer.stream.consumer.DocumentConsumerPool; 038import org.nuxeo.importer.stream.consumer.DocumentMessageConsumerFactory; 039import org.nuxeo.importer.stream.message.DocumentMessage; 040import org.nuxeo.lib.stream.log.LogManager; 041import org.nuxeo.lib.stream.pattern.consumer.BatchPolicy; 042import org.nuxeo.lib.stream.pattern.consumer.ConsumerPolicy; 043import org.nuxeo.runtime.api.Framework; 044import org.nuxeo.runtime.stream.StreamService; 045 046import net.jodah.failsafe.RetryPolicy; 047 048/** 049 * @since 9.1 050 */ 051@Operation(id = DocumentConsumers.ID, category = Constants.CAT_SERVICES, label = "Imports document", since = "9.1", description = "Import documents into repository.") 052public class DocumentConsumers { 053 private static final Log log = LogFactory.getLog(DocumentConsumers.class); 054 055 public static final String ID = "StreamImporter.runDocumentConsumers"; 056 057 @Context 058 protected OperationContext ctx; 059 060 @Param(name = "nbThreads", required = false) 061 protected Integer nbThreads; 062 063 @Param(name = "rootFolder") 064 protected String rootFolder; 065 066 @Param(name = "repositoryName", required = false) 067 protected String repositoryName; 068 069 @Param(name = "batchSize", required = false) 070 protected Integer batchSize = 10; 071 072 @Param(name = "batchThresholdS", required = false) 073 protected Integer batchThresholdS = 20; 074 075 @Param(name = "retryMax", required = false) 076 protected Integer retryMax = 3; 077 078 @Param(name = "retryDelayS", required = false) 079 protected Integer retryDelayS = 2; 080 081 @Param(name = "logName", required = false) 082 protected String logName; 083 084 @Param(name = "logConfig", required = false) 085 protected String logConfig; 086 087 @Param(name = "blockIndexing", required = false) 088 protected Boolean blockIndexing = false; 089 090 @Param(name = "blockAsyncListeners", required = false) 091 protected Boolean blockAsyncListeners = false; 092 093 @Param(name = "blockPostCommitListeners", required = false) 094 protected Boolean blockPostCommitListeners = false; 095 096 @Param(name = "blockDefaultSyncListeners", required = false) 097 protected Boolean blockSyncListeners = false; 098 099 @Param(name = "useBulkMode", required = false) 100 protected Boolean useBulkMode = false; 101 102 @Param(name = "waitMessageTimeoutSeconds", required = false) 103 protected Integer waitMessageTimeoutSeconds = 20; 104 105 @OperationMethod 106 public void run() throws OperationException { 107 RandomBlobProducers.checkAccess(ctx); 108 repositoryName = getRepositoryName(); 109 ConsumerPolicy consumerPolicy = DocumentConsumerPolicy.builder() 110 .blockIndexing(blockIndexing) 111 .blockAsyncListeners(blockAsyncListeners) 112 .blockPostCommitListeners(blockPostCommitListeners) 113 .blockDefaultSyncListener(blockSyncListeners) 114 .useBulkMode(useBulkMode) 115 .name(ID) 116 .batchPolicy(BatchPolicy.builder() 117 .capacity(batchSize) 118 .timeThreshold(Duration.ofSeconds( 119 batchThresholdS)) 120 .build()) 121 .retryPolicy(new RetryPolicy().withMaxRetries(retryMax) 122 .withDelay(retryDelayS, 123 TimeUnit.SECONDS)) 124 .maxThreads(getNbThreads()) 125 .waitMessageTimeout( 126 Duration.ofSeconds(waitMessageTimeoutSeconds)) 127 .salted() 128 .build(); 129 log.warn(String.format("Import documents from log: %s into: %s/%s, with policy: %s", getLogName(), 130 repositoryName, rootFolder, (DocumentConsumerPolicy) consumerPolicy)); 131 StreamService service = Framework.getService(StreamService.class); 132 LogManager manager = service.getLogManager(getLogConfig()); 133 try (DocumentConsumerPool<DocumentMessage> consumers = new DocumentConsumerPool<>(getLogName(), manager, 134 new DocumentMessageConsumerFactory(repositoryName, rootFolder), consumerPolicy)) { 135 consumers.start().get(); 136 } catch (InterruptedException e) { 137 Thread.currentThread().interrupt(); 138 log.warn("Operation interrupted"); 139 throw new RuntimeException(e); 140 } catch (ExecutionException e) { 141 log.error("Operation fails", e); 142 throw new OperationException(e); 143 } 144 } 145 146 protected short getNbThreads() { 147 if (nbThreads != null) { 148 return nbThreads.shortValue(); 149 } 150 return 0; 151 } 152 153 protected String getRepositoryName() { 154 if (repositoryName != null && !repositoryName.isEmpty()) { 155 return repositoryName; 156 } 157 return ctx.getCoreSession().getRepositoryName(); 158 } 159 160 protected String getLogName() { 161 if (logName != null) { 162 return logName; 163 } 164 return RandomDocumentProducers.DEFAULT_DOC_LOG_NAME; 165 } 166 167 protected String getLogConfig() { 168 if (logConfig != null) { 169 return logConfig; 170 } 171 return DEFAULT_LOG_CONFIG; 172 } 173}