diff --git a/karapace/config.py b/karapace/config.py index fd7bd18fa..9212f6348 100644 --- a/karapace/config.py +++ b/karapace/config.py @@ -256,13 +256,13 @@ def validate_config(config: Config) -> None: f"Invalid master election strategy: {master_election_strategy}, valid values are {valid_strategies}" ) from None - deafault_name_strategy = config["name_strategy"] + name_strategy = config["name_strategy"] try: - NameStrategy(deafault_name_strategy) + NameStrategy(name_strategy) except ValueError: valid_strategies = list(NameStrategy) raise InvalidConfiguration( - f"Invalid default name strategy: {deafault_name_strategy}, valid values are {valid_strategies}" + f"Invalid default name strategy: {name_strategy}, valid values are {valid_strategies}" ) from None if config["rest_authorization"] and config["sasl_bootstrap_uri"] is None: diff --git a/karapace/in_memory_database.py b/karapace/in_memory_database.py index d13783f49..de50016ff 100644 --- a/karapace/in_memory_database.py +++ b/karapace/in_memory_database.py @@ -6,13 +6,13 @@ """ from __future__ import annotations +from cachetools import TTLCache from dataclasses import dataclass, field -from karapace.config import NameStrategy from karapace.schema_models import SchemaVersion, TypedSchema from karapace.schema_references import Reference, Referents from karapace.typing import ResolvedVersion, SchemaId, Subject, TopicName from threading import Lock, RLock -from typing import Iterable, Sequence +from typing import Iterable, MutableMapping, Sequence import logging @@ -33,7 +33,9 @@ def __init__(self) -> None: self.schemas: dict[SchemaId, TypedSchema] = {} self.schema_lock_thread = RLock() self.referenced_by: dict[tuple[Subject, ResolvedVersion], Referents] = {} - self.topic_without_validation: set[TopicName] = set() + # this should be a set, but afaik there isn't a TTL set. I'm using this bad modeling + # instead of re-implement the feature in another custom data structure. + self.topic_without_validation: MutableMapping[TopicName, None] = TTLCache(maxsize=10000, ttl=600) # Content based deduplication of schemas. This is used to reduce memory # usage when the same schema is produce multiple times to the same or @@ -216,8 +218,7 @@ def find_subjects(self, *, include_deleted: bool) -> list[Subject]: return list(self.subjects.keys()) with self.schema_lock_thread: return [ - subject for subject in self.subjects if - self.find_subject_schemas(subject=subject, include_deleted=False) + subject for subject in self.subjects if self.find_subject_schemas(subject=subject, include_deleted=False) ] def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> dict[ResolvedVersion, SchemaVersion]: @@ -235,11 +236,11 @@ def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> di def is_topic_requiring_validation(self, *, topic_name: TopicName) -> bool: return topic_name not in self.topic_without_validation - def override_topic_validation(self, *, topic_name: TopicName, require_validation: bool) -> None: - if require_validation: - self.topic_without_validation.discard(topic_name) + def override_topic_validation(self, *, topic_name: TopicName, skip_validation: bool) -> None: + if skip_validation: + self.topic_without_validation[topic_name] = None else: - self.topic_without_validation.add(topic_name) + self.topic_without_validation.pop(topic_name) def delete_subject(self, *, subject: Subject, version: ResolvedVersion) -> None: with self.schema_lock_thread: diff --git a/karapace/kafka_rest_apis/__init__.py b/karapace/kafka_rest_apis/__init__.py index 88b109bf3..b47dabad2 100644 --- a/karapace/kafka_rest_apis/__init__.py +++ b/karapace/kafka_rest_apis/__init__.py @@ -13,7 +13,7 @@ TopicAuthorizationFailedError, UnknownTopicOrPartitionError, ) -from karapace.config import Config, create_client_ssl_context, NameStrategy, SubjectType +from karapace.config import Config, create_client_ssl_context from karapace.errors import InvalidSchema from karapace.kafka_rest_apis.admin import KafkaRestAdminClient from karapace.kafka_rest_apis.authentication import ( @@ -35,14 +35,6 @@ SchemaRegistrySerializer, SchemaRetrievalError, ) -from karapace.typing import SchemaId, Subject, TopicName -from karapace.serialization import ( - get_subject_name, - InvalidMessageSchema, - InvalidPayload, - SchemaRegistrySerializer, - SchemaRetrievalError, -) from karapace.typing import NameStrategy, SchemaId, Subject, SubjectType from karapace.utils import convert_to_int, json_encode, KarapaceKafkaClient from typing import Callable, Dict, List, Optional, Tuple, Union @@ -788,7 +780,6 @@ async def get_schema_id( SchemaId(int(data[f"{subject_type}_schema_id"])) if f"{subject_type}_schema_id" in data else None ) schema_str = data.get(f"{subject_type}_schema") - naming_strategy = await self.serializer.get_topic_strategy_name(topic_name=TopicName(topic)) if schema_id is None and schema_str is None: raise InvalidSchema() @@ -814,11 +805,7 @@ def subject_not_included(schema: TypedSchema, subjects: List[Subject]) -> bool: need_new_call=subject_not_included, ) - if ( - self.config["name_strategy_validation"] - and naming_strategy != NameStrategy.no_validation - and subject_not_included(parsed_schema, valid_subjects) - ): + if self.config["name_strategy_validation"] and subject_not_included(parsed_schema, valid_subjects): raise InvalidSchema() return schema_id diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 222932292..24e8b43d4 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -21,7 +21,7 @@ TopicAlreadyExistsError, ) from karapace import constants -from karapace.config import Config, NameStrategy +from karapace.config import Config from karapace.dependency import Dependency from karapace.errors import InvalidReferences, InvalidSchema from karapace.in_memory_database import InMemoryDatabase @@ -440,8 +440,8 @@ def _handle_msg_delete_subject(self, key: dict, value: dict | None) -> None: # def _handle_msg_schema_validation(self, key: dict, value: dict | None) -> None: # pylint: disable=unused-argument assert isinstance(value, dict) - topic, strategy = value["topic"], value["require_validation"] - self.database.override_topic_validation(topic_name=TopicName(topic), require_validation=bool(strategy)) + topic, skip_validation = TopicName(value["topic"]), bool(value["skip_validation"]) + self.database.override_topic_validation(topic_name=topic, require_validation=skip_validation) def _handle_msg_schema_hard_delete(self, key: dict) -> None: subject, version = key["subject"], key["version"] @@ -495,8 +495,7 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None: elif schema_type_parsed == SchemaType.PROTOBUF: try: if schema_references: - candidate_references = [reference_from_mapping(reference_data) for reference_data in - schema_references] + candidate_references = [reference_from_mapping(reference_data) for reference_data in schema_references] resolved_references, resolved_dependencies = self.resolve_references(candidate_references) parsed_schema = parse_protobuf_schema_definition( schema_str, @@ -552,8 +551,7 @@ def handle_msg(self, key: dict, value: dict | None) -> None: elif message_type == MessageType.no_operation: pass except ValueError: - LOG.error("The message %s-%s has been discarded because the %s is not managed", key, value, - key["keytype"]) + LOG.error("The message %s-%s has been discarded because the %s is not managed", key, value, key["keytype"]) else: LOG.error( @@ -576,8 +574,7 @@ def get_referenced_by( def _resolve_and_validate(self, schema: TypedSchema) -> ValidatedTypedSchema: references, dependencies = ( - self.resolve_references(schema.references) if schema.references else ( - schema.references, schema.dependencies) + self.resolve_references(schema.references) if schema.references else (schema.references, schema.dependencies) ) return ValidatedTypedSchema.parse( schema_type=schema.schema_type, diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 7deb770a9..96d279d0a 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -4,17 +4,15 @@ """ from __future__ import annotations -from urllib.parse import urlparse -from functools import partial - from avro.errors import SchemaParseException from contextlib import AsyncExitStack from enum import Enum, unique +from functools import partial from http import HTTPStatus from karapace.auth import HTTPAuthorizer, Operation, User from karapace.compatibility import check_compatibility, CompatibilityModes from karapace.compatibility.jsonschema.checks import is_incompatible -from karapace.config import Config, NameStrategy +from karapace.config import Config from karapace.errors import ( IncompatibleSchema, InvalidReferences, @@ -40,6 +38,7 @@ from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId, TopicName from karapace.utils import JSONDecodeError from typing import Any +from urllib.parse import urlparse import aiohttp import async_timeout @@ -576,8 +575,7 @@ async def schemas_get_versions( deleted = request.query.get("deleted", "false").lower() == "true" subject_versions = [] - for subject_version in self.schema_registry.get_subject_versions_for_schema(schema_id_int, - include_deleted=deleted): + for subject_version in self.schema_registry.get_subject_versions_for_schema(schema_id_int, include_deleted=deleted): subject = subject_version["subject"] if self._auth and not self._auth.check_authorization(user, Operation.Read, f"Subject:{subject}"): continue @@ -618,8 +616,7 @@ async def config_set(self, content_type: str, *, request: HTTPRequest, user: Use self.no_master_error(content_type) else: url = f"{master_url}/config" - await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, - method="PUT") + await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="PUT") self.r({"compatibility": self.schema_registry.schema_reader.config["compatibility"]}, content_type) @@ -655,8 +652,7 @@ async def config_subject_get( self.r( body={ "error_code": SchemaErrorCodes.SUBJECT_LEVEL_COMPATIBILITY_NOT_CONFIGURED_ERROR_CODE.value, - "message": SchemaErrorMessages.SUBJECT_LEVEL_COMPATIBILITY_NOT_CONFIGURED_FMT.value.format( - subject=subject), + "message": SchemaErrorMessages.SUBJECT_LEVEL_COMPATIBILITY_NOT_CONFIGURED_FMT.value.format(subject=subject), }, content_type=content_type, status=HTTPStatus.NOT_FOUND, @@ -790,8 +786,7 @@ async def subject_delete( self.no_master_error(content_type) else: url = f"{master_url}/subjects/{subject}?permanent={permanent}" - await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, - method="DELETE") + await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="DELETE") async def subject_version_get( self, content_type: str, *, subject: str, version: str, request: HTTPRequest, user: User | None = None @@ -894,8 +889,7 @@ async def subject_version_delete( self.no_master_error(content_type) else: url = f"{master_url}/subjects/{subject}/versions/{version}?permanent={permanent}" - await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, - method="DELETE") + await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="DELETE") async def subject_version_schema_get( self, content_type: str, *, subject: str, version: str, user: User | None = None @@ -1119,8 +1113,7 @@ async def subjects_schema_post( # When checking if schema is already registered, allow unvalidated schema in as # there might be stored schemas that are non-compliant from the past. new_schema = ParsedTypedSchema.parse( - schema_type=schema_type, schema_str=schema_str, references=references, - dependencies=new_schema_dependencies + schema_type=schema_type, schema_str=schema_str, references=references, dependencies=new_schema_dependencies ) except InvalidSchema: self.log.exception("No proper parser found") @@ -1281,8 +1274,7 @@ async def subject_post( self.no_master_error(content_type) else: url = f"{master_url}/subjects/{subject}/versions" - await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, - method="POST") + await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="POST") async def is_topic_requiring_validation(self, content_type: str, *, topic: str) -> None: require_validation = self.schema_registry.is_topic_requiring_validation(topic_name=TopicName(topic)).value @@ -1300,9 +1292,11 @@ async def set_topic_require_validation( topic_name = TopicName(topic) already_skipping_validation = not require_validation and self.schema_registry.database.is_topic_requiring_validation( - topic_name=topic_name) + topic_name=topic_name + ) already_validating = require_validation and not self.schema_registry.database.is_topic_requiring_validation( - topic_name=topic_name) + topic_name=topic_name + ) if already_validating or already_skipping_validation: empty_response() @@ -1321,7 +1315,7 @@ async def set_topic_require_validation( body=None, url=compute_forwarded_url(master_url=master_url, request_url=request.url), content_type=content_type, - method="POST" + method="POST", ) def get_schema_id_if_exists(self, *, subject: str, schema: TypedSchema, include_deleted: bool) -> SchemaId | None: diff --git a/karapace/serialization.py b/karapace/serialization.py index d4a1a77dd..badecd13c 100644 --- a/karapace/serialization.py +++ b/karapace/serialization.py @@ -20,7 +20,7 @@ from karapace.protobuf.schema import ProtobufSchema from karapace.schema_models import InvalidSchema, ParsedTypedSchema, SchemaType, TypedSchema, ValidatedTypedSchema from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping -from karapace.typing import NameStrategy, ResolvedVersion, SchemaId, Subject, SubjectType +from karapace.typing import ResolvedVersion, SchemaId, Subject from karapace.utils import json_decode, json_encode from typing import Any, Callable, MutableMapping from urllib.parse import quote @@ -106,7 +106,6 @@ def no_validation_strategy(topic_name: str, record_name: str) -> str: NameStrategy.topic_name: topic_name_strategy, NameStrategy.record_name: record_name_strategy, NameStrategy.topic_record_name: topic_record_name_strategy, - NameStrategy.no_validation: no_validation_strategy, } @@ -304,7 +303,6 @@ def __init__( self.ids_to_schemas: dict[int, TypedSchema] = {} self.ids_to_subjects: MutableMapping[int, list[Subject]] = TTLCache(maxsize=10000, ttl=600) self.schemas_to_ids: dict[str, SchemaId] = {} - self._topic_strategy_cache: MutableMapping[TopicName, NameStrategy] = TTLCache(maxsize=10000, ttl=600) async def close(self) -> None: if self.registry_client: diff --git a/tests/integration/test_rest.py b/tests/integration/test_rest.py index 5765d1ca9..cf77d4623 100644 --- a/tests/integration/test_rest.py +++ b/tests/integration/test_rest.py @@ -8,6 +8,10 @@ from kafka.errors import UnknownTopicOrPartitionError from karapace.client import Client from karapace.kafka_rest_apis import KafkaRest, KafkaRestAdminClient, SUBJECT_VALID_POSTFIX +from karapace.schema_models import ValidatedTypedSchema +from karapace.schema_type import SchemaType +from karapace.serialization import get_subject_name +from karapace.typing import NameStrategy, SubjectType from karapace.version import __version__ from pytest import raises from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES @@ -706,7 +710,7 @@ async def test_produce_subjects_with_different_name_strategies( assert res.status_code == 422 # registering the required subject - subject_to_create = get_subject_name(topic_name, typed_schema, SubjectType.value, strategy) + subject_to_create = get_subject_name(topic_name, typed_schema, SubjectType, strategy) res = await registry_async_client.post( f"subjects/{subject_to_create}/versions", @@ -749,7 +753,7 @@ async def test_can_produce_anything_with_no_validation_policy( ), ) - res = await registry_async_client.post(f"/topic/{topic_name}/name_strategy/{NameStrategy.no_validation}", json={}) + res = await registry_async_client.post(f"/topic/{topic_name}/name_strategy/{NameStrategy.topic_name}", json={}) assert res.ok # with the no_validation strategy we can produce even if we use a totally random subject name diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index 59a8c6f63..4e325a2a0 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -7,7 +7,6 @@ from http import HTTPStatus from kafka import KafkaProducer from karapace.client import Client -from karapace.config import ConfigDefaults, NameStrategy from karapace.rapu import is_success from karapace.schema_registry_apis import SchemaErrorMessages from karapace.utils import json_encode @@ -19,7 +18,7 @@ create_subject_name_factory, repeat_until_successful_request, ) -from typing import AsyncIterator, Callable, List, Tuple +from typing import List, Tuple import asyncio import json @@ -1079,6 +1078,7 @@ async def assert_schema_versions_failed(client: Client, trail: str, schema_id: i res = await client.get(f"/schemas/ids/{schema_id}/versions{trail}") assert res.status_code == response_code + async def register_schema(registry_async_client: Client, trail, subject: str, schema_str: str) -> Tuple[int, int]: # Register to get the id res = await registry_async_client.post( diff --git a/tests/unit/test_schema_registry_api.py b/tests/unit/test_schema_registry_api.py index d60c3f77e..bc6019951 100644 --- a/tests/unit/test_schema_registry_api.py +++ b/tests/unit/test_schema_registry_api.py @@ -5,7 +5,7 @@ from aiohttp.test_utils import TestClient, TestServer from karapace.config import DEFAULTS, set_config_defaults from karapace.rapu import HTTPResponse -from karapace.schema_registry_apis import KarapaceSchemaRegistryController, compute_forwarded_url +from karapace.schema_registry_apis import compute_forwarded_url, KarapaceSchemaRegistryController from unittest.mock import ANY, Mock, patch, PropertyMock import asyncio @@ -50,13 +50,15 @@ async def test_forward_when_not_ready(): ready_property_mock.assert_called_once() schema_registry.get_master.assert_called_once() mock_forward_func.assert_called_once_with( - request=ANY, body=None, url="http://primary-url/schemas/ids/1", content_type="application/json", - method="GET" + request=ANY, body=None, url="http://primary-url/schemas/ids/1", content_type="application/json", method="GET" ) def test_compute_forwarded_url() -> None: - assert compute_forwarded_url( - master_url="http://localhost:8081/another/fancy/path", - request_url="https://docs.python.org/3/library/urllib.parse.html", - ) == "http://localhost:8081/3/library/urllib.parse.html" + assert ( + compute_forwarded_url( + master_url="http://localhost:8081/another/fancy/path", + request_url="https://docs.python.org/3/library/urllib.parse.html", + ) + == "http://localhost:8081/3/library/urllib.parse.html" + ) diff --git a/tests/unit/test_serialization.py b/tests/unit/test_serialization.py index 435a0aa78..2e09fec82 100644 --- a/tests/unit/test_serialization.py +++ b/tests/unit/test_serialization.py @@ -3,8 +3,7 @@ See LICENSE for details """ from karapace.client import Path -from karapace.config import DEFAULTS, NameStrategy, read_config, SubjectType -from karapace.kafka_rest_apis import SUBJECT_VALID_POSTFIX +from karapace.config import DEFAULTS, read_config from karapace.schema_models import SchemaType, ValidatedTypedSchema from karapace.serialization import ( flatten_unions,