001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.ecm.platform.importer.mqueues.computation;
020
import java.io.EOFException;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
028
/**
 * A record made of a key, a binary payload, a watermark and a set of {@link Flag}s, with a compact
 * {@link Externalizable} encoding.
 * <p>
 * Wire format, in order: the watermark as a {@code long}, the flags as a {@code short} bit mask (one bit per
 * {@link Flag} ordinal), the key written with {@code writeObject}, the payload length as an {@code int}, then the
 * raw payload bytes. A {@code null} or empty payload is written as length 0 and is read back as {@code null}.
 *
 * @since 9.2
 */
public class Record implements Externalizable {
    // Externalizable do rely on serialVersionUID
    static final long serialVersionUID = 20170529L;

    // Template only: never hand this instance out, EnumSet is mutable and "flags" is a public field.
    private static final EnumSet<Flag> DEFAULT_FLAG = EnumSet.of(Flag.DEFAULT);

    public long watermark;

    public EnumSet<Flag> flags;

    public String key;

    public byte[] data;

    /**
     * Flags attached to a record.
     * <p>
     * The serialized form stores one bit per ordinal, so reordering or removing constants changes the meaning of
     * already serialized records; new constants should be appended, and the {@code short} mask holds at most 16.
     */
    public enum Flag {
        DEFAULT, BEGIN, COMMIT, POISON_PILL, INTERNAL;

        public static final EnumSet<Flag> ALL_OPTS = EnumSet.allOf(Flag.class);
    }

    // Cached once because Flag.values() clones its array on every call.
    private static final Flag[] KNOWN_FLAGS = Flag.values();

    /**
     * Creates a record with the given key and payload, a watermark of 0 and the {@link Flag#DEFAULT} flag.
     *
     * @param key the record key, may be {@code null}
     * @param data the payload, may be {@code null}
     * @return a new record owning its own flag set
     */
    public static Record of(String key, byte[] data) {
        // Copy the template so that mutating one record's flags cannot leak into other records.
        return new Record(key, data, 0, EnumSet.copyOf(DEFAULT_FLAG));
    }

    /**
     * No-arg constructor required by the {@link Externalizable} contract; fields are filled by
     * {@link #readExternal(ObjectInput)}.
     */
    public Record() {

    }

    /**
     * Creates a record; the arguments are stored as given, without copy.
     *
     * @param key the record key
     * @param data the payload
     * @param watermark the watermark value
     * @param flags the flags, {@code null} is encoded as "no flag" on serialization
     */
    public Record(String key, byte[] data, long watermark, EnumSet<Flag> flags) {
        this.key = key;
        this.data = data;
        this.watermark = watermark;
        this.flags = flags;
    }

    @Override
    public String toString() {
        return "Record{" +
                "watermark=" + watermark +
                ", flags=" + flags +
                ", key='" + key + '\'' +
                ", data.length=" + ((data == null) ? 0 : data.length) +
                '}';
    }

    /**
     * Writes this record using the wire format described on the class.
     *
     * @param out the stream to write to
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeLong(watermark);
        out.writeShort(encodeFlags());
        out.writeObject(key);
        if (data == null || data.length == 0) {
            out.writeInt(0);
        } else {
            out.writeInt(data.length);
            out.write(data);
        }
    }

    /**
     * Packs {@link #flags} into a bit mask where bit {@code n} is set when the flag of ordinal {@code n} is present.
     *
     * @return the bit mask, 0 when {@link #flags} is {@code null} or empty
     */
    private short encodeFlags() {
        // adapted from Adamski: http://stackoverflow.com/questions/2199399/storing-enumset-in-a-database
        short ret = 0;
        if (flags != null) {
            for (Flag val : flags) {
                ret |= (1 << val.ordinal());
            }
        }
        return ret;
    }

    /**
     * Restores this record from the wire format described on the class.
     *
     * @param in the stream to read from
     * @throws EOFException if the stream ends before the announced payload length has been read
     * @throws IOException if the announced payload length is negative or the underlying stream fails
     * @throws ClassNotFoundException if the key object cannot be resolved by the stream
     */
    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        this.watermark = in.readLong();
        this.flags = decodeFlags(in.readShort());
        this.key = (String) in.readObject();
        int dataLength = in.readInt();
        if (dataLength < 0) {
            throw new IOException("Invalid data length: " + dataLength);
        }
        if (dataLength == 0) {
            this.data = null;
            return;
        }
        byte[] buffer = new byte[dataLength];
        // not using in.readFully because it is not impl by Chronicle WireObjectInput
        int pos = 0;
        while (pos < dataLength) {
            int count = in.read(buffer, pos, dataLength - pos);
            if (count < 0) {
                // read() signals end of stream with -1; adding it to pos would corrupt the offset
                throw new EOFException(
                        "Unexpected end of stream: read " + pos + " of " + dataLength + " data bytes");
            }
            pos += count;
        }
        this.data = buffer;
    }

    /**
     * Unpacks a bit mask produced by {@link #encodeFlags()}.
     * <p>
     * Bits that do not match any known {@link Flag} ordinal are ignored, so a mask with extra bits set does not
     * fail decoding.
     *
     * @param encoded the bit mask
     * @return a new, mutable set holding the flags whose bit is set, never {@code null}
     */
    private EnumSet<Flag> decodeFlags(short encoded) {
        EnumSet<Flag> ret = EnumSet.noneOf(Flag.class);
        for (Flag val : KNOWN_FLAGS) {
            if ((encoded & (1 << val.ordinal())) != 0) {
                ret.add(val);
            }
        }
        return ret;
    }

}