001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.computation; 020 021import static java.lang.Math.min; 022 023import java.io.Externalizable; 024import java.io.IOException; 025import java.io.ObjectInput; 026import java.io.ObjectOutput; 027import java.io.UnsupportedEncodingException; 028import java.text.SimpleDateFormat; 029import java.util.Arrays; 030import java.util.Date; 031import java.util.EnumSet; 032import java.util.HashMap; 033import java.util.Map; 034import java.util.Objects; 035 036/** 037 * Basic data object that contains: key, watermark, flag and data. 038 * 039 * @since 9.3 040 */ 041@SuppressWarnings("deprecation") 042public class Record implements Externalizable { 043 protected static final EnumSet<Flag> DEFAULT_FLAG = EnumSet.of(Flag.DEFAULT); 044 045 protected static final byte[] NO_DATA = new byte[0]; 046 047 // Externalizable do rely on serialVersionUID 048 static final long serialVersionUID = 2017_05_29L; 049 050 /** @deprecated 10.2 use {@link #getWatermark()} or {@link #setWatermark(long)} instead */ 051 @Deprecated 052 public long watermark; 053 054 /** @deprecated 10.2 use {@link #getKey()} or {@link #setKey(String)} instead */ 055 @Deprecated 056 public String key; 057 058 /** @deprecated 10.2 use {@link #getData()} or {@link #setData(byte[])} instead */ 059 @Deprecated 060 // We can not use null because Nullable on byte[] requires avro 1.7.6 cf AVRO-1401 061 public byte[] data = NO_DATA; 062 063 /** @deprecated 10.2 use {@link #getFlags()} or {@link #setFlags(EnumSet)} instead */ 064 @SuppressWarnings("DeprecatedIsStillUsed") 065 @Deprecated 066 // The enumSet representation of the flags is transient because serializer don't handle this type 067 public transient EnumSet<Flag> flags; 068 069 protected byte flagsAsByte; 070 071 public Record() { 072 // Empty constructor required for deserialization 073 } 074 075 /** 076 * Creates a record using current watermark corresponding to the current time, with a default flag 077 */ 078 public Record(String key, byte[] data) { 079 this(key, data, Watermark.ofNow().getValue(), DEFAULT_FLAG); 080 } 081 082 /** 083 * Creates a record using a default flag 084 */ 085 public Record(String key, byte[] data, long watermark) { 086 this(key, data, watermark, DEFAULT_FLAG); 087 } 088 089 public Record(String key, byte[] data, long watermark, EnumSet<Flag> flags) { 090 this.key = key; 091 this.watermark = watermark; 092 setData(data); 093 setFlags(flags); 094 } 095 096 /** 097 * Creates a record using current timestamp and default flag 098 */ 099 public static Record of(String key, byte[] data) { 100 return new Record(key, data); 101 } 102 103 public long getWatermark() { 104 return watermark; 105 } 106 107 public void setWatermark(long watermark) { 108 this.watermark = watermark; 109 } 110 111 public EnumSet<Flag> getFlags() { 112 if (flags == null) { 113 flags = decodeFlags(flagsAsByte); 114 } 115 return flags; 116 } 117 118 public void setFlags(EnumSet<Flag> flags) { 119 this.flags = flags; 120 this.flagsAsByte = (byte) encodeFlags(flags); 121 } 122 123 public String getKey() { 124 return key; 125 } 126 127 public void setKey(String key) { 128 this.key = key; 129 } 130 131 public byte[] getData() { 132 return data; 133 } 134 135 public void setData(byte[] data) { 136 if (data != null) { 137 this.data = data; 138 } else { 139 this.data = NO_DATA; 140 } 141 } 142 143 @Override 144 public String toString() { 145 String overview = ""; 146 String wmDate = ""; 147 if (data != null && data.length > 0) { 148 try { 149 overview = ", data=\"" + new String(data, "UTF-8").substring(0, min(data.length, 127)) + '"'; 150 } catch (UnsupportedEncodingException e) { 151 overview = "unsupported encoding"; 152 } 153 overview = overview.replaceAll("[^\\x20-\\x7e]", "."); 154 } 155 if (watermark > 0) { 156 SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); 157 Watermark wm = Watermark.ofValue(watermark); 158 wmDate = ", wmDate=" + dateFormat.format(new Date(wm.getTimestamp())); 159 } 160 return "Record{" + "watermark=" + watermark + wmDate + ", flags=" + getFlags() + ", key='" + key + '\'' 161 + ", data.length=" + ((data == null) ? 0 : data.length) + overview + '}'; 162 } 163 164 @Override 165 public void writeExternal(ObjectOutput out) throws IOException { 166 out.writeLong(watermark); 167 // use a short for backward compatibility 168 out.writeShort(flagsAsByte); 169 out.writeObject(key); 170 if (data == null || data.length == 0) { 171 out.writeInt(0); 172 } else { 173 out.writeInt(data.length); 174 out.write(data); 175 } 176 } 177 178 @Override 179 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 180 this.watermark = in.readLong(); 181 // use a short for backward compatibility 182 this.flagsAsByte = (byte) in.readShort(); 183 this.key = (String) in.readObject(); 184 int dataLength = in.readInt(); 185 if (dataLength == 0) { 186 this.data = NO_DATA; 187 } else { 188 this.data = new byte[dataLength]; 189 // not using in.readFully because it is not impl by Chronicle WireObjectInput 190 int pos = 0; 191 while (pos < dataLength) { 192 int byteRead = in.read(this.data, pos, dataLength - pos); 193 if (byteRead == -1) { 194 throw new IllegalStateException("Corrupted stream, can not read " + dataLength + " bytes"); 195 } 196 pos += byteRead; 197 } 198 } 199 } 200 201 protected short encodeFlags(EnumSet<Flag> enumSet) { 202 // adapted from Adamski: http://stackoverflow.com/questions/2199399/storing-enumset-in-a-database 203 short ret = 0; 204 if (enumSet != null) { 205 for (Flag val : enumSet) { 206 ret = (short) (ret | (1 << val.ordinal())); 207 } 208 } 209 return ret; 210 } 211 212 protected EnumSet<Flag> decodeFlags(short encoded) { 213 // adapted from Adamski: http://stackoverflow.com/questions/2199399/storing-enumset-in-a-database 214 Map<Integer, Flag> ordinalMap = new HashMap<>(); 215 for (Flag val : Flag.ALL_OPTS) { 216 ordinalMap.put(val.ordinal(), val); 217 } 218 EnumSet<Flag> ret = EnumSet.noneOf(Flag.class); 219 int ordinal = 0; 220 for (int i = 1; i != 0; i <<= 1) { 221 if ((i & encoded) != 0) { 222 ret.add(ordinalMap.get(ordinal)); 223 } 224 ++ordinal; 225 } 226 return ret; 227 } 228 229 @Override 230 public boolean equals(Object o) { 231 if (this == o) { 232 return true; 233 } 234 if (o == null || getClass() != o.getClass()) { 235 return false; 236 } 237 Record record = (Record) o; 238 return watermark == record.watermark && flagsAsByte == record.flagsAsByte && Objects.equals(key, record.key) 239 && Arrays.equals(data, record.data); 240 } 241 242 @Override 243 public int hashCode() { 244 int result = Objects.hash(watermark, flagsAsByte, key); 245 result = 31 * result + Arrays.hashCode(data); 246 return result; 247 } 248 249 public enum Flag { 250 // limited to 8 flags so it can be encoded as a byte 251 DEFAULT, COMMIT, POISON_PILL, SKIP, TRACE, PAUSE, USER1, USER2; 252 253 public static final EnumSet<Flag> ALL_OPTS = EnumSet.allOf(Flag.class); 254 } 255}