Support of References for AVRO Schemas #926
Clean up the commits into single, coherent commits. Add tests for better coverage.
@jjaakola-aiven Could you suggest the tests that should be added?
f0f9450 to 0167a75
@jjaakola-aiven I'm still waiting for your suggestions regarding additional tests for the new functionality. Please keep in mind that we already have integration tests covering various aspects, including:
Additionally, the new code has successfully passed all existing tests related to Avro functionality. I also added a few unit tests for the AvroMerge class today.
karapace/schema_models.py
Outdated
def union_safe_schema_str(self, schema_str: str) -> str:
    # in case we meet union - we use it as is
    regex = re.compile(r"^\s*\[")
As this will be called multiple times, maybe move this regex compilation out of the function and into __init__(); otherwise we compile it every time this is called.
Yes, I see. Fixed.
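For reference, the suggestion above could look roughly like the sketch below. It hoists the pattern to module level rather than into __init__(), which is one of several reasonable placements; the names UNION_START and is_union_schema_str are hypothetical and are not taken from the PR.

```python
import re

# Compiled once at import time. The pattern matches schema text whose first
# non-whitespace character opens a JSON array, i.e. an Avro union.
UNION_START = re.compile(r"^\s*\[")


def is_union_schema_str(schema_str: str) -> bool:
    """Return True when the schema text starts with '[' (hypothetical helper)."""
    return UNION_START.match(schema_str) is not None
```

Python's re module keeps an internal cache of compiled patterns, so the saving is mostly the repeated cache lookup, but a precompiled pattern also makes the intent explicit.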
karapace/schema_registry_apis.py
Outdated
@@ -1055,7 +1055,7 @@ def _validate_references(
         content_type=content_type,
         status=HTTPStatus.BAD_REQUEST,
     )
-if references and schema_type != SchemaType.PROTOBUF:
+if references and schema_type != SchemaType.PROTOBUF and schema_type != SchemaType.AVRO:
This condition is wrong: we will never have both schema types within the same request, so this will not work.
This condition is right because it excludes both schema types from being processed by the code block.
Let me take another look, because I see you also use the API client in the integration tests and if the tests pass then this should be correct.
I guess the two "and" clauses bug me; maybe use a check for inclusion, i.e. if references and (schema_type not in {SchemaType.PROTOBUF, SchemaType.AVRO}), but it's also fine, as the tests confirm.
I think when I was testing the API via Postman against the Docker Compose setup of your fork, the request might still have been hitting the Docker Compose setup I had running from upstream.
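To make the point of this thread concrete, here is a small sketch showing that the chained comparison and the membership check give the same answer for every schema type. The SchemaType enum below is a stand-in defined only for this example and is not Karapace's own class; the function names are hypothetical as well.

```python
from enum import Enum


class SchemaType(Enum):
    # Stand-in enum for illustration only, not Karapace's SchemaType.
    AVRO = "AVRO"
    JSONSCHEMA = "JSON"
    PROTOBUF = "PROTOBUF"


def rejects_references_chained(schema_type: SchemaType) -> bool:
    # Form used in the diff: true only when the type is neither of the two.
    return schema_type != SchemaType.PROTOBUF and schema_type != SchemaType.AVRO


def rejects_references_membership(schema_type: SchemaType) -> bool:
    # Suggested form: the same predicate written as a set membership test.
    return schema_type not in {SchemaType.PROTOBUF, SchemaType.AVRO}
```

Both forms reject references only for types other than PROTOBUF and AVRO; the membership form simply scales better if more types gain reference support.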
karapace/schema_models.py
Outdated
current_schema_str, current_dependencies = stack.pop()
if current_dependencies:
    stack.append((current_schema_str, None))
    for dependency in reversed(current_dependencies.values()):
Why are you doing the reverse?
Using reverse allows us to preserve the original order of including dependencies in the merged schema. This approach was chosen because the performance of using append() and pop() in lists is better than that of insert(), plus the merging code looks slightly more elegant.
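The ordering argument above can be illustrated with a simplified, self-contained sketch. It is not the PR's AvroMerge code: Node, merge_order, and the seen set used to skip repeated dependencies are all assumptions made for this example. It only demonstrates why pushing dependencies in reverse onto a stack yields them in their original order, each before its referrer.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Node:
    """Hypothetical stand-in for a schema with named dependencies."""

    name: str
    dependencies: dict[str, Node] = field(default_factory=dict)


def merge_order(root: Node) -> list[str]:
    """Return schema names so that every dependency precedes its referrer."""
    ordered: list[str] = []
    seen: set[str] = set()  # assumption: a shared dependency is emitted once
    # Each entry is (node, expanded). expanded=True means the node's
    # dependencies were already pushed and the node can now be emitted.
    stack: list[tuple[Node, bool]] = [(root, False)]
    while stack:
        node, expanded = stack.pop()
        if expanded or not node.dependencies:
            if node.name not in seen:
                seen.add(node.name)
                ordered.append(node.name)
            continue
        # Re-push the node so it is emitted after all of its dependencies.
        stack.append((node, True))
        # Push in reverse so the first dependency ends up on top of the
        # stack and is processed first, preserving the declared order.
        for dependency in reversed(list(node.dependencies.values())):
            stack.append((dependency, False))
    return ordered
```

With append() and pop() both working on the end of the list, each step is constant time, which matches the performance reasoning given in the reply.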
karapace/schema_models.py
Outdated
@@ -188,21 +229,41 @@ def parse(
    references: Sequence[Reference] | None = None,
    dependencies: Mapping[str, Dependency] | None = None,
    normalize: bool = False,
    dependencies_compat: bool = False,
I am guessing this is a flag you want to introduce so we can opt in to or out of the referencing logic. As it stands it is hard-coded, so maybe you can add it to the config.
Seems I forgot to remove it from the code. It was used in some iteration of the code.
fixed
karapace/schema_models.py
Outdated
parsed_schema: Draft7Validator | AvroSchema | ProtobufSchema
if schema_type is SchemaType.AVRO:
    try:
        if dependencies or dependencies_compat:
So I could have dependencies but not opt in to dependencies_compat, and you would still run this logic. What is the purpose of dependencies_compat, then?
Seems I forgot to remove it from the code. It was used in some iteration of the code.
fixed
karapace/schema_models.py
Outdated
    validate_enum_symbols=validate_avro_enum_symbols,
    validate_names=validate_avro_names,
)
if dependencies or dependencies_compat:
    if isinstance(parsed_schema, avro.schema.UnionSchema):
        parsed_schema_result = parsed_schema.schemas[-1].fields[0].type.schemas[-1]
What is going on here? Could you give a hint as to what you want to achieve, also taking into account the added checks?
if dependencies or dependencies_compat:
    if isinstance(parsed_schema, avro.schema.UnionSchema):
To achieve schema merging with references (dependencies), we wrap the top-level schema and its references in a special way, then merge all schemas into a Union. The top-level schema comes last, and all referenced schemas are appended in a specific order, with the rule that every referenced schema must appear earlier in the Union list than its referrers (I mentioned this order in my answer about 'reverse' above). Then we parse this merged schema, resulting in an avro.schema.UnionSchema object in which the top-level schema is the last member.
The AVRO object is constructed in such a way that dependent schemas are stored inside the AVRO object, and the top-level schema references them in a way that allows schema comparison without issues. However, this union also includes all dependency items at the same level as the top-level schema. So we remove these items and extract only the top-level schema object from the union. All references remain present and work perfectly. As a result, we have no union, but we do have a correct AVRO schema with dependencies.
However, there is a case where the top-level schema has no references but is itself a Union. For this reason, the code above checks whether the result is a UnionSchema, and only if the schema has references do we extract the top-level schema from it. If it is a Union without references, we do nothing.
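As a rough illustration of the merge-then-extract idea described above, the sketch below works on plain JSON text only. It deliberately leaves out the wrapping step and the Avro parsing that the PR performs, so merge_into_union and extract_top_level are hypothetical helpers and not functions from Karapace or the avro library.

```python
import json
from typing import Any


def merge_into_union(top_level: str, references: list[str]) -> str:
    """Build a JSON union whose last member is the top-level schema.

    Assumes `references` is already ordered so that each referenced schema
    appears before any schema that refers to it.
    """
    members = [json.loads(ref) for ref in references]
    members.append(json.loads(top_level))
    return json.dumps(members)


def extract_top_level(merged: str) -> Any:
    """Return the last union member, the top-level schema by construction."""
    return json.loads(merged)[-1]


ADDRESS = '{"type": "record", "name": "Address", "fields": [{"name": "street", "type": "string"}]}'
# Person refers to Address by name, which resolves once both are in one union.
PERSON = '{"type": "record", "name": "Person", "fields": [{"name": "address", "type": "Address"}]}'

merged_schema = merge_into_union(PERSON, [ADDRESS])
```

In the PR itself the extraction happens on the parsed avro.schema.UnionSchema object rather than on JSON text, which is what keeps the named references resolved after the dependency members are dropped.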
karapace/schema_models.py
Outdated
@@ -264,9 +325,10 @@ def __init__(
    schema: Draft7Validator | AvroSchema | ProtobufSchema,
    references: Sequence[Reference] | None = None,
    dependencies: Mapping[str, Dependency] | None = None,
    schema_wrapped: Draft7Validator | AvroSchema | ProtobufSchema | None = None,
This and its respective instance variable are not used anywhere in the code. Why are you adding this?
removed.
    ],
}

SCHEMA_PERSON_INT_LONG = {
_INT_LONG makes it a bit hard to know what this schema is for. I see you updated the age type to long, so maybe _AGE_LONG?
I will change the naming.
fixed
    },
]

SCHEMA_ADDRESS_INCOMPATIBLE = {
I cannot find the incompatibility in this schema.
This test shows incompatibility in the naming of the field... We changed the name from "Address" to "ChangeAddress".
@libretto shall we rebase this work so we get the upstream changes? After that, let's discuss the comments. If you can tell me when you'd like to, we can spare some time and discuss in one thread here.
tests/unit/test_avro_merge.py
Outdated
def test_init(self, setup_avro_merge):
    avro_merge = setup_avro_merge
    assert avro_merge.schema_str == json_encode(json_decode(avro_merge.schema_str), compact=True, sort_keys=True)
    assert avro_merge.dependencies == avro_merge.dependencies
You are asserting the same thing.
hmm yes, removed
@pytest.fixture
def setup_avro_merge(self):
    schema_str = '{"type": "record", "name": "Test", "fields": [{"name": "field1", "type": "string"}]}'
    dependencies = {"dependency1": MagicMock(schema=MagicMock(schema_str='{"type": "string"}', dependencies=None))}
Do you really need to mock the dependency in this simple case? It'd be nice to also use a proper schema.
It is a basic unit test for the AvroMerge functionality.
tests/unit/test_avro_merge.py
Outdated
    return AvroMerge(schema_str, dependencies)

def test_init(self, setup_avro_merge):
    avro_merge = setup_avro_merge
this is not necessary.
Fixed
def test_union_safe_schema_str_no_union(self, setup_avro_merge):
    avro_merge = setup_avro_merge
    result = avro_merge.union_safe_schema_str('{"type": "string"}')
What kind of schema is this {"type": "string"}?
It is a basic AVRO schema that represents a string. Since we are testing avro_merge here, it is acceptable for this test.
9c26f46 to 0167a75
@libretto The unit test for |
9cac0ba to f4346ea
f4346ea to 5ab6ba0
rebased
This change adds support for references for AVRO schemas
We aim to add support for references for AVRO schemas to Karapace using a straightforward approach by leveraging the same API used in Protobuf references. Although Avro does not natively support references, we can implement this functionality by utilizing the property of Avro schemas that allows combining types. By including multiple types in unions, we can enable referencing from one union element to another.