Skip to content

Commit

Permalink
Merge pull request #681 from aiven/eliax1996/add-recursive-schema-ref…
Browse files Browse the repository at this point in the history
…erence-in-protobuf-schema-resolution

Resolve schema reference recursively during schema parsing
  • Loading branch information
giuseppelillo committed Jul 25, 2023
2 parents c95c4a0 + 76da932 commit fca563e
Show file tree
Hide file tree
Showing 10 changed files with 326 additions and 50 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ venv
.python-version
.hypothesis/
.DS_Store
# ignoring protobuf generated files.
runtime/
2 changes: 1 addition & 1 deletion karapace/kafka_rest_apis/schema_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def get_schema_str(self, topic: Subject, schema_id: SchemaId) -> Optional[str]:
class SchemaCache(SchemaCacheProtocol):
def __init__(self) -> None:
self._schema_hash_str_to_id: Dict[str, SchemaId] = {}
self._id_to_schema_str: MutableMapping[SchemaId, TypedSchema] = TTLCache(maxsize=10000, ttl=600)
self._id_to_schema_str: MutableMapping[SchemaId, TypedSchema] = TTLCache(maxsize=100, ttl=600)

def get_schema_id(self, schema: TypedSchema) -> Optional[SchemaId]:
fingerprint = hashlib.sha1(str(schema).encode("utf8")).hexdigest()
Expand Down
2 changes: 1 addition & 1 deletion karapace/protobuf/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def verify_schema_dependencies(self) -> DependencyVerifierResult:
def collect_dependencies(self, verifier: ProtobufDependencyVerifier) -> None:
if self.dependencies:
for key in self.dependencies:
self.dependencies[key].schema.schema.collect_dependencies(verifier)
self.dependencies[key].get_schema().schema.collect_dependencies(verifier)

for i in self.proto_file_element.imports:
verifier.add_import(i)
Expand Down
13 changes: 10 additions & 3 deletions karapace/schema_references.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@
from __future__ import annotations

from karapace.dataclasses import default_dataclass
from karapace.typing import JsonData, ResolvedVersion, SchemaId, Subject
from typing import List, Mapping, NewType, TypeVar
from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId, Subject
from typing import cast, List, Mapping, NewType, TypeVar

Referents = NewType("Referents", List[SchemaId])


T = TypeVar("T")


Expand Down Expand Up @@ -64,6 +63,14 @@ def to_dict(self) -> JsonData:
"version": self.version,
}

@staticmethod
def from_dict(data: JsonObject) -> Reference:
    """Build a ``Reference`` from its JSON object representation.

    Reads the ``"name"``, ``"subject"`` and ``"version"`` keys of ``data``;
    a missing key raises ``KeyError``. Name and subject are passed through
    ``str()`` before use.
    """
    name = str(data["name"])
    subject = Subject(str(data["subject"]))
    # ``cast`` only informs the type checker; the version value is not
    # converted or validated at runtime.
    version = ResolvedVersion(cast(int, data["version"]))
    return Reference(name=name, subject=subject, version=version)


