001/*
002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation;
020
021import java.time.Duration;
022
023import net.jodah.failsafe.RetryPolicy;
024
025/**
026 * Builder to create a ComputationPolicy.
027 *
028 * @since 10.3
029 */
030public class ComputationPolicyBuilder {
031
032    protected static final int DEFAULT_BATCH_CAPACITY = 1;
033
034    protected static final int DEFAULT_BATCH_THRESHOLD_SECOND = 1;
035
036    protected RetryPolicy retryPolicy = ComputationPolicy.NO_RETRY;
037
038    protected boolean skipFailure = false;
039
040    protected int skipFirstFailures = 0;
041
042    protected int batchCapacity = DEFAULT_BATCH_CAPACITY;
043
044    protected Duration batchThreshold = Duration.ofSeconds(DEFAULT_BATCH_THRESHOLD_SECOND);
045
046    public ComputationPolicyBuilder() {
047        // Empty constructor
048    }
049
050    /**
051     * Defines how to group records by batch using a capacity and a time threshold.
052     * <p>
053     * This is used only by computation that extends AbstractBatchComputation.
054     *
055     * @param capacity the number of records in the batch
056     * @param timeThreshold process the batch even if not full after this duration
057     */
058    public ComputationPolicyBuilder batchPolicy(int capacity, Duration timeThreshold) {
059        batchCapacity = capacity;
060        batchThreshold = timeThreshold;
061        return this;
062    }
063
064    /**
065     * Defines what to do in case of failure during the batch processing.
066     */
067    public ComputationPolicyBuilder retryPolicy(RetryPolicy policy) {
068        retryPolicy = policy;
069        return this;
070    }
071
072    /**
073     * The fallback when processing a batch has failed after applying the retry policy has failed.
074     *
075     * @param value When {@code true} Skips the records affected by the batch in failure and continue.<p>
076     *            When {@code false} aborts the computation, this is the default behavior.
077     */
078    public ComputationPolicyBuilder continueOnFailure(boolean value) {
079        skipFailure = value;
080        return this;
081    }
082
083    /**
084     * A recovery fallback that continues processing after failure only for a limited number of time.
085     *
086     * @since 11.1
087     */
088    public ComputationPolicyBuilder skipFirstFailures(int failures) {
089        if (failures > 0) {
090            skipFirstFailures = failures;
091            skipFailure = false;
092        }
093        return this;
094    }
095
096    /**
097     * Creates the policy.
098     */
099    public ComputationPolicy build() {
100        return new ComputationPolicy(this);
101    }
102
103}