001/* 002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Florent Guillaume 018 */ 019package org.nuxeo.runtime.pubsub; 020 021import java.io.ByteArrayInputStream; 022import java.io.ByteArrayOutputStream; 023import java.io.IOException; 024import java.io.InputStream; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.nuxeo.runtime.api.Framework; 029 030/** 031 * Encapsulates message sending and receiving through the {@link PubSubService}. 032 * <p> 033 * All nodes that use the same topic will receive the same messages. The discriminator is used to distinguish nodes 034 * between one another, and to avoid that a node receives the messages it send itself. 035 * <p> 036 * An actual implementation must implement the method {@link #deserialize} (usually by delegating to a static method in 037 * the {@link T} message class), and the {@link #receivedMessage} callback. 038 * <p> 039 * The public API is {@link #sendMessage}, and the {@link #receivedMessage} callback. 040 * 041 * @since 9.3 042 */ 043public abstract class AbstractPubSubBroker<T extends SerializableMessage> { 044 045 private static final Log log = LogFactory.getLog(AbstractPubSubBroker.class); 046 047 private static final String UTF_8 = "UTF-8"; 048 049 protected String topic; 050 051 protected byte[] discriminatorBytes; 052 053 /** Deserializes an {@link InputStream} into a message, or {@code null}. */ 054 public abstract T deserialize(InputStream in) throws IOException; 055 056 /** 057 * Initializes the broker. 058 * 059 * @param topic the topic 060 * @param discriminator the discriminator 061 */ 062 public void initialize(String topic, String discriminator) { 063 this.topic = topic; 064 try { 065 discriminatorBytes = discriminator.getBytes(UTF_8); 066 } catch (IOException e) { // cannot happen 067 throw new IllegalArgumentException(e); 068 } 069 for (byte b : discriminatorBytes) { 070 if (b == DISCRIMINATOR_SEP) { 071 throw new IllegalArgumentException("Invalid discriminator, must not contains separator '" 072 + (char) DISCRIMINATOR_SEP + "': " + discriminator); 073 } 074 } 075 PubSubService pubSubService = Framework.getService(PubSubService.class); 076 pubSubService.registerSubscriber(topic, this::subscriber); 077 } 078 079 /** 080 * Closes this broker and releases resources. 081 */ 082 public void close() { 083 PubSubService pubSubService = Framework.getService(PubSubService.class); 084 pubSubService.unregisterSubscriber(topic, this::subscriber); 085 } 086 087 protected static final byte DISCRIMINATOR_SEP = ':'; 088 089 /** 090 * Sends a message to other nodes. 091 */ 092 public void sendMessage(T message) { 093 if (log.isTraceEnabled()) { 094 log.trace("Sending message: " + message); 095 } 096 ByteArrayOutputStream baout = new ByteArrayOutputStream(); 097 try { 098 baout.write(discriminatorBytes); 099 } catch (IOException e) { 100 // cannot happen, ByteArrayOutputStream.write doesn't throw 101 return; 102 } 103 baout.write(DISCRIMINATOR_SEP); 104 try { 105 message.serialize(baout); 106 } catch (IOException e) { 107 log.error("Failed to serialize message", e); 108 // don't crash for this 109 return; 110 } 111 byte[] bytes = baout.toByteArray(); 112 PubSubService pubSubService = Framework.getService(PubSubService.class); 113 pubSubService.publish(topic, bytes); 114 } 115 116 /** 117 * PubSubService subscriber, called from a separate thread. 118 */ 119 protected void subscriber(String topic, byte[] bytes) { 120 int start = scanDiscriminator(bytes); 121 if (start == -1) { 122 // same discriminator or invalid message 123 return; 124 } 125 InputStream bain = new ByteArrayInputStream(bytes, start, bytes.length - start); 126 T message; 127 try { 128 message = deserialize(bain); 129 } catch (IOException e) { 130 log.error("Failed to deserialize message", e); 131 // don't crash for this 132 return; 133 } 134 if (message == null) { 135 return; 136 } 137 if (log.isTraceEnabled()) { 138 log.trace("Received message: " + message); 139 } 140 receivedMessage(message); 141 } 142 143 /** 144 * Callback implementing the delivery of a message from another node. 145 * 146 * @param message the received message 147 */ 148 public abstract void receivedMessage(T message); 149 150 /** 151 * Scans for the discriminator and returns the payload start offset. 152 * 153 * @return payload start offset, or -1 if the discriminator is local or if the message is invalid 154 */ 155 protected int scanDiscriminator(byte[] message) { 156 if (message == null) { 157 return -1; 158 } 159 int start = -1; 160 boolean differ = false; 161 for (int i = 0; i < message.length; i++) { 162 byte b = message[i]; 163 if (b == DISCRIMINATOR_SEP) { 164 differ = differ || discriminatorBytes.length > i; 165 start = i + 1; 166 break; 167 } 168 if (!differ) { 169 if (i == discriminatorBytes.length) { 170 // discriminator is a prefix of the received one 171 differ = true; 172 } else if (b != discriminatorBytes[i]) { 173 // difference 174 differ = true; 175 } 176 } 177 } 178 if (!differ) { 179 // same discriminator 180 return -1; 181 } 182 return start; // may be -1 if separator was never found (invalid message) 183 } 184 185}