def reference_from_mapping(
data: Mapping[str, object],
Expand Down
107 changes: 89 additions & 18 deletions karapace/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@
from aiohttp import BasicAuth
from avro.io import BinaryDecoder, BinaryEncoder, DatumReader, DatumWriter
from cachetools import TTLCache
from functools import lru_cache
from google.protobuf.message import DecodeError
from jsonschema import ValidationError
from karapace.client import Client
from karapace.dependency import Dependency
from karapace.errors import InvalidReferences
from karapace.protobuf.exception import ProtobufTypeException
from karapace.protobuf.io import ProtobufDatumReader, ProtobufDatumWriter
from karapace.schema_models import InvalidSchema, ParsedTypedSchema, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import Reference, reference_from_mapping
from karapace.typing import SchemaId, Subject
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping
from karapace.typing import ResolvedVersion, SchemaId, Subject
from karapace.utils import json_decode, json_encode
from typing import Any, Callable, Dict, List, MutableMapping, Optional, Tuple
from typing import Any, Callable, Dict, List, MutableMapping, Optional, Set, Tuple
from urllib.parse import quote

import asyncio
Expand Down Expand Up @@ -101,19 +103,79 @@ async def post_new_schema(
raise SchemaRetrievalError(result.json())
return SchemaId(result.json()["id"])

async def get_latest_schema(self, subject: str) -> Tuple[SchemaId, ParsedTypedSchema]:
result = await self.client.get(f"subjects/{quote(subject)}/versions/latest")
# Recursively fetch the schema registered under `subject` (at `version`, or the
# "latest" version when `version` is None) together with every schema it references.
#
# `explored_schemas` holds the (subject, version) pairs already visited on the
# path from the root schema to this call; it is used to detect reference cycles.
#
# Returns a tuple of (schema id, parsed schema with its dependencies attached,
# version reported by the registry response).
#
# Raises InvalidSchema when (subject, version) is already on the explored path,
# and SchemaRetrievalError when the HTTP response is not ok, lacks one of the
# required keys, or the schema string fails to parse.
async def _get_schema_r(
self,
subject: Subject,
explored_schemas: Set[Tuple[Subject, Optional[ResolvedVersion]]],
version: Optional[ResolvedVersion] = None,
) -> Tuple[SchemaId, ValidatedTypedSchema, ResolvedVersion]:
# Seeing the same (subject, version) twice on one path means the references loop back.
if (subject, version) in explored_schemas:
raise InvalidSchema(
f"The schema has at least a cycle in dependencies, "
f"one path of the cycle is given by the following nodes: {explored_schemas}"
)

# `|` builds a new set, so the caller's set is left untouched and sibling
# references do not see each other's visited nodes.
explored_schemas = explored_schemas | {(subject, version)}

# No explicit version means the registry's "latest" endpoint is queried.
version_str = str(version) if version is not None else "latest"
result = await self.client.get(f"subjects/{quote(subject)}/versions/{version_str}")

if not result.ok:
raise SchemaRetrievalError(result.json())

json_result = result.json()
# NOTE(review): the next line looks like the pre-change condition left over from
# the diff view; the line after it (which also requires "version") appears to
# supersede it — confirm against the committed file.
if "id" not in json_result or "schema" not in json_result:
if "id" not in json_result or "schema" not in json_result or "version" not in json_result:
raise SchemaRetrievalError(f"Invalid result format: {json_result}")

if "references" in json_result:
references = [Reference.from_dict(data) for data in json_result["references"]]
dependencies = {}
for reference in references:
# Resolve each referenced schema first so it can be attached as a dependency.
# NOTE(review): this rebinds the `version` parameter; it is not read again
# below (version_str was computed earlier), but the shadowing is easy to misread.
_, schema, version = await self._get_schema_r(reference.subject, explored_schemas, reference.version)
dependencies[reference.name] = Dependency(
name=reference.name, subject=reference.subject, version=version, target_schema=schema
)
else:
references = None
dependencies = None

try:
# A response without "schemaType" is treated as AVRO.
schema_type = SchemaType(json_result.get("schemaType", "AVRO"))
# NOTE(review): the next `return` looks like the pre-change two-tuple return left
# over from the diff view; the three-tuple `return (` below appears to be the
# current code — confirm against the committed file.
return SchemaId(json_result["id"]), ParsedTypedSchema.parse(schema_type, json_result["schema"])
return (
SchemaId(json_result["id"]),
ValidatedTypedSchema.parse(
schema_type,
json_result["schema"],
references=references,
dependencies=dependencies,
),
ResolvedVersion(json_result["version"]),
)
except InvalidSchema as e:
raise SchemaRetrievalError(f"Failed to parse schema string from response: {json_result}") from e

async def get_schema(
    self,
    subject: Subject,
    version: Optional[ResolvedVersion] = None,
) -> Tuple[SchemaId, ValidatedTypedSchema, ResolvedVersion]:
    """
    Retrieves the schema and its dependencies for the specified subject.

    Args:
        subject (Subject): The subject for which to retrieve the schema.
        version (Optional[ResolvedVersion]): The specific version of the schema to retrieve.
            If None, the latest available schema will be returned.

    Returns:
        Tuple[SchemaId, ValidatedTypedSchema, ResolvedVersion]: A tuple containing:
            - SchemaId: The ID of the retrieved schema.
            - ValidatedTypedSchema: The retrieved schema, validated and typed.
            - ResolvedVersion: The version of the schema that was retrieved.

    Raises:
        Whatever ``_get_schema_r`` raises for this subject and version.
    """
    # Deliberately NOT decorated with functools.lru_cache: on an ``async def`` it
    # memoizes the coroutine object instead of the awaited result, so a second
    # call with the same arguments would hand back an already-awaited coroutine
    # and fail with "RuntimeError: cannot reuse already awaited coroutine".
    # It would also key on ``self`` and keep every client instance alive.
    # Each call starts with an empty set of explored schemas for cycle detection.
    return await self._get_schema_r(subject, set(), version)

async def get_schema_for_id(self, schema_id: SchemaId) -> Tuple[TypedSchema, List[Subject]]:
result = await self.client.get(f"schemas/ids/{schema_id}", params={"includeSubjects": "True"})
if not result.ok:
Expand All @@ -138,15 +200,24 @@ async def get_schema_for_id(self, schema_id: SchemaId) -> Tuple[TypedSchema, Lis
raise InvalidReferences from exc
parsed_references.append(reference)
if parsed_references:
return (
ParsedTypedSchema.parse(
schema_type,
json_result["schema"],
references=parsed_references,
),
subjects,
)
return ParsedTypedSchema.parse(schema_type, json_result["schema"]), subjects
dependencies = {}

for reference in parsed_references:
if isinstance(reference, LatestVersionReference):
_, schema, version = await self.get_schema(reference.subject)
else:
_, schema, version = await self.get_schema(reference.subject, reference.version)

dependencies[reference.name] = Dependency(reference.name, reference.subject, version, schema)
else:
dependencies = None

return (
ParsedTypedSchema.parse(
schema_type, json_result["schema"], references=parsed_references, dependencies=dependencies
),
subjects,
)
except InvalidSchema as e:
raise SchemaRetrievalError(f"Failed to parse schema string from response: {json_result}") from e

Expand Down Expand Up @@ -204,9 +275,9 @@ def get_subject_name(

return Subject(f"{self.subject_name_strategy(topic_name, namespace)}-{subject_type}")

async def get_schema_for_subject(self, subject: str) -> TypedSchema:
async def get_schema_for_subject(self, subject: Subject) -> TypedSchema:
assert self.registry_client, "must not call this method after the object is closed."
schema_id, schema = await self.registry_client.get_latest_schema(subject)
schema_id, schema, _ = await self.registry_client.get_schema(subject)
async with self.state_lock:
schema_ser = str(schema)
self.schemas_to_ids[schema_ser] = schema_id
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async def test_remote_client(registry_async_client: Client) -> None:
assert sc_id >= 0
stored_schema, _ = await reg_cli.get_schema_for_id(sc_id)
assert stored_schema == schema_avro, f"stored schema {stored_schema.to_dict()} is not {schema_avro.to_dict()}"
stored_id, stored_schema = await reg_cli.get_latest_schema(subject)
stored_id, stored_schema, _ = await reg_cli.get_schema(subject)
assert stored_id == sc_id
assert stored_schema == schema_avro

Expand All @@ -31,6 +31,6 @@ async def test_remote_client_tls(registry_async_client_tls: Client) -> None:
assert sc_id >= 0
stored_schema, _ = await reg_cli.get_schema_for_id(sc_id)
assert stored_schema == schema_avro, f"stored schema {stored_schema.to_dict()} is not {schema_avro.to_dict()}"
stored_id, stored_schema = await reg_cli.get_latest_schema(subject)
stored_id, stored_schema = await reg_cli.get_schema(subject)
assert stored_id == sc_id
assert stored_schema == schema_avro
4 changes: 2 additions & 2 deletions tests/integration/test_client_protobuf.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async def test_remote_client_protobuf(registry_async_client):
assert sc_id >= 0
stored_schema, _ = await reg_cli.get_schema_for_id(sc_id)
assert stored_schema == schema_protobuf, f"stored schema {stored_schema} is not {schema_protobuf}"
stored_id, stored_schema = await reg_cli.get_latest_schema(subject)
stored_id, stored_schema, _ = await reg_cli.get_schema(subject)
assert stored_id == sc_id
assert stored_schema == schema_protobuf

Expand All @@ -33,6 +33,6 @@ async def test_remote_client_protobuf2(registry_async_client):
assert sc_id >= 0
stored_schema, _ = await reg_cli.get_schema_for_id(sc_id)
assert stored_schema == schema_protobuf, f"stored schema {stored_schema} is not {schema_protobuf}"
stored_id, stored_schema = await reg_cli.get_latest_schema(subject)
stored_id, stored_schema, _ = await reg_cli.get_schema(subject)
assert stored_id == sc_id
assert stored_schema == schema_protobuf_after
Loading

0 comments on commit fca563e

Please sign in to comment.