001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.computation.internals;
020
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023import org.nuxeo.ecm.platform.importer.mqueues.computation.Watermark;
024
025/**
026 * Keep track of minimum and maximum watermark level.
027 * On checkpoint move the low watermark to the previous maximum mark.
028 *
029 * @since 9.2
030 */
031public class WatermarkMonotonicInterval {
032    private static final Log log = LogFactory.getLog(WatermarkMonotonicInterval.class);
033    private volatile Watermark low = Watermark.LOWEST;
034    private Watermark lowest = Watermark.LOWEST;
035    private Watermark high = Watermark.LOWEST;
036
037    public WatermarkMonotonicInterval() {
038    }
039
040    /**
041     * Take in account the watermark.<br/>
042     * Not thread safe.
043     */
044    public long mark(long watermarkValue) {
045        return mark(Watermark.ofValue(watermarkValue));
046    }
047
048    /**
049     * Take in account the watermark.<br/>
050     * Not thread safe.
051     */
052    public long mark(Watermark watermark) {
053        if (low == Watermark.LOWEST) {
054            low = high = watermark;
055        } else if (watermark.compareTo(low) < 0) {
056            if (watermark.compareTo(lowest) < 0) {
057                // low watermark must increase to be monotonic
058                if (log.isTraceEnabled()) {
059                    log.trace("receive too low watermark, rejected " + watermark + " lowest: " + lowest);
060                }
061                low = lowest;
062            } else {
063                low = watermark;
064            }
065        }
066        if (watermark.compareTo(high) > 0) {
067            high = watermark;
068        }
069        return low.getValue();
070    }
071
072    /**
073     * Move the low watermark to the highest mark.
074     * Returns the low watermark that should be monotonic (the value returned here never decrease).<br/>
075     * Not thread safe.
076     */
077    public long checkpoint() {
078        low = Watermark.completedOf(high);
079        lowest = low;
080        // System.out.println(low);
081        return low.getValue();
082    }
083
084    public boolean isDone(long timestamp) {
085        return low.isDone(timestamp);
086    }
087
088    /**
089     * Returns the low mark. The value can decrease but not under the last checkpoint value.<br/>
090     * Thread safe usage.
091     */
092    public Watermark getLow() {
093        return low;
094    }
095
096    public Watermark getHigh() {
097        return high;
098    }
099
100    @Override
101    public String toString() {
102        return "WatermarkInterval{" +
103                "low=" + low +
104                ", high=" + high +
105                '}';
106    }
107
108}