AVRO References Support
libretto committed Jul 29, 2024
1 parent 8c50eb0 commit 0167a75
Showing 5 changed files with 386 additions and 16 deletions.
1 change: 1 addition & 0 deletions karapace/compatibility/__init__.py
@@ -88,6 +88,7 @@ def check_compatibility(
if old_schema.schema_type is SchemaType.AVRO:
assert isinstance(old_schema.schema, AvroSchema)
assert isinstance(new_schema.schema, AvroSchema)

if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
result = check_avro_compatibility(
reader_schema=new_schema.schema,
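
For context on what these modes mean: in BACKWARD (and BACKWARD_TRANSITIVE) mode the new schema acts as the reader and the old schema as the writer, i.e. consumers using the new schema must still be able to decode data produced with the old one. A minimal sketch of such a check, assuming the avro.compatibility module that ships with recent releases of the avro package (this is not Karapace's own wrapper):

import avro.schema
from avro.compatibility import ReaderWriterCompatibilityChecker, SchemaCompatibilityType

old = avro.schema.parse('{"type": "record", "name": "T", "fields": [{"name": "a", "type": "string"}]}')
new = avro.schema.parse(
    '{"type": "record", "name": "T", "fields": ['
    '{"name": "a", "type": "string"},'
    '{"name": "b", "type": "string", "default": ""}]}'
)

# BACKWARD: the new schema reads data written with the old schema.
result = ReaderWriterCompatibilityChecker().get_compatibility(reader=new, writer=old)
print(result.compatibility is SchemaCompatibilityType.compatible)  # True: the added field has a default
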
70 changes: 66 additions & 4 deletions karapace/schema_models.py
@@ -27,8 +27,10 @@
from karapace.utils import assert_never, json_decode, json_encode, JSONDecodeError
from typing import Any, cast, Dict, Final, final, Mapping, Sequence

import avro.schema
import hashlib
import logging
import re

LOG = logging.getLogger(__name__)

@@ -138,6 +140,7 @@ def normalize_schema_str(
except JSONDecodeError as e:
LOG.info("Schema is not valid JSON")
raise e

elif schema_type == SchemaType.PROTOBUF:
if schema:
schema_str = str(schema)
@@ -180,6 +183,44 @@ def schema(self) -> Draft7Validator | AvroSchema | ProtobufSchema:
return parsed_typed_schema.schema


class AvroMerge:
def __init__(self, schema_str: str, dependencies: Mapping[str, Dependency] | None = None):
self.schema_str = json_encode(json_decode(schema_str), compact=True, sort_keys=True)
self.dependencies = dependencies
self.unique_id = 0

def union_safe_schema_str(self, schema_str: str) -> str:
# in case we meet union - we use it as is
regex = re.compile(r"^\s*\[")
base_schema = (
f'{{ "name": "___RESERVED_KARAPACE_WRAPPER_NAME_{self.unique_id}___",'
f'"type": "record", "fields": [{{"name": "name", "type":'
)
if regex.match(schema_str):
return f"{base_schema} {schema_str}}}]}}"
return f"{base_schema} [{schema_str}]}}]}}"

def builder(self, schema_str: str, dependencies: Mapping[str, Dependency] | None = None) -> str:
"""To support references in AVRO we iteratively merge all referenced schemas with current schema"""
stack: list[tuple[str, Mapping[str, Dependency] | None]] = [(schema_str, dependencies)]
merged_schemas = []

while stack:
current_schema_str, current_dependencies = stack.pop()
if current_dependencies:
stack.append((current_schema_str, None))
for dependency in reversed(current_dependencies.values()):
stack.append((dependency.schema.schema_str, dependency.schema.dependencies))
else:
self.unique_id += 1
merged_schemas.append(self.union_safe_schema_str(current_schema_str))

return ",\n".join(merged_schemas)

def wrap(self) -> str:
return "[\n" + self.builder(self.schema_str, self.dependencies) + "\n]"


def parse(
schema_type: SchemaType,
schema_str: str,
@@ -188,21 +229,41 @@ def parse(
references: Sequence[Reference] | None = None,
dependencies: Mapping[str, Dependency] | None = None,
normalize: bool = False,
dependencies_compat: bool = False,
) -> ParsedTypedSchema:
if schema_type not in [SchemaType.AVRO, SchemaType.JSONSCHEMA, SchemaType.PROTOBUF]:
raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}")

parsed_schema_result: Draft7Validator | AvroSchema | ProtobufSchema
parsed_schema: Draft7Validator | AvroSchema | ProtobufSchema
if schema_type is SchemaType.AVRO:
try:
if dependencies or dependencies_compat:
wrapped_schema_str = AvroMerge(schema_str, dependencies).wrap()
else:
wrapped_schema_str = schema_str
parsed_schema = parse_avro_schema_definition(
schema_str,
wrapped_schema_str,
validate_enum_symbols=validate_avro_enum_symbols,
validate_names=validate_avro_names,
)
if dependencies or dependencies_compat:
if isinstance(parsed_schema, avro.schema.UnionSchema):
parsed_schema_result = parsed_schema.schemas[-1].fields[0].type.schemas[-1]

else:
raise InvalidSchema
else:
parsed_schema_result = parsed_schema
return ParsedTypedSchema(
schema_type=schema_type,
schema_str=schema_str,
schema=parsed_schema_result,
references=references,
dependencies=dependencies,
schema_wrapped=parsed_schema,
)
except (SchemaParseException, JSONDecodeError, TypeError) as e:
raise InvalidSchema from e

elif schema_type is SchemaType.JSONSCHEMA:
try:
parsed_schema = parse_jsonschema_definition(schema_str)
@@ -264,9 +325,10 @@ def __init__(
schema: Draft7Validator | AvroSchema | ProtobufSchema,
references: Sequence[Reference] | None = None,
dependencies: Mapping[str, Dependency] | None = None,
schema_wrapped: Draft7Validator | AvroSchema | ProtobufSchema | None = None,
) -> None:
self._schema_cached: Draft7Validator | AvroSchema | ProtobufSchema | None = schema

self.schema_wrapped = schema_wrapped
super().__init__(
schema_type=schema_type,
schema_str=schema_str,
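
The heart of the change is AvroMerge: because the Avro parser resolves named types only within a single document, each referenced schema and the submitted schema are embedded in reserved wrapper records and parsed together as one top-level union, and parse() then unwraps the last wrapper to recover the submitted schema. A self-contained sketch of the same trick, not Karapace's actual code (the example schemas and names are hypothetical):

import json

import avro.schema

# Hypothetical schemas: "User" refers to "Address" only by name.
address = json.dumps({
    "type": "record",
    "name": "com.example.Address",
    "fields": [{"name": "street", "type": "string"}],
})
user = json.dumps({
    "type": "record",
    "name": "com.example.User",
    "fields": [{"name": "home", "type": "com.example.Address"}],
})


def wrap(schema_str: str, unique_id: int) -> str:
    # Mirrors union_safe_schema_str: embed the schema in a reserved wrapper
    # record so that every input, union or not, can join one big union.
    return (
        f'{{"name": "___RESERVED_KARAPACE_WRAPPER_NAME_{unique_id}___",'
        f' "type": "record", "fields": [{{"name": "name", "type": [{schema_str}]}}]}}'
    )


# Dependencies are emitted before the schema that uses them, so their names
# are already registered by the time the referencing schema is parsed.
merged = "[\n" + ",\n".join([wrap(address, 1), wrap(user, 2)]) + "\n]"
parsed = avro.schema.parse(merged)  # a UnionSchema of the two wrappers

# Unwrap the last wrapper, as parse() does via
# parsed_schema.schemas[-1].fields[0].type.schemas[-1].
root = parsed.schemas[-1].fields[0].type.schemas[-1]
print(root.fullname)  # com.example.User
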
34 changes: 23 additions & 11 deletions karapace/schema_reader.py
@@ -512,9 +512,9 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:

try:
schema_type_parsed = SchemaType(schema_type)
except ValueError:
except ValueError as e:
LOG.warning("Invalid schema type: %s", schema_type)
return
raise e

# This does two jobs:
# - Validates the schema's JSON
@@ -525,12 +525,24 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:

parsed_schema: Draft7Validator | AvroSchema | ProtobufSchema | None = None
resolved_dependencies: dict[str, Dependency] | None = None
if schema_type_parsed in [SchemaType.AVRO, SchemaType.JSONSCHEMA]:
if schema_type_parsed == SchemaType.AVRO:
try:
if schema_references:
candidate_references = [reference_from_mapping(reference_data) for reference_data in schema_references]
resolved_references, resolved_dependencies = self.resolve_references(candidate_references)
schema_str = json.dumps(json.loads(schema_str), sort_keys=True)
except json.JSONDecodeError as e:
LOG.warning("Schema is not valid JSON")
raise e
except InvalidReferences as e:
LOG.exception("Invalid AVRO references")
raise e
elif schema_type_parsed == SchemaType.JSONSCHEMA:
try:
schema_str = json.dumps(json.loads(schema_str), sort_keys=True)
except json.JSONDecodeError:
except json.JSONDecodeError as e:
LOG.warning("Schema is not valid JSON")
return
raise e
elif schema_type_parsed == SchemaType.PROTOBUF:
try:
if schema_references:
@@ -544,12 +556,12 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
normalize=False,
)
schema_str = str(parsed_schema)
except InvalidSchema:
except InvalidSchema as e:
LOG.exception("Schema is not valid ProtoBuf definition")
return
except InvalidReferences:
raise e
except InvalidReferences as e:
LOG.exception("Invalid Protobuf references")
return
raise e

try:
typed_schema = TypedSchema(
@@ -559,8 +571,8 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
dependencies=resolved_dependencies,
schema=parsed_schema,
)
except (InvalidSchema, JSONDecodeError):
return
except (InvalidSchema, JSONDecodeError) as e:
raise e

self.database.insert_schema_version(
subject=schema_subject,
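
For reference, a sketch of the schemas-topic value shape that _handle_msg_schema now resolves for AVRO; the reference entries follow the Confluent-compatible name/subject/version form, and the concrete values below are made up for illustration:

avro_value_example = {
    "subject": "user-value",
    "version": 2,
    "id": 42,
    "schemaType": "AVRO",
    "schema": '{"type": "record", "name": "com.example.User", "fields": '
              '[{"name": "home", "type": "com.example.Address"}]}',
    "references": [
        # name: the type referenced inside the schema above;
        # subject/version: where the referenced schema is registered.
        {"name": "com.example.Address", "subject": "address-value", "version": 1},
    ],
}
# reference_from_mapping() turns each entry into a Reference, and
# resolve_references() loads the referenced schemas as Dependency objects
# before the AVRO schema itself is parsed.
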
2 changes: 1 addition & 1 deletion karapace/schema_registry_apis.py
@@ -1055,7 +1055,7 @@ def _validate_references(
content_type=content_type,
status=HTTPStatus.BAD_REQUEST,
)
if references and schema_type != SchemaType.PROTOBUF:
if references and schema_type != SchemaType.PROTOBUF and schema_type != SchemaType.AVRO:
self.r(
body={
"error_code": SchemaErrorCodes.REFERENCES_SUPPORT_NOT_IMPLEMENTED.value,
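
With _validate_references now letting AVRO through, the references field of the Confluent-compatible registration endpoint can be used for AVRO subjects. A usage sketch, assuming a Karapace registry at http://localhost:8081 and the requests library (subject names and schemas are hypothetical):

import json

import requests

BASE = "http://localhost:8081"

address = {"type": "record", "name": "com.example.Address",
           "fields": [{"name": "street", "type": "string"}]}
user = {"type": "record", "name": "com.example.User",
        "fields": [{"name": "home", "type": "com.example.Address"}]}

# 1. Register the referenced schema first.
requests.post(f"{BASE}/subjects/address-value/versions",
              json={"schemaType": "AVRO", "schema": json.dumps(address)})

# 2. Register the referencing schema, pointing at the subject/version above.
resp = requests.post(
    f"{BASE}/subjects/user-value/versions",
    json={
        "schemaType": "AVRO",
        "schema": json.dumps(user),
        "references": [
            {"name": "com.example.Address", "subject": "address-value", "version": 1},
        ],
    },
)
print(resp.json())  # e.g. {"id": 2}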