Skip to content

Commit

Permalink
Merge pull request #270 from RADAR-base/release-0.7.2-1
Browse files Browse the repository at this point in the history
Release 0.7.2
  • Loading branch information
yatharthranjan authored Jul 30, 2021
2 parents 0a16c80 + 106713e commit 68464bb
Show file tree
Hide file tree
Showing 16 changed files with 573 additions and 152 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ jobs:
- name: Check
run: ./gradlew check

- name: Validate schemas
run: ./gradlew :radar-schemas-tools:run --args="validate $GITHUB_WORKSPACE"

docker:
# The type of runner that the job will run on
runs-on: ubuntu-latest
Expand Down
17 changes: 8 additions & 9 deletions docker/topic_init.sh
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
#!/bin/bash

NUM_TRIES=${TOPIC_INIT_TRIES:-20}

if [ -z NO_VALIDATE ]; then
radar-schemas-tools validate merged
fi

# Create topics
echo "Creating RADAR-base topics..."
echo "Creating RADAR-base topics. Will try ${NUM_TRIES} times..."

if ! radar-schemas-tools create -c "${KAFKA_CONFIG_PATH}" -p $KAFKA_NUM_PARTITIONS -r $KAFKA_NUM_REPLICATION -b $KAFKA_NUM_BROKERS -s "${KAFKA_BOOTSTRAP_SERVERS}" merged; then
echo "FAILED TO CREATE TOPICS ... Retrying"
if ! radar-schemas-tools create -c "${KAFKA_CONFIG_PATH}" -p $KAFKA_NUM_PARTITIONS -r $KAFKA_NUM_REPLICATION -b $KAFKA_NUM_BROKERS -s "${KAFKA_BOOTSTRAP_SERVERS}" merged; then
if radar-schemas-tools create -c "${KAFKA_CONFIG_PATH}" -p $KAFKA_NUM_PARTITIONS -r $KAFKA_NUM_REPLICATION -b $KAFKA_NUM_BROKERS -s "${KAFKA_BOOTSTRAP_SERVERS}" -n ${NUM_TRIES} merged; then
echo "Created topics"
else
echo "FAILED TO CREATE TOPICS"
exit 1
else
echo "Created topics at second attempt"
fi
else
echo "Topics created."
fi

echo "Topics created."

echo "Registering RADAR-base schemas..."
if ! radar-schemas-tools register --force -u "$SCHEMA_REGISTRY_API_KEY" -p "$SCHEMA_REGISTRY_API_SECRET" "${KAFKA_SCHEMA_REGISTRY}" merged; then
echo "FAILED TO REGISTER SCHEMAS"
Expand Down
4 changes: 2 additions & 2 deletions java-sdk/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
//---------------------------------------------------------------------------//

plugins {
id 'com.github.davidmc24.gradle.plugin.avro-base' version '1.1.0'
id 'com.github.davidmc24.gradle.plugin.avro-base' version '1.2.0'
id("io.github.gradle-nexus.publish-plugin") version "1.1.0"
}

