001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.runtime.pubsub;
020
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.runtime.api.Framework;
029
030/**
031 * Encapsulates message sending and receiving through the {@link PubSubService}.
032 * <p>
033 * All nodes that use the same topic will receive the same messages. The discriminator is used to distinguish nodes
034 * between one another, and to avoid that a node receives the messages it send itself.
035 * <p>
036 * An actual implementation must implement the method {@link #deserialize} (usually by delegating to a static method in
037 * the {@link T} message class), and the {@link #receivedMessage} callback.
038 * <p>
039 * The public API is {@link #sendMessage}, and the {@link #receivedMessage} callback.
040 *
041 * @since 9.3
042 */
043public abstract class AbstractPubSubBroker<T extends SerializableMessage> {
044
045    private static final Log log = LogFactory.getLog(AbstractPubSubBroker.class);
046
047    private static final String UTF_8 = "UTF-8";
048
049    protected String topic;
050
051    protected byte[] discriminatorBytes;
052
053    /** Deserializes an {@link InputStream} into a message, or {@code null}. */
054    public abstract T deserialize(InputStream in) throws IOException;
055
056    /**
057     * Initializes the broker.
058     *
059     * @param topic the topic
060     * @param discriminator the discriminator
061     */
062    public void initialize(String topic, String discriminator) {
063        this.topic = topic;
064        try {
065            discriminatorBytes = discriminator.getBytes(UTF_8);
066        } catch (IOException e) { // cannot happen
067            throw new IllegalArgumentException(e);
068        }
069        for (byte b : discriminatorBytes) {
070            if (b == DISCRIMINATOR_SEP) {
071                throw new IllegalArgumentException("Invalid discriminator, must not contains separator '"
072                        + (char) DISCRIMINATOR_SEP + "': " + discriminator);
073            }
074        }
075        PubSubService pubSubService = Framework.getService(PubSubService.class);
076        pubSubService.registerSubscriber(topic, this::subscriber);
077    }
078
079    /**
080     * Closes this broker and releases resources.
081     */
082    public void close() {
083        PubSubService pubSubService = Framework.getService(PubSubService.class);
084        pubSubService.unregisterSubscriber(topic, this::subscriber);
085    }
086
087    protected static final byte DISCRIMINATOR_SEP = ':';
088
089    /**
090     * Sends a message to other nodes.
091     */
092    public void sendMessage(T message) {
093        if (log.isTraceEnabled()) {
094            log.trace("Sending message: " + message);
095        }
096        ByteArrayOutputStream baout = new ByteArrayOutputStream();
097        try {
098            baout.write(discriminatorBytes);
099        } catch (IOException e) {
100            // cannot happen, ByteArrayOutputStream.write doesn't throw
101            return;
102        }
103        baout.write(DISCRIMINATOR_SEP);
104        try {
105            message.serialize(baout);
106        } catch (IOException e) {
107            log.error("Failed to serialize message", e);
108            // don't crash for this
109            return;
110        }
111        byte[] bytes = baout.toByteArray();
112        PubSubService pubSubService = Framework.getService(PubSubService.class);
113        pubSubService.publish(topic, bytes);
114    }
115
116    /**
117     * PubSubService subscriber, called from a separate thread.
118     */
119    protected void subscriber(String topic, byte[] bytes) {
120        int start = scanDiscriminator(bytes);
121        if (start == -1) {
122            // same discriminator or invalid message
123            return;
124        }
125        InputStream bain = new ByteArrayInputStream(bytes, start, bytes.length - start);
126        T message;
127        try {
128            message = deserialize(bain);
129        } catch (IOException e) {
130            log.error("Failed to deserialize message", e);
131            // don't crash for this
132            return;
133        }
134        if (message == null) {
135            return;
136        }
137        if (log.isTraceEnabled()) {
138            log.trace("Received message: " + message);
139        }
140        receivedMessage(message);
141    }
142
143    /**
144     * Callback implementing the delivery of a message from another node.
145     *
146     * @param message the received message
147     */
148    public abstract void receivedMessage(T message);
149
150    /**
151     * Scans for the discriminator and returns the payload start offset.
152     *
153     * @return payload start offset, or -1 if the discriminator is local or if the message is invalid
154     */
155    protected int scanDiscriminator(byte[] message) {
156        if (message == null) {
157            return -1;
158        }
159        int start = -1;
160        boolean differ = false;
161        for (int i = 0; i < message.length; i++) {
162            byte b = message[i];
163            if (b == DISCRIMINATOR_SEP) {
164                differ = differ || discriminatorBytes.length > i;
165                start = i + 1;
166                break;
167            }
168            if (!differ) {
169                if (i == discriminatorBytes.length) {
170                    // discriminator is a prefix of the received one
171                    differ = true;
172                } else if (b != discriminatorBytes[i]) {
173                    // difference
174                    differ = true;
175                }
176            }
177        }
178        if (!differ) {
179            // same discriminator
180            return -1;
181        }
182        return start; // may be -1 if separator was never found (invalid message)
183    }
184
185}