Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
eliax1996 committed Nov 9, 2023
1 parent 8bdb246 commit 474140a
Show file tree
Hide file tree
Showing 11 changed files with 65 additions and 145 deletions.
6 changes: 3 additions & 3 deletions karapace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,13 @@ def validate_config(config: Config) -> None:
f"Invalid master election strategy: {master_election_strategy}, valid values are {valid_strategies}"
) from None

deafault_name_strategy = config["name_strategy"]
name_strategy = config["name_strategy"]
try:
NameStrategy(deafault_name_strategy)
NameStrategy(name_strategy)
except ValueError:
valid_strategies = list(NameStrategy)
raise InvalidConfiguration(
f"Invalid default name strategy: {deafault_name_strategy}, valid values are {valid_strategies}"
f"Invalid default name strategy: {name_strategy}, valid values are {valid_strategies}"
) from None

if config["rest_authorization"] and config["sasl_bootstrap_uri"] is None:
Expand Down
19 changes: 10 additions & 9 deletions karapace/in_memory_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
"""
from __future__ import annotations

from cachetools import TTLCache
from dataclasses import dataclass, field
from karapace.config import NameStrategy
from karapace.schema_models import SchemaVersion, TypedSchema
from karapace.schema_references import Reference, Referents
from karapace.typing import ResolvedVersion, SchemaId, Subject, TopicName
from threading import Lock, RLock
from typing import Iterable, Sequence
from typing import Iterable, MutableMapping, Sequence

import logging

Expand All @@ -33,7 +33,9 @@ def __init__(self) -> None:
self.schemas: dict[SchemaId, TypedSchema] = {}
self.schema_lock_thread = RLock()
self.referenced_by: dict[tuple[Subject, ResolvedVersion], Referents] = {}
self.topic_without_validation: set[TopicName] = set()
# Conceptually this is a set, but as far as I know there is no TTL set available, so a TTLCache whose
# values are always None is used as a stand-in instead of re-implementing the feature in a custom data structure.
self.topic_without_validation: MutableMapping[TopicName, None] = TTLCache(maxsize=10000, ttl=600)

# Content based deduplication of schemas. This is used to reduce memory
# usage when the same schema is produced multiple times to the same or
Expand Down Expand Up @@ -216,8 +218,7 @@ def find_subjects(self, *, include_deleted: bool) -> list[Subject]:
return list(self.subjects.keys())
with self.schema_lock_thread:
return [
subject for subject in self.subjects if
self.find_subject_schemas(subject=subject, include_deleted=False)
subject for subject in self.subjects if self.find_subject_schemas(subject=subject, include_deleted=False)
]

def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> dict[ResolvedVersion, SchemaVersion]:
Expand All @@ -235,11 +236,11 @@ def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> di
# Return True unless `topic_name` is currently a key of `self.topic_without_validation`.
def is_topic_requiring_validation(self, *, topic_name: TopicName) -> bool:
return topic_name not in self.topic_without_validation

def override_topic_validation(self, *, topic_name: TopicName, skip_validation: bool) -> None:
    """Record or clear the "skip validation" override for a topic.

    Args:
        topic_name: Topic whose override is being changed.
        skip_validation: When true, the topic is stored as a key of
            ``self.topic_without_validation`` (the value is always ``None``;
            the mapping is used as a set). When false, the key is removed.
    """
    if skip_validation:
        self.topic_without_validation[topic_name] = None
    else:
        # The key may be absent (never marked, or already evicted from the mapping),
        # so pop with a default instead of raising KeyError.
        self.topic_without_validation.pop(topic_name, None)

def delete_subject(self, *, subject: Subject, version: ResolvedVersion) -> None:
with self.schema_lock_thread:
Expand Down
17 changes: 2 additions & 15 deletions karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
TopicAuthorizationFailedError,
UnknownTopicOrPartitionError,
)
from karapace.config import Config, create_client_ssl_context, NameStrategy, SubjectType
from karapace.config import Config, create_client_ssl_context
from karapace.errors import InvalidSchema
from karapace.kafka_rest_apis.admin import KafkaRestAdminClient
from karapace.kafka_rest_apis.authentication import (
Expand All @@ -35,14 +35,6 @@
SchemaRegistrySerializer,
SchemaRetrievalError,
)
from karapace.typing import SchemaId, Subject, TopicName
from karapace.serialization import (
get_subject_name,
InvalidMessageSchema,
InvalidPayload,
SchemaRegistrySerializer,
SchemaRetrievalError,
)
from karapace.typing import NameStrategy, SchemaId, Subject, SubjectType
from karapace.utils import convert_to_int, json_encode, KarapaceKafkaClient
from typing import Callable, Dict, List, Optional, Tuple, Union
Expand Down Expand Up @@ -788,7 +780,6 @@ async def get_schema_id(
SchemaId(int(data[f"{subject_type}_schema_id"])) if f"{subject_type}_schema_id" in data else None
)
schema_str = data.get(f"{subject_type}_schema")
naming_strategy = await self.serializer.get_topic_strategy_name(topic_name=TopicName(topic))

if schema_id is None and schema_str is None:
raise InvalidSchema()
Expand All @@ -814,11 +805,7 @@ def subject_not_included(schema: TypedSchema, subjects: List[Subject]) -> bool:
need_new_call=subject_not_included,
)

if (
self.config["name_strategy_validation"]
and naming_strategy != NameStrategy.no_validation
and subject_not_included(parsed_schema, valid_subjects)
):
if self.config["name_strategy_validation"] and subject_not_included(parsed_schema, valid_subjects):
raise InvalidSchema()

return schema_id
Expand Down
20 changes: 8 additions & 12 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

from avro.schema import Schema as AvroSchema
from contextlib import closing, ExitStack
from enum import Enum
from jsonschema.validators import Draft7Validator
from kafka import KafkaConsumer, TopicPartition
from kafka.admin import KafkaAdminClient, NewTopic
Expand All @@ -21,7 +20,7 @@
TopicAlreadyExistsError,
)
from karapace import constants
from karapace.config import Config, NameStrategy
from karapace.config import Config
from karapace.dependency import Dependency
from karapace.errors import InvalidReferences, InvalidSchema
from karapace.in_memory_database import InMemoryDatabase
Expand All @@ -32,7 +31,7 @@
from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents
from karapace.statsd import StatsClient
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject, TopicName
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, StrEnum, Subject, TopicName
from karapace.utils import json_decode, JSONDecodeError, KarapaceKafkaClient
from threading import Event, Thread
from typing import Final, Mapping, Sequence
Expand All @@ -59,7 +58,7 @@
METRIC_SUBJECT_DATA_SCHEMA_VERSIONS_GAUGE: Final = "karapace_schema_reader_subject_data_schema_versions"


class MessageType(Enum):
class MessageType(StrEnum):
config = "CONFIG"
schema = "SCHEMA"
delete_subject = "DELETE_SUBJECT"
Expand Down Expand Up @@ -440,8 +439,8 @@ def _handle_msg_delete_subject(self, key: dict, value: dict | None) -> None: #

def _handle_msg_schema_validation(self, key: dict, value: dict | None) -> None:  # pylint: disable=unused-argument
    """Apply a schema-validation override message to the in-memory database.

    Args:
        key: Message key; not used by this handler.
        value: Message payload; must be a dict with a ``"topic"`` entry and a
            ``"skip_validation"`` entry (coerced with ``bool``).
    """
    assert isinstance(value, dict)
    topic = TopicName(value["topic"])
    skip_validation = bool(value["skip_validation"])
    # The database method takes `skip_validation`; the former `require_validation`
    # keyword no longer exists on it and would raise TypeError.
    self.database.override_topic_validation(topic_name=topic, skip_validation=skip_validation)

def _handle_msg_schema_hard_delete(self, key: dict) -> None:
subject, version = key["subject"], key["version"]
Expand Down Expand Up @@ -495,8 +494,7 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
elif schema_type_parsed == SchemaType.PROTOBUF:
try:
if schema_references:
candidate_references = [reference_from_mapping(reference_data) for reference_data in
schema_references]
candidate_references = [reference_from_mapping(reference_data) for reference_data in schema_references]
resolved_references, resolved_dependencies = self.resolve_references(candidate_references)
parsed_schema = parse_protobuf_schema_definition(
schema_str,
Expand Down Expand Up @@ -552,8 +550,7 @@ def handle_msg(self, key: dict, value: dict | None) -> None:
elif message_type == MessageType.no_operation:
pass
except ValueError:
LOG.error("The message %s-%s has been discarded because the %s is not managed", key, value,
key["keytype"])
LOG.error("The message %s-%s has been discarded because the %s is not managed", key, value, key["keytype"])

else:
LOG.error(
Expand All @@ -576,8 +573,7 @@ def get_referenced_by(

def _resolve_and_validate(self, schema: TypedSchema) -> ValidatedTypedSchema:
references, dependencies = (
self.resolve_references(schema.references) if schema.references else (
schema.references, schema.dependencies)
self.resolve_references(schema.references) if schema.references else (schema.references, schema.dependencies)
)
return ValidatedTypedSchema.parse(
schema_type=schema.schema_type,
Expand Down
2 changes: 1 addition & 1 deletion karapace/schema_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ def update_require_validation_for_topic(
topic_name: TopicName,
require_validation: bool,
) -> None:
key = {"topic": topic_name, "keytype": MessageType.schema_validation.value, "magic": 0}
key = {"topic": topic_name, "keytype": str(MessageType.schema_validation), "magic": 0}
value = {"skip_validation": require_validation, "topic": topic_name}
self.producer.send_message(key=key, value=value)

Expand Down
36 changes: 15 additions & 21 deletions karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,15 @@
"""
from __future__ import annotations

