001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.importer.stream.producer; 020 021import java.util.Collections; 022 023import org.nuxeo.importer.stream.message.BlobInfoMessage; 024import org.nuxeo.importer.stream.message.DocumentMessage; 025import org.nuxeo.lib.stream.log.LogManager; 026import org.nuxeo.lib.stream.log.LogPartition; 027import org.nuxeo.lib.stream.log.LogTailer; 028import org.nuxeo.lib.stream.pattern.producer.ProducerFactory; 029import org.nuxeo.lib.stream.pattern.producer.ProducerIterator; 030 031/** 032 * @since 9.1 033 */ 034public class RandomDocumentMessageProducerFactory implements ProducerFactory<DocumentMessage> { 035 protected final long nbDocuments; 036 037 protected final String lang; 038 039 protected final int blobSizeKb; 040 041 protected final LogManager manager; 042 043 protected final String logName; 044 045 /** 046 * Generates random document messages that contains random blob. 047 */ 048 public RandomDocumentMessageProducerFactory(long nbDocuments, String lang, int blobSizeKb) { 049 this.nbDocuments = nbDocuments; 050 this.lang = lang; 051 this.manager = null; 052 this.blobSizeKb = blobSizeKb; 053 this.logName = null; 054 } 055 056 /** 057 * Generates random documents messages that point to existing blobs. 058 */ 059 public RandomDocumentMessageProducerFactory(long nbDocuments, String lang, LogManager manager, 060 String logBlobInfoName) { 061 this.nbDocuments = nbDocuments; 062 this.lang = lang; 063 this.manager = manager; 064 this.logName = logBlobInfoName; 065 this.blobSizeKb = 0; 066 } 067 068 @Override 069 public ProducerIterator<DocumentMessage> createProducer(int producerId) { 070 BlobInfoFetcher fetcher = null; 071 if (manager != null) { 072 LogTailer<BlobInfoMessage> tailer = manager.createTailer(getGroupName(producerId), 073 Collections.singleton(LogPartition.of(logName, 0))); 074 fetcher = new RandomLogBlobInfoFetcher(tailer); 075 } 076 return new RandomDocumentMessageProducer(producerId, nbDocuments, lang, fetcher).withBlob(blobSizeKb, false); 077 } 078 079 protected String getGroupName(int producerId) { 080 return "RandomDocumentMessageProducer." + producerId; 081 } 082 083}