/*
 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     pierre
 */
package org.nuxeo.runtime.avro;

import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.net.URL;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.nuxeo.runtime.RuntimeServiceException;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.model.ComponentInstance;
import org.nuxeo.runtime.model.DefaultComponent;
import org.nuxeo.runtime.model.SimpleContributionRegistry;

/**
 * Avro component.
 * <p>
 * Collects contributions made to the {@value #SCHEMA_XP}, {@value #MAPPER_XP}, {@value #FACTORY_XP} and
 * {@value #REPLACEMENT_XP} extension points and, on {@link #start(ComponentContext)}, builds the {@link AvroService}
 * exposed through {@link #getAdapter(Class)}.
 *
 * @since 10.2
 */
public class AvroComponent extends DefaultComponent {

    /** Value returned by {@link #getApplicationStartedOrder()}. */
    public static final int APPLICATION_START_ORDER = -600;

    /** Registry of mapper contributions, identified by {@link AvroMapperDescriptor#type}. */
    protected static class AvroMapperDescriptorRegistry extends SimpleContributionRegistry<AvroMapperDescriptor> {
        @Override
        public String getContributionId(AvroMapperDescriptor contrib) {
            return contrib.type;
        }

        /** @return the currently registered mapper descriptors */
        public Collection<AvroMapperDescriptor> getDescriptors() {
            return currentContribs.values();
        }
    }

    /** Registry of replacement contributions, identified by {@link AvroReplacementDescriptor#forbidden}. */
    protected static class AvroReplacementDescriptorRegistry
            extends SimpleContributionRegistry<AvroReplacementDescriptor> {
        @Override
        public String getContributionId(AvroReplacementDescriptor contrib) {
            return contrib.forbidden;
        }

        /** @return the currently registered replacement descriptors */
        public Collection<AvroReplacementDescriptor> getDescriptors() {
            return currentContribs.values();
        }
    }

    /** Registry of schema contributions, identified by {@link AvroSchemaDescriptor#name}. */
    protected static class AvroSchemaDescriptorRegistry extends SimpleContributionRegistry<AvroSchemaDescriptor> {
        @Override
        public String getContributionId(AvroSchemaDescriptor contrib) {
            return contrib.name;
        }

        /** @return the currently registered schema descriptors */
        public Collection<AvroSchemaDescriptor> getDescriptors() {
            return currentContribs.values();
        }
    }

    /** Registry of schema factory contributions, identified by {@link AvroSchemaFactoryDescriptor#type}. */
    protected static class AvroSchemaFactoryDescriptorRegistry
            extends SimpleContributionRegistry<AvroSchemaFactoryDescriptor> {
        @Override
        public String getContributionId(AvroSchemaFactoryDescriptor contrib) {
            return contrib.type;
        }

        /** @return the currently registered schema factory descriptors */
        public Collection<AvroSchemaFactoryDescriptor> getDescriptors() {
            return currentContribs.values();
        }
    }

    /** Name of the extension point receiving {@link AvroSchemaDescriptor} contributions. */
    public static final String SCHEMA_XP = "schema";

    /** Name of the extension point receiving {@link AvroMapperDescriptor} contributions. */
    public static final String MAPPER_XP = "mapper";

    /** Name of the extension point receiving {@link AvroSchemaFactoryDescriptor} contributions. */
    public static final String FACTORY_XP = "factory";

    /** Name of the extension point receiving {@link AvroReplacementDescriptor} contributions. */
    public static final String REPLACEMENT_XP = "replacement";

    protected final AvroMapperDescriptorRegistry avroMapperDescriptors = new AvroMapperDescriptorRegistry();

    protected final AvroSchemaDescriptorRegistry schemaDescriptors = new AvroSchemaDescriptorRegistry();

    protected final AvroSchemaFactoryDescriptorRegistry avroSchemaFactoryDescriptors = new AvroSchemaFactoryDescriptorRegistry();

    protected final AvroReplacementDescriptorRegistry replacementDescriptors = new AvroReplacementDescriptorRegistry();

    /** The service built by {@link #start(ComponentContext)}; {@code null} before start and after stop. */
    protected AvroService avroService;

