001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation;
020
021import java.util.Objects;
022
/**
 * Watermark represents a point in time. This point in time is composed of a millisecond timestamp and a sequence. There
 * is also a state to denote if the point in time is reached (completed) or not. Watermarks are immutable.
 * <p>
 * A watermark is encoded into a single non-negative {@code long}, see {@link #getValue()}:
 * <ul>
 * <li>bit 0: the completed flag,</li>
 * <li>bits 1 to 16: the sequence, stored as an unsigned 16-bit number,</li>
 * <li>bits 17 to 62: the timestamp.</li>
 * </ul>
 * The natural ordering of watermarks is the ordering of their encoded values, so for the same timestamp and sequence
 * the completed watermark sorts immediately after the non-completed one.
 *
 * @since 9.3
 */
public final class Watermark implements Comparable<Watermark> {

    /** Position of the first sequence bit in the encoded value (bit 0 holds the completed flag). */
    private static final int SEQUENCE_SHIFT = 1;

    /** Position of the first timestamp bit in the encoded value. */
    private static final int TIMESTAMP_SHIFT = 17;

    /** Mask that keeps the 16 bits of the sequence. */
    private static final long SEQUENCE_MASK = 0xFFFFL;

    /** Greatest timestamp that can be shifted into the encoded value without losing bits or turning it negative. */
    private static final long MAX_TIMESTAMP = Long.MAX_VALUE >> TIMESTAMP_SHIFT;

    /** The lowest possible watermark: timestamp 0, sequence 0, not completed (encoded value 0). */
    public static final Watermark LOWEST = new Watermark(0, (short) 0, false);

    protected final long timestamp;

    protected final short sequence;

    protected final boolean completed;

    /** Encoded form of the timestamp, sequence and completed flag. */
    protected final long value;

    /**
     * Builds a watermark and computes its encoded value.
     *
     * @param timestamp the timestamp, between 0 and {@code Long.MAX_VALUE >> 17} inclusive
     * @param sequence the sequence, its 16 bits are stored unsigned in the encoded value
     * @param completed whether the point in time is reached
     * @throws IllegalArgumentException if the timestamp is negative or too large to be encoded
     */
    private Watermark(long timestamp, short sequence, boolean completed) {
        if (timestamp < 0) {
            throw new IllegalArgumentException("timestamp must be positive");
        }
        if (timestamp > MAX_TIMESTAMP) {
            // shifting a larger timestamp would silently drop its high bits and corrupt the ordering
            throw new IllegalArgumentException("timestamp is too large to be encoded: " + timestamp);
        }
        this.timestamp = timestamp;
        this.sequence = sequence;
        this.completed = completed;
        this.value = (timestamp << TIMESTAMP_SHIFT) | ((sequence & SEQUENCE_MASK) << SEQUENCE_SHIFT)
                | (completed ? 1L : 0L);
    }

    /**
     * Decodes a watermark from its encoded value, this is the inverse of {@link #getValue()}.
     *
     * @param watermarkValue the encoded value
     * @return the decoded watermark
     * @throws IllegalArgumentException if the value is negative
     */
    public static Watermark ofValue(long watermarkValue) {
        if (watermarkValue < 0) {
            throw new IllegalArgumentException("Watermark must be positive");
        }
        long decodedTimestamp = watermarkValue >> TIMESTAMP_SHIFT;
        short decodedSequence = (short) ((watermarkValue >> SEQUENCE_SHIFT) & SEQUENCE_MASK);
        boolean decodedCompleted = (watermarkValue & 1L) == 1L;
        return new Watermark(decodedTimestamp, decodedSequence, decodedCompleted);
    }

    /**
     * Returns a non-completed watermark for the given timestamp with a sequence of 0.
     *
     * @param timestamp the timestamp
     * @return the watermark
     * @throws IllegalArgumentException if the timestamp is negative or too large to be encoded
     */
    public static Watermark ofTimestamp(long timestamp) {
        return ofTimestamp(timestamp, (short) 0);
    }

    /**
     * Returns a non-completed watermark for the given timestamp and sequence.
     *
     * @param timestamp the timestamp
     * @param sequence the sequence
     * @return the watermark
     * @throws IllegalArgumentException if the timestamp is negative or too large to be encoded
     */
    public static Watermark ofTimestamp(long timestamp, short sequence) {
        return new Watermark(timestamp, sequence, false);
    }

    /**
     * Returns a completed watermark with the same timestamp and sequence as the given one.
     *
     * @param watermark the watermark to complete
     * @return the completed watermark
     * @throws NullPointerException if the watermark is null
     */
    public static Watermark completedOf(Watermark watermark) {
        Objects.requireNonNull(watermark);
        return new Watermark(watermark.getTimestamp(), watermark.getSequence(), true);
    }

    /**
     * Returns the encoded value of this watermark, it can be decoded with {@link #ofValue(long)}.
     */
    public long getValue() {
        return value;
    }

    /**
     * Returns true if the point in time is reached.
     */
    public boolean isCompleted() {
        return completed;
    }

    /**
     * Returns the sequence of this watermark.
     */
    public short getSequence() {
        return sequence;
    }

    /**
     * Returns the timestamp of this watermark.
     */
    public long getTimestamp() {
        return timestamp;
    }

    /**
     * Returns true if this watermark is strictly greater than the non-completed watermark of the given timestamp with
     * a sequence of 0.
     *
     * @param timestamp the timestamp to test
     * @throws IllegalArgumentException if the timestamp is negative or too large to be encoded
     */
    public boolean isDone(long timestamp) {
        return Watermark.ofTimestamp(timestamp).compareTo(this) < 0;
    }

    /**
     * Two watermarks are equal when they have the same timestamp, sequence and completed state.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Watermark)) {
            return false;
        }
        // the encoded value uniquely identifies the timestamp, sequence and completed state
        return value == ((Watermark) o).value;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(value);
    }

    @Override
    public String toString() {
        return "Watermark{" + "completed=" + completed + ", timestamp=" + timestamp + ", sequence=" + sequence
                + ", value=" + value + '}';
    }

    /**
     * Compares the encoded values of the watermarks. Only the sign of the result is meaningful.
     *
     * @param o the watermark to compare to, a null watermark is considered lower than any watermark
     * @return a negative integer, zero or a positive integer as this watermark is lower than, equal to or greater than
     *         the given one
     */
    @SuppressWarnings("NullableProblems")
    @Override
    public int compareTo(Watermark o) {
        if (o == null) {
            // kept for backward compatibility: null does not raise a NullPointerException
            return Integer.MAX_VALUE;
        }
        // Long.compare cannot overflow, unlike a subtraction of the two values
        return Long.compare(value, o.value);
    }

}