001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation;
020
021import static java.lang.Math.min;
022
023import java.io.Externalizable;
024import java.io.IOException;
025import java.io.ObjectInput;
026import java.io.ObjectOutput;
027import java.io.UnsupportedEncodingException;
028import java.text.SimpleDateFormat;
029import java.util.Date;
030import java.util.EnumSet;
031import java.util.HashMap;
032import java.util.Map;
033
034/**
035 * Basic data object that contains: key, watermark, flag and data.
036 *
037 * @since 9.3
038 */
039public class Record implements Externalizable {
040    protected static final EnumSet<Flag> DEFAULT_FLAG = EnumSet.of(Flag.DEFAULT);
041
042    // Externalizable do rely on serialVersionUID
043    static final long serialVersionUID = 20170529L;
044
045    public long watermark;
046
047    public EnumSet<Flag> flags;
048
049    public String key;
050
051    public byte[] data;
052
053    public Record() {
054
055    }
056
057    public Record(String key, byte[] data, long watermark, EnumSet<Flag> flags) {
058        this.key = key;
059        this.data = data;
060        this.watermark = watermark;
061        this.flags = flags;
062    }
063
064    public static Record of(String key, byte[] data) {
065        return new Record(key, data, 0, DEFAULT_FLAG);
066    }
067
068    @Override
069    public String toString() {
070        String overview = "";
071        String wmDate = "";
072        if (data != null) {
073            try {
074                overview = ", data=\"" + new String(data, "UTF-8").substring(0, min(data.length, 127)) + '"';
075            } catch (UnsupportedEncodingException e) {
076                overview = "unsupported encoding";
077            }
078            overview = overview.replaceAll("[^\\x20-\\x7e]", ".");
079        }
080        if (watermark > 0) {
081            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
082            Watermark wm = Watermark.ofValue(watermark);
083            wmDate = ", wmDate=" + dateFormat.format(new Date(wm.getTimestamp()));
084        }
085        return "Record{" + "watermark=" + watermark + wmDate + ", flags=" + flags + ", key='" + key + '\''
086                + ", data.length=" + ((data == null) ? 0 : data.length) + overview + '}';
087    }
088
089    @Override
090    public void writeExternal(ObjectOutput out) throws IOException {
091        out.writeLong(watermark);
092        out.writeShort(encodeFlags());
093        out.writeObject(key);
094        if (data == null || data.length == 0) {
095            out.writeInt(0);
096        } else {
097            out.writeInt(data.length);
098            out.write(data);
099        }
100    }
101
102    protected short encodeFlags() {
103        // adapted from Adamski: http://stackoverflow.com/questions/2199399/storing-enumset-in-a-database
104        short ret = 0;
105        if (flags != null) {
106            for (Flag val : flags) {
107                ret = (short) (ret | (1 << val.ordinal()));
108            }
109        }
110        return ret;
111    }
112
113    @Override
114    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
115        this.watermark = in.readLong();
116        this.flags = decodeFlags(in.readShort());
117        this.key = (String) in.readObject();
118        int dataLength = in.readInt();
119        if (dataLength == 0) {
120            this.data = null;
121        } else {
122            this.data = new byte[dataLength];
123            // not using in.readFully because it is not impl by Chronicle WireObjectInput
124            int pos = 0;
125            while (pos < dataLength) {
126                int byteRead = in.read(this.data, pos, dataLength - pos);
127                if (byteRead == -1) {
128                    throw new IllegalStateException("Corrupted stream, can not read " + dataLength + " bytes");
129                }
130                pos += byteRead;
131            }
132        }
133    }
134
135    protected EnumSet<Flag> decodeFlags(short encoded) {
136        // adapted from Adamski: http://stackoverflow.com/questions/2199399/storing-enumset-in-a-database
137        Map<Integer, Flag> ordinalMap = new HashMap<>();
138        for (Flag val : Flag.ALL_OPTS) {
139            ordinalMap.put(val.ordinal(), val);
140        }
141        EnumSet<Flag> ret = EnumSet.noneOf(Flag.class);
142        int ordinal = 0;
143        for (int i = 1; i != 0; i <<= 1) {
144            if ((i & encoded) != 0) {
145                ret.add(ordinalMap.get(ordinal));
146            }
147            ++ordinal;
148        }
149        return ret;
150    }
151
152    public enum Flag {
153        DEFAULT, COMMIT, POISON_PILL;
154
155        public static final EnumSet<Flag> ALL_OPTS = EnumSet.allOf(Flag.class);
156    }
157
158}