001/*
002 * (C) Copyright 2018 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     bdelbosc
018 */
019package org.nuxeo.lib.stream.codec;
020
021import java.io.File;
022import java.io.FileNotFoundException;
023import java.io.IOException;
024import java.io.PrintWriter;
025import java.nio.file.Files;
026import java.nio.file.Path;
027import java.util.Map;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.stream.Stream;
030
031import org.apache.avro.Schema;
032import org.apache.avro.SchemaNormalization;
033import org.apache.avro.SchemaParseException;
034import org.apache.commons.logging.Log;
035import org.apache.commons.logging.LogFactory;
036
037/**
038 * Very simple SchemaStore that uses a file storage to persists its schemas.
039 *
040 * @since 10.3
041 */
042public class FileAvroSchemaStore implements AvroSchemaStore {
043
044    private static final Log log = LogFactory.getLog(FileAvroSchemaStore.class);
045
046    protected static final String AVRO_SCHEMA_EXT = ".avsc";
047
048    protected final Path schemaDirectoryPath;
049
050    protected final Map<Long, Schema> schemas = new ConcurrentHashMap<>();
051
052    public FileAvroSchemaStore(Path schemaDirectoryPath) {
053        this.schemaDirectoryPath = schemaDirectoryPath;
054        File directory = schemaDirectoryPath.toFile();
055        if (directory.exists()) {
056            if (!directory.isDirectory()) {
057                throw new IllegalArgumentException("Invalid SchemaStore root path: " + schemaDirectoryPath);
058            }
059            loadSchemas(schemaDirectoryPath);
060        } else {
061            directory.mkdirs();
062        }
063    }
064
065    /**
066     * Load all avro schema files from this directory. Files must have the .avsc extention.
067     */
068    public void loadSchemas(Path directory) {
069        try (Stream<Path> paths = Files.list(directory)) {
070            paths.filter(path -> Files.isReadable(path) && path.getFileName().toString().endsWith(AVRO_SCHEMA_EXT))
071                 .forEach(this::loadSchema);
072        } catch (IOException e) {
073            throw new IllegalArgumentException("Invalid base path: " + directory, e);
074        }
075    }
076
077    /**
078     * Load the avro schema from this file.
079     */
080    public void loadSchema(Path schemaPath) {
081        try {
082            Schema schema = new Schema.Parser().parse(schemaPath.toFile());
083            addSchema(schema);
084        } catch (IOException | SchemaParseException e) {
085            log.error("Invalid schema file: " + schemaPath, e);
086        }
087    }
088
089    @Override
090    public long addSchema(Schema schema) {
091        long fp = SchemaNormalization.parsingFingerprint64(schema);
092        if (schemas.put(fp, schema) == null) {
093            Path schemaPath = schemaDirectoryPath.resolve(
094                    String.format("%s-0x%08X%s", schema.getName(), fp, AVRO_SCHEMA_EXT));
095            try (PrintWriter out = new PrintWriter(schemaPath.toFile())) {
096                out.println(schema.toString(true));
097            } catch (FileNotFoundException e) {
098                throw new IllegalStateException("Cannot write schema to file: " + schemaPath);
099            }
100        }
101        return fp;
102    }
103
104    @Override
105    public Schema findByFingerprint(long fingerprint) {
106        return schemas.get(fingerprint);
107    }
108}