001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.computation; 020 021import static java.lang.Math.min; 022 023import java.io.Externalizable; 024import java.io.IOException; 025import java.io.ObjectInput; 026import java.io.ObjectOutput; 027import java.io.UnsupportedEncodingException; 028import java.text.SimpleDateFormat; 029import java.util.Arrays; 030import java.util.Date; 031import java.util.EnumSet; 032import java.util.HashMap; 033import java.util.Map; 034import java.util.Objects; 035 036/** 037 * Basic data object that contains: key, watermark, flag and data. 038 * 039 * @since 9.3 040 */ 041public class Record implements Externalizable { 042 protected static final EnumSet<Flag> DEFAULT_FLAG = EnumSet.of(Flag.DEFAULT); 043 044 // Externalizable do rely on serialVersionUID 045 static final long serialVersionUID = 20170529L; 046 047 public long watermark; 048 049 public EnumSet<Flag> flags; 050 051 public String key; 052 053 public byte[] data; 054 055 public Record() { 056 057 } 058 059 public Record(String key, byte[] data, long watermark, EnumSet<Flag> flags) { 060 this.key = key; 061 this.data = data; 062 this.watermark = watermark; 063 this.flags = (flags == null) ? DEFAULT_FLAG : flags; 064 } 065 066 public static Record of(String key, byte[] data) { 067 return new Record(key, data, 0, DEFAULT_FLAG); 068 } 069 070 @Override 071 public String toString() { 072 String overview = ""; 073 String wmDate = ""; 074 if (data != null) { 075 try { 076 overview = ", data=\"" + new String(data, "UTF-8").substring(0, min(data.length, 127)) + '"'; 077 } catch (UnsupportedEncodingException e) { 078 overview = "unsupported encoding"; 079 } 080 overview = overview.replaceAll("[^\\x20-\\x7e]", "."); 081 } 082 if (watermark > 0) { 083 SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); 084 Watermark wm = Watermark.ofValue(watermark); 085 wmDate = ", wmDate=" + dateFormat.format(new Date(wm.getTimestamp())); 086 } 087 return "Record{" + "watermark=" + watermark + wmDate + ", flags=" + flags + ", key='" + key + '\'' 088 + ", data.length=" + ((data == null) ? 0 : data.length) + overview + '}'; 089 } 090 091 @Override 092 public void writeExternal(ObjectOutput out) throws IOException { 093 out.writeLong(watermark); 094 out.writeShort(encodeFlags()); 095 out.writeObject(key); 096 if (data == null || data.length == 0) { 097 out.writeInt(0); 098 } else { 099 out.writeInt(data.length); 100 out.write(data); 101 } 102 } 103 104 protected short encodeFlags() { 105 // adapted from Adamski: http://stackoverflow.com/questions/2199399/storing-enumset-in-a-database 106 short ret = 0; 107 if (flags != null) { 108 for (Flag val : flags) { 109 ret = (short) (ret | (1 << val.ordinal())); 110 } 111 } 112 return ret; 113 } 114 115 @Override 116 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 117 this.watermark = in.readLong(); 118 this.flags = decodeFlags(in.readShort()); 119 this.key = (String) in.readObject(); 120 int dataLength = in.readInt(); 121 if (dataLength == 0) { 122 this.data = null; 123 } else { 124 this.data = new byte[dataLength]; 125 // not using in.readFully because it is not impl by Chronicle WireObjectInput 126 int pos = 0; 127 while (pos < dataLength) { 128 int byteRead = in.read(this.data, pos, dataLength - pos); 129 if (byteRead == -1) { 130 throw new IllegalStateException("Corrupted stream, can not read " + dataLength + " bytes"); 131 } 132 pos += byteRead; 133 } 134 } 135 } 136 137 protected EnumSet<Flag> decodeFlags(short encoded) { 138 // adapted from Adamski: http://stackoverflow.com/questions/2199399/storing-enumset-in-a-database 139 Map<Integer, Flag> ordinalMap = new HashMap<>(); 140 for (Flag val : Flag.ALL_OPTS) { 141 ordinalMap.put(val.ordinal(), val); 142 } 143 EnumSet<Flag> ret = EnumSet.noneOf(Flag.class); 144 int ordinal = 0; 145 for (int i = 1; i != 0; i <<= 1) { 146 if ((i & encoded) != 0) { 147 ret.add(ordinalMap.get(ordinal)); 148 } 149 ++ordinal; 150 } 151 return ret; 152 } 153 154 public enum Flag { 155 DEFAULT, COMMIT, POISON_PILL; 156 157 public static final EnumSet<Flag> ALL_OPTS = EnumSet.allOf(Flag.class); 158 } 159 160 @Override 161 public boolean equals(Object o) { 162 if (this == o) { 163 return true; 164 } 165 if (o == null || getClass() != o.getClass()) { 166 return false; 167 } 168 Record record = (Record) o; 169 return watermark == record.watermark && Objects.equals(flags, record.flags) && Objects.equals(key, record.key) 170 && Arrays.equals(data, record.data); 171 } 172 173 @Override 174 public int hashCode() { 175 int result = Objects.hash(watermark, flags, key); 176 result = 31 * result + Arrays.hashCode(data); 177 return result; 178 } 179}