From ffc555061c6512cf0850204b531ba9ca050e5821 Mon Sep 17 00:00:00 2001
From: Elia Migliore
Date: Tue, 24 Oct 2023 10:09:51 +0200
Subject: [PATCH 1/3] feat: enable users to disable validation only on
 specific topics

---
 README.rst                                  |   2 +-
 karapace/in_memory_database.py              |  12 ++-
 karapace/kafka_rest_apis/__init__.py        | 105 ++++++++++++++++----
 karapace/schema_reader.py                   |  13 ++-
 karapace/schema_registry.py                 |  17 +++-
 karapace/schema_registry_apis.py            |  84 +++++++++++++++-
 karapace/serialization.py                   |  10 +-
 karapace/typing.py                          |   3 +-
 tests/integration/test_rest.py              |  39 ++++++++
 tests/unit/test_schema_registry_api.py      |  12 ++-
 tests/unit/test_serialization.py            |  18 ++--
 tests/unit/test_validation_check_wrapper.py |  55 ++++++++++
 12 files changed, 327 insertions(+), 43 deletions(-)
 create mode 100644 tests/unit/test_validation_check_wrapper.py

diff --git a/README.rst b/README.rst
index 288685281..d8c7bcd3f 100644
--- a/README.rst
+++ b/README.rst
@@ -464,7 +464,7 @@ Keys to take special care are the ones needed to configure Kafka and advertised_
     - Name strategy to use when storing schemas from the kafka rest proxy service. You can opt between ``name_strategy`` , ``record_name`` and ``topic_record_name``
   * - ``name_strategy_validation``
     - ``true``
-    - If enabled, validate that given schema is registered under used name strategy when producing messages from Kafka Rest
+    - If enabled, validate that the given schema is registered under the subjects required by the configured name strategy when producing messages from Kafka Rest. Otherwise, no validation is performed
   * - ``master_election_strategy``
     - ``lowest``
     - Decides on what basis the Karapace cluster master is chosen (only relevant in a multi node setup)
diff --git a/karapace/in_memory_database.py b/karapace/in_memory_database.py
index 222e38046..ea863faa0 100644
--- a/karapace/in_memory_database.py
+++ b/karapace/in_memory_database.py
@@ -9,7 +9,7 @@
 from dataclasses import dataclass, field
 from karapace.schema_models import SchemaVersion, TypedSchema
 from karapace.schema_references import Reference, Referents
-from karapace.typing import ResolvedVersion, SchemaId, Subject
+from karapace.typing import ResolvedVersion, SchemaId, Subject, TopicName
 from threading import Lock, RLock
 from typing import Iterable, Sequence
 
@@ -32,6 +32,7 @@ def __init__(self) -> None:
         self.schemas: dict[SchemaId, TypedSchema] = {}
         self.schema_lock_thread = RLock()
         self.referenced_by: dict[tuple[Subject, ResolvedVersion], Referents] = {}
+        self.topic_without_validation: set[TopicName] = set()
         # Content based deduplication of schemas. 
This is used to reduce memory # usage when the same schema is produce multiple times to the same or @@ -229,6 +230,15 @@ def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> di if schema_version.deleted is False } + def is_topic_requiring_validation(self, *, topic_name: TopicName) -> bool: + return topic_name not in self.topic_without_validation + + def override_topic_validation(self, *, topic_name: TopicName, skip_validation: bool) -> None: + if skip_validation: + self.topic_without_validation.add(topic_name) + else: + self.topic_without_validation.discard(topic_name) + def delete_subject(self, *, subject: Subject, version: ResolvedVersion) -> None: with self.schema_lock_thread: for schema_version in self.subjects[subject].schemas.values(): diff --git a/karapace/kafka_rest_apis/__init__.py b/karapace/kafka_rest_apis/__init__.py index 3f3d08dd7..f58c982ba 100644 --- a/karapace/kafka_rest_apis/__init__.py +++ b/karapace/kafka_rest_apis/__init__.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from aiokafka import AIOKafkaProducer from aiokafka.errors import KafkaConnectionError from binascii import Error as B64DecodeError @@ -31,12 +33,13 @@ get_subject_name, InvalidMessageSchema, InvalidPayload, + SchemaRegistryClient, SchemaRegistrySerializer, SchemaRetrievalError, ) -from karapace.typing import NameStrategy, SchemaId, Subject, SubjectType +from karapace.typing import NameStrategy, SchemaId, Subject, SubjectType, TopicName from karapace.utils import convert_to_int, json_encode, KarapaceKafkaClient -from typing import Callable, Dict, List, Optional, Tuple, Union +from typing import Callable, Final, MutableMapping, NewType import asyncio import base64 @@ -44,7 +47,7 @@ import logging import time -SUBJECT_VALID_POSTFIX = [SubjectType.key, SubjectType.value] +SUBJECT_VALID_POSTFIX = [SubjectType.key, SubjectType.value_] PUBLISH_KEYS = {"records", "value_schema", "value_schema_id", "key_schema", "key_schema_id"} RECORD_CODES = [42201, 42202] KNOWN_FORMATS = {"json", "avro", "protobuf", "binary"} @@ -66,10 +69,10 @@ def __init__(self, config: Config) -> None: super().__init__(config=config) self._add_kafka_rest_routes() self.serializer = SchemaRegistrySerializer(config=config) - self.proxies: Dict[str, "UserRestProxy"] = {} + self.proxies: dict[str, UserRestProxy] = {} self._proxy_lock = asyncio.Lock() log.info("REST proxy starting with (delegated authorization=%s)", self.config.get("rest_authorization", False)) - self._idle_proxy_janitor_task: Optional[asyncio.Task] = None + self._idle_proxy_janitor_task: asyncio.Task | None = None async def close(self) -> None: if self._idle_proxy_janitor_task is not None: @@ -415,13 +418,56 @@ async def topic_publish(self, topic: str, content_type: str, *, request: HTTPReq await proxy.topic_publish(topic, content_type, request=request) +LastTimeCheck = NewType("LastTimeCheck", float) + +DEFAULT_CACHE_INTERVAL_NS: Final = 120 * 1_000_000_000 # 120 seconds + + +class ValidationCheckWrapper: + def __init__( + self, + registry_client: SchemaRegistryClient, + topic_name: TopicName, + cache_interval_ns: float = DEFAULT_CACHE_INTERVAL_NS, + ): + self._last_check = 0 + # by default if not specified otherwise, let's be conservative + self._require_validation = True + self._topic_name = topic_name + self._registry_client = registry_client + self._cache_interval_ns = cache_interval_ns + + async def _query_registry(self) -> bool: + require_validation = await self._registry_client.topic_require_validation(self._topic_name) + return 
require_validation
+
+    async def require_validation(self) -> bool:
+        if (time.monotonic_ns() - self._last_check) > self._cache_interval_ns:
+            self._require_validation = await self._query_registry()
+            self._last_check = time.monotonic_ns()
+
+        return self._require_validation
+
+    @classmethod
+    async def construct_new(
+        cls,
+        registry_client: SchemaRegistryClient,
+        topic_name: TopicName,
+        cache_interval_ns: float = DEFAULT_CACHE_INTERVAL_NS,
+    ) -> ValidationCheckWrapper:
+        validation_checker = cls(registry_client, topic_name, cache_interval_ns)
+        validation_checker._require_validation = await validation_checker._query_registry()
+        validation_checker._last_check = time.monotonic_ns()
+        return validation_checker
+
+
 class UserRestProxy:
     def __init__(
         self,
         config: Config,
         kafka_timeout: int,
         serializer: SchemaRegistrySerializer,
-        auth_expiry: Optional[datetime.datetime] = None,
+        auth_expiry: datetime.datetime | None = None,
     ):
         self.config = config
         self.kafka_timeout = kafka_timeout
@@ -440,8 +486,18 @@ def __init__(
         self._auth_expiry = auth_expiry
 
         self._async_producer_lock = asyncio.Lock()
-        self._async_producer: Optional[AIOKafkaProducer] = None
+        self._async_producer: AIOKafkaProducer | None = None
         self.naming_strategy = NameStrategy(self.config["name_strategy"])
+        self.topic_validation: MutableMapping[TopicName, ValidationCheckWrapper] = {}
+
+    async def is_validation_required(self, topic_name: TopicName) -> bool:
+        if topic_name not in self.topic_validation:
+            self.topic_validation[topic_name] = await ValidationCheckWrapper.construct_new(
+                self.serializer.registry_client,
+                topic_name,
+            )
+
+        return await self.topic_validation[topic_name].require_validation()
 
     def __str__(self) -> str:
         return f"UserRestProxy(username={self.config['sasl_plain_username']})"
@@ -601,7 +657,7 @@ async def get_topic_config(self, topic: str) -> dict:
         async with self.admin_lock:
             return self.admin_client.get_topic_config(topic)
 
-    async def cluster_metadata(self, topics: Optional[List[str]] = None) -> dict:
+    async def cluster_metadata(self, topics: list[str] | None = None) -> dict:
         async with self.admin_lock:
             if self._metadata_birth is None or time.monotonic() - self._metadata_birth > self.metadata_max_age:
                 self._cluster_metadata = None
@@ -671,7 +727,7 @@ async def aclose(self) -> None:
             self.admin_client = None
             self.consumer_manager = None
 
-    async def publish(self, topic: str, partition_id: Optional[str], content_type: str, request: HTTPRequest) -> None:
+    async def publish(self, topic: str, partition_id: str | None, content_type: str, request: HTTPRequest) -> None:
         formats: dict = request.content_type
         data: dict = request.json
         _ = await self.get_topic_info(topic, content_type)
@@ -769,7 +825,7 @@ async def get_schema_id(
         :raises InvalidSchema:
         """
         log.debug("[resolve schema id] Retrieving schema id for %r", data)
-        schema_id: Union[SchemaId, None] = (
+        schema_id: SchemaId | None = (
             SchemaId(int(data[f"{subject_type}_schema_id"])) if f"{subject_type}_schema_id" in data else None
         )
         schema_str = data.get(f"{subject_type}_schema")
@@ -788,8 +844,9 @@ async def get_schema_id(
             )
             schema_id = await self._query_schema_id_from_cache_or_registry(parsed_schema, schema_str, subject_name)
         else:
+            is_validation_required = await self.is_validation_required(topic_name=TopicName(topic))
 
-            def subject_not_included(schema: TypedSchema, subjects: List[Subject]) -> bool:
+            def subject_not_included(schema: TypedSchema, subjects: list[Subject]) -> bool:
                 subject = get_subject_name(topic, schema, subject_type, self.naming_strategy)
                 return subject not 
in subjects @@ -798,14 +855,18 @@ def subject_not_included(schema: TypedSchema, subjects: List[Subject]) -> bool: need_new_call=subject_not_included, ) - if self.config["name_strategy_validation"] and subject_not_included(parsed_schema, valid_subjects): + if ( + self.config["name_strategy_validation"] + and is_validation_required + and subject_not_included(parsed_schema, valid_subjects) + ): raise InvalidSchema() return schema_id async def _query_schema_and_subjects( - self, schema_id: SchemaId, *, need_new_call: Optional[Callable[[TypedSchema, List[Subject]], bool]] - ) -> Tuple[TypedSchema, List[Subject]]: + self, schema_id: SchemaId, *, need_new_call: Callable[[TypedSchema, list[Subject]], bool] | None + ) -> tuple[TypedSchema, list[Subject]]: try: return await self.serializer.get_schema_for_id(schema_id, need_new_call=need_new_call) except SchemaRetrievalError as schema_error: @@ -896,10 +957,10 @@ async def _prepare_records( content_type: str, data: dict, ser_format: str, - key_schema_id: Optional[int], - value_schema_id: Optional[int], - default_partition: Optional[int] = None, - ) -> List[Tuple]: + key_schema_id: int | None, + value_schema_id: int | None, + default_partition: int | None = None, + ) -> list[tuple]: prepared_records = [] for record in data["records"]: key = record.get("key") @@ -950,8 +1011,8 @@ async def serialize( self, content_type: str, obj=None, - ser_format: Optional[str] = None, - schema_id: Optional[int] = None, + ser_format: str | None = None, + schema_id: int | None = None, ) -> bytes: if not obj: return b"" @@ -975,7 +1036,7 @@ async def serialize( return await self.schema_serialize(obj, schema_id) raise FormatError(f"Unknown format: {ser_format}") - async def schema_serialize(self, obj: dict, schema_id: Optional[int]) -> bytes: + async def schema_serialize(self, obj: dict, schema_id: int | None) -> bytes: schema, _ = await self.serializer.get_schema_for_id(schema_id) bytes_ = await self.serializer.serialize(schema, obj) return bytes_ @@ -1038,7 +1099,7 @@ async def validate_publish_request_format(self, data: dict, formats: dict, conte sub_code=RESTErrorCodes.INVALID_DATA.value, ) - async def produce_messages(self, *, topic: str, prepared_records: List) -> List: + async def produce_messages(self, *, topic: str, prepared_records: list) -> list: producer = await self._maybe_create_async_producer() produce_futures = [] diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 9b25b3ad4..44afb2b7b 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -8,7 +8,6 @@ from avro.schema import Schema as AvroSchema from contextlib import closing, ExitStack -from enum import Enum from jsonschema.validators import Draft7Validator from kafka import KafkaConsumer, TopicPartition from kafka.errors import ( @@ -32,7 +31,7 @@ from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents from karapace.statsd import StatsClient -from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject +from karapace.typing import JsonObject, ResolvedVersion, SchemaId, StrEnum, Subject, TopicName from karapace.utils import json_decode, JSONDecodeError, KarapaceKafkaClient from threading import Event, Thread from typing import Final, Mapping, Sequence @@ -59,10 +58,11 @@ METRIC_SUBJECT_DATA_SCHEMA_VERSIONS_GAUGE: Final = "karapace_schema_reader_subject_data_schema_versions" -class 
MessageType(Enum): +class MessageType(StrEnum): config = "CONFIG" schema = "SCHEMA" delete_subject = "DELETE_SUBJECT" + schema_validation = "SCHEMA_VALIDATION" no_operation = "NOOP" @@ -429,6 +429,11 @@ def _handle_msg_delete_subject(self, key: dict, value: dict | None) -> None: # LOG.info("Deleting subject: %r, value: %r", subject, value) self.database.delete_subject(subject=subject, version=version) + def _handle_msg_schema_validation(self, key: dict, value: dict | None) -> None: # pylint: disable=unused-argument + assert isinstance(value, dict) + topic, skip_validation = TopicName(value["topic"]), bool(value["skip_validation"]) + self.database.override_topic_validation(topic_name=topic, skip_validation=skip_validation) + def _handle_msg_schema_hard_delete(self, key: dict) -> None: subject, version = key["subject"], key["version"] @@ -532,6 +537,8 @@ def handle_msg(self, key: dict, value: dict | None) -> None: self._handle_msg_schema(key, value) elif message_type == MessageType.delete_subject: self._handle_msg_delete_subject(key, value) + elif message_type == MessageType.schema_validation: + self._handle_msg_schema_validation(key, value) elif message_type == MessageType.no_operation: pass except ValueError: diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py index 867eeb633..311baaa2a 100644 --- a/karapace/schema_registry.py +++ b/karapace/schema_registry.py @@ -27,9 +27,9 @@ from karapace.messaging import KarapaceProducer from karapace.offset_watcher import OffsetWatcher from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema -from karapace.schema_reader import KafkaSchemaReader +from karapace.schema_reader import KafkaSchemaReader, MessageType from karapace.schema_references import LatestVersionReference, Reference -from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject, Version +from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject, TopicName, Version from typing import Mapping, Sequence import asyncio @@ -466,6 +466,19 @@ def send_schema_message( value = None self.producer.send_message(key=key, value=value) + def is_topic_requiring_validation(self, *, topic_name: TopicName) -> bool: + return self.database.is_topic_requiring_validation(topic_name=topic_name) + + def update_require_validation_for_topic( + self, + *, + topic_name: TopicName, + skip_validation: bool, + ) -> None: + key = {"topic": topic_name, "keytype": str(MessageType.schema_validation), "magic": 0} + value = {"skip_validation": skip_validation, "topic": topic_name} + self.producer.send_message(key=key, value=value) + def send_config_message(self, compatibility_level: CompatibilityModes, subject: Subject | None = None) -> None: key = {"subject": subject, "magic": 0, "keytype": "CONFIG"} value = {"compatibilityLevel": compatibility_level.value} diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 0ba18f5cd..373cf00a9 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -7,6 +7,7 @@ from avro.errors import SchemaParseException from contextlib import AsyncExitStack from enum import Enum, unique +from functools import partial from http import HTTPStatus from karapace.auth import HTTPAuthorizer, Operation, User from karapace.compatibility import check_compatibility, CompatibilityModes @@ -28,20 +29,30 @@ SubjectSoftDeletedException, VersionNotFoundException, ) -from karapace.karapace import KarapaceBase +from karapace.karapace import 
empty_response, KarapaceBase from karapace.protobuf.exception import ProtobufUnresolvedDependencyException from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE, SERVER_NAME from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping from karapace.schema_registry import KarapaceSchemaRegistry, validate_version -from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId +from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId, TopicName from karapace.utils import JSONDecodeError from typing import Any +from urllib.parse import urlparse import aiohttp import async_timeout +def compute_forwarded_url(master_url: str, request_url: str) -> str: + parsed_master_url = urlparse(master_url) + parser_request_url = urlparse(request_url) + return parser_request_url._replace( + scheme=parsed_master_url.scheme, + netloc=parsed_master_url.netloc, + ).geturl() + + @unique class SchemaErrorCodes(Enum): HTTP_BAD_REQUEST = HTTPStatus.BAD_REQUEST.value @@ -302,6 +313,33 @@ def _add_schema_registry_routes(self) -> None: json_body=False, auth=self._auth, ) + self.route( + "/topics//require_validation", + callback=self.is_topic_requiring_validation, + method="GET", + schema_request=True, + with_request=False, + json_body=False, + auth=None, + ) + self.route( + "/topics//disable_validation", + callback=partial(self.set_topic_require_validation, skip_validation=True), + method="POST", + schema_request=True, + with_request=True, + json_body=False, + auth=None, + ) + self.route( + "/topics//enable_validation", + callback=partial(self.set_topic_require_validation, skip_validation=False), + method="POST", + schema_request=True, + with_request=True, + json_body=False, + auth=None, + ) async def close(self) -> None: async with AsyncExitStack() as stack: @@ -1244,6 +1282,48 @@ async def subject_post( url = f"{master_url}/subjects/{subject}/versions" await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="POST") + async def is_topic_requiring_validation(self, content_type: str, *, topic: str) -> None: + require_validation = self.schema_registry.is_topic_requiring_validation(topic_name=TopicName(topic)) + reply = {"require_validation": require_validation} + self.r(reply, content_type) + + async def set_topic_require_validation( + self, + content_type: str, + request: HTTPRequest, + *, + topic: str, + skip_validation: bool, + ) -> None: + topic_name = TopicName(topic) + + already_skipping_validation = skip_validation and not self.schema_registry.database.is_topic_requiring_validation( + topic_name=topic_name + ) + already_validating = not skip_validation and self.schema_registry.database.is_topic_requiring_validation( + topic_name=topic_name + ) + + if already_validating or already_skipping_validation: + empty_response() + + are_we_master, master_url = await self.schema_registry.get_master() + + if are_we_master: + self.schema_registry.update_require_validation_for_topic( + topic_name=topic_name, + skip_validation=skip_validation, + ) + empty_response() + else: + await self._forward_request_remote( + request=request, + body=None, + url=compute_forwarded_url(master_url=master_url, request_url=request.url), + content_type=content_type, + method="POST", + ) + def get_schema_id_if_exists(self, *, subject: str, schema: TypedSchema, include_deleted: bool) -> SchemaId | None: schema_id = 
self.schema_registry.database.get_schema_id_if_exists(
            subject=subject, schema=schema, include_deleted=include_deleted
diff --git a/karapace/serialization.py b/karapace/serialization.py
index c199bad7a..2ffcb80c9 100644
--- a/karapace/serialization.py
+++ b/karapace/serialization.py
@@ -11,6 +11,7 @@
 from google.protobuf.message import DecodeError
 from jsonschema import ValidationError
 from karapace.client import Client
+from karapace.config import NameStrategy
 from karapace.dependency import Dependency
 from karapace.errors import InvalidReferences
 from karapace.protobuf.exception import ProtobufTypeException
@@ -18,7 +19,7 @@
 from karapace.protobuf.schema import ProtobufSchema
 from karapace.schema_models import InvalidSchema, ParsedTypedSchema, SchemaType, TypedSchema, ValidatedTypedSchema
 from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping
-from karapace.typing import NameStrategy, ResolvedVersion, SchemaId, Subject, SubjectType
+from karapace.typing import ResolvedVersion, SchemaId, Subject, SubjectType, TopicName
 from karapace.utils import json_decode, json_encode
 from typing import Any, Callable, MutableMapping
 from urllib.parse import quote
@@ -201,6 +202,13 @@ async def get_schema(
         """
         return await self._get_schema_recursive(subject, set(), version)
 
+    async def topic_require_validation(self, topic_name: TopicName) -> bool:
+        result = await self.client.get(f"/topics/{topic_name}/require_validation")
+        if not result.ok or "require_validation" not in result.json():
+            raise SchemaRetrievalError(result.json()["message"])
+
+        return bool(result.json()["require_validation"])
+
     async def get_schema_for_id(self, schema_id: SchemaId) -> tuple[TypedSchema, list[Subject]]:
         result = await self.client.get(f"schemas/ids/{schema_id}", params={"includeSubjects": "True"})
         if not result.ok:
diff --git a/karapace/typing.py b/karapace/typing.py
index 48c6bd815..8ec0b0bec 100644
--- a/karapace/typing.py
+++ b/karapace/typing.py
@@ -46,6 +46,7 @@ class NameStrategy(StrEnum, Enum):
 @unique
 class SubjectType(StrEnum, Enum):
     key = "key"
-    value = "value"
+    # `value` is a property of the Enum class; the trailing underscore avoids the collision.
+    value_ = "value"
     # partition it's a function of `str` and StrEnum its inherits from it. 
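+    # (The trailing underscore only changes the member name; the member's .value is
+    # still "partition", just as SubjectType.value_.value is still "value".)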
partition_ = "partition" diff --git a/tests/integration/test_rest.py b/tests/integration/test_rest.py index e7282309d..d19c259ec 100644 --- a/tests/integration/test_rest.py +++ b/tests/integration/test_rest.py @@ -8,6 +8,8 @@ from karapace.client import Client from karapace.kafka_admin import KafkaAdminClient from karapace.kafka_rest_apis import KafkaRest, SUBJECT_VALID_POSTFIX +from karapace.schema_models import ValidatedTypedSchema +from karapace.schema_type import SchemaType from karapace.version import __version__ from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES from tests.utils import ( @@ -610,6 +612,43 @@ async def test_publish_with_schema_id_of_another_subject_novalidation( assert res.status_code == 200 +async def test_can_produce_anything_with_no_validation_policy( + rest_async_client: Client, + registry_async_client: Client, + admin_client: KafkaRestAdminClient, +) -> None: + first_topic = new_topic(admin_client) + second_topic = new_topic(admin_client) + + await wait_for_topics(rest_async_client, topic_names=[first_topic, second_topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1) + + typed_schema = ValidatedTypedSchema.parse( + SchemaType.AVRO, + json.dumps( + { + "type": "record", + "name": "Schema1", + "fields": [ + { + "name": "name", + "type": "string", + }, + ], + } + ), + ) + + res = await registry_async_client.post( + "subjects/random_subject_name/versions", + json={"schema": str(typed_schema)}, + ) + assert res.status_code == 200 + + # with the no_validation strategy we can produce even if we use a totally random subject name + res = await registry_async_client.post(f"/topics/{first_topic}/disable_validation", json={}) + assert res.ok + + async def test_brokers(rest_async_client: Client) -> None: res = await rest_async_client.get("/brokers") assert res.ok diff --git a/tests/unit/test_schema_registry_api.py b/tests/unit/test_schema_registry_api.py index 3b3cc1356..bc6019951 100644 --- a/tests/unit/test_schema_registry_api.py +++ b/tests/unit/test_schema_registry_api.py @@ -5,7 +5,7 @@ from aiohttp.test_utils import TestClient, TestServer from karapace.config import DEFAULTS, set_config_defaults from karapace.rapu import HTTPResponse -from karapace.schema_registry_apis import KarapaceSchemaRegistryController +from karapace.schema_registry_apis import compute_forwarded_url, KarapaceSchemaRegistryController from unittest.mock import ANY, Mock, patch, PropertyMock import asyncio @@ -52,3 +52,13 @@ async def test_forward_when_not_ready(): mock_forward_func.assert_called_once_with( request=ANY, body=None, url="http://primary-url/schemas/ids/1", content_type="application/json", method="GET" ) + + +def test_compute_forwarded_url() -> None: + assert ( + compute_forwarded_url( + master_url="http://localhost:8081/another/fancy/path", + request_url="https://docs.python.org/3/library/urllib.parse.html", + ) + == "http://localhost:8081/3/library/urllib.parse.html" + ) diff --git a/tests/unit/test_serialization.py b/tests/unit/test_serialization.py index 2e09fec82..d2d62258d 100644 --- a/tests/unit/test_serialization.py +++ b/tests/unit/test_serialization.py @@ -365,9 +365,9 @@ async def test_deserialization_fails(default_config_path: Path): (Subject("foo-key"), NameStrategy.topic_name, SubjectType.key), (Subject("io.aiven.data.Test"), NameStrategy.record_name, SubjectType.key), (Subject("foo-io.aiven.data.Test"), NameStrategy.topic_record_name, SubjectType.key), - (Subject("foo-value"), NameStrategy.topic_name, SubjectType.value), - (Subject("io.aiven.data.Test"), 
NameStrategy.record_name, SubjectType.value), - (Subject("foo-io.aiven.data.Test"), NameStrategy.topic_record_name, SubjectType.value), + (Subject("foo-value"), NameStrategy.topic_name, SubjectType.value_), + (Subject("io.aiven.data.Test"), NameStrategy.record_name, SubjectType.value_), + (Subject("foo-io.aiven.data.Test"), NameStrategy.topic_record_name, SubjectType.value_), ), ) def test_name_strategy_for_avro(expected_subject: Subject, strategy: NameStrategy, subject_type: SubjectType): @@ -382,8 +382,8 @@ def test_name_strategy_for_avro(expected_subject: Subject, strategy: NameStrateg ( (Subject("Test"), NameStrategy.record_name, SubjectType.key), (Subject("foo-Test"), NameStrategy.topic_record_name, SubjectType.key), - (Subject("Test"), NameStrategy.record_name, SubjectType.value), - (Subject("foo-Test"), NameStrategy.topic_record_name, SubjectType.value), + (Subject("Test"), NameStrategy.record_name, SubjectType.value_), + (Subject("foo-Test"), NameStrategy.topic_record_name, SubjectType.value_), ), ) def test_name_strategy_for_json_schema(expected_subject: Subject, strategy: NameStrategy, subject_type: SubjectType): @@ -398,8 +398,8 @@ def test_name_strategy_for_json_schema(expected_subject: Subject, strategy: Name ( (Subject("Test"), NameStrategy.record_name, SubjectType.key), (Subject("foo-Test"), NameStrategy.topic_record_name, SubjectType.key), - (Subject("Test"), NameStrategy.record_name, SubjectType.value), - (Subject("foo-Test"), NameStrategy.topic_record_name, SubjectType.value), + (Subject("Test"), NameStrategy.record_name, SubjectType.value_), + (Subject("foo-Test"), NameStrategy.topic_record_name, SubjectType.value_), ), ) def test_name_strategy_for_avro_without_namespace( @@ -418,8 +418,8 @@ def test_name_strategy_for_avro_without_namespace( ( (Subject("Test"), NameStrategy.record_name, SubjectType.key), (Subject("foo-Test"), NameStrategy.topic_record_name, SubjectType.key), - (Subject("Test"), NameStrategy.record_name, SubjectType.value), - (Subject("foo-Test"), NameStrategy.topic_record_name, SubjectType.value), + (Subject("Test"), NameStrategy.record_name, SubjectType.value_), + (Subject("foo-Test"), NameStrategy.topic_record_name, SubjectType.value_), ), ) def test_name_strategy_for_protobuf(expected_subject: Subject, strategy: NameStrategy, subject_type: SubjectType): diff --git a/tests/unit/test_validation_check_wrapper.py b/tests/unit/test_validation_check_wrapper.py new file mode 100644 index 000000000..1c0b6f85e --- /dev/null +++ b/tests/unit/test_validation_check_wrapper.py @@ -0,0 +1,55 @@ +""" +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" +from karapace.kafka_rest_apis import ValidationCheckWrapper +from karapace.typing import TopicName +from unittest import mock +from unittest.mock import AsyncMock + + +async def test_validation_test_wrapper_elapse_works(): + with mock.patch("time.monotonic_ns") as mock_time: + mock_time.return_value = 0 + + mock_registry_client = AsyncMock() + mock_registry_client.topic_require_validation.return_value = True + + wrapper = await ValidationCheckWrapper.construct_new( + mock_registry_client, + TopicName("test_topic"), + cache_interval_ns=300, + ) + + result_1 = await wrapper.require_validation() + mock_time.return_value = 200 + result_2 = await wrapper.require_validation() + + assert result_1 is True + assert result_2 is True + + mock_registry_client.topic_require_validation.assert_called_once() + + +async def test_result_queried_twice_because_cache_evicted(): + with mock.patch("time.monotonic_ns") as mock_time: 
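+        # Clock starts at 0; the wrapper caches the registry answer for cache_interval_ns=300,
+        # so advancing the mocked clock past 300 below must trigger a second registry query.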
+ mock_time.return_value = 0 + + mock_registry_client = AsyncMock() + mock_registry_client.topic_require_validation.return_value = True + + wrapper = await ValidationCheckWrapper.construct_new( + mock_registry_client, + TopicName("test_topic"), + cache_interval_ns=300, + ) + + result_1 = await wrapper.require_validation() + mock_time.return_value = 301 + mock_registry_client.topic_require_validation.return_value = False + result_2 = await wrapper.require_validation() + + assert result_1 is True + assert result_2 is False + + assert mock_registry_client.topic_require_validation.call_count == 2 From 20b7256d7000ceb7e304da588d343b63aacccb11 Mon Sep 17 00:00:00 2001 From: Elia Migliore Date: Tue, 21 Nov 2023 17:40:19 +0100 Subject: [PATCH 2/3] wip: adding integration tests for the name strategy and the various types of formats --- tests/integration/conftest.py | 240 ++++++++++++- tests/integration/test_rest.py | 327 +++++++++++++++++- tests/integration/test_rest_consumer.py | 3 +- .../test_rest_consumer_protobuf.py | 14 +- tests/integration/utils/cluster.py | 14 +- 5 files changed, 553 insertions(+), 45 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 688f24fe1..450903ecd 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -7,12 +7,13 @@ from _pytest.fixtures import SubRequest from aiohttp.pytest_plugin import AiohttpClient from aiohttp.test_utils import TestClient -from contextlib import closing, ExitStack +from contextlib import asynccontextmanager, closing, ExitStack from dataclasses import asdict from filelock import FileLock from kafka import KafkaProducer from karapace.client import Client -from karapace.config import Config, set_config_defaults, write_config +from karapace.config import Config, ConfigDefaults, set_config_defaults, write_config +from karapace.dataclasses import default_dataclass from karapace.kafka_admin import KafkaAdminClient, NewTopic from karapace.kafka_rest_apis import KafkaRest from pathlib import Path @@ -30,7 +31,7 @@ from tests.integration.utils.synchronization import lock_path_for from tests.integration.utils.zookeeper import configure_and_start_zk from tests.utils import repeat_until_successful_request -from typing import AsyncIterator, Iterator, List, Optional +from typing import AsyncContextManager, AsyncIterator, Callable, Iterator, List, Optional from urllib.parse import urlparse import asyncio @@ -215,13 +216,14 @@ def fixture_admin(kafka_servers: KafkaServers) -> Iterator[KafkaAdminClient]: yield KafkaAdminClient(bootstrap_servers=kafka_servers.bootstrap_servers) -@pytest.fixture(scope="function", name="rest_async") -async def fixture_rest_async( +@asynccontextmanager +async def _kafka_rest_async( request: SubRequest, loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument tmp_path: Path, kafka_servers: KafkaServers, registry_async_client: Client, + custom_values: Optional[ConfigDefaults] = None, ) -> AsyncIterator[Optional[KafkaRest]]: # Do not start a REST api when the user provided an external service. Doing # so would cause this node to join the existing group and participate in @@ -234,14 +236,17 @@ async def fixture_rest_async( config_path = tmp_path / "karapace_config.json" - config = set_config_defaults( - { - "admin_metadata_max_age": 2, - "bootstrap_uri": kafka_servers.bootstrap_servers, - # Use non-default max request size for REST producer. 
- "producer_max_request_size": REST_PRODUCER_MAX_REQUEST_BYTES, - } - ) + override_values = { + "admin_metadata_max_age": 2, + "bootstrap_uri": kafka_servers.bootstrap_servers, + # Use non-default max request size for REST producer. + "producer_max_request_size": REST_PRODUCER_MAX_REQUEST_BYTES, + } + + if custom_values is not None: + override_values.update(custom_values) + + config = set_config_defaults(override_values) write_config(config_path, config) rest = KafkaRest(config=config) @@ -253,8 +258,49 @@ async def fixture_rest_async( await rest.close() -@pytest.fixture(scope="function", name="rest_async_client") -async def fixture_rest_async_client( +@pytest.fixture(scope="function", name="rest_async") +async def fixture_rest_async( + request: SubRequest, + loop: asyncio.AbstractEventLoop, + tmp_path: Path, + kafka_servers: KafkaServers, + registry_async_client: Client, +) -> AsyncIterator[Optional[KafkaRest]]: + async with _kafka_rest_async( + request, + loop, + tmp_path, + kafka_servers, + registry_async_client, + ) as kafka_rest_async: + yield kafka_rest_async + + +@pytest.fixture(scope="function", name="rest_async_from_config") +def fixture_rest_async_from_config( + request: SubRequest, + loop: asyncio.AbstractEventLoop, + tmp_path: Path, + kafka_servers: KafkaServers, + registry_async_client_from_config: Callable[[ConfigDefaults], AsyncIterator[RegistryDescription]], +) -> Callable[[ConfigDefaults], AsyncContextManager[Optional[KafkaRest]]]: + @asynccontextmanager + async def async_kafka_from_custom_config(config: ConfigDefaults) -> KafkaRest: + async with registry_async_client_from_config(config) as registry_async_client: + async with _kafka_rest_async( + request, + loop, + tmp_path, + kafka_servers, + registry_async_client, + ) as kafka_rest_async: + yield kafka_rest_async + + return async_kafka_from_custom_config + + +@asynccontextmanager +async def _rest_async_client( request: SubRequest, loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument rest_async: KafkaRest, @@ -288,6 +334,43 @@ async def get_client(**kwargs) -> TestClient: # pylint: disable=unused-argument await client.close() +@pytest.fixture(scope="function", name="rest_async_client") +async def fixture_rest_async_client( + request: SubRequest, + loop: asyncio.AbstractEventLoop, + rest_async: KafkaRest, + aiohttp_client: AiohttpClient, +) -> AsyncIterator[Client]: + async with _rest_async_client( + request, + loop, + rest_async, + aiohttp_client, + ) as rest_async_client: + yield rest_async_client + + +@pytest.fixture(scope="function", name="rest_async_client_from_config") +async def fixture_rest_async_client_from_config( + request: SubRequest, + loop: asyncio.AbstractEventLoop, + rest_async_from_config: Callable[[ConfigDefaults], AsyncContextManager[Optional[KafkaRest]]], + aiohttp_client: AiohttpClient, +) -> Callable[[ConfigDefaults], AsyncContextManager[Client]]: + @asynccontextmanager + async def async_client_from_custom_config(config: ConfigDefaults) -> Client: + async with rest_async_from_config(config) as rest_async: + async with _rest_async_client( + request, + loop, + rest_async, + aiohttp_client, + ) as rest_async_client: + yield rest_async_client + + return async_client_from_custom_config + + @pytest.fixture(scope="function", name="rest_async_novalidation") async def fixture_rest_async_novalidation( request: SubRequest, @@ -453,13 +536,14 @@ async def fixture_registry_async_pair( yield [server.endpoint.to_url() for server in endpoints] -@pytest.fixture(scope="function", name="registry_cluster") 
-async def fixture_registry_cluster( +@asynccontextmanager +async def _registry_cluster( request: SubRequest, loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument session_logdir: Path, kafka_servers: KafkaServers, port_range: PortRangeInclusive, + custom_values: Optional[ConfigDefaults] = None, ) -> AsyncIterator[RegistryDescription]: # Do not start a registry when the user provided an external service. Doing # so would cause this node to join the existing group and participate in @@ -477,12 +561,56 @@ async def fixture_registry_cluster( config_templates=[config], data_dir=session_logdir / _clear_test_name(request.node.name), port_range=port_range, + custom_values=custom_values, ) as servers: yield servers[0] -@pytest.fixture(scope="function", name="registry_async_client") -async def fixture_registry_async_client( +@pytest.fixture(scope="function", name="registry_cluster") +async def fixture_registry_cluster( + request: SubRequest, + loop: asyncio.AbstractEventLoop, + session_logdir: Path, + kafka_servers: KafkaServers, + port_range: PortRangeInclusive, + custom_values: Optional[ConfigDefaults] = None, +) -> AsyncIterator[RegistryDescription]: + async with _registry_cluster( + request, + loop, + session_logdir, + kafka_servers, + port_range, + custom_values, + ) as registry_description: + yield registry_description + + +@pytest.fixture(scope="function", name="registry_cluster_from_custom_config") +def fixture_registry_cluster_with_custom_config( + request: SubRequest, + loop: asyncio.AbstractEventLoop, + session_logdir: Path, + kafka_servers: KafkaServers, + port_range: PortRangeInclusive, +) -> Callable[[ConfigDefaults], AsyncContextManager[RegistryDescription]]: + @asynccontextmanager + async def registry_from_custom_config(config: ConfigDefaults) -> RegistryDescription: + async with _registry_cluster( + request, + loop, + session_logdir, + kafka_servers, + port_range, + config, + ) as registry_description: + yield registry_description + + return registry_from_custom_config + + +@asynccontextmanager +async def _registry_async_client( request: SubRequest, registry_cluster: RegistryDescription, loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument @@ -508,6 +636,80 @@ async def fixture_registry_async_client( await client.close() +@pytest.fixture(scope="function", name="registry_async_client") +async def fixture_registry_async_client( + request: SubRequest, + registry_cluster: RegistryDescription, + loop: asyncio.AbstractEventLoop, +) -> Client: + async with _registry_async_client( + request, + registry_cluster, + loop, + ) as client: + yield client + + +@pytest.fixture(scope="function", name="registry_async_client_from_config") +def fixture_registry_async_client_custom_config( + request: SubRequest, + registry_cluster_from_custom_config: Callable[[ConfigDefaults], AsyncIterator[RegistryDescription]], + loop: asyncio.AbstractEventLoop, +) -> Callable[[ConfigDefaults], AsyncContextManager[Client]]: + @asynccontextmanager + async def client_from_custom_config(config: ConfigDefaults) -> Client: + async with registry_cluster_from_custom_config(config) as registry_description: + async with _registry_async_client(request, registry_description, loop) as client: + yield client + + return client_from_custom_config + + +@default_dataclass +class RestClientAndRegistryClient: + registry_client: Client + rest_client: Client + + +@pytest.fixture(scope="function", name="rest_async_client_and_rest_async_client_from_config") +def 
fixture_rest_async_client_and_rest_async_client_from_config( + request: SubRequest, + loop: asyncio.AbstractEventLoop, + aiohttp_client: AiohttpClient, + session_logdir: Path, + kafka_servers: KafkaServers, + tmp_path: Path, + port_range: PortRangeInclusive, +) -> Callable[[ConfigDefaults], AsyncContextManager[RestClientAndRegistryClient]]: + @asynccontextmanager + async def client_from_custom_config(config: ConfigDefaults) -> RestClientAndRegistryClient: + # ugly but without python 3.9 we cannot join those :( + async with _registry_cluster( + request, + loop, + session_logdir, + kafka_servers, + port_range, + config, + ) as registry_description: + async with _registry_async_client(request, registry_description, loop) as registry_async_client: + async with _kafka_rest_async( + request, loop, tmp_path, kafka_servers, registry_async_client, config + ) as kafka_rest_async: + async with _rest_async_client( + request, + loop, + kafka_rest_async, + aiohttp_client, + ) as rest_async_client: + yield RestClientAndRegistryClient( + registry_client=registry_async_client, + rest_client=rest_async_client, + ) + + return client_from_custom_config + + @pytest.fixture(scope="function", name="credentials_folder") def fixture_credentials_folder() -> str: integration_test_folder = os.path.dirname(__file__) diff --git a/tests/integration/test_rest.py b/tests/integration/test_rest.py index d19c259ec..26cb3e1c4 100644 --- a/tests/integration/test_rest.py +++ b/tests/integration/test_rest.py @@ -6,12 +6,16 @@ from kafka import KafkaProducer from karapace.client import Client +from karapace.config import ConfigDefaults from karapace.kafka_admin import KafkaAdminClient from karapace.kafka_rest_apis import KafkaRest, SUBJECT_VALID_POSTFIX from karapace.schema_models import ValidatedTypedSchema from karapace.schema_type import SchemaType +from karapace.serialization import get_subject_name +from karapace.typing import NameStrategy, SubjectType from karapace.version import __version__ -from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES +from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES, RestClientAndRegistryClient +from tests.unit.test_serialization import TYPED_AVRO_SCHEMA, TYPED_PROTOBUF_SCHEMA from tests.utils import ( new_random_name, new_topic, @@ -23,10 +27,12 @@ test_objects_avro_evolution, wait_for_topics, ) +from typing import AsyncContextManager, Callable import asyncio import base64 import json +import pytest import time NEW_TOPIC_TIMEOUT = 10 @@ -278,22 +284,289 @@ async def test_list_topics(rest_async_client, admin_client) -> None: assert tn1 in topic_list and tn2 in topic_list, f"Topic list contains all topics tn1={tn1} and tn2={tn2}" -async def test_publish(rest_async_client: Client, admin_client: KafkaAdminClient) -> None: +async def test_publish_topic_json(rest_async_client: Client, admin_client: KafkaAdminClient) -> None: + topic = new_topic(admin_client) + await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1) + partition_url = f"/topics/{topic}/partitions/0" + res = await rest_async_client.post( + partition_url, + json={"records": [{"value": {"foo": "bar"}}]}, + headers=REST_HEADERS["json"], + ) + res_json = res.json() + assert res.ok + assert "offsets" in res_json + for o in res_json["offsets"]: + assert "partition" in o + assert int(o["partition"]) == 0 + + +async def test_publish_topic_binary(rest_async_client: Client, admin_client: KafkaAdminClient) -> None: topic = new_topic(admin_client) await 
wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1) - topic_url = f"/topics/{topic}" partition_url = f"/topics/{topic}/partitions/0" - # Proper Json / Binary - for url in [topic_url, partition_url]: - for payload, h in [({"value": {"foo": "bar"}}, "json"), ({"value": "Zm9vCg=="}, "binary")]: - res = await rest_async_client.post(url, json={"records": [payload]}, headers=REST_HEADERS[h]) - res_json = res.json() - assert res.ok - assert "offsets" in res_json - if "partition" in url: - for o in res_json["offsets"]: - assert "partition" in o - assert o["partition"] == 0 + res = await rest_async_client.post( + partition_url, json={"records": [{"value": "Zm9vCg=="}]}, headers=REST_HEADERS["binary"] + ) + res_json = res.json() + assert res.ok + assert "offsets" in res_json + for o in res_json["offsets"]: + assert "partition" in o + assert int(o["partition"]) == 0 + + +async def test_publish_partition_json(rest_async_client: Client, admin_client: KafkaAdminClient) -> None: + topic = new_topic(admin_client) + await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1) + topic_url = f"/topics/{topic}" + + res = await rest_async_client.post( + topic_url, + json={"records": [{"value": {"foo": "bar"}}]}, + headers=REST_HEADERS["json"], + ) + res_json = res.json() + assert res.ok + assert "offsets" in res_json + + +async def test_publish_partition_binary(rest_async_client: Client, admin_client: KafkaAdminClient) -> None: + topic = new_topic(admin_client) + await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1) + topic_url = f"/topics/{topic}" + res = await rest_async_client.post( + topic_url, + json={"records": [{"value": "Zm9vCg=="}]}, + headers=REST_HEADERS["binary"], + ) + res_json = res.json() + assert res.ok + for o in res_json["offsets"]: + assert "partition" in o + assert int(o["partition"]) == 0 + + +@pytest.mark.parametrize( + "name_strategy", + ( + (NameStrategy.topic_name), + (NameStrategy.record_name), + (NameStrategy.topic_record_name), + ), +) +async def test_publish_partition_protobuf( + rest_async_client_and_rest_async_client_from_config: Callable[ + [ConfigDefaults], AsyncContextManager[RestClientAndRegistryClient] + ], + admin_client: KafkaAdminClient, + name_strategy: NameStrategy, +) -> None: + topic = new_topic(admin_client) + async with rest_async_client_and_rest_async_client_from_config( + {"name_strategy": name_strategy} + ) as async_rest_and_registry_client: + registry_async_client = async_rest_and_registry_client.registry_client + rest_async_client = async_rest_and_registry_client.rest_client + + await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1) + topic_url = f"/topics/{topic}/partitions/0" + + res = await registry_async_client.post( + f"subjects/{new_random_name('random_subject')}/versions", + json={"schemaType": "PROTOBUF", "schema": TYPED_PROTOBUF_SCHEMA.schema_str}, + ) + assert res.ok + + message = [{"value": {"attr1": "Value for attr1", "attr2": "Value for attr2"}}] + + res = await rest_async_client.post( + topic_url, + json={"value_schema_id": res.json()["id"], "records": message}, + headers=REST_HEADERS["protobuf"], + ) + assert res.status_code == 422 + + res = await registry_async_client.post( + f"subjects/{get_subject_name(topic, TYPED_PROTOBUF_SCHEMA, SubjectType.value_, name_strategy)}/versions", + json={"schemaType": "PROTOBUF", "schema": TYPED_PROTOBUF_SCHEMA.schema_str}, + ) + assert res.ok + + res 
= await rest_async_client.post( + topic_url, + json={"value_schema_id": res.json()["id"], "records": message}, + headers=REST_HEADERS["protobuf"], + ) + assert res.ok + for o in res.json()["offsets"]: + assert "partition" in o + assert int(o["partition"]) == 0 + + +@pytest.mark.parametrize( + "name_strategy", + ( + (NameStrategy.topic_name), + (NameStrategy.record_name), + (NameStrategy.topic_record_name), + ), +) +async def test_publish_topic_protobuf( + rest_async_client_and_rest_async_client_from_config: Callable[ + [ConfigDefaults], AsyncContextManager[RestClientAndRegistryClient] + ], + admin_client: KafkaAdminClient, + name_strategy: NameStrategy, +) -> None: + topic = new_topic(admin_client) + async with rest_async_client_and_rest_async_client_from_config( + {"name_strategy": name_strategy} + ) as async_rest_and_registry_client: + registry_async_client = async_rest_and_registry_client.registry_client + rest_async_client = async_rest_and_registry_client.rest_client + + await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1) + topic_url = f"/topics/{topic}/partitions/0" + + res = await registry_async_client.post( + f"subjects/{new_random_name('random_subject')}/versions", + json={"schemaType": "PROTOBUF", "schema": TYPED_PROTOBUF_SCHEMA.schema_str}, + ) + assert res.ok + + message = [{"value": {"attr1": "Value for attr1", "attr2": "Value for attr2"}}] + + res = await rest_async_client.post( + topic_url, + json={"value_schema_id": res.json()["id"], "records": message}, + headers=REST_HEADERS["protobuf"], + ) + assert res.status_code == 422 + + res = await registry_async_client.post( + f"subjects/{get_subject_name(topic, TYPED_PROTOBUF_SCHEMA, SubjectType.value_, name_strategy)}/versions", + json={"schemaType": "PROTOBUF", "schema": TYPED_PROTOBUF_SCHEMA.schema_str}, + ) + assert res.ok + + res = await rest_async_client.post( + topic_url, + json={"value_schema_id": res.json()["id"], "records": message}, + headers=REST_HEADERS["protobuf"], + ) + assert res.ok + + +@pytest.mark.parametrize( + "name_strategy", + ( + (NameStrategy.topic_name), + (NameStrategy.record_name), + (NameStrategy.topic_record_name), + ), +) +async def test_publish_topic_avro( + rest_async_client_and_rest_async_client_from_config: Callable[ + [ConfigDefaults], AsyncContextManager[RestClientAndRegistryClient] + ], + admin_client: KafkaAdminClient, + name_strategy: NameStrategy, +) -> None: + topic = new_topic(admin_client) + async with rest_async_client_and_rest_async_client_from_config( + {"name_strategy": name_strategy} + ) as async_rest_and_registry_client: + registry_async_client = async_rest_and_registry_client.registry_client + rest_async_client = async_rest_and_registry_client.rest_client + + await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1) + topic_url = f"/topics/{topic}/partitions/0" + + res = await registry_async_client.post( + f"subjects/{new_random_name('random_subject')}/versions", + json={"schemaType": "AVRO", "schema": TYPED_AVRO_SCHEMA.schema_str}, + ) + assert res.ok + + message = [{"value": {"attr1": {"string": "sample data"}, "attr2": None}}] + + res = await rest_async_client.post( + topic_url, + json={"value_schema_id": res.json()["id"], "records": message}, + headers=REST_HEADERS["avro"], + ) + assert res.status_code == 422 + + res = await registry_async_client.post( + f"subjects/{get_subject_name(topic, TYPED_AVRO_SCHEMA, SubjectType.value_, name_strategy)}/versions", + json={"schemaType": "AVRO", 
"schema": TYPED_AVRO_SCHEMA.schema_str}, + ) + assert res.ok + + res = await rest_async_client.post( + topic_url, + json={"value_schema_id": res.json()["id"], "records": message}, + headers=REST_HEADERS["avro"], + ) + assert res.ok + + +@pytest.mark.parametrize( + "name_strategy", + ( + (NameStrategy.topic_name), + (NameStrategy.record_name), + (NameStrategy.topic_record_name), + ), +) +async def test_publish_partition_avro( + rest_async_client_and_rest_async_client_from_config: Callable[ + [ConfigDefaults], AsyncContextManager[RestClientAndRegistryClient] + ], + admin_client: KafkaAdminClient, + name_strategy: NameStrategy, +) -> None: + topic = new_topic(admin_client) + async with rest_async_client_and_rest_async_client_from_config( + {"name_strategy": name_strategy} + ) as async_rest_and_registry_client: + registry_async_client = async_rest_and_registry_client.registry_client + rest_async_client = async_rest_and_registry_client.rest_client + + await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1) + topic_url = f"/topics/{topic}/partitions/0" + + res = await registry_async_client.post( + f"subjects/{new_random_name('random_subject')}/versions", + json={"schemaType": "AVRO", "schema": TYPED_AVRO_SCHEMA.schema_str}, + ) + assert res.ok + + message = [{"value": {"attr1": {"string": "sample data"}, "attr2": None}}] + + res = await rest_async_client.post( + topic_url, + json={"value_schema_id": res.json()["id"], "records": message}, + headers=REST_HEADERS["avro"], + ) + assert res.status_code == 422 + + res = await registry_async_client.post( + f"subjects/{get_subject_name(topic, TYPED_AVRO_SCHEMA, SubjectType.value_, name_strategy)}/versions", + json={"schemaType": "AVRO", "schema": TYPED_AVRO_SCHEMA.schema_str}, + ) + assert res.ok + + res = await rest_async_client.post( + topic_url, + json={"value_schema_id": res.json()["id"], "records": message}, + headers=REST_HEADERS["avro"], + ) + assert res.ok + for o in res.json()["offsets"]: + assert "partition" in o + assert int(o["partition"]) == 0 # Produce messages to a topic without key and without explicit partition to verify that @@ -615,12 +888,12 @@ async def test_publish_with_schema_id_of_another_subject_novalidation( async def test_can_produce_anything_with_no_validation_policy( rest_async_client: Client, registry_async_client: Client, - admin_client: KafkaRestAdminClient, + admin_client: KafkaAdminClient, ) -> None: - first_topic = new_topic(admin_client) - second_topic = new_topic(admin_client) + topic = new_topic(admin_client) + url = f"/topics/{topic}" - await wait_for_topics(rest_async_client, topic_names=[first_topic, second_topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1) + await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1) typed_schema = ValidatedTypedSchema.parse( SchemaType.AVRO, @@ -643,9 +916,25 @@ async def test_can_produce_anything_with_no_validation_policy( json={"schema": str(typed_schema)}, ) assert res.status_code == 200 + schema_id = res.json()["id"] + + res = await rest_async_client.post( + url, + json={"value_schema_id": schema_id, "records": [{"value": {"name": "Mr. 
Mustache"}}]},
+        headers=REST_HEADERS["avro"],
+    )
+    assert not res.ok
+    assert res.status_code == 422
 
     # with the no_validation strategy we can produce even if we use a totally random subject name
-    res = await registry_async_client.post(f"/topics/{first_topic}/disable_validation", json={})
+    res = await registry_async_client.post(f"/topics/{topic}/disable_validation", json={})
+    assert res.ok
+
+    res = await rest_async_client.post(
+        url,
+        json={"value_schema_id": schema_id, "records": [{"value": {"name": "Mr. Mustache"}}]},
+        headers=REST_HEADERS["avro"],
+    )
     assert res.ok
diff --git a/tests/integration/test_rest_consumer.py b/tests/integration/test_rest_consumer.py
index ed0186f9f..5e35701dd 100644
--- a/tests/integration/test_rest_consumer.py
+++ b/tests/integration/test_rest_consumer.py
@@ -2,6 +2,7 @@
 Copyright (c) 2023 Aiven Ltd
 See LICENSE for details
 """
+from karapace.client import Client
 from karapace.kafka_rest_apis.consumer_manager import KNOWN_FORMATS
 from tests.utils import (
     consumer_valid_payload,
@@ -22,7 +23,7 @@
 
 @pytest.mark.parametrize("trail", ["", "/"])
-async def test_create_and_delete(rest_async_client, trail):
+async def test_create_and_delete(rest_async_client: Client, trail: str):
     header = REST_HEADERS["json"]
     group_name = "test_group"
     resp = await rest_async_client.post(f"/consumers/{group_name}{trail}", json=consumer_valid_payload, headers=header)
diff --git a/tests/integration/test_rest_consumer_protobuf.py b/tests/integration/test_rest_consumer_protobuf.py
index 52662aeb9..d915c8983 100644
--- a/tests/integration/test_rest_consumer_protobuf.py
+++ b/tests/integration/test_rest_consumer_protobuf.py
@@ -23,7 +23,12 @@
 
 @pytest.mark.parametrize("schema_type", ["protobuf"])
 @pytest.mark.parametrize("trail", ["", "/"])
-async def test_publish_consume_protobuf(rest_async_client, admin_client, trail, schema_type):
+async def test_publish_consume_protobuf(
+    rest_async_client: Client,
+    admin_client: KafkaAdminClient,
+    trail: str,
+    schema_type: str,
+):
     header = REST_HEADERS[schema_type]
     group_name = "e2e_protobuf_group"
     instance_id = await new_consumer(rest_async_client, group_name, fmt=schema_type, trail=trail)
@@ -54,7 +59,12 @@
 
 @pytest.mark.parametrize("schema_type", ["protobuf"])
 @pytest.mark.parametrize("trail", ["", "/"])
-async def test_publish_consume_protobuf_second(rest_async_client, admin_client, trail, schema_type):
+async def test_publish_consume_protobuf_second(
+    rest_async_client: Client,
+    admin_client: KafkaAdminClient,
+    trail: str,
+    schema_type: str,
+):
     header = REST_HEADERS[schema_type]
     group_name = "e2e_proto_second"
     instance_id = await new_consumer(rest_async_client, group_name, fmt=schema_type, trail=trail)
diff --git a/tests/integration/utils/cluster.py b/tests/integration/utils/cluster.py
index 31c06e4bd..c8611ca07 100644
--- a/tests/integration/utils/cluster.py
+++ b/tests/integration/utils/cluster.py
@@ -2,14 +2,16 @@
 Copyright (c) 2023 Aiven Ltd
 See LICENSE for details
 """
+from __future__ import annotations
+
 from contextlib import asynccontextmanager, ExitStack
 from dataclasses import dataclass
-from karapace.config import Config, set_config_defaults, write_config
+from karapace.config import Config, ConfigDefaults, set_config_defaults, write_config
 from pathlib import Path
 from tests.integration.utils.network import PortRangeInclusive
 from tests.integration.utils.process import stop_process, wait_for_port_subprocess
 from tests.utils import new_random_name, 
-from typing import AsyncIterator, List
+from typing import AsyncIterator
 
 
 @dataclass(frozen=True)
@@ -30,10 +32,11 @@ class RegistryDescription:
 
 @asynccontextmanager
 async def start_schema_registry_cluster(
-    config_templates: List[Config],
+    config_templates: list[Config],
     data_dir: Path,
     port_range: PortRangeInclusive,
-) -> AsyncIterator[List[RegistryDescription]]:
+    custom_values: ConfigDefaults | None = None,
+) -> AsyncIterator[list[RegistryDescription]]:
     """Start a cluster of schema registries, one process per `config_templates`."""
     for template in config_templates:
         assert "bootstrap_uri" in template, "base_config must have the value `bootstrap_uri` set"
@@ -76,6 +79,9 @@ async def start_schema_registry_cluster(
             log_path = group_dir / f"{pos}.log"
             error_path = group_dir / f"{pos}.error"
 
+            if custom_values is not None:
+                config.update(custom_values)
+
             config = set_config_defaults(config)
             write_config(config_path, config)

From c5cfd46284d7cf58b47730c29a3a6b86e531d47a Mon Sep 17 00:00:00 2001
From: Elia Migliore
Date: Wed, 22 Nov 2023 13:39:39 +0100
Subject: [PATCH 3/3] implement review feedback

---
 karapace/key_format.py           |  3 ++
 karapace/schema_reader.py        |  5 ++-
 karapace/schema_registry.py      |  2 +-
 karapace/schema_registry_apis.py | 73 +++++++++++++++++++++++---------
 4 files changed, 61 insertions(+), 22 deletions(-)

diff --git a/karapace/key_format.py b/karapace/key_format.py
index e39d4c49e..44d961148 100644
--- a/karapace/key_format.py
+++ b/karapace/key_format.py
@@ -70,6 +70,9 @@ def format_key(
     corrected_key["subject"] = key["subject"]
     if "version" in key:
         corrected_key["version"] = key["version"]
+    if "topic" in key:
+        corrected_key["topic"] = key["topic"]
+
     # Magic is the last element
     corrected_key["magic"] = key["magic"]
     return json_encode(corrected_key, binary=True, sort_keys=False, compact=True)
diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py
index 44afb2b7b..997fdea29 100644
--- a/karapace/schema_reader.py
+++ b/karapace/schema_reader.py
@@ -429,9 +429,10 @@ def _handle_msg_delete_subject(self, key: dict, value: dict | None) -> None:  #
         LOG.info("Deleting subject: %r, value: %r", subject, value)
         self.database.delete_subject(subject=subject, version=version)
 
-    def _handle_msg_schema_validation(self, key: dict, value: dict | None) -> None:  # pylint: disable=unused-argument
+    def _handle_msg_schema_validation(self, key: dict, value: dict | None) -> None:
         assert isinstance(value, dict)
-        topic, skip_validation = TopicName(value["topic"]), bool(value["skip_validation"])
+        topic = TopicName(key["topic"])
+        skip_validation = bool(value["skip_validation"])
         self.database.override_topic_validation(topic_name=topic, skip_validation=skip_validation)
 
     def _handle_msg_schema_hard_delete(self, key: dict) -> None:
diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py
index 311baaa2a..4847fd063 100644
--- a/karapace/schema_registry.py
+++ b/karapace/schema_registry.py
@@ -476,7 +476,7 @@ def update_require_validation_for_topic(
         skip_validation: bool,
     ) -> None:
         key = {"topic": topic_name, "keytype": str(MessageType.schema_validation), "magic": 0}
-        value = {"skip_validation": skip_validation, "topic": topic_name}
+        value = {"skip_validation": skip_validation}
         self.producer.send_message(key=key, value=value)
 
     def send_config_message(self, compatibility_level: CompatibilityModes, subject: Subject | None = None) -> None:
diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py
index 373cf00a9..c171251ab 100644
--- a/karapace/schema_registry_apis.py
+++ b/karapace/schema_registry_apis.py
@@ -156,11 +156,10 @@ async def _forward_if_not_ready_to_serve(self, request: HTTPRequest) -> None:
                 status=HTTPStatus.SERVICE_UNAVAILABLE,
             )
         else:
-            url = f"{master_url}{request.url.path}"
             await self._forward_request_remote(
                 request=request,
                 body=request.json,
-                url=url,
+                url=compute_forwarded_url(master_url=master_url, request_url=str(request.url)),
                 content_type=request.get_header("Content-Type"),
                 method=request.method,
             )
@@ -320,7 +319,7 @@ def _add_schema_registry_routes(self) -> None:
             schema_request=True,
             with_request=False,
             json_body=False,
-            auth=None,
+            auth=self._auth,
         )
         self.route(
             "/topics/<topic:path>/disable_validation",
@@ -329,7 +328,7 @@
             schema_request=True,
             with_request=True,
             json_body=False,
-            auth=None,
+            auth=self._auth,
         )
         self.route(
             "/topics/<topic:path>/enable_validation",
@@ -338,7 +337,7 @@
             schema_request=True,
             with_request=True,
             json_body=False,
-            auth=None,
+            auth=self._auth,
         )
 
     async def close(self) -> None:
@@ -621,8 +620,13 @@ async def config_set(self, content_type: str, *, request: HTTPRequest, user: Use
         elif not master_url:
             self.no_master_error(content_type)
         else:
-            url = f"{master_url}/config"
-            await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="PUT")
+            await self._forward_request_remote(
+                request=request,
+                body=body,
+                url=compute_forwarded_url(master_url=master_url, request_url=str(request.url)),
+                content_type=content_type,
+                method="PUT",
+            )
 
         self.r({"compatibility": self.schema_registry.schema_reader.config["compatibility"]}, content_type)
 
@@ -692,9 +696,12 @@ async def config_subject_set(
         elif not master_url:
             self.no_master_error(content_type)
         else:
-            url = f"{master_url}/config/{subject}"
             await self._forward_request_remote(
-                request=request, body=request.json, url=url, content_type=content_type, method="PUT"
+                request=request,
+                body=request.json,
+                url=compute_forwarded_url(master_url=master_url, request_url=str(request.url)),
+                content_type=content_type,
+                method="PUT",
             )
 
         self.r({"compatibility": compatibility_level.value}, content_type)
@@ -717,9 +724,12 @@ async def config_subject_delete(
         elif not master_url:
             self.no_master_error(content_type)
         else:
-            url = f"{master_url}/config/{subject}"
             await self._forward_request_remote(
-                request=request, body=request.json, url=url, content_type=content_type, method="PUT"
+                request=request,
+                body=request.json,
+                url=compute_forwarded_url(master_url=master_url, request_url=str(request.url)),
+                content_type=content_type,
+                method="PUT",
             )
 
         self.r({"compatibility": self.schema_registry.schema_reader.config["compatibility"]}, content_type)
@@ -791,8 +801,13 @@ async def subject_delete(
         elif not master_url:
             self.no_master_error(content_type)
         else:
-            url = f"{master_url}/subjects/{subject}?permanent={permanent}"
-            await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="DELETE")
+            await self._forward_request_remote(
+                request=request,
+                body={},
+                url=compute_forwarded_url(master_url=master_url, request_url=str(request.url) + f"?permanent={permanent}"),
+                content_type=content_type,
+                method="DELETE",
+            )
 
     async def subject_version_get(
         self, content_type: str, *, subject: str, version: str, request: HTTPRequest, user: User | None = None
@@ -894,8 +909,13 @@ async def subject_version_delete(
         elif not master_url:
             self.no_master_error(content_type)
         else:
-            url = f"{master_url}/subjects/{subject}/versions/{version}?permanent={permanent}"
= f"{master_url}/subjects/{subject}/versions/{version}?permanent={permanent}" - await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="DELETE") + await self._forward_request_remote( + request=request, + body={}, + url=compute_forwarded_url(master_url=master_url, request_url=str(request.url) + f"?permanent={permanent}"), + content_type=content_type, + method="DELETE", + ) async def subject_version_schema_get( self, content_type: str, *, subject: str, version: str, user: User | None = None @@ -1279,10 +1299,22 @@ async def subject_post( elif not master_url: self.no_master_error(content_type) else: - url = f"{master_url}/subjects/{subject}/versions" - await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="POST") + await self._forward_request_remote( + request=request, + body=body, + url=compute_forwarded_url(master_url=master_url, request_url=str(request.url)), + content_type=content_type, + method="POST", + ) - async def is_topic_requiring_validation(self, content_type: str, *, topic: str) -> None: + async def is_topic_requiring_validation( + self, + content_type: str, + *, + topic: str, + user: User | None = None, + ) -> None: + self._check_authorization(user, Operation.Read, "Config:") require_validation = self.schema_registry.is_topic_requiring_validation(topic_name=TopicName(topic)) reply = {"require_validation": require_validation} self.r(reply, content_type) @@ -1294,7 +1326,10 @@ async def set_topic_require_validation( *, topic: str, skip_validation: bool, + user: User | None = None, ) -> None: + self._check_authorization(user, Operation.Write, "Config:") + topic_name = TopicName(topic) already_skipping_validation = skip_validation and not self.schema_registry.database.is_topic_requiring_validation( @@ -1319,7 +1354,7 @@ async def set_topic_require_validation( await self._forward_request_remote( request=request, body=None, - url=compute_forwarded_url(master_url=master_url, request_url=request.url), + url=compute_forwarded_url(master_url=master_url, request_url=str(request.url)), content_type=content_type, method="POST", )