001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation;
020
021import static java.lang.Math.min;
022
023import java.io.Externalizable;
024import java.io.IOException;
025import java.io.ObjectInput;
026import java.io.ObjectOutput;
027import java.io.UnsupportedEncodingException;
028import java.text.SimpleDateFormat;
029import java.util.Arrays;
030import java.util.Date;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.Map;
034import java.util.Objects;
035
036/**
037 * Basic data object that contains: key, watermark, flag and data.
038 *
039 * @since 9.3
040 */
041@SuppressWarnings("deprecation")
042public class Record implements Externalizable {
043    protected static final EnumSet<Flag> DEFAULT_FLAG = EnumSet.of(Flag.DEFAULT);
044
045    protected static final byte[] NO_DATA = new byte[0];
046
047    // Externalizable do rely on serialVersionUID
048    static final long serialVersionUID = 2017_05_29L;
049
050    /** @deprecated 10.2 use {@link #getWatermark()} or {@link #setWatermark(long)} instead */
051    @Deprecated
052    public long watermark;
053
054    /** @deprecated 10.2 use {@link #getKey()} or {@link #setKey(String)} instead */
055    @Deprecated
056    public String key;
057
058    /** @deprecated 10.2 use {@link #getData()} or {@link #setData(byte[])} instead */
059    @Deprecated
060    // We can not use null because Nullable on byte[] requires avro 1.7.6 cf AVRO-1401
061    public byte[] data = NO_DATA;
062
063    /** @deprecated 10.2 use {@link #getFlags()} or {@link #setFlags(EnumSet)} instead */
064    @SuppressWarnings("DeprecatedIsStillUsed")
065    @Deprecated
066    // The enumSet representation of the flags is transient because serializer don't handle this type
067    public transient EnumSet<Flag> flags;
068
069    protected byte flagsAsByte;
070
071    public Record() {
072        // Empty constructor required for deserialization
073    }
074
075    /**
076     * Creates a record using current watermark corresponding to the current time, with a default flag
077     */
078    public Record(String key, byte[] data) {
079        this(key, data, Watermark.ofNow().getValue(), DEFAULT_FLAG);
080    }
081
082    /**
083     * Creates a record using a default flag
084     */
085    public Record(String key, byte[] data, long watermark) {
086        this(key, data, watermark, DEFAULT_FLAG);
087    }
088
089    public Record(String key, byte[] data, long watermark, EnumSet<Flag> flags) {
090        this.key = key;
091        this.watermark = watermark;
092        setData(data);
093        setFlags(flags);
094    }
095
096    /**
097     * Creates a record using current timestamp and default flag
098     */
099    public static Record of(String key, byte[] data) {
100        return new Record(key, data);
101    }
102
103    public long getWatermark() {
104        return watermark;
105    }
106
107    public void setWatermark(long watermark) {
108        this.watermark = watermark;
109    }
110
111    public EnumSet<Flag> getFlags() {
112        if (flags == null) {
113            flags = decodeFlags(flagsAsByte);
114        }
115        return flags;
116    }
117
118    public void setFlags(EnumSet<Flag> flags) {
119        this.flags = flags;
120        this.flagsAsByte = (byte) encodeFlags(flags);
121    }
122
123    public String getKey() {
124        return key;
125    }
126
127    public void setKey(String key) {
128        this.key = key;
129    }
130
131    public byte[] getData() {
132        return data;
133    }
134
135    public void setData(byte[] data) {
136        if (data != null) {
137            this.data = data;
138        } else {
139            this.data = NO_DATA;
140        }
141    }
142
143    @Override
144    public String toString() {
145        String overview = "";
146        String wmDate = "";
147        if (data != null && data.length > 0) {
148            try {
149                overview = ", data=\"" + new String(data, "UTF-8").substring(0, min(data.length, 127)) + '"';
150            } catch (UnsupportedEncodingException e) {
151                overview = "unsupported encoding";
152            }
153            overview = overview.replaceAll("[^\\x20-\\x7e]", ".");
154        }
155        if (watermark > 0) {
156            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
157            Watermark wm = Watermark.ofValue(watermark);
158            wmDate = ", wmDate=" + dateFormat.format(new Date(wm.getTimestamp()));
159        }
160        return "Record{" + "watermark=" + watermark + wmDate + ", flags=" + getFlags() + ", key='" + key + '\''
161                + ", data.length=" + ((data == null) ? 0 : data.length) + overview + '}';
162    }
163
164    @Override
165    public void writeExternal(ObjectOutput out) throws IOException {
166        out.writeLong(watermark);
167        // use a short for backward compatibility
168        out.writeShort(flagsAsByte);
169        out.writeObject(key);
170        if (data == null || data.length == 0) {
171            out.writeInt(0);
172        } else {
173            out.writeInt(data.length);
174            out.write(data);
175        }
176    }
177
178    @Override
179    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
180        this.watermark = in.readLong();
181        // use a short for backward compatibility
182        this.flagsAsByte = (byte) in.readShort();
183        this.key = (String) in.readObject();
184        int dataLength = in.readInt();
185        if (dataLength == 0) {
186            this.data = NO_DATA;
187        } else {
188            this.data = new byte[dataLength];
189            // not using in.readFully because it is not impl by Chronicle WireObjectInput
190            int pos = 0;
191            while (pos < dataLength) {
192                int byteRead = in.read(this.data, pos, dataLength - pos);
193                if (byteRead == -1) {
194                    throw new IllegalStateException("Corrupted stream, can not read " + dataLength + " bytes");
195                }
196                pos += byteRead;
197            }
198        }
199    }
200
201    protected short encodeFlags(EnumSet<Flag> enumSet) {
202        // adapted from Adamski: http://stackoverflow.com/questions/2199399/storing-enumset-in-a-database
203        short ret = 0;
204        if (enumSet != null) {
205            for (Flag val : enumSet) {
206                ret = (short) (ret | (1 << val.ordinal()));
207            }
208        }
209        return ret;
210    }
211
212    protected EnumSet<Flag> decodeFlags(short encoded) {
213        // adapted from Adamski: http://stackoverflow.com/questions/2199399/storing-enumset-in-a-database
214        Map<Integer, Flag> ordinalMap = new HashMap<>();
215        for (Flag val : Flag.ALL_OPTS) {
216            ordinalMap.put(val.ordinal(), val);
217        }
218        EnumSet<Flag> ret = EnumSet.noneOf(Flag.class);
219        int ordinal = 0;
220        for (int i = 1; i != 0; i <<= 1) {
221            if ((i & encoded) != 0) {
222                ret.add(ordinalMap.get(ordinal));
223            }
224            ++ordinal;
225        }
226        return ret;
227    }
228
229    @Override
230    public boolean equals(Object o) {
231        if (this == o) {
232            return true;
233        }
234        if (o == null || getClass() != o.getClass()) {
235            return false;
236        }
237        Record record = (Record) o;
238        return watermark == record.watermark && flagsAsByte == record.flagsAsByte && Objects.equals(key, record.key)
239                && Arrays.equals(data, record.data);
240    }
241
242    @Override
243    public int hashCode() {
244        int result = Objects.hash(watermark, flagsAsByte, key);
245        result = 31 * result + Arrays.hashCode(data);
246        return result;
247    }
248
249    public enum Flag {
250        // limited to 8 flags so it can be encoded as a byte
251        DEFAULT, COMMIT, POISON_PILL, SKIP, TRACE, PAUSE, USER1, USER2;
252
253        public static final EnumSet<Flag> ALL_OPTS = EnumSet.allOf(Flag.class);
254    }
255}