allprojects {
version = '0.7.1'
version = '0.7.2'
group = 'org.radarbase'
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package org.radarbase.schema;

import static java.util.function.Function.identity;
import static java.util.function.Predicate.not;
import kotlin.Pair;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.generic.GenericRecord;
import org.radarbase.config.AvroTopicConfig;
import org.radarbase.schema.validation.SchemaValidator;
import org.radarbase.schema.validation.rules.SchemaMetadata;
import org.radarbase.topic.AvroTopic;

import java.io.IOException;
import java.nio.file.Files;
Expand All @@ -15,32 +21,33 @@
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.generic.GenericRecord;
import org.radarbase.config.AvroTopicConfig;
import org.radarbase.schema.validation.SchemaValidator;
import org.radarbase.schema.validation.rules.SchemaMetadata;
import org.radarbase.topic.AvroTopic;

import static java.util.function.Function.identity;
import static java.util.function.Predicate.not;

public class SchemaCatalogue {
private final Path root;
private final Map<String, SchemaMetadata> schemas;
private final List<Path> unmappedFiles;
private final List<SchemaMetadata> unmappedFiles;


public SchemaCatalogue(Path root) throws IOException {
this(root, null, p -> Files.isRegularFile(p) && SchemaValidator.isAvscFile(p));
this(root, null);
}

public SchemaCatalogue(Path root, Scope scope, Predicate<Path> filterPath) throws IOException {
public SchemaCatalogue(Path root, Scope scope) throws IOException {
this.root = root.resolve("commons");
Map<String, SchemaMetadata> schemaTemp = new HashMap<>();
List<Path> unmappedTemp = new ArrayList<>();
List<SchemaMetadata> unmappedTemp = new ArrayList<>();

loadSchemas(schemaTemp, unmappedTemp, scope, filterPath);
if (scope != null) {
loadSchemas(schemaTemp, unmappedTemp, scope);
} else {
for (Scope useScope : Scope.values()) {
loadSchemas(schemaTemp, unmappedTemp, useScope);
}
}

schemas = Collections.unmodifiableMap(schemaTemp);
unmappedFiles = Collections.unmodifiableList(unmappedTemp);
Expand All @@ -55,49 +62,32 @@ public SchemaCatalogue(Path root, Scope scope, Predicate<Path> filterPath) throw
* @throws IllegalArgumentException if the topic configuration is null
*/
public AvroTopic<GenericRecord, GenericRecord> getGenericAvroTopic(AvroTopicConfig config) {
SchemaMetadata parsedKeySchema = schemas.get(config.getKeySchema());
if (parsedKeySchema == null) {
throw new NoSuchElementException("Key schema " + config.getKeySchema()
+ " for topic " + config.getTopic() + " not found.");
}

SchemaMetadata parsedValueSchema = schemas.get(config.getValueSchema());
if (parsedValueSchema == null) {
throw new NoSuchElementException("Value schema " + config.getValueSchema()
+ " for topic " + config.getTopic() + " not found.");
}

Pair<SchemaMetadata, SchemaMetadata> schemaMetadata = getSchemaMetadata(config);
return new AvroTopic<>(config.getTopic(),
parsedKeySchema.getSchema(), parsedValueSchema.getSchema(),
schemaMetadata.component1().getSchema(), schemaMetadata.component2().getSchema(),
GenericRecord.class, GenericRecord.class);
}

public Map<String, SchemaMetadata> getSchemas() {
return schemas;
}

public List<Path> getUnmappedAvroFiles() {
public List<SchemaMetadata> getUnmappedAvroFiles() {
return unmappedFiles;
}

private void loadSchemas(
Map<String, SchemaMetadata> schemas,
List<Path> unmappedFiles,
Scope scope,
Predicate<Path> filterPath) throws IOException {
List<SchemaMetadata> unmappedFiles,
Scope scope) throws IOException {

Path walkRoot;
if (scope != null) {
walkRoot = scope.getPath(root);
if (walkRoot == null) {
throw new IllegalArgumentException("No scope directory for scope " + scope);
}
} else {
walkRoot = root;
Path walkRoot = scope.getPath(root);
if (walkRoot == null) {
return;
}

List<Path> avroFiles = Files.walk(walkRoot)
.filter(filterPath)
.filter(p -> Files.isRegularFile(p) && SchemaValidator.isAvscFile(p))
.collect(Collectors.toList());

int prevSize = -1;
Expand Down Expand Up @@ -144,6 +134,31 @@ private void loadSchemas(

unmappedFiles.addAll(avroFiles.stream()
.filter(p -> !mappedPaths.contains(p))
.map(p -> new SchemaMetadata(null, scope, p))
.collect(Collectors.toList()));
}

/**
* Returns an avro topic with the schemas from this catalogue.
* @param config avro topic configuration
* @return AvroTopic with
* @throws NoSuchElementException if the key or value schema do not exist in this catalogue.
* @throws NullPointerException if the key or value schema configurations are null
* @throws IllegalArgumentException if the topic configuration is null
*/
public Pair<SchemaMetadata, SchemaMetadata> getSchemaMetadata(AvroTopicConfig config) {
SchemaMetadata parsedKeySchema = schemas.get(config.getKeySchema());
if (parsedKeySchema == null) {
throw new NoSuchElementException("Key schema " + config.getKeySchema()
+ " for topic " + config.getTopic() + " not found.");
}

SchemaMetadata parsedValueSchema = schemas.get(config.getValueSchema());
if (parsedValueSchema == null) {
throw new NoSuchElementException("Value schema " + config.getValueSchema()
+ " for topic " + config.getTopic() + " not found.");
}

return new Pair<>(parsedKeySchema, parsedValueSchema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,38 +16,38 @@

package org.radarbase.schema.validation;

import static org.radarbase.schema.validation.rules.Validator.raise;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kotlin.Pair;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.radarbase.schema.SchemaCatalogue;
import org.radarbase.schema.Scope;
import org.radarbase.schema.specification.DataProducer;
import org.radarbase.schema.specification.SourceCatalogue;
import org.radarbase.schema.validation.config.ExcludeConfig;
import org.radarbase.schema.validation.rules.RadarSchemaMetadataRules;
import org.radarbase.schema.validation.rules.RadarSchemaRules;
import org.radarbase.schema.validation.rules.SchemaMetadata;
import org.radarbase.schema.validation.rules.SchemaMetadataRules;
import org.radarbase.schema.validation.rules.Validator;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Validator for a set of RADAR-Schemas.
*/
public class SchemaValidator {
public static final String AVRO_EXTENSION = "avsc";

private final Path root;
private final ExcludeConfig config;
private final Validator<SchemaMetadata> validator;
private final SchemaMetadataRules rules;
private Validator<SchemaMetadata> validator;

/**
* Schema validator for given RADAR-Schemas directory.
Expand All @@ -56,52 +56,75 @@ public class SchemaValidator {
*/
public SchemaValidator(Path root, ExcludeConfig config) {
this.config = config;
this.root = root;
this.rules = new RadarSchemaMetadataRules(root, config);
this.validator = rules.getValidator();
this.validator = rules.getValidator(false);
}

public Stream<ValidationException> analyseSourceCatalogue(
Scope scope, SourceCatalogue catalogue) {
this.validator = rules.getValidator(true);
Stream<DataProducer<?>> producers;
if (scope != null) {
producers = catalogue.getSources().stream()
.filter(s -> s.getScope().equals(scope));
} else {
producers = catalogue.getSources().stream();
}

try {
return producers.flatMap(s -> s.getData().stream())
.flatMap(d -> {
Pair<SchemaMetadata, SchemaMetadata> metadata =
catalogue.getSchemaCatalogue().getSchemaMetadata(d);
return Stream.of(metadata.component1(), metadata.component2());
})
.sorted(Comparator.comparing(s -> s.getSchema().getFullName()))
.distinct()
.flatMap(this::validate)
.distinct();
} finally {
this.validator = rules.getValidator(false);
}
}

/**
* TODO.
* @param scope TODO.
*/
public Stream<ValidationException> analyseFiles(Scope scope) {
try {
SchemaCatalogue schemaCatalogue = new SchemaCatalogue(root, scope,
p -> Files.isRegularFile(p)
&& SchemaValidator.isAvscFile(p)
&& !config.skipFile(p));

Map<String, Schema> useTypes = schemaCatalogue.getSchemas().entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, e -> e.getValue().getSchema()));

return Stream.concat(
schemaCatalogue.getUnmappedAvroFiles().stream()
.map(p -> {
Parser parser = new Parser();
parser.addTypes(useTypes);
try {
parser.parse(p.toFile());
return null;
} catch (Exception ex) {
return new ValidationException("Cannot parse schema", ex);
}
})
.filter(Objects::nonNull),
schemaCatalogue.getSchemas().values().stream()
.flatMap(this::validate)
);
} catch (IOException ex) {
return raise("Failed to read files: " + ex, ex);
public Stream<ValidationException> analyseFiles(Scope scope, SchemaCatalogue schemaCatalogue) {
if (scope == null) {
return analyseFiles(schemaCatalogue);
}
this.validator = rules.getValidator(false);
Map<String, Schema> useTypes = schemaCatalogue.getSchemas().entrySet().stream()
.filter(s -> s.getValue().getScope().equals(scope))
.collect(Collectors.toMap(Entry::getKey, e -> e.getValue().getSchema()));

return Stream.concat(
schemaCatalogue.getUnmappedAvroFiles().stream()
.filter(s -> s.getScope().equals(scope))
.map(p -> {
Parser parser = new Parser();
parser.addTypes(useTypes);
try {
parser.parse(p.getPath().toFile());
return null;
} catch (Exception ex) {
return new ValidationException("Cannot parse schema", ex);
}
})
.filter(Objects::nonNull),
schemaCatalogue.getSchemas().values().stream()
.flatMap(this::validate)
).distinct();
}

/**
* TODO.
*/
public Stream<ValidationException> analyseFiles() {
public Stream<ValidationException> analyseFiles(SchemaCatalogue schemaCatalogue) {
return Arrays.stream(Scope.values())
.flatMap(this::analyseFiles);
.flatMap(scope -> analyseFiles(scope, schemaCatalogue));
}

/** Validate a single schema in given path. */
Expand All @@ -111,6 +134,9 @@ public Stream<ValidationException> validate(Schema schema, Path path, Scope scop

/** Validate a single schema in given path. */
public Stream<ValidationException> validate(SchemaMetadata schemaMetadata) {
if (config.skipFile(schemaMetadata.getPath())) {
return Stream.empty();
}
return validator.apply(schemaMetadata);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@

package org.radarbase.schema.validation;

import static org.radarbase.schema.validation.ValidationHelper.SPECIFICATIONS_PATH;
import org.radarbase.schema.Scope;
import org.radarbase.schema.validation.config.ExcludeConfig;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.radarbase.schema.Scope;
import org.radarbase.schema.validation.config.ExcludeConfig;

import static org.radarbase.schema.validation.ValidationHelper.SPECIFICATIONS_PATH;

/**
* Validates RADAR-Schemas specifications.
Expand Down
Loading

0 comments on commit 68464bb

Please sign in to comment.