001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     pierre
018 */
019package org.nuxeo.ecm.core.io.avro;
020
021import java.nio.ByteBuffer;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.Date;
027import java.util.GregorianCalendar;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.stream.Collectors;
032
033import org.apache.avro.Schema;
034import org.apache.avro.Schema.Field;
035import org.apache.avro.generic.GenericData;
036import org.apache.avro.generic.GenericData.Array;
037import org.apache.avro.generic.GenericRecord;
038import org.nuxeo.ecm.core.api.Blobs;
039import org.nuxeo.ecm.core.api.model.Property;
040import org.nuxeo.ecm.core.api.model.impl.ListProperty;
041import org.nuxeo.ecm.core.api.model.impl.primitives.BlobProperty;
042import org.nuxeo.ecm.core.schema.types.ListType;
043import org.nuxeo.runtime.RuntimeServiceException;
044import org.nuxeo.runtime.avro.AvroMapper;
045import org.nuxeo.runtime.avro.AvroService;
046
047/**
048 * @since 10.2
049 */
050public class PropertyMapper extends AvroMapper<Property, Object> {
051
052    protected static final Map<String, Class<?>> MAPPING = Collections.singletonMap(
053            AvroConstants.CONTENT, BlobProperty.class);
054
055    public PropertyMapper(AvroService service) {
056        super(service);
057    }
058
059    @Override
060    public Object fromAvro(Schema schema, Object input) {
061        switch (schema.getType()) {
062        case NULL:
063            if (input == null) {
064                return null;
065            }
066            throw new NonNullValueException();
067        case UNION:
068            for (Schema sub : schema.getTypes()) {
069                try {
070                    return service.fromAvro(sub, Property.class, input);
071                } catch (NonNullValueException e) {
072                    // ignore
073                }
074            }
075            throw new RuntimeServiceException(CANNOT_MAP_FROM + schema.getType());
076        case RECORD:
077            GenericRecord record = (GenericRecord) input;
078            List<Field> fields = schema.getFields();
079            Map<String, Object> data = new HashMap<>(fields.size());
080            for (Field field : fields) {
081                String propertyName = service.decodeName(field.name());
082                Class<?> clazz = MAPPING.getOrDefault(propertyName, Property.class);
083                Object value = service.fromAvro(field.schema(), clazz, record.get(field.name()));
084                data.put(propertyName, value);
085            }
086            return data;
087        case ARRAY:
088            GenericData.Array<?> array = (Array<?>) input;
089            List<Object> list = new ArrayList<>(array.size());
090            for (Object element : array) {
091                list.add(service.fromAvro(schema.getElementType(), Property.class, element));
092            }
093            return list;
094        case LONG:
095            if (AvroConstants.AVRO_LOGICTYPE_TIMESTAMP_MILLIS.equals(getLogicalType(schema))) {
096                return new Date(((Long) input).longValue());
097            }
098            return input;
099        case INT:
100        case FLOAT:
101        case STRING:
102        case DOUBLE:
103        case BOOLEAN:
104            return input;
105        case BYTES:
106            return Blobs.createBlob(((ByteBuffer) input).array());
107        default:
108            throw new RuntimeServiceException(CANNOT_MAP_FROM + schema.getType());
109        }
110    }
111
112    @Override
113    public Object toAvro(Schema schema, Property input) {
114        switch (schema.getType()) {
115        case NULL:
116            if (input.getValue() == null) {
117                return null;
118            }
119            throw new NonNullValueException();
120        case UNION:
121            for (Schema s : schema.getTypes()) {
122                try {
123                    return service.toAvro(s, input);
124                } catch (NonNullValueException e) {
125                    // this exception is thrown when a null value is expected and not found
126                    // this happens for schema unions [null, schema]
127                }
128            }
129            throw new RuntimeServiceException(CANNOT_MAP_TO + schema.getType());
130        case RECORD:
131            if (input.isComplex()) {
132                GenericRecord record = new GenericData.Record(schema);
133                for (Field f : schema.getFields()) {
134                    record.put(f.name(), service.toAvro(f.schema(), input.get(service.decodeName(f.name()))));
135                }
136                return record;
137            }
138            throw new RuntimeServiceException(CANNOT_MAP_TO + schema.getType());
139        case ARRAY:
140            if (input.getType().isListType()) {
141                Collection<Object> objects;
142                if (((ListType) input.getType()).isArray()) {
143                    objects = Arrays.asList((Object[]) input.getValue());
144                } else {
145                    ListProperty list = (ListProperty) input;
146                    objects = list.stream()
147                                  .map(p -> service.toAvro(schema.getElementType(), p))
148                                  .collect(Collectors.toList());
149                }
150                return new GenericData.Array<>(schema, objects);
151            }
152            throw new RuntimeServiceException(CANNOT_MAP_TO + schema.getType());
153        case INT:
154        case FLOAT:
155        case STRING:
156        case DOUBLE:
157        case BOOLEAN:
158            if (input.isScalar()) {
159                return input.getValue();
160            }
161            throw new RuntimeServiceException(CANNOT_MAP_TO + schema.getType());
162        case LONG:
163            if (input.isScalar()) {
164                if (AvroConstants.AVRO_LOGICTYPE_TIMESTAMP_MILLIS.equals(getLogicalType(schema))) {
165                    GregorianCalendar cal = (GregorianCalendar) input.getValue();
166                    return cal.toInstant().toEpochMilli();
167                }
168                return input.getValue();
169            }
170            throw new RuntimeServiceException(CANNOT_MAP_TO + schema.getType());
171        default:
172            throw new RuntimeServiceException(CANNOT_MAP_TO + schema.getType());
173        }
174    }
175
176}