Support of References for AVRO Schemas #926

Open
wants to merge 2 commits into
base: main

Conversation

libretto
Contributor

@libretto libretto commented Jul 27, 2024

This change adds support for references in Avro schemas.

We aim to add reference support for Avro schemas to Karapace in a straightforward way, by reusing the API already used for Protobuf references. Although Avro does not natively support references, we can implement them by relying on Avro's ability to combine types in a union: when several types are included in one union, one union element can refer to another.
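To make the idea concrete, here is a minimal sketch of how a union lets one element refer to another. The `Address` and `Person` records and the `example` namespace are hypothetical, not schemas from this PR, and the snippet only builds and inspects the JSON; it does not call Karapace or an Avro parser.

```python
import json

# Hypothetical dependency schema: a named record that other schemas can refer to.
address = {
    "type": "record",
    "name": "Address",
    "namespace": "example",
    "fields": [{"name": "street", "type": "string"}],
}

# Hypothetical top-level schema: it refers to Address by its full name only.
person = {
    "type": "record",
    "name": "Person",
    "namespace": "example",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "address", "type": "example.Address"},
    ],
}

# An Avro union is written as a JSON array of schemas. Address is defined in an
# earlier element, so the later Person element can refer to it by name.
merged_union = json.dumps([address, person])
print(merged_union)
```

The dependency comes first and the top-level schema last, which matches the ordering rule discussed later in this conversation.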

@libretto libretto requested review from a team as code owners July 27, 2024 13:50
Contributor

@jjaakola-aiven jjaakola-aiven left a comment

Cleanup commits to single/coherent commits. Add tests for better coverage.

@libretto
Contributor Author

libretto commented Jul 29, 2024

Cleanup commits to single/coherent commits. Add tests for better coverage.

@jjaakola-aiven Could you suggest the tests that should be added?

@libretto libretto force-pushed the avro_references2 branch 2 times, most recently from f0f9450 to 0167a75 Compare July 29, 2024 09:46
@libretto
Contributor Author

@jjaakola-aiven I'm still waiting for your suggestions regarding additional tests for the new functionality. Please keep in mind that we already have integration tests covering various aspects, including:

Basic Avro references
Avro references compatibility
Avro union references
Avro incompatible names in references

Additionally, the new code has successfully passed all existing tests related to Avro functionality. I also added a few unit tests for the AvroMerge class today.


def union_safe_schema_str(self, schema_str: str) -> str:
    # in case we meet union - we use it as is
    regex = re.compile(r"^\s*\[")
Contributor

As this will be called multiple times, consider moving the regex compilation out of the function and into __init__(); otherwise we compile it every time the function is called.

Contributor Author

@libretto libretto Sep 26, 2024

Yes, I see. Fixed.
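A minimal sketch of the suggestion, assuming the pattern is stored on the instance. `UnionSafeSketch`, `union_start` and `is_union` are hypothetical names used only for illustration; the real class in the PR does more than this.

```python
import re


class UnionSafeSketch:
    """Hypothetical stand-in for the reviewed class; only the regex handling is shown."""

    def __init__(self) -> None:
        # Compiled once per instance instead of on every method call.
        self.union_start = re.compile(r"^\s*\[")

    def is_union(self, schema_str: str) -> bool:
        # A schema string whose first non-whitespace character is "[" is a union.
        return self.union_start.match(schema_str) is not None


sketch = UnionSafeSketch()
print(sketch.is_union('["null", "string"]'))
print(sketch.is_union('{"type": "string"}'))
```

Python's `re` module also keeps an internal cache of recently compiled patterns, so the gain is mostly in avoiding the repeated lookup and in making the intent explicit.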

@@ -1055,7 +1055,7 @@ def _validate_references(
                 content_type=content_type,
                 status=HTTPStatus.BAD_REQUEST,
             )
-        if references and schema_type != SchemaType.PROTOBUF:
+        if references and schema_type != SchemaType.PROTOBUF and schema_type != SchemaType.AVRO:
Contributor

This condition is wrong: we will never have both schema types within the same request, so this will not work.

Contributor Author

This condition is right because it excludes both schema types from being processed by this code block.

Contributor

Let me take another look, because I see you also use the API client in the integration tests and if the tests pass then this should be correct.

Contributor

I guess the two "and" operators throw me off. Maybe a check for inclusion would read better, i.e. if references and (schema_type not in {SchemaType.PROTOBUF, SchemaType.AVRO}), but it's also fine as the tests confirm.
I think when I was testing the API via Postman against the Docker Compose setup of your fork, the request might still have been hitting the Docker Compose setup I had running from upstream.
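The two spellings of the condition can be checked against each other with a small sketch. The `SchemaType` enum below is a simplified stand-in for Karapace's own (its member values are assumptions), and the two helper functions are hypothetical.

```python
from enum import Enum


class SchemaType(Enum):
    # Simplified stand-in; member values are assumptions.
    AVRO = "AVRO"
    JSONSCHEMA = "JSON"
    PROTOBUF = "PROTOBUF"


def rejects_chained(references, schema_type):
    # Form used in the PR: two inequality checks joined with "and".
    return bool(references) and schema_type != SchemaType.PROTOBUF and schema_type != SchemaType.AVRO


def rejects_membership(references, schema_type):
    # Form suggested in review: a single membership test.
    return bool(references) and schema_type not in {SchemaType.PROTOBUF, SchemaType.AVRO}


# Both forms agree for every schema type, with and without references.
for schema_type in SchemaType:
    for references in ([], ["some-reference"]):
        assert rejects_chained(references, schema_type) == rejects_membership(references, schema_type)
```

In both forms, only a request that carries references for a type other than Protobuf or Avro reaches the error branch.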

current_schema_str, current_dependencies = stack.pop()
if current_dependencies:
    stack.append((current_schema_str, None))
    for dependency in reversed(current_dependencies.values()):
Contributor

Why are you doing the reverse?

Contributor Author

Iterating in reverse preserves the original order in which dependencies are included in the merged schema: the stack pops the most recently pushed item first, so pushing the dependencies in reverse means they are processed in their declared order. This approach was chosen because append() and pop() on lists perform better than insert(), and the merging code looks slightly more elegant.
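The effect of pushing in reverse can be seen in a small sketch of a stack-based traversal. This is not the PR's `AvroMerge` code: `merge_order`, the `graph` mapping and the schema names are hypothetical, and the sketch assumes the dependency graph has no cycles.

```python
def merge_order(top, graph):
    """Return schema names so that every dependency precedes its referrers."""
    ordered = []
    seen = set()
    stack = [(top, graph.get(top, []))]
    while stack:
        name, deps = stack.pop()
        if deps:
            # Revisit this schema once its dependencies have been emitted.
            stack.append((name, None))
            # Push in reverse so that pop() yields dependencies in declared order.
            for dep in reversed(deps):
                stack.append((dep, graph.get(dep, [])))
        elif name not in seen:
            seen.add(name)
            ordered.append(name)
    return ordered


# Hypothetical dependency graph: Person refers to Address and Phone,
# and Address in turn refers to Country.
graph = {"Person": ["Address", "Phone"], "Address": ["Country"]}
print(merge_order("Person", graph))  # ['Country', 'Address', 'Phone', 'Person']
```

Without `reversed()`, `Phone` would be emitted before `Address`, which is the opposite of the declared order.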

@@ -188,21 +229,41 @@ def parse(
    references: Sequence[Reference] | None = None,
    dependencies: Mapping[str, Dependency] | None = None,
    normalize: bool = False,
    dependencies_compat: bool = False,
Contributor

I am guessing this is a flag you want to introduce so we can opt in to or out of the referencing logic. As it stands, it is hard-coded, so maybe you can add it to the config.

Contributor Author

It seems I forgot to remove it from the code. It was used in an earlier iteration of the code.

Contributor Author

fixed

parsed_schema: Draft7Validator | AvroSchema | ProtobufSchema
if schema_type is SchemaType.AVRO:
    try:
        if dependencies or dependencies_compat:
Contributor

So I could have dependencies without opting in to dependencies_compat and this logic would still run. What, then, is the purpose of dependencies_compat?

Contributor Author

It seems I forgot to remove it from the code. It was used in an earlier iteration of the code.

Contributor Author

fixed

    validate_enum_symbols=validate_avro_enum_symbols,
    validate_names=validate_avro_names,
)
if dependencies or dependencies_compat:
    if isinstance(parsed_schema, avro.schema.UnionSchema):
        parsed_schema_result = parsed_schema.schemas[-1].fields[0].type.schemas[-1]
Contributor

@nosahama nosahama Sep 25, 2024

What is going on here, please? Can you give a hint as to what you want to achieve, also taking into account the checks added?

if dependencies or dependencies_compat:
    if isinstance(parsed_schema, avro.schema.UnionSchema):

Contributor Author

@libretto libretto Sep 26, 2024

To achieve schema merging with references (dependencies), we wrap the top-level schema and its references in a special way, then merge all schemas into a Union. In that Union the top-level schema is last, and the reference schemas are added in a specific order, with the rule that every referenced schema must appear higher in the Union list than its referrers (I mentioned this order in my answer about 'reverse' above). Then we parse this merged schema, resulting in an avro.schema.UnionSchema object, where the top-level schema is last in the union.

The AVRO object is constructed in such a way that dependent schemas are stored inside the AVRO object, and the top-level schema references them in a way that allows schema comparison without issues. However, this union also includes all dependency items at the same level as the top-level schema. So, we remove these items and extract only the top-level schema object from the union. All references remain present and work perfectly. As a result, we have no union, but we have a correct AVRO schema with dependencies.

However, there is a case where the top-level schema may have no references but is a Union itself. In this case, the code above checks if we have a UnionSchema as a result, and if this schema has references, we extract the top-level schema from it. If it is a Union without references, then we do nothing.
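The decision described above can be sketched on plain JSON, without the Avro parser. This is a simplification: the PR works on a parsed `avro.schema.UnionSchema` and also unwraps a wrapper record, whereas the hypothetical `top_level_schema` helper below only models "take the last union element when dependencies were merged in, otherwise leave the schema alone". The schema contents are made up for illustration.

```python
import json


def top_level_schema(merged_schema_str, has_dependencies):
    """Pick the top-level schema out of a merged schema (simplified model)."""
    parsed = json.loads(merged_schema_str)
    # A JSON array stands for an Avro union in this sketch.
    if has_dependencies and isinstance(parsed, list):
        # Dependencies were merged in: the top-level schema is the last element.
        return parsed[-1]
    # No dependencies: a union here is the user's own schema and is kept as is.
    return parsed


# Hypothetical merged schema: the dependency first, the top-level schema last.
merged = json.dumps(
    [
        {"type": "record", "name": "Address", "fields": []},
        {"type": "record", "name": "Person", "fields": [{"name": "address", "type": "Address"}]},
    ]
)
print(top_level_schema(merged, has_dependencies=True)["name"])

# A schema that is itself a union and has no references is returned unchanged.
print(top_level_schema('["null", "string"]', has_dependencies=False))
```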

@@ -264,9 +325,10 @@ def __init__(
    schema: Draft7Validator | AvroSchema | ProtobufSchema,
    references: Sequence[Reference] | None = None,
    dependencies: Mapping[str, Dependency] | None = None,
    schema_wrapped: Draft7Validator | AvroSchema | ProtobufSchema | None = None,
Contributor

@nosahama nosahama Sep 25, 2024

This and its respective instance variable are not used anywhere in the code. Why are you adding it?

Contributor Author

@libretto libretto Sep 26, 2024

removed.

],
}

SCHEMA_PERSON_INT_LONG = {
Contributor

It is hard to tell from _INT_LONG what this schema is meant to test. I see you changed the age type to long, so maybe _AGE_LONG.

Contributor Author

@libretto libretto Sep 26, 2024

I will change the naming.

Contributor Author

fixed

},
]

SCHEMA_ADDRESS_INCOMPATIBLE = {
Contributor

I cannot find the incompatibility in this schema.

Contributor Author

This test shows an incompatibility in the naming of the field: we changed the name from "Address" to "ChangeAddress".

@nosahama
Contributor

@libretto shall we rebase this work so we get the upstream changes? Then let's discuss the comments. If you tell me when you'd like to, we can set aside some time and discuss in one thread here.

def test_init(self, setup_avro_merge):
    avro_merge = setup_avro_merge
    assert avro_merge.schema_str == json_encode(json_decode(avro_merge.schema_str), compact=True, sort_keys=True)
    assert avro_merge.dependencies == avro_merge.dependencies
Contributor

You are asserting the same thing.

Contributor Author

hmm yes, removed

@pytest.fixture
def setup_avro_merge(self):
    schema_str = '{"type": "record", "name": "Test", "fields": [{"name": "field1", "type": "string"}]}'
    dependencies = {"dependency1": MagicMock(schema=MagicMock(schema_str='{"type": "string"}', dependencies=None))}
Contributor

@nosahama nosahama Sep 26, 2024

Do you really need to mock the dependency in this simple case? It'd be nice to also use a proper schema.

Contributor Author

It is a basic unit test for the AvroMerge functionality.

    return AvroMerge(schema_str, dependencies)

def test_init(self, setup_avro_merge):
    avro_merge = setup_avro_merge
Contributor

this is not necessary.

Contributor Author

@libretto libretto Sep 26, 2024

Fixed


def test_union_safe_schema_str_no_union(self, setup_avro_merge):
    avro_merge = setup_avro_merge
    result = avro_merge.union_safe_schema_str('{"type": "string"}')
Contributor

What kind of schema is this {"type": "string"}?

Contributor Author

It is a basic Avro schema that represents a string. Since we are testing avro_merge here, it is acceptable for this test.

@nosahama
Contributor

@libretto The unit test for AvroMerge is gone and it seems that you have reverted some of your changes with your recent push.

@libretto
Contributor Author

@nosahama

@libretto The unit test for AvroMerge is gone and it seems that you have reverted some of your changes with your recent push.

Yes, it was an issue with the rebase. I just restored the branch, synced it with main, and am trying the rebase again.

@libretto libretto force-pushed the avro_references2 branch 2 times, most recently from 9cac0ba to f4346ea Compare September 26, 2024 16:57
@libretto
Contributor Author

@libretto shall we rebase this work so we get the upstream changes? Then let's discuss the comments. If you tell me when you'd like to, we can set aside some time and discuss in one thread here.

rebased
