001/*
002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.codec;
020
021import java.io.File;
022import java.io.FileNotFoundException;
023import java.io.IOException;
024import java.io.PrintWriter;
025import java.nio.file.Files;
026import java.nio.file.Path;
027import java.util.Map;
028import java.util.stream.Stream;
029
030import org.apache.avro.Schema;
031import org.apache.avro.SchemaNormalization;
032
033import avro.shaded.com.google.common.collect.MapMaker;
034
035/**
036 * Very simple SchemaStore that uses a file storage to persists its schemas.
037 *
038 * @since 10.3
039 */
040public class FileAvroSchemaStore implements AvroSchemaStore {
041
042    protected static final String AVRO_SCHEMA_EXT = ".avsc";
043
044    protected final Path schemaDirectoryPath;
045
046    protected final Map<Long, Schema> schemas = new MapMaker().makeMap();
047
048    public FileAvroSchemaStore(Path schemaDirectoryPath) {
049        this.schemaDirectoryPath = schemaDirectoryPath;
050        File directory = schemaDirectoryPath.toFile();
051        if (directory.exists()) {
052            if (!directory.isDirectory()) {
053                throw new IllegalArgumentException("Invalid SchemaStore root path: " + schemaDirectoryPath);
054            }
055            loadSchemas(schemaDirectoryPath);
056        } else {
057            directory.mkdirs();
058        }
059    }
060
061    /**
062     * Load all avro schema files from this directory. Files must have the .avsc extention.
063     */
064    public void loadSchemas(Path directory) {
065        try (Stream<Path> paths = Files.list(directory)) {
066            paths.filter(path -> Files.isReadable(path) && path.getFileName().toString().endsWith(AVRO_SCHEMA_EXT))
067                 .forEach(this::loadSchema);
068        } catch (IOException e) {
069            throw new IllegalArgumentException("Invalid base path: " + directory, e);
070        }
071    }
072
073    /**
074     * Load the avro schema from this file.
075     */
076    public void loadSchema(Path schemaPath) {
077        Schema schema;
078        try {
079            schema = new Schema.Parser().parse(schemaPath.toFile());
080        } catch (IOException e) {
081            throw new IllegalArgumentException("Invalid schema file: " + schemaPath, e);
082        }
083        addSchema(schema);
084    }
085
086    @Override
087    public long addSchema(Schema schema) {
088        long fp = SchemaNormalization.parsingFingerprint64(schema);
089        if (schemas.put(fp, schema) == null) {
090            Path schemaPath = schemaDirectoryPath.resolve(
091                    String.format("%s-0x%08X%s", schema.getName(), fp, AVRO_SCHEMA_EXT));
092            try (PrintWriter out = new PrintWriter(schemaPath.toFile())) {
093                out.println(schema.toString(true));
094            } catch (FileNotFoundException e) {
095                throw new IllegalStateException("Cannot write schema to file: " + schemaPath);
096            }
097        }
098        return fp;
099    }
100
101    @Override
102    public Schema findByFingerprint(long fingerprint) {
103        return schemas.get(fingerprint);
104    }
105}