001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     pierre
018 */
019package org.nuxeo.runtime.avro;
020
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.Comparator;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.Map.Entry;
029
030import org.apache.avro.Schema;
031
032/**
033 * An AvroSchemaFactoryContext represents a context in which Avro schemas are cached and reused depending on their
034 * qualified name.<br>
035 * <br>
036 * Avro does not permit to declare twice a schema with the same qualified name. Thus a schema has to be fully described
037 * the first time it appears in the object, and then be referred by name.<br>
038 *
039 * @since 10.2
040 */
041public class AvroSchemaFactoryContext {
042
043    protected final Map<Class<?>, AvroSchemaFactory<?>> factories = new HashMap<>();
044
045    protected final Map<String, Schema> createdSchemas = new HashMap<>();
046
047    protected final AvroService service;
048
049    protected AvroSchemaFactoryContext(AvroService service) {
050        super();
051        this.service = service;
052    }
053
054    public <T> Schema createSchema(T input) {
055        String qualifiedName = getFactory(input).getQualifiedName(input);
056        // don't use computeIfAbsent here as getFactory(T).createSchema(T) will call back this method and modify the map
057        // which will throw a ConcurrentModificationException since java 11
058        Schema schema = createdSchemas.get(qualifiedName);
059        if (schema == null) {
060            schema = getFactory(input).createSchema(input);
061            createdSchemas.put(qualifiedName, schema);
062        }
063        return schema;
064    }
065
066    public AvroService getService() {
067        return service;
068    }
069
070    public <U> List<U> sort(Collection<U> children) {
071        if (children == null || children.isEmpty()) {
072            return Collections.emptyList();
073        }
074        AvroSchemaFactory<U> factory = getFactory(children.iterator().next());
075        if (factory == null) {
076            return Collections.emptyList();
077        }
078        List<U> sortedChildren = new ArrayList<>(children);
079        sortedChildren.sort(Comparator.comparing(factory::getQualifiedName));
080        return sortedChildren;
081    }
082
083    @SuppressWarnings("unchecked")
084    protected <T> AvroSchemaFactory<T> getFactory(T input) {
085        AvroSchemaFactory<T> factory = (AvroSchemaFactory<T>) factories.get(input.getClass());
086        if (factory != null) {
087            return factory;
088        }
089        for (Class<?> intrface : input.getClass().getInterfaces()) {
090            factory = (AvroSchemaFactory<T>) factories.get(intrface);
091            if (factory != null) {
092                return factory;
093            }
094        }
095        for (Entry<Class<?>, AvroSchemaFactory<?>> entry : factories.entrySet()) {
096            if (entry.getKey().isAssignableFrom(input.getClass())) {
097                return (AvroSchemaFactory<T>) entry.getValue();
098            }
099        }
100        return null;
101    }
102
103    protected void register(Class<?> type, AvroSchemaFactory<?> factory) {
104        factories.put(type, factory);
105    }
106
107}