001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.computation;
020
021import java.util.Objects;
022
023/**
024 * Watermark represents a point in time.
025 * This point in time is composed of a millisecond timestamp and a sequence.
026 * There is also a state to denote if the point in time is reached (completed) or not.
027 *
028 * Watermark are immutable.
029 *
030 * @since 9.2
031 */
032final public class Watermark implements Comparable<Watermark> {
033    final private long timestamp;
034    final private short sequence;
035    final private boolean completed;
036    final private long value;
037    final public static Watermark LOWEST = new Watermark(0, (short) 0, false);
038
039    private Watermark(long timestamp, short sequence, boolean completed) {
040        if (timestamp < 0) {
041            throw new IllegalArgumentException("timestamp must be positive");
042        }
043        this.timestamp = timestamp;
044        this.sequence = sequence;
045        this.completed = completed;
046        this.value = timestamp << 17 | (sequence & 0xFFFF) << 1 | (completed ? 1 : 0);
047    }
048
049    static public Watermark ofValue(long watermarkValue) {
050        if (watermarkValue < 0) {
051            throw new IllegalArgumentException("Watermark must be positive");
052        }
053        return new Watermark(
054                watermarkValue >> 17,
055                (short) ((watermarkValue >> 1) & 0xFFFF),
056                (watermarkValue & 1L) == 1L);
057    }
058
059    static public Watermark ofTimestamp(long timestamp) {
060        return ofTimestamp(timestamp, (short) 0);
061    }
062
063    static public Watermark ofTimestamp(long timestamp, short sequence) {
064        return new Watermark(timestamp, sequence, false);
065    }
066
067    static public Watermark completedOf(Watermark watermark) {
068        Objects.requireNonNull(watermark);
069        return new Watermark(watermark.getTimestamp(), watermark.getSequence(), true);
070    }
071
072    public long getValue() {
073        return value;
074    }
075
076    public boolean isCompleted() {
077        return completed;
078    }
079
080    public short getSequence() {
081        return sequence;
082    }
083
084    public long getTimestamp() {
085        return timestamp;
086    }
087
088    public boolean isDone(long timestamp) {
089        return Watermark.ofTimestamp(timestamp).compareTo(this) < 0;
090    }
091
092    @Override
093    public boolean equals(Object o) {
094        if (this == o) return true;
095        if (o == null || getClass() != o.getClass()) return false;
096
097        Watermark watermark = (Watermark) o;
098        if (completed != watermark.completed) return false;
099        if (timestamp != watermark.timestamp) return false;
100        return sequence == watermark.sequence;
101    }
102
103    @Override
104    public int hashCode() {
105        return (int) (value ^ (value >>> 32));
106    }
107
108    @Override
109    public String toString() {
110        return "Watermark{" +
111                "completed=" + completed +
112                ", timestamp=" + timestamp +
113                ", sequence=" + sequence +
114                ", value=" + value +
115                '}';
116    }
117
118    @Override
119    public int compareTo(Watermark o) {
120        if (o == null) {
121            return Integer.MAX_VALUE;
122        }
123        long diff = value - o.value;
124        // cast diff to int when possible
125        int ret = (int) diff;
126        if (ret == diff) {
127            return ret;
128        }
129        if (diff > 0) {
130            return Integer.MAX_VALUE;
131        }
132        return Integer.MIN_VALUE;
133    }
134
135}