001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     pierre
018 */
019package org.nuxeo.runtime.avro;
020
021import java.lang.reflect.Constructor;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Comparator;
025import java.util.HashMap;
026import java.util.List;
027import java.util.ListIterator;
028import java.util.Map;
029import java.util.Map.Entry;
030
031import org.apache.avro.Schema;
032import org.apache.avro.generic.GenericRecord;
033import org.apache.avro.message.SchemaStore;
034import org.nuxeo.runtime.RuntimeServiceException;
035
036/**
037 * @since 10.2
038 */
039public class AvroServiceImpl extends SchemaStore.Cache implements AvroService {
040
041    private static final AvroMapper<Object, Object> NULL = new AvroMapper<Object, Object>(null) {
042
043        @Override
044        public Object fromAvro(Schema schema, Object input) {
045            return null;
046        }
047
048        @Override
049        public GenericRecord toAvro(Schema schema, Object input) {
050            return null;
051        }
052
053    };
054
055    protected final Map<Class<?>, Class<AvroSchemaFactory<?>>> factories;
056
057    protected final List<AvroReplacementDescriptor> replacements;
058
059    protected Map<Class<?>, AvroMapper<?, ?>> mappers;
060
061    public AvroServiceImpl(Collection<AvroReplacementDescriptor> replacements,
062            Map<Class<?>, Class<AvroSchemaFactory<?>>> factories) {
063        this.replacements = new ArrayList<>(replacements);
064        this.factories = new HashMap<>(factories);
065        this.replacements.sort(Comparator.naturalOrder());
066        // assert at creation that factories are valid
067        createContext();
068    }
069
070    @Override
071    public <D> Schema createSchema(D input) {
072        return createContext().createSchema(input);
073    }
074
075    @Override
076    public String decodeName(String input) {
077        String output = input;
078        ListIterator<AvroReplacementDescriptor> it = replacements.listIterator(replacements.size());
079        while (it.hasPrevious()) {
080            AvroReplacementDescriptor replacement = it.previous();
081            output = output.replaceAll(replacement.getReplacement(), replacement.getForbidden());
082        }
083        return output;
084    }
085
086    @Override
087    public String encodeName(String input) {
088        String output = input;
089        for (AvroReplacementDescriptor replacement : replacements) {
090            output = output.replaceAll(replacement.getForbidden(), replacement.getReplacement());
091        }
092        return output;
093    }
094
095    @Override
096    @SuppressWarnings("unchecked")
097    public <D, M> D fromAvro(Schema schema, Class<D> clazz, M input) {
098        return (D) getMapper(clazz).fromAvro(schema, input);
099    }
100
101    public void setMappers(Map<Class<?>, AvroMapper<?, ?>> mappers) {
102        this.mappers = new HashMap<>(mappers);
103    }
104
105    @Override
106    @SuppressWarnings("unchecked")
107    public <D, M> M toAvro(Schema schema, D input) {
108        return (M) getMapper((Class<D>) input.getClass()).toAvro(schema, input);
109    }
110
111    protected AvroSchemaFactoryContext createContext() {
112        AvroSchemaFactoryContext context = new AvroSchemaFactoryContext(this);
113        for (Entry<Class<?>, Class<AvroSchemaFactory<?>>> entry : factories.entrySet()) {
114            try {
115                Class<AvroSchemaFactory<?>> clazz = entry.getValue();
116                Constructor<AvroSchemaFactory<?>> constructor = clazz.getConstructor(AvroSchemaFactoryContext.class);
117                AvroSchemaFactory<?> factory = constructor.newInstance(context);
118                context.register(entry.getKey(), factory);
119            } catch (ReflectiveOperationException e) {
120                throw new RuntimeServiceException(e);
121            }
122        }
123        return context;
124    }
125
126    @SuppressWarnings("unchecked")
127    protected <D, M> AvroMapper<D, M> getMapper(Class<D> clazz) {
128        AvroMapper<?, ?> factory = mappers.get(clazz);
129        if (factory != null) {
130            return (AvroMapper<D, M>) factory;
131        }
132        for (Class<?> intrface : clazz.getInterfaces()) {
133            factory = mappers.get(intrface);
134            if (factory != null) {
135                return (AvroMapper<D, M>) factory;
136            }
137        }
138        for (Entry<Class<?>, AvroMapper<?, ?>> entry : mappers.entrySet()) {
139            if (entry.getKey().isAssignableFrom(clazz)) {
140                return (AvroMapper<D, M>) entry.getValue();
141            }
142        }
143        return (AvroMapper<D, M>) NULL;
144    }
145
146}