001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.computation; 020 021import static java.lang.Math.min; 022 023import java.io.Externalizable; 024import java.io.IOException; 025import java.io.ObjectInput; 026import java.io.ObjectOutput; 027import java.nio.charset.StandardCharsets; 028import java.text.SimpleDateFormat; 029import java.util.Arrays; 030import java.util.Date; 031import java.util.EnumSet; 032import java.util.HashMap; 033import java.util.Map; 034import java.util.Objects; 035 036import org.apache.avro.reflect.Nullable; 037 038import io.opencensus.trace.BlankSpan; 039import io.opencensus.trace.Tracing; 040 041/** 042 * Basic data object that contains: key, watermark, flag and data. 043 * 044 * @since 9.3 045 */ 046@SuppressWarnings("deprecation") 047public class Record implements Externalizable { 048 049 protected static final byte[] NO_DATA = new byte[0]; 050 051 // Externalizable do rely on serialVersionUID 052 static final long serialVersionUID = 2020_05_23L; 053 054 protected long watermark; 055 056 protected String key; 057 058 @Nullable 059 protected byte[] data; 060 061 // The enumSet representation of the flags is transient because serializer don't handle this type 062 protected transient EnumSet<Flag> flags; 063 064 protected byte flagsAsByte; 065 066 /** @since 11.1 used for tracing context propagation */ 067 @Nullable 068 protected byte[] traceContext; 069 070 @Nullable 071 protected String appenderThread; 072 073 public Record() { 074 // Empty constructor required for deserialization 075 } 076 077 /** 078 * Creates a record using current watermark corresponding to the current time, with a default flag 079 */ 080 public Record(String key, byte[] data) { 081 this(key, data, Watermark.ofNow().getValue(), EnumSet.of(Flag.DEFAULT)); 082 } 083 084 /** 085 * Creates a record using a default flag 086 */ 087 public Record(String key, byte[] data, long watermark) { 088 this(key, data, watermark, EnumSet.of(Flag.DEFAULT)); 089 } 090 091 public Record(String key, byte[] data, long watermark, EnumSet<Flag> flags) { 092 this.key = key; 093 this.watermark = watermark; 094 setData(data); 095 setFlags(flags); 096 if (!(Tracing.getTracer().getCurrentSpan() instanceof BlankSpan)) { 097 traceContext = Tracing.getPropagationComponent() 098 .getBinaryFormat() 099 .toByteArray(Tracing.getTracer().getCurrentSpan().getContext()); 100 appenderThread = Thread.currentThread().getName(); 101 } 102 } 103 104 /** 105 * Creates a record using current timestamp and default flag 106 */ 107 public static Record of(String key, byte[] data) { 108 return new Record(key, data); 109 } 110 111 public long getWatermark() { 112 return watermark; 113 } 114 115 public void setWatermark(long watermark) { 116 this.watermark = watermark; 117 } 118 119 public EnumSet<Flag> getFlags() { 120 if (flags == null) { 121 flags = decodeFlags(flagsAsByte); 122 } 123 return flags; 124 } 125 126 public void setFlags(EnumSet<Flag> flags) { 127 this.flags = flags; 128 this.flagsAsByte = (byte) encodeFlags(flags); 129 } 130 131 public void setFlags(byte flagsAsByte) { 132 this.flagsAsByte = flagsAsByte; 133 this.flags = decodeFlags(flagsAsByte); 134 } 135 136 public String getKey() { 137 return key; 138 } 139 140 public void setKey(String key) { 141 this.key = key; 142 } 143 144 public byte[] getData() { 145 if (data == null) { 146 return NO_DATA; 147 } 148 return data; 149 } 150 151 public void setData(byte[] data) { 152 this.data = data; 153 } 154 155 @Override 156 public String toString() { 157 String wmDate = ""; 158 if (watermark > 0) { 159 SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); 160 Watermark wm = Watermark.ofValue(watermark); 161 wmDate = ", wmDate=" + dateFormat.format(new Date(wm.getTimestamp())); 162 } 163 return "Record{" + "watermark=" + watermark + wmDate + ", flags=" + getFlags() + ", key='" + key + '\'' 164 + ", data.length=" + ((data == null) ? 0 : data.length) + ", data=\"" + dataOverview(127) + "\"}"; 165 } 166 167 public String dataOverview(int maxLength) { 168 String overview = ""; 169 if (data != null && data.length > 0) { 170 String dataAsString = new String(data, StandardCharsets.UTF_8); 171 overview = dataAsString.substring(0, min(dataAsString.length(), maxLength)); 172 overview = overview.replaceAll("[^\\x20-\\x7e]", "."); 173 } 174 return overview; 175 } 176 177 @Override 178 public void writeExternal(ObjectOutput out) throws IOException { 179 out.writeLong(watermark); 180 // use a short for backward compatibility 181 out.writeShort(flagsAsByte); 182 out.writeObject(key); 183 if (data == null || data.length == 0) { 184 out.writeInt(0); 185 } else { 186 out.writeInt(data.length); 187 out.write(data); 188 } 189 } 190 191 @Override 192 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 193 this.watermark = in.readLong(); 194 // use a short for backward compatibility 195 this.flagsAsByte = (byte) in.readShort(); 196 this.key = (String) in.readObject(); 197 int dataLength = in.readInt(); 198 if (dataLength == 0) { 199 this.data = null; 200 } else { 201 this.data = new byte[dataLength]; 202 // not using in.readFully because it is not impl by Chronicle WireObjectInput 203 int pos = 0; 204 while (pos < dataLength) { 205 int byteRead = in.read(this.data, pos, dataLength - pos); 206 if (byteRead == -1) { 207 throw new IllegalStateException("Corrupted stream, can not read " + dataLength + " bytes"); 208 } 209 pos += byteRead; 210 } 211 } 212 } 213 214 protected short encodeFlags(EnumSet<Flag> enumSet) { 215 // adapted from Adamski: http://stackoverflow.com/questions/2199399/storing-enumset-in-a-database 216 short ret = 0; 217 if (enumSet != null) { 218 for (Flag val : enumSet) { 219 ret = (short) (ret | (1 << val.ordinal())); 220 } 221 } 222 return ret; 223 } 224 225 protected EnumSet<Flag> decodeFlags(byte encoded) { 226 // adapted from Adamski: http://stackoverflow.com/questions/2199399/storing-enumset-in-a-database 227 Map<Integer, Flag> ordinalMap = new HashMap<>(); 228 for (Flag val : Flag.ALL_OPTS) { 229 ordinalMap.put(val.ordinal(), val); 230 } 231 EnumSet<Flag> ret = EnumSet.noneOf(Flag.class); 232 int ordinal = 0; 233 for (byte i = 1; i != 0; i <<= 1) { 234 if ((i & encoded) != 0) { 235 ret.add(ordinalMap.get(ordinal)); 236 } 237 ++ordinal; 238 } 239 return ret; 240 } 241 242 @Override 243 public boolean equals(Object o) { 244 if (this == o) { 245 return true; 246 } 247 if (o == null || getClass() != o.getClass()) { 248 return false; 249 } 250 Record record = (Record) o; 251 return watermark == record.watermark && flagsAsByte == record.flagsAsByte && Objects.equals(key, record.key) 252 && Arrays.equals(data, record.data); 253 } 254 255 @Override 256 public int hashCode() { 257 int result = Objects.hash(watermark, flagsAsByte, key); 258 result = 31 * result + Arrays.hashCode(data); 259 return result; 260 } 261 262 public byte getFlagsAsByte() { 263 return flagsAsByte; 264 } 265 266 public enum Flag { 267 // limited to 8 flags so it can be encoded as a byte 268 DEFAULT, 269 COMMIT, 270 POISON_PILL, 271 EXTERNAL_VALUE, // The record value is stored outside of the record 272 INTERNAL1, // Reserved for internal use 273 INTERNAL2, 274 USER1, // Available for users 275 USER2; 276 277 public static final EnumSet<Flag> ALL_OPTS = EnumSet.allOf(Flag.class); 278 } 279 280 public byte[] getTraceContext() { 281 return traceContext; 282 } 283 284 public String getAppenderThread() { 285 return appenderThread; 286 } 287 288}