001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     pierre
018 */
019package org.nuxeo.runtime.avro;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.lang.reflect.Constructor;
024import java.net.URL;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028
029import org.apache.avro.Schema;
030import org.nuxeo.lib.stream.codec.AvroSchemaStore;
031import org.nuxeo.runtime.RuntimeServiceException;
032import org.nuxeo.runtime.kafka.KafkaConfigServiceImpl;
033import org.nuxeo.runtime.model.ComponentContext;
034import org.nuxeo.runtime.model.DefaultComponent;
035
036/**
037 * Avro component.
038 *
039 * @since 10.2
040 */
041public class AvroComponent extends DefaultComponent {
042
043    public static final String XP_SCHEMA = "schema";
044
045    public static final String XP_MAPPER = "mapper";
046
047    public static final String XP_FACTORY = "factory";
048
049    public static final String XP_REPLACEMENT = "replacement";
050
051    protected AvroService avroService;
052
053    @Override
054    @SuppressWarnings("unchecked")
055    public <T> T getAdapter(Class<T> adapter) {
056        if (adapter.isAssignableFrom(avroService.getClass())) {
057            return (T) avroService;
058        }
059        return null;
060    }
061
062    @Override
063    public int getApplicationStartedOrder() {
064        return KafkaConfigServiceImpl.APPLICATION_STARTED_ORDER;
065    }
066
067    @Override
068    @SuppressWarnings("unchecked")
069    public void start(ComponentContext context) {
070        super.start(context);
071        // schema factories can be give to the constructor since they don't need a service instance
072        List<AvroSchemaFactoryDescriptor> factoryDescs = getDescriptors(XP_FACTORY);
073        Map<Class<?>, Class<AvroSchemaFactory<?>>> factories = new HashMap<>(factoryDescs.size());
074        for (AvroSchemaFactoryDescriptor descriptor : factoryDescs) {
075            try {
076                Class<Object> type = (Class<Object>) Class.forName(descriptor.type);
077                factories.put(type, (Class<AvroSchemaFactory<?>>) Class.forName(descriptor.klass));
078            } catch (ReflectiveOperationException e) {
079                throw new RuntimeServiceException(e);
080            }
081        }
082        // as well as replacements
083        List<AvroReplacementDescriptor> replacementDescs = getDescriptors(XP_REPLACEMENT);
084        AvroServiceImpl impl = new AvroServiceImpl(replacementDescs, factories);
085        // mappers are instantiated with an instance of the service
086        List<AvroMapperDescriptor> mapperDescs = getDescriptors(XP_MAPPER);
087        Map<Class<?>, AvroMapper<?, ?>> mappers = new HashMap<>(mapperDescs.size());
088        for (AvroMapperDescriptor descriptor : mapperDescs) {
089            try {
090                Class<Object> type = (Class<Object>) Class.forName(descriptor.type);
091                Class<AvroMapper<?, ?>> clazz = (Class<AvroMapper<?, ?>>) Class.forName(descriptor.klass);
092                Constructor<AvroMapper<?, ?>> constructor = clazz.getConstructor(AvroService.class);
093                mappers.put(type, constructor.newInstance(impl));
094            } catch (ReflectiveOperationException e) {
095                throw new RuntimeServiceException(e);
096            }
097        }
098        // and are added to the service implementation
099        impl.setMappers(mappers);
100        List<AvroSchemaDescriptor> schemaDescs = getDescriptors(XP_SCHEMA);
101        // schemas are registered through the SchemaService interface
102        AvroSchemaStore schemaStore = impl.getSchemaStore();
103        for (AvroSchemaDescriptor descriptor : schemaDescs) {
104            URL url = context.getRuntimeContext().getResource(descriptor.file);
105            try (InputStream stream = url == null ? null : url.openStream()) {
106                if (stream == null) {
107                    throw new RuntimeServiceException("Could not load stream for file " + descriptor.file);
108                }
109
110                schemaStore.addSchema(new Schema.Parser().parse(stream));
111            } catch (IOException e) {
112                throw new RuntimeServiceException(e);
113            }
114        }
115        avroService = impl;
116    }
117
118    @Override
119    public void stop(ComponentContext context) throws InterruptedException {
120        super.stop(context);
121        avroService = null;
122    }
123
124}