001/* 002 * (C) Copyright 2017 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * bdelbosc 018 */ 019package org.nuxeo.lib.stream.tools.renderer; 020 021import java.io.IOException; 022import java.nio.ByteBuffer; 023import java.nio.ByteOrder; 024import java.text.SimpleDateFormat; 025import java.util.Arrays; 026import java.util.Date; 027import java.util.function.Consumer; 028 029import org.apache.avro.Schema; 030import org.apache.avro.generic.GenericData; 031import org.apache.avro.generic.GenericRecord; 032import org.apache.avro.message.BinaryMessageDecoder; 033import org.nuxeo.lib.stream.codec.AvroSchemaStore; 034import org.nuxeo.lib.stream.computation.Record; 035import org.nuxeo.lib.stream.computation.Watermark; 036import org.nuxeo.lib.stream.log.LogRecord; 037 038/** 039 * @since 9.3 040 */ 041public abstract class Renderer implements Consumer<LogRecord<Record>> { 042 043 protected int dataSize = 256; 044 045 public abstract void header(); 046 047 public abstract void footer(); 048 049 public static final byte[] AVRO_MESSAGE_V1_HEADER = new byte[] { (byte) 0xC3, (byte) 0x01 }; 050 051 protected String watermarkString(long watermark) { 052 if (watermark == 0) { 053 return "0"; 054 } 055 SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); 056 Watermark wm = Watermark.ofValue(watermark); 057 return String.format("%s:%d%s", dateFormat.format(new Date(wm.getTimestamp())), wm.getSequence(), 058 wm.isCompleted() ? " completed" : ""); 059 } 060 061 protected String tryToRenderAvroData(AvroSchemaStore store, Record record) { 062 String errorMessage; 063 try { 064 return renderAvroMessage(store, record); 065 } catch (IllegalArgumentException e) { 066 errorMessage = ""; 067 } catch (IllegalStateException e) { 068 errorMessage = e.getMessage() + " data: "; 069 } 070 return errorMessage + record.dataOverview(dataSize); 071 072 } 073 074 protected String renderAvroMessage(AvroSchemaStore store, Record record) { 075 if (store == null || !isAvroMessage(record.getData())) { 076 throw new IllegalArgumentException("Not avro encoded"); 077 } 078 long fp = getFingerPrint(record.getData()); 079 Schema schema = store.findByFingerprint(fp); 080 if (schema == null) { 081 throw new IllegalStateException(String.format("Not found schema: 0x%08X", fp)); 082 } 083 GenericData genericData = new GenericData(); 084 BinaryMessageDecoder<GenericRecord> decoder = new BinaryMessageDecoder<>(genericData, schema); 085 try { 086 GenericRecord avroRecord = decoder.decode(record.getData(), null); 087 return avroRecord.toString(); 088 } catch (IOException e) { 089 throw new IllegalStateException( 090 String.format("Error: %s decoding with schema: 0x%08X", e.getMessage(), fp)); 091 } 092 } 093 094 protected long getFingerPrint(byte[] data) { 095 byte[] fingerPrintBytes = Arrays.copyOfRange(data, 2, 10); 096 return ByteBuffer.wrap(fingerPrintBytes).order(ByteOrder.LITTLE_ENDIAN).getLong(); 097 } 098 099 protected boolean isAvroMessage(byte[] data) { 100 return data.length >= 10 && data[0] == AVRO_MESSAGE_V1_HEADER[0] && data[1] == AVRO_MESSAGE_V1_HEADER[1]; 101 } 102}