/*
 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.codec;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.message.RawMessageDecoder;
import org.apache.avro.message.RawMessageEncoder;
import org.apache.avro.reflect.ReflectData;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.StreamRuntimeException;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

/**
 * Codec using the Confluent Avro encoding, which differs from the plain Avro message encoding: each record is
 * prefixed with a magic byte and a 4-byte schema id, and the schema store is a remote Confluent Schema Registry
 * accessed over REST.
 *
 * @since 10.3
 */
public class AvroConfluentCodec<T> implements Codec<T> {

    private static final Log log = LogFactory.getLog(AvroConfluentCodec.class);

    public static final String NAME = "avroConfluent";

    // Confluent wire format is: [MAGIC_BYTE][4-byte big-endian schema id][raw Avro payload]
    protected static final byte MAGIC_BYTE = 0x0;

    protected static final int ID_SIZE = 4;

    // Capacity passed to CachedSchemaRegistryClient (max schemas cached per subject)
    protected static final int DEFAULT_IDENTITY_MAP_CAPACITY = 10;

    protected final Class<T> messageClass;

    // Read schema, derived by reflection from messageClass
    protected final Schema schema;

    // Registry id of the read schema, written in the header of every encoded message
    protected final int schemaId;

    protected final String schemaName;

    // NOTE(review): never referenced by encode/decode in this class; kept to preserve the protected field
    // for potential subclasses — confirm before removing
    protected final KafkaAvroSerializer serializer;

    protected final RawMessageEncoder<T> encoder;

    protected final SchemaRegistryClient client;

    /**
     * Creates an AvroConfluent codec.
     *
     * @param messageClass the class to encode and decode
     * @param schemaRegistryUrls a comma separated list of Confluent Schema Registry URLs
     * @throws StreamRuntimeException if the schema cannot be registered with the registry
     */
    public AvroConfluentCodec(Class<T> messageClass, String schemaRegistryUrls) {
        this.messageClass = messageClass;
        this.schema = ReflectData.get().getSchema(messageClass);
        this.schemaName = messageClass.getName();
        if (schemaRegistryUrls.contains(",")) {
            client = new CachedSchemaRegistryClient(Arrays.asList(schemaRegistryUrls.split(",")),
                    DEFAULT_IDENTITY_MAP_CAPACITY);
        } else {
            client = new CachedSchemaRegistryClient(schemaRegistryUrls, DEFAULT_IDENTITY_MAP_CAPACITY);
        }
        try {
            // register returns the id of the schema, whether newly registered or already known
            this.schemaId = client.register(messageClass.getName(), schema);
        } catch (RestClientException | IOException e) {
            throw new StreamRuntimeException(e);
        }
        this.serializer = new KafkaAvroSerializer(client);
        this.encoder = new RawMessageEncoder<>(ReflectData.get(), schema);
    }

    @Override
    public String getName() {
        return NAME;
    }

    /**
     * Encodes the object using the Confluent wire format: magic byte, 4-byte schema id, then the raw Avro payload.
     *
     * @throws StreamRuntimeException if the Avro encoding fails
     */
    @Override
    public byte[] encode(T object) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(MAGIC_BYTE);
        try {
            out.write(ByteBuffer.allocate(ID_SIZE).putInt(schemaId).array());
            out.write(encoder.encode(object).array());
        } catch (IOException e) {
            throw new StreamRuntimeException(e);
        }
        return out.toByteArray();
    }

    /**
     * Decodes a Confluent-encoded message, resolving the writer schema id found in the header against the registry.
     *
     * @throws IllegalArgumentException if the message does not start with the magic byte or cannot be decoded
     * @throws StreamRuntimeException if the writer schema cannot be retrieved from the registry
     */
    @Override
    public T decode(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Invalid Avro Confluent message, expecting magic byte");
        }
        int id = buffer.getInt();
        Schema writeSchema = getWriteSchema(id);
        RawMessageDecoder<T> decoder = new RawMessageDecoder<>(ReflectData.get(), writeSchema, schema);
        try {
            // slice() positions the decoder on the payload, past the magic byte and schema id already consumed
            return decoder.decode(buffer.slice(), null);
        } catch (IOException | IndexOutOfBoundsException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Resolves the writer schema from the registry, falling back to the read schema when the registry no longer
     * knows the id (HTTP 404).
     *
     * @param id the writer schema id taken from the message header
     * @throws StreamRuntimeException on any registry failure other than a 404
     */
    protected Schema getWriteSchema(int id) {
        try {
            return client.getById(id);
        } catch (IOException e) {
            throw new StreamRuntimeException("Cannot retrieve write schema id: " + id + " on " + messageClass, e);
        } catch (RestClientException e) {
            if (e.getStatus() != 404) {
                throw new StreamRuntimeException("Cannot retrieve write schema id: " + id + " on " + messageClass, e);
            }
            // the write schema is not found, we fall back to the read schema;
            // this enables reading messages that share the read schema even if the schema registry is lost
            if (log.isWarnEnabled()) {
                log.warn(String.format("Cannot retrieve write schema %d, fallback to read schema: %d for %s", id,
                        schemaId, messageClass));
            }
            return schema;
        }
    }
}