Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
eliax1996 committed Jun 28, 2023
1 parent 435cb52 commit 9e79723
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 27 deletions.
11 changes: 3 additions & 8 deletions karapace/in_memory_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from karapace.schema_references import Reference, Referents
from karapace.typing import ResolvedVersion, SchemaId, Subject
from threading import Lock, RLock
from typing import Iterable
from typing import Iterable, Sequence

import logging

Expand Down Expand Up @@ -111,7 +111,7 @@ def insert_schema_version(
version: ResolvedVersion,
deleted: bool,
schema: TypedSchema,
references: list[Reference] | None,
references: Sequence[Reference] | None,
) -> None:
with self.schema_lock_thread:
self.global_schema_id = max(self.global_schema_id, schema_id)
Expand Down Expand Up @@ -188,16 +188,11 @@ def subjects_for_schema(self, schema_id: SchemaId) -> list[Subject]:
subjects = []
with self.schema_lock_thread:
for subject, subject_data in self.subjects.items():
found = False

for version in subject_data.schemas.values():
if version.deleted is False and version.schema_id == schema_id:
found = True
subjects.append(subject)
break

if found:
subjects.append(subject)
break
return subjects

def find_schema_versions_by_schema_id(self, *, schema_id: SchemaId, include_deleted: bool) -> list[SchemaVersion]:
Expand Down
7 changes: 6 additions & 1 deletion karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,12 @@ async def get_schema_id(
subject_name = self.serializer.get_subject_name(topic, parsed_schema, prefix, schema_type)
schema_id = await self._query_schema_id_from_cache_or_registry(parsed_schema, schema_str, subject_name)
else:
parsed_schema, valid_subjects = self.serializer.get_schema_for_id(schema_id, include_subjects=True)
try:
parsed_schema, valid_subjects = await self.serializer.get_schema_for_id(schema_id, include_subjects=True)
except SchemaRetrievalError as schema_error:
# if the schema doesn't exist we treated as if the error was due to an invalid schema
raise InvalidSchema() from schema_error

subject_name = self.serializer.get_subject_name(topic, parsed_schema, prefix, schema_type)

if subject_name not in valid_subjects:
Expand Down
3 changes: 1 addition & 2 deletions karapace/schema_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ async def write_new_schema_local(

all_schema_versions = self.database.find_subject_schemas(subject=subject, include_deleted=True)
if not all_schema_versions:
version = 1
version = ResolvedVersion(1)
schema_id = self.database.get_schema_id(new_schema)
LOG.debug(
"Registering new subject: %r, id: %r with version: %r with schema %r, schema_id: %r",
Expand Down Expand Up @@ -407,7 +407,6 @@ async def write_new_schema_local(
# We didn't find an existing schema and the schema is compatible so go and create one
version = self.database.get_next_version(subject=subject)
schema_id = self.database.get_schema_id(new_schema)

LOG.debug(
"Registering subject: %r, id: %r new version: %r with schema %s, schema_id: %r",
subject,
Expand Down
14 changes: 4 additions & 10 deletions karapace/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ async def post_new_schema(
) -> SchemaId:
if schema.schema_type is SchemaType.PROTOBUF:
if references:
payload = {"schema": str(schema), "schemaType": schema.schema_type.value,
"references": references.json()}
payload = {"schema": str(schema), "schemaType": schema.schema_type.value, "references": references.json()}
else:
payload = {"schema": str(schema), "schemaType": schema.schema_type.value}
else:
Expand Down Expand Up @@ -145,8 +144,7 @@ async def get_schema_for_id(
raise InvalidReferences from exc
parsed_references.append(reference)
if parsed_references:
return ParsedTypedSchema.parse(schema_type, json_result["schema"],
references=parsed_references), subjects
return ParsedTypedSchema.parse(schema_type, json_result["schema"], references=parsed_references), subjects
return ParsedTypedSchema.parse(schema_type, json_result["schema"]), subjects
except InvalidSchema as e:
raise SchemaRetrievalError(f"Failed to parse schema string from response: {json_result}") from e
Expand All @@ -166,8 +164,7 @@ def __init__(
self.state_lock = asyncio.Lock()
session_auth: Optional[BasicAuth] = None
if self.config.get("registry_user") and self.config.get("registry_password"):
session_auth = BasicAuth(self.config.get("registry_user"), self.config.get("registry_password"),
encoding="utf8")
session_auth = BasicAuth(self.config.get("registry_user"), self.config.get("registry_password"), encoding="utf8")
if self.config.get("registry_ca"):
registry_url = f"https://{self.config['registry_host']}:{self.config['registry_port']}"
registry_client = SchemaRegistryClient(
Expand Down Expand Up @@ -239,10 +236,7 @@ async def get_schema_for_id(
assert self.registry_client, "must not call this method after the object is closed."
if schema_id in self.ids_to_schemas and not include_subjects:
return self.ids_to_schemas[schema_id], None
schema_typed, subjects = await self.registry_client.get_schema_for_id(
schema_id,
include_subjects=include_subjects
)
schema_typed, subjects = await self.registry_client.get_schema_for_id(schema_id, include_subjects=include_subjects)
schema_ser = str(schema_typed)
async with self.state_lock:
# todo: get rid of the schema caching and use the same caching used in UserRestProxy
Expand Down
6 changes: 1 addition & 5 deletions tests/integration/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -962,12 +962,8 @@ async def test_union_comparing_to_other_types(registry_async_client: Client) ->
res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(plain_schema)})
assert res.status_code == status_code

# TODO: add other cases of compatibility test for the subjects!
res = await registry_async_client.get(f"/schemas/ids/{initial_schema_id}", params={"includeSubjects": "True"})
if compatibility == "BACKWARD":
assert subject in res.json()["subjects"]
else:
assert subject not in res.json()["subjects"]
assert subject in res.json()["subjects"]

expected_results = [("BACKWARD", 200), ("FORWARD", 409), ("FULL", 409)]
for compatibility, status_code in expected_results:
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,4 +301,4 @@ async def test_deserialization_fails(default_config_path):
with pytest.raises(InvalidPayload):
await deserializer.deserialize(enc_bytes)

assert mock_registry_client.method_calls == [call.get_schema_for_id(1, include_subjects=False)]*2
assert mock_registry_client.method_calls == [call.get_schema_for_id(1, include_subjects=False)] * 2

0 comments on commit 9e79723

Please sign in to comment.