001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.computation;
020
021import static java.lang.Math.min;
022
023import java.io.Externalizable;
024import java.io.IOException;
025import java.io.ObjectInput;
026import java.io.ObjectOutput;
027import java.io.UnsupportedEncodingException;
028import java.text.SimpleDateFormat;
029import java.util.Arrays;
030import java.util.Date;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.Map;
034import java.util.Objects;
035
036/**
037 * Basic data object that contains: key, watermark, flag and data.
038 *
039 * @since 9.3
040 */
041public class Record implements Externalizable {
042    protected static final EnumSet<Flag> DEFAULT_FLAG = EnumSet.of(Flag.DEFAULT);
043
044    // Externalizable do rely on serialVersionUID
045    static final long serialVersionUID = 20170529L;
046
047    public long watermark;
048
049    public EnumSet<Flag> flags;
050
051    public String key;
052
053    public byte[] data;
054
055    public Record() {
056
057    }
058
059    public Record(String key, byte[] data, long watermark, EnumSet<Flag> flags) {
060        this.key = key;
061        this.data = data;
062        this.watermark = watermark;
063        this.flags = (flags == null) ? DEFAULT_FLAG : flags;
064    }
065
066    public static Record of(String key, byte[] data) {
067        return new Record(key, data, 0, DEFAULT_FLAG);
068    }
069
070    @Override
071    public String toString() {
072        String overview = "";
073        String wmDate = "";
074        if (data != null) {
075            try {
076                overview = ", data=\"" + new String(data, "UTF-8").substring(0, min(data.length, 127)) + '"';
077            } catch (UnsupportedEncodingException e) {
078                overview = "unsupported encoding";
079            }
080            overview = overview.replaceAll("[^\\x20-\\x7e]", ".");
081        }
082        if (watermark > 0) {
083            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
084            Watermark wm = Watermark.ofValue(watermark);
085            wmDate = ", wmDate=" + dateFormat.format(new Date(wm.getTimestamp()));
086        }
087        return "Record{" + "watermark=" + watermark + wmDate + ", flags=" + flags + ", key='" + key + '\''
088                + ", data.length=" + ((data == null) ? 0 : data.length) + overview + '}';
089    }
090
091    @Override
092    public void writeExternal(ObjectOutput out) throws IOException {
093        out.writeLong(watermark);
094        out.writeShort(encodeFlags());
095        out.writeObject(key);
096        if (data == null || data.length == 0) {
097            out.writeInt(0);
098        } else {
099            out.writeInt(data.length);
100            out.write(data);
101        }
102    }
103
104    protected short encodeFlags() {
105        // adapted from Adamski: http://stackoverflow.com/questions/2199399/storing-enumset-in-a-database
106        short ret = 0;
107        if (flags != null) {
108            for (Flag val : flags) {
109                ret = (short) (ret | (1 << val.ordinal()));
110            }
111        }
112        return ret;
113    }
114
115    @Override
116    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
117        this.watermark = in.readLong();
118        this.flags = decodeFlags(in.readShort());
119        this.key = (String) in.readObject();
120        int dataLength = in.readInt();
121        if (dataLength == 0) {
122            this.data = null;
123        } else {
124            this.data = new byte[dataLength];
125            // not using in.readFully because it is not impl by Chronicle WireObjectInput
126            int pos = 0;
127            while (pos < dataLength) {
128                int byteRead = in.read(this.data, pos, dataLength - pos);
129                if (byteRead == -1) {
130                    throw new IllegalStateException("Corrupted stream, can not read " + dataLength + " bytes");
131                }
132                pos += byteRead;
133            }
134        }
135    }
136
137    protected EnumSet<Flag> decodeFlags(short encoded) {
138        // adapted from Adamski: http://stackoverflow.com/questions/2199399/storing-enumset-in-a-database
139        Map<Integer, Flag> ordinalMap = new HashMap<>();
140        for (Flag val : Flag.ALL_OPTS) {
141            ordinalMap.put(val.ordinal(), val);
142        }
143        EnumSet<Flag> ret = EnumSet.noneOf(Flag.class);
144        int ordinal = 0;
145        for (int i = 1; i != 0; i <<= 1) {
146            if ((i & encoded) != 0) {
147                ret.add(ordinalMap.get(ordinal));
148            }
149            ++ordinal;
150        }
151        return ret;
152    }
153
154    public enum Flag {
155        DEFAULT, COMMIT, POISON_PILL;
156
157        public static final EnumSet<Flag> ALL_OPTS = EnumSet.allOf(Flag.class);
158    }
159
160    @Override
161    public boolean equals(Object o) {
162        if (this == o) {
163            return true;
164        }
165        if (o == null || getClass() != o.getClass()) {
166            return false;
167        }
168        Record record = (Record) o;
169        return watermark == record.watermark && Objects.equals(flags, record.flags) && Objects.equals(key, record.key)
170                && Arrays.equals(data, record.data);
171    }
172
173    @Override
174    public int hashCode() {
175        int result = Objects.hash(watermark, flags, key);
176        result = 31 * result + Arrays.hashCode(data);
177        return result;
178    }
179}