diff --git a/karapace/protobuf/dependency.py b/karapace/protobuf/dependency.py
index af5121488..c0b04d2bf 100644
--- a/karapace/protobuf/dependency.py
+++ b/karapace/protobuf/dependency.py
@@ -8,7 +8,11 @@
 from karapace.dependency import DependencyVerifierResult
 from karapace.protobuf.known_dependency import DependenciesHardcoded, KnownDependency
 from karapace.protobuf.one_of_element import OneOfElement
-from typing import List
+from typing import List, Optional, Set
+
+
+class FieldNotUniquelyIdentifiableException(Exception):
+    pass
 
 
 class ProtobufDependencyVerifier:
@@ -17,7 +21,12 @@ def __init__(self) -> None:
         self.used_types: List[str] = []
         self.import_path: List[str] = []
 
-    def add_declared_type(self, full_name: str) -> None:
+    def add_declared_type(self, full_name: str, uniquely: bool = False) -> None:
+        if uniquely and full_name in self.declared_types:
+            raise FieldNotUniquelyIdentifiableException(
+                f"{full_name} is not currently identifiable by the parser; "
+                "validating this message would break schema evolution!"
+            )
         self.declared_types.append(full_name)
 
     def add_used_type(self, parent: str, element_type: str) -> None:
@@ -35,13 +44,31 @@ def add_used_type(self, parent: str, element_type: str) -> None:
     def add_import(self, import_name: str) -> None:
         self.import_path.append(import_name)
 
+    def is_type_declared(
+        self,
+        used_type: str,
+        declared_index: Set[str],
+        father_child_type: Optional[str],
+        used_type_with_scope: Optional[str],
+    ) -> bool:
+        return (
+            used_type in declared_index
+            or (used_type_with_scope is not None and used_type_with_scope in declared_index)
+            or (father_child_type is not None and father_child_type in declared_index)
+            or "." + used_type in declared_index
+        )
+
     def verify(self) -> DependencyVerifierResult:
         declared_index = set(self.declared_types)
         for used_type in self.used_types:
             delimiter = used_type.rfind(";")
-            used_type_with_scope = ""
+            father_child_type = None
+            used_type_with_scope = None
             if delimiter != -1:
                 used_type_with_scope = used_type[:delimiter] + "." + used_type[delimiter + 1 :]
+                father_delimiter = used_type[:delimiter].find(".")
+                if father_delimiter != -1:
+                    father_child_type = used_type[:father_delimiter] + "." + used_type[delimiter + 1 :]
                 used_type = used_type[delimiter + 1 :]
 
             if used_type in DependenciesHardcoded.index:
@@ -51,11 +78,7 @@
             if known_pkg is not None and known_pkg in self.import_path:
                 continue
 
-            if (
-                used_type in declared_index
-                or (delimiter != -1 and used_type_with_scope in declared_index)
-                or "." + used_type in declared_index
-            ):
+            if self.is_type_declared(used_type, declared_index, father_child_type, used_type_with_scope):
                 continue
 
             return DependencyVerifierResult(False, f'type "{used_type}" is not defined')
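Reviewer note: a minimal sketch (assuming this patched module is importable) of the behaviour the new `uniquely` flag introduces; the duplicate key below mirrors the unit test at the end of this patch.

from karapace.protobuf.dependency import (
    FieldNotUniquelyIdentifiableException,
    ProtobufDependencyVerifier,
)

verifier = ProtobufDependencyVerifier()
verifier.add_declared_type("AnotherMessage.BamFancyEnum")
try:
    # Registering the same "<first ancestor>.<type name>" key twice is ambiguous.
    verifier.add_declared_type("AnotherMessage.BamFancyEnum", uniquely=True)
except FieldNotUniquelyIdentifiableException as exc:
    print(exc)  # AnotherMessage.BamFancyEnum is not currently identifiable by the parser; ...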
diff --git a/karapace/protobuf/schema.py b/karapace/protobuf/schema.py
index a0d91b1cb..9eef89952 100644
--- a/karapace/protobuf/schema.py
+++ b/karapace/protobuf/schema.py
@@ -161,6 +161,10 @@ def _process_nested_type(
     ):
         verifier.add_declared_type(package_name + "." + parent_name + "." + element_type.name)
         verifier.add_declared_type(parent_name + "." + element_type.name)
+        ancestor_only = parent_name.find(".")
+        if ancestor_only != -1:
+            # Also register "<first ancestor>.<type name>"; this key must be unique to tell nested types apart.
+            verifier.add_declared_type(parent_name[:ancestor_only] + "." + element_type.name, uniquely=True)
 
     if isinstance(element_type, MessageElement):
         for one_of in element_type.one_ofs:
@@ -169,7 +173,17 @@ def _process_nested_type(
             one_of_parent_name = parent_name + "." + element_type.name
             process_one_of(verifier, package_name, one_of_parent_name, one_of)
         for field in element_type.fields:
-            verifier.add_used_type(parent_name, field.element_type)
+            # The subtype is declared at the same scope level, so it is legitimate
+            # to use the same scoping when declaring the dependent type.
+            if field.element_type in [defined_in_same_scope.name for defined_in_same_scope in element_type.nested_types]:
+                verifier.add_used_type(parent_name + "." + element_type.name, field.element_type)
+            else:
+                ancestor_only = parent_name.find(".")
+                if ancestor_only != -1:
+                    verifier.add_used_type(parent_name[:ancestor_only], field.element_type)
+                else:
+                    verifier.add_used_type(parent_name, field.element_type)
+
     for nested_type in element_type.nested_types:
         self._process_nested_type(verifier, package_name, parent_name + "." + element_type.name, nested_type)
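Reviewer note: a hand-expanded sketch of the keys the walk above registers for a doubly nested message such as `package a1; message Outer { message Mid { message Leaf {} } }`; the walk order and the handling of the top-level `Outer` happen elsewhere in schema.py and are assumptions here.

from karapace.protobuf.dependency import ProtobufDependencyVerifier

verifier = ProtobufDependencyVerifier()
# Mid: parent_name == "Outer" contains no dot, so only the two plain keys are added.
verifier.add_declared_type("a1.Outer.Mid")
verifier.add_declared_type("Outer.Mid")
# Leaf: parent_name == "Outer.Mid" contains a dot, so the extra
# "<first ancestor>.<type name>" key is registered and must be unique.
verifier.add_declared_type("a1.Outer.Mid.Leaf")
verifier.add_declared_type("Outer.Mid.Leaf")
verifier.add_declared_type("Outer.Leaf", uniquely=True)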
diff --git a/tests/integration/test_rest.py b/tests/integration/test_rest.py
index 8b73a7ffd..0062c354d 100644
--- a/tests/integration/test_rest.py
+++ b/tests/integration/test_rest.py
@@ -5,6 +5,8 @@
 from __future__ import annotations
 
 from kafka.errors import UnknownTopicOrPartitionError
+from karapace.client import Client
+from karapace.kafka_rest_apis import KafkaRestAdminClient
 from pytest import raises
 from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES
 from tests.utils import (
@@ -186,6 +188,56 @@ async def test_avro_publish(rest_async_client, registry_async_client, admin_client):
     # assert res.status_code == 422, f"Expecting schema {second_schema_json} to not match records {test_objects}"
 
 
+async def test_another_avro_publish(
+    rest_async_client: Client,
+    registry_async_client: Client,
+    admin_client: KafkaRestAdminClient,
+):
+    topic = new_topic(admin_client)
+    other_tn = new_topic(admin_client)
+
+    await wait_for_topics(rest_async_client, topic_names=[topic, other_tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
+    header = REST_HEADERS["avro"]
+
+    tested_avro_schema = {
+        "type": "record",
+        "name": "example",
+        "namespace": "example",
+        "doc": "example",
+        "fields": [{"type": "int", "name": "test", "doc": "my test number", "namespace": "test", "default": "5"}],
+    }
+
+    schema_str = json.dumps(tested_avro_schema)
+
+    # check that the publish succeeds with one record and a brand new schema
+    res = await registry_async_client.post(
+        f"subjects/{topic}-key/versions", json={"schema": schema_str, "schemaType": "AVRO"}
+    )
+    assert res.ok
+
+    key_schema_id = res.json()["id"]
+
+    res = await registry_async_client.post(
+        f"subjects/{topic}-value/versions", json={"schema": schema_str, "schemaType": "AVRO"}
+    )
+    assert res.ok
+
+    value_schema_id = res.json()["id"]
+
+    key_body = {"test": 5}
+
+    value_body = {"test": 5}
+
+    body = {
+        "key_schema_id": key_schema_id,
+        "value_schema_id": value_schema_id,
+        "records": [{"key": key_body, "value": value_body}],
+    }
+
+    url = f"/topics/{topic}"
+    res = await rest_async_client.post(url, json=body, headers=header)
+    assert res.ok
+
+
 async def test_admin_client(admin_client, producer):
     topic_names = [new_topic(admin_client) for i in range(10, 13)]
     topic_info = admin_client.cluster_metadata()
diff --git a/tests/integration/test_rest_consumer_protobuf.py b/tests/integration/test_rest_consumer_protobuf.py
index dfa7278b5..c6b1c96da 100644
--- a/tests/integration/test_rest_consumer_protobuf.py
+++ b/tests/integration/test_rest_consumer_protobuf.py
@@ -2,12 +2,12 @@
 Copyright (c) 2023 Aiven Ltd
 See LICENSE for details
 """
-
 from karapace.client import Client
 from karapace.kafka_rest_apis import KafkaRestAdminClient
 from karapace.protobuf.kotlin_wrapper import trim_margin
 from tests.integration.test_rest import NEW_TOPIC_TIMEOUT
 from tests.utils import (
+    create_subject_name_factory,
     new_consumer,
     new_random_name,
     new_topic,
@@ -17,6 +17,7 @@
     schema_data_second,
     wait_for_topics,
 )
+from typing import Generator
 
 import pytest
 
@@ -139,7 +140,7 @@ async def test_publish_protobuf_with_references(
     res = await rest_async_client.post(
         f"/topics/{topic_name}",
         json=example_message,
-        headers=REST_HEADERS["avro"],
+        headers=REST_HEADERS["protobuf"],
     )
     assert res.status_code == 200
 
@@ -230,7 +231,7 @@ async def test_publish_and_consume_protobuf_with_recursive_references(
     res = await rest_async_client.post(
         f"/topics/{topic_name}",
         json=example_message,
-        headers=REST_HEADERS["avro"],
+        headers=REST_HEADERS["protobuf"],
     )
     assert res.status_code == 200
 
@@ -241,7 +242,7 @@ async def test_publish_and_consume_protobuf_with_recursive_references(
 
     consume_path = f"/consumers/{group}/instances/{instance_id}/records?timeout=1000"
 
-    res = await rest_async_client.post(subscribe_path, json={"topics": [topic_name]}, headers=REST_HEADERS["binary"])
+    res = await rest_async_client.post(subscribe_path, json={"topics": [topic_name]}, headers=REST_HEADERS["protobuf"])
     assert res.ok
 
     resp = await rest_async_client.get(consume_path, headers=REST_HEADERS["avro"])
@@ -262,3 +263,173 @@
     assert msg["offset"] == 0 and msg["partition"] == 0, "first message of the only partition available"
     assert msg["topic"] == topic_name
     assert msg["value"] == produced_message
+
+
+@pytest.mark.parametrize("google_library_included", [True, False])
+async def test_produce_and_retrieve_protobuf(
+    registry_async_client: Client,
+    rest_async_client: Client,
+    admin_client: KafkaRestAdminClient,
+    google_library_included: bool,
+) -> None:
+    topic_name = new_topic(admin_client)
+    await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
+    subject = create_subject_name_factory("test_produce_and_retrieve_protobuf")()
+    subject_topic = f"{topic_name}-value"
+
+    base_schema_subject = f"{subject}_base_schema_subject"
+    google_postal_address_schema_subject = f"{subject}_google_address_schema_subject"
+
+    CUSTOMER_PLACE_PROTO = """
+    syntax = "proto3";
+    package a1;
+    message Place {
+        string city = 1;
+        int32 zone = 2;
+    }
+    """
+
+    body = {"schemaType": "PROTOBUF", "schema": CUSTOMER_PLACE_PROTO}
+    res = await registry_async_client.post(f"subjects/{base_schema_subject}/versions", json=body)
+    assert res.status_code == 200
+
+    if not google_library_included:
+        GOOGLE_POSTAL_ADDRESS_PROTO = """
+        syntax = "proto3";
+
+        package google.type;
+
+        option cc_enable_arenas = true;
+        option go_package = "google.golang.org/genproto/googleapis/type/postaladdress;postaladdress";
+        option java_multiple_files = true;
+        option java_outer_classname = "PostalAddressProto";
+        option java_package = "com.google.type";
+        option objc_class_prefix = "GTP";
+        message PostalAddress {
+            int32 revision = 1;
+            string region_code = 2;
+            string language_code = 3;
+            string postal_code = 4;
+            string sorting_code = 5;
+            string administrative_area = 6;
+            string locality = 7;
+            string sublocality = 8;
+            repeated string address_lines = 9;
+            repeated string recipients = 10;
+            string organization = 11;
+        }
+        """
+
+        body = {"schemaType": "PROTOBUF", "schema": GOOGLE_POSTAL_ADDRESS_PROTO}
+        res = await registry_async_client.post(f"subjects/{google_postal_address_schema_subject}/versions", json=body)
+        assert res.status_code == 200
+
+    postal_address_import = (
+        'import "google/type/postal_address.proto";' if google_library_included else 'import "postal_address.proto";'
+    )
+
+    CUSTOMER_PROTO = f"""
+    syntax = "proto3";
+    package a1;
+    import "Place.proto";
+
+    {postal_address_import}
+
+    // @producer: another comment
+    message Customer {{
+        string name = 1;
+        int32 code = 2;
+        Place place = 3;
+        google.type.PostalAddress address = 4;
+    }}
+    """
+
+    def references() -> Generator[dict, None, None]:
+        yield {"name": "Place.proto", "subject": base_schema_subject, "version": 1}
+
+        if not google_library_included:
+            yield {"name": "postal_address.proto", "subject": google_postal_address_schema_subject, "version": 1}
+
+    body = {
+        "schemaType": "PROTOBUF",
+        "schema": CUSTOMER_PROTO,
+        "references": list(references()),
+    }
+    res = await registry_async_client.post(f"subjects/{subject_topic}/versions", json=body)
+
+    assert res.status_code == 200
+    topic_schema_id = res.json()["id"]
+
+    message_to_produce = [
+        {
+            "name": "John Doe",
+            "code": 123456,
+            "place": {"city": "New York", "zone": 5},
+            "address": {
+                "revision": 1,
+                "region_code": "US",
+                "postal_code": "10001",
+                "address_lines": ["123 Main St", "Apt 4"],
+            },
+        },
+        {
+            "name": "Sophie Smith",
+            "code": 987654,
+            "place": {"city": "London", "zone": 3},
+            "address": {
+                "revision": 2,
+                "region_code": "UK",
+                "postal_code": "SW1A 1AA",
+                "address_lines": ["10 Downing Street"],
+            },
+        },
+        {
+            "name": "Pierre Dupont",
+            "code": 246813,
+            "place": {"city": "Paris", "zone": 1},
+            "address": {"revision": 1, "region_code": "FR", "postal_code": "75001", "address_lines": ["1 Rue de Rivoli"]},
+        },
+    ]
+
+    res = await rest_async_client.post(
+        f"/topics/{topic_name}",
+        json={"value_schema_id": topic_schema_id, "records": [{"value": m} for m in message_to_produce]},
+        headers=REST_HEADERS["protobuf"],
+    )
+    assert res.status_code == 200
+
+    group = new_random_name("produce_and_retrieve_protobuf")
+    instance_id = await new_consumer(rest_async_client, group)
+
+    subscribe_path = f"/consumers/{group}/instances/{instance_id}/subscription"
+
+    consume_path = f"/consumers/{group}/instances/{instance_id}/records?timeout=1000"
+
+    res = await rest_async_client.post(subscribe_path, json={"topics": [topic_name]}, headers=REST_HEADERS["protobuf"])
+    assert res.ok
+
+    resp = await rest_async_client.get(consume_path, headers=REST_HEADERS["avro"])
+    data = resp.json()
+
+    assert isinstance(data, list)
+    assert len(data) == 3
+
+    for i in range(3):
+        msg = data[i]
+        expected_message = message_to_produce[i]
+
+        assert "key" in msg
+        assert "offset" in msg
+        assert "topic" in msg
+        assert "value" in msg
+        assert "timestamp" in msg
+
+        assert msg["key"] is None, "no key defined in production"
+        assert msg["topic"] == topic_name
+
+        for key in expected_message.keys():
+            if key == "address":
+                for address_key in expected_message["address"].keys():
+                    assert expected_message["address"][address_key] == msg["value"]["address"][address_key]
+            else:
+                assert msg["value"][key] == expected_message[key]
diff --git a/tests/unit/protobuf/test_protobuf_schema.py b/tests/unit/protobuf/test_protobuf_schema.py
index 774ea3279..1956411d4 100644
--- a/tests/unit/protobuf/test_protobuf_schema.py
+++ b/tests/unit/protobuf/test_protobuf_schema.py
@@ -2,7 +2,9 @@
 Copyright (c) 2023 Aiven Ltd
 See LICENSE for details
 """
 from karapace.protobuf.compare_result import CompareResult
+from karapace.protobuf.dependency import FieldNotUniquelyIdentifiableException
 from karapace.protobuf.kotlin_wrapper import trim_margin
 from karapace.protobuf.location import Location
 from karapace.protobuf.schema import ProtobufSchema
+from pytest import raises
@@ -293,3 +295,118 @@ def test_protobuf_field_compatible_alter_to_oneof():
     protobuf_schema1.compare(protobuf_schema2, result)
 
     assert result.is_compatible()
+
+
+def test_protobuf_self_referencing_schema():
+    proto1 = """\
+    syntax = "proto3";
+
+    package fancy.company.in.party.v1;
+    message MyFirstMessage {
+        string my_fancy_string = 1;
+    }
+    message AnotherMessage {
+        message WowANestedMessage {
+            enum BamFancyEnum {
+                // Hei! This is a comment!
+                MY_AWESOME_FIELD = 0;
+            }
+            BamFancyEnum im_tricky_im_referring_to_the_previous_enum = 1;
+        }
+    }
+    """
+
+    assert isinstance(ValidatedTypedSchema.parse(SchemaType.PROTOBUF, proto1).schema, ProtobufSchema)
+
+    proto2 = """\
+    syntax = "proto3";
+
+    package fancy.company.in.party.v1;
+    message AnotherMessage {
+        enum BamFancyEnum {
+            // Hei! This is a comment!
+            MY_AWESOME_FIELD = 0;
+        }
+        message WowANestedMessage {
+            message DeeplyNestedMsg {
+                BamFancyEnum im_tricky_im_referring_to_the_previous_enum = 1;
+            }
+        }
+    }
+    """
+
+    assert isinstance(ValidatedTypedSchema.parse(SchemaType.PROTOBUF, proto2).schema, ProtobufSchema)
+
+    proto3 = """\
+    syntax = "proto3";
+
+    package fancy.company.in.party.v1;
+    message AnotherMessage {
+        enum BamFancyEnum {
+            // Hei! This is a comment!
+            MY_AWESOME_FIELD = 0;
+        }
+        message WowANestedMessage {
+            message DeeplyNestedMsg {
+                message AnotherLevelOfNesting {
+                    BamFancyEnum im_tricky_im_referring_to_the_previous_enum = 1;
+                }
+            }
+        }
+    }
+    """
+
+    assert isinstance(ValidatedTypedSchema.parse(SchemaType.PROTOBUF, proto3).schema, ProtobufSchema)
+
+    proto4 = """\
+    syntax = "proto3";
+
+    package fancy.company.in.party.v1;
+    message AnotherMessage {
+        message WowANestedMessage {
+            enum BamFancyEnum {
+                // Hei! This is a comment!
+                MY_AWESOME_FIELD = 0;
+            }
+            message DeeplyNestedMsg {
+                message AnotherLevelOfNesting {
+                    BamFancyEnum im_tricky_im_referring_to_the_previous_enum = 1;
+                }
+            }
+        }
+    }
+    """
+
+    assert isinstance(ValidatedTypedSchema.parse(SchemaType.PROTOBUF, proto4).schema, ProtobufSchema)
+
+
+def test_illegal_redefine_objects_in_same_scope():
+    proto1 = """\
+    syntax = "proto3";
+
+    package fancy.company.in.party.v1;
+    message AnotherMessage {
+        enum BamFancyEnum {
+            // Hei! This is a comment!
+            MY_AWESOME_FIELD = 0;
+        }
+        message WowANestedMessage {
+            message DeeplyNestedMsg {
+                enum BamFancyEnum {
+                    // Hei! This is a comment!
+                    MY_AWESOME_FIELD = 0;
+                }
+                message AnotherLevelOfNesting {
+                    BamFancyEnum im_tricky_im_referring_to_the_previous_enum = 1;
+                }
+            }
+        }
+    }
+    """
+    with raises(FieldNotUniquelyIdentifiableException) as e:
+        assert isinstance(ValidatedTypedSchema.parse(SchemaType.PROTOBUF, proto1).schema, ProtobufSchema)
+
+    assert (
+        e.value.args[0] == "AnotherMessage.BamFancyEnum is not currently identifiable by the parser; "
+        "validating this message would break schema evolution!"
+    )
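Reviewer note: a standalone sketch of the scope splitting that verify() performs; the "<parent scope>;<type name>" encoding of used types is inferred from the rfind(";") split in the dependency.py hunk and is an assumption here.

used_type = "Outer.Mid;BamFancyEnum"  # hypothetical stored used type

delimiter = used_type.rfind(";")
scope, name = used_type[:delimiter], used_type[delimiter + 1 :]

used_type_with_scope = scope + "." + name  # "Outer.Mid.BamFancyEnum"
father_delimiter = scope.find(".")
# "<first ancestor>.<type name>" -- the key that add_declared_type(..., uniquely=True) guards.
father_child_type = scope[:father_delimiter] + "." + name if father_delimiter != -1 else None
# is_type_declared() then checks name, used_type_with_scope, father_child_type,
# and "." + name against the declared-type index, in that order.
print(used_type_with_scope, father_child_type)  # Outer.Mid.BamFancyEnum Outer.BamFancyEnum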