Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
eliax1996 committed Nov 9, 2023
1 parent 8bdb246 commit d5362f3
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 73 deletions.
6 changes: 3 additions & 3 deletions karapace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,13 @@ def validate_config(config: Config) -> None:
f"Invalid master election strategy: {master_election_strategy}, valid values are {valid_strategies}"
) from None

deafault_name_strategy = config["name_strategy"]
name_strategy = config["name_strategy"]
try:
NameStrategy(deafault_name_strategy)
NameStrategy(name_strategy)
except ValueError:
valid_strategies = list(NameStrategy)
raise InvalidConfiguration(
f"Invalid default name strategy: {deafault_name_strategy}, valid values are {valid_strategies}"
f"Invalid default name strategy: {name_strategy}, valid values are {valid_strategies}"
) from None

if config["rest_authorization"] and config["sasl_bootstrap_uri"] is None:
Expand Down
19 changes: 10 additions & 9 deletions karapace/in_memory_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
"""
from __future__ import annotations

from cachetools import TTLCache
from dataclasses import dataclass, field
from karapace.config import NameStrategy
from karapace.schema_models import SchemaVersion, TypedSchema
from karapace.schema_references import Reference, Referents
from karapace.typing import ResolvedVersion, SchemaId, Subject, TopicName
from threading import Lock, RLock
from typing import Iterable, Sequence
from typing import Iterable, MutableMapping, Sequence

import logging

Expand All @@ -33,7 +33,9 @@ def __init__(self) -> None:
self.schemas: dict[SchemaId, TypedSchema] = {}
self.schema_lock_thread = RLock()
self.referenced_by: dict[tuple[Subject, ResolvedVersion], Referents] = {}
self.topic_without_validation: set[TopicName] = set()
# Conceptually this is a set, but as far as I know there is no ready-made TTL set, so a TTL
# mapping whose values are always None is used instead of re-implementing expiry in a custom data structure.
self.topic_without_validation: MutableMapping[TopicName, None] = TTLCache(maxsize=10000, ttl=600)

# Content based deduplication of schemas. This is used to reduce memory
# usage when the same schema is produced multiple times to the same or
Expand Down Expand Up @@ -216,8 +218,7 @@ def find_subjects(self, *, include_deleted: bool) -> list[Subject]:
return list(self.subjects.keys())
with self.schema_lock_thread:
return [
subject for subject in self.subjects if
self.find_subject_schemas(subject=subject, include_deleted=False)
subject for subject in self.subjects if self.find_subject_schemas(subject=subject, include_deleted=False)
]

def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> dict[ResolvedVersion, SchemaVersion]:
Expand All @@ -235,11 +236,11 @@ def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> di
def is_topic_requiring_validation(self, *, topic_name: TopicName) -> bool:
return topic_name not in self.topic_without_validation

def override_topic_validation(self, *, topic_name: TopicName, skip_validation: bool) -> None:
    """Record whether subject-name validation is skipped for a topic.

    Args:
        topic_name: The topic whose validation override is being changed.
        skip_validation: When true, the topic is stored in
            ``topic_without_validation``; when false, the topic is removed
            from it so that ``is_topic_requiring_validation`` reports it as
            requiring validation again.
    """
    if skip_validation:
        # Only key membership matters; the mapping stands in for a set, so the value is always None.
        self.topic_without_validation[topic_name] = None
    else:
        # Pass a default so removing a topic that is not present (never added, or
        # already dropped by the cache) is a no-op instead of raising KeyError.
        self.topic_without_validation.pop(topic_name, None)

def delete_subject(self, *, subject: Subject, version: ResolvedVersion) -> None:
with self.schema_lock_thread:
Expand Down
17 changes: 2 additions & 15 deletions karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
TopicAuthorizationFailedError,
UnknownTopicOrPartitionError,
)
from karapace.config import Config, create_client_ssl_context, NameStrategy, SubjectType
from karapace.config import Config, create_client_ssl_context
from karapace.errors import InvalidSchema
from karapace.kafka_rest_apis.admin import KafkaRestAdminClient
from karapace.kafka_rest_apis.authentication import (
Expand All @@ -35,14 +35,6 @@
SchemaRegistrySerializer,
SchemaRetrievalError,
)
from karapace.typing import SchemaId, Subject, TopicName
from karapace.serialization import (
get_subject_name,
InvalidMessageSchema,
InvalidPayload,
SchemaRegistrySerializer,
SchemaRetrievalError,
)
from karapace.typing import NameStrategy, SchemaId, Subject, SubjectType
from karapace.utils import convert_to_int, json_encode, KarapaceKafkaClient
from typing import Callable, Dict, List, Optional, Tuple, Union
Expand Down Expand Up @@ -788,7 +780,6 @@ async def get_schema_id(
SchemaId(int(data[f"{subject_type}_schema_id"])) if f"{subject_type}_schema_id" in data else None
)
schema_str = data.get(f"{subject_type}_schema")
naming_strategy = await self.serializer.get_topic_strategy_name(topic_name=TopicName(topic))

if schema_id is None and schema_str is None:
raise InvalidSchema()
Expand All @@ -814,11 +805,7 @@ def subject_not_included(schema: TypedSchema, subjects: List[Subject]) -> bool:
need_new_call=subject_not_included,
)

if (
self.config["name_strategy_validation"]
and naming_strategy != NameStrategy.no_validation
and subject_not_included(parsed_schema, valid_subjects)
):
if self.config["name_strategy_validation"] and subject_not_included(parsed_schema, valid_subjects):
raise InvalidSchema()

return schema_id
Expand Down
15 changes: 6 additions & 9 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
TopicAlreadyExistsError,
)
from karapace import constants
from karapace.config import Config, NameStrategy
from karapace.config import Config
from karapace.dependency import Dependency
from karapace.errors import InvalidReferences, InvalidSchema
from karapace.in_memory_database import InMemoryDatabase
Expand Down Expand Up @@ -440,8 +440,8 @@ def _handle_msg_delete_subject(self, key: dict, value: dict | None) -> None: #

def _handle_msg_schema_validation(self, key: dict, value: dict | None) -> None:  # pylint: disable=unused-argument
    """Apply a per-topic validation override message to the in-memory database.

    Args:
        key: The message key; not used by this handler.
        value: The message payload. It must be a dict providing ``"topic"``
            and ``"skip_validation"``.

    Raises:
        KeyError: If ``value`` lacks ``"topic"`` or ``"skip_validation"``.
    """
    assert isinstance(value, dict)
    topic, skip_validation = TopicName(value["topic"]), bool(value["skip_validation"])
    # The database method takes the keyword-only argument ``skip_validation``;
    # passing the old ``require_validation`` keyword would raise TypeError.
    self.database.override_topic_validation(topic_name=topic, skip_validation=skip_validation)

def _handle_msg_schema_hard_delete(self, key: dict) -> None:
subject, version = key["subject"], key["version"]
Expand Down Expand Up @@ -495,8 +495,7 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
elif schema_type_parsed == SchemaType.PROTOBUF:
try:
if schema_references:
candidate_references = [reference_from_mapping(reference_data) for reference_data in
schema_references]
candidate_references = [reference_from_mapping(reference_data) for reference_data in schema_references]
resolved_references, resolved_dependencies = self.resolve_references(candidate_references)
parsed_schema = parse_protobuf_schema_definition(
schema_str,
Expand Down Expand Up @@ -552,8 +551,7 @@ def handle_msg(self, key: dict, value: dict | None) -> None:
elif message_type == MessageType.no_operation:
pass
except ValueError:
LOG.error("The message %s-%s has been discarded because the %s is not managed", key, value,
key["keytype"])
LOG.error("The message %s-%s has been discarded because the %s is not managed", key, value, key["keytype"])

else:
LOG.error(
Expand All @@ -576,8 +574,7 @@ def get_referenced_by(

def _resolve_and_validate(self, schema: TypedSchema) -> ValidatedTypedSchema:
references, dependencies = (
self.resolve_references(schema.references) if schema.references else (
schema.references, schema.dependencies)
self.resolve_references(schema.references) if schema.references else (schema.references, schema.dependencies)
)
return ValidatedTypedSchema.parse(
schema_type=schema.schema_type,
Expand Down
36 changes: 15 additions & 21 deletions karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,15 @@
"""
from __future__ import annotations

