/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
import java.util.Objects;

/**
 * Watermark represents a point in time. This point in time is composed of a millisecond timestamp and a sequence.
 * There is also a state to denote if the point in time is reached (completed) or not. Watermarks are immutable.
 * <p>
 * A watermark can be packed into a single non-negative {@code long} (see {@link #getValue()}): the timestamp occupies
 * the high-order bits, followed by the 16 bits of the sequence (taken as an unsigned quantity) and finally one bit for
 * the completed flag. Because 17 bits are reserved for the sequence and the flag, the timestamp must fit in 46 bits.
 *
 * @since 9.3
 */
public final class Watermark implements Comparable<Watermark> {

    /** Number of low-order bits of the packed value taken by the sequence (16 bits) and the completed flag (1 bit). */
    private static final int TIMESTAMP_SHIFT = 17;

    /** Mask selecting the 16 bits of the sequence once it has been widened to an int or a long. */
    private static final int SEQUENCE_MASK = 0xFFFF;

    /** Largest timestamp whose packed value still fits in a non-negative long. */
    private static final long MAX_TIMESTAMP = Long.MAX_VALUE >> TIMESTAMP_SHIFT;

    /** The smallest watermark: timestamp 0, sequence 0, not completed. */
    public static final Watermark LOWEST = new Watermark(0, (short) 0, false);

    protected final long timestamp;

    protected final short sequence;

    protected final boolean completed;

    protected final long value;

    /**
     * Builds a watermark and computes its packed value.
     *
     * @throws IllegalArgumentException if the timestamp is negative or too large to be packed without overflow
     */
    private Watermark(long timestamp, short sequence, boolean completed) {
        if (timestamp < 0) {
            throw new IllegalArgumentException("timestamp must be positive");
        }
        // Shifting a larger timestamp would silently drop its high-order bits (or flip the sign bit), producing a
        // value that neither round-trips through ofValue nor sorts correctly.
        if (timestamp > MAX_TIMESTAMP) {
            throw new IllegalArgumentException(
                    "timestamp is too large, got: " + timestamp + ", max: " + MAX_TIMESTAMP);
        }
        this.timestamp = timestamp;
        this.sequence = sequence;
        this.completed = completed;
        // The mask turns the (possibly negative) short into its unsigned 16-bit representation.
        long sequenceBits = (long) (sequence & SEQUENCE_MASK) << 1;
        long completedBit = completed ? 1L : 0L;
        this.value = (timestamp << TIMESTAMP_SHIFT) | sequenceBits | completedBit;
    }

    /**
     * Rebuilds a watermark from a packed value as returned by {@link #getValue()}.
     *
     * @param watermarkValue the packed value
     * @return the corresponding watermark
     * @throws IllegalArgumentException if the value is negative
     */
    public static Watermark ofValue(long watermarkValue) {
        if (watermarkValue < 0) {
            throw new IllegalArgumentException("Watermark must be positive");
        }
        long timestamp = watermarkValue >> TIMESTAMP_SHIFT;
        short sequence = (short) ((watermarkValue >> 1) & SEQUENCE_MASK);
        boolean completed = (watermarkValue & 1L) == 1L;
        return new Watermark(timestamp, sequence, completed);
    }

    /**
     * Creates a non completed watermark for the given timestamp with a sequence of 0.
     *
     * @param timestamp the timestamp in milliseconds
     * @return the watermark
     * @throws IllegalArgumentException if the timestamp is negative or does not fit in 46 bits
     */
    public static Watermark ofTimestamp(long timestamp) {
        return ofTimestamp(timestamp, (short) 0);
    }

    /**
     * Creates a non completed watermark for the given timestamp and sequence.
     *
     * @param timestamp the timestamp in milliseconds
     * @param sequence the sequence
     * @return the watermark
     * @throws IllegalArgumentException if the timestamp is negative or does not fit in 46 bits
     */
    public static Watermark ofTimestamp(long timestamp, short sequence) {
        return new Watermark(timestamp, sequence, false);
    }

    /**
     * Creates a completed watermark with the same timestamp and sequence as the given one.
     *
     * @param watermark the watermark to complete
     * @return a completed watermark
     * @throws NullPointerException if the watermark is null
     */
    public static Watermark completedOf(Watermark watermark) {
        Objects.requireNonNull(watermark);
        return new Watermark(watermark.getTimestamp(), watermark.getSequence(), true);
    }

    /**
     * Returns the packed representation of this watermark, always non-negative.
     */
    public long getValue() {
        return value;
    }

    /**
     * Returns true if this watermark is flagged as completed.
     */
    public boolean isCompleted() {
        return completed;
    }

    /**
     * Returns the sequence of this watermark.
     */
    public short getSequence() {
        return sequence;
    }

    /**
     * Returns the timestamp of this watermark.
     */
    public long getTimestamp() {
        return timestamp;
    }

    /**
     * Returns true when the non completed watermark of the given timestamp, with a sequence of 0, is strictly lower
     * than this watermark.
     *
     * @param timestamp the timestamp to test
     * @throws IllegalArgumentException if the timestamp is negative or does not fit in 46 bits
     */
    public boolean isDone(long timestamp) {
        return Watermark.ofTimestamp(timestamp).compareTo(this) < 0;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        Watermark other = (Watermark) o;
        return completed == other.completed && timestamp == other.timestamp && sequence == other.sequence;
    }

    @Override
    public int hashCode() {
        // The packed value is derived from the three fields compared by equals, so equal watermarks share a hash.
        return (int) (value ^ (value >>> 32));
    }

    @Override
    public String toString() {
        return "Watermark{" + "completed=" + completed + ", timestamp=" + timestamp + ", sequence=" + sequence
                + ", value=" + value + '}';
    }

    /**
     * Compares the packed values. The result is the difference of the two values, clamped to the int range. A null
     * watermark is considered lower than any watermark: {@code Integer.MAX_VALUE} is returned instead of throwing.
     */
    @SuppressWarnings("NullableProblems")
    @Override
    public int compareTo(Watermark o) {
        if (o == null) {
            return Integer.MAX_VALUE;
        }
        // Both values are non-negative, so the subtraction cannot overflow.
        long diff = value - o.value;
        if (diff > Integer.MAX_VALUE) {
            return Integer.MAX_VALUE;
        }
        if (diff < Integer.MIN_VALUE) {
            return Integer.MIN_VALUE;
        }
        return (int) diff;
    }

}