001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer;
020
021import net.jodah.failsafe.RetryPolicy;
022
023import java.time.Duration;
024
025/**
026 * The consumer policy drive the consumer pool and runner.
027 *
028 * @since 9.1
029 */
030public class ConsumerPolicy {
031    public static final String DEFAULT_NAME = "default";
032
033    public enum StartOffset {BEGIN, END, LAST_COMMITTED}
034
035    public static final RetryPolicy NO_RETRY = new RetryPolicy().withMaxRetries(0);
036    /**
037     * Consumer policy that stop on starvation and failure.
038     */
039    public static final ConsumerPolicy BOUNDED = builder()
040            .waitMessageTimeout(Duration.ofSeconds(5))
041            .continueOnFailure(false).build();
042
043    public static final ConsumerPolicy BOUNDED_RETRY = builder()
044            .waitMessageTimeout(Duration.ofSeconds(5))
045            .retryPolicy(new RetryPolicy().withMaxRetries(3))
046            .continueOnFailure(false).build();
047
048    /**
049     * Consumer policy that wait for ever for new message and skip failure.
050     */
051    public static final ConsumerPolicy UNBOUNDED = builder()
052            .continueOnFailure(true)
053            .waitMessageForEver().build();
054
055    public static final ConsumerPolicy UNBOUNDED_RETRY = builder()
056            .continueOnFailure(true)
057            .retryPolicy(new RetryPolicy().withMaxRetries(3))
058            .waitMessageForEver().build();
059
060    private final BatchPolicy batchPolicy;
061    private final RetryPolicy retryPolicy;
062    private final boolean skipFailure;
063    private final Duration waitMessageTimeout;
064    private final StartOffset startOffset;
065    private final boolean salted;
066    private final String name;
067    private final short maxThreads;
068
069    public ConsumerPolicy(ConsumerPolicyBuilder builder) {
070        batchPolicy = builder.batchPolicy;
071        retryPolicy = builder.retryPolicy;
072        skipFailure = builder.skipFailure;
073        waitMessageTimeout = builder.waitMessageTimeout;
074        startOffset = builder.startOffset;
075        salted = builder.salted;
076        maxThreads = builder.maxThreads;
077        if (builder.name != null) {
078            name = builder.name;
079        } else {
080            name = DEFAULT_NAME;
081        }
082    }
083
084    public BatchPolicy getBatchPolicy() {
085        return batchPolicy;
086    }
087
088    public RetryPolicy getRetryPolicy() {
089        return retryPolicy;
090    }
091
092    public boolean continueOnFailure() {
093        return skipFailure;
094    }
095
096    public Duration getWaitMessageTimeout() {
097        return waitMessageTimeout;
098    }
099
100    public StartOffset getStartOffset() {
101        return startOffset;
102    }
103
104    public boolean isSalted() {
105        return salted;
106    }
107
108    public String getName() {
109        return name;
110    }
111
112    public short getMaxThreads() {
113        return maxThreads;
114    }
115
116    public static ConsumerPolicyBuilder builder() {
117        return new ConsumerPolicyBuilder();
118    }
119
120    @Override
121    public String toString() {
122        return "ConsumerPolicy{" +
123                "batchPolicy=" + batchPolicy +
124                ", retryPolicy=" + retryPolicy +
125                ", skipFailure=" + skipFailure +
126                ", waitMessageTimeout=" + waitMessageTimeout +
127                ", startOffset=" + startOffset +
128                ", salted=" + salted +
129                ", name='" + name + '\'' +
130                ", maxThreads=" + maxThreads +
131                '}';
132    }
133
134
135}