Skip to content

Commit

Permalink
Implementation of AVRO References support
Browse files Browse the repository at this point in the history
  • Loading branch information
libretto committed Sep 26, 2024
1 parent 88bff01 commit c156fe9
Show file tree
Hide file tree
Showing 6 changed files with 459 additions and 6 deletions.
1 change: 1 addition & 0 deletions karapace/compatibility/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def check_compatibility(
if old_schema.schema_type is SchemaType.AVRO:
assert isinstance(old_schema.schema, AvroSchema)
assert isinstance(new_schema.schema, AvroSchema)

if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
result = check_avro_compatibility(
reader_schema=new_schema.schema,
Expand Down
70 changes: 66 additions & 4 deletions karapace/schema_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
from karapace.utils import assert_never, json_decode, json_encode, JSONDecodeError
from typing import Any, cast, Collection, Dict, Final, final, Mapping, Sequence

import avro.schema
import hashlib
import logging
import re

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -152,6 +154,7 @@ def normalize_schema_str(
except JSONDecodeError as e:
LOG.info("Schema is not valid JSON")
raise e

elif schema_type == SchemaType.PROTOBUF:
if schema:
schema_str = str(schema)
Expand Down Expand Up @@ -194,6 +197,45 @@ def schema(self) -> Draft7Validator | AvroSchema | ProtobufSchema:
return parsed_typed_schema.schema


class AvroMerge:
def __init__(self, schema_str: str, dependencies: Mapping[str, Dependency] | None = None):
self.schema_str = json_encode(json_decode(schema_str), compact=True, sort_keys=True)
self.dependencies = dependencies
self.unique_id = 0
self.regex = re.compile(r"^\s*\[")

def union_safe_schema_str(self, schema_str: str) -> str:
# in case we meet union - we use it as is

base_schema = (
f'{{"name": "___RESERVED_KARAPACE_WRAPPER_NAME_{self.unique_id}___",'
f'"type": "record", "fields": [{{"name": "name", "type":'
)
if self.regex.match(schema_str):
return f"{base_schema} {schema_str}}}]}}"
return f"{base_schema} [{schema_str}]}}]}}"

def builder(self, schema_str: str, dependencies: Mapping[str, Dependency] | None = None) -> str:
"""To support references in AVRO we iteratively merge all referenced schemas with current schema"""
stack: list[tuple[str, Mapping[str, Dependency] | None]] = [(schema_str, dependencies)]
merged_schemas = []

while stack:
current_schema_str, current_dependencies = stack.pop()
if current_dependencies:
stack.append((current_schema_str, None))
for dependency in reversed(current_dependencies.values()):
stack.append((dependency.schema.schema_str, dependency.schema.dependencies))
else:
self.unique_id += 1
merged_schemas.append(self.union_safe_schema_str(current_schema_str))

return ",\n".join(merged_schemas)

def wrap(self) -> str:
return "[\n" + self.builder(self.schema_str, self.dependencies) + "\n]"


def parse(
schema_type: SchemaType,
schema_str: str,
Expand All @@ -206,18 +248,37 @@ def parse(
) -> ParsedTypedSchema:
if schema_type not in [SchemaType.AVRO, SchemaType.JSONSCHEMA, SchemaType.PROTOBUF]:
raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}")

parsed_schema_result: Draft7Validator | AvroSchema | ProtobufSchema
parsed_schema: Draft7Validator | AvroSchema | ProtobufSchema
if schema_type is SchemaType.AVRO:
try:
if dependencies:
wrapped_schema_str = AvroMerge(schema_str, dependencies).wrap()
else:
wrapped_schema_str = schema_str
parsed_schema = parse_avro_schema_definition(
schema_str,
wrapped_schema_str,
validate_enum_symbols=validate_avro_enum_symbols,
validate_names=validate_avro_names,
)
if dependencies:
if isinstance(parsed_schema, avro.schema.UnionSchema):
parsed_schema_result = parsed_schema.schemas[-1].fields[0].type.schemas[-1]

else:
raise InvalidSchema
else:
parsed_schema_result = parsed_schema
return ParsedTypedSchema(
schema_type=schema_type,
schema_str=schema_str,
schema=parsed_schema_result,
references=references,
dependencies=dependencies,
schema_wrapped=parsed_schema,
)
except (SchemaParseException, JSONDecodeError, TypeError) as e:
raise InvalidSchema from e

elif schema_type is SchemaType.JSONSCHEMA:
try:
parsed_schema = parse_jsonschema_definition(schema_str)
Expand Down Expand Up @@ -284,9 +345,10 @@ def __init__(
schema: Draft7Validator | AvroSchema | ProtobufSchema,
references: Sequence[Reference] | None = None,
dependencies: Mapping[str, Dependency] | None = None,
schema_wrapped: Draft7Validator | AvroSchema | ProtobufSchema | None = None,
) -> None:
self._schema_cached: Draft7Validator | AvroSchema | ProtobufSchema | None = schema

self.schema_wrapped = schema_wrapped
super().__init__(
schema_type=schema_type,
schema_str=schema_str,
Expand Down
14 changes: 13 additions & 1 deletion karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,19 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:

parsed_schema: Draft7Validator | AvroSchema | ProtobufSchema | None = None
resolved_dependencies: dict[str, Dependency] | None = None
if schema_type_parsed in [SchemaType.AVRO, SchemaType.JSONSCHEMA]:
if schema_type_parsed == SchemaType.AVRO:
try:
if schema_references:
candidate_references = [reference_from_mapping(reference_data) for reference_data in schema_references]
resolved_references, resolved_dependencies = self.resolve_references(candidate_references)
schema_str = json.dumps(json.loads(schema_str), sort_keys=True)
except json.JSONDecodeError as e:
LOG.warning("Schema is not valid JSON")
raise e
except InvalidReferences as e:
LOG.exception("Invalid AVRO references")
raise e
elif schema_type_parsed == SchemaType.JSONSCHEMA:
try:
schema_str = json.dumps(json.loads(schema_str), sort_keys=True)
except json.JSONDecodeError as exc:
Expand Down
2 changes: 1 addition & 1 deletion karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,7 @@ def _validate_references(
content_type=content_type,
status=HTTPStatus.BAD_REQUEST,
)
if references and schema_type != SchemaType.PROTOBUF:
if references and schema_type != SchemaType.PROTOBUF and schema_type != SchemaType.AVRO:
self.r(
body={
"error_code": SchemaErrorCodes.REFERENCES_SUPPORT_NOT_IMPLEMENTED.value,
Expand Down
Loading

0 comments on commit c156fe9

Please sign in to comment.