/*
 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     pierre
 */
package org.nuxeo.runtime.avro;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;

import org.apache.avro.Schema;

/**
 * An AvroSchemaFactoryContext represents a context in which Avro schemas are cached and reused depending on their
 * qualified name.<br>
 * <br>
 * Avro does not allow a schema with the same qualified name to be declared twice. Thus a schema has to be fully
 * described the first time it appears in the object, and then be referred to by name.<br>
 * <br>
 * Both the factory registry and the schema cache are plain {@link HashMap}s and this class performs no
 * synchronization of its own.
 *
 * @since 10.2
 */
public class AvroSchemaFactoryContext {

    /** Registered schema factories, keyed by the type they were registered for. */
    protected final Map<Class<?>, AvroSchemaFactory<?>> factories = new HashMap<>();

    /** Schemas already created through this context, keyed by qualified name. */
    protected final Map<String, Schema> createdSchemas = new HashMap<>();

    protected final AvroService service;

    protected AvroSchemaFactoryContext(AvroService service) {
        super();
        this.service = service;
    }

    /**
     * Returns the Avro schema for the given input, creating and caching it on first use.
     * <p>
     * The schema is looked up in this context by the qualified name computed by the input's factory; it is only
     * created when no schema is cached under that name yet.
     *
     * @param input the object to create a schema for
     * @return the cached or newly created schema
     * @throws NullPointerException if no factory can be resolved for the input
     */
    public <T> Schema createSchema(T input) {
        // resolve the factory only once: a lookup may have to scan every registered type
        AvroSchemaFactory<T> factory = Objects.requireNonNull(getFactory(input),
                () -> "No AvroSchemaFactory registered for " + input.getClass().getName());
        String qualifiedName = factory.getQualifiedName(input);
        // don't use computeIfAbsent here as factory.createSchema(T) will call back this method and modify the map
        // which will throw a ConcurrentModificationException since java 11
        Schema schema = createdSchemas.get(qualifiedName);
        if (schema == null) {
            schema = factory.createSchema(input);
            createdSchemas.put(qualifiedName, schema);
        }
        return schema;
    }

    public AvroService getService() {
        return service;
    }

    /**
     * Returns a copy of the given collection sorted by qualified name.
     * <p>
     * The factory used to compute every qualified name is the one resolved for the first element returned by the
     * collection's iterator.
     *
     * @param children the elements to sort, may be {@code null}
     * @return a new sorted list; an immutable empty list when {@code children} is {@code null} or empty, or when no
     *         factory can be resolved for its first element
     */
    public <U> List<U> sort(Collection<U> children) {
        if (children == null || children.isEmpty()) {
            return Collections.emptyList();
        }
        AvroSchemaFactory<U> factory = getFactory(children.iterator().next());
        if (factory == null) {
            return Collections.emptyList();
        }
        List<U> sortedChildren = new ArrayList<>(children);
        sortedChildren.sort(Comparator.comparing(factory::getQualifiedName));
        return sortedChildren;
    }

    /**
     * Resolves the factory to use for the given input.
     * <p>
     * Lookup order:
     * <ol>
     * <li>the exact runtime class of the input,</li>
     * <li>the interfaces directly declared by that class, in declaration order,</li>
     * <li>any registered type the runtime class is assignable to.</li>
     * </ol>
     * The last step iterates a {@link HashMap}, so when several registered types match, which one wins is
     * unspecified.
     *
     * @param input the object to find a factory for
     * @return the matching factory, or {@code null} if none is registered
     */
    @SuppressWarnings("unchecked")
    protected <T> AvroSchemaFactory<T> getFactory(T input) {
        Class<?> type = input.getClass();
        AvroSchemaFactory<T> factory = (AvroSchemaFactory<T>) factories.get(type);
        if (factory != null) {
            return factory;
        }
        for (Class<?> intrface : type.getInterfaces()) {
            factory = (AvroSchemaFactory<T>) factories.get(intrface);
            if (factory != null) {
                return factory;
            }
        }
        for (Entry<Class<?>, AvroSchemaFactory<?>> entry : factories.entrySet()) {
            if (entry.getKey().isAssignableFrom(type)) {
                return (AvroSchemaFactory<T>) entry.getValue();
            }
        }
        return null;
    }

    /**
     * Registers the factory to use for the given type, replacing any factory previously registered for it.
     *
     * @param type the type handled by the factory
     * @param factory the factory to register
     */
    protected void register(Class<?> type, AvroSchemaFactory<?> factory) {
        factories.put(type, factory);
    }

}