001/*
002 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     Florent Guillaume
018 */
019package org.nuxeo.ecm.core.pubsub;
020
021import java.io.ByteArrayInputStream;
022import java.io.ByteArrayOutputStream;
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.UnsupportedEncodingException;
026
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import org.nuxeo.runtime.api.Framework;
030
031/**
032 * Encapsulates invalidations management through the {@link PubSubService}.
033 * <p>
034 * All nodes that use the same topic will share the same invalidations.
035 * <p>
036 * The discriminator is used to distinguish nodes between one another, and to avoid that a node receives the
037 * invalidations it send itself.
038 *
039 * @since 9.1
040 */
041public abstract class AbstractPubSubInvalidator<T extends SerializableInvalidations> {
042
043    private static final Log log = LogFactory.getLog(AbstractPubSubInvalidator.class);
044
045    private static final String UTF_8 = "UTF-8";
046
047    protected String topic;
048
049    protected byte[] discriminatorBytes;
050
051    protected volatile T bufferedInvalidations;
052
053    /** Constructs new empty invalidations, of type {@link T}. */
054    public abstract T newInvalidations();
055
056    /** Deserializes an {@link InputStream} into invalidations, or {@code null}. */
057    public abstract T deserialize(InputStream in) throws IOException;
058
059    /**
060     * Initializes the invalidator.
061     *
062     * @param topic the topic
063     * @param discriminator the discriminator
064     */
065    public void initialize(String topic, String discriminator) {
066        this.topic = topic;
067        try {
068            discriminatorBytes = discriminator.getBytes(UTF_8);
069        } catch (UnsupportedEncodingException e) {
070            throw new IllegalArgumentException(e);
071        }
072        for (byte b : discriminatorBytes) {
073            if (b == DISCRIMINATOR_SEP) {
074                throw new IllegalArgumentException("Invalid discriminator, must not contains separator '"
075                        + (char) DISCRIMINATOR_SEP + "': " + discriminator);
076            }
077        }
078        bufferedInvalidations = newInvalidations();
079        PubSubService pubSubService = Framework.getService(PubSubService.class);
080        pubSubService.registerSubscriber(topic, this::subscriber);
081    }
082
083    /**
084     * Closes this invalidator and releases resources.
085     */
086    public void close() {
087        PubSubService pubSubService = Framework.getService(PubSubService.class);
088        pubSubService.unregisterSubscriber(topic, this::subscriber);
089        // not null to avoid crashing subscriber thread still in flight
090        bufferedInvalidations = newInvalidations();
091    }
092
093    protected static final byte DISCRIMINATOR_SEP = ':';
094
095    /**
096     * Sends invalidations to other nodes.
097     */
098    public void sendInvalidations(T invalidations) {
099        if (log.isTraceEnabled()) {
100            log.trace("Sending invalidations: " + invalidations);
101        }
102        ByteArrayOutputStream baout = new ByteArrayOutputStream();
103        try {
104            baout.write(discriminatorBytes);
105        } catch (IOException e) {
106            // cannot happen, ByteArrayOutputStream.write doesn't throw
107            return;
108        }
109        baout.write(DISCRIMINATOR_SEP);
110        try {
111            invalidations.serialize(baout);
112        } catch (IOException e) {
113            log.error("Failed to serialize invalidations", e);
114            // don't crash for this
115            return;
116        }
117        byte[] message = baout.toByteArray();
118        PubSubService pubSubService = Framework.getService(PubSubService.class);
119        pubSubService.publish(topic, message);
120    }
121
122    /**
123     * PubSubService subscriber, called from a separate thread.
124     */
125    protected void subscriber(String topic, byte[] message) {
126        int start = scanDiscriminator(message);
127        if (start == -1) {
128            // same discriminator or invalid message
129            return;
130        }
131        InputStream bain = new ByteArrayInputStream(message, start, message.length - start);
132        T invalidations;
133        try {
134            invalidations = deserialize(bain);
135        } catch (IOException e) {
136            log.error("Failed to deserialize invalidations", e);
137            // don't crash for this
138            return;
139        }
140        if (invalidations == null || invalidations.isEmpty()) {
141            return;
142        }
143        if (log.isTraceEnabled()) {
144            log.trace("Receiving invalidations: " + invalidations);
145        }
146        synchronized (this) {
147            bufferedInvalidations.add(invalidations);
148        }
149    }
150
151    /**
152     * Scans for the discriminator and returns the payload start offset.
153     *
154     * @return payload start offset, or -1 if the discriminator is local or if the message is invalid
155     */
156    protected int scanDiscriminator(byte[] message) {
157        if (message == null) {
158            return -1;
159        }
160        int start = -1;
161        boolean differ = false;
162        for (int i = 0; i < message.length; i++) {
163            byte b = message[i];
164            if (b == DISCRIMINATOR_SEP) {
165                differ = differ || discriminatorBytes.length > i;
166                start = i + 1;
167                break;
168            }
169            if (!differ) {
170                if (i == discriminatorBytes.length) {
171                    // discriminator is a prefix of the received one
172                    differ = true;
173                } else if (b != discriminatorBytes[i]) {
174                    // difference
175                    differ = true;
176                }
177            }
178        }
179        if (!differ) {
180            // same discriminator
181            return -1;
182        }
183        return start; // may be -1 if separator was never found (invalid message)
184    }
185
186    /**
187     * Receives invalidations from other nodes.
188     */
189    public T receiveInvalidations() {
190        T newInvalidations = newInvalidations();
191        T invalidations;
192        synchronized (this) {
193            invalidations = bufferedInvalidations;
194            bufferedInvalidations = newInvalidations;
195        }
196        if (log.isTraceEnabled()) {
197            log.trace("Received invalidations: " + invalidations);
198        }
199        return invalidations;
200    }
201
202}