from urllib.parse import urlparse
from functools import partial

from avro.errors import SchemaParseException
from contextlib import AsyncExitStack
from enum import Enum, unique
from functools import partial
from http import HTTPStatus
from karapace.auth import HTTPAuthorizer, Operation, User
from karapace.compatibility import check_compatibility, CompatibilityModes
from karapace.compatibility.jsonschema.checks import is_incompatible
from karapace.config import Config, NameStrategy
from karapace.config import Config
from karapace.errors import (
IncompatibleSchema,
InvalidReferences,
Expand All @@ -40,6 +38,7 @@
from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId, TopicName
from karapace.utils import JSONDecodeError
from typing import Any
from urllib.parse import urlparse

import aiohttp
import async_timeout
Expand Down Expand Up @@ -576,8 +575,7 @@ async def schemas_get_versions(

deleted = request.query.get("deleted", "false").lower() == "true"
subject_versions = []
for subject_version in self.schema_registry.get_subject_versions_for_schema(schema_id_int,
include_deleted=deleted):
for subject_version in self.schema_registry.get_subject_versions_for_schema(schema_id_int, include_deleted=deleted):
subject = subject_version["subject"]
if self._auth and not self._auth.check_authorization(user, Operation.Read, f"Subject:{subject}"):
continue
Expand Down Expand Up @@ -618,8 +616,7 @@ async def config_set(self, content_type: str, *, request: HTTPRequest, user: Use
self.no_master_error(content_type)
else:
url = f"{master_url}/config"
await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type,
method="PUT")
await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="PUT")

self.r({"compatibility": self.schema_registry.schema_reader.config["compatibility"]}, content_type)

Expand Down Expand Up @@ -655,8 +652,7 @@ async def config_subject_get(
self.r(
body={
"error_code": SchemaErrorCodes.SUBJECT_LEVEL_COMPATIBILITY_NOT_CONFIGURED_ERROR_CODE.value,
"message": SchemaErrorMessages.SUBJECT_LEVEL_COMPATIBILITY_NOT_CONFIGURED_FMT.value.format(
subject=subject),
"message": SchemaErrorMessages.SUBJECT_LEVEL_COMPATIBILITY_NOT_CONFIGURED_FMT.value.format(subject=subject),
},
content_type=content_type,
status=HTTPStatus.NOT_FOUND,
Expand Down Expand Up @@ -790,8 +786,7 @@ async def subject_delete(
self.no_master_error(content_type)
else:
url = f"{master_url}/subjects/{subject}?permanent={permanent}"
await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type,
method="DELETE")
await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="DELETE")

async def subject_version_get(
self, content_type: str, *, subject: str, version: str, request: HTTPRequest, user: User | None = None
Expand Down Expand Up @@ -894,8 +889,7 @@ async def subject_version_delete(
self.no_master_error(content_type)
else:
url = f"{master_url}/subjects/{subject}/versions/{version}?permanent={permanent}"
await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type,
method="DELETE")
await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="DELETE")

async def subject_version_schema_get(
self, content_type: str, *, subject: str, version: str, user: User | None = None
Expand Down Expand Up @@ -1119,8 +1113,7 @@ async def subjects_schema_post(
# When checking if schema is already registered, allow unvalidated schema in as
# there might be stored schemas that are non-compliant from the past.
new_schema = ParsedTypedSchema.parse(
schema_type=schema_type, schema_str=schema_str, references=references,
dependencies=new_schema_dependencies
schema_type=schema_type, schema_str=schema_str, references=references, dependencies=new_schema_dependencies
)
except InvalidSchema:
self.log.exception("No proper parser found")
Expand Down Expand Up @@ -1281,8 +1274,7 @@ async def subject_post(
self.no_master_error(content_type)
else:
url = f"{master_url}/subjects/{subject}/versions"
await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type,
method="POST")
await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="POST")

async def is_topic_requiring_validation(self, content_type: str, *, topic: str) -> None:
require_validation = self.schema_registry.is_topic_requiring_validation(topic_name=TopicName(topic)).value
Expand All @@ -1300,9 +1292,11 @@ async def set_topic_require_validation(
topic_name = TopicName(topic)

already_skipping_validation = not require_validation and self.schema_registry.database.is_topic_requiring_validation(
topic_name=topic_name)
topic_name=topic_name
)
already_validating = require_validation and not self.schema_registry.database.is_topic_requiring_validation(
topic_name=topic_name)
topic_name=topic_name
)

if already_validating or already_skipping_validation:
empty_response()
Expand All @@ -1321,7 +1315,7 @@ async def set_topic_require_validation(
body=None,
url=compute_forwarded_url(master_url=master_url, request_url=request.url),
content_type=content_type,
method="POST"
method="POST",
)

def get_schema_id_if_exists(self, *, subject: str, schema: TypedSchema, include_deleted: bool) -> SchemaId | None:
Expand Down
5 changes: 1 addition & 4 deletions karapace/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@
from karapace.config import NameStrategy
from karapace.dependency import Dependency
from karapace.errors import InvalidReferences
from karapace.kafka_rest_apis import SubjectType
from karapace.protobuf.exception import ProtobufTypeException
from karapace.protobuf.io import ProtobufDatumReader, ProtobufDatumWriter
from karapace.protobuf.schema import ProtobufSchema
from karapace.schema_models import InvalidSchema, ParsedTypedSchema, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping
from karapace.typing import NameStrategy, ResolvedVersion, SchemaId, Subject, SubjectType
from karapace.typing import ResolvedVersion, SchemaId, Subject, SubjectType
from karapace.utils import json_decode, json_encode
from typing import Any, Callable, MutableMapping
from urllib.parse import quote
Expand Down Expand Up @@ -106,7 +105,6 @@ def no_validation_strategy(topic_name: str, record_name: str) -> str:
NameStrategy.topic_name: topic_name_strategy,
NameStrategy.record_name: record_name_strategy,
NameStrategy.topic_record_name: topic_record_name_strategy,
NameStrategy.no_validation: no_validation_strategy,
}


Expand Down Expand Up @@ -304,7 +302,6 @@ def __init__(
self.ids_to_schemas: dict[int, TypedSchema] = {}
self.ids_to_subjects: MutableMapping[int, list[Subject]] = TTLCache(maxsize=10000, ttl=600)
self.schemas_to_ids: dict[str, SchemaId] = {}
self._topic_strategy_cache: MutableMapping[TopicName, NameStrategy] = TTLCache(maxsize=10000, ttl=600)

async def close(self) -> None:
if self.registry_client:
Expand Down
Loading

0 comments on commit 474140a

Please sign in to comment.