Skip to content

Commit

Permalink
Changed after review
Browse files Browse the repository at this point in the history
  • Loading branch information
eliax1996 committed Jun 27, 2023
1 parent a25b2c2 commit f2ca7cf
Show file tree
Hide file tree
Showing 6 changed files with 265 additions and 168 deletions.
7 changes: 5 additions & 2 deletions karapace/in_memory_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,15 @@ def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> di
}

def schema_id_registered_to_subject(self, *, subject: Subject, schema_id: SchemaId) -> bool:
if subject not in self.subjects:
try:
subject_schemas = self.subjects[subject]
except KeyError:
return False
subject_schemas = self.subjects[subject]

for schema in subject_schemas.schemas.values():
if schema.schema_id == schema_id:
return True

return False

def delete_subject(self, *, subject: Subject, version: ResolvedVersion) -> None:
Expand Down
206 changes: 71 additions & 135 deletions karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from abc import ABC, abstractmethod
from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaConnectionError
from binascii import Error as B64DecodeError
from cachetools import TTLCache
from collections import namedtuple
from contextlib import AsyncExitStack, closing
from http import HTTPStatus
Expand All @@ -19,19 +17,19 @@
from karapace.kafka_rest_apis.admin import KafkaRestAdminClient
from karapace.kafka_rest_apis.consumer_manager import ConsumerManager
from karapace.kafka_rest_apis.error_codes import RESTErrorCodes
from karapace.kafka_rest_apis.schema_cache import TopicSchemaCache
from karapace.karapace import KarapaceBase
from karapace.rapu import HTTPRequest, HTTPResponse, JSON_CONTENT_TYPE
from karapace.schema_models import parse, TypedSchema
from karapace.schema_models import TypedSchema, ValidatedTypedSchema
from karapace.schema_type import SchemaType
from karapace.serialization import InvalidMessageSchema, InvalidPayload, SchemaRegistrySerializer, SchemaRetrievalError
from karapace.typing import SchemaId, Subject
from karapace.utils import convert_to_int, json_encode, KarapaceKafkaClient
from typing import Dict, Final, List, MutableMapping, Optional, Tuple
from typing import Dict, List, Optional, Tuple, Union

import aiohttp.web
import asyncio
import base64
import hashlib
import logging
import time

