001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.consumer;
020
021import net.jodah.failsafe.RetryPolicy;
022
023import java.time.Duration;
024
025/**
026 * The consumer policy drive the consumer pool and runner.
027 *
028 * @since 9.1
029 */
030public class ConsumerPolicy {
031    public enum StartOffset {BEGIN, END, LAST_COMMITTED}
032    public static final RetryPolicy NO_RETRY = new RetryPolicy().withMaxRetries(0);
033    /**
034     * Consumer policy that stop on starvation and failure.
035     */
036    public static final ConsumerPolicy BOUNDED = builder()
037            .waitMessageTimeout(Duration.ofSeconds(5))
038            .continueOnFailure(false).build();
039    /**
040     * Consumer policy that wait for ever for new message and skip failure.
041     */
042    public static final ConsumerPolicy UNBOUNDED = builder()
043            .continueOnFailure(true)
044            .waitMessageForEver().build();
045
046    private final BatchPolicy batchPolicy;
047    private final RetryPolicy retryPolicy;
048    private final boolean skipFailure;
049    private final Duration waitMessageTimeout;
050    private final StartOffset startOffset;
051    private final boolean salted;
052
053    public ConsumerPolicy(Builder builder) {
054        batchPolicy = builder.batchPolicy;
055        retryPolicy = builder.retryPolicy;
056        skipFailure = builder.skipFailure;
057        waitMessageTimeout = builder.waitMessageTimeout;
058        startOffset = builder.startOffset;
059        salted = builder.salted;
060    }
061
062    public BatchPolicy getBatchPolicy() {
063        return batchPolicy;
064    }
065
066    public RetryPolicy getRetryPolicy() {
067        return retryPolicy;
068    }
069
070    public boolean continueOnFailure() {
071        return skipFailure;
072    }
073
074    public Duration getWaitMessageTimeout() {
075        return waitMessageTimeout;
076    }
077
078    public StartOffset getStartOffset() {
079        return startOffset;
080    }
081
082    public boolean isSalted() {
083        return salted;
084    }
085
086
087    public static Builder builder() {
088        return new Builder();
089    }
090
091    public static class Builder {
092        private BatchPolicy batchPolicy = BatchPolicy.DEFAULT;
093        private RetryPolicy retryPolicy = NO_RETRY;
094        private boolean skipFailure = false;
095        private Duration waitMessageTimeout = Duration.ofSeconds(2);
096        private StartOffset startOffset = StartOffset.LAST_COMMITTED;
097        private boolean salted = false;
098
099        protected Builder() {
100
101        }
102
103        public Builder batchPolicy(BatchPolicy policy) {
104            batchPolicy = policy;
105            return this;
106        }
107
108        public Builder retryPolicy(RetryPolicy policy) {
109            retryPolicy = policy;
110            return this;
111        }
112
113        /**
114         * Continue on next message even if the retry policy has failed.
115         */
116        public Builder continueOnFailure(boolean value) {
117            skipFailure = value;
118            return this;
119        }
120
121        /**
122         * Consumer will stop if there is no more message after this timeout.
123         */
124        public Builder waitMessageTimeout(Duration duration) {
125            waitMessageTimeout = duration;
126            return this;
127        }
128
129        /**
130         * Consumer will wait for ever message.
131         */
132        public Builder waitMessageForEver() {
133            waitMessageTimeout = Duration.ofSeconds(Integer.MAX_VALUE);
134            return this;
135        }
136
137        /**
138         * Where to read the first message.
139         */
140        public Builder startOffset(StartOffset startOffset) {
141            this.startOffset = startOffset;
142            return this;
143        }
144
145        /**
146         * Consumer will wait some random time before start, to prevent wave of concurency in batch processing.
147         */
148        public Builder salted() {
149            salted = true;
150            return this;
151        }
152
153        public ConsumerPolicy build() {
154            return new ConsumerPolicy(this);
155        }
156    }
157}