001/* 002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.codec; 020 021import java.io.ByteArrayOutputStream; 022import java.io.IOException; 023import java.nio.ByteBuffer; 024import java.util.Arrays; 025 026import org.apache.avro.Schema; 027import org.apache.avro.message.RawMessageDecoder; 028import org.apache.avro.message.RawMessageEncoder; 029import org.apache.avro.reflect.ReflectData; 030import org.apache.commons.logging.Log; 031import org.apache.commons.logging.LogFactory; 032import org.nuxeo.lib.stream.StreamRuntimeException; 033 034import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; 035import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; 036import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; 037import io.confluent.kafka.serializers.KafkaAvroSerializer; 038 039/** 040 * Use the Confluent Avro encoding which differs from Avro message, the schema store is a REST Confluent Schema 041 * Registry. 042 * 043 * @since 10.3 044 */ 045public class AvroConfluentCodec<T> implements Codec<T> { 046 private static final Log log = LogFactory.getLog(AvroConfluentCodec.class); 047 048 public static final String NAME = "avroConfluent"; 049 050 public static final byte MAGIC_BYTE = 0x0; 051 052 public static final int ID_SIZE = 4; 053 054 protected static final int DEFAULT_IDENTITY_MAP_CAPACITY = 10; 055 056 protected final Class<T> messageClass; 057 058 protected final Schema schema; 059 060 protected final int schemaId; 061 062 protected final String schemaName; 063 064 protected final KafkaAvroSerializer serializer; 065 066 protected final RawMessageEncoder<T> encoder; 067 068 protected final SchemaRegistryClient client; 069 070 /** 071 * Create an AvroConfluent codec 072 * 073 * @param messageClass the class to encode and decode 074 * @param schemaRegistryUrls a comma separated list of Confluent Schema Registry URL 075 */ 076 public AvroConfluentCodec(Class<T> messageClass, String schemaRegistryUrls) { 077 this.messageClass = messageClass; 078 schema = ReflectData.get().getSchema(messageClass); 079 schemaName = messageClass.getName(); 080 client = getRegistryClient(schemaRegistryUrls); 081 try { 082 this.schemaId = client.register(messageClass.getName(), schema); 083 } catch (RestClientException | IOException e) { 084 throw new StreamRuntimeException(e); 085 } 086 this.serializer = new KafkaAvroSerializer(client); 087 this.encoder = new RawMessageEncoder<>(ReflectData.get(), schema); 088 } 089 090 public static SchemaRegistryClient getRegistryClient(String schemaRegistryUrls) { 091 if (schemaRegistryUrls.contains(",")) { 092 return new CachedSchemaRegistryClient(Arrays.asList(schemaRegistryUrls.split(",")), 093 DEFAULT_IDENTITY_MAP_CAPACITY); 094 } else { 095 return new CachedSchemaRegistryClient(schemaRegistryUrls, DEFAULT_IDENTITY_MAP_CAPACITY); 096 } 097 } 098 099 @Override 100 public String getName() { 101 return NAME; 102 } 103 104 @Override 105 public byte[] encode(T object) { 106 ByteArrayOutputStream out = new ByteArrayOutputStream(); 107 out.write(MAGIC_BYTE); 108 try { 109 out.write(ByteBuffer.allocate(ID_SIZE).putInt(schemaId).array()); 110 out.write(encoder.encode(object).array()); 111 } catch (IOException e) { 112 throw new StreamRuntimeException(e); 113 } 114 return out.toByteArray(); 115 } 116 117 @Override 118 public T decode(byte[] data) { 119 ByteBuffer buffer = ByteBuffer.wrap(data); 120 if (buffer.get() != MAGIC_BYTE) { 121 throw new IllegalArgumentException("Invalid Avro Confluent message, expecting magic byte"); 122 } 123 int id = buffer.getInt(); 124 Schema writeSchema; 125 try { 126 writeSchema = client.getById(id); 127 } catch (IOException e) { 128 throw new StreamRuntimeException("Cannot retrieve write schema id: " + id + " on " + messageClass, e); 129 } catch (RestClientException e) { 130 if (e.getStatus() != 404) { 131 throw new StreamRuntimeException("Cannot retrieve write schema id: " + id + " on " + messageClass, e); 132 } 133 // the write schema is not found, we fallback to read schema 134 // this enable to read message that have the same read schema even if we loose the schema registry 135 if (log.isWarnEnabled()) { 136 log.warn(String.format("Cannot retrieve write schema %d, fallback to read schema: %d for %s", id, 137 schemaId, messageClass)); 138 } 139 writeSchema = schema; 140 } 141 RawMessageDecoder<T> decoder = new RawMessageDecoder<>(ReflectData.get(), writeSchema, schema); 142 try { 143 return decoder.decode(buffer.slice(), null); 144 } catch (IOException | IndexOutOfBoundsException e) { 145 throw new IllegalArgumentException(e); 146 } 147 } 148}