/*
 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.pattern;

import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Arrays;
import java.util.Objects;

/**
 * Simple key value message.
 * <p>
 * Equality and hash code are based on the key and the two flags ({@code poisonPill}, {@code forceBatch}); the value
 * payload does not take part in either of them.
 *
 * @since 9.1
 */
public class KeyValueMessage implements Message {
    // Externalizable classes rely on serialVersionUID as well
    static final long serialVersionUID = 20170529L;

    /** Shared message instance flagged as poison pill, with no value. */
    public static final KeyValueMessage POISON_PILL = new KeyValueMessage("_POISON_PILL_", null, true, false);

    protected String key;

    protected byte[] value;

    protected boolean poisonPill = false;

    protected boolean forceBatch = false;

    /**
     * No-arg constructor; the state is expected to be filled afterwards by {@link #readExternal(ObjectInput)}.
     * Until then the key is {@code null}.
     */
    public KeyValueMessage() {
    }

    /**
     * Creates a fully initialized message.
     *
     * @param key the message key, must not be {@code null}
     * @param value the payload, may be {@code null}; the array is stored as is, without copy
     * @param poisonPill value returned by {@link #poisonPill()}
     * @param forceBatch value returned by {@link #forceBatch()}
     * @throws NullPointerException if {@code key} is {@code null}
     */
    protected KeyValueMessage(String key, byte[] value, boolean poisonPill, boolean forceBatch) {
        this.key = Objects.requireNonNull(key);
        this.value = value;
        this.poisonPill = poisonPill;
        this.forceBatch = forceBatch;
    }

    /**
     * Creates a regular message (neither poison pill nor force batch) with a key and a value.
     *
     * @param key the message key, must not be {@code null}
     * @param value the payload, may be {@code null}
     */
    public static KeyValueMessage of(String key, byte[] value) {
        return new KeyValueMessage(key, value, false, false);
    }

    /**
     * Creates a regular message (neither poison pill nor force batch) with a key and no value.
     *
     * @param key the message key, must not be {@code null}
     */
    public static KeyValueMessage of(String key) {
        return new KeyValueMessage(key, null, false, false);
    }

    /**
     * Creates a message that forces the batch, with a key and a value.
     *
     * @param key the message key, must not be {@code null}
     * @param value the payload, may be {@code null}
     */
    public static KeyValueMessage ofForceBatch(String key, byte[] value) {
        return new KeyValueMessage(key, value, false, true);
    }

    /**
     * Creates a message that forces the batch, with a key and no value.
     *
     * @param key the message key, must not be {@code null}
     */
    public static KeyValueMessage ofForceBatch(String key) {
        return new KeyValueMessage(key, null, false, true);
    }

    /**
     * Returns the message key.
     */
    public String key() {
        return key;
    }

    /**
     * Returns the payload, possibly {@code null}. The internal array is returned directly, not a copy.
     */
    public byte[] value() {
        return value;
    }

    /**
     * Returns the message identifier, which is the key.
     */
    @Override
    public String getId() {
        return key;
    }

    @Override
    public boolean poisonPill() {
        return poisonPill;
    }

    @Override
    public boolean forceBatch() {
        return forceBatch;
    }

    /**
     * Writes the key, the two flags, then the value length followed by the value bytes.
     * <p>
     * A {@code null} or empty value is written as a length of {@code 0} with no bytes, so both are read back as
     * {@code null} by {@link #readExternal(ObjectInput)}.
     */
    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeObject(key);
        out.writeBoolean(poisonPill);
        out.writeBoolean(forceBatch);
        if (value == null || value.length == 0) {
            out.writeInt(0);
        } else {
            out.writeInt(value.length);
            out.write(value);
        }
    }

    /**
     * Reads back the state in the order written by {@link #writeExternal(ObjectOutput)}.
     *
     * @throws EOFException if the stream ends before the announced number of value bytes has been read
     * @throws IOException if the announced value length is negative, or on any underlying read failure
     */
    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        this.key = (String) in.readObject();
        this.poisonPill = in.readBoolean();
        this.forceBatch = in.readBoolean();
        int dataLength = in.readInt();
        if (dataLength < 0) {
            // a negative length can only come from a corrupted stream
            throw new IOException("Invalid negative value length: " + dataLength);
        }
        if (dataLength == 0) {
            this.value = null;
            return;
        }
        byte[] data = new byte[dataLength];
        // not using in.readFully because it is not impl by Chronicle WireObjectInput
        int pos = 0;
        while (pos < dataLength) {
            int count = in.read(data, pos, dataLength - pos);
            if (count < 0) {
                // read returns -1 at end of stream: adding it to pos would move the cursor backwards
                throw new EOFException(
                        "Unexpected end of stream, read " + pos + " bytes of value out of " + dataLength);
            }
            pos += count;
        }
        this.value = data;
    }

    /**
     * Two messages are equal when they are of the same class and have the same key, poison pill flag and force
     * batch flag. The value is not compared.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        KeyValueMessage other = (KeyValueMessage) o;
        return poisonPill == other.poisonPill && forceBatch == other.forceBatch && Objects.equals(key, other.key);
    }

    /**
     * Hash code computed from the same fields as {@link #equals(Object)} (key and flags), so that equal messages
     * always share the same hash code. A {@code null} key is supported.
     */
    @Override
    public int hashCode() {
        int result = Objects.hashCode(key);
        result = 31 * result + (poisonPill ? 1 : 0);
        result = 31 * result + (forceBatch ? 1 : 0);
        return result;
    }

    @Override
    public String toString() {
        return String.format("KeyValueMessage(\"%s\", len:%d%s%s)", key, (value != null) ? value.length : 0,
                poisonPill ? ", poison" : "", forceBatch ? ", batch" : "");
    }
}