001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.importer.stream.producer; 020 021import java.util.Collections; 022 023import org.nuxeo.importer.stream.StreamImporters; 024import org.nuxeo.importer.stream.message.BlobInfoMessage; 025import org.nuxeo.importer.stream.message.DocumentMessage; 026import org.nuxeo.lib.stream.codec.Codec; 027import org.nuxeo.lib.stream.log.LogManager; 028import org.nuxeo.lib.stream.log.LogPartition; 029import org.nuxeo.lib.stream.log.LogTailer; 030import org.nuxeo.lib.stream.log.Name; 031import org.nuxeo.lib.stream.pattern.producer.ProducerFactory; 032import org.nuxeo.lib.stream.pattern.producer.ProducerIterator; 033 034/** 035 * @since 9.1 036 */ 037public class RandomDocumentMessageProducerFactory implements ProducerFactory<DocumentMessage> { 038 protected final long nbDocuments; 039 040 protected final String lang; 041 042 protected final int blobSizeKb; 043 044 protected final LogManager manager; 045 046 protected final String logName; 047 048 protected final boolean countFolderAsDocument; 049 050 public RandomDocumentMessageProducerFactory(long nbDocuments, String lang, int blobSizeKb) { 051 this(nbDocuments, lang, blobSizeKb, true); 052 } 053 054 /** 055 * Generates random document messages that contains random blob. 056 */ 057 public RandomDocumentMessageProducerFactory(long nbDocuments, String lang, int blobSizeKb, 058 Boolean countFolderAsDocument) { 059 this.nbDocuments = nbDocuments; 060 this.lang = lang; 061 this.manager = null; 062 this.blobSizeKb = blobSizeKb; 063 this.logName = null; 064 this.countFolderAsDocument = countFolderAsDocument; 065 } 066 067 /** 068 * Generates random documents messages that point to existing blobs. 069 */ 070 public RandomDocumentMessageProducerFactory(long nbDocuments, String lang, LogManager manager, 071 String logBlobInfoName, Boolean countFolderAsDocument) { 072 this.nbDocuments = nbDocuments; 073 this.lang = lang; 074 this.manager = manager; 075 this.logName = logBlobInfoName; 076 this.blobSizeKb = 0; 077 this.countFolderAsDocument = countFolderAsDocument; 078 } 079 080 public RandomDocumentMessageProducerFactory(long nbDocuments, String lang, LogManager manager, 081 String logBlobInfoName) { 082 this(nbDocuments, lang, manager, logBlobInfoName, true); 083 } 084 085 @SuppressWarnings("resource") 086 @Override 087 public ProducerIterator<DocumentMessage> createProducer(int producerId) { 088 BlobInfoFetcher fetcher = null; 089 if (manager != null) { 090 // read only on the first partition 091 Codec<BlobInfoMessage> blobInfoCodec = StreamImporters.getBlobInfoCodec(); 092 LogTailer<BlobInfoMessage> tailer = manager.createTailer(getGroupName(producerId), 093 Collections.singleton(LogPartition.of(Name.ofUrn(logName), 0)), blobInfoCodec); 094 fetcher = new RandomLogBlobInfoFetcher(tailer); 095 } 096 return new RandomDocumentMessageProducer(producerId, nbDocuments, lang, fetcher) // NOSONAR (factory) 097 .withBlob(blobSizeKb, false) 098 .countFolderAsDocument( 099 countFolderAsDocument); 100 } 101 102 protected Name getGroupName(int producerId) { 103 return Name.ofUrn("importer/RandomDocumentMessageProducer-" + producerId); 104 } 105 106}