001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.computation; 020 021import static java.lang.Math.min; 022 023import java.io.Externalizable; 024import java.io.IOException; 025import java.io.ObjectInput; 026import java.io.ObjectOutput; 027import java.io.UnsupportedEncodingException; 028import java.text.SimpleDateFormat; 029import java.util.Date; 030import java.util.EnumSet; 031import java.util.HashMap; 032import java.util.Map; 033 034/** 035 * Basic data object that contains: key, watermark, flag and data. 036 * 037 * @since 9.3 038 */ 039public class Record implements Externalizable { 040 protected static final EnumSet<Flag> DEFAULT_FLAG = EnumSet.of(Flag.DEFAULT); 041 042 // Externalizable do rely on serialVersionUID 043 static final long serialVersionUID = 20170529L; 044 045 public long watermark; 046 047 public EnumSet<Flag> flags; 048 049 public String key; 050 051 public byte[] data; 052 053 public Record() { 054 055 } 056 057 public Record(String key, byte[] data, long watermark, EnumSet<Flag> flags) { 058 this.key = key; 059 this.data = data; 060 this.watermark = watermark; 061 this.flags = flags; 062 } 063 064 public static Record of(String key, byte[] data) { 065 return new Record(key, data, 0, DEFAULT_FLAG); 066 } 067 068 @Override 069 public String toString() { 070 String overview = ""; 071 String wmDate = ""; 072 if (data != null) { 073 try { 074 overview = ", data=\"" + new String(data, "UTF-8").substring(0, min(data.length, 127)) + '"'; 075 } catch (UnsupportedEncodingException e) { 076 overview = "unsupported encoding"; 077 } 078 overview = overview.replaceAll("[^\\x20-\\x7e]", "."); 079 } 080 if (watermark > 0) { 081 SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); 082 Watermark wm = Watermark.ofValue(watermark); 083 wmDate = ", wmDate=" + dateFormat.format(new Date(wm.getTimestamp())); 084 } 085 return "Record{" + "watermark=" + watermark + wmDate + ", flags=" + flags + ", key='" + key + '\'' 086 + ", data.length=" + ((data == null) ? 0 : data.length) + overview + '}'; 087 } 088 089 @Override 090 public void writeExternal(ObjectOutput out) throws IOException { 091 out.writeLong(watermark); 092 out.writeShort(encodeFlags()); 093 out.writeObject(key); 094 if (data == null || data.length == 0) { 095 out.writeInt(0); 096 } else { 097 out.writeInt(data.length); 098 out.write(data); 099 } 100 } 101 102 protected short encodeFlags() { 103 // adapted from Adamski: http://stackoverflow.com/questions/2199399/storing-enumset-in-a-database 104 short ret = 0; 105 if (flags != null) { 106 for (Flag val : flags) { 107 ret = (short) (ret | (1 << val.ordinal())); 108 } 109 } 110 return ret; 111 } 112 113 @Override 114 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 115 this.watermark = in.readLong(); 116 this.flags = decodeFlags(in.readShort()); 117 this.key = (String) in.readObject(); 118 int dataLength = in.readInt(); 119 if (dataLength == 0) { 120 this.data = null; 121 } else { 122 this.data = new byte[dataLength]; 123 // not using in.readFully because it is not impl by Chronicle WireObjectInput 124 int pos = 0; 125 while (pos < dataLength) { 126 int byteRead = in.read(this.data, pos, dataLength - pos); 127 if (byteRead == -1) { 128 throw new IllegalStateException("Corrupted stream, can not read " + dataLength + " bytes"); 129 } 130 pos += byteRead; 131 } 132 } 133 } 134 135 protected EnumSet<Flag> decodeFlags(short encoded) { 136 // adapted from Adamski: http://stackoverflow.com/questions/2199399/storing-enumset-in-a-database 137 Map<Integer, Flag> ordinalMap = new HashMap<>(); 138 for (Flag val : Flag.ALL_OPTS) { 139 ordinalMap.put(val.ordinal(), val); 140 } 141 EnumSet<Flag> ret = EnumSet.noneOf(Flag.class); 142 int ordinal = 0; 143 for (int i = 1; i != 0; i <<= 1) { 144 if ((i & encoded) != 0) { 145 ret.add(ordinalMap.get(ordinal)); 146 } 147 ++ordinal; 148 } 149 return ret; 150 } 151 152 public enum Flag { 153 DEFAULT, COMMIT, POISON_PILL; 154 155 public static final EnumSet<Flag> ALL_OPTS = EnumSet.allOf(Flag.class); 156 } 157 158}