/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.ecm.platform.importer.mqueues.computation;

import java.util.Objects;

/**
 * A watermark represents a point in time.
 * This point in time is composed of a millisecond timestamp and a sequence.
 * There is also a state to denote whether the point in time is reached (completed) or not.
 * <p>
 * The three components are packed into a single {@code long} value laid out as
 * {@code timestamp << 17 | (sequence as unsigned 16 bits) << 1 | completed bit}, so that comparing the packed
 * values orders watermarks by timestamp, then sequence, then completion state.
 * <p>
 * Watermarks are immutable.
 *
 * @since 9.2
 */
public final class Watermark implements Comparable<Watermark> {

    /** Number of low bits of the packed value used by the sequence (16 bits) and the completed flag (1 bit). */
    private static final int TIMESTAMP_SHIFT = 17;

    /** Mask keeping the 16 bits of the sequence. */
    private static final long SEQUENCE_MASK = 0xFFFFL;

    /** Biggest timestamp that can be shifted by {@link #TIMESTAMP_SHIFT} bits without overflowing a long. */
    private static final long MAX_TIMESTAMP = Long.MAX_VALUE >> TIMESTAMP_SHIFT;

    /** The watermark with timestamp 0, sequence 0, not completed; its packed value is 0. */
    public static final Watermark LOWEST = new Watermark(0, (short) 0, false);

    private final long timestamp;

    private final short sequence;

    private final boolean completed;

    /** Packed representation of timestamp, sequence and completed flag. */
    private final long value;

    /**
     * @throws IllegalArgumentException if the timestamp is negative or too big to be packed into the value
     */
    private Watermark(long timestamp, short sequence, boolean completed) {
        if (timestamp < 0) {
            throw new IllegalArgumentException("timestamp must be positive");
        }
        if (timestamp > MAX_TIMESTAMP) {
            // shifting a bigger timestamp would silently overflow and corrupt the packed value and the ordering
            throw new IllegalArgumentException(
                    "timestamp is too big: " + timestamp + ", maximum is: " + MAX_TIMESTAMP);
        }
        this.timestamp = timestamp;
        this.sequence = sequence;
        this.completed = completed;
        // the sequence is masked so that a negative short does not sign-extend into the timestamp bits
        this.value = (timestamp << TIMESTAMP_SHIFT) | ((sequence & SEQUENCE_MASK) << 1) | (completed ? 1L : 0L);
    }

    /**
     * Creates a watermark from a packed value as returned by {@link #getValue()}.
     *
     * @param watermarkValue the packed value, must not be negative
     * @return the watermark whose timestamp, sequence and completed flag are decoded from the value
     * @throws IllegalArgumentException if the value is negative
     */
    public static Watermark ofValue(long watermarkValue) {
        if (watermarkValue < 0) {
            throw new IllegalArgumentException("Watermark must be positive");
        }
        return new Watermark(
                watermarkValue >> TIMESTAMP_SHIFT,
                (short) ((watermarkValue >> 1) & SEQUENCE_MASK),
                (watermarkValue & 1L) == 1L);
    }

    /**
     * Creates a non completed watermark for the timestamp with a sequence of 0.
     *
     * @param timestamp the timestamp in milliseconds
     * @throws IllegalArgumentException if the timestamp is negative or too big to be packed into the value
     */
    public static Watermark ofTimestamp(long timestamp) {
        return ofTimestamp(timestamp, (short) 0);
    }

    /**
     * Creates a non completed watermark for the timestamp and sequence.
     *
     * @param timestamp the timestamp in milliseconds
     * @param sequence the sequence, stored as 16 bits in the packed value
     * @throws IllegalArgumentException if the timestamp is negative or too big to be packed into the value
     */
    public static Watermark ofTimestamp(long timestamp, short sequence) {
        return new Watermark(timestamp, sequence, false);
    }

    /**
     * Creates a completed watermark with the same timestamp and sequence as the given watermark.
     *
     * @param watermark the watermark to complete
     * @throws NullPointerException if the watermark is null
     */
    public static Watermark completedOf(Watermark watermark) {
        Objects.requireNonNull(watermark);
        return new Watermark(watermark.getTimestamp(), watermark.getSequence(), true);
    }

    /**
     * Returns the packed value of this watermark, it can be turned back into a watermark using
     * {@link #ofValue(long)}.
     */
    public long getValue() {
        return value;
    }

    /**
     * Returns true if the point in time is flagged as completed.
     */
    public boolean isCompleted() {
        return completed;
    }

    /**
     * Returns the sequence of this watermark.
     */
    public short getSequence() {
        return sequence;
    }

    /**
     * Returns the timestamp of this watermark.
     */
    public long getTimestamp() {
        return timestamp;
    }

    /**
     * Returns true if the non completed watermark of the given timestamp with a sequence of 0 is strictly lower
     * than this watermark.
     *
     * @param timestamp the timestamp to test
     * @throws IllegalArgumentException if the timestamp is negative or too big to be packed into the value
     */
    public boolean isDone(long timestamp) {
        return Watermark.ofTimestamp(timestamp).compareTo(this) < 0;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        Watermark other = (Watermark) o;
        return completed == other.completed && timestamp == other.timestamp && sequence == other.sequence;
    }

    @Override
    public int hashCode() {
        // fold the two halves of the packed value
        return (int) (value ^ (value >>> 32));
    }

    @Override
    public String toString() {
        return "Watermark{" +
                "completed=" + completed +
                ", timestamp=" + timestamp +
                ", sequence=" + sequence +
                ", value=" + value +
                '}';
    }

    /**
     * Compares the packed values of the watermarks.
     * <p>
     * Unlike what the {@link Comparable} contract recommends, a null argument does not raise an exception: any
     * watermark is considered greater than null.
     */
    @Override
    public int compareTo(Watermark o) {
        if (o == null) {
            return Integer.MAX_VALUE;
        }
        return Long.compare(value, o.value);
    }

}