001/*
002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.codec;
020
021import java.io.IOException;
022
023import org.apache.avro.Schema;
024import org.apache.avro.message.BadHeaderException;
025import org.apache.avro.message.BinaryMessageDecoder;
026import org.apache.avro.message.BinaryMessageEncoder;
027import org.apache.avro.reflect.ReflectData;
028
029/**
030 * Avro Single object encoding: magic 2 bytes + schema fingerprint 8 bytes + avro binary. See
031 * https://avro.apache.org/docs/current/spec.html#single_object_encoding When using a SchemaStore the writer and reader
032 * schemas can evolve.
033 *
034 * @since 10.2
035 */
036public class AvroMessageCodec<T> implements Codec<T> {
037    public static final String NAME = "avro";
038
039    protected final Class<T> messageClass;
040
041    protected final Schema schema;
042
043    protected final BinaryMessageEncoder<T> encoder;
044
045    protected final BinaryMessageDecoder<T> decoder;
046
047    public AvroMessageCodec(Class<T> messageClass, AvroSchemaStore store) {
048        this.messageClass = messageClass;
049        schema = ReflectData.get().getSchema(messageClass);
050        if (store != null) {
051            store.addSchema(schema);
052        }
053        encoder = new BinaryMessageEncoder<>(ReflectData.get(), schema);
054        decoder = new BinaryMessageDecoder<>(ReflectData.get(), schema, store);
055    }
056
057    public AvroMessageCodec(Class<T> messageClass) {
058        this(messageClass, null);
059    }
060
061    @Override
062    public String getName() {
063        return NAME;
064    }
065
066    @Override
067    public byte[] encode(T object) {
068        try {
069            return encoder.encode(object).array();
070        } catch (IOException e) {
071            throw new IllegalArgumentException(e);
072        }
073    }
074
075    @Override
076    public T decode(byte[] data) {
077        try {
078            return decoder.decode(data, null);
079        } catch (IOException | BadHeaderException e) {
080            throw new IllegalArgumentException(e);
081        }
082    }
083}