/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.importer.stream.producer;

import java.util.Collections;

import org.nuxeo.importer.stream.message.BlobInfoMessage;
import org.nuxeo.importer.stream.message.DocumentMessage;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.pattern.producer.ProducerFactory;
import org.nuxeo.lib.stream.pattern.producer.ProducerIterator;

/**
 * Factory of {@link RandomDocumentMessageProducer}.
 * <p>
 * The factory works in one of two modes, selected by the constructor used: either it is given a blob size and no
 * log manager, or it is given a {@link LogManager} and the name of a log of {@link BlobInfoMessage} from which blob
 * information is read.
 *
 * @since 9.1
 */
public class RandomDocumentMessageProducerFactory implements ProducerFactory<DocumentMessage> {
    protected final long nbDocuments;

    protected final String lang;

    protected final int blobSizeKb;

    /** Log manager used to read blob information, {@code null} when the factory was built with a blob size. */
    protected final LogManager manager;

    /** Name of the log holding blob information, {@code null} when the factory was built with a blob size. */
    protected final String logName;

    protected final boolean countFolderAsDocument;

    /**
     * Generates random document messages that contain a random blob, with {@code countFolderAsDocument} set to
     * {@code true}.
     *
     * @param nbDocuments value handed to each created producer
     * @param lang value handed to each created producer
     * @param blobSizeKb blob size, in KB, handed to each created producer
     */
    public RandomDocumentMessageProducerFactory(long nbDocuments, String lang, int blobSizeKb) {
        this(nbDocuments, lang, blobSizeKb, true);
    }

    /**
     * Generates random document messages that contain a random blob.
     *
     * @param nbDocuments value handed to each created producer
     * @param lang value handed to each created producer
     * @param blobSizeKb blob size, in KB, handed to each created producer
     * @param countFolderAsDocument flag handed to each created producer; {@code null} is treated as {@code true},
     *            the default used by the constructors that do not take this flag
     */
    public RandomDocumentMessageProducerFactory(long nbDocuments, String lang, int blobSizeKb,
            Boolean countFolderAsDocument) {
        this.nbDocuments = nbDocuments;
        this.lang = lang;
        this.manager = null;
        this.blobSizeKb = blobSizeKb;
        this.logName = null;
        // Explicit null check: assigning a null Boolean to a primitive would throw on implicit unboxing.
        this.countFolderAsDocument = countFolderAsDocument == null || countFolderAsDocument;
    }

    /**
     * Generates random document messages that point to existing blobs.
     *
     * @param nbDocuments value handed to each created producer
     * @param lang value handed to each created producer
     * @param manager log manager used to create the tailer that reads blob information
     * @param logBlobInfoName name of the log that contains the blob information
     * @param countFolderAsDocument flag handed to each created producer; {@code null} is treated as {@code true},
     *            the default used by the constructors that do not take this flag
     */
    public RandomDocumentMessageProducerFactory(long nbDocuments, String lang, LogManager manager,
            String logBlobInfoName, Boolean countFolderAsDocument) {
        this.nbDocuments = nbDocuments;
        this.lang = lang;
        this.manager = manager;
        this.logName = logBlobInfoName;
        this.blobSizeKb = 0;
        // Explicit null check: assigning a null Boolean to a primitive would throw on implicit unboxing.
        this.countFolderAsDocument = countFolderAsDocument == null || countFolderAsDocument;
    }

    /**
     * Generates random document messages that point to existing blobs, with {@code countFolderAsDocument} set to
     * {@code true}.
     *
     * @param nbDocuments value handed to each created producer
     * @param lang value handed to each created producer
     * @param manager log manager used to create the tailer that reads blob information
     * @param logBlobInfoName name of the log that contains the blob information
     */
    public RandomDocumentMessageProducerFactory(long nbDocuments, String lang, LogManager manager,
            String logBlobInfoName) {
        this(nbDocuments, lang, manager, logBlobInfoName, true);
    }

    /**
     * Creates a {@link RandomDocumentMessageProducer} for the given producer id.
     * <p>
     * When a log manager was supplied, a tailer is created on partition 0 of the blob information log, using a
     * consumer group name derived from the producer id, and wrapped into a {@link RandomLogBlobInfoFetcher} that is
     * given to the producer. Otherwise the producer receives a {@code null} fetcher.
     *
     * @param producerId identifier of the producer to create
     * @return the configured producer
     */
    @SuppressWarnings("resource")
    @Override
    public ProducerIterator<DocumentMessage> createProducer(int producerId) {
        BlobInfoFetcher fetcher = null;
        if (manager != null) {
            // read only on the first partition
            LogTailer<BlobInfoMessage> tailer = manager.createTailer(getGroupName(producerId),
                    Collections.singleton(LogPartition.of(logName, 0)));
            // the tailer is handed over to the fetcher, it is not closed here
            fetcher = new RandomLogBlobInfoFetcher(tailer);
        }
        return new RandomDocumentMessageProducer(producerId, nbDocuments, lang, fetcher) // NOSONAR (factory)
                .withBlob(blobSizeKb, false)
                .countFolderAsDocument(countFolderAsDocument);
    }

    /**
     * Returns the consumer group name used by the tailer of the given producer.
     *
     * @param producerId identifier of the producer
     * @return {@code "RandomDocumentMessageProducer."} followed by the producer id
     */
    protected String getGroupName(int producerId) {
        return "RandomDocumentMessageProducer." + producerId;
    }

}