/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.ecm.platform.importer.mqueues.computation;

import java.io.EOFException;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.StreamCorruptedException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;

/**
 * A keyed binary message carrying a watermark and a set of {@link Flag}s, with a hand-written
 * {@link Externalizable} form.
 * <p>
 * Serialized layout, in order: watermark ({@code long}), flags bitmask ({@code short}, bit <i>n</i> is set when the
 * flag with ordinal <i>n</i> is present), key (written with {@code writeObject}), data length ({@code int}), then the
 * raw data bytes. A {@code null} or empty {@link #data} is written as length 0 and read back as {@code null};
 * {@code null} {@link #flags} are written as 0 and read back as an empty set.
 *
 * @since 9.2
 */
public class Record implements Externalizable {
    // Externalizable classes still rely on serialVersionUID
    static final long serialVersionUID = 20170529L;

    // Set holding only Flag.DEFAULT; not referenced by any member of this class.
    private static final EnumSet<Flag> DEFAULT_FLAG = EnumSet.of(Flag.DEFAULT);

    /** Watermark value, serialized as a {@code long}. */
    public long watermark;

    /** Flags attached to the record; may be {@code null} (for instance when built with {@link #of}). */
    public EnumSet<Flag> flags;

    /** Record key, serialized with {@code writeObject}. */
    public String key;

    /** Payload bytes; may be {@code null}. */
    public byte[] data;

    /**
     * Markers that can be attached to a record.
     * <p>
     * Declaration order is part of the serialized form: each constant's ordinal is its bit position in a
     * {@code short} bitmask, so constants must not be reordered and at most 16 of them fit.
     */
    public enum Flag {
        DEFAULT, BEGIN, COMMIT, POISON_PILL, INTERNAL;

        /** Set containing every declared flag. */
        public static final EnumSet<Flag> ALL_OPTS = EnumSet.allOf(Flag.class);
    }

    /**
     * Creates a record with the given key and data, a watermark of 0 and {@code null} flags.
     *
     * @param key the record key
     * @param data the payload bytes
     * @return the new record
     */
    public static Record of(String key, byte[] data) {
        return new Record(key, data, 0, null);
    }

    /**
     * No-argument constructor required by {@link Externalizable}; fields are filled in by {@link #readExternal}.
     */
    public Record() {

    }

    /**
     * Creates a record from its four components. The arguments are stored as given, without copying.
     *
     * @param key the record key
     * @param data the payload bytes
     * @param watermark the watermark value
     * @param flags the flags, possibly {@code null}
     */
    public Record(String key, byte[] data, long watermark, EnumSet<Flag> flags) {
        this.key = key;
        this.data = data;
        this.watermark = watermark;
        this.flags = flags;
    }

    /**
     * Returns a short description showing the watermark, flags, key and payload size (0 when data is {@code null}).
     */
    @Override
    public String toString() {
        return "Record{" +
                "watermark=" + watermark +
                ", flags=" + flags +
                ", key='" + key + '\'' +
                ", data.length=" + ((data == null) ? 0 : data.length) +
                '}';
    }

    /**
     * Writes the record as: watermark, flags bitmask, key, data length, data bytes.
     *
     * @param out the stream to write to
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeLong(watermark);
        out.writeShort(encodeFlags());
        out.writeObject(key);
        if (data == null || data.length == 0) {
            // null and empty payloads share the same representation: a zero length and no bytes
            out.writeInt(0);
        } else {
            out.writeInt(data.length);
            out.write(data);
        }
    }

    /**
     * Packs {@link #flags} into a bitmask where bit <i>n</i> stands for the flag with ordinal <i>n</i>.
     *
     * @return the bitmask, 0 when flags is {@code null} or empty
     */
    private short encodeFlags() {
        // adapted from Adamski: http://stackoverflow.com/questions/2199399/storing-enumset-in-a-database
        short ret = 0;
        if (flags != null) {
            for (Flag val : flags) {
                ret |= (1 << val.ordinal());
            }
        }
        return ret;
    }

    /**
     * Restores the record from the layout produced by {@link #writeExternal}.
     *
     * @param in the stream to read from
     * @throws StreamCorruptedException if the stored data length is negative
     * @throws EOFException if the stream ends before all announced data bytes were read
     * @throws IOException if the underlying stream fails
     * @throws ClassNotFoundException if the key object cannot be resolved by the stream
     */
    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        this.watermark = in.readLong();
        this.flags = decodeFlags(in.readShort());
        this.key = (String) in.readObject();
        int dataLength = in.readInt();
        if (dataLength < 0) {
            throw new StreamCorruptedException("Negative data length: " + dataLength);
        }
        if (dataLength == 0) {
            this.data = null;
            return;
        }
        byte[] buffer = new byte[dataLength];
        // not using in.readFully because it is not impl by Chronicle WireObjectInput
        int pos = 0;
        while (pos < dataLength) {
            int count = in.read(buffer, pos, dataLength - pos);
            if (count < 0) {
                // read() reports end of stream with -1; adding it to pos would move the cursor backwards
                throw new EOFException(
                        "Unexpected end of stream: got " + pos + " of " + dataLength + " data bytes");
            }
            pos += count;
        }
        this.data = buffer;
    }

    /**
     * Expands a bitmask produced by {@link #encodeFlags()} into a flag set.
     *
     * @param encoded the bitmask, bit <i>n</i> standing for the flag with ordinal <i>n</i>
     * @return a new set with the matching flags; bits that match no declared flag are ignored
     */
    private EnumSet<Flag> decodeFlags(short encoded) {
        // adapted from Adamski: http://stackoverflow.com/questions/2199399/storing-enumset-in-a-database
        EnumSet<Flag> ret = EnumSet.noneOf(Flag.class);
        for (Flag val : Flag.values()) {
            // only probe bits that belong to a declared flag, so stray or sign-extended bits never yield null
            if ((encoded & (1 << val.ordinal())) != 0) {
                ret.add(val);
            }
        }
        return ret;
    }

}