001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.pattern.consumer;
020
021import java.time.Duration;
022
023import net.jodah.failsafe.RetryPolicy;
024
025/**
026 * The consumer policy drive the consumer pool and runner.
027 *
028 * @since 9.1
029 */
030public class ConsumerPolicy {
031    public static final String DEFAULT_NAME = "default";
032
033    public enum StartOffset {
034        BEGIN, END, LAST_COMMITTED
035    }
036
037    public static final RetryPolicy NO_RETRY = new RetryPolicy().withMaxRetries(0);
038
039    /**
040     * Consumer policy that stop on starvation and failure.
041     */
042    public static final ConsumerPolicy BOUNDED = builder().waitMessageTimeout(Duration.ofSeconds(5))
043                                                          .continueOnFailure(false)
044                                                          .build();
045
046    public static final ConsumerPolicy BOUNDED_RETRY = builder().waitMessageTimeout(Duration.ofSeconds(5))
047                                                                .retryPolicy(new RetryPolicy().withMaxRetries(3))
048                                                                .continueOnFailure(false)
049                                                                .build();
050
051    /**
052     * Consumer policy that wait for ever for new message and skip failure.
053     */
054    public static final ConsumerPolicy UNBOUNDED = builder().continueOnFailure(true).waitMessageForEver().build();
055
056    public static final ConsumerPolicy UNBOUNDED_RETRY = builder().continueOnFailure(true)
057                                                                  .retryPolicy(new RetryPolicy().withMaxRetries(3))
058                                                                  .waitMessageForEver()
059                                                                  .build();
060
061    protected final BatchPolicy batchPolicy;
062
063    protected final RetryPolicy retryPolicy;
064
065    protected final boolean skipFailure;
066
067    protected final Duration waitMessageTimeout;
068
069    protected final StartOffset startOffset;
070
071    protected final boolean salted;
072
073    protected final String name;
074
075    protected final short maxThreads;
076
077    public ConsumerPolicy(ConsumerPolicyBuilder builder) {
078        batchPolicy = builder.batchPolicy;
079        retryPolicy = builder.retryPolicy;
080        skipFailure = builder.skipFailure;
081        waitMessageTimeout = builder.waitMessageTimeout;
082        startOffset = builder.startOffset;
083        salted = builder.salted;
084        maxThreads = builder.maxThreads;
085        if (builder.name != null) {
086            name = builder.name;
087        } else {
088            name = DEFAULT_NAME;
089        }
090    }
091
092    public BatchPolicy getBatchPolicy() {
093        return batchPolicy;
094    }
095
096    public RetryPolicy getRetryPolicy() {
097        return retryPolicy;
098    }
099
100    public boolean continueOnFailure() {
101        return skipFailure;
102    }
103
104    public Duration getWaitMessageTimeout() {
105        return waitMessageTimeout;
106    }
107
108    public StartOffset getStartOffset() {
109        return startOffset;
110    }
111
112    public boolean isSalted() {
113        return salted;
114    }
115
116    public String getName() {
117        return name;
118    }
119
120    public short getMaxThreads() {
121        return maxThreads;
122    }
123
124    public static ConsumerPolicyBuilder builder() {
125        return new ConsumerPolicyBuilder();
126    }
127
128    @Override
129    public String toString() {
130        return "ConsumerPolicy{" + "batchPolicy=" + batchPolicy + ", retryPolicy=" + retryPolicy + ", skipFailure="
131                + skipFailure + ", waitMessageTimeout=" + waitMessageTimeout + ", startOffset=" + startOffset
132                + ", salted=" + salted + ", name='" + name + '\'' + ", maxThreads=" + maxThreads + '}';
133    }
134
135}