001/*
002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.pattern;
020
021import java.io.IOException;
022import java.io.ObjectInput;
023import java.io.ObjectOutput;
024import java.util.Arrays;
025import java.util.Objects;
026
027/**
028 * Simple key value message.
029 *
030 * @since 9.1
031 */
032public class KeyValueMessage implements Message {
033    // Externalizable do rely on serialVersionUID
034    static final long serialVersionUID = 20170529L;
035
036    public static final KeyValueMessage POISON_PILL = new KeyValueMessage("_POISON_PILL_", null, true, false);
037
038    protected String key;
039
040    protected byte[] value;
041
042    protected boolean poisonPill = false;
043
044    protected boolean forceBatch = false;
045
046    public KeyValueMessage() {
047    }
048
049    protected KeyValueMessage(String key, byte[] value, boolean poisonPill, boolean forceBatch) {
050        this.key = Objects.requireNonNull(key);
051        this.value = value;
052        this.poisonPill = poisonPill;
053        this.forceBatch = forceBatch;
054    }
055
056    public static KeyValueMessage of(String key, byte[] value) {
057        return new KeyValueMessage(key, value, false, false);
058    }
059
060    public static KeyValueMessage of(String key) {
061        return new KeyValueMessage(key, null, false, false);
062    }
063
064    /**
065     * A message that force the batch.
066     */
067    public static KeyValueMessage ofForceBatch(String key, byte[] value) {
068        return new KeyValueMessage(key, value, false, true);
069    }
070
071    public static KeyValueMessage ofForceBatch(String key) {
072        return new KeyValueMessage(key, null, false, true);
073    }
074
075    public String key() {
076        return key;
077    }
078
079    public byte[] value() {
080        return value;
081    }
082
083    @Override
084    public String getId() {
085        return key;
086    }
087
088    @Override
089    public boolean poisonPill() {
090        return poisonPill;
091    }
092
093    @Override
094    public boolean forceBatch() {
095        return forceBatch;
096    }
097
098    @Override
099    public void writeExternal(ObjectOutput out) throws IOException {
100        out.writeObject(key);
101        out.writeBoolean(poisonPill);
102        out.writeBoolean(forceBatch);
103        if (value == null || value.length == 0) {
104            out.writeInt(0);
105        } else {
106            out.writeInt(value.length);
107            out.write(value);
108        }
109    }
110
111    @Override
112    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
113        this.key = (String) in.readObject();
114        this.poisonPill = in.readBoolean();
115        this.forceBatch = in.readBoolean();
116        int dataLength = in.readInt();
117        if (dataLength == 0) {
118            this.value = null;
119        } else {
120            this.value = new byte[dataLength];
121            // not using in.readFully because it is not impl by Chronicle WireObjectInput
122            int pos = 0;
123            while (pos < dataLength) {
124                pos += in.read(this.value, pos, dataLength - pos);
125            }
126        }
127    }
128
129    @Override
130    public boolean equals(Object o) {
131        if (this == o)
132            return true;
133        if (o == null || getClass() != o.getClass())
134            return false;
135
136        KeyValueMessage keyValueMessage = (KeyValueMessage) o;
137
138        return poisonPill == keyValueMessage.poisonPill && forceBatch == keyValueMessage.forceBatch
139                && (key != null ? key.equals(keyValueMessage.key) : keyValueMessage.key == null);
140
141    }
142
143    @Override
144    public int hashCode() {
145        int result = key.hashCode();
146        result = 31 * result + Arrays.hashCode(value);
147        result = 31 * result + (poisonPill ? 1 : 0);
148        result = 31 * result + (forceBatch ? 1 : 0);
149        return result;
150    }
151
152    @Override
153    public String toString() {
154        return String.format("KeyValueMessage(\"%s\", len:%d%s%s)", key, (value != null) ? value.length : 0,
155                poisonPill ? ", poison" : "", forceBatch ? ", batch" : "");
156    }
157}