001/* 002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Funsho David 018 */ 019 020package org.nuxeo.ecm.core.bulk.io; 021 022import java.io.IOException; 023import java.time.Instant; 024import java.util.Arrays; 025import java.util.List; 026 027import org.apache.avro.Schema; 028import org.apache.avro.io.Decoder; 029import org.apache.avro.io.Encoder; 030import org.apache.avro.reflect.CustomEncoding; 031 032/** 033 * This {@link CustomEncoding} encodes/decodes {@link Instant} to a long (time in milliseconds) before encoding it in 034 * Avro format. 035 * 036 * @since 10.3 037 */ 038public class InstantAsLongEncoding extends CustomEncoding<Instant> { 039 040 protected static final int NULL_SCHEMA_INDEX = 0; 041 042 protected static final int LONG_SCHEMA_INDEX = 1; 043 044 public InstantAsLongEncoding() { 045 List<Schema> union = Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.LONG)); 046 union.get(1).addProp("CustomEncoding", "InstantAsLongEncoding"); 047 schema = Schema.createUnion(union); 048 } 049 050 @Override 051 protected void write(Object o, Encoder encoder) throws IOException { 052 if (o == null) { 053 // encode the position of the data in the union 054 encoder.writeIndex(NULL_SCHEMA_INDEX); 055 encoder.writeNull(); 056 } else { 057 // encode the position of the data in the union 058 encoder.writeIndex(LONG_SCHEMA_INDEX); 059 encoder.writeLong(((Instant) o).toEpochMilli()); 060 } 061 } 062 063 @Override 064 protected Instant read(Object o, Decoder decoder) throws IOException { 065 int index = decoder.readIndex(); 066 if (index == NULL_SCHEMA_INDEX) { 067 decoder.readNull(); 068 return null; 069 } else if (index == LONG_SCHEMA_INDEX) { 070 return Instant.ofEpochMilli(decoder.readLong()); 071 } else { 072 throw new IOException("Unable to read Instant as long, index=" + index + " is unknown"); 073 } 074 } 075}