001/* 002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * pierre 018 */ 019package org.nuxeo.runtime.avro; 020 021import java.lang.reflect.Constructor; 022import java.nio.file.Paths; 023import java.util.ArrayList; 024import java.util.Collection; 025import java.util.Collections; 026import java.util.HashMap; 027import java.util.List; 028import java.util.ListIterator; 029import java.util.Map; 030import java.util.Map.Entry; 031 032import org.apache.avro.Schema; 033import org.apache.avro.generic.GenericRecord; 034import org.nuxeo.common.Environment; 035import org.nuxeo.lib.stream.codec.AvroSchemaStore; 036import org.nuxeo.lib.stream.codec.FileAvroSchemaStore; 037import org.nuxeo.runtime.RuntimeServiceException; 038import org.nuxeo.runtime.api.Framework; 039 040/** 041 * @since 10.2 042 */ 043public class AvroServiceImpl implements AvroService { 044 045 private static final AvroMapper<Object, Object> NULL = new AvroMapper<Object, Object>(null) { 046 047 @Override 048 public Object fromAvro(Schema schema, Object input) { 049 return null; 050 } 051 052 @Override 053 public GenericRecord toAvro(Schema schema, Object input) { 054 return null; 055 } 056 057 }; 058 059 protected final Map<Class<?>, Class<AvroSchemaFactory<?>>> factories; 060 061 protected final List<AvroReplacementDescriptor> replacements; 062 063 protected final AvroSchemaStore schemaStore; 064 065 protected Map<Class<?>, AvroMapper<?, ?>> mappers; 066 067 public AvroServiceImpl(Collection<AvroReplacementDescriptor> replacements, 068 Map<Class<?>, Class<AvroSchemaFactory<?>>> factories) { 069 this.replacements = new ArrayList<>(replacements); 070 this.factories = new HashMap<>(factories); 071 String dataDir = Framework.getProperty(Environment.NUXEO_DATA_DIR); 072 if (dataDir != null) { 073 this.schemaStore = new FileAvroSchemaStore(Paths.get(dataDir, "avro")); 074 } else { 075 this.schemaStore = new FileAvroSchemaStore( 076 Paths.get(Framework.getRuntime().getHome().getAbsolutePath(), "data", "avro")); 077 } 078 Collections.sort(this.replacements); 079 // assert at creation that factories are valid 080 createContext(); 081 } 082 083 @Override 084 public AvroSchemaStore getSchemaStore() { 085 return schemaStore; 086 } 087 088 @Override 089 public <D> Schema createSchema(D input) { 090 return createContext().createSchema(input); 091 } 092 093 @Override 094 public String decodeName(String input) { 095 String output = input; 096 ListIterator<AvroReplacementDescriptor> it = replacements.listIterator(replacements.size()); 097 while (it.hasPrevious()) { 098 AvroReplacementDescriptor descriptor = it.previous(); 099 output = output.replaceAll(descriptor.replacement, descriptor.forbidden); 100 } 101 return output; 102 } 103 104 @Override 105 public String encodeName(String input) { 106 String output = input; 107 for (AvroReplacementDescriptor descriptor : replacements) { 108 output = output.replaceAll(descriptor.forbidden, descriptor.replacement); 109 } 110 return output; 111 } 112 113 @Override 114 @SuppressWarnings("unchecked") 115 public <D, M> D fromAvro(Schema schema, Class<D> clazz, M input) { 116 return (D) getMapper(clazz).fromAvro(schema, input); 117 } 118 119 public void setMappers(Map<Class<?>, AvroMapper<?, ?>> mappers) { 120 this.mappers = new HashMap<>(mappers); 121 } 122 123 @Override 124 @SuppressWarnings("unchecked") 125 public <D, M> M toAvro(Schema schema, D input) { 126 return (M) getMapper((Class<D>) input.getClass()).toAvro(schema, input); 127 } 128 129 protected AvroSchemaFactoryContext createContext() { 130 AvroSchemaFactoryContext context = new AvroSchemaFactoryContext(this); 131 for (Entry<Class<?>, Class<AvroSchemaFactory<?>>> entry : factories.entrySet()) { 132 try { 133 Class<AvroSchemaFactory<?>> clazz = entry.getValue(); 134 Constructor<AvroSchemaFactory<?>> constructor = clazz.getConstructor(AvroSchemaFactoryContext.class); 135 AvroSchemaFactory<?> factory = constructor.newInstance(context); 136 context.register(entry.getKey(), factory); 137 } catch (ReflectiveOperationException e) { 138 throw new RuntimeServiceException(e); 139 } 140 } 141 return context; 142 } 143 144 @SuppressWarnings("unchecked") 145 protected <D, M> AvroMapper<D, M> getMapper(Class<D> clazz) { 146 AvroMapper<?, ?> factory = mappers.get(clazz); 147 if (factory != null) { 148 return (AvroMapper<D, M>) factory; 149 } 150 for (Class<?> intrface : clazz.getInterfaces()) { 151 factory = mappers.get(intrface); 152 if (factory != null) { 153 return (AvroMapper<D, M>) factory; 154 } 155 } 156 for (Entry<Class<?>, AvroMapper<?, ?>> entry : mappers.entrySet()) { 157 if (entry.getKey().isAssignableFrom(clazz)) { 158 return (AvroMapper<D, M>) entry.getValue(); 159 } 160 } 161 return (AvroMapper<D, M>) NULL; 162 } 163 164}