001/* 002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * pierre 018 */ 019package org.nuxeo.runtime.avro; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.lang.reflect.Constructor; 024import java.net.URL; 025import java.util.HashMap; 026import java.util.List; 027import java.util.Map; 028 029import org.apache.avro.Schema; 030import org.nuxeo.lib.stream.codec.AvroSchemaStore; 031import org.nuxeo.runtime.RuntimeServiceException; 032import org.nuxeo.runtime.kafka.KafkaConfigServiceImpl; 033import org.nuxeo.runtime.model.ComponentContext; 034import org.nuxeo.runtime.model.DefaultComponent; 035 036/** 037 * Avro component. 038 * 039 * @since 10.2 040 */ 041public class AvroComponent extends DefaultComponent { 042 043 public static final String XP_SCHEMA = "schema"; 044 045 public static final String XP_MAPPER = "mapper"; 046 047 public static final String XP_FACTORY = "factory"; 048 049 public static final String XP_REPLACEMENT = "replacement"; 050 051 protected AvroService avroService; 052 053 @Override 054 @SuppressWarnings("unchecked") 055 public <T> T getAdapter(Class<T> adapter) { 056 if (adapter.isAssignableFrom(avroService.getClass())) { 057 return (T) avroService; 058 } 059 return null; 060 } 061 062 @Override 063 public int getApplicationStartedOrder() { 064 return KafkaConfigServiceImpl.APPLICATION_STARTED_ORDER; 065 } 066 067 @Override 068 @SuppressWarnings("unchecked") 069 public void start(ComponentContext context) { 070 super.start(context); 071 // schema factories can be give to the constructor since they don't need a service instance 072 List<AvroSchemaFactoryDescriptor> factoryDescs = getDescriptors(XP_FACTORY); 073 Map<Class<?>, Class<AvroSchemaFactory<?>>> factories = new HashMap<>(factoryDescs.size()); 074 for (AvroSchemaFactoryDescriptor descriptor : factoryDescs) { 075 try { 076 Class<Object> type = (Class<Object>) Class.forName(descriptor.type); 077 factories.put(type, (Class<AvroSchemaFactory<?>>) Class.forName(descriptor.klass)); 078 } catch (ReflectiveOperationException e) { 079 throw new RuntimeServiceException(e); 080 } 081 } 082 // as well as replacements 083 List<AvroReplacementDescriptor> replacementDescs = getDescriptors(XP_REPLACEMENT); 084 AvroServiceImpl impl = new AvroServiceImpl(replacementDescs, factories); 085 // mappers are instantiated with an instance of the service 086 List<AvroMapperDescriptor> mapperDescs = getDescriptors(XP_MAPPER); 087 Map<Class<?>, AvroMapper<?, ?>> mappers = new HashMap<>(mapperDescs.size()); 088 for (AvroMapperDescriptor descriptor : mapperDescs) { 089 try { 090 Class<Object> type = (Class<Object>) Class.forName(descriptor.type); 091 Class<AvroMapper<?, ?>> clazz = (Class<AvroMapper<?, ?>>) Class.forName(descriptor.klass); 092 Constructor<AvroMapper<?, ?>> constructor = clazz.getConstructor(AvroService.class); 093 mappers.put(type, constructor.newInstance(impl)); 094 } catch (ReflectiveOperationException e) { 095 throw new RuntimeServiceException(e); 096 } 097 } 098 // and are added to the service implementation 099 impl.setMappers(mappers); 100 List<AvroSchemaDescriptor> schemaDescs = getDescriptors(XP_SCHEMA); 101 // schemas are registered through the SchemaService interface 102 AvroSchemaStore schemaStore = impl.getSchemaStore(); 103 for (AvroSchemaDescriptor descriptor : schemaDescs) { 104 URL url = context.getRuntimeContext().getResource(descriptor.file); 105 try (InputStream stream = url == null ? null : url.openStream()) { 106 if (stream == null) { 107 throw new RuntimeServiceException("Could not load stream for file " + descriptor.file); 108 } 109 110 schemaStore.addSchema(new Schema.Parser().parse(stream)); 111 } catch (IOException e) { 112 throw new RuntimeServiceException(e); 113 } 114 } 115 avroService = impl; 116 } 117 118 @Override 119 public void stop(ComponentContext context) throws InterruptedException { 120 super.stop(context); 121 avroService = null; 122 } 123 124}