001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation;
020
021import static java.lang.Math.min;
022
023import java.io.Externalizable;
024import java.io.IOException;
025import java.io.ObjectInput;
026import java.io.ObjectOutput;
027import java.nio.charset.StandardCharsets;
028import java.text.SimpleDateFormat;
029import java.util.Arrays;
030import java.util.Date;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.Map;
034import java.util.Objects;
035
036import org.apache.avro.reflect.Nullable;
037
038import io.opencensus.trace.BlankSpan;
039import io.opencensus.trace.Tracing;
040
041/**
042 * Basic data object that contains: key, watermark, flag and data.
043 *
044 * @since 9.3
045 */
046@SuppressWarnings("deprecation")
047public class Record implements Externalizable {
048
049    protected static final byte[] NO_DATA = new byte[0];
050
051    // Externalizable do rely on serialVersionUID
052    static final long serialVersionUID = 2020_05_23L;
053
054    protected long watermark;
055
056    protected String key;
057
058    @Nullable
059    protected byte[] data;
060
061    // The enumSet representation of the flags is transient because serializer don't handle this type
062    protected transient EnumSet<Flag> flags;
063
064    protected byte flagsAsByte;
065
066    /** @since 11.1 used for tracing context propagation */
067    @Nullable
068    protected byte[] traceContext;
069
070    @Nullable
071    protected String appenderThread;
072
073    public Record() {
074        // Empty constructor required for deserialization
075    }
076
077    /**
078     * Creates a record using current watermark corresponding to the current time, with a default flag
079     */
080    public Record(String key, byte[] data) {
081        this(key, data, Watermark.ofNow().getValue(), EnumSet.of(Flag.DEFAULT));
082    }
083
084    /**
085     * Creates a record using a default flag
086     */
087    public Record(String key, byte[] data, long watermark) {
088        this(key, data, watermark, EnumSet.of(Flag.DEFAULT));
089    }
090
091    public Record(String key, byte[] data, long watermark, EnumSet<Flag> flags) {
092        this.key = key;
093        this.watermark = watermark;
094        setData(data);
095        setFlags(flags);
096        if (!(Tracing.getTracer().getCurrentSpan() instanceof BlankSpan)) {
097            traceContext = Tracing.getPropagationComponent()
098                                  .getBinaryFormat()
099                                  .toByteArray(Tracing.getTracer().getCurrentSpan().getContext());
100            appenderThread = Thread.currentThread().getName();
101        }
102    }
103
104    /**
105     * Creates a record using current timestamp and default flag
106     */
107    public static Record of(String key, byte[] data) {
108        return new Record(key, data);
109    }
110
111    public long getWatermark() {
112        return watermark;
113    }
114
115    public void setWatermark(long watermark) {
116        this.watermark = watermark;
117    }
118
119    public EnumSet<Flag> getFlags() {
120        if (flags == null) {
121            flags = decodeFlags(flagsAsByte);
122        }
123        return flags;
124    }
125
126    public void setFlags(EnumSet<Flag> flags) {
127        this.flags = flags;
128        this.flagsAsByte = (byte) encodeFlags(flags);
129    }
130
131    public void setFlags(byte flagsAsByte) {
132        this.flagsAsByte = flagsAsByte;
133        this.flags = decodeFlags(flagsAsByte);
134    }
135
136    public String getKey() {
137        return key;
138    }
139
140    public void setKey(String key) {
141        this.key = key;
142    }
143
144    public byte[] getData() {
145        if (data == null) {
146            return NO_DATA;
147        }
148        return data;
149    }
150
151    public void setData(byte[] data) {
152       this.data = data;
153    }
154
155    @Override
156    public String toString() {
157        String wmDate = "";
158        if (watermark > 0) {
159            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
160            Watermark wm = Watermark.ofValue(watermark);
161            wmDate = ", wmDate=" + dateFormat.format(new Date(wm.getTimestamp()));
162        }
163        return "Record{" + "watermark=" + watermark + wmDate + ", flags=" + getFlags() + ", key='" + key + '\''
164                + ", data.length=" + ((data == null) ? 0 : data.length) + ", data=\"" + dataOverview(127) + "\"}";
165    }
166
167    public String dataOverview(int maxLength) {
168        String overview = "";
169        if (data != null && data.length > 0) {
170            String dataAsString = new String(data, StandardCharsets.UTF_8);
171            overview = dataAsString.substring(0, min(dataAsString.length(), maxLength));
172            overview = overview.replaceAll("[^\\x20-\\x7e]", ".");
173        }
174        return overview;
175    }
176
177    @Override
178    public void writeExternal(ObjectOutput out) throws IOException {
179        out.writeLong(watermark);
180        // use a short for backward compatibility
181        out.writeShort(flagsAsByte);
182        out.writeObject(key);
183        if (data == null || data.length == 0) {
184            out.writeInt(0);
185        } else {
186            out.writeInt(data.length);
187            out.write(data);
188        }
189    }
190
191    @Override
192    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
193        this.watermark = in.readLong();
194        // use a short for backward compatibility
195        this.flagsAsByte = (byte) in.readShort();
196        this.key = (String) in.readObject();
197        int dataLength = in.readInt();
198        if (dataLength == 0) {
199            this.data = null;
200        } else {
201            this.data = new byte[dataLength];
202            // not using in.readFully because it is not impl by Chronicle WireObjectInput
203            int pos = 0;
204            while (pos < dataLength) {
205                int byteRead = in.read(this.data, pos, dataLength - pos);
206                if (byteRead == -1) {
207                    throw new IllegalStateException("Corrupted stream, can not read " + dataLength + " bytes");
208                }
209                pos += byteRead;
210            }
211        }
212    }
213
214    protected short encodeFlags(EnumSet<Flag> enumSet) {
215        // adapted from Adamski: http://stackoverflow.com/questions/2199399/storing-enumset-in-a-database
216        short ret = 0;
217        if (enumSet != null) {
218            for (Flag val : enumSet) {
219                ret = (short) (ret | (1 << val.ordinal()));
220            }
221        }
222        return ret;
223    }
224
225    protected EnumSet<Flag> decodeFlags(byte encoded) {
226        // adapted from Adamski: http://stackoverflow.com/questions/2199399/storing-enumset-in-a-database
227        Map<Integer, Flag> ordinalMap = new HashMap<>();
228        for (Flag val : Flag.ALL_OPTS) {
229            ordinalMap.put(val.ordinal(), val);
230        }
231        EnumSet<Flag> ret = EnumSet.noneOf(Flag.class);
232        int ordinal = 0;
233        for (byte i = 1; i != 0; i <<= 1) {
234            if ((i & encoded) != 0) {
235                ret.add(ordinalMap.get(ordinal));
236            }
237            ++ordinal;
238        }
239        return ret;
240    }
241
242    @Override
243    public boolean equals(Object o) {
244        if (this == o) {
245            return true;
246        }
247        if (o == null || getClass() != o.getClass()) {
248            return false;
249        }
250        Record record = (Record) o;
251        return watermark == record.watermark && flagsAsByte == record.flagsAsByte && Objects.equals(key, record.key)
252                && Arrays.equals(data, record.data);
253    }
254
255    @Override
256    public int hashCode() {
257        int result = Objects.hash(watermark, flagsAsByte, key);
258        result = 31 * result + Arrays.hashCode(data);
259        return result;
260    }
261
262    public byte getFlagsAsByte() {
263        return flagsAsByte;
264    }
265
266    public enum Flag {
267        // limited to 8 flags so it can be encoded as a byte
268        DEFAULT,
269        COMMIT,
270        POISON_PILL,
271        EXTERNAL_VALUE, // The record value is stored outside of the record
272        INTERNAL1, // Reserved for internal use
273        INTERNAL2,
274        USER1, // Available for users
275        USER2;
276
277        public static final EnumSet<Flag> ALL_OPTS = EnumSet.allOf(Flag.class);
278    }
279
280    public byte[] getTraceContext() {
281        return traceContext;
282    }
283
284    public String getAppenderThread() {
285        return appenderThread;
286    }
287
288}