from urllib.parse import urlparse
from functools import partial

from avro.errors import SchemaParseException
from contextlib import AsyncExitStack
from enum import Enum, unique
from functools import partial
from http import HTTPStatus
from karapace.auth import HTTPAuthorizer, Operation, User
from karapace.compatibility import check_compatibility, CompatibilityModes
from karapace.compatibility.jsonschema.checks import is_incompatible
from karapace.config import Config, NameStrategy
from karapace.config import Config
from karapace.errors import (
IncompatibleSchema,
InvalidReferences,
Expand All @@ -40,6 +38,7 @@
from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId, TopicName
from karapace.utils import JSONDecodeError
from typing import Any
from urllib.parse import urlparse

import aiohttp
import async_timeout
Expand Down Expand Up @@ -576,8 +575,7 @@ async def schemas_get_versions(

deleted = request.query.get("deleted", "false").lower() == "true"
subject_versions = []
for subject_version in self.schema_registry.get_subject_versions_for_schema(schema_id_int,
include_deleted=deleted):
for subject_version in self.schema_registry.get_subject_versions_for_schema(schema_id_int, include_deleted=deleted):
subject = subject_version["subject"]
if self._auth and not self._auth.check_authorization(user, Operation.Read, f"Subject:{subject}"):
continue
Expand Down Expand Up @@ -618,8 +616,7 @@ async def config_set(self, content_type: str, *, request: HTTPRequest, user: Use
self.no_master_error(content_type)
else:
url = f"{master_url}/config"
await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type,
method="PUT")
await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="PUT")

self.r({"compatibility": self.schema_registry.schema_reader.config["compatibility"]}, content_type)

Expand Down Expand Up @@ -655,8 +652,7 @@ async def config_subject_get(
self.r(
body={
"error_code": SchemaErrorCodes.SUBJECT_LEVEL_COMPATIBILITY_NOT_CONFIGURED_ERROR_CODE.value,
"message": SchemaErrorMessages.SUBJECT_LEVEL_COMPATIBILITY_NOT_CONFIGURED_FMT.value.format(
subject=subject),
"message": SchemaErrorMessages.SUBJECT_LEVEL_COMPATIBILITY_NOT_CONFIGURED_FMT.value.format(subject=subject),
},
content_type=content_type,
status=HTTPStatus.NOT_FOUND,
Expand Down Expand Up @@ -790,8 +786,7 @@ async def subject_delete(
self.no_master_error(content_type)
else:
url = f"{master_url}/subjects/{subject}?permanent={permanent}"
await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type,
method="DELETE")
await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="DELETE")

async def subject_version_get(
self, content_type: str, *, subject: str, version: str, request: HTTPRequest, user: User | None = None
Expand Down Expand Up @@ -894,8 +889,7 @@ async def subject_version_delete(
self.no_master_error(content_type)
else:
url = f"{master_url}/subjects/{subject}/versions/{version}?permanent={permanent}"
await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type,
method="DELETE")
await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="DELETE")

async def subject_version_schema_get(
self, content_type: str, *, subject: str, version: str, user: User | None = None
Expand Down Expand Up @@ -1119,8 +1113,7 @@ async def subjects_schema_post(
# When checking if schema is already registered, allow unvalidated schema in as
# there might be stored schemas that are non-compliant from the past.
new_schema = ParsedTypedSchema.parse(
schema_type=schema_type, schema_str=schema_str, references=references,
dependencies=new_schema_dependencies
schema_type=schema_type, schema_str=schema_str, references=references, dependencies=new_schema_dependencies
)
except InvalidSchema:
self.log.exception("No proper parser found")
Expand Down Expand Up @@ -1281,8 +1274,7 @@ async def subject_post(
self.no_master_error(content_type)
else:
url = f"{master_url}/subjects/{subject}/versions"
await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type,
method="POST")
await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="POST")

async def is_topic_requiring_validation(self, content_type: str, *, topic: str) -> None:
require_validation = self.schema_registry.is_topic_requiring_validation(topic_name=TopicName(topic)).value
Expand All @@ -1300,9 +1292,11 @@ async def set_topic_require_validation(
topic_name = TopicName(topic)

already_skipping_validation = not require_validation and self.schema_registry.database.is_topic_requiring_validation(
topic_name=topic_name)
topic_name=topic_name
)
already_validating = require_validation and not self.schema_registry.database.is_topic_requiring_validation(
topic_name=topic_name)
topic_name=topic_name
)

if already_validating or already_skipping_validation:
empty_response()
Expand All @@ -1321,7 +1315,7 @@ async def set_topic_require_validation(
body=None,
url=compute_forwarded_url(master_url=master_url, request_url=request.url),
content_type=content_type,
method="POST"
method="POST",
)

def get_schema_id_if_exists(self, *, subject: str, schema: TypedSchema, include_deleted: bool) -> SchemaId | None:
Expand Down
4 changes: 1 addition & 3 deletions karapace/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from karapace.protobuf.schema import ProtobufSchema
from karapace.schema_models import InvalidSchema, ParsedTypedSchema, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping
from karapace.typing import NameStrategy, ResolvedVersion, SchemaId, Subject, SubjectType
from karapace.typing import ResolvedVersion, SchemaId, Subject
from karapace.utils import json_decode, json_encode
from typing import Any, Callable, MutableMapping
from urllib.parse import quote
Expand Down Expand Up @@ -106,7 +106,6 @@ def no_validation_strategy(topic_name: str, record_name: str) -> str:
NameStrategy.topic_name: topic_name_strategy,
NameStrategy.record_name: record_name_strategy,
NameStrategy.topic_record_name: topic_record_name_strategy,
NameStrategy.no_validation: no_validation_strategy,
}


Expand Down Expand Up @@ -304,7 +303,6 @@ def __init__(
self.ids_to_schemas: dict[int, TypedSchema] = {}
self.ids_to_subjects: MutableMapping[int, list[Subject]] = TTLCache(maxsize=10000, ttl=600)
self.schemas_to_ids: dict[str, SchemaId] = {}
self._topic_strategy_cache: MutableMapping[TopicName, NameStrategy] = TTLCache(maxsize=10000, ttl=600)

async def close(self) -> None:
if self.registry_client:
Expand Down
8 changes: 6 additions & 2 deletions tests/integration/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
from kafka.errors import UnknownTopicOrPartitionError
from karapace.client import Client
from karapace.kafka_rest_apis import KafkaRest, KafkaRestAdminClient, SUBJECT_VALID_POSTFIX
from karapace.schema_models import ValidatedTypedSchema
from karapace.schema_type import SchemaType
from karapace.serialization import get_subject_name
from karapace.typing import NameStrategy, SubjectType
from karapace.version import __version__
from pytest import raises
from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES
Expand Down Expand Up @@ -706,7 +710,7 @@ async def test_produce_subjects_with_different_name_strategies(
assert res.status_code == 422

# registering the required subject
subject_to_create = get_subject_name(topic_name, typed_schema, SubjectType.value, strategy)
subject_to_create = get_subject_name(topic_name, typed_schema, SubjectType, strategy)

res = await registry_async_client.post(
f"subjects/{subject_to_create}/versions",
Expand Down Expand Up @@ -749,7 +753,7 @@ async def test_can_produce_anything_with_no_validation_policy(
),
)

res = await registry_async_client.post(f"/topic/{topic_name}/name_strategy/{NameStrategy.no_validation}", json={})
res = await registry_async_client.post(f"/topic/{topic_name}/name_strategy/{NameStrategy.topic_name}", json={})
assert res.ok

# with the no_validation strategy we can produce even if we use a totally random subject name
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from http import HTTPStatus
from kafka import KafkaProducer
from karapace.client import Client
from karapace.config import ConfigDefaults, NameStrategy
from karapace.rapu import is_success
from karapace.schema_registry_apis import SchemaErrorMessages
from karapace.utils import json_encode
Expand All @@ -19,7 +18,7 @@
create_subject_name_factory,
repeat_until_successful_request,
)
from typing import AsyncIterator, Callable, List, Tuple
from typing import List, Tuple

import asyncio
import json
Expand Down Expand Up @@ -1079,6 +1078,7 @@ async def assert_schema_versions_failed(client: Client, trail: str, schema_id: i
res = await client.get(f"/schemas/ids/{schema_id}/versions{trail}")
assert res.status_code == response_code


async def register_schema(registry_async_client: Client, trail, subject: str, schema_str: str) -> Tuple[int, int]:
# Register to get the id
res = await registry_async_client.post(
Expand Down
16 changes: 9 additions & 7 deletions tests/unit/test_schema_registry_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from aiohttp.test_utils import TestClient, TestServer
from karapace.config import DEFAULTS, set_config_defaults
from karapace.rapu import HTTPResponse
from karapace.schema_registry_apis import KarapaceSchemaRegistryController, compute_forwarded_url
from karapace.schema_registry_apis import compute_forwarded_url, KarapaceSchemaRegistryController
from unittest.mock import ANY, Mock, patch, PropertyMock

import asyncio
Expand Down Expand Up @@ -50,13 +50,15 @@ async def test_forward_when_not_ready():
ready_property_mock.assert_called_once()
schema_registry.get_master.assert_called_once()
mock_forward_func.assert_called_once_with(
request=ANY, body=None, url="http://primary-url/schemas/ids/1", content_type="application/json",
method="GET"
request=ANY, body=None, url="http://primary-url/schemas/ids/1", content_type="application/json", method="GET"
)


def test_compute_forwarded_url() -> None:
assert compute_forwarded_url(
master_url="http://localhost:8081/another/fancy/path",
request_url="https://docs.python.org/3/library/urllib.parse.html",
) == "http://localhost:8081/3/library/urllib.parse.html"
assert (
compute_forwarded_url(
master_url="http://localhost:8081/another/fancy/path",
request_url="https://docs.python.org/3/library/urllib.parse.html",
)
== "http://localhost:8081/3/library/urllib.parse.html"
)
Loading

0 comments on commit d5362f3

Please sign in to comment.