/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.computation;

import static java.lang.Math.min;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Basic data object that contains: key, watermark, flag and data.
 * <p>
 * The flags are kept twice: as an {@link EnumSet} for in-memory use and as a single byte ({@code flagsAsByte}) which
 * is the form that is serialized and that {@link #equals(Object)} and {@link #hashCode()} rely on. Only
 * {@link #setFlags(EnumSet)} keeps both in sync: mutating the set returned by {@link #getFlags()} or assigning the
 * deprecated public {@code flags} field directly does not update the byte.
 *
 * @since 9.3
 */
@SuppressWarnings("deprecation")
public class Record implements Externalizable {
    protected static final EnumSet<Flag> DEFAULT_FLAG = EnumSet.of(Flag.DEFAULT);

    protected static final byte[] NO_DATA = new byte[0];

    // Externalizable do rely on serialVersionUID
    static final long serialVersionUID = 2017_05_29L;

    /** @deprecated 10.2 use {@link #getWatermark()} or {@link #setWatermark(long)} instead */
    @Deprecated
    public long watermark;

    /** @deprecated 10.2 use {@link #getKey()} or {@link #setKey(String)} instead */
    @Deprecated
    public String key;

    /** @deprecated 10.2 use {@link #getData()} or {@link #setData(byte[])} instead */
    @Deprecated
    // We can not use null because Nullable on byte[] requires avro 1.7.6 cf AVRO-1401
    public byte[] data = NO_DATA;

    /** @deprecated 10.2 use {@link #getFlags()} or {@link #setFlags(EnumSet)} instead */
    @SuppressWarnings("DeprecatedIsStillUsed")
    @Deprecated
    // The enumSet representation of the flags is transient because serializer don't handle this type
    public transient EnumSet<Flag> flags;

    /** Bit field of the flags, one bit per {@link Flag} ordinal; this is the serialized representation. */
    protected byte flagsAsByte;

    public Record() {
        // Empty constructor required for deserialization
    }

    /**
     * Creates a record using current watermark corresponding to the current time, with a default flag
     *
     * @param key the record key
     * @param data the payload, {@code null} is stored as an empty array
     */
    public Record(String key, byte[] data) {
        this(key, data, Watermark.ofNow().getValue(), DEFAULT_FLAG);
    }

    /**
     * Creates a record using a default flag
     *
     * @param key the record key
     * @param data the payload, {@code null} is stored as an empty array
     * @param watermark the watermark value
     */
    public Record(String key, byte[] data, long watermark) {
        this(key, data, watermark, DEFAULT_FLAG);
    }

    /**
     * Creates a record with an explicit watermark and set of flags.
     *
     * @param key the record key
     * @param data the payload, {@code null} is stored as an empty array
     * @param watermark the watermark value
     * @param flags the flags, copied into the record; {@code null} is encoded as "no flag"
     */
    public Record(String key, byte[] data, long watermark, EnumSet<Flag> flags) {
        this.key = key;
        this.watermark = watermark;
        setData(data);
        setFlags(flags);
    }

    /**
     * Creates a record using current timestamp and default flag
     *
     * @param key the record key
     * @param data the payload, {@code null} is stored as an empty array
     * @return a new record
     */
    public static Record of(String key, byte[] data) {
        return new Record(key, data);
    }

    public long getWatermark() {
        return watermark;
    }

    public void setWatermark(long watermark) {
        this.watermark = watermark;
    }

    /**
     * Returns the flags, decoding them from the byte representation on first access (for instance after
     * deserialization, where the transient set is not restored).
     * <p>
     * The returned set is the record's own instance; changes made to it are not propagated to the serialized byte,
     * use {@link #setFlags(EnumSet)} to change flags.
     *
     * @return the flags of this record, never {@code null}
     */
    public EnumSet<Flag> getFlags() {
        if (flags == null) {
            flags = decodeFlags(flagsAsByte);
        }
        return flags;
    }

    /**
     * Sets the flags and updates the byte representation accordingly.
     *
     * @param flags the new flags; the set is copied, {@code null} is encoded as "no flag"
     */
    public void setFlags(EnumSet<Flag> flags) {
        // Defensive copy: the constructors pass the shared static DEFAULT_FLAG, storing it by reference would let a
        // caller doing getFlags().add(...) on one record silently change the default of every other record.
        this.flags = (flags == null) ? null : EnumSet.copyOf(flags);
        this.flagsAsByte = (byte) encodeFlags(flags);
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public byte[] getData() {
        return data;
    }

    /**
     * Sets the payload.
     *
     * @param data the payload, stored by reference; {@code null} is replaced by an empty array
     */
    public void setData(byte[] data) {
        if (data != null) {
            this.data = data;
        } else {
            this.data = NO_DATA;
        }
    }

    @Override
    public String toString() {
        String wmDate = "";
        if (watermark > 0) {
            // SimpleDateFormat is not thread safe, keep a local instance
            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            Watermark wm = Watermark.ofValue(watermark);
            wmDate = ", wmDate=" + dateFormat.format(new Date(wm.getTimestamp()));
        }
        return "Record{" + "watermark=" + watermark + wmDate + ", flags=" + getFlags() + ", key='" + key + '\''
                + ", data.length=" + ((data == null) ? 0 : data.length) + ", data=\"" + dataOverview(127) + "\"}";
    }

    /**
     * Returns a printable overview of the payload: the beginning of the data decoded as UTF-8 where every character
     * outside of the printable ASCII range is replaced by a dot.
     *
     * @param maxLength the maximum number of characters of the overview
     * @return the overview, an empty string when there is no data
     * @throws StringIndexOutOfBoundsException if {@code maxLength} is negative and the record has data
     */
    public String dataOverview(int maxLength) {
        String overview = "";
        if (data != null && data.length > 0) {
            // Decode only a bounded prefix instead of the whole payload: a UTF-8 sequence is at most 4 bytes, so
            // 4 * maxLength bytes always cover the first maxLength decoded characters.
            int byteLimit = (int) min((long) data.length, 4L * Math.max(maxLength, 0));
            String dataAsString = new String(data, 0, byteLimit, StandardCharsets.UTF_8);
            overview = dataAsString.substring(0, min(dataAsString.length(), maxLength));
            overview = overview.replaceAll("[^\\x20-\\x7e]", ".");
        }
        return overview;
    }

    /**
     * Writes, in order: the watermark (long), the flags (short), the key (object), the data length (int) and the data
     * bytes when there are some.
     */
    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeLong(watermark);
        // use a short for backward compatibility
        out.writeShort(flagsAsByte);
        out.writeObject(key);
        if (data == null || data.length == 0) {
            out.writeInt(0);
        } else {
            out.writeInt(data.length);
            out.write(data);
        }
    }

    /**
     * Reads the fields in the order used by {@link #writeExternal(ObjectOutput)}.
     *
     * @throws IllegalStateException if the announced data length is negative or the stream ends before the data is
     *             fully read
     */
    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        this.watermark = in.readLong();
        // use a short for backward compatibility
        this.flagsAsByte = (byte) in.readShort();
        // drop any previously decoded set so that getFlags() reflects the byte that has just been read
        this.flags = null;
        this.key = (String) in.readObject();
        int dataLength = in.readInt();
        if (dataLength < 0) {
            throw new IllegalStateException("Corrupted stream, invalid data length: " + dataLength);
        }
        if (dataLength == 0) {
            this.data = NO_DATA;
        } else {
            this.data = new byte[dataLength];
            // not using in.readFully because it is not impl by Chronicle WireObjectInput
            int pos = 0;
            while (pos < dataLength) {
                int byteRead = in.read(this.data, pos, dataLength - pos);
                if (byteRead == -1) {
                    throw new IllegalStateException("Corrupted stream, can not read " + dataLength + " bytes");
                }
                pos += byteRead;
            }
        }
    }

    /**
     * Encodes a set of flags as a bit field where the bit at position {@code ordinal()} is set for each flag.
     *
     * @param enumSet the flags to encode, may be {@code null}
     * @return the bit field, 0 when the set is {@code null} or empty
     */
    protected short encodeFlags(EnumSet<Flag> enumSet) {
        // adapted from Adamski: http://stackoverflow.com/questions/2199399/storing-enumset-in-a-database
        short ret = 0;
        if (enumSet != null) {
            for (Flag val : enumSet) {
                ret = (short) (ret | (1 << val.ordinal()));
            }
        }
        return ret;
    }

    /**
     * Decodes a bit field produced by {@link #encodeFlags(EnumSet)}.
     *
     * @param encoded the bit field
     * @return a new set holding the flag of each bit that is set
     */
    protected EnumSet<Flag> decodeFlags(byte encoded) {
        // adapted from Adamski: http://stackoverflow.com/questions/2199399/storing-enumset-in-a-database
        Map<Integer, Flag> ordinalMap = new HashMap<>();
        for (Flag val : Flag.ALL_OPTS) {
            ordinalMap.put(val.ordinal(), val);
        }
        EnumSet<Flag> ret = EnumSet.noneOf(Flag.class);
        int ordinal = 0;
        // the byte mask visits the 8 bit positions then overflows to 0 which ends the loop
        for (byte i = 1; i != 0; i <<= 1) {
            if ((i & encoded) != 0) {
                ret.add(ordinalMap.get(ordinal));
            }
            ++ordinal;
        }
        return ret;
    }

    /**
     * Two records are equal when they are of the same class and have the same watermark, encoded flags, key and data
     * content.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        Record record = (Record) o;
        return watermark == record.watermark && flagsAsByte == record.flagsAsByte && Objects.equals(key, record.key)
                && Arrays.equals(data, record.data);
    }

    @Override
    public int hashCode() {
        int result = Objects.hash(watermark, flagsAsByte, key);
        result = 31 * result + Arrays.hashCode(data);
        return result;
    }

    public enum Flag {
        // limited to 8 flags so it can be encoded as a byte
        DEFAULT, COMMIT, POISON_PILL, SKIP, TRACE, PAUSE, USER1, USER2;

        public static final EnumSet<Flag> ALL_OPTS = EnumSet.allOf(Flag.class);
    }
}