001/* 002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * pierre 018 */ 019package org.nuxeo.ecm.core.io.avro; 020 021import java.nio.ByteBuffer; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.Collection; 025import java.util.Collections; 026import java.util.Date; 027import java.util.GregorianCalendar; 028import java.util.HashMap; 029import java.util.List; 030import java.util.Map; 031import java.util.stream.Collectors; 032 033import org.apache.avro.Schema; 034import org.apache.avro.Schema.Field; 035import org.apache.avro.generic.GenericData; 036import org.apache.avro.generic.GenericData.Array; 037import org.apache.avro.generic.GenericRecord; 038import org.nuxeo.ecm.core.api.Blobs; 039import org.nuxeo.ecm.core.api.model.Property; 040import org.nuxeo.ecm.core.api.model.impl.ListProperty; 041import org.nuxeo.ecm.core.api.model.impl.primitives.BlobProperty; 042import org.nuxeo.ecm.core.schema.types.ListType; 043import org.nuxeo.runtime.RuntimeServiceException; 044import org.nuxeo.runtime.avro.AvroMapper; 045import org.nuxeo.runtime.avro.AvroService; 046 047/** 048 * @since 10.2 049 */ 050public class PropertyMapper extends AvroMapper<Property, Object> { 051 052 protected static final Map<String, Class<?>> MAPPING = Collections.singletonMap( 053 AvroConstants.CONTENT, BlobProperty.class); 054 055 public PropertyMapper(AvroService service) { 056 super(service); 057 } 058 059 @Override 060 public Object fromAvro(Schema schema, Object input) { 061 switch (schema.getType()) { 062 case NULL: 063 if (input == null) { 064 return null; 065 } 066 throw new NonNullValueException(); 067 case UNION: 068 for (Schema sub : schema.getTypes()) { 069 try { 070 return service.fromAvro(sub, Property.class, input); 071 } catch (NonNullValueException e) { 072 // ignore 073 } 074 } 075 throw new RuntimeServiceException(CANNOT_MAP_FROM + schema.getType()); 076 case RECORD: 077 GenericRecord record = (GenericRecord) input; 078 List<Field> fields = schema.getFields(); 079 Map<String, Object> data = new HashMap<>(fields.size()); 080 for (Field field : fields) { 081 String propertyName = service.decodeName(field.name()); 082 Class<?> clazz = MAPPING.getOrDefault(propertyName, Property.class); 083 Object value = service.fromAvro(field.schema(), clazz, record.get(field.name())); 084 data.put(propertyName, value); 085 } 086 return data; 087 case ARRAY: 088 GenericData.Array<?> array = (Array<?>) input; 089 List<Object> list = new ArrayList<>(array.size()); 090 for (Object element : array) { 091 list.add(service.fromAvro(schema.getElementType(), Property.class, element)); 092 } 093 return list; 094 case LONG: 095 if (AvroConstants.AVRO_LOGICTYPE_TIMESTAMP_MILLIS.equals(getLogicalType(schema))) { 096 return new Date(((Long) input).longValue()); 097 } 098 return input; 099 case INT: 100 case FLOAT: 101 case STRING: 102 case DOUBLE: 103 case BOOLEAN: 104 return input; 105 case BYTES: 106 return Blobs.createBlob(((ByteBuffer) input).array()); 107 default: 108 throw new RuntimeServiceException(CANNOT_MAP_FROM + schema.getType()); 109 } 110 } 111 112 @Override 113 public Object toAvro(Schema schema, Property input) { 114 switch (schema.getType()) { 115 case NULL: 116 if (input.getValue() == null) { 117 return null; 118 } 119 throw new NonNullValueException(); 120 case UNION: 121 for (Schema s : schema.getTypes()) { 122 try { 123 return service.toAvro(s, input); 124 } catch (NonNullValueException e) { 125 // this exception is thrown when a null value is expected and not found 126 // this happens for schema unions [null, schema] 127 } 128 } 129 throw new RuntimeServiceException(CANNOT_MAP_TO + schema.getType()); 130 case RECORD: 131 if (input.isComplex()) { 132 GenericRecord record = new GenericData.Record(schema); 133 for (Field f : schema.getFields()) { 134 record.put(f.name(), service.toAvro(f.schema(), input.get(service.decodeName(f.name())))); 135 } 136 return record; 137 } 138 throw new RuntimeServiceException(CANNOT_MAP_TO + schema.getType()); 139 case ARRAY: 140 if (input.getType().isListType()) { 141 Collection<Object> objects; 142 if (((ListType) input.getType()).isArray()) { 143 objects = Arrays.asList((Object[]) input.getValue()); 144 } else { 145 ListProperty list = (ListProperty) input; 146 objects = list.stream() 147 .map(p -> service.toAvro(schema.getElementType(), p)) 148 .collect(Collectors.toList()); 149 } 150 return new GenericData.Array<>(schema, objects); 151 } 152 throw new RuntimeServiceException(CANNOT_MAP_TO + schema.getType()); 153 case INT: 154 case FLOAT: 155 case STRING: 156 case DOUBLE: 157 case BOOLEAN: 158 if (input.isScalar()) { 159 return input.getValue(); 160 } 161 throw new RuntimeServiceException(CANNOT_MAP_TO + schema.getType()); 162 case LONG: 163 if (input.isScalar()) { 164 if (AvroConstants.AVRO_LOGICTYPE_TIMESTAMP_MILLIS.equals(getLogicalType(schema))) { 165 GregorianCalendar cal = (GregorianCalendar) input.getValue(); 166 return cal.toInstant().toEpochMilli(); 167 } 168 return input.getValue(); 169 } 170 throw new RuntimeServiceException(CANNOT_MAP_TO + schema.getType()); 171 default: 172 throw new RuntimeServiceException(CANNOT_MAP_TO + schema.getType()); 173 } 174 } 175 176}