001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation;
020
021import static java.lang.Math.min;
022
023import java.io.Externalizable;
024import java.io.IOException;
025import java.io.ObjectInput;
026import java.io.ObjectOutput;
027import java.nio.charset.StandardCharsets;
028import java.text.SimpleDateFormat;
029import java.util.Arrays;
030import java.util.Date;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.Map;
034import java.util.Objects;
035
036/**
037 * Basic data object that contains: key, watermark, flag and data.
038 *
039 * @since 9.3
040 */
041@SuppressWarnings("deprecation")
042public class Record implements Externalizable {
043    protected static final EnumSet<Flag> DEFAULT_FLAG = EnumSet.of(Flag.DEFAULT);
044
045    protected static final byte[] NO_DATA = new byte[0];
046
047    // Externalizable do rely on serialVersionUID
048    static final long serialVersionUID = 2017_05_29L;
049
050    /** @deprecated 10.2 use {@link #getWatermark()} or {@link #setWatermark(long)} instead */
051    @Deprecated
052    public long watermark;
053
054    /** @deprecated 10.2 use {@link #getKey()} or {@link #setKey(String)} instead */
055    @Deprecated
056    public String key;
057
058    /** @deprecated 10.2 use {@link #getData()} or {@link #setData(byte[])} instead */
059    @Deprecated
060    // We can not use null because Nullable on byte[] requires avro 1.7.6 cf AVRO-1401
061    public byte[] data = NO_DATA;
062
063    /** @deprecated 10.2 use {@link #getFlags()} or {@link #setFlags(EnumSet)} instead */
064    @SuppressWarnings("DeprecatedIsStillUsed")
065    @Deprecated
066    // The enumSet representation of the flags is transient because serializer don't handle this type
067    public transient EnumSet<Flag> flags;
068
069    protected byte flagsAsByte;
070
071    public Record() {
072        // Empty constructor required for deserialization
073    }
074
075    /**
076     * Creates a record using current watermark corresponding to the current time, with a default flag
077     */
078    public Record(String key, byte[] data) {
079        this(key, data, Watermark.ofNow().getValue(), DEFAULT_FLAG);
080    }
081
082    /**
083     * Creates a record using a default flag
084     */
085    public Record(String key, byte[] data, long watermark) {
086        this(key, data, watermark, DEFAULT_FLAG);
087    }
088
089    public Record(String key, byte[] data, long watermark, EnumSet<Flag> flags) {
090        this.key = key;
091        this.watermark = watermark;
092        setData(data);
093        setFlags(flags);
094    }
095
096    /**
097     * Creates a record using current timestamp and default flag
098     */
099    public static Record of(String key, byte[] data) {
100        return new Record(key, data);
101    }
102
103    public long getWatermark() {
104        return watermark;
105    }
106
107    public void setWatermark(long watermark) {
108        this.watermark = watermark;
109    }
110
111    public EnumSet<Flag> getFlags() {
112        if (flags == null) {
113            flags = decodeFlags(flagsAsByte);
114        }
115        return flags;
116    }
117
118    public void setFlags(EnumSet<Flag> flags) {
119        this.flags = flags;
120        this.flagsAsByte = (byte) encodeFlags(flags);
121    }
122
123    public String getKey() {
124        return key;
125    }
126
127    public void setKey(String key) {
128        this.key = key;
129    }
130
131    public byte[] getData() {
132        return data;
133    }
134
135    public void setData(byte[] data) {
136        if (data != null) {
137            this.data = data;
138        } else {
139            this.data = NO_DATA;
140        }
141    }
142
143    @Override
144    public String toString() {
145        String wmDate = "";
146        if (watermark > 0) {
147            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
148            Watermark wm = Watermark.ofValue(watermark);
149            wmDate = ", wmDate=" + dateFormat.format(new Date(wm.getTimestamp()));
150        }
151        return "Record{" + "watermark=" + watermark + wmDate + ", flags=" + getFlags() + ", key='" + key + '\''
152                + ", data.length=" + ((data == null) ? 0 : data.length) + ", data=\"" + dataOverview(127) + "\"}";
153    }
154
155    public String dataOverview(int maxLength) {
156        String overview = "";
157        if (data != null && data.length > 0) {
158            String dataAsString = new String(data, StandardCharsets.UTF_8);
159            overview = dataAsString.substring(0, min(dataAsString.length(), maxLength));
160            overview = overview.replaceAll("[^\\x20-\\x7e]", ".");
161        }
162        return overview;
163    }
164
165    @Override
166    public void writeExternal(ObjectOutput out) throws IOException {
167        out.writeLong(watermark);
168        // use a short for backward compatibility
169        out.writeShort(flagsAsByte);
170        out.writeObject(key);
171        if (data == null || data.length == 0) {
172            out.writeInt(0);
173        } else {
174            out.writeInt(data.length);
175            out.write(data);
176        }
177    }
178
179    @Override
180    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
181        this.watermark = in.readLong();
182        // use a short for backward compatibility
183        this.flagsAsByte = (byte) in.readShort();
184        this.key = (String) in.readObject();
185        int dataLength = in.readInt();
186        if (dataLength == 0) {
187            this.data = NO_DATA;
188        } else {
189            this.data = new byte[dataLength];
190            // not using in.readFully because it is not impl by Chronicle WireObjectInput
191            int pos = 0;
192            while (pos < dataLength) {
193                int byteRead = in.read(this.data, pos, dataLength - pos);
194                if (byteRead == -1) {
195                    throw new IllegalStateException("Corrupted stream, can not read " + dataLength + " bytes");
196                }
197                pos += byteRead;
198            }
199        }
200    }
201
202    protected short encodeFlags(EnumSet<Flag> enumSet) {
203        // adapted from Adamski: http://stackoverflow.com/questions/2199399/storing-enumset-in-a-database
204        short ret = 0;
205        if (enumSet != null) {
206            for (Flag val : enumSet) {
207                ret = (short) (ret | (1 << val.ordinal()));
208            }
209        }
210        return ret;
211    }
212
213    protected EnumSet<Flag> decodeFlags(byte encoded) {
214        // adapted from Adamski: http://stackoverflow.com/questions/2199399/storing-enumset-in-a-database
215        Map<Integer, Flag> ordinalMap = new HashMap<>();
216        for (Flag val : Flag.ALL_OPTS) {
217            ordinalMap.put(val.ordinal(), val);
218        }
219        EnumSet<Flag> ret = EnumSet.noneOf(Flag.class);
220        int ordinal = 0;
221        for (byte i = 1; i != 0; i <<= 1) {
222            if ((i & encoded) != 0) {
223                ret.add(ordinalMap.get(ordinal));
224            }
225            ++ordinal;
226        }
227        return ret;
228    }
229
230    @Override
231    public boolean equals(Object o) {
232        if (this == o) {
233            return true;
234        }
235        if (o == null || getClass() != o.getClass()) {
236            return false;
237        }
238        Record record = (Record) o;
239        return watermark == record.watermark && flagsAsByte == record.flagsAsByte && Objects.equals(key, record.key)
240                && Arrays.equals(data, record.data);
241    }
242
243    @Override
244    public int hashCode() {
245        int result = Objects.hash(watermark, flagsAsByte, key);
246        result = 31 * result + Arrays.hashCode(data);
247        return result;
248    }
249
250    public enum Flag {
251        // limited to 8 flags so it can be encoded as a byte
252        DEFAULT, COMMIT, POISON_PILL, SKIP, TRACE, PAUSE, USER1, USER2;
253
254        public static final EnumSet<Flag> ALL_OPTS = EnumSet.allOf(Flag.class);
255    }
256}