001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.computation; 020 021import java.util.Objects; 022 023/** 024 * Watermark represents a point in time. This point in time is composed of a millisecond timestamp and a sequence. There 025 * is also a state to denote if the point in time is reached (completed) or not. Watermark are immutable. 026 * 027 * @since 9.3 028 */ 029public final class Watermark implements Comparable<Watermark> { 030 public static final Watermark LOWEST = new Watermark(0, (short) 0, false); 031 032 protected final long timestamp; 033 034 protected final short sequence; 035 036 protected final boolean completed; 037 038 protected final long value; 039 040 private Watermark(long timestamp, short sequence, boolean completed) { 041 if (timestamp < 0) { 042 throw new IllegalArgumentException("timestamp must be positive"); 043 } 044 this.timestamp = timestamp; 045 this.sequence = sequence; 046 this.completed = completed; 047 this.value = timestamp << 17 | (sequence & 0xFFFF) << 1 | (completed ? 1 : 0); 048 } 049 050 public static Watermark ofValue(long watermarkValue) { 051 if (watermarkValue < 0) { 052 throw new IllegalArgumentException("Watermark must be positive"); 053 } 054 return new Watermark(watermarkValue >> 17, (short) ((watermarkValue >> 1) & 0xFFFF), 055 (watermarkValue & 1L) == 1L); 056 } 057 058 public static Watermark ofNow() { 059 return ofTimestamp(System.currentTimeMillis(), (short) 0); 060 } 061 062 public static Watermark ofTimestamp(long timestamp) { 063 return ofTimestamp(timestamp, (short) 0); 064 } 065 066 public static Watermark ofTimestamp(long timestamp, short sequence) { 067 return new Watermark(timestamp, sequence, false); 068 } 069 070 public static Watermark completedOf(Watermark watermark) { 071 Objects.requireNonNull(watermark); 072 return new Watermark(watermark.getTimestamp(), watermark.getSequence(), true); 073 } 074 075 public long getValue() { 076 return value; 077 } 078 079 public boolean isCompleted() { 080 return completed; 081 } 082 083 public short getSequence() { 084 return sequence; 085 } 086 087 public long getTimestamp() { 088 return timestamp; 089 } 090 091 public boolean isDone(long timestamp) { 092 return Watermark.ofTimestamp(timestamp).compareTo(this) < 0; 093 } 094 095 @Override 096 public boolean equals(Object o) { 097 if (this == o) 098 return true; 099 if (o == null || getClass() != o.getClass()) 100 return false; 101 102 Watermark watermark = (Watermark) o; 103 return completed == watermark.completed && timestamp == watermark.timestamp && sequence == watermark.sequence; 104 } 105 106 @Override 107 public int hashCode() { 108 return (int) (value ^ (value >>> 32)); 109 } 110 111 @Override 112 public String toString() { 113 return "Watermark{" + "completed=" + completed + ", timestamp=" + timestamp + ", sequence=" + sequence 114 + ", value=" + value + '}'; 115 } 116 117 @SuppressWarnings("NullableProblems") 118 @Override 119 public int compareTo(Watermark o) { 120 if (o == null) { 121 return Integer.MAX_VALUE; 122 } 123 long diff = value - o.value; 124 // cast diff to int when possible 125 int ret = (int) diff; 126 if (ret == diff) { 127 return ret; 128 } 129 if (diff > 0) { 130 return 1; 131 } 132 return -1; 133 } 134 135}