/*
 * (C) Copyright 2017 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     Florent Guillaume
 */
package org.nuxeo.ecm.core.pubsub;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.runtime.api.Framework;

/**
 * Encapsulates invalidations management through the {@link PubSubService}.
 * <p>
 * All nodes that use the same topic will share the same invalidations.
 * <p>
 * The discriminator is used to distinguish nodes between one another, and to avoid that a node receives the
 * invalidations it sent itself.
 * <p>
 * Wire format of a message: the UTF-8 bytes of the sender's discriminator, then {@link #DISCRIMINATOR_SEP}, then the
 * serialized invalidations.
 *
 * @since 9.1
 */
public abstract class AbstractPubSubInvalidator<T extends SerializableInvalidations> {

    private static final Log log = LogFactory.getLog(AbstractPubSubInvalidator.class);

    /** Byte separating the discriminator from the payload in a message. */
    protected static final byte DISCRIMINATOR_SEP = ':';

    /**
     * The single subscriber instance handed to the {@link PubSubService}.
     * <p>
     * Each evaluation of {@code this::subscriber} yields a distinct object, so the very same reference must be used
     * for registration and unregistration, otherwise the unregistration cannot match the registered subscriber.
     */
    private final BiConsumer<String, byte[]> subscriberRef = this::subscriber;

    protected String topic;

    protected byte[] discriminatorBytes;

    /** Invalidations received so far and not yet collected; swapped and filled under {@code synchronized (this)}. */
    protected volatile T bufferedInvalidations;

    /** Constructs new empty invalidations, of type {@code T}. */
    public abstract T newInvalidations();

    /** Deserializes an {@link InputStream} into invalidations, or {@code null}. */
    public abstract T deserialize(InputStream in) throws IOException;

    /**
     * Initializes the invalidator and registers it as a subscriber of the topic.
     *
     * @param topic the topic
     * @param discriminator the discriminator
     * @throws IllegalArgumentException if the discriminator contains the separator character
     */
    public void initialize(String topic, String discriminator) {
        this.topic = topic;
        discriminatorBytes = discriminator.getBytes(StandardCharsets.UTF_8);
        for (byte b : discriminatorBytes) {
            if (b == DISCRIMINATOR_SEP) {
                throw new IllegalArgumentException("Invalid discriminator, must not contains separator '"
                        + (char) DISCRIMINATOR_SEP + "': " + discriminator);
            }
        }
        bufferedInvalidations = newInvalidations();
        PubSubService pubSubService = Framework.getService(PubSubService.class);
        pubSubService.registerSubscriber(topic, subscriberRef);
    }

    /**
     * Closes this invalidator and releases resources.
     */
    public void close() {
        PubSubService pubSubService = Framework.getService(PubSubService.class);
        // same instance as the one registered in initialize()
        pubSubService.unregisterSubscriber(topic, subscriberRef);
        // not null to avoid crashing subscriber thread still in flight
        bufferedInvalidations = newInvalidations();
    }

    /**
     * Sends invalidations to other nodes.
     * <p>
     * A serialization failure is logged and the message is dropped; it is not propagated to the caller.
     *
     * @param invalidations the invalidations to send
     */
    public void sendInvalidations(T invalidations) {
        if (log.isTraceEnabled()) {
            log.trace("Sending invalidations: " + invalidations);
        }
        ByteArrayOutputStream baout = new ByteArrayOutputStream();
        // the (byte[], int, int) overload declares no IOException, unlike OutputStream.write(byte[])
        baout.write(discriminatorBytes, 0, discriminatorBytes.length);
        baout.write(DISCRIMINATOR_SEP);
        try {
            invalidations.serialize(baout);
        } catch (IOException e) {
            log.error("Failed to serialize invalidations", e);
            // don't crash for this
            return;
        }
        byte[] message = baout.toByteArray();
        PubSubService pubSubService = Framework.getService(PubSubService.class);
        pubSubService.publish(topic, message);
    }

    /**
     * PubSubService subscriber, called from a separate thread.
     * <p>
     * Messages carrying the local discriminator, malformed messages, and empty or undeserializable payloads are
     * ignored. Other invalidations are added to the buffer.
     *
     * @param topic the topic the message was received on (not used)
     * @param message the raw message
     */
    protected void subscriber(String topic, byte[] message) {
        int start = scanDiscriminator(message);
        if (start == -1) {
            // same discriminator or invalid message
            return;
        }
        InputStream bain = new ByteArrayInputStream(message, start, message.length - start);
        T invalidations;
        try {
            invalidations = deserialize(bain);
        } catch (IOException e) {
            log.error("Failed to deserialize invalidations", e);
            // don't crash for this
            return;
        }
        if (invalidations == null || invalidations.isEmpty()) {
            return;
        }
        if (log.isTraceEnabled()) {
            log.trace("Receiving invalidations: " + invalidations);
        }
        // same lock as receiveInvalidations(), so that the buffer is not swapped while being added to
        synchronized (this) {
            bufferedInvalidations.add(invalidations);
        }
    }

    /**
     * Scans for the discriminator and returns the payload start offset.
     *
     * @param message the raw message, may be {@code null}
     * @return payload start offset, or -1 if the discriminator is local or if the message is invalid
     */
    protected int scanDiscriminator(byte[] message) {
        if (message == null) {
            return -1;
        }
        int start = -1;
        boolean differ = false;
        for (int i = 0; i < message.length; i++) {
            byte b = message[i];
            if (b == DISCRIMINATOR_SEP) {
                // the received discriminator is a strict prefix of the local one if the separator comes early
                differ = differ || discriminatorBytes.length > i;
                start = i + 1;
                break;
            }
            if (!differ) {
                if (i == discriminatorBytes.length) {
                    // discriminator is a prefix of the received one
                    differ = true;
                } else if (b != discriminatorBytes[i]) {
                    // difference
                    differ = true;
                }
            }
        }
        if (!differ) {
            // same discriminator
            return -1;
        }
        return start; // may be -1 if separator was never found (invalid message)
    }

    /**
     * Receives invalidations from other nodes.
     * <p>
     * The buffer is replaced by a new empty one, so each received invalidation is returned only once.
     *
     * @return the invalidations buffered since the previous call
     */
    public T receiveInvalidations() {
        // created outside of the lock to keep the critical section to a reference swap
        T newInvalidations = newInvalidations();
        T invalidations;
        synchronized (this) {
            invalidations = bufferedInvalidations;
            bufferedInvalidations = newInvalidations;
        }
        if (log.isTraceEnabled()) {
            log.trace("Received invalidations: " + invalidations);
        }
        return invalidations;
    }

}