    /**
     * Returns the Avro service when it has been started and is an instance of the requested type.
     *
     * @param adapter the requested type
     * @return the service cast to {@code adapter}, or {@code null} if the service is not started or is not assignable
     *         to {@code adapter}
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> T getAdapter(Class<T> adapter) {
        // avroService is only set between start() and stop(): a lookup outside that window must not throw
        if (avroService != null && adapter.isAssignableFrom(avroService.getClass())) {
            return (T) avroService;
        }
        return null;
    }

    /**
     * Stores the contribution in the registry matching the extension point.
     *
     * @throws RuntimeServiceException if the extension point is unknown
     */
    @Override
    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
        switch (extensionPoint) {
        case SCHEMA_XP:
            schemaDescriptors.addContribution((AvroSchemaDescriptor) contribution);
            break;
        case MAPPER_XP:
            avroMapperDescriptors.addContribution((AvroMapperDescriptor) contribution);
            break;
        case FACTORY_XP:
            avroSchemaFactoryDescriptors.addContribution((AvroSchemaFactoryDescriptor) contribution);
            break;
        case REPLACEMENT_XP:
            replacementDescriptors.addContribution((AvroReplacementDescriptor) contribution);
            break;
        default:
            throw new RuntimeServiceException("Unknown extension point: " + extensionPoint);
        }
    }

    @Override
    public int getApplicationStartedOrder() {
        return APPLICATION_START_ORDER;
    }

    /**
     * Builds the service implementation from the registered contributions and publishes it.
     *
     * @throws RuntimeServiceException if a contributed class cannot be loaded or instantiated, or if a contributed
     *             schema file cannot be found or read
     */
    @Override
    public void start(ComponentContext context) {
        // schema factories can be given to the constructor since they don't need a service instance
        Map<Class<?>, Class<AvroSchemaFactory<?>>> factories = loadSchemaFactories();
        // as well as replacements
        AvroServiceImpl impl = new AvroServiceImpl(replacementDescriptors.getDescriptors(), factories);
        // mappers are instantiated with an instance of the service and are then added to it
        impl.setMappers(instantiateMappers(impl));
        // each contributed schema file is parsed and added to the service
        registerSchemas(impl, context);
        // publish only once fully initialized
        avroService = impl;
    }

    /** Loads the type and factory classes named by each registered schema factory descriptor. */
    @SuppressWarnings("unchecked")
    private Map<Class<?>, Class<AvroSchemaFactory<?>>> loadSchemaFactories() {
        Collection<AvroSchemaFactoryDescriptor> descriptors = avroSchemaFactoryDescriptors.getDescriptors();
        Map<Class<?>, Class<AvroSchemaFactory<?>>> factories = new HashMap<>(descriptors.size());
        for (AvroSchemaFactoryDescriptor descriptor : descriptors) {
            try {
                Class<?> type = Class.forName(descriptor.type);
                factories.put(type, (Class<AvroSchemaFactory<?>>) Class.forName(descriptor.clazz));
            } catch (ReflectiveOperationException e) {
                throw new RuntimeServiceException(e);
            }
        }
        return factories;
    }

    /** Instantiates each registered mapper through its {@code (AvroService)} constructor. */
    @SuppressWarnings("unchecked")
    private Map<Class<?>, AvroMapper<?, ?>> instantiateMappers(AvroService service) {
        Collection<AvroMapperDescriptor> descriptors = avroMapperDescriptors.getDescriptors();
        Map<Class<?>, AvroMapper<?, ?>> mappers = new HashMap<>(descriptors.size());
        for (AvroMapperDescriptor descriptor : descriptors) {
            try {
                Class<?> type = Class.forName(descriptor.type);
                Class<AvroMapper<?, ?>> clazz = (Class<AvroMapper<?, ?>>) Class.forName(descriptor.clazz);
                Constructor<AvroMapper<?, ?>> constructor = clazz.getConstructor(AvroService.class);
                mappers.put(type, constructor.newInstance(service));
            } catch (ReflectiveOperationException e) {
                throw new RuntimeServiceException(e);
            }
        }
        return mappers;
    }

    /** Parses the file of each registered schema descriptor and adds the resulting schema to the service. */
    private void registerSchemas(AvroServiceImpl impl, ComponentContext context) {
        for (AvroSchemaDescriptor descriptor : schemaDescriptors.getDescriptors()) {
            URL url = context.getRuntimeContext().getResource(descriptor.file);
            if (url == null) {
                throw new RuntimeServiceException("Could not load stream for file " + descriptor.file);
            }
            try (InputStream stream = url.openStream()) {
                impl.addSchema(new Schema.Parser().parse(stream));
            } catch (IOException e) {
                throw new RuntimeServiceException(e);
            }
        }
    }

    /** Discards the service; {@link #getAdapter(Class)} returns {@code null} until the next start. */
    @Override
    public void stop(ComponentContext context) {
        avroService = null;
    }

    /**
     * Removes the contribution from the registry matching the extension point.
     *
     * @throws RuntimeServiceException if the extension point is unknown
     */
    @Override
    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
        switch (extensionPoint) {
        case SCHEMA_XP:
            schemaDescriptors.removeContribution((AvroSchemaDescriptor) contribution);
            break;
        case MAPPER_XP:
            avroMapperDescriptors.removeContribution((AvroMapperDescriptor) contribution);
            break;
        case FACTORY_XP:
            avroSchemaFactoryDescriptors.removeContribution((AvroSchemaFactoryDescriptor) contribution);
            break;
        case REPLACEMENT_XP:
            replacementDescriptors.removeContribution((AvroReplacementDescriptor) contribution);
            break;
        default:
            throw new RuntimeServiceException("Unknown extension point: " + extensionPoint);
        }
    }

}