001/*
002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     pierre
018 */
019package org.nuxeo.runtime.avro;
020
021import java.lang.reflect.Constructor;
022import java.nio.file.Paths;
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.HashMap;
027import java.util.List;
028import java.util.ListIterator;
029import java.util.Map;
030import java.util.Map.Entry;
031
032import org.apache.avro.Schema;
033import org.apache.avro.generic.GenericRecord;
034import org.nuxeo.common.Environment;
035import org.nuxeo.lib.stream.codec.AvroSchemaStore;
036import org.nuxeo.lib.stream.codec.FileAvroSchemaStore;
037import org.nuxeo.runtime.RuntimeServiceException;
038import org.nuxeo.runtime.api.Framework;
039
040/**
041 * @since 10.2
042 */
043public class AvroServiceImpl implements AvroService {
044
045    private static final AvroMapper<Object, Object> NULL = new AvroMapper<Object, Object>(null) {
046
047        @Override
048        public Object fromAvro(Schema schema, Object input) {
049            return null;
050        }
051
052        @Override
053        public GenericRecord toAvro(Schema schema, Object input) {
054            return null;
055        }
056
057    };
058
059    protected final Map<Class<?>, Class<AvroSchemaFactory<?>>> factories;
060
061    protected final List<AvroReplacementDescriptor> replacements;
062
063    protected final AvroSchemaStore schemaStore;
064
065    protected Map<Class<?>, AvroMapper<?, ?>> mappers;
066
067    public AvroServiceImpl(Collection<AvroReplacementDescriptor> replacements,
068            Map<Class<?>, Class<AvroSchemaFactory<?>>> factories) {
069        this.replacements = new ArrayList<>(replacements);
070        this.factories = new HashMap<>(factories);
071        String dataDir = Framework.getProperty(Environment.NUXEO_DATA_DIR);
072        if (dataDir != null) {
073            this.schemaStore = new FileAvroSchemaStore(Paths.get(dataDir, "avro"));
074        } else {
075            this.schemaStore = new FileAvroSchemaStore(
076                    Paths.get(Framework.getRuntime().getHome().getAbsolutePath(), "data", "avro"));
077        }
078        Collections.sort(this.replacements);
079        // assert at creation that factories are valid
080        createContext();
081    }
082
083    @Override
084    public AvroSchemaStore getSchemaStore() {
085        return schemaStore;
086    }
087
088    @Override
089    public <D> Schema createSchema(D input) {
090        return createContext().createSchema(input);
091    }
092
093    @Override
094    public String decodeName(String input) {
095        String output = input;
096        ListIterator<AvroReplacementDescriptor> it = replacements.listIterator(replacements.size());
097        while (it.hasPrevious()) {
098            AvroReplacementDescriptor descriptor = it.previous();
099            output = output.replaceAll(descriptor.replacement, descriptor.forbidden);
100        }
101        return output;
102    }
103
104    @Override
105    public String encodeName(String input) {
106        String output = input;
107        for (AvroReplacementDescriptor descriptor : replacements) {
108            output = output.replaceAll(descriptor.forbidden, descriptor.replacement);
109        }
110        return output;
111    }
112
113    @Override
114    @SuppressWarnings("unchecked")
115    public <D, M> D fromAvro(Schema schema, Class<D> clazz, M input) {
116        return (D) getMapper(clazz).fromAvro(schema, input);
117    }
118
119    public void setMappers(Map<Class<?>, AvroMapper<?, ?>> mappers) {
120        this.mappers = new HashMap<>(mappers);
121    }
122
123    @Override
124    @SuppressWarnings("unchecked")
125    public <D, M> M toAvro(Schema schema, D input) {
126        return (M) getMapper((Class<D>) input.getClass()).toAvro(schema, input);
127    }
128
129    protected AvroSchemaFactoryContext createContext() {
130        AvroSchemaFactoryContext context = new AvroSchemaFactoryContext(this);
131        for (Entry<Class<?>, Class<AvroSchemaFactory<?>>> entry : factories.entrySet()) {
132            try {
133                Class<AvroSchemaFactory<?>> clazz = entry.getValue();
134                Constructor<AvroSchemaFactory<?>> constructor = clazz.getConstructor(AvroSchemaFactoryContext.class);
135                AvroSchemaFactory<?> factory = constructor.newInstance(context);
136                context.register(entry.getKey(), factory);
137            } catch (ReflectiveOperationException e) {
138                throw new RuntimeServiceException(e);
139            }
140        }
141        return context;
142    }
143
144    @SuppressWarnings("unchecked")
145    protected <D, M> AvroMapper<D, M> getMapper(Class<D> clazz) {
146        AvroMapper<?, ?> factory = mappers.get(clazz);
147        if (factory != null) {
148            return (AvroMapper<D, M>) factory;
149        }
150        for (Class<?> intrface : clazz.getInterfaces()) {
151            factory = mappers.get(intrface);
152            if (factory != null) {
153                return (AvroMapper<D, M>) factory;
154            }
155        }
156        for (Entry<Class<?>, AvroMapper<?, ?>> entry : mappers.entrySet()) {
157            if (entry.getKey().isAssignableFrom(clazz)) {
158                return (AvroMapper<D, M>) entry.getValue();
159            }
160        }
161        return (AvroMapper<D, M>) NULL;
162    }
163
164}