001/* 002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.codec; 020 021import java.io.ByteArrayInputStream; 022import java.io.ByteArrayOutputStream; 023import java.io.IOException; 024 025import org.apache.avro.Schema; 026import org.apache.avro.io.DecoderFactory; 027import org.apache.avro.io.EncoderFactory; 028import org.apache.avro.io.JsonDecoder; 029import org.apache.avro.io.JsonEncoder; 030import org.apache.avro.reflect.ReflectData; 031import org.apache.avro.reflect.ReflectDatumReader; 032import org.apache.avro.reflect.ReflectDatumWriter; 033 034/** 035 * JSON Avro format for debugging purpose. 036 * 037 * @since 10.2 038 */ 039public class AvroJsonCodec<T> implements Codec<T> { 040 public static final String NAME = "avroJson"; 041 042 protected final Class<T> messageClass; 043 044 protected final Schema schema; 045 046 protected final ReflectDatumWriter<T> writer; 047 048 protected final ReflectDatumReader<T> reader; 049 050 public AvroJsonCodec(Class<T> messageClass) { 051 this.messageClass = messageClass; 052 schema = ReflectData.get().getSchema(messageClass); 053 writer = new ReflectDatumWriter<>(schema); 054 reader = new ReflectDatumReader<>(schema); 055 } 056 057 @Override 058 public String getName() { 059 return NAME; 060 } 061 062 @Override 063 public byte[] encode(T object) { 064 try { 065 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 066 JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, baos); 067 writer.write(object, jsonEncoder); 068 jsonEncoder.flush(); 069 return baos.toByteArray(); 070 } catch (IOException e) { 071 throw new IllegalArgumentException(e); 072 } 073 } 074 075 @Override 076 public T decode(byte[] data) { 077 ByteArrayInputStream bais = new ByteArrayInputStream(data); 078 try { 079 JsonDecoder jsonDecoder = DecoderFactory.get().jsonDecoder(schema, bais); 080 return reader.read(null, jsonDecoder); 081 } catch (IOException e) { 082 throw new IllegalArgumentException(e); 083 } 084 } 085}