001/* 002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * pierre 018 */ 019package org.nuxeo.runtime.avro; 020 021import java.lang.reflect.Constructor; 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.Comparator; 025import java.util.HashMap; 026import java.util.List; 027import java.util.ListIterator; 028import java.util.Map; 029import java.util.Map.Entry; 030 031import org.apache.avro.Schema; 032import org.apache.avro.generic.GenericRecord; 033import org.apache.avro.message.SchemaStore; 034import org.nuxeo.runtime.RuntimeServiceException; 035 036/** 037 * @since 10.2 038 */ 039public class AvroServiceImpl extends SchemaStore.Cache implements AvroService { 040 041 private static final AvroMapper<Object, Object> NULL = new AvroMapper<Object, Object>(null) { 042 043 @Override 044 public Object fromAvro(Schema schema, Object input) { 045 return null; 046 } 047 048 @Override 049 public GenericRecord toAvro(Schema schema, Object input) { 050 return null; 051 } 052 053 }; 054 055 protected final Map<Class<?>, Class<AvroSchemaFactory<?>>> factories; 056 057 protected final List<AvroReplacementDescriptor> replacements; 058 059 protected Map<Class<?>, AvroMapper<?, ?>> mappers; 060 061 public AvroServiceImpl(Collection<AvroReplacementDescriptor> replacements, 062 Map<Class<?>, Class<AvroSchemaFactory<?>>> factories) { 063 this.replacements = new ArrayList<>(replacements); 064 this.factories = new HashMap<>(factories); 065 this.replacements.sort(Comparator.naturalOrder()); 066 // assert at creation that factories are valid 067 createContext(); 068 } 069 070 @Override 071 public <D> Schema createSchema(D input) { 072 return createContext().createSchema(input); 073 } 074 075 @Override 076 public String decodeName(String input) { 077 String output = input; 078 ListIterator<AvroReplacementDescriptor> it = replacements.listIterator(replacements.size()); 079 while (it.hasPrevious()) { 080 AvroReplacementDescriptor replacement = it.previous(); 081 output = output.replaceAll(replacement.getReplacement(), replacement.getForbidden()); 082 } 083 return output; 084 } 085 086 @Override 087 public String encodeName(String input) { 088 String output = input; 089 for (AvroReplacementDescriptor replacement : replacements) { 090 output = output.replaceAll(replacement.getForbidden(), replacement.getReplacement()); 091 } 092 return output; 093 } 094 095 @Override 096 @SuppressWarnings("unchecked") 097 public <D, M> D fromAvro(Schema schema, Class<D> clazz, M input) { 098 return (D) getMapper(clazz).fromAvro(schema, input); 099 } 100 101 public void setMappers(Map<Class<?>, AvroMapper<?, ?>> mappers) { 102 this.mappers = new HashMap<>(mappers); 103 } 104 105 @Override 106 @SuppressWarnings("unchecked") 107 public <D, M> M toAvro(Schema schema, D input) { 108 return (M) getMapper((Class<D>) input.getClass()).toAvro(schema, input); 109 } 110 111 protected AvroSchemaFactoryContext createContext() { 112 AvroSchemaFactoryContext context = new AvroSchemaFactoryContext(this); 113 for (Entry<Class<?>, Class<AvroSchemaFactory<?>>> entry : factories.entrySet()) { 114 try { 115 Class<AvroSchemaFactory<?>> clazz = entry.getValue(); 116 Constructor<AvroSchemaFactory<?>> constructor = clazz.getConstructor(AvroSchemaFactoryContext.class); 117 AvroSchemaFactory<?> factory = constructor.newInstance(context); 118 context.register(entry.getKey(), factory); 119 } catch (ReflectiveOperationException e) { 120 throw new RuntimeServiceException(e); 121 } 122 } 123 return context; 124 } 125 126 @SuppressWarnings("unchecked") 127 protected <D, M> AvroMapper<D, M> getMapper(Class<D> clazz) { 128 AvroMapper<?, ?> factory = mappers.get(clazz); 129 if (factory != null) { 130 return (AvroMapper<D, M>) factory; 131 } 132 for (Class<?> intrface : clazz.getInterfaces()) { 133 factory = mappers.get(intrface); 134 if (factory != null) { 135 return (AvroMapper<D, M>) factory; 136 } 137 } 138 for (Entry<Class<?>, AvroMapper<?, ?>> entry : mappers.entrySet()) { 139 if (entry.getKey().isAssignableFrom(clazz)) { 140 return (AvroMapper<D, M>) entry.getValue(); 141 } 142 } 143 return (AvroMapper<D, M>) NULL; 144 } 145 146}