diff --git a/.gitignore b/.gitignore index c756da5bb..612ad46b2 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,4 @@ venv .run .python-version .hypothesis/ +.DS_Store diff --git a/karapace/avro_dataclasses/models.py b/karapace/avro_dataclasses/models.py index dc9d09af8..78a64c8f4 100644 --- a/karapace/avro_dataclasses/models.py +++ b/karapace/avro_dataclasses/models.py @@ -68,7 +68,7 @@ def optional_parser(parser: Parser | None) -> Parser | None: return None def parse(value: object) -> object: - return None if value is None else parser(value) # type: ignore[misc] + return None if value is None else parser(value) return parse diff --git a/karapace/client.py b/karapace/client.py index 3bd97518b..d699e6a73 100644 --- a/karapace/client.py +++ b/karapace/client.py @@ -90,6 +90,7 @@ async def get( json: JsonData = None, headers: Optional[Headers] = None, auth: Optional[BasicAuth] = None, + params: Optional[Mapping[str, str]] = None, ) -> Result: path = self.path_for(path) if not headers: @@ -101,6 +102,7 @@ async def get( headers=headers, auth=auth, ssl=self.ssl_mode, + params=params, ) as res: # required for forcing the response body conversion to json despite missing valid Accept headers json_result = await res.json(content_type=None) diff --git a/karapace/in_memory_database.py b/karapace/in_memory_database.py index 6340df19c..222e38046 100644 --- a/karapace/in_memory_database.py +++ b/karapace/in_memory_database.py @@ -11,7 +11,7 @@ from karapace.schema_references import Reference, Referents from karapace.typing import ResolvedVersion, SchemaId, Subject from threading import Lock, RLock -from typing import Iterable +from typing import Iterable, Sequence import logging @@ -111,7 +111,7 @@ def insert_schema_version( version: ResolvedVersion, deleted: bool, schema: TypedSchema, - references: list[Reference] | None, + references: Sequence[Reference] | None, ) -> None: with self.schema_lock_thread: self.global_schema_id = max(self.global_schema_id, schema_id) @@ -184,6 +184,17 @@ def find_schemas(self, *, include_deleted: bool, latest_only: bool) -> dict[Subj res_schemas[subject] = selected_schemas return res_schemas + def subjects_for_schema(self, schema_id: SchemaId) -> list[Subject]: + subjects = [] + with self.schema_lock_thread: + for subject, subject_data in self.subjects.items(): + for version in subject_data.schemas.values(): + if version.deleted is False and version.schema_id == schema_id: + subjects.append(subject) + break + + return subjects + def find_schema_versions_by_schema_id(self, *, schema_id: SchemaId, include_deleted: bool) -> list[SchemaVersion]: schema_versions: list[SchemaVersion] = [] with self.schema_lock_thread: diff --git a/karapace/kafka_rest_apis/__init__.py b/karapace/kafka_rest_apis/__init__.py index 4065c59a7..1b6bf4b6f 100644 --- a/karapace/kafka_rest_apis/__init__.py +++ b/karapace/kafka_rest_apis/__init__.py @@ -17,12 +17,15 @@ from karapace.kafka_rest_apis.admin import KafkaRestAdminClient from karapace.kafka_rest_apis.consumer_manager import ConsumerManager from karapace.kafka_rest_apis.error_codes import RESTErrorCodes +from karapace.kafka_rest_apis.schema_cache import TopicSchemaCache from karapace.karapace import KarapaceBase from karapace.rapu import HTTPRequest, HTTPResponse, JSON_CONTENT_TYPE -from karapace.schema_reader import SchemaType +from karapace.schema_models import TypedSchema, ValidatedTypedSchema +from karapace.schema_type import SchemaType from karapace.serialization import InvalidMessageSchema, InvalidPayload, SchemaRegistrySerializer, 
SchemaRetrievalError +from karapace.typing import SchemaId, Subject from karapace.utils import convert_to_int, json_encode, KarapaceKafkaClient -from typing import Dict, List, Optional, Tuple +from typing import Callable, Dict, List, Optional, Tuple, Union import aiohttp.web import asyncio @@ -416,7 +419,7 @@ def __init__(self, config: Config, kafka_timeout: int, serializer): self.admin_client = None self.admin_lock = asyncio.Lock() self.metadata_cache = None - self.schemas_cache = {} + self.topic_schema_cache = TopicSchemaCache() self.consumer_manager = ConsumerManager(config=config, deserializer=self.serializer) self.init_admin_client() self._last_used = time.monotonic() @@ -739,18 +742,83 @@ def is_valid_schema_request(data: dict, prefix: str) -> bool: return False return isinstance(schema, str) - async def get_schema_id(self, data: dict, topic: str, prefix: str, schema_type: SchemaType) -> int: - log.debug("Retrieving schema id for %r", data) - if f"{prefix}_schema_id" in data and data[f"{prefix}_schema_id"] is not None: - log.debug("Will use schema id %d for serializing %s on topic %s", data[f"{prefix}_schema_id"], prefix, topic) - return int(data[f"{prefix}_schema_id"]) - schema_str = data[f"{prefix}_schema"] - log.debug("Registering / Retrieving ID for schema %s", schema_str) - if schema_str not in self.schemas_cache: - subject_name = self.serializer.get_subject_name(topic, data[f"{prefix}_schema"], prefix, schema_type) - schema_id = await self.serializer.get_id_for_schema(data[f"{prefix}_schema"], subject_name, schema_type) - self.schemas_cache[schema_str] = schema_id - return self.schemas_cache[schema_str] + async def get_schema_id( + self, + data: dict, + topic: str, + prefix: str, + schema_type: SchemaType, + ) -> SchemaId: + """ + This method searches for and validates the SchemaId for a request; it acts as a guard (if something is + wrong, it throws an error). 
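+        The id is resolved from ``{prefix}_schema_id`` when present (and then validated against the registry
+        and the subject derived from the topic name strategy); otherwise ``{prefix}_schema`` is parsed and its
+        id is resolved from the topic schema cache or the registry.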
+ + :raises InvalidSchema: + """ + log.debug("[resolve schema id] Retrieving schema id for %r", data) + schema_id: Union[SchemaId, None] = ( + SchemaId(int(data[f"{prefix}_schema_id"])) if f"{prefix}_schema_id" in data else None + ) + schema_str = data.get(f"{prefix}_schema") + + if schema_id is None and schema_str is None: + raise InvalidSchema() + + if schema_id is None: + parsed_schema = ValidatedTypedSchema.parse(schema_type, schema_str) + subject_name = self.serializer.get_subject_name(topic, parsed_schema, prefix, schema_type) + schema_id = await self._query_schema_id_from_cache_or_registry(parsed_schema, schema_str, subject_name) + else: + + def subject_not_included(schema: TypedSchema, subjects: List[Subject]) -> bool: + subject = self.serializer.get_subject_name(topic, schema, prefix, schema_type) + return subject not in subjects + + parsed_schema, valid_subjects = await self._query_schema_and_subjects( + schema_id, + need_new_call=subject_not_included, + ) + + if subject_not_included(parsed_schema, valid_subjects): + raise InvalidSchema() + + return schema_id + + async def _query_schema_and_subjects( + self, schema_id: SchemaId, *, need_new_call: Optional[Callable[[TypedSchema, List[Subject]], bool]] + ) -> Tuple[TypedSchema, List[Subject]]: + try: + return await self.serializer.get_schema_for_id(schema_id, need_new_call=need_new_call) + except SchemaRetrievalError as schema_error: + # if the schema doesn't exist we treat it as if the error was due to an invalid schema + raise InvalidSchema() from schema_error + + async def _query_schema_id_from_cache_or_registry( + self, + parsed_schema: ValidatedTypedSchema, + schema_str: str, + subject_name: Subject, + ) -> SchemaId: + """ + Checks that the schema registered with a certain id matches the schema provided (you can provide + a valid id but place a totally unrelated schema in the body). + Also, if we don't find a match here we query the registry, since the cache entry could have been evicted + in the meantime or the schema could have been registered without passing through the HTTP proxy. + """ + schema_id = self.topic_schema_cache.get_schema_id(subject_name, parsed_schema) + if schema_id is None: + log.debug("[resolve schema id] Registering / Retrieving ID for %s and schema %s", subject_name, schema_str) + schema_id = await self.serializer.upsert_id_for_schema(parsed_schema, subject_name) + log.debug("[resolve schema id] Found schema id %s from registry for subject %s", schema_id, subject_name) + self.topic_schema_cache.set_schema(subject_name, schema_id, parsed_schema) + else: + log.debug( + "[resolve schema id] schema ID %s found from cache for %s and schema %s", + schema_id, + subject_name, + schema_str, + ) + return schema_id async def validate_schema_info(self, data: dict, prefix: str, content_type: str, topic: str, schema_type: str): try: @@ -788,10 +856,14 @@ async def validate_schema_info(self, data: dict, prefix: str, content_type: str, status=HTTPStatus.REQUEST_TIMEOUT, ) except InvalidSchema: + if f"{prefix}_schema" in data: + err = f'schema = {data[f"{prefix}_schema"]}' + else: + err = f'schema_id = {data[f"{prefix}_schema_id"]}' KafkaRest.r( body={ "error_code": RESTErrorCodes.INVALID_DATA.value, - "message": f'Invalid schema. format = {schema_type.value}, schema = {data[f"{prefix}_schema"]}', + "message": f"Invalid schema. 
format = {schema_type.value}, {err}", }, content_type=content_type, status=HTTPStatus.UNPROCESSABLE_ENTITY, @@ -882,7 +954,7 @@ async def serialize( raise FormatError(f"Unknown format: {ser_format}") async def schema_serialize(self, obj: dict, schema_id: Optional[int]) -> bytes: - schema = await self.serializer.get_schema_for_id(schema_id) + schema, _ = await self.serializer.get_schema_for_id(schema_id) bytes_ = await self.serializer.serialize(schema, obj) return bytes_ diff --git a/karapace/kafka_rest_apis/schema_cache.py b/karapace/kafka_rest_apis/schema_cache.py new file mode 100644 index 000000000..00953a69d --- /dev/null +++ b/karapace/kafka_rest_apis/schema_cache.py @@ -0,0 +1,107 @@ +""" +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" + +from abc import ABC, abstractmethod +from cachetools import TTLCache +from karapace.schema_models import TypedSchema +from karapace.typing import SchemaId, Subject +from typing import Dict, Final, MutableMapping, Optional + +import hashlib + + +class SchemaCacheProtocol(ABC): + @abstractmethod + def get_schema_id(self, schema: TypedSchema) -> Optional[SchemaId]: + pass + + @abstractmethod + def has_schema_id(self, schema_id: SchemaId) -> bool: + pass + + @abstractmethod + def set_schema(self, schema_id: SchemaId, schema: TypedSchema) -> None: + pass + + @abstractmethod + def get_schema(self, schema_id: SchemaId) -> Optional[TypedSchema]: + pass + + @abstractmethod + def get_schema_str(self, schema_id: SchemaId) -> Optional[str]: + pass + + +class TopicSchemaCache: + def __init__(self) -> None: + self._topic_cache: Dict[Subject, SchemaCache] = {} + self._empty_schema_cache: Final = EmptySchemaCache() + + def get_schema_id(self, topic: Subject, schema: TypedSchema) -> Optional[SchemaId]: + return self._topic_cache.get(topic, self._empty_schema_cache).get_schema_id(schema) + + def has_schema_id(self, topic: Subject, schema_id: SchemaId) -> bool: + return self._topic_cache.get(topic, self._empty_schema_cache).has_schema_id(schema_id) + + def set_schema(self, topic: str, schema_id: SchemaId, schema: TypedSchema) -> None: + schema_cache_with_defaults = self._topic_cache.setdefault(Subject(topic), SchemaCache()) + schema_cache_with_defaults.set_schema(schema_id, schema) + + def get_schema(self, topic: Subject, schema_id: SchemaId) -> Optional[TypedSchema]: + schema_cache = self._topic_cache.get(topic, self._empty_schema_cache) + return schema_cache.get_schema(schema_id) + + def get_schema_str(self, topic: Subject, schema_id: SchemaId) -> Optional[str]: + schema_cache = self._topic_cache.get(topic, self._empty_schema_cache) + return schema_cache.get_schema_str(schema_id) + + +class SchemaCache(SchemaCacheProtocol): + def __init__(self) -> None: + self._schema_hash_str_to_id: Dict[str, SchemaId] = {} + self._id_to_schema_str: MutableMapping[SchemaId, TypedSchema] = TTLCache(maxsize=10000, ttl=600) + + def get_schema_id(self, schema: TypedSchema) -> Optional[SchemaId]: + fingerprint = hashlib.sha1(str(schema).encode("utf8")).hexdigest() + + maybe_id = self._schema_hash_str_to_id.get(fingerprint) + + if maybe_id is not None and maybe_id not in self._id_to_schema_str: + del self._schema_hash_str_to_id[fingerprint] + return None + + return maybe_id + + def has_schema_id(self, schema_id: SchemaId) -> bool: + return schema_id in self._id_to_schema_str + + def set_schema(self, schema_id: SchemaId, schema: TypedSchema) -> None: + fingerprint = hashlib.sha1(str(schema).encode("utf8")).hexdigest() + self._schema_hash_str_to_id[fingerprint] = schema_id + 
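# the fingerprint index deliberately has no TTL of its own: once an id expires from the TTL cache below,
+        # get_schema_id detects the stale fingerprint on the next lookup and prunes it lazily +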
self._id_to_schema_str[schema_id] = schema + + def get_schema(self, schema_id: SchemaId) -> Optional[TypedSchema]: + return self._id_to_schema_str.get(schema_id) + + def get_schema_str(self, schema_id: SchemaId) -> Optional[str]: + maybe_schema = self.get_schema(schema_id) + return None if maybe_schema is None else str(maybe_schema) + + +class EmptySchemaCache(SchemaCacheProtocol): + def get_schema_id(self, schema: TypedSchema) -> None: + return None + + def has_schema_id(self, schema_id: SchemaId) -> bool: + return False + + def set_schema(self, schema_id: SchemaId, schema: TypedSchema) -> None: + raise NotImplementedError("Empty schema cache. Cannot set schemas.") + + def get_schema(self, schema_id: SchemaId) -> None: + return None + + def get_schema_str(self, schema_id: SchemaId) -> None: + return None diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py index c08ed1092..867eeb633 100644 --- a/karapace/schema_registry.py +++ b/karapace/schema_registry.py @@ -339,7 +339,7 @@ async def write_new_schema_local( all_schema_versions = self.database.find_subject_schemas(subject=subject, include_deleted=True) if not all_schema_versions: - version = 1 + version = ResolvedVersion(1) schema_id = self.database.get_schema_id(new_schema) LOG.debug( "Registering new subject: %r, id: %r with version: %r with schema %r, schema_id: %r", @@ -407,7 +407,6 @@ async def write_new_schema_local( # We didn't find an existing schema and the schema is compatible so go and create one version = self.database.get_next_version(subject=subject) schema_id = self.database.get_schema_id(new_schema) - LOG.debug( "Registering subject: %r, id: %r new version: %r with schema %s, schema_id: %r", subject, diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index ab3de2a04..90948942a 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -461,7 +461,7 @@ async def schemas_get( self, content_type: str, *, request: HTTPRequest, user: User | None = None, schema_id: str ) -> None: try: - schema_id_int = int(schema_id) + parsed_schema_id = SchemaId(int(schema_id)) except ValueError: self.r( body={ @@ -473,7 +473,7 @@ async def schemas_get( ) fetch_max_id = request.query.get("fetchMaxId", "false").lower() == "true" - schema = self.schema_registry.schemas_get(schema_id_int, fetch_max_id=fetch_max_id) + schema = self.schema_registry.schemas_get(parsed_schema_id, fetch_max_id=fetch_max_id) def _has_subject_with_id() -> bool: schema_versions = self.schema_registry.database.find_schemas(include_deleted=True, latest_only=False) @@ -482,7 +482,7 @@ def _has_subject_with_id() -> bool: continue for schema_version in schema_versions: if ( - schema_version.schema_id == schema_id_int + schema_version.schema_id == parsed_schema_id and not schema_version.deleted and self._auth is not None and self._auth.check_authorization(user, Operation.Read, f"Subject:{subject}") @@ -504,13 +504,17 @@ def _has_subject_with_id() -> bool: content_type=content_type, status=HTTPStatus.NOT_FOUND, ) - response_body = {"schema": schema.schema_str} + + subjects = self.schema_registry.database.subjects_for_schema(parsed_schema_id) + + response_body = {"schema": schema.schema_str, "subjects": subjects} if schema.schema_type is not SchemaType.AVRO: response_body["schemaType"] = schema.schema_type if schema.references: response_body["references"] = [r.to_dict() for r in schema.references] if fetch_max_id: response_body["maxId"] = schema.max_id + self.r(response_body, content_type) async def 
schemas_get_versions( diff --git a/karapace/serialization.py b/karapace/serialization.py index 8e2d974fd..35403bb1c 100644 --- a/karapace/serialization.py +++ b/karapace/serialization.py @@ -4,6 +4,7 @@ """ from aiohttp import BasicAuth from avro.io import BinaryDecoder, BinaryEncoder, DatumReader, DatumWriter +from cachetools import TTLCache from google.protobuf.message import DecodeError from jsonschema import ValidationError from karapace.client import Client @@ -12,8 +13,9 @@ from karapace.protobuf.io import ProtobufDatumReader, ProtobufDatumWriter from karapace.schema_models import InvalidSchema, ParsedTypedSchema, SchemaType, TypedSchema, ValidatedTypedSchema from karapace.schema_references import Reference, reference_from_mapping +from karapace.typing import SchemaId, Subject from karapace.utils import json_decode, json_encode -from typing import Any, Dict, Optional, Tuple +from typing import Any, Callable, Dict, List, MutableMapping, Optional, Tuple from urllib.parse import quote import asyncio @@ -86,7 +88,7 @@ def __init__( async def post_new_schema( self, subject: str, schema: ValidatedTypedSchema, references: Optional[Reference] = None - ) -> int: + ) -> SchemaId: if schema.schema_type is SchemaType.PROTOBUF: if references: payload = {"schema": str(schema), "schemaType": schema.schema_type.value, "references": references.json()} @@ -97,9 +99,9 @@ async def post_new_schema( result = await self.client.post(f"subjects/{quote(subject)}/versions", json=payload) if not result.ok: raise SchemaRetrievalError(result.json()) - return result.json()["id"] + return SchemaId(result.json()["id"]) - async def get_latest_schema(self, subject: str) -> Tuple[int, ParsedTypedSchema]: + async def get_latest_schema(self, subject: str) -> Tuple[SchemaId, ParsedTypedSchema]: result = await self.client.get(f"subjects/{quote(subject)}/versions/latest") if not result.ok: raise SchemaRetrievalError(result.json()) @@ -108,17 +110,20 @@ async def get_latest_schema(self, subject: str) -> Tuple[int, ParsedTypedSchema] raise SchemaRetrievalError(f"Invalid result format: {json_result}") try: schema_type = SchemaType(json_result.get("schemaType", "AVRO")) - return json_result["id"], ParsedTypedSchema.parse(schema_type, json_result["schema"]) + return SchemaId(json_result["id"]), ParsedTypedSchema.parse(schema_type, json_result["schema"]) except InvalidSchema as e: raise SchemaRetrievalError(f"Failed to parse schema string from response: {json_result}") from e - async def get_schema_for_id(self, schema_id: int) -> ParsedTypedSchema: + async def get_schema_for_id(self, schema_id: SchemaId) -> Tuple[TypedSchema, List[Subject]]: result = await self.client.get(f"schemas/ids/{schema_id}") if not result.ok: raise SchemaRetrievalError(result.json()["message"]) json_result = result.json() if "schema" not in json_result: raise SchemaRetrievalError(f"Invalid result format: {json_result}") + + subjects = json_result.get("subjects") + try: schema_type = SchemaType(json_result.get("schemaType", "AVRO")) @@ -133,8 +138,15 @@ async def get_schema_for_id(self, schema_id: int) -> ParsedTypedSchema: raise InvalidReferences from exc parsed_references.append(reference) if parsed_references: - return ParsedTypedSchema.parse(schema_type, json_result["schema"], references=parsed_references) - return ParsedTypedSchema.parse(schema_type, json_result["schema"]) + return ( + ParsedTypedSchema.parse( + schema_type, + json_result["schema"], + references=parsed_references, + ), + subjects, + ) + return ParsedTypedSchema.parse(schema_type, 
json_result["schema"]), subjects except InvalidSchema as e: raise SchemaRetrievalError(f"Failed to parse schema string from response: {json_result}") from e @@ -165,29 +177,35 @@ def __init__( self.subject_name_strategy = NAME_STRATEGIES[name_strategy] self.registry_client: Optional[SchemaRegistryClient] = registry_client self.ids_to_schemas: Dict[int, TypedSchema] = {} - self.schemas_to_ids: Dict[str, int] = {} + self.ids_to_subjects: MutableMapping[int, List[Subject]] = TTLCache(maxsize=10000, ttl=600) + self.schemas_to_ids: Dict[str, SchemaId] = {} async def close(self) -> None: if self.registry_client: await self.registry_client.close() self.registry_client = None - def get_subject_name(self, topic_name: str, schema: str, subject_type: str, schema_type: SchemaType) -> str: - schema_typed = ParsedTypedSchema.parse(schema_type, schema) + def get_subject_name( + self, + topic_name: str, + schema: TypedSchema, + subject_type: str, + schema_type: SchemaType, + ) -> Subject: namespace = "dummy" if schema_type is SchemaType.AVRO: - if isinstance(schema_typed.schema, avro.schema.NamedSchema): - namespace = schema_typed.schema.namespace + if isinstance(schema.schema, avro.schema.NamedSchema): + namespace = schema.schema.namespace if schema_type is SchemaType.JSONSCHEMA: - namespace = schema_typed.to_dict().get("namespace", "dummy") + namespace = schema.to_dict().get("namespace", "dummy") # Protobuf does not use namespaces in terms of AVRO if schema_type is SchemaType.PROTOBUF: namespace = "" - return f"{self.subject_name_strategy(topic_name, namespace)}-{subject_type}" + + return Subject(f"{self.subject_name_strategy(topic_name, namespace)}-{subject_type}") async def get_schema_for_subject(self, subject: str) -> TypedSchema: assert self.registry_client, "must not call this method after the object is closed." - schema_id, schema = await self.registry_client.get_latest_schema(subject) async with self.state_lock: schema_ser = str(schema) @@ -195,15 +213,15 @@ async def get_schema_for_subject(self, subject: str) -> TypedSchema: self.ids_to_schemas[schema_id] = schema return schema - async def get_id_for_schema(self, schema: str, subject: str, schema_type: SchemaType) -> int: + async def upsert_id_for_schema(self, schema_typed: ValidatedTypedSchema, subject: str) -> SchemaId: assert self.registry_client, "must not call this method after the object is closed." - try: - schema_typed = ParsedTypedSchema.parse(schema_type, schema) - except InvalidSchema as e: - raise InvalidPayload(f"Schema string {schema} is invalid") from e + schema_ser = str(schema_typed) + if schema_ser in self.schemas_to_ids: return self.schemas_to_ids[schema_ser] + + # note: the post is idempotent, so it is like a get or insert (aka upsert) schema_id = await self.registry_client.post_new_schema(subject, schema_typed) async with self.state_lock: @@ -211,16 +229,25 @@ async def get_id_for_schema(self, schema: str, subject: str, schema_type: Schema self.ids_to_schemas[schema_id] = schema_typed return schema_id - async def get_schema_for_id(self, schema_id: int) -> TypedSchema: + async def get_schema_for_id( + self, + schema_id: SchemaId, + *, + need_new_call: Optional[Callable[[TypedSchema, List[Subject]], bool]] = None, + ) -> Tuple[TypedSchema, List[Subject]]: assert self.registry_client, "must not call this method after the object is closed." 
- if schema_id in self.ids_to_schemas: - return self.ids_to_schemas[schema_id] - schema_typed = await self.registry_client.get_schema_for_id(schema_id) + if schema_id in self.ids_to_subjects: + if need_new_call is None or not need_new_call(self.ids_to_schemas[schema_id], self.ids_to_subjects[schema_id]): + return self.ids_to_schemas[schema_id], self.ids_to_subjects[schema_id] + + schema_typed, subjects = await self.registry_client.get_schema_for_id(schema_id) schema_ser = str(schema_typed) async with self.state_lock: + # todo: get rid of the schema caching and use the same caching used in UserRestProxy self.schemas_to_ids[schema_ser] = schema_id self.ids_to_schemas[schema_id] = schema_typed - return schema_typed + self.ids_to_subjects[schema_id] = subjects + return schema_typed, subjects async def serialize(self, schema: TypedSchema, value: dict) -> bytes: schema_id = self.schemas_to_ids[str(schema)] @@ -242,7 +269,7 @@ async def deserialize(self, bytes_: bytes) -> dict: if start_byte != START_BYTE: raise InvalidMessageHeader(f"Start byte is {start_byte:x} and should be {START_BYTE:x}") try: - schema = await self.get_schema_for_id(schema_id) + schema, _ = await self.get_schema_for_id(schema_id) if schema is None: raise InvalidPayload("No schema with ID from payload") ret_val = read_value(self.config, schema, bio) diff --git a/karapace/typing.py b/karapace/typing.py index 93a64ccbb..fb73c9370 100644 --- a/karapace/typing.py +++ b/karapace/typing.py @@ -18,4 +18,7 @@ Subject = NewType("Subject", str) Version = Union[int, str] ResolvedVersion = NewType("ResolvedVersion", int) +# note: a SchemaId is unique among all schemas (each distinct schema version is assigned a different id); +# i.e. the same SchemaId always refers to the same TypedSchema. 
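+# NewType is erased at runtime: SchemaId(int(raw_id)) simply returns the int, the wrapper only helps static type checking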
SchemaId = NewType("SchemaId", int) +TopicName = NewType("TopicName", str) diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index 8a06468c5..9c92c4fd9 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -8,7 +8,7 @@ accept-types==0.4.1 # via -r requirements.txt aiohttp==3.8.4 # via -r requirements.txt -aiokafka==0.8.0 +aiokafka==0.8.1 # via -r requirements.txt aiosignal==1.3.1 # via @@ -35,6 +35,8 @@ blinker==1.6.2 # via flask brotli==1.0.9 # via geventhttpclient +cachetools==5.2.0 + # via -r requirements.txt certifi==2023.5.7 # via # geventhttpclient @@ -53,11 +55,17 @@ commonmark==0.9.1 # rich configargparse==1.5.3 # via locust +exceptiongroup==1.1.1 + # via + # -r requirements.txt + # anyio + # hypothesis + # pytest execnet==1.9.0 # via pytest-xdist fancycompleter==0.9.1 # via pdbpp -filelock==3.12.0 +filelock==3.12.2 # via -r requirements-dev.in flask==2.3.2 # via @@ -66,7 +74,7 @@ flask==2.3.2 # locust flask-basicauth==0.2.0 # via locust -flask-cors==3.0.10 +flask-cors==4.0.0 # via locust frozenlist==1.3.3 # via @@ -81,7 +89,7 @@ geventhttpclient==2.0.9 # via locust greenlet==2.0.2 # via gevent -hypothesis==6.75.7 +hypothesis==6.79.3 # via -r requirements-dev.in idna==3.4 # via @@ -89,6 +97,12 @@ idna==3.4 # anyio # requests # yarl +importlib-metadata==6.7.0 + # via flask +importlib-resources==5.12.0 + # via + # -r requirements.txt + # jsonschema iniconfig==2.0.0 # via pytest isodate==0.6.1 @@ -105,7 +119,7 @@ kafka-python @ https://github.com/aiven/kafka-python/archive/1b95333c9628152066f # aiokafka locust==2.15.1 # via -r requirements-dev.in -markupsafe==2.1.2 +markupsafe==2.1.3 # via # jinja2 # werkzeug @@ -125,7 +139,11 @@ packaging==23.1 # pytest pdbpp==0.10.3 # via -r requirements-dev.in -pluggy==1.0.0 +pkgutil-resolve-name==1.3.10 + # via + # -r requirements.txt + # jsonschema +pluggy==1.2.0 # via pytest protobuf==3.20.3 # via -r requirements.txt @@ -145,7 +163,7 @@ pyrsistent==0.19.3 # via # -r requirements.txt # jsonschema -pytest==7.3.1 +pytest==7.4.0 # via # -r requirements-dev.in # pytest-timeout @@ -166,12 +184,11 @@ rich==12.5.1 # via -r requirements.txt roundrobin==0.0.4 # via locust -sentry-sdk==1.24.0 +sentry-sdk==1.26.0 # via -r requirements-dev.in six==1.16.0 # via # -r requirements.txt - # flask-cors # geventhttpclient # isodate # python-dateutil @@ -183,19 +200,22 @@ sortedcontainers==2.4.0 # via hypothesis tenacity==8.2.2 # via -r requirements.txt -typing-extensions==4.6.2 +tomli==2.0.1 + # via pytest +typing-extensions==4.6.3 # via # -r requirements.txt # locust -ujson==5.7.0 + # rich +ujson==5.8.0 # via -r requirements.txt -urllib3==1.26.16 +urllib3==2.0.3 # via # requests # sentry-sdk watchfiles==0.19.0 # via -r requirements.txt -werkzeug==2.3.4 +werkzeug==2.3.6 # via # flask # locust @@ -207,7 +227,12 @@ yarl==1.9.2 # via # -r requirements.txt # aiohttp -zope-event==4.6 +zipp==3.15.0 + # via + # -r requirements.txt + # importlib-metadata + # importlib-resources +zope-event==5.0 # via gevent zope-interface==6.0 # via gevent diff --git a/requirements/requirements-typing.in b/requirements/requirements-typing.in index 77b6ff0b2..c55f35548 100644 --- a/requirements/requirements-typing.in +++ b/requirements/requirements-typing.in @@ -4,3 +4,4 @@ mypy types-jsonschema sentry-sdk +types-cachetools diff --git a/requirements/requirements-typing.txt b/requirements/requirements-typing.txt index ea4657ecb..c7c4efcf0 100644 --- a/requirements/requirements-typing.txt +++ 
b/requirements/requirements-typing.txt @@ -8,23 +8,25 @@ certifi==2023.5.7 # via # -c requirements-dev.txt # sentry-sdk -mypy==1.3.0 +mypy==1.4.1 # via -r requirements-typing.in mypy-extensions==1.0.0 # via mypy -sentry-sdk==1.24.0 +sentry-sdk==1.26.0 # via -r requirements-typing.in tomli==2.0.1 # via # -c requirements-dev.txt # mypy +types-cachetools==5.3.0.5 + # via -r requirements-typing.in types-jsonschema==4.17.0.8 # via -r requirements-typing.in -typing-extensions==4.6.2 +typing-extensions==4.6.3 # via # -c requirements-dev.txt # mypy -urllib3==1.26.16 +urllib3==2.0.3 # via # -c requirements-dev.txt # sentry-sdk diff --git a/requirements/requirements.in b/requirements/requirements.in index b076daf9d..e475fb7a5 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -13,6 +13,7 @@ ujson<6 watchfiles<1 xxhash~=3.0 rich~=12.5.0 +cachetools==5.2.0 # Patched dependencies # diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 270a14cd4..0179e0966 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -8,7 +8,7 @@ accept-types==0.4.1 # via -r requirements.in aiohttp==3.8.4 # via -r requirements.in -aiokafka==0.8.0 +aiokafka==0.8.1 # via -r requirements.in aiosignal==1.3.1 # via aiohttp @@ -24,10 +24,14 @@ attrs==23.1.0 # jsonschema avro @ https://github.com/aiven/avro/archive/5a82d57f2a650fd87c819a30e433f1abb2c76ca2.tar.gz#subdirectory=lang/py # via -r requirements.in +cachetools==5.2.0 + # via -r requirements.in charset-normalizer==3.1.0 # via aiohttp commonmark==0.9.1 # via rich +exceptiongroup==1.1.1 + # via anyio frozenlist==1.3.3 # via # aiohttp @@ -36,6 +40,8 @@ idna==3.4 # via # anyio # yarl +importlib-resources==5.12.0 + # via jsonschema isodate==0.6.1 # via -r requirements.in jsonschema==4.17.3 @@ -52,6 +58,8 @@ networkx==2.8.8 # via -r requirements.in packaging==23.1 # via aiokafka +pkgutil-resolve-name==1.3.10 + # via jsonschema protobuf==3.20.3 # via -r requirements.in pygments==2.15.1 @@ -70,9 +78,11 @@ sniffio==1.3.0 # via anyio tenacity==8.2.2 # via -r requirements.in -typing-extensions==4.6.2 - # via -r requirements.in -ujson==5.7.0 +typing-extensions==4.6.3 + # via + # -r requirements.in + # rich +ujson==5.8.0 # via -r requirements.in watchfiles==0.19.0 # via -r requirements.in @@ -80,3 +90,5 @@ xxhash==3.2.0 # via -r requirements.in yarl==1.9.2 # via aiohttp +zipp==3.15.0 + # via importlib-resources diff --git a/tests/integration/test_client.py b/tests/integration/test_client.py index bcae23ee2..9f6a26fb7 100644 --- a/tests/integration/test_client.py +++ b/tests/integration/test_client.py @@ -15,7 +15,7 @@ async def test_remote_client(registry_async_client: Client) -> None: subject = new_random_name("subject") sc_id = await reg_cli.post_new_schema(subject, schema_avro) assert sc_id >= 0 - stored_schema = await reg_cli.get_schema_for_id(sc_id) + stored_schema, _ = await reg_cli.get_schema_for_id(sc_id) assert stored_schema == schema_avro, f"stored schema {stored_schema.to_dict()} is not {schema_avro.to_dict()}" stored_id, stored_schema = await reg_cli.get_latest_schema(subject) assert stored_id == sc_id @@ -29,7 +29,7 @@ async def test_remote_client_tls(registry_async_client_tls: Client) -> None: subject = new_random_name("subject") sc_id = await reg_cli.post_new_schema(subject, schema_avro) assert sc_id >= 0 - stored_schema = await reg_cli.get_schema_for_id(sc_id) + stored_schema, _ = await reg_cli.get_schema_for_id(sc_id) assert stored_schema == schema_avro, f"stored schema 
{stored_schema.to_dict()} is not {schema_avro.to_dict()}" stored_id, stored_schema = await reg_cli.get_latest_schema(subject) assert stored_id == sc_id diff --git a/tests/integration/test_client_protobuf.py b/tests/integration/test_client_protobuf.py index accca0421..7de39a7cd 100644 --- a/tests/integration/test_client_protobuf.py +++ b/tests/integration/test_client_protobuf.py @@ -16,7 +16,7 @@ async def test_remote_client_protobuf(registry_async_client): subject = new_random_name("subject") sc_id = await reg_cli.post_new_schema(subject, schema_protobuf, None) assert sc_id >= 0 - stored_schema = await reg_cli.get_schema_for_id(sc_id) + stored_schema, _ = await reg_cli.get_schema_for_id(sc_id) assert stored_schema == schema_protobuf, f"stored schema {stored_schema} is not {schema_protobuf}" stored_id, stored_schema = await reg_cli.get_latest_schema(subject) assert stored_id == sc_id @@ -31,7 +31,7 @@ async def test_remote_client_protobuf2(registry_async_client): subject = new_random_name("subject") sc_id = await reg_cli.post_new_schema(subject, schema_protobuf, None) assert sc_id >= 0 - stored_schema = await reg_cli.get_schema_for_id(sc_id) + stored_schema, _ = await reg_cli.get_schema_for_id(sc_id) assert stored_schema == schema_protobuf, f"stored schema {stored_schema} is not {schema_protobuf}" stored_id, stored_schema = await reg_cli.get_latest_schema(subject) assert stored_id == sc_id diff --git a/tests/integration/test_rest.py b/tests/integration/test_rest.py index 855b7c9c3..8b73a7ffd 100644 --- a/tests/integration/test_rest.py +++ b/tests/integration/test_rest.py @@ -12,9 +12,10 @@ new_topic, REST_HEADERS, schema_avro_json, + schema_avro_json_evolution, second_obj, - second_schema_json, test_objects_avro, + test_objects_avro_evolution, wait_for_topics, ) @@ -144,12 +145,22 @@ async def test_avro_publish_primitive_schema(rest_async_client, admin_client): async def test_avro_publish(rest_async_client, registry_async_client, admin_client): tn = new_topic(admin_client) other_tn = new_topic(admin_client) + await wait_for_topics(rest_async_client, topic_names=[tn, other_tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1) header = REST_HEADERS["avro"] # check succeeds with 1 record and brand new schema - res = await registry_async_client.post(f"subjects/{other_tn}/versions", json={"schema": second_schema_json}) + res = await registry_async_client.post(f"subjects/{other_tn}/versions", json={"schema": schema_avro_json_evolution}) assert res.ok new_schema_id = res.json()["id"] + + # test checks schema id use for key and value, register schema for both with topic naming strategy + for pl_type in ["key", "value"]: + res = await registry_async_client.post( + f"subjects/{tn}-{pl_type}/versions", json={"schema": schema_avro_json_evolution} + ) + assert res.ok + assert res.json()["id"] == new_schema_id + urls = [f"/topics/{tn}", f"/topics/{tn}/partitions/0"] for url in urls: partition_id = 0 if "partition" in url else None @@ -158,13 +169,16 @@ async def test_avro_publish(rest_async_client, registry_async_client, admin_clie res = await rest_async_client.post(url, correct_payload, headers=header) check_successful_publish_response(res, test_objects_avro, partition_id) # check succeeds with prepublished schema - pre_publish_payload = {f"{pl_type}_schema_id": new_schema_id, "records": [{pl_type: o} for o in second_obj]} + pre_publish_payload = { + f"{pl_type}_schema_id": new_schema_id, + "records": [{pl_type: o} for o in test_objects_avro_evolution], + } res = await rest_async_client.post(f"/topics/{tn}", 
json=pre_publish_payload, headers=header) - check_successful_publish_response(res, second_obj, partition_id) + check_successful_publish_response(res, test_objects_avro_evolution, partition_id) # unknown schema id unknown_payload = {f"{pl_type}_schema_id": 666, "records": [{pl_type: o} for o in second_obj]} res = await rest_async_client.post(url, json=unknown_payload, headers=header) - assert res.status_code == 408 + assert res.status_code == 422 # mismatched schema # TODO -> maybe this test is useless, since it tests registry behavior # mismatch_payload = {f"{pl_type}_schema_id": new_schema_id,"records": [{pl_type: o} for o in test_objects]} @@ -391,8 +405,10 @@ async def test_publish_to_nonexisting_topic(rest_async_client): assert res.json()["error_code"] == 40401, "Error code should be for topic not found" -async def test_publish_incompatible_schema(rest_async_client, admin_client): +async def test_publish_with_incompatible_data(rest_async_client, registry_async_client, admin_client): topic_name = new_topic(admin_client) + subject_1 = f"{topic_name}-value" + await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1) url = f"/topics/{topic_name}" @@ -406,9 +422,40 @@ async def test_publish_incompatible_schema(rest_async_client, admin_client): }, ], } - schema_2 = { + + res = await registry_async_client.post( + f"subjects/{subject_1}/versions", + json={"schema": json.dumps(schema_1)}, + ) + schema_1_id = res.json()["id"] + + res = await rest_async_client.post( + url, + json={"value_schema_id": json.dumps(schema_1_id), "records": [{"value": {"name": "Foobar"}}]}, + headers=REST_HEADERS["avro"], + ) + assert res.status_code == 200 + + res = await rest_async_client.post( + url, + json={"value_schema_id": json.dumps(schema_1_id), "records": [{"value": {"temperature": 25}}]}, + headers=REST_HEADERS["avro"], + ) + assert res.status_code == 422 + res_json = res.json() + assert res_json["error_code"] == 42205 + assert "message" in res_json + assert "Object does not fit to stored schema" in res_json["message"] + + +async def test_publish_with_incompatible_schema(rest_async_client, admin_client): + topic_name = new_topic(admin_client) + await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1) + url = f"/topics/{topic_name}" + + schema_1 = { "type": "record", - "name": "Schema2", + "name": "Schema1", "fields": [ { "name": "name", @@ -416,6 +463,16 @@ async def test_publish_incompatible_schema(rest_async_client, admin_client): }, ], } + schema_2 = { + "type": "record", + "name": "Schema2", + "fields": [ + { + "name": "temperature", + "type": "int", + }, + ], + } res = await rest_async_client.post( url, @@ -426,7 +483,7 @@ async def test_publish_incompatible_schema(rest_async_client, admin_client): res = await rest_async_client.post( url, - json={"value_schema": json.dumps(schema_2), "records": [{"value": {"name2": "Foobar"}}]}, + json={"value_schema": json.dumps(schema_2), "records": [{"value": {"temperature": 25}}]}, headers=REST_HEADERS["avro"], ) assert res.status_code == 408 @@ -436,9 +493,13 @@ async def test_publish_incompatible_schema(rest_async_client, admin_client): assert "Error when registering schema" in res_json["message"] -async def test_publish_with_incompatible_data(rest_async_client, registry_async_client, admin_client): +async def test_publish_with_schema_id_of_another_subject(rest_async_client, registry_async_client, admin_client): + """ + Karapace issue 658: 
https://github.com/aiven/karapace/issues/658 + """ topic_name = new_topic(admin_client) subject_1 = f"{topic_name}-value" + subject_2 = "some-other-subject-value" await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1) url = f"/topics/{topic_name}" @@ -453,30 +514,49 @@ async def test_publish_with_incompatible_data(rest_async_client, registry_async_ }, ], } + schema_2 = { + "type": "record", + "name": "Schema2", + "fields": [ + { + "name": "temperature", + "type": "int", + }, + ], + } + # Register schemas to get the ids res = await registry_async_client.post( f"subjects/{subject_1}/versions", json={"schema": json.dumps(schema_1)}, ) + assert res.status_code == 200 schema_1_id = res.json()["id"] - res = await rest_async_client.post( - url, - json={"value_schema_id": json.dumps(schema_1_id), "records": [{"value": {"name": "Foobar"}}]}, - headers=REST_HEADERS["avro"], + res = await registry_async_client.post( + f"subjects/{subject_2}/versions", + json={"schema": json.dumps(schema_2)}, ) assert res.status_code == 200 + schema_2_id = res.json()["id"] res = await rest_async_client.post( url, - json={"value_schema_id": json.dumps(schema_1_id), "records": [{"value": {"temperature": 25}}]}, + json={"value_schema_id": schema_2_id, "records": [{"value": {"temperature": 25}}]}, headers=REST_HEADERS["avro"], ) assert res.status_code == 422 res_json = res.json() assert res_json["error_code"] == 42205 assert "message" in res_json - assert "Object does not fit to stored schema" in res_json["message"] + assert "Invalid schema. format = AVRO, schema_id = 2" in res_json["message"] + + res = await rest_async_client.post( + url, + json={"value_schema_id": schema_1_id, "records": [{"value": {"name": "Mr. Mustache"}}]}, + headers=REST_HEADERS["avro"], + ) + assert res.status_code == 200 async def test_brokers(rest_async_client): diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index 3c987badb..f51e7d453 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -955,10 +955,16 @@ async def test_union_comparing_to_other_types(registry_async_client: Client) -> schema = [{"type": "array", "name": "listofstrings", "items": "string"}, "string"] res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == 200 + + initial_schema_id = res.json()["id"] + plain_schema = {"type": "string"} res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(plain_schema)}) assert res.status_code == status_code + res = await registry_async_client.get(f"/schemas/ids/{initial_schema_id}") + assert subject in res.json()["subjects"] + expected_results = [("BACKWARD", 200), ("FORWARD", 409), ("FULL", 409)] for compatibility, status_code in expected_results: subject = subject_name_factory() @@ -2346,7 +2352,7 @@ async def test_malformed_kafka_message( sleep=1, ) res_data = res.json() - expected_payload = {"schema": json_encode({"foo": "bar"}, compact=True)} + expected_payload = {"schema": json_encode({"foo": "bar"}, compact=True), "subjects": ["foo"]} assert res_data == expected_payload, res_data diff --git a/tests/unit/test_protobuf_serialization.py b/tests/unit/test_protobuf_serialization.py index f126695df..c66cf50f7 100644 --- a/tests/unit/test_protobuf_serialization.py +++ b/tests/unit/test_protobuf_serialization.py @@ -14,6 +14,7 @@ SchemaRegistrySerializer, START_BYTE, ) +from karapace.typing import Subject from 
tests.utils import schema_protobuf, test_fail_objects_protobuf, test_objects_protobuf from unittest.mock import call, Mock @@ -37,7 +38,9 @@ async def make_ser_deser(config_path: str, mock_client) -> SchemaRegistrySeriali async def test_happy_flow(default_config_path): mock_protobuf_registry_client = Mock() schema_for_id_one_future = asyncio.Future() - schema_for_id_one_future.set_result(ParsedTypedSchema.parse(SchemaType.PROTOBUF, trim_margin(schema_protobuf))) + schema_for_id_one_future.set_result( + (ParsedTypedSchema.parse(SchemaType.PROTOBUF, trim_margin(schema_protobuf)), [Subject("stub")]) + ) mock_protobuf_registry_client.get_schema_for_id.return_value = schema_for_id_one_future get_latest_schema_future = asyncio.Future() get_latest_schema_future.set_result((1, ParsedTypedSchema.parse(SchemaType.PROTOBUF, trim_margin(schema_protobuf)))) @@ -53,7 +56,7 @@ async def test_happy_flow(default_config_path): assert len(serializer.ids_to_schemas) == 1 assert 1 in serializer.ids_to_schemas - assert mock_protobuf_registry_client.method_calls == [call.get_latest_schema("top")] + assert mock_protobuf_registry_client.method_calls == [call.get_latest_schema("top"), call.get_schema_for_id(1)] async def test_happy_flow_references(default_config_path): @@ -105,7 +108,7 @@ async def test_happy_flow_references(default_config_path): mock_protobuf_registry_client = Mock() schema_for_id_one_future = asyncio.Future() - schema_for_id_one_future.set_result(ref_schema) + schema_for_id_one_future.set_result((ref_schema, [Subject("stub")])) mock_protobuf_registry_client.get_schema_for_id.return_value = schema_for_id_one_future get_latest_schema_future = asyncio.Future() get_latest_schema_future.set_result((1, ref_schema)) @@ -121,7 +124,7 @@ async def test_happy_flow_references(default_config_path): assert len(serializer.ids_to_schemas) == 1 assert 1 in serializer.ids_to_schemas - assert mock_protobuf_registry_client.method_calls == [call.get_latest_schema("top")] + assert mock_protobuf_registry_client.method_calls == [call.get_latest_schema("top"), call.get_schema_for_id(1)] async def test_happy_flow_references_two(default_config_path): @@ -192,7 +195,7 @@ async def test_happy_flow_references_two(default_config_path): mock_protobuf_registry_client = Mock() schema_for_id_one_future = asyncio.Future() - schema_for_id_one_future.set_result(ref_schema_two) + schema_for_id_one_future.set_result((ref_schema_two, [Subject("mock")])) mock_protobuf_registry_client.get_schema_for_id.return_value = schema_for_id_one_future get_latest_schema_future = asyncio.Future() get_latest_schema_future.set_result((1, ref_schema_two)) @@ -208,7 +211,7 @@ async def test_happy_flow_references_two(default_config_path): assert len(serializer.ids_to_schemas) == 1 assert 1 in serializer.ids_to_schemas - assert mock_protobuf_registry_client.method_calls == [call.get_latest_schema("top")] + assert mock_protobuf_registry_client.method_calls == [call.get_latest_schema("top"), call.get_schema_for_id(1)] async def test_serialization_fails(default_config_path): diff --git a/tests/unit/test_serialization.py b/tests/unit/test_serialization.py index 676b65bc8..c76d4f4c2 100644 --- a/tests/unit/test_serialization.py +++ b/tests/unit/test_serialization.py @@ -14,6 +14,7 @@ START_BYTE, write_value, ) +from karapace.typing import Subject from tests.utils import schema_avro_json, test_objects_avro from unittest.mock import call, Mock @@ -43,6 +44,9 @@ async def test_happy_flow(default_config_path): get_latest_schema_future = asyncio.Future() 
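    # the mock returns pre-resolved futures so the serializer can await the client calls as usual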
get_latest_schema_future.set_result((1, ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json))) mock_registry_client.get_latest_schema.return_value = get_latest_schema_future + schema_for_id_one_future = asyncio.Future() + schema_for_id_one_future.set_result((ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json), [Subject("stub")])) + mock_registry_client.get_schema_for_id.return_value = schema_for_id_one_future serializer = await make_ser_deser(default_config_path, mock_registry_client) assert len(serializer.ids_to_schemas) == 0 @@ -52,7 +56,7 @@ async def test_happy_flow(default_config_path): assert len(serializer.ids_to_schemas) == 1 assert 1 in serializer.ids_to_schemas - assert mock_registry_client.method_calls == [call.get_latest_schema("top")] + assert mock_registry_client.method_calls == [call.get_latest_schema("top"), call.get_schema_for_id(1)] def test_flatten_unions_record() -> None: @@ -259,7 +263,7 @@ async def test_serialization_fails(default_config_path): async def test_deserialization_fails(default_config_path): mock_registry_client = Mock() schema_for_id_one_future = asyncio.Future() - schema_for_id_one_future.set_result(ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json)) + schema_for_id_one_future.set_result((ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json), [Subject("stub")])) mock_registry_client.get_schema_for_id.return_value = schema_for_id_one_future deserializer = await make_ser_deser(default_config_path, mock_registry_client) @@ -277,7 +281,7 @@ async def test_deserialization_fails(default_config_path): mock_registry_client.reset_mock() # but we can pass in a perfectly fine doc belonging to a diff schema - schema = await mock_registry_client.get_schema_for_id(1) + schema, _ = await mock_registry_client.get_schema_for_id(1) schema = copy.deepcopy(schema.to_dict()) schema["name"] = "BadUser" schema["fields"][0]["type"] = "int" diff --git a/tests/utils.py b/tests/utils.py index 565990249..2e575b1f4 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -50,6 +50,21 @@ } ) +schema_avro_json_evolution = json.dumps( + { + "namespace": "example.avro", + "type": "record", + "name": "example.avro.User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_number", "type": "int"}, + {"name": "favorite_color", "type": "string"}, + {"name": "favorite_quark", "type": "string", "default": "def"}, + ], + } +) + + test_objects_jsonschema = [{"foo": 100}, {"foo": 200}] test_objects_avro = [ @@ -58,6 +73,13 @@ {"name": "Third Foo", "favorite_number": 5, "favorite_color": "quux"}, ] +test_objects_avro_evolution = [ + {"name": "First Foo", "favorite_number": 2, "favorite_color": "bar", "favorite_quark": "up"}, + {"name": "Second Foo", "favorite_number": 3, "favorite_color": "baz", "favorite_quark": "down"}, + {"name": "Third Foo", "favorite_number": 5, "favorite_color": "quux", "favorite_quark": "charm"}, +] + + # protobuf schemas in tests must be filtered by trim_margin() from kotlin_wrapper module schema_protobuf = """