Expand Down Expand Up @@ -409,98 +407,6 @@ async def topic_publish(self, topic: str, content_type: str, *, request: HTTPReq
await proxy.topic_publish(topic, content_type, request=request)


class SchemaCacheProtocol(ABC):
@abstractmethod
def get_schema_id(self, schema: TypedSchema) -> Optional[SchemaId]:
pass

@abstractmethod
def has_schema_id(self, schema_id: SchemaId) -> bool:
pass

@abstractmethod
def set_schema(self, schema_id: SchemaId, schema: TypedSchema) -> None:
pass

@abstractmethod
def get_schema(self, schema_id: SchemaId) -> Optional[TypedSchema]:
pass

@abstractmethod
def get_schema_str(self, schema_id: SchemaId) -> Optional[str]:
pass


class SchemaCache(SchemaCacheProtocol):
def __init__(self):
self._schema_hash_str_to_id: dict[str, SchemaId] = {}
self._id_to_schema_str: MutableMapping[SchemaId, TypedSchema] = TTLCache(maxsize=10000, ttl=600)

def get_schema_id(self, schema: TypedSchema) -> Optional[SchemaId]:
fingerprint = hashlib.sha1(str(schema).encode("utf8")).hexdigest()

maybe_id = self._schema_hash_str_to_id.get(fingerprint)

if maybe_id is not None:
if maybe_id not in self._id_to_schema_str:
del self._schema_hash_str_to_id[fingerprint]
return None

return maybe_id

def has_schema_id(self, schema_id: SchemaId) -> bool:
return schema_id in self._id_to_schema_str

def set_schema(self, schema_id: SchemaId, schema: TypedSchema) -> None:
fingerprint = hashlib.sha1(str(schema).encode("utf8")).hexdigest()
self._schema_hash_str_to_id[fingerprint] = schema_id
self._id_to_schema_str[schema_id] = schema

def get_schema(self, schema_id: SchemaId) -> Optional[TypedSchema]:
return self._id_to_schema_str.get(schema_id)

def get_schema_str(self, schema_id: SchemaId) -> Optional[str]:
maybe_schema = self.get_schema(schema_id)
return None if maybe_schema is None else str(maybe_schema)


class EmptySchemaCache(SchemaCacheProtocol):
def get_schema_id(self, schema: TypedSchema) -> None:
return None

def has_schema_id(self, schema_id: SchemaId) -> bool:
return False

def set_schema(self, schema_id: SchemaId, schema: TypedSchema) -> None:
raise NotImplementedError("Empty schema cache. Cannot set schemas.")

def get_schema(self, schema_id: SchemaId) -> None:
return None

def get_schema_str(self, schema_id: SchemaId) -> None:
return None


class TopicSchemaCache:
def __init__(self):
self._topic_cache: dict[Subject, SchemaCache] = {}
self._empty_schema_cache: Final = EmptySchemaCache()

def get_schema_id(self, topic: Subject, schema: TypedSchema) -> Optional[SchemaId]:
return self._topic_cache.get(topic, self._empty_schema_cache).get_schema_id(schema)

def has_schema_id(self, topic: Subject, schema_id: SchemaId) -> bool:
return self._topic_cache.get(topic, self._empty_schema_cache).has_schema_id(schema_id)

def set_schema(self, topic: str, schema_id: SchemaId, schema: TypedSchema) -> None:
schema_cache = self._topic_cache.setdefault(Subject(topic), SchemaCache())
schema_cache.set_schema(schema_id, schema)

def get_schema_str(self, topic: Subject, schema_id: SchemaId) -> Optional[str]:
schema_cache = self._topic_cache.get(topic, self._empty_schema_cache)
return schema_cache.get_schema_str(schema_id)


class UserRestProxy:
def __init__(self, config: Config, kafka_timeout: int, serializer):
self.config = config
Expand Down Expand Up @@ -837,58 +743,89 @@ def is_valid_schema_request(data: dict, prefix: str) -> bool:
return isinstance(schema, str)

async def get_schema_id(self, data: dict, topic: str, prefix: str, schema_type: SchemaType) -> SchemaId:
"""
This method search and validate the SchemaId for a request, it acts as a guard (In case of something wrong
throws an error).
"""
log.debug("[resolve schema id] Retrieving schema id for %r", data)
# Schema id or schema str if schema id is none.
schema_id: SchemaId | None = SchemaId(data[f"{prefix}_schema_id"]) if f"{prefix}_schema_id" in data else None
schema_str = data.get(f"{prefix}_schema")
parsed_schema = (
None
if schema_str is None
else parse(
schema_type,
schema_str,
validate_avro_enum_symbols=True,
validate_avro_names=True,
)
schema_id: Union[SchemaId, None] = (
SchemaId(int(data[f"{prefix}_schema_id"])) if f"{prefix}_schema_id" in data else None
)
schema_str = data.get(f"{prefix}_schema")
parsed_schema = None if schema_str is None else ValidatedTypedSchema.parse(schema_type, schema_str)
subject_name = self.serializer.get_subject_name(topic, parsed_schema, prefix, schema_type)

subject_name: Subject = Subject(self.serializer.get_subject_name(topic, parsed_schema, prefix, schema_type))
if schema_id is None and parsed_schema is None:
raise InvalidSchema()

if schema_id is None:
schema_id = self.topic_schema_cache.get_schema_id(subject_name, parsed_schema)
if schema_id is None:
log.debug("[resolve schema id] Registering / Retrieving ID for %s and schema %s", subject_name, schema_str)
schema_id = await self.serializer.get_id_for_schema(data[f"{prefix}_schema"], subject_name, schema_type)
if schema_id is not None:
log.debug("[resolve schema id] Found schema id %s from registry for subject %s", schema_id, subject_name)
self.topic_schema_cache.set_schema(subject_name, schema_id, parsed_schema)
else:
log.debug(
"[resolve schema id] Schema id not found from registry for subject %s and schema %s",
subject_name,
schema_str,
)
raise InvalidSchema()
else:
log.debug(
"[resolve schema id] schema ID %s found from cache for %s and schema %s",
schema_id,
subject_name,
schema_str,
)
return schema_id
schema_id = await self._query_schema_id_from_cache_or_registry(parsed_schema, schema_str, subject_name)
else:
await self._check_subject_has_schema(schema_id, subject_name, parsed_schema)

return schema_id

async def _check_subject_has_schema(
self, schema_id: SchemaId, subject_name: Subject, parsed_schema: Union[TypedSchema, None]
) -> None:
"""
Checks in the local cache if the subject_name is registered to have the provided schema_id, if not tries
to query the registry (since someone in the meanwhile could have registered the id or the cache could be empty).
If the schema do not belong to the subject_name throws an InvalidSchema error.
"""
if not self.topic_schema_cache.has_schema_id(subject_name, schema_id):
log.debug("[resolve schema id] schema ID %s not found from cache for %s.", schema_id, subject_name)
registered = await self.serializer.get_schema_id_on_subject(subject_name, schema_id)
registered = await self.serializer.has_schema_id_on_subject(subject_name, schema_id)
if registered:
log.debug("[resolve schema id] schema ID %s is registed for %s.", schema_id, subject_name)
schema: TypedSchema = await self.serializer.get_schema_for_id(schema_id)

self.topic_schema_cache.set_schema(subject_name, schema_id, schema)
else:
raise InvalidSchema()
else:
schema = self.topic_schema_cache.get_schema(subject_name, schema_id)
log.debug("[resolve schema id] schema ID %s found from cache directly for %s.", schema_id, subject_name)

if parsed_schema is not None and schema != parsed_schema:
log.debug(
"[resolve schema id] schema ID passed as argument: %s is different from the stored in the cache: %s.",
str(parsed_schema),
str(schema),
)
raise InvalidSchema()

async def _query_schema_id_from_cache_or_registry(
self, parsed_schema: ValidatedTypedSchema, schema_str: str, subject_name: Subject
) -> SchemaId:
"""
Checks if the schema registered with a certain id match with the schema provided (you can provide
a valid id but place in the body a totally unrelated schema).
Also, here if we don't have a match we query the registry since the cache could be evicted in the meanwhile
or the schema could be registered without passing though the http proxy.
"""
schema_id = self.topic_schema_cache.get_schema_id(subject_name, parsed_schema)
if schema_id is None:
log.debug("[resolve schema id] Registering / Retrieving ID for %s and schema %s", subject_name, schema_str)
schema_id = await self.serializer.get_id_for_schema(parsed_schema, subject_name)
if schema_id is not None:
log.debug("[resolve schema id] Found schema id %s from registry for subject %s", schema_id, subject_name)
self.topic_schema_cache.set_schema(subject_name, schema_id, parsed_schema)
else:
log.debug(
"[resolve schema id] Schema id not found from registry for subject %s and schema %s",
subject_name,
schema_str,
)
raise InvalidSchema()
else:
log.debug(
"[resolve schema id] schema ID %s found from cache for %s and schema %s",
schema_id,
subject_name,
schema_str,
)
return schema_id

async def validate_schema_info(self, data: dict, prefix: str, content_type: str, topic: str, schema_type: str):
Expand Down Expand Up @@ -1079,8 +1016,7 @@ async def validate_publish_request_format(self, data: dict, formats: dict, conte
sub_code=code,
)
try:
schema_type = formats["embedded_format"]
await self.validate_schema_info(data, prefix, content_type, topic, schema_type)
await self.validate_schema_info(data, prefix, content_type, topic, formats["embedded_format"])
except InvalidMessageSchema as e:
KafkaRest.unprocessable_entity(
message=str(e),
Expand Down
107 changes: 107 additions & 0 deletions karapace/kafka_rest_apis/schema_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""

from abc import ABC, abstractmethod
from cachetools import TTLCache
from karapace.schema_models import TypedSchema
from karapace.typing import SchemaId, Subject
from typing import Dict, Final, MutableMapping, Optional

import hashlib


class SchemaCacheProtocol(ABC):
@abstractmethod
def get_schema_id(self, schema: TypedSchema) -> Optional[SchemaId]:
pass

@abstractmethod
def has_schema_id(self, schema_id: SchemaId) -> bool:
pass

@abstractmethod
def set_schema(self, schema_id: SchemaId, schema: TypedSchema) -> None:
pass

@abstractmethod
def get_schema(self, schema_id: SchemaId) -> Optional[TypedSchema]:
pass

@abstractmethod
def get_schema_str(self, schema_id: SchemaId) -> Optional[str]:
pass


class TopicSchemaCache:
def __init__(self) -> None:
self._topic_cache: Dict[Subject, SchemaCache] = {}
self._empty_schema_cache: Final = EmptySchemaCache()

def get_schema_id(self, topic: Subject, schema: TypedSchema) -> Optional[SchemaId]:
return self._topic_cache.get(topic, self._empty_schema_cache).get_schema_id(schema)

def has_schema_id(self, topic: Subject, schema_id: SchemaId) -> bool:
return self._topic_cache.get(topic, self._empty_schema_cache).has_schema_id(schema_id)

def set_schema(self, topic: str, schema_id: SchemaId, schema: TypedSchema) -> None:
schema_cache_with_defaults = self._topic_cache.setdefault(Subject(topic), SchemaCache())
schema_cache_with_defaults.set_schema(schema_id, schema)

def get_schema(self, topic: Subject, schema_id: SchemaId) -> Optional[TypedSchema]:
schema_cache = self._topic_cache.get(topic, self._empty_schema_cache)
return schema_cache.get_schema(schema_id)

def get_schema_str(self, topic: Subject, schema_id: SchemaId) -> Optional[str]:
schema_cache = self._topic_cache.get(topic, self._empty_schema_cache)
return schema_cache.get_schema_str(schema_id)


class SchemaCache(SchemaCacheProtocol):
def __init__(self) -> None:
self._schema_hash_str_to_id: Dict[str, SchemaId] = {}
self._id_to_schema_str: MutableMapping[SchemaId, TypedSchema] = TTLCache(maxsize=10000, ttl=600)

def get_schema_id(self, schema: TypedSchema) -> Optional[SchemaId]:
fingerprint = hashlib.sha1(str(schema).encode("utf8")).hexdigest()

maybe_id = self._schema_hash_str_to_id.get(fingerprint)

if maybe_id is not None and maybe_id not in self._id_to_schema_str:
del self._schema_hash_str_to_id[fingerprint]
return None

return maybe_id

def has_schema_id(self, schema_id: SchemaId) -> bool:
return schema_id in self._id_to_schema_str

def set_schema(self, schema_id: SchemaId, schema: TypedSchema) -> None:
fingerprint = hashlib.sha1(str(schema).encode("utf8")).hexdigest()
self._schema_hash_str_to_id[fingerprint] = schema_id
self._id_to_schema_str[schema_id] = schema

def get_schema(self, schema_id: SchemaId) -> Optional[TypedSchema]:
return self._id_to_schema_str.get(schema_id)

def get_schema_str(self, schema_id: SchemaId) -> Optional[str]:
maybe_schema = self.get_schema(schema_id)
return None if maybe_schema is None else str(maybe_schema)


class EmptySchemaCache(SchemaCacheProtocol):
def get_schema_id(self, schema: TypedSchema) -> None:
return None

def has_schema_id(self, schema_id: SchemaId) -> bool:
return False

def set_schema(self, schema_id: SchemaId, schema: TypedSchema) -> None:
raise NotImplementedError("Empty schema cache. Cannot set schemas.")

def get_schema(self, schema_id: SchemaId) -> None:
return None

def get_schema_str(self, schema_id: SchemaId) -> None:
return None
Loading

0 comments on commit f2ca7cf

Please sign in to comment.