001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer; 020 021import net.jodah.failsafe.RetryPolicy; 022 023import java.time.Duration; 024 025/** 026 * The consumer policy drive the consumer pool and runner. 027 * 028 * @since 9.1 029 */ 030public class ConsumerPolicy { 031 public static final String DEFAULT_NAME = "default"; 032 033 public enum StartOffset {BEGIN, END, LAST_COMMITTED} 034 035 public static final RetryPolicy NO_RETRY = new RetryPolicy().withMaxRetries(0); 036 /** 037 * Consumer policy that stop on starvation and failure. 038 */ 039 public static final ConsumerPolicy BOUNDED = builder() 040 .waitMessageTimeout(Duration.ofSeconds(5)) 041 .continueOnFailure(false).build(); 042 043 public static final ConsumerPolicy BOUNDED_RETRY = builder() 044 .waitMessageTimeout(Duration.ofSeconds(5)) 045 .retryPolicy(new RetryPolicy().withMaxRetries(3)) 046 .continueOnFailure(false).build(); 047 048 /** 049 * Consumer policy that wait for ever for new message and skip failure. 050 */ 051 public static final ConsumerPolicy UNBOUNDED = builder() 052 .continueOnFailure(true) 053 .waitMessageForEver().build(); 054 055 public static final ConsumerPolicy UNBOUNDED_RETRY = builder() 056 .continueOnFailure(true) 057 .retryPolicy(new RetryPolicy().withMaxRetries(3)) 058 .waitMessageForEver().build(); 059 060 private final BatchPolicy batchPolicy; 061 private final RetryPolicy retryPolicy; 062 private final boolean skipFailure; 063 private final Duration waitMessageTimeout; 064 private final StartOffset startOffset; 065 private final boolean salted; 066 private final String name; 067 private final short maxThreads; 068 069 public ConsumerPolicy(ConsumerPolicyBuilder builder) { 070 batchPolicy = builder.batchPolicy; 071 retryPolicy = builder.retryPolicy; 072 skipFailure = builder.skipFailure; 073 waitMessageTimeout = builder.waitMessageTimeout; 074 startOffset = builder.startOffset; 075 salted = builder.salted; 076 maxThreads = builder.maxThreads; 077 if (builder.name != null) { 078 name = builder.name; 079 } else { 080 name = DEFAULT_NAME; 081 } 082 } 083 084 public BatchPolicy getBatchPolicy() { 085 return batchPolicy; 086 } 087 088 public RetryPolicy getRetryPolicy() { 089 return retryPolicy; 090 } 091 092 public boolean continueOnFailure() { 093 return skipFailure; 094 } 095 096 public Duration getWaitMessageTimeout() { 097 return waitMessageTimeout; 098 } 099 100 public StartOffset getStartOffset() { 101 return startOffset; 102 } 103 104 public boolean isSalted() { 105 return salted; 106 } 107 108 public String getName() { 109 return name; 110 } 111 112 public short getMaxThreads() { 113 return maxThreads; 114 } 115 116 public static ConsumerPolicyBuilder builder() { 117 return new ConsumerPolicyBuilder(); 118 } 119 120 @Override 121 public String toString() { 122 return "ConsumerPolicy{" + 123 "batchPolicy=" + batchPolicy + 124 ", retryPolicy=" + retryPolicy + 125 ", skipFailure=" + skipFailure + 126 ", waitMessageTimeout=" + waitMessageTimeout + 127 ", startOffset=" + startOffset + 128 ", salted=" + salted + 129 ", name='" + name + '\'' + 130 ", maxThreads=" + maxThreads + 131 '}'; 132 } 133 134 135}