001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     pierre
018 */
019package org.nuxeo.runtime.avro;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.lang.reflect.Constructor;
024import java.net.URL;
025import java.util.Collection;
026import java.util.HashMap;
027import java.util.Map;
028
029import org.apache.avro.Schema;
030import org.nuxeo.runtime.RuntimeServiceException;
031import org.nuxeo.runtime.model.ComponentContext;
032import org.nuxeo.runtime.model.ComponentInstance;
033import org.nuxeo.runtime.model.DefaultComponent;
034import org.nuxeo.runtime.model.SimpleContributionRegistry;
035
036/**
037 * Avro component.
038 *
039 * @since 10.2
040 */
041public class AvroComponent extends DefaultComponent {
042
043    public static final int APPLICATION_START_ORDER = -600;
044
045    protected static class AvroMapperDescriptorRegistry extends SimpleContributionRegistry<AvroMapperDescriptor> {
046        @Override
047        public String getContributionId(AvroMapperDescriptor contrib) {
048            return contrib.type;
049        }
050
051        public Collection<AvroMapperDescriptor> getDescriptors() {
052            return currentContribs.values();
053        }
054    }
055
056    protected static class AvroReplacementDescriptorRegistry
057            extends SimpleContributionRegistry<AvroReplacementDescriptor> {
058        @Override
059        public String getContributionId(AvroReplacementDescriptor contrib) {
060            return contrib.forbidden;
061        }
062
063        public Collection<AvroReplacementDescriptor> getDescriptors() {
064            return currentContribs.values();
065        }
066    }
067
068    protected static class AvroSchemaDescriptorRegistry extends SimpleContributionRegistry<AvroSchemaDescriptor> {
069        @Override
070        public String getContributionId(AvroSchemaDescriptor contrib) {
071            return contrib.name;
072        }
073
074        public Collection<AvroSchemaDescriptor> getDescriptors() {
075            return currentContribs.values();
076        }
077    }
078
079    protected static class AvroSchemaFactoryDescriptorRegistry
080            extends SimpleContributionRegistry<AvroSchemaFactoryDescriptor> {
081        @Override
082        public String getContributionId(AvroSchemaFactoryDescriptor contrib) {
083            return contrib.type;
084        }
085
086        public Collection<AvroSchemaFactoryDescriptor> getDescriptors() {
087            return currentContribs.values();
088        }
089    }
090
091    public static final String SCHEMA_XP = "schema";
092
093    public static final String MAPPER_XP = "mapper";
094
095    public static final String FACTORY_XP = "factory";
096
097    public static final String REPLACEMENT_XP = "replacement";
098
099    protected final AvroMapperDescriptorRegistry avroMapperDescriptors = new AvroMapperDescriptorRegistry();
100
101    protected final AvroSchemaDescriptorRegistry schemaDescriptors = new AvroSchemaDescriptorRegistry();
102
103    protected final AvroSchemaFactoryDescriptorRegistry avroSchemaFactoryDescriptors = new AvroSchemaFactoryDescriptorRegistry();
104
105    protected final AvroReplacementDescriptorRegistry replacementDescriptors = new AvroReplacementDescriptorRegistry();
106
107    protected AvroService avroService;
108
109    @Override
110    @SuppressWarnings("unchecked")
111    public <T> T getAdapter(Class<T> adapter) {
112        if (adapter.isAssignableFrom(avroService.getClass())) {
113            return (T) avroService;
114        }
115        return null;
116    }
117
118    @Override
119    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
120        switch (extensionPoint) {
121        case SCHEMA_XP:
122            schemaDescriptors.addContribution((AvroSchemaDescriptor) contribution);
123            break;
124        case MAPPER_XP:
125            avroMapperDescriptors.addContribution((AvroMapperDescriptor) contribution);
126            break;
127        case FACTORY_XP:
128            avroSchemaFactoryDescriptors.addContribution((AvroSchemaFactoryDescriptor) contribution);
129            break;
130        case REPLACEMENT_XP:
131            replacementDescriptors.addContribution((AvroReplacementDescriptor) contribution);
132            break;
133        default:
134            throw new RuntimeServiceException("Unknown extension point: " + extensionPoint);
135        }
136    }
137
138    @Override
139    public int getApplicationStartedOrder() {
140        return APPLICATION_START_ORDER;
141    }
142
143    @Override
144    @SuppressWarnings("unchecked")
145    public void start(ComponentContext context) {
146        // schema factories can be give to the constructor since they don't need a service instance
147        Collection<AvroSchemaFactoryDescriptor> factoryDescriptors = avroSchemaFactoryDescriptors.getDescriptors();
148        Map<Class<?>, Class<AvroSchemaFactory<?>>> factories = new HashMap<>(factoryDescriptors.size());
149        for (AvroSchemaFactoryDescriptor descriptor : factoryDescriptors) {
150            try {
151                Class<Object> type = (Class<Object>) Class.forName(descriptor.type);
152                factories.put(type, (Class<AvroSchemaFactory<?>>) Class.forName(descriptor.clazz));
153            } catch (ReflectiveOperationException e) {
154                throw new RuntimeServiceException(e);
155            }
156        }
157        // as well as replacements
158        AvroServiceImpl impl = new AvroServiceImpl(replacementDescriptors.getDescriptors(), factories);
159        // mappers are instantiated with an instance of the service
160        Collection<AvroMapperDescriptor> mapperDescriptors = avroMapperDescriptors.getDescriptors();
161        Map<Class<?>, AvroMapper<?, ?>> mappers = new HashMap<>(mapperDescriptors.size());
162        for (AvroMapperDescriptor descriptor : mapperDescriptors) {
163            try {
164                Class<Object> type = (Class<Object>) Class.forName(descriptor.type);
165                Class<AvroMapper<?, ?>> clazz = (Class<AvroMapper<?, ?>>) Class.forName(descriptor.clazz);
166                Constructor<AvroMapper<?, ?>> constructor = clazz.getConstructor(AvroService.class);
167                mappers.put(type, constructor.newInstance(impl));
168            } catch (ReflectiveOperationException e) {
169                throw new RuntimeServiceException(e);
170            }
171        }
172        // and are added to the service implementation
173        impl.setMappers(mappers);
174        // schemas are registered through the SchemaService interface
175        for (AvroSchemaDescriptor descriptor : schemaDescriptors.getDescriptors()) {
176            URL url = context.getRuntimeContext().getResource(descriptor.file);
177            try (InputStream stream = url == null ? null : url.openStream()) {
178                if (stream == null) {
179                    throw new RuntimeServiceException("Could not load stream for file " + descriptor.file);
180                }
181                impl.addSchema(new Schema.Parser().parse(stream));
182            } catch (IOException e) {
183                throw new RuntimeServiceException(e);
184            }
185        }
186        avroService = impl;
187    }
188
189    @Override
190    public void stop(ComponentContext context) {
191        avroService = null;
192    }
193
194    @Override
195    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
196        switch (extensionPoint) {
197        case SCHEMA_XP:
198            schemaDescriptors.removeContribution((AvroSchemaDescriptor) contribution);
199            break;
200        case MAPPER_XP:
201            avroMapperDescriptors.removeContribution((AvroMapperDescriptor) contribution);
202            break;
203        case FACTORY_XP:
204            avroSchemaFactoryDescriptors.removeContribution((AvroSchemaFactoryDescriptor) contribution);
205            break;
206        case REPLACEMENT_XP:
207            replacementDescriptors.removeContribution((AvroReplacementDescriptor) contribution);
208            break;
209        default:
210            throw new RuntimeServiceException("Unknown extension point: " + extensionPoint);
211        }
212    }
213
214}