001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.platform.importer.mqueues.consumer; 020 021import net.jodah.failsafe.RetryPolicy; 022 023import java.time.Duration; 024 025/** 026 * The consumer policy drive the consumer pool and runner. 027 * 028 * @since 9.1 029 */ 030public class ConsumerPolicy { 031 public enum StartOffset {BEGIN, END, LAST_COMMITTED} 032 public static final RetryPolicy NO_RETRY = new RetryPolicy().withMaxRetries(0); 033 /** 034 * Consumer policy that stop on starvation and failure. 035 */ 036 public static final ConsumerPolicy BOUNDED = builder() 037 .waitMessageTimeout(Duration.ofSeconds(5)) 038 .continueOnFailure(false).build(); 039 /** 040 * Consumer policy that wait for ever for new message and skip failure. 041 */ 042 public static final ConsumerPolicy UNBOUNDED = builder() 043 .continueOnFailure(true) 044 .waitMessageForEver().build(); 045 046 private final BatchPolicy batchPolicy; 047 private final RetryPolicy retryPolicy; 048 private final boolean skipFailure; 049 private final Duration waitMessageTimeout; 050 private final StartOffset startOffset; 051 private final boolean salted; 052 053 public ConsumerPolicy(Builder builder) { 054 batchPolicy = builder.batchPolicy; 055 retryPolicy = builder.retryPolicy; 056 skipFailure = builder.skipFailure; 057 waitMessageTimeout = builder.waitMessageTimeout; 058 startOffset = builder.startOffset; 059 salted = builder.salted; 060 } 061 062 public BatchPolicy getBatchPolicy() { 063 return batchPolicy; 064 } 065 066 public RetryPolicy getRetryPolicy() { 067 return retryPolicy; 068 } 069 070 public boolean continueOnFailure() { 071 return skipFailure; 072 } 073 074 public Duration getWaitMessageTimeout() { 075 return waitMessageTimeout; 076 } 077 078 public StartOffset getStartOffset() { 079 return startOffset; 080 } 081 082 public boolean isSalted() { 083 return salted; 084 } 085 086 087 public static Builder builder() { 088 return new Builder(); 089 } 090 091 public static class Builder { 092 private BatchPolicy batchPolicy = BatchPolicy.DEFAULT; 093 private RetryPolicy retryPolicy = NO_RETRY; 094 private boolean skipFailure = false; 095 private Duration waitMessageTimeout = Duration.ofSeconds(2); 096 private StartOffset startOffset = StartOffset.LAST_COMMITTED; 097 private boolean salted = false; 098 099 protected Builder() { 100 101 } 102 103 public Builder batchPolicy(BatchPolicy policy) { 104 batchPolicy = policy; 105 return this; 106 } 107 108 public Builder retryPolicy(RetryPolicy policy) { 109 retryPolicy = policy; 110 return this; 111 } 112 113 /** 114 * Continue on next message even if the retry policy has failed. 115 */ 116 public Builder continueOnFailure(boolean value) { 117 skipFailure = value; 118 return this; 119 } 120 121 /** 122 * Consumer will stop if there is no more message after this timeout. 123 */ 124 public Builder waitMessageTimeout(Duration duration) { 125 waitMessageTimeout = duration; 126 return this; 127 } 128 129 /** 130 * Consumer will wait for ever message. 131 */ 132 public Builder waitMessageForEver() { 133 waitMessageTimeout = Duration.ofSeconds(Integer.MAX_VALUE); 134 return this; 135 } 136 137 /** 138 * Where to read the first message. 139 */ 140 public Builder startOffset(StartOffset startOffset) { 141 this.startOffset = startOffset; 142 return this; 143 } 144 145 /** 146 * Consumer will wait some random time before start, to prevent wave of concurency in batch processing. 147 */ 148 public Builder salted() { 149 salted = true; 150 return this; 151 } 152 153 public ConsumerPolicy build() { 154 return new ConsumerPolicy(this); 155 } 156 } 157}