001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation;
020
021import java.util.Objects;
022
/**
 * A Watermark represents a point in time. This point in time is composed of a millisecond timestamp and a sequence.
 * There is also a state to denote if the point in time is reached (completed) or not. Watermarks are immutable.
 * <p>
 * The three parts are packed into a single non-negative {@code long} (see {@link #getValue()}): the timestamp is
 * shifted left by 17 bits, the sequence is stored as an unsigned 16-bit number in bits 1 to 16 and bit 0 holds the
 * completed flag. As a consequence the timestamp must fit in 46 bits.
 *
 * @since 9.3
 */
public final class Watermark implements Comparable<Watermark> {
    /** Number of low bits of the packed value reserved for the sequence (16 bits) and the completed flag (1 bit). */
    private static final int TIMESTAMP_SHIFT = 17;

    /** Greatest timestamp that can be shifted by {@link #TIMESTAMP_SHIFT} bits without overflowing a long. */
    private static final long MAX_TIMESTAMP = Long.MAX_VALUE >> TIMESTAMP_SHIFT;

    /** Mask used to read the sequence as an unsigned 16-bit number. */
    private static final int SEQUENCE_MASK = 0xFFFF;

    /** The lowest possible watermark: timestamp 0, sequence 0, not completed. Its packed value is 0. */
    public static final Watermark LOWEST = new Watermark(0, (short) 0, false);

    protected final long timestamp;

    protected final short sequence;

    protected final boolean completed;

    protected final long value;

    /**
     * Builds a watermark and pre-computes its packed value.
     *
     * @param timestamp the timestamp in milliseconds, between 0 and {@code Long.MAX_VALUE >> 17} inclusive
     * @param sequence the sequence, encoded as an unsigned 16-bit number
     * @param completed whether the point in time is reached
     * @throws IllegalArgumentException if the timestamp is negative or too large to be encoded
     */
    private Watermark(long timestamp, short sequence, boolean completed) {
        if (timestamp < 0) {
            throw new IllegalArgumentException("timestamp must be positive");
        }
        if (timestamp > MAX_TIMESTAMP) {
            // Shifting a bigger timestamp would silently overflow and produce a corrupted, possibly negative value
            // that breaks both ordering and the ofValue(getValue()) round trip.
            throw new IllegalArgumentException(
                    "timestamp is too large to be encoded in a watermark: " + timestamp + " > " + MAX_TIMESTAMP);
        }
        this.timestamp = timestamp;
        this.sequence = sequence;
        this.completed = completed;
        // Mask before widening so that a negative short does not sign-extend into the timestamp bits.
        long sequenceBits = ((long) (sequence & SEQUENCE_MASK)) << 1;
        long completedBit = completed ? 1L : 0L;
        this.value = (timestamp << TIMESTAMP_SHIFT) | sequenceBits | completedBit;
    }

    /**
     * Decodes a watermark from its packed representation, as returned by {@link #getValue()}.
     *
     * @param watermarkValue the packed value
     * @return the corresponding watermark
     * @throws IllegalArgumentException if the value is negative
     */
    public static Watermark ofValue(long watermarkValue) {
        if (watermarkValue < 0) {
            throw new IllegalArgumentException("Watermark must be positive");
        }
        long decodedTimestamp = watermarkValue >> TIMESTAMP_SHIFT;
        short decodedSequence = (short) ((watermarkValue >> 1) & SEQUENCE_MASK);
        boolean decodedCompleted = (watermarkValue & 1L) == 1L;
        return new Watermark(decodedTimestamp, decodedSequence, decodedCompleted);
    }

    /**
     * Returns a non completed watermark for the current system time with a sequence of 0.
     */
    public static Watermark ofNow() {
        return ofTimestamp(System.currentTimeMillis(), (short) 0);
    }

    /**
     * Returns a non completed watermark for the given timestamp with a sequence of 0.
     *
     * @param timestamp the timestamp in milliseconds
     * @throws IllegalArgumentException if the timestamp is negative or too large to be encoded
     */
    public static Watermark ofTimestamp(long timestamp) {
        return ofTimestamp(timestamp, (short) 0);
    }

    /**
     * Returns a non completed watermark for the given timestamp and sequence.
     *
     * @param timestamp the timestamp in milliseconds
     * @param sequence the sequence, encoded as an unsigned 16-bit number
     * @throws IllegalArgumentException if the timestamp is negative or too large to be encoded
     */
    public static Watermark ofTimestamp(long timestamp, short sequence) {
        return new Watermark(timestamp, sequence, false);
    }

    /**
     * Returns a completed watermark with the same timestamp and sequence as the given one.
     *
     * @param watermark the watermark to mark as completed
     * @throws NullPointerException if the watermark is null
     */
    public static Watermark completedOf(Watermark watermark) {
        Objects.requireNonNull(watermark);
        return new Watermark(watermark.getTimestamp(), watermark.getSequence(), true);
    }

    /**
     * Returns the packed representation of this watermark, suitable for {@link #ofValue(long)}.
     */
    public long getValue() {
        return value;
    }

    /**
     * Returns true if the point in time is marked as reached.
     */
    public boolean isCompleted() {
        return completed;
    }

    /**
     * Returns the sequence part of this watermark.
     */
    public short getSequence() {
        return sequence;
    }

    /**
     * Returns the timestamp part of this watermark, in milliseconds.
     */
    public long getTimestamp() {
        return timestamp;
    }

    /**
     * Returns true if the non completed watermark of the given timestamp with a sequence of 0 is strictly lower than
     * this watermark.
     *
     * @param timestamp the timestamp in milliseconds
     * @throws IllegalArgumentException if the timestamp is negative or too large to be encoded
     */
    public boolean isDone(long timestamp) {
        return Watermark.ofTimestamp(timestamp).compareTo(this) < 0;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        Watermark watermark = (Watermark) o;
        return completed == watermark.completed && timestamp == watermark.timestamp && sequence == watermark.sequence;
    }

    @Override
    public int hashCode() {
        // The packed value is derived from exactly the fields compared in equals.
        return (int) (value ^ (value >>> 32));
    }

    @Override
    public String toString() {
        return "Watermark{" + "completed=" + completed + ", timestamp=" + timestamp + ", sequence=" + sequence
                + ", value=" + value + '}';
    }

    /**
     * Orders watermarks by their packed value. A null argument is considered lower than any watermark.
     *
     * @return the difference between the packed values when it fits in an int, else 1 or -1 according to its sign,
     *         or {@code Integer.MAX_VALUE} when the other watermark is null
     */
    @SuppressWarnings("NullableProblems")
    @Override
    public int compareTo(Watermark o) {
        if (o == null) {
            return Integer.MAX_VALUE;
        }
        // Both packed values are non-negative (enforced by the constructor), so the subtraction cannot overflow.
        long diff = value - o.value;
        // cast diff to int when possible
        int ret = (int) diff;
        if (ret == diff) {
            return ret;
        }
        return diff > 0 ? 1 : -1;
    }

}