001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation.internals;
020
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023import org.nuxeo.lib.stream.computation.Watermark;
024
025/**
026 * Keep track of minimum and maximum watermark level. On checkpoint move the low watermark to the previous maximum mark.
027 *
028 * @since 9.3
029 */
030public class WatermarkMonotonicInterval {
031    private static final Log log = LogFactory.getLog(WatermarkMonotonicInterval.class);
032
033    protected volatile Watermark low = Watermark.LOWEST;
034
035    protected Watermark lowest = Watermark.LOWEST;
036
037    protected Watermark high = Watermark.LOWEST;
038
039    /**
040     * Take in account the watermark.<br/>
041     * Not thread safe.
042     */
043    public long mark(long watermarkValue) {
044        return mark(Watermark.ofValue(watermarkValue));
045    }
046
047    /**
048     * Take in account the watermark.<br/>
049     * Not thread safe.
050     */
051    public long mark(Watermark watermark) {
052        if (Watermark.LOWEST.equals(low)) {
053            low = high = watermark;
054        } else if (watermark.compareTo(low) < 0) {
055            if (watermark.compareTo(lowest) < 0) {
056                // low watermark must increase to be monotonic
057                if (log.isTraceEnabled()) {
058                    log.trace("receive too low watermark, rejected " + watermark + " lowest: " + lowest);
059                }
060                low = lowest;
061            } else {
062                low = watermark;
063            }
064        }
065        if (watermark.compareTo(high) > 0) {
066            high = watermark;
067        }
068        return low.getValue();
069    }
070
071    /**
072     * Move the low watermark to the highest mark. Returns the low watermark that should be monotonic (the value
073     * returned here never decrease).<br/>
074     * Not thread safe.
075     */
076    public long checkpoint() {
077        low = Watermark.completedOf(high);
078        lowest = low;
079        return low.getValue();
080    }
081
082    public boolean isDone(long timestamp) {
083        return low.isDone(timestamp);
084    }
085
086    /**
087     * Returns the low mark. The value can decrease but not under the last checkpoint value.<br/>
088     * Thread safe usage.
089     */
090    public Watermark getLow() {
091        return low;
092    }
093
094    public Watermark getHigh() {
095        return high;
096    }
097
098    @Override
099    public String toString() {
100        return "WatermarkInterval{" + "low=" + low + ", high=" + high + '}';
101    }
102
103}